Part 1. Observe vs. Subscribe
Looking into the question, I see the need to observe elements after execution on a particular thread.
To be precise, observe in this context means *being able to work on a value in the stream on some specific thread. In RxJava, we have a proper operator called precisely like that, but in Project Reactor, we call identical operation as publishOn.
Thus,
* if you want to process data * on Schedulers.boundedElastic() then you should use the following construction
Mono.fromFuture(..)
.publishOn(Schedulers.boundedElastic())
BUT Wait, .subscribeOn worked as well???
Reading the previous construction, you may start worrying because you are 100% sure that
Mono.fromRunnable(..)
.subscribeOn(Schedulers.boundedElastic())
Sends onNext on the thread boundedElastic-1, so what is wrong with the same fromFuture.
and here comes a trick:
Never use subscribeOn with Futures / CompletableFuture or anything which can use own async mechanism underneath
If we look at what is going on behind subscribeOn, you will find out something like the following:
// Simplified version of SubscribeOn operator
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
Scheduler scheduler;
Publisher<T> parent;
scheduler.schedule(() -> parent.subscribe(actual));
}
Which basically means parent's subscribe method will be called on a separate thread.
Such a technique works for fromRunnable, fromSupplier, fromCallable because their logic happens in the subscribe method:
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
Operators.MonoSubscriber<T, T>
sds = new Operators.MonoSubscriber<>(actual);
actual.onSubscribe(sds);
// skiped some parts
T t = supplier.get();
if (t == null) {
sds.onComplete();
}
else {
sds.complete(t);
}
}
which means it is almost equal to
scheduler.schedule(() -> {
T t = supplier.get();
if (t == null) {
sds.onComplete();
}
else {
sds.complete(t);
}
})
In contrast, fromFuture works much trickier.
A short quiz.
On which thread we may observe a value? (assume execution happens on thread Main, and the task is executing on ForkJoinPool)
var future = CompletableFuture
.supplyAsync(() -> {
return value;
})
... // some code here, does not metter just code
future.thenAccept(value -> {
System.out.println(Thread.currentThread())
});
And the correct answer....
It may be Thread Main
or it may be Thread from ForkJoinPool
...
because it is racy... and at the point, we consume value, the value may be already delivered, so we just read volatile field on the reader thread (thread Main), otherwise, thread Main just going to set an acceptor so the acceptor will be invoked later on the ForkJoinPool thread.
Right, that is why when you use fromFuture with subscribeOn, there is no guarantee that the subscribeOn thread will observe the value of the given CompletableFuture.
That is why publishOn is the only way to ensure value processing is happening on the desired Thread.
Alright, should I use publishOn all the way down???
And yes and no. It depends.
If you use Mono - in 99% of the cases, you may use publishOn if you want to make sure that your data processing is happening on a particular thread - always use publishOn.
Do not worry about a potential overhead, Project Reactor takes care of you even if you used it accidentally. Project Reactor has several optimization which may replace your publishOn with subscribeOn (if it is safe without breaking the behavior) at runtime so you will get the best.
Part 2. Falling down the rabbit hole of Scheduelrs
Never ever use Schedulers.immediate()
it is almost no-ops scheduler which basically does
Schedulers.immediate().scheduler(runnable) {
runnable.run()
}
Right, it does nothing useful for reactor users, and we use it only for internal needs.
Alright, so how then I can use Scheduler to use it in an imperative world as executor
There are two options:
Fast path: Step by Step guide
1.a) Create your bounded Executor. (e.g. Executors.fixed...)
1.b) Create your bounded ScheduledExecutorService if you want to get the power of periodic task and delayed tasks
2) Create a Scheduler from your executor using Schedulers.fromExecutorXXX API
3) Use your bounded Executor in the imperative world, use your Scheduler which is a wrapper around the bounded one for the reactive world
Long path
Coming soon...
Part 3. How to serialize executions.
Coming soon