I am trying to dcast my spark dataframe using sdf_pivot() function. I want to
display values of columns like value.var parameter in dcast() from reshape2 package. Please look at the example below.
id <- c(1,1,1,1,1,2,2,2,3,3,3)
name <- c("A","B","C","D","E","A","B","C","D","E","F")
value <- c(1,2,3,1,1,2,3,1,1,2,3)
dt <- data.frame(id,name,value)
reshape2::dcast(dt,id~name,value.var = "value")
output1-
  id  A  B  C  D  E  F
1  1  1  2  3  1  1 NA
2  2  2  3  1 NA NA NA
3  3 NA NA NA  1  2  3
spark_dt <- copy_to(sc, dt)
sdf_pivot(spark_dt,id~name)
output2-
id     A     B     C     D     E     F
  <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
1     1     1     1     1     1     1   NaN
2     3   NaN   NaN   NaN     1     1     1
3     2     1     1     1   NaN   NaN   NaN
It seems we don't have value.var parameter in sdf_pivot() function.
I am new to spark and any suggestions would be much appreciated.
Do I need to write custom function to do it?
Note**- I tried
##Pivoting
cohort_paste <- function(gdf) {
  expr <- invoke_static(
    sc,
    "org.apache.spark.sql.functions",
    "paste",
    "value"
  )
  gdf %>% invoke("agg", expr, list())
}
It is giving error
Error: java.lang.IllegalArgumentException: invalid method paste for object org.apache.spark.sql.functions
I actually want to use paste function.
df <- tibble(
    id = c(rep(1, 9), rep(2, 9)),
    name = rep(rep(c("A", "B", "C"), each=3), 2),
    value = sample(10,18,replace=T)
)[sample(1:18, size=10), ]
spark_dt <- copy_to(sc, df, overwrite=TRUE)
collect_list <- function(gdf) {
    expr <- invoke_static(
        sc,
        "org.apache.spark.sql.functions",
        "collect_list",
        "value"
    )
    gdf %>% invoke("agg", expr, list())
}
sdf_pivot(spark_dt, id ~ name, fun.aggregate=collect_list) %>% 
    mutate_at(vars(-id), funs(concat_ws(" ", .)))
Error log-
Error: org.apache.spark.sql.AnalysisException: cannot resolve 'concat_ws(' ', sparklyr_tmp_79e15abf584.
A)' due to data type mismatch: argument 2 requires (array or string) type, however, 'sparklyr_tmp_79e15abf584.A' is of array type.; line 1 pos 13; 'GlobalLimit 10 +- 'LocalLimit 10 +- 'Project [id#3038, concat_ws( , A#3156) AS A#3172, concat_ws( , B#3158) AS B#3173, concat_ws( , C#3160) AS C#3174] +- SubqueryAlias sparklyr_tmp_79e15abf584 +- Aggregate [id#3038], [id#3038, collect_list(if ((name#3039 = A)) value#3040 else cast(null as int), 0, 0) AS A#3156, collect_list(if ((name#3039 = B)) value#3040 else cast(null as int), 0, 0) AS B#3158, collect_list(if ((name#3039 = C)) value#3040 else cast(null as int), 0, 0) AS C#3160] +- Project [id#3038, name#3039, value#3040] +- SubqueryAlias df +- Relation[id#3038,name#3039,value#3040] csv