I want to generate an Observable in real time from the results of a list of Futures.
In the simplest case, suppose I have a list of futures I'm running with Future.sequence, and I'm simply monitoring their progress with an Observable that tells me each time one has completed. I'm basically doing it like this:
def observeFuturesProgress(futs: List[Future[Int]]) : Observable[String] = {
Observable[String](observer => {
val loudFutures: List[Future[Int]] = futs.map(f => {
f onComplete {
case Success(a) => observer.onNext(s"just did $a more")
case Failure(e) => observer.onError(e)
}
f
})
Future.sequence(loudFutures) onComplete {
case Success(_) => observer.onCompleted()
case Failure(e) => observer.onError(e)
}
})
}
This works fine in my testing environment. But I've just read that onNext shouldn't be called from different threads, at least without being careful that there are no overlapping calls. What is the recommended way to fix this? It seems that many real-world uses of Observables would require onNext to be called from async code like this, but I can't find a similar example in the docs.