I'm seeing a few scalability problems with a pyspark script I've written and was wondering if anyone would be able to shed a bit of light.
I have a very similar use case to the one presented here:
Separate multi line record with start and end delimiter
In that I have some multi-line data where there is a logical delimiter between records, e.g. the data looks like:
AA123
BB123
CCXYZ
AA321
BB321
CCZYX
...
Using the example in the previous answer, I've separated this into multiple records using a script like...
import os
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import monotonically_increasing_id, expr, sum as sum_

# Played around with setting the available memory at runtime
spark = SparkSession \
    .builder \
    .appName("TimetableSession") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()

files = os.path.join("data", "*_lots_of_gzipped_files.gz")
# one row per line of text, in a column called 'entry'
df = spark.sparkContext.textFile(files).map(lambda line: (line,)).toDF(["entry"])
df = df.withColumn("id", monotonically_increasing_id())
# unpartitioned window over the whole dataset, ordered by line id
w = Window.partitionBy().orderBy('id')
# flag the first line of each record
df = df.withColumn('AA_indicator', expr("case when entry like 'AA%' then 1 else 0 end"))
# !!!Blowing up with OOM errors here at scale!!!
# running count of 'AA' lines gives each line its record's index
df = df.withColumn('index', sum_('AA_indicator').over(w))
df.show()
+--------------------+---+------------+-----+
|               entry| id|AA_indicator|index|
+--------------------+---+------------+-----+
|               AA123|  1|           1|    1|
|               BB123|  2|           0|    1|
|               CCXYZ|  3|           0|    1|
|               AA321|  4|           1|    2|
|               BB321|  5|           0|    2|
|               CCZYX|  6|           0|    2|
+--------------------+---+------------+-----+
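For context, the end goal after this indexing step is to roll the lines for each index back up into a single record, roughly along these lines (collect_list here is just what I'd planned to use next, not something I've run at scale yet):

from pyspark.sql.functions import collect_list

# group the lines of each record back together
# (note: ordering within the list isn't guaranteed by groupBy/collect_list)
records = df.groupBy('index').agg(collect_list('entry').alias('lines'))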
This seems to work OK with data of a reasonable size (e.g. 50MB), but when I scale it up to > 1GB of data I'm seeing Java OOM errors. I'm seeing the same problem even when attempting to allocate > 20GB of memory to spark.driver/executor.
I believe the problem is that the window isn't partitioned, so all of the data is being collected into a single partition in memory rather than the work being parallelised? But I might be way off the mark with this.
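One alternative I've been considering, but haven't verified at scale, is to read the files with spark.read.text and partition the window by input_file_name(), on the assumption that a record never spans two files; a record would then be keyed by (file, index) rather than a single global index. Something like (reusing spark and files from above):

from pyspark.sql import Window
from pyspark.sql.functions import input_file_name, monotonically_increasing_id, expr, sum as sum_

# read via the DataFrame reader so input_file_name() is populated
df = spark.read.text(files).withColumnRenamed('value', 'entry')
df = df.withColumn('file', input_file_name()) \
       .withColumn('id', monotonically_increasing_id()) \
       .withColumn('AA_indicator', expr("case when entry like 'AA%' then 1 else 0 end"))

# partition by file so no single task has to hold the whole dataset
w = Window.partitionBy('file').orderBy('id')
df = df.withColumn('index', sum_('AA_indicator').over(w))
# a record is then identified by (file, index) rather than index alone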
I'm running this script in a standalone Docker container using the Jupyter pyspark-notebook image: https://github.com/jupyter/docker-stacks/tree/master/pyspark-notebook.
Any suggestions on a better way to index 'records', or a better approach to the problem in general, would be much appreciated.