I am trying to aggregate data in a PySpark dataframe based on a particular criterion. I want to align the accounts by matching the switchOUT amount to the switchIN amounts, so that an account with money switching out becomes the from_acct and the other accounts become the to_acct values.
This is the data I have in the dataframe to begin with:
+--------+------+-----------+----------+----------+-----------+ 
| person | acct | close_amt | open_amt | switchIN | switchOUT | 
+--------+------+-----------+----------+----------+-----------+ 
| A      | 1    |       125 | 50       | 75       | 0         | 
+--------+------+-----------+----------+----------+-----------+ 
| A      | 2    |       100 | 75       | 25       | 0         | 
+--------+------+-----------+----------+----------+-----------+ 
| A      | 3    |       200 | 300      | 0        | 100       | 
+--------+------+-----------+----------+----------+-----------+ 
I want to transform it into this table:
+--------+-----------+---------+----------+-----------+
| person | from_acct | to_acct | switchIN | switchOUT |
+--------+-----------+---------+----------+-----------+
| A      | 3         | 1       | 75       | 100       |
+--------+-----------+---------+----------+-----------+
| A      | 3         | 2       | 25       | 100       |
+--------+-----------+---------+----------+-----------+
Also, how can I do this so that it works for any number of rows (not just 3 accounts)?
So far I have used this code:
import operator
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType

# define udfs
def sorter(l):
    # sort the entries of list_col by their second field, keep the first field
    res = sorted(l, key=operator.itemgetter(1))
    return [str(item[0]) for item in res]

def list_to_string(l):
    return 'from_fund_' + str(l[0]) + '_to_fund_' + str(l[1])

def listfirstAcc(l):
    return str(l[0])

def listSecAcc(l):
    return str(l[1])

# declare the array return type; F.udf defaults to StringType, which would
# turn the sorted list into a single string
sort_udf = F.udf(sorter, ArrayType(StringType()))
list_str = F.udf(list_to_string)
extractFirstFund = F.udf(listfirstAcc)
extractSecondFund = F.udf(listSecAcc)

# Add additional columns
df = df.withColumn("move", sort_udf("list_col"))
df = df.withColumn("move_string", list_str("move"))
df = df.withColumn("From_Acct", extractFirstFund("move"))
df = df.withColumn("To_Acct", extractSecondFund("move"))
This is the outcome I am currently getting:
+--------+-----------+---------+----------+-----------+
| person | from_acct | to_acct | switchIN | switchOUT |
+--------+-----------+---------+----------+-----------+
| A      | 3         | 1,2     | 75       | 100       |
+--------+-----------+---------+----------+-----------+
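One way the desired table might be produced for any number of accounts is to pair, within each person, every account that has switchOUT > 0 with every account that has switchIN > 0. The sketch below is only an illustration under that assumption: the helper names pair_accounts and pair_accounts_spark are hypothetical, and the question does not say how amounts should be split when a person has several switch-out accounts, so in that case this simply returns every combination. The plain-Python function shows the rule on the sample rows; the second function expresses the same rule as a PySpark self-join and needs pyspark only when it is called.

```python
from collections import defaultdict


def pair_accounts(rows):
    """Plain-Python reference for the pairing rule (hypothetical helper).

    rows: iterable of dicts with keys person, acct, switchIN, switchOUT.
    Each account with switchOUT > 0 is paired with each account of the
    same person that has switchIN > 0.
    """
    by_person = defaultdict(list)
    for row in rows:
        by_person[row["person"]].append(row)

    pairs = []
    for person, accts in by_person.items():
        sources = [r for r in accts if r["switchOUT"] > 0]
        targets = [r for r in accts if r["switchIN"] > 0]
        for src in sources:
            for dst in targets:
                pairs.append((person, src["acct"], dst["acct"],
                              dst["switchIN"], src["switchOUT"]))
    return pairs


def pair_accounts_spark(df):
    """Same rule as a PySpark self-join (hypothetical helper).

    df is assumed to have the columns person, acct, switchIN, switchOUT.
    """
    from pyspark.sql import functions as F  # lazy import; requires pyspark

    sources = (df.filter(F.col("switchOUT") > 0)
                 .select("person", F.col("acct").alias("from_acct"), "switchOUT"))
    targets = (df.filter(F.col("switchIN") > 0)
                 .select("person", F.col("acct").alias("to_acct"), "switchIN"))
    # Joining on person yields one row per (from_acct, to_acct) combination.
    return (sources.join(targets, on="person", how="inner")
                   .select("person", "from_acct", "to_acct",
                           "switchIN", "switchOUT"))


# The three sample rows from the first table (close_amt/open_amt omitted).
sample = [
    {"person": "A", "acct": 1, "switchIN": 75, "switchOUT": 0},
    {"person": "A", "acct": 2, "switchIN": 25, "switchOUT": 0},
    {"person": "A", "acct": 3, "switchIN": 0, "switchOUT": 100},
]
result = pair_accounts(sample)
```

Because the join key is only person, the number of accounts per person does not matter; a person with no switch-out account or no switch-in account produces no rows.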
 
    