I have a TPL Datalow pipeline with two sources and two targets linked in a many-to-many fashion. The target blocks appear to complete successfully, however, it usually drops one or more inputs. I've attached the simplest possible full repro I could come up with below. Any ideas?
Notes:
- The problem only occurs if the artificial delay is used while generating the input.
- Complete() is successfully called for both sources, but one of the source's Completion task hangs in the WaitingForActivation state, even though both Targets complete successfully.
- I can't find any documentation stating many-to-many dataflows aren't supported, and this question's answer implies it is - https://social.msdn.microsoft.com/Forums/en-US/19d831af-2d3f-4d95-9672-b28ae53e6fa0/completion-of-complex-graph-dataflowgraph-object?forum=tpldataflow
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
class Program
{
    private const int NumbersPerSource = 10;
    private const int MaxDelayMilliseconds = 10;
    static async Task Main(string[] args)
    {
        int numbersProcessed = 0;
        var source1 = new BufferBlock<int>();
        var source2 = new BufferBlock<int>();
        var target1 = new ActionBlock<int>(i => Interlocked.Increment(ref numbersProcessed));
        var target2 = new ActionBlock<int>(i => Interlocked.Increment(ref numbersProcessed));
        var linkOptions = new DataflowLinkOptions() { PropagateCompletion = true };
        source1.LinkTo(target1, linkOptions);
        source1.LinkTo(target2, linkOptions);
        source2.LinkTo(target1, linkOptions);
        source2.LinkTo(target2, linkOptions);
        var task1 = Task.Run(() => Post(source1));
        var task2 = Task.Run(() => Post(source2));
        // source1 or source2 Completion tasks may never complete even though Complete is always successfully called.
        //await Task.WhenAll(task1, task2, source1.Completion, source2.Completion, target1.Completion, target2.Completion);
        await Task.WhenAll(task1, task2, target1.Completion, target2.Completion);
        Console.WriteLine($"{numbersProcessed} of {NumbersPerSource * 2} numbers processed.");
    }
    private static async Task Post(BufferBlock<int> source)
    {
        foreach (var i in Enumerable.Range(0, NumbersPerSource)) {
            await Task.Delay(TimeSpan.FromMilliseconds(GetRandomMilliseconds()));
            Debug.Assert(source.Post(i));
        }
        source.Complete();
    }
    private static Random Random = new Random();
    private static int GetRandomMilliseconds()
    {
        lock (Random) {
            return Random.Next(0, MaxDelayMilliseconds);
        }
    }
}
 
    