This code will compute for each row the difference between the current timestamp and the timestamp in the previous row.
I'm creating a dataframe for reproducibility.
from pyspark.sql import SparkSession, Window
from pyspark.sql.types import *
from pyspark.sql.functions import regexp_replace, col, lag
import pandas as pd
spark = SparkSession.builder.appName("DataFrame").getOrCreate()
data = pd.DataFrame(
{
"timestamp": ["2020-11-17_19:15:33.438102","2020-11-17_19:18:33.433002","2020-11-17_20:05:21.538125","2020-11-17_20:13:08.528102"],
"event": ["scan","scan","scan","scan"],
"value": ["start","end","start","end"]
}
)
df=spark.createDataFrame(data)
df.show()
# +--------------------+-----+-----+
# | timestamp|event|value|
# +--------------------+-----+-----+
# |2020-11-17_19:15:...| scan|start|
# |2020-11-17_19:18:...| scan| end|
# |2020-11-17_20:05:...| scan|start|
# |2020-11-17_20:13:...| scan| end|
# +--------------------+-----+-----+
Convert the "timestamp" column to TimestampType() to be able to compute differences:
df=df.withColumn("timestamp",
regexp_replace(col("timestamp"),"_"," "))
df.show(truncate=False)
# +--------------------------+-----+-----+
# |timestamp                 |event|value|
# +--------------------------+-----+-----+
# |2020-11-17 19:15:33.438102|scan |start|
# |2020-11-17 19:18:33.433002|scan |end  |
# |2020-11-17 20:05:21.538125|scan |start|
# |2020-11-17 20:13:08.528102|scan |end  |
# +--------------------------+-----+-----+
df = df.withColumn("timestamp",
regexp_replace(col("timestamp"),"_"," ").cast(TimestampType()))
df.dtypes
# [('timestamp', 'timestamp'), ('event', 'string'), ('value', 'string')]
Use the pyspark.sql.functions.lag function, which returns the value of the previous row (offset=1 by default).
See also "How to calculate the difference between rows in PySpark?" or "Applying a Window function to calculate differences in pySpark".
df.withColumn("lag_previous", col("timestamp").cast("long") - lag('timestamp').over(
Window.orderBy('timestamp')).cast("long")).show(truncate=False)
# WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
Using a Window without a partition triggers this warning.
It is better to partition the dataframe for the Window operation; here I partitioned by type of event:
df.withColumn("lag_previous", col("timestamp").cast("long") - lag('timestamp').over(
Window.partitionBy("event").orderBy('timestamp')).cast("long")).show(truncate=False)
# +--------------------------+-----+-----+------------+
# |timestamp                 |event|value|lag_previous|
# +--------------------------+-----+-----+------------+
# |2020-11-17 19:15:33.438102|scan |start|null        |
# |2020-11-17 19:18:33.433002|scan |end  |180         |
# |2020-11-17 20:05:21.538125|scan |start|2808        |
# |2020-11-17 20:13:08.528102|scan |end  |467         |
# +--------------------------+-----+-----+------------+
From this table you can keep just the rows with the "end" value to get the total scan durations.