Rx subscription code is always synchronous¹. What you need to do is move the processing code out of the Subscribe delegate and make it a side effect of the observable sequence. Here is how it can be done:
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

Subject<int> subject = new();
int concurrentCount = 0;

Task processor = subject
    .Buffer(TimeSpan.FromSeconds(1), 100)
    .Select(list => Observable.Defer(() => Observable.Start(() =>
    {
        // The processing of the list goes here. This demo only tracks
        // how many batches are being processed at the same time.
        int c = Interlocked.Increment(ref concurrentCount);
        if (c > 1) Console.WriteLine($"Executing {c} simultaneous batches");
        Interlocked.Decrement(ref concurrentCount);
    })))
    .Merge(maxConcurrent: 2)
    .DefaultIfEmpty() // Prevents exception in corner case (empty source)
    .ToTask(); // or RunAsync (either one starts the processor)

for (int i = 0; i < 1_000_000; i++)
{
    subject.OnNext(i);
}
subject.OnCompleted();
processor.Wait();
The Select+Observable.Defer+Observable.Start combination converts the source sequence to an IObservable<IObservable<Unit>>. It's a nested sequence, with each inner sequence representing the processing of one list. When the delegate of the Observable.Start completes, the inner sequence emits a Unit value and then completes. The wrapping Defer operator ensures that the inner sequences are "cold", so that they are not started before they are subscribed to. Then follows the Merge operator, which flattens the outer sequence into an IObservable<Unit> sequence. The maxConcurrent parameter configures how many of the inner sequences can be subscribed to concurrently. Every time the Merge operator subscribes to an inner sequence, the corresponding Observable.Start delegate starts running on a ThreadPool thread.
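To see why the Defer wrapper matters, here is a minimal sketch that compares a bare Observable.Start with one wrapped in Observable.Defer. The counters hotRuns and coldRuns are names made up for this illustration, and the snippet assumes the System.Reactive package is referenced:

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;

int hotRuns = 0, coldRuns = 0; // illustrative counters, not part of the answer above

// Without Defer: the delegate is scheduled as soon as Start is called.
IObservable<Unit> hot = Observable.Start(() => { Interlocked.Increment(ref hotRuns); });

// With Defer: Start is not even called until someone subscribes.
IObservable<Unit> cold = Observable.Defer(() =>
    Observable.Start(() => { Interlocked.Increment(ref coldRuns); }));

hot.Wait(); // blocks until the already-scheduled delegate has completed
Console.WriteLine($"Before subscribing to cold: hotRuns={hotRuns}, coldRuns={coldRuns}");

cold.Wait(); // subscribing is what triggers the deferred delegate
Console.WriteLine($"After subscribing to cold: hotRuns={hotRuns}, coldRuns={coldRuns}");
```

Without the Defer, every batch would start processing the moment the Select projection runs, and the maxConcurrent limit of the Merge operator would have nothing left to throttle.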
If you set maxConcurrent too high, the ThreadPool may run out of workers (in other words, it may become saturated), and the concurrency of your code will then depend on ThreadPool availability. If you wish, you can increase the number of workers that the ThreadPool creates instantly on demand by using the ThreadPool.SetMinThreads method. But if your workload is CPU-bound and you increase the worker threads above the Environment.ProcessorCount value, then most probably your CPU will become saturated instead.
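Raising the ThreadPool minimum could look roughly like the sketch below. The value 8 is only an assumed maxConcurrent picked for illustration. The current minimums are read first so that the I/O completion thread setting is passed back unchanged:

```csharp
using System;
using System.Threading;

// Assumption for illustration: the pipeline is configured with maxConcurrent = 8.
const int assumedMaxConcurrent = 8;

// Read the current minimums, so that the I/O completion value is preserved.
ThreadPool.GetMinThreads(out int minWorkers, out int minIo);

// Never lower the existing minimum, only raise it if needed.
int desiredWorkers = Math.Max(minWorkers, assumedMaxConcurrent);

// SetMinThreads returns false if the requested values are rejected
// (for example when they exceed the configured maximum).
bool accepted = ThreadPool.SetMinThreads(desiredWorkers, minIo);
Console.WriteLine(accepted
    ? $"Minimum worker threads: {desiredWorkers}"
    : "The ThreadPool rejected the requested minimum");
```

This is best called once at application startup, before the pipeline is subscribed.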
If your workload is asynchronous, you can replace the Observable.Defer+Observable.Start combo with the Observable.FromAsync operator, as shown here.
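As a rough sketch of that replacement, assuming a hypothetical asynchronous handler named ProcessBatchAsync (it is not part of the original code, and the Task.Delay merely stands in for real I/O-bound work):

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

Subject<int> subject = new();
int processedBatches = 0;

// Hypothetical asynchronous handler; replace the body with the real work.
async Task ProcessBatchAsync(IList<int> batch)
{
    await Task.Delay(10); // placeholder for I/O-bound work
    Interlocked.Increment(ref processedBatches);
}

Task processor = subject
    .Buffer(TimeSpan.FromSeconds(1), 100)
    // FromAsync is already deferred: the delegate runs once per subscription.
    .Select(list => Observable.FromAsync(() => ProcessBatchAsync(list)))
    .Merge(maxConcurrent: 2)
    .DefaultIfEmpty()
    .ToTask();

for (int i = 0; i < 1_000; i++) subject.OnNext(i);
subject.OnCompleted();
await processor;
Console.WriteLine($"Processed {processedBatches} batches");
```

Because Observable.FromAsync invokes its delegate on subscription, the inner sequences are cold without an explicit Defer, and the Merge operator can still limit how many batches are in flight at once.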
¹ There is an unpublished library, AsyncRx.NET, that experiments with the idea of asynchronous subscriptions. It is based on the new interfaces IAsyncObservable<T> and IAsyncObserver<T>.