I am using SparkR:::map and my function returns a large-ish R dataframe for each input row, each of the same shape. I would like to write these dataframes as parquet files without 'collect'ing them. Can I map write.df over my output list? Can I get the worker tasks to write the parquet instead?
I now have a working. example. I am happy with this other than I did not expect the reduce to implicitly 'collect' as I wanted to write the resultant DF as Parquet.
Also, I'm not convinced that :::map actually does anything in parallel. Do I need always to call 'parallelise' as well?
#! /usr/bin/Rscript
library(SparkR, lib.loc="/opt/spark-1.5.1-bin-without-hadoop/R/lib")
source("jdbc-utils.R")
options(stringsAsFactors = FALSE)
# I dislike having these here but when I move them into main(), it breaks - the sqlContext drops.
assign("sc", sparkR.init(master = "spark://poc-master-1:7077",
                         sparkHome = "/opt/spark-1.5.1-bin-without-hadoop/",
                         appName = "Peter Spark test",
                         list(spark.executor.memory="4G")), envir = .GlobalEnv)
assign("sqlContext", sparkRSQL.init(sc), envir =.GlobalEnv)
#### MAP function ####
run.model <- function(v) {
  x <- v$xs[1]
  y <- v$ys[1]
  startTime     <- format(Sys.time(), "%F %T")
  xs <- c(1:x)
  endTime <- format(Sys.time(), "%F %T")
  hostname <- system("hostname", intern = TRUE)
  xys <- data.frame(xs,y,startTime,endTime,hostname,stringsAsFactors = FALSE)
  return(xys)
}
# HERE BE THE SCRIPT BIT
main <- function() {
  # Make unique identifiers for each run
  xs <- c(1:365)
  ys <- c(1:1)
  xys <- data.frame(xs,ys,stringsAsFactors = FALSE)
  # Convert to Spark dataframe for mapping
  sqlContext <- get("sqlContext", envir = .GlobalEnv)
  xys.sdf <- createDataFrame(sqlContext, xys)
  # Let Spark do what Spark does
  output.list <- SparkR:::map(xys.sdf, run.model)
  # Reduce gives us a single R dataframe, which may not be what we want.
  output.redux <- SparkR:::reduce(output.list, rbind)
  # Or you can have it as a list of data frames.
  output.col <- collect(output.list)
  return(NULL)
}