I have orders coming in from multiple threads and I want to process this data in one thread. If I understood it right, the way to do it is with ConcurrentQueue.
I had a look at the SO question "How to work threading with ConcurrentQueue<T>", but it did not answer my questions.
I wrote a small test application (with .NET Core 2.1) to see if I could get it to work.
This is what it should do: Make aggregates for 100 orders. There are 3 aggregates for 3 different order types: Type1, Type2 and Type3
The output should be something like:
Type: Type1 Count: 38
Type: Type2 Count: 31
Type: Type3 Count: 31
Total for all types: 100
I started off by writing the application without ConcurrentQueue. As expected, the results in _aggregates are wrong.
/* Incorrect version, not using ConcurrentQueue, showing incorrect results */
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Threading.Tasks;
namespace ConcurrentQueue
{
    /// <summary>
    /// Console demo that counts randomly typed orders per order type.
    /// This is the deliberately unsynchronized variant: orders are aggregated
    /// from several threads at once without any locking, so the printed counts
    /// may not add up to <see cref="OrderCount"/>.
    /// </summary>
    class Program
    {
        /// <summary>Number of orders generated for one run.</summary>
        private const int OrderCount = 100;

        // One generator for the whole run instead of a new Random per order.
        // It is only used while building the order list on the main thread,
        // so it needs no synchronization.
        private static readonly Random Rng = new Random();

        // Every declared order type, resolved once. Drives initialization,
        // random selection and output, so the three places cannot drift apart.
        private static readonly OrderTypeEnum[] OrderTypes =
            (OrderTypeEnum[])Enum.GetValues(typeof(OrderTypeEnum));

        private static IEnumerable<Order> _orders;
        private static Dictionary<OrderTypeEnum, Aggregate> _aggregates;

        /// <summary>Prepares the aggregates and orders, then runs the processing.</summary>
        static void Main(string[] args)
        {
            //Prepare
            InitializeAggregates();
            _orders = CreateOrders();
            //Execute
            // Blocking here is intentional: an async Main needs C# 7.1 or later.
            MainAsync(args).GetAwaiter().GetResult();
        }

        /// <summary>Processes all orders on a worker task and prints the result afterwards.</summary>
        static async Task MainAsync(string[] args)
        {
            await Task.Run(() => ProcessOrders());
            ShowOutput();
        }

        /// <summary>
        /// Feeds every order to a single <see cref="Aggregator"/> from parallel workers.
        /// </summary>
        public static void ProcessOrders()
        {
            var aggregator = new Aggregator();
            // NOTE: intentionally unsynchronized. Several workers update the same
            // counters at the same time, which is the lost-update problem this
            // listing is meant to demonstrate.
            Parallel.ForEach(_orders, order => {
                aggregator.Aggregate(order, _aggregates);
            });
        }

        /// <summary>Builds <see cref="OrderCount"/> orders with ids 1..OrderCount.</summary>
        private static IEnumerable<Order> CreateOrders()
        {
            var orderList = new List<Order>(OrderCount);
            for (var id = 1; id <= OrderCount; id++)
            {
                orderList.Add(CreateOrder(id));
            }
            return orderList;
        }

        /// <summary>Creates one empty aggregate per declared order type.</summary>
        private static void InitializeAggregates()
        {
            _aggregates = new Dictionary<OrderTypeEnum, Aggregate>();
            foreach (var orderType in OrderTypes)
            {
                _aggregates[orderType] = new Aggregate();
            }
        }

        /// <summary>Creates an order with the given id and a randomly chosen type.</summary>
        private static Order CreateOrder(int id)
        {
            return new Order() { Id = id, OrderType = GetRandomOrderType() };
        }

        /// <summary>Picks one of the declared order types at random.</summary>
        private static OrderTypeEnum GetRandomOrderType()
        {
            return OrderTypes[Rng.Next(OrderTypes.Length)];
        }

        /// <summary>
        /// Prints the count per order type followed by the grand total,
        /// then waits for a key press.
        /// </summary>
        private static void ShowOutput()
        {
            var total = 0;
            foreach (var orderType in OrderTypes)
            {
                var count = _aggregates[orderType].Count;
                Console.WriteLine($"Type: {orderType} Count: {count}");
                total += count;
            }
            Console.WriteLine($"Total for all types: {total}");
            Console.ReadKey();
        }
    }
    /// <summary>
    /// A single order: a numeric identifier plus the type it is counted under.
    /// </summary>
    public class Order
    {
        /// <summary>Identifier of the order.</summary>
        public int Id { get; set; }
        /// <summary>Type of the order; selects which aggregate gets incremented.</summary>
        public OrderTypeEnum OrderType { get; set; }
    }
    /// <summary>
    /// Adds orders to per-type counters.
    /// </summary>
    public class Aggregator
    {
        /// <summary>
        /// Increments the counter that belongs to the order's type by one.
        /// The update is a plain read followed by a write with no locking,
        /// so concurrent calls for the same type can overwrite each other.
        /// </summary>
        /// <param name="order">Order whose type selects the counter.</param>
        /// <param name="aggregates">Counters keyed by order type; must contain the order's type.</param>
        public void Aggregate(Order order, Dictionary<OrderTypeEnum, Aggregate> aggregates)
        {
            var bucket = aggregates[order.OrderType];
            bucket.Count = bucket.Count + 1;
        }
    }
    /// <summary>
    /// Running total for one order type.
    /// </summary>
    public class Aggregate
    {
        /// <summary>Number of orders counted so far; starts at 0 for a new instance.</summary>
        public int Count { get; set; }
    }
    /// <summary>
    /// The order types that orders are counted by.
    /// Values start at 1, so the default value 0 does not correspond to any member.
    /// </summary>
    public enum OrderTypeEnum
    {
        Type1 = 1,
        Type2 = 2,
        Type3 = 3
    }
}
So I rewrote the application using ConcurrentQueue. The results are correct now, but I have got the feeling I am doing it wrong or it can be done more efficiently.
/* improved version using ConcurrentQueue, showing correct results */
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Threading.Tasks;
namespace ConcurrentQueue
{
    /// <summary>
    /// Console demo that counts randomly typed orders per order type.
    /// Orders are placed in parallel and their results collected in a
    /// <see cref="ConcurrentQueue{T}"/>; the counting itself happens afterwards
    /// on the main thread only.
    /// </summary>
    class Program
    {
        /// <summary>Number of orders generated for one run.</summary>
        private const int OrderCount = 100;

        // One generator for the whole run instead of a new Random per order.
        // It is only used while building the order list on the main thread,
        // so it needs no synchronization.
        private static readonly Random Rng = new Random();

        // Every declared order type, resolved once. Drives initialization,
        // random selection and output, so the three places cannot drift apart.
        private static readonly OrderTypeEnum[] OrderTypes =
            (OrderTypeEnum[])Enum.GetValues(typeof(OrderTypeEnum));

        private static IEnumerable<Order> _orders;
        private static Dictionary<OrderTypeEnum, Aggregate> _aggregates;

        /// <summary>
        /// Places all orders in parallel, then aggregates the queued results
        /// sequentially and prints them.
        /// </summary>
        static void Main(string[] args)
        {
            //Prepare
            InitializeAggregates();
            _orders = CreateOrders();
            //Execute
            var proxy = new OrderProxy();
            var ordersQueue = new ConcurrentQueue<OrderResult>();
            // Producers: many threads may enqueue at once; the queue handles that.
            Parallel.ForEach(_orders, order => {
                ordersQueue.Enqueue(proxy.PlaceOrder(order));
            });
            // Consumer: Parallel.ForEach has returned, so only this thread
            // touches the aggregates from here on.
            foreach (var result in ordersQueue)
            {
                _aggregates[result.OrderType].Count++;
            }
            ShowOutput();
        }

        /// <summary>Builds <see cref="OrderCount"/> orders with ids 1..OrderCount.</summary>
        private static IEnumerable<Order> CreateOrders()
        {
            var orderList = new List<Order>(OrderCount);
            for (var id = 1; id <= OrderCount; id++)
            {
                orderList.Add(CreateOrder(id));
            }
            return orderList;
        }

        /// <summary>Creates one empty aggregate per declared order type.</summary>
        private static void InitializeAggregates()
        {
            _aggregates = new Dictionary<OrderTypeEnum, Aggregate>();
            foreach (var orderType in OrderTypes)
            {
                _aggregates[orderType] = new Aggregate();
            }
        }

        /// <summary>Creates an order with the given id and a randomly chosen type.</summary>
        private static Order CreateOrder(int id)
        {
            return new Order() { Id = id, AggregateType = GetRandomOrderType() };
        }

        /// <summary>Picks one of the declared order types at random.</summary>
        private static OrderTypeEnum GetRandomOrderType()
        {
            return OrderTypes[Rng.Next(OrderTypes.Length)];
        }

        /// <summary>
        /// Prints the count per order type followed by the grand total,
        /// then waits for a key press.
        /// </summary>
        private static void ShowOutput()
        {
            var total = 0;
            foreach (var orderType in OrderTypes)
            {
                var count = _aggregates[orderType].Count;
                Console.WriteLine($"Type: {orderType} Count: {count}");
                total += count;
            }
            Console.WriteLine($"Total for all types: {total}");
            Console.ReadKey();
        }
    }
    /// <summary>
    /// A single order: a numeric identifier plus the type it will be counted under.
    /// </summary>
    public class Order
    {
        /// <summary>Identifier of the order.</summary>
        public int Id { get; set; }
        /// <summary>Order type this order is aggregated under.</summary>
        public OrderTypeEnum AggregateType { get; set; }
    }
    /// <summary>
    /// Result of placing an order: an identifier and an order type.
    /// </summary>
    public class OrderResult
    {
        /// <summary>Identifier carried by this result.</summary>
        public int Id { get; set; }
        /// <summary>Order type carried by this result.</summary>
        public OrderTypeEnum OrderType { get; set; }
    }
    /// <summary>
    /// Stand-in for an order-placing service.
    /// </summary>
    public class OrderProxy
    {
        /// <summary>
        /// Builds the result for an order by copying its id and type.
        /// </summary>
        /// <param name="order">Order to place.</param>
        /// <returns>A new result carrying the order's id and aggregate type.</returns>
        public OrderResult PlaceOrder(Order order) =>
            new OrderResult { Id = order.Id, OrderType = order.AggregateType };
    }
    /// <summary>
    /// Running total for one order type.
    /// </summary>
    public class Aggregate
    {
        // NOTE(review): nothing in this listing assigns or reads OrderType; the
        // dictionary key already identifies the type. Confirm whether it is needed.
        public OrderTypeEnum OrderType { get; set; }
        /// <summary>Number of orders counted so far; starts at 0 for a new instance.</summary>
        public int Count { get; set; }
    }
    /// <summary>
    /// The order types that orders are counted by.
    /// Values start at 1, so the default value 0 does not correspond to any member.
    /// </summary>
    public enum OrderTypeEnum
    {
        Type1 = 1,
        Type2 = 2,
        Type3 = 3
    }
}
As you can see, I add objects of type OrderResult to the ConcurrentQueue. I shouldn't need a separate OrderResult class. Of course, I could just add the orders to the queue, iterate through them, and calculate the sums after I have finished retrieving data. Is that what I should do? I simply want to handle the incoming orders, count the different types of orders right away, and store the counts in my 'aggregates collection'. Is that possible? If yes, how?
 
     
    