2

Using Spark streaming (1.6) I have a filestream for reading lookup data with 2s of batch size, however files are copyied to the directory only every hour.
Once there's a new file, its content is read by the stream, this is what I want to cache into memory and keep there until new files are read.
There's another stream to which I want to join this dataset therefore I'd like to cache.

This is a follow-up question of Batch lookup data for Spark streaming.
The answer does work fine with updateStateByKey however I don't know how to deal with cases when a KV pair is deleted from the lookup files, as the Sequence of values in updateStateByKey keeps growing. Also any hint how to do this with mapWithState would be great.

This is what I tried so far, but the data doesn't seem to be persisted:

val dictionaryStream = ssc.textFileStream("/my/dir")
dictionaryStream.foreachRDD{x => 
  if (!x.partitions.isEmpty) {
    x.unpersist(true)
    x.persist()
  }
}
Community
  • 1
  • 1
Bruckwald
  • 797
  • 8
  • 23

1 Answers1

3

DStreams can be persisted directly using persist method which persist every RDD in the stream:

dictionaryStream.persist

According to the official documentation this applied automatically for

window-based operations like reduceByWindow and reduceByKeyAndWindow and state-based operations like updateStateByKey

so there should be no need for explicit caching in your case. Also there is no need for manual unpersisting. To quote the docs once again:

by default, all input data and persisted RDDs generated by DStream transformations are automatically cleared

and a retention period is tuned automatically based on the transformations which are used in the pipeline.

Regarding mapWithState you'll have to provide a StateSpec. A minimal example requires a functions which takes key, Option of current value and previous state. Lets say you have DStream[(String, Long)] and you want to record maximum value so far:

val state = StateSpec.function(
  (key: String, current: Option[Double], state: State[Double]) => {
    val max  = Math.max(
      current.getOrElse(Double.MinValue),
      state.getOption.getOrElse(Double.MinValue)
    )
    state.update(max)
    (key, max)
  }
)

val inputStream: DStream[(String, Double)] = ??? 
inputStream.mapWithState(state).print()

It is also possible to provide initial state, timeout interval and capture current batch time. The last two can be used to implement removal strategy for the keys which haven't been update for some period of time.

zero323
  • 322,348
  • 103
  • 959
  • 935
  • Thanks again! I only want to achieve that once a `fileStream` has received some data, cache it and use it till the next time it will receive something. With persist I couldn't achieve this. If I put `current.headOption.orElse(prev)` in `updateStateByKey` as you suggested in the previous question, then it will cache the element but I won't be able to tell whether `current` is null because at the given batch no new data was read by `fileStream` or it's null because the value was in the meanwhile deleted. – Bruckwald Jun 02 '16 at 21:56
  • `persist` / `unpersist` doesn't affect the data you see. It is only a performance hint. Just because you `unpersist` data you observe __won't__ change. `timeout` in `mapWithState` can be used to forget the state but is slightly awkward. – zero323 Jun 03 '16 at 12:44
  • Thank you, I see. My other idea for this problem is to somehow periodically broadcast the filestream data and do lookups from the mainStream. However I don't know what happens in the interval when the new data is (re)broadcasted to the worker nodes in terms of consistency. What would you suggest me for this problem? – Bruckwald Jun 03 '16 at 12:49
  • I don't it is a good choice. The only way I can think of to make it possible is to do it in a relatively reliable way is to use `transform` / `foreachRDD` closure and rebroadcast on every batch. And it still doesn't provide determinism in case of recovery. – zero323 Jun 03 '16 at 12:59
  • You could also try to pass things using file system and access data via objects which will take care of updating in a way slightly similar to [this](http://stackoverflow.com/q/37343437/1560062). – zero323 Jun 03 '16 at 13:09
  • Thanks for all your support. I finally created a custom ´InputDStream´ based on ´ConstantInputDStream´ which reads the latest file from an HDFS dir, caches the RDD and returns this RDD for the subsequent calls from compute() until there's a change on HDFS. Local tests seems to be fine so far. – Bruckwald Jun 05 '16 at 23:58