There are two dataframes. The first, df_table_a, is created by reading directly from a Hive table. The second, df_table_a_slow, is created by applying a UDF transformation on top of df_table_a:
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import DateType

df_table_a = spark.sql('SELECT INT_COL, DATE_WEEK FROM table_a')

shift_date_udf = F.udf(lambda day: day - pd.offsets.Week(3, weekday=0), DateType())
df_table_a_slow = df_table_a.withColumn('DATE_WEEK', shift_date_udf('DATE_WEEK'))
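(For context, the same shift could be written with built-in date functions instead of a Python UDF. This is only a sketch, assuming that subtracting pd.offsets.Week(3, weekday=0) means "roll forward to the next-or-same Monday, then go back three weeks"; df_table_a_native is a hypothetical name:)

# Hypothetical UDF-free equivalent of the shift above
df_table_a_native = df_table_a.withColumn(
    'DATE_WEEK',
    F.date_sub(F.next_day(F.date_sub('DATE_WEEK', 1), 'Mon'), 21),
)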
Then there is a third dataframe, df_table_b, which is also read directly from a Hive table:
df_table_b = spark.sql('SELECT INT_COL, DATE_WEEK, OTHER_COLUMN FROM table_b')
Now we join df_table_b to both dataframes defined above:

df_fast_join = df_table_a.join(df_table_b, on=['INT_COL', 'DATE_WEEK'], how='left')
df_slow_join = df_table_a_slow.join(df_table_b, on=['INT_COL', 'DATE_WEEK'], how='left')
I wanted to time both joins, so here is a function that approximates a transformation's execution time by forcing it with count():

import time

def time_transformation(df, subject):
    # count() is an action, so it forces the full plan to execute
    start = time.time()
    cnt = df.count()
    end = time.time()
    print(f'>>> {subject} - {cnt} {end - start:.2f}')
Results:

time_transformation(df_fast_join, 'fast join')
>>> fast join - 75739267 37.43

time_transformation(df_slow_join, 'slow join')
>>> slow join - 75739267 553.32
The UDF transformation itself does not seem to take much time:

time_transformation(df_table_a_slow, 'df_table_a_slow')
>>> df_table_a_slow - 75739267 0.25
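(One caveat I am aware of: count() does not need any column values, so Spark may prune the UDF column entirely and the 0.25 s figure may understate the UDF's real cost. A sketch of a timing that forces the UDF column to actually be computed:)

# Force evaluation of the UDF output column before trusting the timing
_ = df_table_a_slow.select(F.max('DATE_WEEK')).collect()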
The execution plans for both joins differ by a single line:
+- BatchEvalPython [<lambda>(DATE_WEEK#1)], [INT_COL#0, DATE_WEEK#1, pythonUDF0#843]
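(For reference, the plans can be printed with explain(); a minimal sketch:)

df_fast_join.explain()
df_slow_join.explain()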
Question: Why does applying a UDF to one of the dataframes slow down the join by more than 10 times? How can it be fixed?