I have a Hive table that records user behavior, like this:
| userid | behavior | timestamp | url | 
|---|---|---|---|
| 1 | view | 1650022601 | url1 | 
| 1 | click | 1650022602 | url2 | 
| 1 | click | 1650022614 | url3 | 
| 1 | view | 1650022617 | url4 | 
| 1 | click | 1650022622 | url5 | 
| 1 | view | 1650022626 | url7 | 
| 2 | view | 1650022628 | url8 | 
| 2 | view | 1650022631 | url9 | 
About 400 GB of new data is added to the table every day.
I want to order the rows by timestamp ascending and group them so that each 'view' row starts a new group that runs until the next 'view'; in the table above, the first three rows form one group. For each group, the view time is the last timestamp in the group minus the first, e.g. 1650022614 - 1650022601 for the first group.
How can I do this?
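What I have in mind is a running count of 'view' rows per user as the group id, so that every 'view' starts a new group. Here is a rough, untested Spark sketch of that idea (user_behavior is a placeholder for the real table name), but I am not sure it is correct, or fast enough at this scale:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, max, min, sum, when}

    // Each 'view' row increments a per-user running count, which serves
    // as the group id for that row and the rows that follow it.
    val w = Window.partitionBy("userid").orderBy("timestamp")
      .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    val grouped = spark.table("user_behavior")  // placeholder table name
      .withColumn("group_id",
        sum(when(col("behavior") === "view", 1).otherwise(0)).over(w))

    // View time = last timestamp in the group minus the 'view' timestamp.
    val viewTime = grouped
      .groupBy("userid", "group_id")
      .agg((max("timestamp") - min("timestamp")).as("view_time"))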
So far I have tried the lag and lead window functions, roughly like this:
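(This is the gist rather than my exact code; user_behavior is again a placeholder name.)

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, lead}

    // Keep only the 'view' rows and look at the next view's timestamp.
    val w = Window.partitionBy("userid").orderBy("timestamp")
    val attempt = spark.table("user_behavior")  // placeholder table name
      .filter(col("behavior") === "view")
      .withColumn("next_view_ts", lead("timestamp", 1).over(w))
      .withColumn("gap", col("next_view_ts") - col("timestamp"))

But that computes the gap to the next 'view' (16 for the first view in my example), not the in-group view time I want (13). I have also tried Scala code like this: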
        // `partition` is a driver-side var captured in the task closure;
        // every task gets its own copy, so the counter is only correct
        // when the RDD has a single partition.
        var partition = 0
        val pairRDD: RDD[(Int, String)] = record.map { x =>
            // a date string in the first field marks the start of a new group
            if (StringUtil.isDateString(x.split("\\s+")(0))) {
                partition = partition + 1
            }
            (partition, x)
        }
or Java like this:
        LongAccumulator part = spark.sparkContext().longAccumulator("part");
        // coalesce(1) keeps the counter in order by forcing one partition,
        // but it also removes all parallelism.
        JavaPairRDD<Long, Row> pairRDD = spark.sql(sql).coalesce(1).javaRDD()
                .mapToPair((PairFunction<Row, Long, Row>) row -> {
                    // compare strings with equals(), not ==
                    if ("pageview".equals(row.getAs("event"))) {
                        part.add(1L);
                    }
                    // reading an accumulator's value inside a task is not
                    // guaranteed to be consistent
                    return new Tuple2<>(part.value(), row);
                });
But on a very large dataset this is hopelessly slow, since coalesce(1) pushes everything through a single partition.
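The only scalable variant of the counter idea I can think of is two passes: first count the group markers per partition, then use those counts as starting offsets so every partition can label its rows independently. A rough sketch, reusing record and StringUtil.isDateString from my Scala snippet above and assuming record is already globally sorted by timestamp (e.g. via sortBy):

    // Pass 1: how many group markers does each partition contain?
    val countsPerPartition = record
      .mapPartitionsWithIndex { (i, rows) =>
        Iterator((i, rows.count(r => StringUtil.isDateString(r.split("\\s+")(0)))))
      }
      .collect()
      .sortBy(_._1)
      .map(_._2)

    // offsets(i) = number of markers in all partitions before partition i
    val offsets = countsPerPartition.scanLeft(0L)(_ + _)

    // Pass 2: each partition starts from its offset and counts locally.
    val grouped = record.mapPartitionsWithIndex { (i, rows) =>
      var group = offsets(i)
      rows.map { r =>
        if (StringUtil.isDateString(r.split("\\s+")(0))) group += 1
        (group, r)
      }
    }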
Is that the right direction, or is there a better way? Save me, please!