I have an asynchronous stream of tasks that is generated by applying an async lambda to a stream of items:
```csharp
IAsyncEnumerable<int> streamOfItems = AsyncEnumerable.Range(1, 10);
IAsyncEnumerable<Task<string>> streamOfTasks = streamOfItems.Select(async x =>
{
    await Task.Delay(100);
    return x.ToString();
});
```
The methods `AsyncEnumerable.Range` and `Select` above are provided by the System.Linq.Async package.
The result I want is a stream of results, expressed as an `IAsyncEnumerable<string>`. The results must be streamed in the same order as the originating tasks. Also, the enumeration of the stream must be throttled, so that no more than a specified number of tasks are active at any given time.
I would like a solution in the form of an extension method on the `IAsyncEnumerable<Task<T>>` type, so that I could chain it multiple times and form a processing pipeline, similar in functionality to a TPL Dataflow pipeline but expressed fluently. Below is the signature of the desired extension method:
```csharp
public async static IAsyncEnumerable<TResult> AwaitResults<TResult>(
    this IAsyncEnumerable<Task<TResult>> source,
    int concurrencyLevel);
```
Accepting a `CancellationToken` as an argument would also be a nice feature.
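To make the requested semantics concrete, here is one possible minimal sketch, not a definitive implementation. It keeps the pulled-but-not-yet-awaited tasks in a FIFO queue and stops pulling from the source once the queue reaches `concurrencyLevel`. It assumes the source starts each task lazily, at the moment the item is pulled (as Linq.Async's `Select` with an async lambda does). If the tasks are created eagerly, pausing the enumeration cannot throttle them. The class name `AsyncEnumerableExtensions` is hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical container class for the sketch.
public static class AsyncEnumerableExtensions
{
    // Sketch: yields task results in source order, with at most `concurrencyLevel`
    // tasks pulled from the source and not yet awaited to completion.
    // Assumes the source starts each task only when the item is pulled.
    public static async IAsyncEnumerable<TResult> AwaitResults<TResult>(
        this IAsyncEnumerable<Task<TResult>> source,
        int concurrencyLevel,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // In an async iterator these checks run on the first MoveNextAsync call,
        // not at the call site.
        if (source is null) throw new ArgumentNullException(nameof(source));
        if (concurrencyLevel < 1)
            throw new ArgumentOutOfRangeException(nameof(concurrencyLevel));

        var buffer = new Queue<Task<TResult>>();

        // The token is forwarded to the source enumeration.
        await foreach (var task in source.WithCancellation(cancellationToken))
        {
            buffer.Enqueue(task);
            if (buffer.Count >= concurrencyLevel)
            {
                // Await the oldest task first, preserving the source order.
                // The source is not pulled again until this result is consumed.
                yield return await buffer.Dequeue();
            }
        }

        // Source exhausted: drain the remaining tasks, still in order.
        while (buffer.Count > 0)
        {
            cancellationToken.ThrowIfCancellationRequested();
            yield return await buffer.Dequeue();
        }
    }
}
```

Awaiting strictly in FIFO order means that a slow task at the head of the queue delays the results of faster tasks behind it. This is the price of the ordering requirement. Also, if an awaited task faults, the exception ends the enumeration and the other buffered tasks are left running unobserved. A production version would probably need to handle that case.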
Update: For completeness, I am including an example of a fluent processing pipeline formed by chaining the `AwaitResults` method twice. This pipeline starts with a PLINQ block, just to demonstrate that mixing PLINQ and Linq.Async is possible.
```csharp
int[] results = await Partitioner
    .Create(Enumerable.Range(1, 20), EnumerablePartitionerOptions.NoBuffering)
    .AsParallel()
    .AsOrdered()
    .WithDegreeOfParallelism(2)
    .WithMergeOptions(ParallelMergeOptions.NotBuffered)
    .Select(x =>
    {
        Thread.Sleep(100); // Simulate some CPU-bound operation
        return x;
    })
    .ToAsyncEnumerable()
    .Select(async x =>
    {
        await Task.Delay(300); // Simulate some I/O operation
        return x;
    })
    .AwaitResults(concurrencyLevel: 5)
    .Select(x => Task.Run(() =>
    {
        Thread.Sleep(100); // Simulate another CPU-bound operation
        return x;
    }))
    .AwaitResults(concurrencyLevel: 2)
    .ToArrayAsync();
Console.WriteLine($"Results: {String.Join(", ", results)}");
```
Expected output:
```
Results: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20
```
Note: In retrospect, the `AwaitResults` method should probably be named `Merge`, and the `concurrencyLevel` argument should be named `maxConcurrent`, because its functionality resembles the `Merge` operator in the Rx library. The System.Interactive.Async package does include an operator named `Merge` that produces `IAsyncEnumerable<T>`s, but none of its overloads operate on `IAsyncEnumerable<Task<T>>` sources. Instead, it operates on sequences of async sequences, such as `IEnumerable<IAsyncEnumerable<TSource>>` and `IAsyncEnumerable<IAsyncEnumerable<TSource>>`. A `bufferCapacity` parameter could also be added, in order to explicitly control the size of the buffer needed for the awaiting/merging operation.