I have a dataframe, dfRaw, with three columns: id, index, and value.
+---+-----+-------------------+
| id|index|              value|
+---+-----+-------------------+
|  A| 1023|0.09938822262205915|
|  A| 1046| 0.3110047630613805|
|  A| 1069| 0.8486710971453512|
+---+-----+-------------------+
root
 |-- id: string (nullable = true)
 |-- index: integer (nullable = false)
 |-- value: double (nullable = false)
I also have a second dataframe, dfIntervals, which lists the desired periods for each id:
+---+-----------+---------+
| id|start_index|end_index|
+---+-----------+---------+
|  A|       1069|     1276|
|  B|       2066|     2291|
|  B|       1616|     1841|
|  C|       3716|     3932|
+---+-----------+---------+
root
 |-- id: string (nullable = true)
 |-- start_index: integer (nullable = false)
 |-- end_index: integer (nullable = false)
I have three template arrays, as below:
val template1 = Array(0.0, 0.1, 0.15, 0.2, 0.3, 0.33, 0.42, 0.51, 0.61, 0.7)
val template2 = Array(0.96, 0.89, 0.82, 0.76, 0.71, 0.65, 0.57, 0.51, 0.41, 0.35)
val template3 = Array(0.0, 0.07, 0.21, 0.41, 0.53, 0.42, 0.34, 0.25, 0.19, 0.06)
The goal is, for each row of dfIntervals, to apply a function (let's assume it's correlation) that receives the value column of dfRaw over that row's interval together with each of the three template arrays, and to add three columns to dfIntervals, one per template.
Assumptions:
1 - The template arrays have exactly 10 elements each.
2 - There are no duplicates in the index column of dfRaw.
3 - The start_index and end_index values in dfIntervals exist in the index column of dfRaw, and each interval spans exactly 10 rows (inclusive). For instance, dfRaw.filter($"id" === "A").filter($"index" >= 1069 && $"index" <= 1276).count (the first row of dfIntervals) returns exactly 10.
Here's the code that generates these dataframes:
import org.apache.spark.sql.functions._
import spark.implicits._ // needed for toDF and $"..." outside the spark-shell
val mySeed = 1000
/* Defining templates for correlation analysis*/
val template1 = Array(0.0, 0.1, 0.15, 0.2, 0.3, 0.33, 0.42, 0.51, 0.61, 0.7)
val template2 = Array(0.96, 0.89, 0.82, 0.76, 0.71, 0.65, 0.57, 0.51, 0.41, 0.35)
val template3 = Array(0.0, 0.07, 0.21, 0.41, 0.53, 0.42, 0.34, 0.25, 0.19, 0.06)
/* Defining raw data*/
val dfRaw = Seq(
  ("A", (1023 to 1603 by 23).toArray),
  ("B", (341 to 2300 by 25).toArray),
  ("C", (2756 to 3954 by 24).toArray)
).toDF("id", "index")
  .select($"id", explode($"index") as "index")
  .withColumn("value", rand(seed = mySeed))
/* Defining intervals*/
val dfIntervals = Seq(
  ("A", 1069, 1276),
  ("B", 2066, 2291),
  ("B", 1616, 1841),
  ("C", 3716, 3932)
).toDF("id", "start_index", "end_index")
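As a sanity check on assumption 3, the slicing for the first interval can be reproduced in plain Scala (no Spark), using the same index sequence dfRaw is built from:

```scala
// Indices for id "A", exactly as generated above; assumption 3 says each
// interval of dfIntervals covers exactly 10 of them.
val indicesA = (1023 to 1603 by 23).toArray
val window = indicesA.filter(i => i >= 1069 && i <= 1276)
println(window.length) // 10 indices fall inside [1069, 1276]
```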
The result should be three columns added to the dfIntervals dataframe, named corr_w_template1, corr_w_template2 and corr_w_template3.
PS: I could not find an array-to-array correlation function in Scala. Let's assume such a function exists (signature below), and we will make a UDF out of it if needed.
def correlation(arr1: Array[Double], arr2: Array[Double]): Double
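As a placeholder for that signature, a plain-Scala Pearson correlation could be sketched as below (an assumption of mine, not a library function); it needs no Spark dependency, so it could later be wrapped with org.apache.spark.sql.functions.udf and applied to the 10 collected values of each interval:

```scala
// Hypothetical placeholder: Pearson correlation between two equal-length
// arrays. Assumes both are non-empty, the same length, and non-constant
// (a constant array makes the denominator zero).
def correlation(arr1: Array[Double], arr2: Array[Double]): Double = {
  require(arr1.length == arr2.length && arr1.nonEmpty)
  val n = arr1.length
  val mean1 = arr1.sum / n
  val mean2 = arr2.sum / n
  // Numerator: sum of co-deviations from the means.
  val cov = arr1.zip(arr2).map { case (a, b) => (a - mean1) * (b - mean2) }.sum
  // Denominator: product of the (unnormalized) standard deviations.
  val std1 = math.sqrt(arr1.map(a => (a - mean1) * (a - mean1)).sum)
  val std2 = math.sqrt(arr2.map(b => (b - mean2) * (b - mean2)).sum)
  cov / (std1 * std2)
}
```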