I'm looking for a way to join the following two Spark Datasets:
# city_visits:
person_id         city                timestamp
-----------------------------------------------
        1        Paris      2017-01-01 00:00:00
        1    Amsterdam      2017-01-03 00:00:00
        1     Brussels      2017-01-04 00:00:00
        1       London      2017-01-06 00:00:00
        2       Berlin      2017-01-01 00:00:00
        2     Brussels      2017-01-02 00:00:00
        2       Berlin      2017-01-06 00:00:00
        2      Hamburg      2017-01-07 00:00:00
# ice_cream_events:
person_id      flavour                timestamp
-----------------------------------------------
        1      Vanilla      2017-01-02 00:12:00
        1    Chocolate      2017-01-05 00:18:00
        2   Strawberry      2017-01-03 00:09:00
        2      Caramel      2017-01-05 00:15:00
For each row in city_visits, I want to join the row in ice_cream_events that has the same person_id and the next timestamp (the earliest one after the visit), keeping city_visits rows that have no such match. The expected output is:
person_id       city            timestamp  ic_flavour          ic_timestamp
---------------------------------------------------------------------------
        1      Paris  2017-01-01 00:00:00     Vanilla   2017-01-02 00:12:00
        1  Amsterdam  2017-01-03 00:00:00   Chocolate   2017-01-05 00:18:00
        1   Brussels  2017-01-04 00:00:00   Chocolate   2017-01-05 00:18:00
        1     London  2017-01-06 00:00:00        null                  null
        2     Berlin  2017-01-01 00:00:00  Strawberry   2017-01-03 00:09:00
        2   Brussels  2017-01-02 00:00:00  Strawberry   2017-01-03 00:09:00
        2     Berlin  2017-01-06 00:00:00        null                  null
        2    Hamburg  2017-01-07 00:00:00        null                  null
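To pin down the semantics I'm after, here is a minimal sketch on plain Scala collections (not Spark, and not meant to be efficient). The case classes, the `NextEventJoin` object and the `joinNext` function are names I made up for illustration, and I'm assuming "next" means strictly later than the visit:

```scala
import java.time.LocalDateTime

// Hypothetical case classes mirroring the two tables above.
final case class CityVisit(personId: Int, city: String, timestamp: LocalDateTime)
final case class IceCreamEvent(personId: Int, flavour: String, timestamp: LocalDateTime)

object NextEventJoin {
  // For each visit, pick the earliest ice cream event of the same person
  // that happens strictly after the visit. None stands in for the nulls
  // in the expected output.
  def joinNext(
      visits: Seq[CityVisit],
      events: Seq[IceCreamEvent]
  ): Seq[(CityVisit, Option[IceCreamEvent])] =
    visits.map { v =>
      val candidates = events.filter { e =>
        e.personId == v.personId && e.timestamp.isAfter(v.timestamp)
      }
      // Keep the candidate with the smallest timestamp, if there is one.
      val next = candidates.reduceOption { (a, b) =>
        if (b.timestamp.isBefore(a.timestamp)) b else a
      }
      v -> next
    }
}
```

This is quadratic per person, so it only serves as a reference for the result I expect; what I need is the Spark equivalent.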
The closest I've come so far is the following. However, it joins every row in ice_cream_events that matches the conditions, not just the first one, and because it is an inner join it also drops city_visits rows that have no match:
val cv = city_visits.orderBy("person_id", "timestamp")
val ic = ice_cream_events.orderBy("person_id", "timestamp")
val result = cv.join(ic, ic("person_id") === cv("person_id")
                         && ic("timestamp") > cv("timestamp"))
Is there a (preferably efficient) way to join each city_visits row only to its first matching ice_cream_events row, rather than to all of them?
 
    


