Let me explain my question with an example. I start from a pandas DataFrame and convert it to Spark:
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

pd_1 = pd.DataFrame({'day': [1, 2, 3, 2, 1, 3],
                     'code': [10, 10, 20, 20, 30, 30],
                     'A': [44, 55, 66, 77, 88, 99],
                     'B': ['a', None, 'c', None, 'd', None],
                     'C': [None, None, '12', None, None, None]
                    })
df_1 = spark.createDataFrame(pd_1)
df_1.show()
Output:
+---+----+---+----+----+
|day|code|  A|   B|   C|
+---+----+---+----+----+
|  1|  10| 44|   a|null|
|  2|  10| 55|null|null|
|  3|  20| 66|   c|  12|
|  2|  20| 77|null|null|
|  1|  30| 88|   d|null|
|  3|  30| 99|null|null|
+---+----+---+----+----+
What I want to achieve is a new dataframe in which each row corresponds to one code, and each column holds the most recent non-null value for that code (i.e. the value from the highest day where the column is not null).
In pandas, I can simply do

pd_2 = pd_1.sort_values('day', ascending=True).groupby('code').last()
pd_2.reset_index()

This works because pandas GroupBy.last() returns the last non-null entry of each column, so after sorting by day I get exactly the most recent non-null value per code:
   code  day   A  B     C
0    10    2  55  a  None
1    20    3  66  c    12
2    30    3  99  d  None
My question is: how can I do the same thing in pyspark (preferably in a way that works on Spark < 3)?
What I have tried so far is:
from pyspark.sql import Window
import pyspark.sql.functions as F

# collect_list() ignores nulls, so with the window ordered by day
# descending, getItem(0) picks the most recent non-null value.
w = Window.partitionBy('code').orderBy(F.desc('day')).rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
## Update: after applying @Steven's idea to remove the for loop:
df_1 = df_1.select([F.collect_list(x).over(w).getItem(0).alias(x) for x in df_1.columns])
## for x in df_1.columns:
##     df_1 = df_1.withColumn(x, F.collect_list(x).over(w).getItem(0))
df_1 = df_1.distinct()
df_1.show()
Output:
+---+----+---+---+----+
|day|code|  A|  B|   C|
+---+----+---+---+----+
|  2|  10| 55|  a|null|
|  3|  30| 99|  d|null|
|  3|  20| 66|  c|  12|
+---+----+---+---+----+
Which I'm not very happy with, even after @Steven's idea replaced the explicit for loop: it still builds a whole list per column just to take its first element, and it needs a distinct() at the end.
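One variant I'm considering (just a sketch of the same idea, starting again from the original df_1) swaps collect_list(...).getItem(0) for F.first(x, ignorenulls=True) over the same window, so Spark doesn't have to materialize a list per column; F.first has accepted an ignorenulls argument since well before Spark 3, so it should fit the version constraint:

from pyspark.sql import Window
import pyspark.sql.functions as F

w = Window.partitionBy('code').orderBy(F.desc('day')) \
          .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# first(..., ignorenulls=True) returns the first non-null value in the
# frame, i.e. the value from the highest day where the column is set.
df_2 = df_1.select([F.first(x, ignorenulls=True).over(w).alias(x)
                    for x in df_1.columns]).distinct()
df_2.show()

But this still loops over the columns and still needs the distinct(), so I'd like to know whether there is a cleaner way.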