How about something like this:
public static IObservable<TResult> SelectAsync<TSource, TResult>(this IObservable<TSource> source, Func<TSource, Task<TResult>> projectAsync)
{
return Observable.Create<TResult>(
observer =>
{
var throttle = new BehaviorSubject<TResult>(default);
var observable = source
.Zip(throttle, (value, _) => value)
.SelectMany(value => Observable.Defer(() => Observable.StartAsync(() => projectAsync(value))))
.Publish();
return new CompositeDisposable(
observable.Subscribe(throttle),
observable.Subscribe(observer),
observable.Connect(),
throttle
);
}
);
}
In this extension method the Zip combined with the BehaviorSubject form a throttle within which items are queued until projectAsync is complete.
It can then be used as follows:
public static async Task<Unit> DoSomethingAsync(int value)
{
Console.WriteLine($"[{Scheduler.Default.Now.DateTime.ToString("hh:mm:ss")}] Started processing value '{value}'");
await Task.Delay(TimeSpan.FromSeconds(1));
Console.WriteLine($"[{Scheduler.Default.Now.DateTime.ToString("hh:mm:ss")}] Completed processing value '{value}'");
return Unit.Default;
}
public static async Task RunAsync()
{
IObservable<int> source = Observable.Generate(0, value => value < 25, value => value + 1, value => value, value => TimeSpan.FromSeconds(0.1));
await source
.Do(value => Console.WriteLine($"[{Scheduler.Default.Now.DateTime.ToString("hh:mm:ss")}] Received value '{value}'"))
.SelectAsync(value => DoSomethingAsync(value))
.ToTask();
}
Wherein, a source observable is generated which emits 25 items at 100ms intervals. The DoSomethingAsync method uses Task.Delay to simulate a 1 second processing delay. Running this code should result in the following output:
[02:07:56] Received value '0'
[02:07:56] Started processing value '0'
[02:07:56] Received value '1'
[02:07:56] Received value '2'
[02:07:57] Received value '3'
[02:07:57] Received value '4'
[02:07:57] Received value '5'
[02:07:57] Received value '6'
[02:07:57] Received value '7'
[02:07:57] Received value '8'
[02:07:57] Received value '9'
[02:07:57] Completed processing value '0'
[02:07:57] Started processing value '1'
[02:07:57] Received value '10'
[02:07:57] Received value '11'
[02:07:58] Received value '12'
[02:07:58] Received value '13'
[02:07:58] Received value '14'
[02:07:58] Received value '15'
[02:07:58] Received value '16'
[02:07:58] Received value '17'
[02:07:58] Received value '18'
[02:07:58] Completed processing value '1'
[02:07:58] Started processing value '2'
[02:07:58] Received value '19'
[02:07:58] Received value '20'
[02:07:59] Received value '21'
[02:07:59] Received value '22'
[02:07:59] Received value '23'
[02:07:59] Received value '24'
[02:07:59] Completed processing value '2'
[02:07:59] Started processing value '3'
[02:08:00] Completed processing value '3'
[02:08:00] Started processing value '4'
[02:08:01] Completed processing value '4'
...
[02:08:20] Started processing value '23'
[02:08:21] Completed processing value '23'
[02:08:21] Started processing value '24'
[02:08:22] Completed processing value '24'
You should be aware that this code does not provide any means of back-pressure to the source so, should the source continually emit items faster than projectAsync, memory pressure will build (via queuing within the Zip operator) until you receive an out of memory exception.
Furthermore, while I don't know the use case for this requirement, you might want to consider whether "System.Interactive.Async" or "System.Threading.Tasks.DataFlow" might be a better fit here.