I thought this was an interesting question so I spent a bit of time on it.
The scenario, as I understand it, is this:
- You have a BlockingCollection that is full
- A number of threads start, each trying to add to the BlockingCollection. These calls will all block; that is why they need to occur in parallel.
- As space becomes available, the Add calls will become unblocked.
- The calls to Add need to complete in the order they were received.
First of all, let's talk about code structure. Instead of using a BlockingCollection and writing procedural code around it, I suggest extending BlockingCollection and giving it a new method (Enqueue, below) that wraps Add with the ordering you need. Add is not virtual, so it can't be overridden directly. It may look something like this:
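// Usings needed by the snippets in this answer
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public class QueuedBlockingCollection<T> : BlockingCollection<T>
{
    private FifoMonitor monitor = new FifoMonitor();

    public QueuedBlockingCollection(int max) : base (max) {}

    // Like Add, but callers complete in the order they arrived
    public void Enqueue(T item)
    {
        using (monitor.Lock())
        {
            base.Add(item);
        }
    }
}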
Here, the trick is the use of a FifoMonitor class, which will give you the functionality of a lock but will enforce order. Unfortunately, no class like that exists in the CLR. But we can write one:
public class FifoMonitor
{
    // Disposable handle: entering happens in the constructor, exiting in Dispose
    public class FifoCriticalSection : IDisposable
    {
        private readonly FifoMonitor _parent;

        public FifoCriticalSection(FifoMonitor parent)
        {
            _parent = parent;
            _parent.Enter();
        }

        public void Dispose()
        {
            _parent.Exit();
        }
    }

    private object _innerLock = new object();
    private volatile int counter = 0;  // last ticket handed out
    private volatile int current = 1;  // ticket whose turn it is

    public FifoCriticalSection Lock()
    {
        return new FifoCriticalSection(this);
    }

    private void Enter()
    {
        // Take a ticket first, then wait (holding the lock) until it is our turn
        int mine = Interlocked.Increment(ref counter);
        Monitor.Enter(_innerLock);
        while (current != mine) Monitor.Wait(_innerLock);
    }

    private void Exit()
    {
        // Advance to the next ticket and wake all waiters so they can re-check
        Interlocked.Increment(ref current);
        Monitor.PulseAll(_innerLock);
        Monitor.Exit(_innerLock);
    }
}
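To see the ordering idea in isolation: as far as I can tell, FifoMonitor behaves like a ticket lock, where each caller draws a number and waits until a "now serving" counter reaches it. Here is a minimal, standalone sketch of just that mechanism. The names TicketOrderDemo, Run, gate, nextTicket and nowServing are made up for this illustration and are not part of the classes above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical demo class, for illustration only
public static class TicketOrderDemo
{
    // Starts `workers` threads and returns the tickets in the order
    // their holders entered the critical section.
    public static List<int> Run(int workers)
    {
        var gate = new object();
        int nextTicket = 0;   // last ticket handed out
        int nowServing = 1;   // ticket allowed to proceed
        var entered = new List<int>();
        var threads = new Thread[workers];

        for (int i = 0; i < workers; i++)
        {
            threads[i] = new Thread(() =>
            {
                // Draw a ticket atomically, before contending for the lock
                int mine = Interlocked.Increment(ref nextTicket);
                lock (gate)
                {
                    // Wait releases the lock until another thread pulses
                    while (nowServing != mine) Monitor.Wait(gate);
                    entered.Add(mine);
                    nowServing++;
                    Monitor.PulseAll(gate); // let waiters re-check their turn
                }
            });
            threads[i].Start();
        }

        foreach (var t in threads) t.Join();
        return entered;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", Run(5))); // prints 1,2,3,4,5
    }
}
```

Whichever thread happens to win the lock, it only proceeds if its ticket matches nowServing, so the entry order always follows the ticket order. Note that "order received" here means the order in which tickets were drawn.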
Now to test. Here's my program:
public class Program
{
    public static void Main()
    {
        //Setup
        var blockingCollection = new QueuedBlockingCollection<int>(10);
        var tasks = new Task[10];

        //Block the collection by filling it up
        for (int i=1; i<=10; i++) blockingCollection.Add(99);

        //Start 10 threads all trying to add another value
        for (int i=1; i<=10; i++)
        {
            int index = i; //Copy the loop variable so each lambda captures its own value
            tasks[index-1] = Task.Run( () => blockingCollection.Enqueue(index) );
            Task.Delay(100).Wait(); //Wait long enough for the Enqueue call to block
        }

        //Purge the collection, making room for more values
        while (blockingCollection.Count > 0)
        {
            var n = blockingCollection.Take();
            Console.WriteLine(n);
        }

        //Wait for our pending adds to complete
        Task.WaitAll(tasks);

        //Display the collection in the order read
        while (blockingCollection.Count > 0)
        {
            var n = blockingCollection.Take();
            Console.WriteLine(n);
        }
    }
}
Output:
99
99
99
99
99
99
99
99
99
99
1
2
3
4
5
6
7
8
9
10
Looks like it works! But just to be sure, I changed Enqueue back to Add, to ensure that the solution actually does something. Sure enough, it ends up out of order with the regular Add.
99
99
99
99
99
99
99
99
99
99
2
3
4
6
1
5
7
8
9
10
Check out the code on DotNetFiddle