using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace PerfBenchWorkPool { class Program { static void Main(string[] args) { if (args.Contains("old") && args.Contains("multiple")) { RunOldMultiple(); } else if (args.Contains("old") && args.Contains("single")) { RunOldSingle(); } else if (args.Contains("new") && args.Contains("multiple")) { RunNewMultiple(); } else if (args.Contains("new") && args.Contains("single")) { RunNewSingle(); } else if (args.Contains("old")) { RunOldSingle(); RunOldMultiple(); } else if (args.Contains("new")) { RunNewSingle(); RunNewMultiple(); } else { RunNewSingle(); RunNewMultiple(); RunOldSingle(); RunOldMultiple(); } } public static void RunOldMultiple() { List results = new List(100); for (int i = 0; i < 100; i++) { results.Add(TestOld()); } Console.WriteLine("Old Multiple - Avg:{0}ms, Max:{1}ms, Min:{2}ms, Memory:{3}, Processed:{4}", results.Select(_ => _.Milliseconds).Average(), results.Select(_ => _.Milliseconds).Max(), results.Select(_ => _.Milliseconds).Min(), GC.GetTotalMemory(true), results.Select(_ => _.Processed).Sum()); } public static void RunNewMultiple() { List results = new List(100); for (int i = 0; i < 100; i++) { results.Add(TestNew()); } Console.WriteLine("New Multiple - Avg:{0}ms, Max:{1}ms, Min:{2}ms, Memory:{3}, Processed:{4}", results.Select(_ => _.Milliseconds).Average(), results.Select(_ => _.Milliseconds).Max(), results.Select(_ => _.Milliseconds).Min(), GC.GetTotalMemory(true), results.Select(_ => _.Processed).Sum()); } public static void RunNewSingle() { List results = new List(100); for (int i = 0; i < 100; i++) { results.Add(TestNewSingleProducer()); } Console.WriteLine("New Single - Avg:{0}ms, Max:{1}ms, Min:{2}ms, Memory:{3}, Processed:{4}", results.Select(_ => _.Milliseconds).Average(), results.Select(_ => _.Milliseconds).Max(), results.Select(_ => _.Milliseconds).Min(), GC.GetTotalMemory(true), results.Select(_ => _.Processed).Sum()); } public static void RunOldSingle() { List results = new List(100); for (int i = 0; i < 100; i++) { results.Add(TestOldSingleProducer()); } Console.WriteLine("Old Single Producer - Avg:{0}ms, Max:{1}ms, Min:{2}ms, Memory:{3}, Processed:{4}", results.Select(_=>_.Milliseconds).Average(), results.Select(_ => _.Milliseconds).Max(), results.Select(_ => _.Milliseconds).Min(), GC.GetTotalMemory(true), results.Select(_=> _.Processed).Sum()); } public static Result TestNew() { Stopwatch sw = Stopwatch.StartNew(); WorkPool wp = new WorkPool(); wp.Start(); var t1 = Task.Run(async () => { for (int i = 0; i < 100000; i++) { wp.Enqueue(new Work()); await Task.Yield(); } }); var t2 = Task.Run(async () => { for (int i = 0; i < 100000; i++) { wp.Enqueue(new Work()); await Task.Yield(); } }); var t3 = Task.Run(async () => { for (int i = 0; i < 100000; i++) { wp.Enqueue(new Work()); await Task.Yield(); } }); var t4 = Task.Run(async () => { for (int i = 0; i < 100000; i++) { wp.Enqueue(new Work()); await Task.Yield(); } }); var t5 = Task.Run(async () => { for (int i = 0; i < 100000; i++) { wp.Enqueue(new Work()); await Task.Yield(); } }); Task.WaitAll(t1, t2, t3, t4, t5); Task.Delay(5000); wp.Stop(); return new Result { Milliseconds = sw.ElapsedMilliseconds, Processed = wp.i }; } public static Result TestOld() { Stopwatch sw = Stopwatch.StartNew(); WorkPoolOld wp = new WorkPoolOld(); wp.Start(); var t1 = Task.Run(async () => { for (int i = 0; i < 100000; i++) { wp.Enqueue(new Work()); await Task.Yield(); } }); var t2 = Task.Run(async () => { for (int i = 0; i < 100000; i++) { wp.Enqueue(new Work()); await Task.Yield(); } }); var t3 = Task.Run(async () => { for (int i = 0; i < 100000; i++) { wp.Enqueue(new Work()); await Task.Yield(); } }); var t4 = Task.Run(async () => { for (int i = 0; i < 100000; i++) { wp.Enqueue(new Work()); await Task.Yield(); } }); var t5 = Task.Run(async () => { for (int i = 0; i < 100000; i++) { wp.Enqueue(new Work()); await Task.Yield(); } }); Task.WaitAll(t1, t2, t3, t4, t5); Task.Delay(5000); wp.Stop(); return new Result { Milliseconds = sw.ElapsedMilliseconds, Processed = wp.i }; } public static Result TestNewSingleProducer() { Stopwatch sw = Stopwatch.StartNew(); WorkPool wp = new WorkPool(); wp.Start(); var t1 = Task.Run(async () => { for (int i = 0; i < 100000; i++) { wp.Enqueue(new Work()); await Task.Yield(); } }); t1.Wait(); Task.Delay(5000); wp.Stop(); return new Result { Milliseconds = sw.ElapsedMilliseconds, Processed = wp.i }; } public static Result TestOldSingleProducer() { Stopwatch sw = Stopwatch.StartNew(); WorkPoolOld wp = new WorkPoolOld(); wp.Start(); var t1 = Task.Run(async () => { for (int i = 0; i < 100000; i++) { wp.Enqueue(new Work()); await Task.Yield(); } }); t1.Wait(); Task.Delay(5000); wp.Stop(); return new Result { Milliseconds = sw.ElapsedMilliseconds, Processed = wp.i }; } class WorkPool { readonly BlockingCollection workQueue; readonly CancellationTokenSource tokenSource; private Task task; public int i; public WorkPool() { workQueue = new BlockingCollection(); tokenSource = new CancellationTokenSource(); } public void Start() { task = Task.Run(()=>Loop(), tokenSource.Token); } public void Enqueue(Work work) { workQueue.Add(work); } void Loop() { foreach (var work in workQueue.GetConsumingEnumerable(tokenSource.Token)) { work.Execute().ConfigureAwait(false); Interlocked.Increment(ref i); } } public Task Stop() { tokenSource.Cancel(); return task; } } class WorkPoolOld { readonly ConcurrentQueue workQueue; readonly TimeSpan waitTime; readonly CancellationTokenSource tokenSource; TaskCompletionSource messageArrived; Task task; public int i = 0; public WorkPoolOld() { workQueue = new ConcurrentQueue(); messageArrived = new TaskCompletionSource(); waitTime = TimeSpan.FromMilliseconds(100); tokenSource = new CancellationTokenSource(); } public void Start() { task = Task.Run(Loop, tokenSource.Token); } public void Enqueue(Work work) { workQueue.Enqueue(work); messageArrived.TrySetResult(true); } async Task Loop() { while (!tokenSource.IsCancellationRequested) { Work work; while (workQueue.TryDequeue(out work)) { await work.Execute().ConfigureAwait(false); Interlocked.Increment(ref i); if (tokenSource.IsCancellationRequested) break;// This is missing from their code. } await Task.WhenAny(Task.Delay(waitTime, tokenSource.Token), messageArrived.Task).ConfigureAwait(false); messageArrived.TrySetResult(true); messageArrived = new TaskCompletionSource(); } } public Task Stop() { tokenSource.Cancel(); return task; } } class Work { public Work() { } public async Task Execute() { try { //async operation System.Diagnostics.Trace.WriteLine("Executing"); await Task.Yield(); } catch (Exception) { // intentionally caught } } } } public class Result { public int Processed; public long Milliseconds; } }