
I need to look up some data in a Spark Streaming job from a file on HDFS. This data is fetched once a day by a batch job.
Is there a "design pattern" for such a task?

  • How can I reload the data in memory (a hashmap) immediately after a
    daily update?
  • How can I keep serving the streaming job continuously while this lookup
    data is being fetched?
Bruckwald

1 Answer


One possible approach is to drop local data structures and use a stateful stream instead. Let's assume you have a main data stream called mainStream:

val mainStream: DStream[T] = ???

Next you can create another stream which reads lookup data:

val lookupStream: DStream[(K, V)] = ???

and a simple function which can be used to update the state:

def update(
  current: Seq[V],  // A sequence of values for a given key in the current batch
  prev: Option[V]   // Value for a given key from the previous state
): Option[V] = {
  current
    .headOption    // If the current batch is not empty, take its first element
    .orElse(prev)  // If it is empty (None), keep the previous state
}
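To make the behaviour of the update function concrete, here is a minimal sketch in plain Scala (no Spark required) that replays it over a few batches for a single key. The object name `UpdateDemo`, the choice of `String` for `V`, and the sample batches are assumptions made for illustration only.

```scala
object UpdateDemo {
  // Same logic as the update function above, with V fixed to String (assumption).
  def update(current: Seq[String], prev: Option[String]): Option[String] =
    current.headOption.orElse(prev)

  // Hypothetical batches for one key: most batches carry no lookup data at all.
  val batches: Seq[Seq[String]] =
    Seq(Seq("v1"), Seq.empty, Seq.empty, Seq("v2"), Seq.empty)

  // Thread the state through the batches, recording the state after each one.
  // scanLeft also emits the initial empty state, which .tail drops.
  val states: Seq[Option[String]] =
    batches
      .scanLeft(Option.empty[String])((prev, current) => update(current, prev))
      .tail

  def main(args: Array[String]): Unit =
    states.foreach(println)
}
```

The state stays at `Some(v1)` through the empty batches and switches to `Some(v2)` when a new value arrives. One consequence worth noting: with this function, a key that is missing from a later lookup load keeps its old value, because updateStateByKey drops a key only when the update function returns None for it.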

These two pieces can be used to create the state:

val state = lookupStream.updateStateByKey(update)

All that's left is to key mainStream by K and join it with the state:

def toPair(t: T): (K, T) = ???

mainStream.map(toPair).leftOuterJoin(state)

While this is probably less than optimal from a performance point of view, it leverages architecture which is already in place and frees you from manually dealing with invalidation or failure recovery.
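Putting the pieces together, the following is a rough sketch of how the whole job might be wired up, not a definitive implementation. Everything concrete in it is an assumption: the `String` key and value types, the "key,value" line format, the socket source for mainStream, the HDFS paths, the batch interval, and the helper names `parseLookup` and `LookupJoinSketch`. It uses `textFileStream`, which picks up new files that appear in a monitored directory, as one possible way to feed the daily file into lookupStream.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object LookupJoinSketch {
  // Hypothetical lookup line format: "key,value". Malformed lines are skipped.
  def parseLookup(line: String): Seq[(String, String)] =
    line.split(",", 2) match {
      case Array(k, v) => Seq((k.trim, v.trim))
      case _           => Seq.empty
    }

  // Keep the newest value if the batch has one, otherwise keep the old state.
  def update(current: Seq[String], prev: Option[String]): Option[String] =
    current.headOption.orElse(prev)

  // Hypothetical keying: the first comma-separated field of a record is its key.
  def toPair(record: String): (String, String) =
    (record.takeWhile(_ != ','), record)

  def main(args: Array[String]): Unit = {
    // The master is expected to be supplied by spark-submit.
    val conf = new SparkConf().setAppName("lookup-join-sketch")
    val ssc = new StreamingContext(conf, Seconds(10))
    // Stateful operations such as updateStateByKey need a checkpoint directory.
    ssc.checkpoint("hdfs:///tmp/lookup-join-checkpoint") // placeholder path

    // Placeholder source for the main stream.
    val mainStream: DStream[String] = ssc.socketTextStream("localhost", 9999)

    // New files that appear in this (placeholder) directory become lookup records.
    val lookupStream: DStream[(String, String)] =
      ssc.textFileStream("hdfs:///data/lookup/incoming").flatMap(parseLookup)

    val state: DStream[(String, String)] =
      lookupStream.updateStateByKey[String](update _)

    // Each main record is paired with Some(value) or None if the key is unknown.
    val joined: DStream[(String, (String, Option[String]))] =
      mainStream.map(toPair).leftOuterJoin(state)

    joined.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

With this layout the daily batch job only has to place the finished file in the monitored directory (ideally by moving it in atomically, so that a half-written file is never read), and the main stream keeps being served from the previous state until the new values arrive.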

zero323
  • Thanks! I need to refetch all the lookup data each day, so I'm thinking of purging the existing data and applying the new one. Will updateStateByKey work in this case? What is unclear is how I can read data into a DStream once a day. As for joining, do you mean that I join all the lookup data from the lookup stream with the records in the main stream? – Bruckwald May 26 '16 at 19:11
  • Streaming cares only about incoming data. If you push new data to the lookup stream once a day, it will be updated once a day. By join I mean exactly the type of code I used in the example. As far as I understand, this is the kind of operation you wanted. – zero323 May 27 '16 at 07:53
  • I'm using your suggestion as a reference, and it works fine. It would be great, however, if you could give some explanation of the update function, please. I'm trying to rewrite it with `mapWithState`. – Bruckwald May 30 '16 at 19:48
  • Sure, is it easier to understand now? You can also check http://stackoverflow.com/q/35563876/1560062 – zero323 May 31 '16 at 08:52
  • Thank you for the update. I have further questions regarding this, so I created a new question: http://stackoverflow.com/questions/37550054/spark-streaming-cache-dstream-results-across-batches – Bruckwald May 31 '16 at 15:39