I have a Spark DataFrame and need to add new columns to it based on a broadcast dictionary.
Input Spark DF:
df_k_col1   cust_grp_map
Col1            1       
Col2            2
Col3            3
Col5            5
The values should be looked up by cust_grp_map in the nested dict below:
cust_grp_dict = {'cust_grp_map': {1: {'cust_grp_name': 'cg1', 'cust_type': 'ct1'},
                                  2: {'cust_grp_name': 'cg2', 'cust_type': 'ct2'},
                                  3: {'cust_grp_name': 'cg3', 'cust_type': 'ct3'},
                                  4: {'cust_grp_name': 'cg4', 'cust_type': 'ct4'},
                                  5: {'cust_grp_name': 'cg5', 'cust_type': ''}}}
Expected output:
df_k_col1   cust_grp_map    cust_grp_name   cust_type
Col1            1               cg1             ct1
Col2            2               cg2             ct2
Col3            3               cg3             ct3
Col5            5               cg5             
I tried a UDF over the broadcast dict and a create_map/chain approach, but I am running into issues with both. The UDF attempt:
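from pyspark.sql import functions as F

def get_customer_group(df_k, data):
    def get_df_k_cust_grp(mapping_data, key):
        def get_val(x):
            # .value is the broadcast dict itself, so index into it rather than calling it.
            # Assumes data is the inner dict, i.e. cust_grp_dict['cust_grp_map'].
            return mapping_data.value.get(x, {}).get(key)
        return F.udf(get_val)  # F.udf is called with (), not subscripted with []
    b = spark.sparkContext.broadcast(data)
    df_k_custgrps = (df_k.withColumn("cust_grp_name", get_df_k_cust_grp(b, "cust_grp_name")(F.col("cust_grp_map"))))
    return df_k_custgrps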
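To make the intended per-row lookup concrete, here is a minimal plain-Python sketch (no Spark required) of what the UDF is supposed to compute for each cust_grp_map value. The helper name make_lookup and the rows list are illustrative assumptions, not part of the code above; missing group ids are assumed to map to None.

```python
cust_grp_dict = {'cust_grp_map': {1: {'cust_grp_name': 'cg1', 'cust_type': 'ct1'},
                                  2: {'cust_grp_name': 'cg2', 'cust_type': 'ct2'},
                                  3: {'cust_grp_name': 'cg3', 'cust_type': 'ct3'},
                                  4: {'cust_grp_name': 'cg4', 'cust_type': 'ct4'},
                                  5: {'cust_grp_name': 'cg5', 'cust_type': ''}}}


def make_lookup(mapping, field):
    """Return a function mapping a group id to one field of the nested dict.

    Hypothetical helper: it mirrors the closure the UDF would wrap.
    """
    inner = mapping['cust_grp_map']

    def get_val(group_id):
        # A dict is indexed (or .get-ed), never called; unknown ids yield None.
        return inner.get(group_id, {}).get(field)

    return get_val


get_name = make_lookup(cust_grp_dict, 'cust_grp_name')
get_type = make_lookup(cust_grp_dict, 'cust_type')

# Stand-in for the DataFrame rows (df_k_col1, cust_grp_map).
rows = [('Col1', 1), ('Col2', 2), ('Col3', 3), ('Col5', 5)]
result = [(col, grp, get_name(grp), get_type(grp)) for col, grp in rows]

for row in result:
    print(row)
```

In Spark, a function like get_val would be wrapped with F.udf and applied to F.col("cust_grp_map"), with the dict read from the broadcast variable's .value attribute.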
and the create_map/chain attempt:
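from itertools import chain
# Composite string keys such as '1_cust_grp_name' (str(k) because k is an int); the lookup below rebuilds the same key from the column
cust_grp_mapper = F.create_map([F.lit(i) for i in chain(*{str(k) + '_' + x: y for k, v in cust_grp_dict['cust_grp_map'].items() for x, y in v.items()}.items())])
df_k_custgrps = df_k.withColumn('cust_grp_name', cust_grp_mapper[F.concat(F.col('cust_grp_map').cast('string'), F.lit('_cust_grp_name'))])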
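The flattening step can be checked on its own in plain Python. The sketch below shows what the dict comprehension and chain produce before anything is handed to F.create_map. Adding an int key directly to a str field name raises a TypeError, so the keys are converted with str() here; the underscore separator is an assumption chosen for readability, not something the data requires.

```python
from itertools import chain

cust_grp_dict = {'cust_grp_map': {1: {'cust_grp_name': 'cg1', 'cust_type': 'ct1'},
                                  2: {'cust_grp_name': 'cg2', 'cust_type': 'ct2'},
                                  3: {'cust_grp_name': 'cg3', 'cust_type': 'ct3'},
                                  4: {'cust_grp_name': 'cg4', 'cust_type': 'ct4'},
                                  5: {'cust_grp_name': 'cg5', 'cust_type': ''}}}

# Flatten {id: {field: value}} into {"<id>_<field>": value}.
flat = {
    f"{group_id}_{field}": value
    for group_id, fields in cust_grp_dict['cust_grp_map'].items()
    for field, value in fields.items()
}

# chain(*flat.items()) interleaves keys and values: key1, value1, key2, value2, ...
# This alternating layout is what F.create_map expects once each item is wrapped in F.lit.
pairs = list(chain(*flat.items()))

print(pairs[:4])
```

A lookup against the resulting map then has to build the same composite key from the cust_grp_map column, for example by casting it to string and concatenating the literal suffix.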
Thanks in advance for your help.