How does one set the default value for pyspark.sql.functions.lag to a value within the current row?
For example, given:
from pyspark.sql import Window
from pyspark.sql.functions import col
import pyspark.sql.functions as F

testInput = [(1, 'a'), (2, 'c'), (3, 'e'), (1, 'a'), (1, 'b'), (1, 'b')]
columns = ['Col A', 'Col B']
df = sc.parallelize(testInput).toDF(columns)
df.show()

windowSpecification = Window.partitionBy(col('Col A')).orderBy(col('Col B'))
changedRows = col('Col B') != F.lag(col('Col B'), 1).over(windowSpecification)
df.select(col('Col A'), col('Col B'), changedRows.alias('New Col C')).show()
which outputs:
+-----+-----+
|Col A|Col B|
+-----+-----+
|    1|    a|
|    2|    c|
|    3|    e|
|    1|    a|
|    1|    b|
|    1|    b|
+-----+-----+
+-----+-----+---------+
|Col A|Col B|New Col C|
+-----+-----+---------+
|    1|    a|     null|
|    1|    a|    false|
|    1|    b|     true|
|    1|    b|    false|
|    3|    e|     null|
|    2|    c|     null|
+-----+-----+---------+
I would like the output to look like:
+-----+-----+---------+
|Col A|Col B|New Col C|
+-----+-----+---------+
|    1|    a|    false|
|    1|    a|    false|
|    1|    b|     true|
|    1|    b|    false|
|    3|    e|    false|
|    2|    c|    false|
+-----+-----+---------+
My current workaround is to add a second lag call to the changedRows expression, so that the null comparison in the first row of each partition resolves to false, like so:
changedRows = (col('Col B') != F.lag(col('Col B'), 1).over(windowSpecification)) & F.lag(col('Col B'), 1).over(windowSpecification).isNotNull()
but this does not look clean to me.
I would like to do something like
changedRows = col('Col B') != F.lag(col('Col B'), 1, col('Col B')).over(windowSpecification)
but I get the error TypeError: 'Column' object is not callable.
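For reference, one pattern that might express the same intent without a second lag call (this is only a sketch, not something I have confirmed works on every Spark version) is to wrap the lag in F.coalesce, so that the null produced for the first row of each partition falls back to the current row's value:
# Sketch: coalesce the lagged value with the current row's value, so the
# first row in each partition compares against itself and yields false.
lagged = F.coalesce(F.lag(col('Col B'), 1).over(windowSpecification), col('Col B'))
changedRows = col('Col B') != lagged
df.select(col('Col A'), col('Col B'), changedRows.alias('New Col C')).show()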