I am currently processing data with Spark; in foreachPartition I open a connection to MySQL and insert the records into the database in batches of 1000. As mentioned in the Spark documentation, the default value of spark.sql.shuffle.partitions is 200, but I want to keep it dynamic. So how do I calculate it, such that I neither choose a very high value and cause performance degradation, nor choose a very small value and cause OOM?
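For context, a minimal sketch of that write pattern (the connection URL, table and column names below are placeholders, not from the question):

    import java.sql.DriverManager
    import org.apache.spark.sql.Row

    // Each partition opens one MySQL connection and flushes inserts in batches of 1000.
    df.foreachPartition { (rows: Iterator[Row]) =>
      val conn = DriverManager.getConnection("jdbc:mysql://host/db", "user", "pass")
      val stmt = conn.prepareStatement("INSERT INTO my_table (col1) VALUES (?)")
      try {
        var count = 0
        rows.foreach { row =>
          stmt.setString(1, row.getString(0))
          stmt.addBatch()
          count += 1
          if (count % 1000 == 0) stmt.executeBatch()   // flush every 1000 rows
        }
        stmt.executeBatch()                            // flush the remainder
      } finally {
        stmt.close()
        conn.close()
      }
    }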
        Naresh
        
2 Answers
        Try the below option -
// Derive the shuffle partition count from the resources allocated to the job:
// total cores = executors * cores per executor.
val numExecutors         = spark.conf.get("spark.executor.instances").toInt
val numExecutorsCores    = spark.conf.get("spark.executor.cores").toInt
val numShufflePartitions = numExecutors * numExecutorsCores
spark.conf.set("spark.sql.shuffle.partitions", numShufflePartitions)
This will help you set the right number of shuffle partitions based on the executors and executor cores used for your Spark job, without compromising performance or running into Out Of Memory issues.
If you still get Out Of Memory, then set the below property -
spark.conf.set("spark.executor.memoryOverhead", "3G")
Another option is to calculate the DataFrame size, divide it by the HDFS block size, and use the resulting number to set spark.sql.shuffle.partitions.
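As a rough sketch of that calculation (assumes Spark 2.3+ for the optimizer statistics; the 128 MB block size is an assumption, check dfs.blocksize on your cluster):

    // Estimate the DataFrame size from the optimizer statistics and divide by
    // the HDFS block size to pick a shuffle partition count.
    val sizeInBytes   = df.queryExecution.optimizedPlan.stats.sizeInBytes.toLong
    val hdfsBlockSize = 128L * 1024 * 1024            // assumed 128 MB block size
    val partitions    = math.max(1L, sizeInBytes / hdfsBlockSize)
    spark.conf.set("spark.sql.shuffle.partitions", partitions)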
 
    
    
 
    
    
        Ajay Ahuja
        
            
            
        You can use the df.repartition(numPartitions) method for this. You can decide numPartitions based on the input/intermediate output size and pass it to the repartition() method.
df.repartition(numPartitions)   or rdd.repartition(numPartitions)
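For illustration, a rough sketch of deciding numPartitions from the input size (the rows-per-partition target below is an assumption, not from the answer):

    // Choose numPartitions so each partition carries a manageable number of rows
    // before the per-partition write.
    val targetRowsPerPartition = 100000L               // assumed tuning knob
    val totalRows              = df.count()            // or a cheaper estimate
    val numPartitions          = math.max(1L, totalRows / targetRowsPerPartition).toInt

    val repartitioned = df.repartition(numPartitions)  // the write then runs over `repartitioned`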
 
    
    
        Raju Bairishetti
        
                    Nope, it doesn't work: df.repartition(numPartitions) – tauitdnmd Jun 14 '17 at 02:21