I am new to Spark and am working with a large dataset of around 20 GB (spread across many small files). I need help reshaping it.
My data currently looks like this:
+----------+-------------------------+-------------------+---------+------+
|   id     |       values            |     creation date | leadTime| span |
+----------+-------------------------+-------------------+---------+------+
|id_1      |[[v1, 0.368], [v2, 0.5]] |     2020-07-15    |      16 |  15  |
|id_2      |[[v1, 0.368], [v2, 0.4]] |     2020-07-15    |      16 |  15  |
|id_1      |[[v1, 0.468], [v2, 0.3]] |     2020-07-15    |      17 |  18  |
|id_2      |[[v1, 0.368], [v2, 0.3]] |     2020-07-15    |      17 |  18  | 
+----------+-------------------------+-------------------+---------+------+
I need to pivot it into the format below. Each new column is named from a key in the values column plus that row's leadTime and span (final_<key>_<leadTime>_<span>_wk), and it holds the corresponding value:
+----------+--------------+--------------------+--------------------+--------------------+--------------------+
|   id     |creation date | final_v1_16_15_wk  |  final_v2_16_15_wk | final_v1_17_18_wk  |  final_v2_17_18_wk |
+----------+--------------+--------------------+--------------------+--------------------+--------------------+
|id_1      |2020-07-15    |       0.368        |         0.5        |       0.468        |         0.3        |
|id_2      |2020-07-15    |       0.368        |         0.4        |       0.368        |         0.3        |
+----------+--------------+--------------------+--------------------+--------------------+--------------------+
Here is a sample DataFrame:
// toDF needs the session implicits; spark-shell imports them automatically,
// in a standalone application add: import spark.implicits._
val df = Seq(
  ("id_1", Map("v1" -> 0.368, "v2" -> 0.5, "v3" -> 0.6), "2020-07-15", 16, 15),
  ("id_1", Map("v1" -> 0.564, "v2" -> 0.78, "v3" -> 0.65), "2020-07-15", 17, 18),
  ("id_2", Map("v1" -> 0.468, "v2" -> 0.3, "v3" -> 0.66), "2020-07-15", 16, 15),
  ("id_2", Map("v1" -> 0.657, "v2" -> 0.65, "v3" -> 0.67), "2020-07-15", 17, 18)
).toDF("id", "values", "creation date", "leadTime", "span")
I tried to generate the column names and values with the logic below, but it did not work:
val modDF = finalDF.withColumn("final_" + newFinalDF("values").getItem(0).getItem("_1") + "_" + newFinalDF("leadTime") + "_" + newFinalDF("span") + "_wk", $"values".getItem(0).getItem("_2"));
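For comparison, here is a minimal sketch of one direction that might work. It is not a confirmed solution. The attempt above cannot work as written because the first argument of withColumn is a plain String, so concatenating Column objects into it yields their textual description rather than per-row values. The sketch assumes that values is a map column, as in the sample DataFrame (shortened here to v1 and v2), and that each (id, creation date, key, leadTime, span) combination occurs at most once. The names longDF, wideDF and colName are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, concat, concat_ws, explode, first, lit}

// Local session for the sketch; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("pivot-sketch").getOrCreate()
import spark.implicits._

// Shortened version of the sample data (assumption: `values` is a map column).
val df = Seq(
  ("id_1", Map("v1" -> 0.368, "v2" -> 0.5), "2020-07-15", 16, 15),
  ("id_1", Map("v1" -> 0.564, "v2" -> 0.78), "2020-07-15", 17, 18),
  ("id_2", Map("v1" -> 0.468, "v2" -> 0.3), "2020-07-15", 16, 15),
  ("id_2", Map("v1" -> 0.657, "v2" -> 0.65), "2020-07-15", 17, 18)
).toDF("id", "values", "creation date", "leadTime", "span")

// Step 1: explode the map into one row per entry (columns `key` and `value`),
// then build the target column name as a per-row string value.
val longDF = df
  .select(col("id"), col("creation date"), col("leadTime"), col("span"), explode(col("values")))
  .withColumn(
    "colName",
    concat(
      lit("final_"),
      concat_ws("_", col("key"), col("leadTime").cast("string"), col("span").cast("string")),
      lit("_wk")
    )
  )

// Step 2: pivot the generated names into columns, one output row per id and date.
// first() assumes there is at most one value per group and column name.
val wideDF = longDF
  .groupBy(col("id"), col("creation date"))
  .pivot("colName")
  .agg(first("value"))

wideDF.show(false)
```

On the sample this should give one row per id with columns such as final_v1_16_15_wk and final_v2_17_18_wk. Note that pivot without an explicit list of values runs an extra pass over the data to collect the distinct column names, which may matter at 20 GB; passing the list of names to pivot avoids that pass if they are known in advance.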