using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace PerfBenchWorkPool
{
    /// <summary>
    /// Console benchmark that compares two work-pool implementations:
    /// <c>WorkPool</c> ("new", backed by a BlockingCollection) and
    /// <c>WorkPoolOld</c> ("old", backed by a ConcurrentQueue plus a
    /// TaskCompletionSource wake-up signal), each with one or several producers.
    /// </summary>
    class Program
    {
        // Number of timed runs aggregated into each reported Avg/Max/Min line.
        const int Iterations = 100;

        // Number of Work items every producer task enqueues in one run.
        const int ItemsPerProducer = 100000;

        // Number of concurrent producer tasks in the "multiple producers" runs.
        const int ProducerCount = 5;

        /// <summary>
        /// Selects which benchmarks to run from the command-line words
        /// "old", "new", "single" and "multiple". With no recognised word,
        /// all four benchmarks run.
        /// </summary>
        /// <param name="args">Command-line arguments; matched case-sensitively.</param>
        static void Main(string[] args)
        {
            bool wantsOld = args.Contains("old");
            bool wantsNew = args.Contains("new");
            bool wantsSingle = args.Contains("single");
            bool wantsMultiple = args.Contains("multiple");

            // The order of the checks matters: "old" beats "new", and
            // "multiple" beats "single" when both words are supplied.
            if (wantsOld && wantsMultiple)
            {
                RunOldMultiple();
            }
            else if (wantsOld && wantsSingle)
            {
                RunOldSingle();
            }
            else if (wantsNew && wantsMultiple)
            {
                RunNewMultiple();
            }
            else if (wantsNew && wantsSingle)
            {
                RunNewSingle();
            }
            else if (wantsOld)
            {
                RunOldSingle();
                RunOldMultiple();
            }
            else if (wantsNew)
            {
                RunNewSingle();
                RunNewMultiple();
            }
            else
            {
                RunNewSingle();
                RunNewMultiple();
                RunOldSingle();
                RunOldMultiple();
            }
        }

        /// <summary>Runs <see cref="TestOld"/> repeatedly and prints the timing summary.</summary>
        public static void RunOldMultiple()
        {
            RunBenchmark("Old Multiple Producers", TestOld);
        }

        /// <summary>Runs <see cref="TestNew"/> repeatedly and prints the timing summary.</summary>
        public static void RunNewMultiple()
        {
            RunBenchmark("New Multiple Producers", TestNew);
        }

        /// <summary>Runs <see cref="TestNewSingleProducer"/> repeatedly and prints the timing summary.</summary>
        public static void RunNewSingle()
        {
            RunBenchmark("New Single Producer", TestNewSingleProducer);
        }

        /// <summary>Runs <see cref="TestOldSingleProducer"/> repeatedly and prints the timing summary.</summary>
        public static void RunOldSingle()
        {
            RunBenchmark("Old Single Producer", TestOldSingleProducer);
        }

        /// <summary>
        /// Executes <paramref name="test"/> <c>Iterations</c> times, then writes one
        /// line with the average, maximum and minimum elapsed milliseconds and the
        /// value of <c>GC.GetTotalMemory(true)</c>.
        /// </summary>
        /// <param name="label">Text printed at the start of the summary line.</param>
        /// <param name="test">Single timed run; returns elapsed milliseconds.</param>
        static void RunBenchmark(string label, Func<long> test)
        {
            List<long> results = new List<long>(Iterations);
            for (int i = 0; i < Iterations; i++)
            {
                results.Add(test());
            }

            Console.WriteLine(
                "{0} - Avg:{1}ms, Max:{2}ms, Min:{3}ms, Memory:{4}",
                label,
                results.Average(),
                results.Max(),
                results.Min(),
                GC.GetTotalMemory(true));
        }

        /// <summary>
        /// Times one run of <c>WorkPool</c> fed by <c>ProducerCount</c> concurrent producers.
        /// </summary>
        /// <returns>Elapsed milliseconds from pool creation until Stop() has been called.</returns>
        public static long TestNew()
        {
            Stopwatch sw = Stopwatch.StartNew();
            WorkPool wp = new WorkPool();
            wp.Start();

            Task[] producers = new Task[ProducerCount];
            for (int p = 0; p < producers.Length; p++)
            {
                producers[p] = Task.Run(() =>
                {
                    for (int i = 0; i < ItemsPerProducer; i++)
                    {
                        wp.Enqueue(new Work());
                    }
                });
            }

            Task.WaitAll(producers);

            // NOTE(review): the task returned by Stop() is not waited on, so the
            // measurement ends once all items are enqueued and cancellation is
            // requested, not when the consumer loop has finished.
            wp.Stop();
            return sw.ElapsedMilliseconds;
        }

        /// <summary>
        /// Times one run of <c>WorkPoolOld</c> fed by <c>ProducerCount</c> concurrent producers.
        /// </summary>
        /// <returns>Elapsed milliseconds from pool creation until Stop() has been called.</returns>
        public static long TestOld()
        {
            Stopwatch sw = Stopwatch.StartNew();
            WorkPoolOld wp = new WorkPoolOld();
            wp.Start();

            Task[] producers = new Task[ProducerCount];
            for (int p = 0; p < producers.Length; p++)
            {
                producers[p] = Task.Run(() =>
                {
                    for (int i = 0; i < ItemsPerProducer; i++)
                    {
                        wp.Enqueue(new Work());
                    }
                });
            }

            Task.WaitAll(producers);

            // NOTE(review): as in TestNew, the loop task returned by Stop() is not awaited.
            wp.Stop();
            return sw.ElapsedMilliseconds;
        }

        /// <summary>
        /// Times one run of <c>WorkPool</c> fed by a single producer task.
        /// </summary>
        /// <returns>Elapsed milliseconds from pool creation until Stop() has been called.</returns>
        public static long TestNewSingleProducer()
        {
            Stopwatch sw = Stopwatch.StartNew();
            WorkPool wp = new WorkPool();
            wp.Start();

            Task producer = Task.Run(() =>
            {
                for (int i = 0; i < ItemsPerProducer; i++)
                {
                    wp.Enqueue(new Work());
                }
            });

            producer.Wait();
            wp.Stop();
            return sw.ElapsedMilliseconds;
        }

        /// <summary>
        /// Times one run of <c>WorkPoolOld</c> fed by a single producer task.
        /// </summary>
        /// <returns>Elapsed milliseconds from pool creation until Stop() has been called.</returns>
        public static long TestOldSingleProducer()
        {
            Stopwatch sw = Stopwatch.StartNew();
            WorkPoolOld wp = new WorkPoolOld();
            wp.Start();

            Task producer = Task.Run(() =>
            {
                for (int i = 0; i < ItemsPerProducer; i++)
                {
                    wp.Enqueue(new Work());
                }
            });

            producer.Wait();
            wp.Stop();
            return sw.ElapsedMilliseconds;
        }

        /// <summary>
        /// "New" pool: a single consumer loop that pulls <c>Work</c> items from a
        /// BlockingCollection and executes them one at a time.
        /// </summary>
        class WorkPool
        {
            readonly BlockingCollection<Work> workQueue;
            readonly CancellationTokenSource tokenSource;
            private Task task;

            public WorkPool()
            {
                workQueue = new BlockingCollection<Work>();
                tokenSource = new CancellationTokenSource();
            }

            /// <summary>Starts the consumer loop on the thread pool.</summary>
            public void Start()
            {
                task = Task.Run(Loop, CancellationToken.None);
            }

            /// <summary>Adds a work item for the consumer loop to execute.</summary>
            /// <param name="work">Item to queue.</param>
            public void Enqueue(Work work)
            {
                workQueue.Add(work);
            }

            // Consumes items until the enumeration ends; the enumeration observes
            // tokenSource.Token, so cancellation interrupts it.
            async Task Loop()
            {
                foreach (var work in workQueue.GetConsumingEnumerable(tokenSource.Token))
                {
                    await work.Execute().ConfigureAwait(false);
                }
            }

            /// <summary>Requests cancellation of the consumer loop.</summary>
            /// <returns>
            /// The loop task created by <see cref="Start"/>.
            /// NOTE(review): cancelling the consuming enumeration is expected to
            /// surface as an OperationCanceledException, so this task presumably
            /// ends canceled rather than completed - confirm before awaiting it.
            /// </returns>
            public Task Stop()
            {
                tokenSource.Cancel();
                return task;
            }
        }

        /// <summary>
        /// "Old" pool: a single consumer loop that drains a ConcurrentQueue and
        /// then waits for either a wake-up signal from <see cref="Enqueue"/> or a
        /// 100 ms timeout before draining again.
        /// </summary>
        class WorkPoolOld
        {
            readonly ConcurrentQueue<Work> workQueue;
            readonly TimeSpan waitTime;
            readonly CancellationTokenSource tokenSource;

            // Wake-up signal; replaced by the loop after every wait.
            TaskCompletionSource<bool> messageArrived;
            Task task;

            public WorkPoolOld()
            {
                workQueue = new ConcurrentQueue<Work>();
                messageArrived = new TaskCompletionSource<bool>();
                waitTime = TimeSpan.FromMilliseconds(100);
                tokenSource = new CancellationTokenSource();
            }

            /// <summary>Starts the consumer loop on the thread pool.</summary>
            public void Start()
            {
                task = Task.Run(Loop, CancellationToken.None);
            }

            /// <summary>Queues a work item and signals the consumer loop.</summary>
            /// <param name="work">Item to queue.</param>
            public void Enqueue(Work work)
            {
                workQueue.Enqueue(work);
                messageArrived.TrySetResult(true);
            }

            // Drain-then-wait loop; exits once cancellation has been requested
            // and the current drain pass has emptied the queue.
            async Task Loop()
            {
                while (tokenSource.IsCancellationRequested == false)
                {
                    Work work;
                    while (workQueue.TryDequeue(out work))
                    {
                        await work.Execute().ConfigureAwait(false);
                        // The implementation being benchmarked has no cancellation
                        // check inside this drain loop, i.e. the following is
                        // missing from their code and is intentionally left out:
                        //     if (tokenSource.IsCancellationRequested) break;
                    }

                    // Sleep until an item is signalled, the timeout elapses, or
                    // the delay is cancelled.
                    await Task.WhenAny(Task.Delay(waitTime, tokenSource.Token), messageArrived.Task).ConfigureAwait(false);

                    // Complete the current signal (no effect if already set), then
                    // install a fresh one for the next wait.
                    messageArrived.TrySetResult(true);
                    messageArrived = new TaskCompletionSource<bool>();
                }
            }

            /// <summary>Requests cancellation of the consumer loop.</summary>
            /// <returns>The loop task created by <see cref="Start"/>.</returns>
            public Task Stop()
            {
                tokenSource.Cancel();
                return task;
            }
        }

        /// <summary>Minimal unit of work used as the benchmark payload.</summary>
        class Work
        {
            public Work()
            {
            }

            /// <summary>
            /// Simulates an asynchronous operation by yielding once; any exception
            /// is deliberately swallowed.
            /// </summary>
            public async Task Execute()
            {
                try
                {
                    // async operation
                    await Task.Yield();
                }
                catch (Exception)
                {
                    // intentionally caught
                }
            }
        }
    }
}