I have a problem similar to this, but I want to check for duplicates across multiple columns and keep the record with the oldest timestamp.
My approach is to create a timestamp column, order by it ascending, and then drop the duplicates (dropDuplicates() keeps the first record in each group and deletes the rest), so this works:
from pyspark.sql.functions import unix_timestamp
...
pattern = "yyyy-MM-dd hh:mm:ss"

# valid_from is the timestamp column: parse it and order ascending
df = (df.withColumn("timestampCol",
                    unix_timestamp(df["valid_from"], pattern).cast("timestamp"))
        .orderBy(["timestampCol"], ascending=True))

# Drop duplicates based on every column except the timestamp columns,
# so only the rows with the oldest timestamps stay
df = df.dropDuplicates(subset=[x for x in df.columns
                               if x not in ["valid_from", "timestampCol"]])
This code works fine for small datasets, but when I try it on a bigger dataset I run into severe performance issues. I found out that dropDuplicates() after orderBy() has atrocious performance. I tried to cache the DataFrame, but that did not help much.
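For completeness, the caching attempt looked roughly like this, replacing the last step of the snippet above (where exactly to place the cache() call was a guess on my part):

# Cache the ordered DataFrame and materialise it with count()
# before running dropDuplicates()
df = df.cache()
df.count()   # force the cache to be populated
df = df.dropDuplicates(subset=[x for x in df.columns
                               if x not in ["valid_from", "timestampCol"]])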
The problem is that when dropDuplicates() starts, the console shows
[Stage x:=============================> (1 + 1) / 2]
and it stays stuck there for almost 20 minutes.
So my questions are:
- Why does dropDuplicates() after orderBy() perform so badly? Is there another way to achieve the same goal (drop duplicates on multiple columns while keeping the oldest value)? See the sketch after these questions for the kind of alternative I have in mind.
- Does the console output mean that only 2 executors are running at that point? If so, how can I increase them? I submit my application to YARN with: --num-executors 5 --executor-cores 5 --executor-memory 20G. Why do I have only two executors running at this particular step, and how can I increase them for it?
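For reference, this is the kind of alternative I have been considering for the first question: a window function that ranks the rows of each duplicate group by the parsed timestamp and keeps only the oldest one. This is just a sketch (the key_cols name is mine), and I do not know whether it avoids the shuffle cost of orderBy() followed by dropDuplicates():

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Columns that define a duplicate (everything except the timestamp columns)
key_cols = [c for c in df.columns if c not in ["valid_from", "timestampCol"]]

# Rank rows within each duplicate group by the parsed timestamp, oldest first
w = Window.partitionBy(*key_cols).orderBy(F.col("timestampCol").asc())

df_dedup = (df.withColumn("rn", F.row_number().over(w))
              .filter(F.col("rn") == 1)
              .drop("rn"))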