I am restoring a stream from an HDFS checkpoint (ConstantInputDStream, for example), but I keep getting SparkException: <X> has not been initialized.
Is there something specific I need to do when restoring from a checkpoint?
I can see that it expects DStream.zeroTime to be set, but when the stream is restored, zeroTime is null. I don't know why it isn't restored; possibly because it is a private member. The StreamingContext referenced by the restored stream does have a value for zeroTime.
initialize is a private method that is called by StreamingContext.graph.start but not by StreamingContext.graph.restart, presumably because restart expects zeroTime to have already been persisted in the checkpoint.
Does anyone have an example of a stream that recovers from a checkpoint and has a non-null value for zeroTime? Here is my setup:
    import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}

    // sparkConf and checkpointDir are defined elsewhere
    def createStreamingContext(): StreamingContext = {
      val ssc = new StreamingContext(sparkConf, Duration(1000))
      ssc.checkpoint(checkpointDir)
      ssc
    }

    val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)
    val socketStream = ssc.socketTextStream(...)
    socketStream.checkpoint(Seconds(1))
    socketStream.foreachRDD(...)
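For comparison, here is a minimal sketch of the pattern described in the Spark Streaming programming guide's checkpointing section, where every DStream is defined inside the factory function passed to StreamingContext.getOrCreate. A commonly reported cause of "has not been initialized" is defining DStreams on the context after getOrCreate returns: on recovery the factory is skipped and the graph (with each stream's zeroTime) comes from the checkpoint, so streams added afterwards are never initialized. The object name, HDFS path, host and port below are hypothetical placeholders, not taken from the code above.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointedSocketApp {
  // Hypothetical values: replace with your own checkpoint path and socket source.
  val checkpointDir = "hdfs:///tmp/checkpoints/socket-app"
  val host = "localhost"
  val port = 9999

  // The whole DStream graph is built inside the factory. getOrCreate only calls
  // this when no checkpoint exists; on recovery the graph is deserialized instead.
  def createStreamingContext(): StreamingContext = {
    val sparkConf = new SparkConf().setAppName("CheckpointedSocketApp")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint(checkpointDir)

    val socketStream = ssc.socketTextStream(host, port)
    // The checkpoint interval must be a multiple of the batch interval;
    // the programming guide suggests 5 to 10 sliding intervals as a starting point.
    socketStream.checkpoint(Seconds(10))
    socketStream.foreachRDD { rdd =>
      println(s"Received ${rdd.count()} lines")
    }
    ssc
  }

  def main(args: Array[String]): Unit = {
    // No DStreams are created here, after getOrCreate has returned.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```

The design choice is that main only starts the context: any change to the stream definitions then requires deleting the old checkpoint, since a recovered context reuses the checkpointed graph.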