In my application I have three classes, Extractor, Transformer and Loader, which are coordinated by a fourth class, Coordinator. Extractor, Transformer and Loader are very simple and do the following:
Extractor
Exposes a member called Results of type IEnumerable<string>, populated, for example, by reading from a text file. Extraction should be synchronous.
Transformer
Exposes a member called Transform which accepts a string and transforms it to another string via some process which is expected to be time-consuming (use parallel processing here).
Loader
Exposes a member called Load which accepts a string and loads it into some final form (e.g. another text file). Loading should be synchronous.
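For concreteness, the three classes could be sketched roughly as below. This is a minimal illustration under assumptions, not the actual code: the file names input.txt and output.txt, the Thread.Sleep delay and the upper-casing are all hypothetical placeholders for the real work.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;

// Hypothetical stand-ins for the classes described above.
public class Extractor
{
    // Lazily yields the inputs, here one string per line of an assumed input file.
    public IEnumerable<string> Results
    {
        get { return File.ReadLines("input.txt"); }
    }
}

public class Transformer
{
    // Stands in for the time-consuming transformation.
    public string Transform(string input)
    {
        Thread.Sleep(500);               // placeholder for the expensive work
        return input.ToUpperInvariant(); // placeholder for the real transformation
    }
}

public class Loader
{
    // Writes one transformed string to an assumed output file.
    public void Load(string output)
    {
        File.AppendAllText("output.txt", output + Environment.NewLine);
    }
}
```

Only the member names (Results, Transform, Load) and their signatures come from the description above; everything inside the method bodies is illustrative.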
The Coordinator class coordinates the three operations. The transformations should run in parallel and push their results to a queue, which is read by the loader. Coordinator's Run() method looks like this:
    Extractor extractor = new Extractor();
    Transformer transformer = new Transformer();
    Loader loader = new Loader();
    ConcurrentQueue<string> outputs = new ConcurrentQueue<string>();

    // Transform every input in parallel; this call blocks until all inputs are done.
    Parallel.ForEach(extractor.Results, x => outputs.Enqueue(transformer.Transform(x)));

    // Load the transformed results one at a time.
    foreach (string output in outputs)
    {
        loader.Load(output);
    }
This is working well, except that ALL transformations must finish before any loading can start, i.e. the Parallel.ForEach() completes before the following foreach starts. I would prefer that each output is passed to the loader as soon as it is ready.
I also tried this:
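    Extractor extractor = new Extractor();
    Transformer transformer = new Transformer();
    Loader loader = new Loader();
    ConcurrentQueue<string> outputs = new ConcurrentQueue<string>();

    foreach (string input in extractor.Results)
    {
        // Copy the loop variable so each lambda captures its own value.
        string input1 = input;
        // Start the transformation on a background task without waiting for it.
        Task task = Task.Factory.StartNew(
            () => outputs.Enqueue(transformer.Transform(input1)));
    }

    // Runs immediately, while the tasks above may not have enqueued anything yet.
    foreach (string output in outputs)
    {
        loader.Load(output);
    }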
But then the foreach loop at the bottom gets hit before any outputs have been added to the queue, and so it simply exits.
How do I get loading to happen as soon as results are available from the calls to transformer.Transform()?