I am working on a dataset that represents a stream of events (for example, tracking events fired from a website). Every event has a timestamp. A common use case for us is finding the first non-null value of a given field. For example, something like this gets us most of the way there:
val eventsDf = spark.read.json(jsonEventsPath) 
case class ProjectedFields(visitId: String, userId: Int, timestamp: Long ... )
val projectedEventsDs = eventsDf.select(
    eventsDf("message.visit.id").alias("visitId"),
    eventsDf("message.property.user_id").alias("userId"),
    eventsDf("message.property.timestamp"),
    ...
).as[ProjectedFields]
projectedEventsDs.groupBy($"visitId").agg(first($"userId", true))
The problem with the above code is that the order in which rows are fed into the first aggregation function is not guaranteed. I would like the rows to be sorted by timestamp, so that the result is the first non-null userId by timestamp rather than an arbitrary non-null userId.
Is there a way to define the sorting within a grouping?
Using Spark 2.10
BTW, the approach suggested for Spark 2.10 in "SPARK DataFrame: select the first row of each group" is to order the data before grouping, but that doesn't work. For example, the following code:
case class OrderedKeyValue(key: String, value: String, ordering: Int)
val ds = Seq(
  OrderedKeyValue("a", null, 1), 
  OrderedKeyValue("a", null, 2), 
  OrderedKeyValue("a", "x", 3), 
  OrderedKeyValue("a", "y", 4), 
  OrderedKeyValue("a", null, 5)
).toDS()
ds.orderBy("ordering").groupBy("key").agg(first("value", true)).collect()
will sometimes return Array([a,y]) and sometimes Array([a,x]).
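For comparison, here is a minimal sketch of one direction that does not rely on row order at all: drop the null values, then take the minimum of a struct whose first field is the ordering column. Spark compares structs field by field, so the minimum struct is the row with the smallest ordering. The object name FirstNonNullByOrdering and the helper firstNonNullPerKey are made up for illustration, and the local SparkSession setup is an assumption, not part of my actual job.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.{col, min, struct}

case class OrderedKeyValue(key: String, value: String, ordering: Int)

object FirstNonNullByOrdering {

  // Hypothetical helper: for each key, return the non-null value with the
  // smallest `ordering`. Keys that have only null values are dropped by the filter.
  def firstNonNullPerKey(ds: Dataset[OrderedKeyValue]): DataFrame =
    ds.filter(col("value").isNotNull)
      .groupBy(col("key"))
      // Structs are compared field by field, so min() picks the lowest ordering.
      .agg(min(struct(col("ordering"), col("value"))).as("earliest"))
      .select(col("key"), col("earliest.value").as("value"))

  def main(args: Array[String]): Unit = {
    // Assumed local session, for experimentation only.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("first-non-null-sketch")
      .getOrCreate()
    import spark.implicits._

    val ds = Seq(
      OrderedKeyValue("a", null, 1),
      OrderedKeyValue("a", null, 2),
      OrderedKeyValue("a", "x", 3),
      OrderedKeyValue("a", "y", 4),
      OrderedKeyValue("a", null, 5)
    ).toDS()

    firstNonNullPerKey(ds).show()
    spark.stop()
  }
}
```

On the sample data above this should yield x for key a on every run, since the result depends only on the ordering values and not on how rows arrive at the aggregation. I have not verified how well it fits the real events dataset, where the ordering column would be timestamp and the grouping column visitId.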