How to tidy a DataFrame (from JSON) with dynamic nested structs / arrays in PySpark?
I have 10000 JSON files, each with static and dynamic field names as described below.
- Static names: data, label, units, date, val, num (can be hardcoded)
- Dynamic names: data_1_a, data_1000_xyz, name_1a, name_1b, name_10000_xyz, A, B (cannot be hardcoded, as there are up to 10000 names / data sub-categories; they can, however, be listed from the schema at runtime, as sketched right below)
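A minimal sketch of reading those dynamic names from the schema at runtime (assuming df is the input DataFrame reproduced further down):

    # the dynamic level-1 / level-2 names, read from the schema instead of hardcoded
    level_1_names = df.schema["data"].dataType.fieldNames()
    level_2_names = {f: df.schema["data"].dataType[f].dataType.fieldNames()
                     for f in level_1_names}
    # e.g. ['data_1000_xyz', 'data_1_a'] and
    #      {'data_1000_xyz': ['name_10000_xyz', 'name_1b'], 'data_1_a': ['name_1a']}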
Input df:
root
 |-- data: struct (nullable = true)
 |    |-- data_1000_xyz: struct (nullable = true)
 |    |    |-- name_10000_xyz: struct (nullable = true)
 |    |    |    |-- label: string (nullable = true)
 |    |    |    |-- units: struct (nullable = true)
 |    |    |    |    |-- A: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- date: string (nullable = true)
 |    |    |    |    |    |    |-- num: string (nullable = true)
 |    |    |    |    |    |    |-- val: long (nullable = true)
 |    |    |-- name_1b: struct (nullable = true)
 |    |    |    |-- label: string (nullable = true)
 |    |    |    |-- units: struct (nullable = true)
 |    |    |    |    |-- B: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- date: string (nullable = true)
 |    |    |    |    |    |    |-- val: long (nullable = true)
 |    |-- data_1_a: struct (nullable = true)
 |    |    |-- name_1a: struct (nullable = true)
 |    |    |    |-- label: string (nullable = true)
 |    |    |    |-- units: struct (nullable = true)
 |    |    |    |    |-- A: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- date: string (nullable = true)
 |    |    |    |    |    |    |-- val: long (nullable = true)
 |-- id: long (nullable = true)
+-----------------------------------------------------------------------------------------------------------------------------+---+
|data                                                                                                                         |id |
+-----------------------------------------------------------------------------------------------------------------------------+---+
|{{{null, {[{2018, str, 4}, {2019, null, 5}, {2020, str, 6}]}}, {null, {[{2019, 2}, {2020, 3}]}}}, {{label_1, {[{2020, 1}]}}}}|1  |
+-----------------------------------------------------------------------------------------------------------------------------+---+
Required df:
+---+--------------+----------------+---------+-------+------+-----+------+
|id |level_1       |level_2         |level_3  |level_4|date  |val  |num   |
+---+--------------+----------------+---------+-------+------+-----+------+
|1  |data_1_a      |name_1a         |units    |A      |2020  |1    |null  |
|1  |data_1000_xyz |name_1b         |units    |B      |2019  |2    |null  |
|1  |data_1000_xyz |name_1b         |units    |B      |2020  |3    |null  |
|1  |data_1000_xyz |name_10000_xyz  |units    |A      |2018  |4    |str   |
|1  |data_1000_xyz |name_10000_xyz  |units    |A      |2019  |5    |null  |
|1  |data_1000_xyz |name_10000_xyz  |units    |A      |2020  |6    |str   |
+---+--------------+----------------+---------+-------+------+-----+------+
To reproduce input df:
json_1 = """{"id":1,"data":{"data_1_a":{"name_1a":{"label":"label_1","units":{"A":[{"date":"2020","val":1}]}}},"data_1000_xyz":{"name_1b":{"label":null,"units":{"B":[{"date":"2019","val":2},{"date":"2020","val":3}]}},"name_10000_xyz":{"label":null,"units":{"A":[{"date":"2018","val":4,"num":"str"},{"date":"2019","val":5},{"date":"2020","val":6,"num":"str"}]}}}}}"""
df = spark.read.json(sc.parallelize([json_1]))
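One direction I've been experimenting with (only a sketch, and only tested on this single example) is to recast every dynamic struct level as a MapType via to_json/from_json, so that the dynamic names become map keys which can then be exploded:

    from pyspark.sql import functions as F
    from pyspark.sql.types import (ArrayType, LongType, MapType, StringType,
                                   StructField, StructType)

    # leaf element; 'num' is declared even where an array lacks it (from_json fills null)
    element = StructType([StructField("date", StringType()),
                          StructField("num", StringType()),
                          StructField("val", LongType())])

    # every dynamic level becomes a map, so its keys survive as data
    map_schema = MapType(StringType(),                        # level_1 keys (data_1_a, ...)
                     MapType(StringType(),                    # level_2 keys (name_1a, ...)
                         StructType([
                             StructField("label", StringType()),
                             StructField("units",
                                         MapType(StringType(),   # level_4 keys (A, B)
                                                 ArrayType(element)))])))

    tidy = (df
        .withColumn("data", F.from_json(F.to_json("data"), map_schema))
        .select("id", F.explode("data").alias("level_1", "lv2"))
        .select("id", "level_1", F.explode("lv2").alias("level_2", "rec"))
        .select("id", "level_1", "level_2",
                F.lit("units").alias("level_3"),
                F.explode("rec.units").alias("level_4", "arr"))
        .withColumn("e", F.explode("arr"))
        .select("id", "level_1", "level_2", "level_3", "level_4",
                "e.date", "e.val", "e.num"))
    tidy.show(truncate=False)

On this example it seems to reproduce the required rows (modulo row order), but I'm not sure it is robust or idiomatic, hence the question.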
Python Pandas solution:
    import json
    import pandas as pd
    
    # 1) flatten json (json_normalize needs a dict, so parse the string first)
    df = pd.json_normalize(json.loads(json_1))
    df_dic = df.to_dict('records')
    
    # 2) split to levels
    data = []
    for row in df_dic:
        k={}
        for item in row.items():
            if item[0] == 'id':
                id = item[1]  # 'id' is the first key in the normalized columns
            else:  
                keys  = item[0].split('.')
                k = {i:s for i,s in enumerate(keys)}
                k.update({'value':item[1]})
                k.update({'id':id})
                data.append(k)
    
    df = (pd.DataFrame(data)[['id', 1, 2, 3, 4, 'value']]
          .rename(columns={1: 'level_1', 2: 'level_2', 3: 'level_3', 4: 'level_4'}))
    # drop the 'label' rows, which have no level_4
    df = df.loc[df['level_4'].notnull()]
    # 3) explode
    dfe = df.explode('value', ignore_index=True)
    
    # 4) pop the 'value' column, build a new frame from its list of dicts, and join it back onto the exploded frame
    output_df = dfe.join(pd.DataFrame([*dfe.pop('value')], index=dfe.index))
    
    
        id  level_1         level_2         level_3 level_4 date    val num
    0   1   data_1_a        name_1a         units   A      2020     1   NaN
    1   1   data_1000_xyz   name_1b         units   B      2019     2   NaN
    2   1   data_1000_xyz   name_1b         units   B      2020     3   NaN
    3   1   data_1000_xyz   name_10000_xyz  units   A      2018     4   str
    4   1   data_1000_xyz   name_10000_xyz  units   A      2019     5   NaN
    5   1   data_1000_xyz   name_10000_xyz  units   A      2020     6   str
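For completeness: to cover all 10000 files I could wrap the pandas logic above in a helper and distribute it with Spark (the path and helper name below are placeholders), but I'd prefer a native PySpark / DataFrame approach:

    import json
    import pandas as pd
    
    def flatten_one(json_str):
        """Apply the pandas flattening above to one JSON document."""
        row = pd.json_normalize(json.loads(json_str)).to_dict('records')[0]
        doc_id = row.pop('id')
        out = []
        for key, value in row.items():
            parts = key.split('.')
            if len(parts) == 5:                      # data.<level_1>.<level_2>.units.<level_4>
                for rec in value:                    # one output row per array element
                    out.append({'id': doc_id,
                                'level_1': parts[1], 'level_2': parts[2],
                                'level_3': parts[3], 'level_4': parts[4],
                                'date': rec.get('date'), 'val': rec.get('val'),
                                'num': rec.get('num')})
        return out
    
    # '/path/to/json_files' is a placeholder; each file is assumed to hold one JSON document
    rows = (spark.sparkContext
            .wholeTextFiles('/path/to/json_files')
            .flatMap(lambda kv: flatten_one(kv[1])))
    output_df = spark.createDataFrame(
        rows, 'id long, level_1 string, level_2 string, level_3 string, '
              'level_4 string, date string, val long, num string')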
Useful links:
- How to flatten nested arrays by merging values in spark
- Is there a function in pyspark dataframe that is similar to pandas.io.json.json_normalize
- Automatically and Elegantly flatten DataFrame in Spark SQL
- Flatten nested array in Spark DataFrame
- https://learn.microsoft.com/en-us/azure/synapse-analytics/how-to-analyze-complex-schema
- Flatten a dynamic nested struct (struct inside struct) in PySpark
- https://towardsdatascience.com/nested-data-types-in-spark-3-1-663e5ed2f2aa
- https://towardsdatascience.com/flattening-json-records-using-pyspark-b83137669def
- https://docs.databricks.com/_static/notebooks/complex-nested-structured.html
- https://docs.databricks.com/delta/data-transformation/higher-order-lambda-functions.html
- https://docs.databricks.com/_static/notebooks/transform-complex-data-types-python.html
- https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html
