I am seeing some performance issues while running queries using DataFrames. From my research, a single long-running final task can be a sign that data is not distributed optimally (skew), but I have not found a detailed process for resolving this issue.
I start by loading two tables as DataFrames, and I then join them on one field. I have tried adding distribute by (repartition) and sort by to improve performance, but I am still seeing this single long-running final task. Here is a simplified version of my code; note that queries one and two are not actually this simple and use UDFs to calculate some values.
I have tried a few different settings for spark.sql.shuffle.partitions. I tried 100, but it failed (I didn't really debug this too much, to be honest). I then tried 300, 4000, and 8000, and performance decreased with each increase. I am selecting a single day of data, where each file is an hour.
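For reference, this is how I have been changing that setting before running the query (the value shown is just one of the ones I tried):

// change the number of partitions used for shuffles (joins, aggregations)
sqlContext.setConf("spark.sql.shuffle.partitions", "300")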
// load both inputs (the real queries are more complex and use UDFs)
val df1 = sqlContext.sql("Select * from Table1")
val df2 = sqlContext.sql("Select * from Table2")

// repartition and sort each input by the join key
val distributeDf1 = df1
    .repartition(df1("userId"))
    .sortWithinPartitions(df1("userId"))
val distributeDf2 = df2
    .repartition(df2("userId"))
    .sortWithinPartitions(df2("userId"))

distributeDf1.registerTempTable("df1")
distributeDf2.registerTempTable("df2")

// join the two inputs on userId
val df3 = sqlContext
  .sql("""
    Select
      df1.*
    from
      df1
    left outer join df2 on
      df1.userId = df2.userId""")
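To rule out skew on the join key, I was planning to check the row counts per userId in both inputs, and whether there are many null userIds on the left side (just a sketch against the DataFrames above):

import org.apache.spark.sql.functions.desc

// rows per userId, largest keys first -- one huge key would explain one long task
df1.groupBy("userId").count().orderBy(desc("count")).show(20)
df2.groupBy("userId").count().orderBy(desc("count")).show(20)

// many null userIds also all hash to a single partition
df1.filter(df1("userId").isNull).count()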
Since it seems partitioning by userId is not ideal, I could partition by the timestamp instead. If I do this, should I just use date + hour? If I have fewer than 200 unique combinations of those, will some partitions be empty and leave executors idle?
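Something like this is what I have in mind for the time-based repartition (a sketch; eventTime is a placeholder for whatever the actual timestamp column is called):

import org.apache.spark.sql.functions.{to_date, hour}

// repartition both inputs on (date, hour) derived from the timestamp column
val timeDf1 = df1.repartition(to_date(df1("eventTime")), hour(df1("eventTime")))
val timeDf2 = df2.repartition(to_date(df2("eventTime")), hour(df2("eventTime")))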