I am running sparklyr with R on a local instance with 8 cores and 64 GB of RAM. My job is to left_join a [50 000 000, 12] data frame with a [20 000 000, 3] data frame in Spark.
# Load packages
library(tidyverse)
library(sparklyr)
# Initialize configuration with defaults
config <- spark_config()
# Memory
# Set memory allocation for whole local Spark instance
# Sys.setenv("SPARK_MEM" = "50g")
# Set driver and executor memory allocations
# config$spark.driver.memory <- "8g"
# config$spark.driver.maxResultSize <- "8g"
# Connect to local cluster with custom configuration
sc <- spark_connect(master = "local", config = config, spark_home = spark_home_dir())
# Read df1 and df2
df1 <- spark_read_parquet(sc, 
                          path = "/mnt/df1/",
                          memory = FALSE, overwrite = TRUE)
df2 <- spark_read_parquet(sc, 
                          path = "/mnt/df2/",
                          memory = FALSE, overwrite = TRUE)
# Left join
df3 <- df1 %>%
  dplyr::left_join(df2)
# Write or collect
sparklyr::spark_write_parquet(df3, path = "/mnt/") # or
df3 <- df3 %>% collect()
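For completeness: I do not pass a by argument to left_join, so dplyr performs a natural join on whatever columns the two tables share. Spelled out, the join would look like this (id is only a placeholder for my actual key column):

# Same join with the key made explicit ("id" stands in for the real key column)
df3 <- df1 %>%
  dplyr::left_join(df2, by = "id")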
No matter how I adjust the Spark configuration, the code fails with java.lang.OutOfMemoryError: Java heap space.
Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 8.0 failed 1 times, most recent failure: Lost task 2.0 in stage 8.0 (TID 96, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
So far I have tried different combinations of
Sys.setenv("SPARK_MEM" = "50g")
config["sparklyr.shell.driver-memory"] <- "20G"  
config["sparklyr.shell.num-executors"] <- 8  
config$spark.driver.maxResultSize <- "8g" 
config$spark.executor.memory <- "8g"  
config$spark.memory.fraction <- 0.9 
either in the R script or in the Spark configuration file.
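For example, one attempt assembles these settings in the R script before connecting (the exact values differ between runs):

# One combination I have tried; values vary between attempts
config <- spark_config()
config["sparklyr.shell.driver-memory"] <- "20G"
config$spark.driver.maxResultSize <- "8g"
config$spark.executor.memory <- "8g"
config$spark.memory.fraction <- 0.9
sc <- spark_connect(master = "local", config = config, spark_home = spark_home_dir())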
Similar questions have been asked (1, 2, 3), but none of them solved my problem.