diff --git a/AWS.Messaging.sln b/AWS.Messaging.sln index 2ffc190..16c7ffa 100644 --- a/AWS.Messaging.sln +++ b/AWS.Messaging.sln @@ -31,6 +31,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AWS.Messaging.Lambda", "src EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "LambdaMessaging", "sampleapps\LambdaMessaging\LambdaMessaging.csproj", "{F74A4CF0-D814-426E-8149-46758E86AFE3}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AWS.Messaging.Benchmarks", "test\AWS.Messaging.Benchmarks\AWS.Messaging.Benchmarks.csproj", "{143DC3E0-A1C6-4670-86F4-E7CD4C8F52CB}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -69,6 +71,10 @@ Global {F74A4CF0-D814-426E-8149-46758E86AFE3}.Debug|Any CPU.Build.0 = Debug|Any CPU {F74A4CF0-D814-426E-8149-46758E86AFE3}.Release|Any CPU.ActiveCfg = Release|Any CPU {F74A4CF0-D814-426E-8149-46758E86AFE3}.Release|Any CPU.Build.0 = Release|Any CPU + {143DC3E0-A1C6-4670-86F4-E7CD4C8F52CB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {143DC3E0-A1C6-4670-86F4-E7CD4C8F52CB}.Debug|Any CPU.Build.0 = Debug|Any CPU + {143DC3E0-A1C6-4670-86F4-E7CD4C8F52CB}.Release|Any CPU.ActiveCfg = Release|Any CPU + {143DC3E0-A1C6-4670-86F4-E7CD4C8F52CB}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -82,6 +88,7 @@ Global {A174942B-AF9C-4935-AD7B-AF651BACE63C} = {80DB2C77-6ADD-4A60-B27D-763BDF9659D3} {24FA3671-8C2B-4B64-865C-68FB6237E34D} = {2D0A561B-0B97-4259-8603-3AF5437BB652} {F74A4CF0-D814-426E-8149-46758E86AFE3} = {1AA8985B-897C-4BD5-9735-FD8B33FEBFFB} + {143DC3E0-A1C6-4670-86F4-E7CD4C8F52CB} = {80DB2C77-6ADD-4A60-B27D-763BDF9659D3} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {7B2B759D-6455-4089-8173-3F1619567B36} diff --git a/test/AWS.Messaging.Benchmarks/AWS.Messaging.Benchmarks.csproj b/test/AWS.Messaging.Benchmarks/AWS.Messaging.Benchmarks.csproj new file mode 100644 index 0000000..30f509e --- /dev/null +++ b/test/AWS.Messaging.Benchmarks/AWS.Messaging.Benchmarks.csproj @@ -0,0 +1,20 @@ + + + + Exe + net6.0 + enable + enable + + + + + + + + + + + + + diff --git a/test/AWS.Messaging.Benchmarks/BenchmarkCollector.cs b/test/AWS.Messaging.Benchmarks/BenchmarkCollector.cs new file mode 100644 index 0000000..66ec25f --- /dev/null +++ b/test/AWS.Messaging.Benchmarks/BenchmarkCollector.cs @@ -0,0 +1,87 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +using System.Collections.Concurrent; +using System.Diagnostics; + +namespace AWS.Messaging.Benchmarks; + +/// +/// Aggregates the data for each message that is published and received during a benchmark run +/// +/// +/// Inspired by a similar technique in https://github.com/MassTransit/MassTransit-Benchmark/ and +/// https://github.com/justeattakeaway/JustSaying/tree/main/tests/JustSaying.Benchmark +/// +public interface IBenchmarkCollector +{ + /// + /// Records the publishing of a single message + /// + /// How long the message took to publish + void RecordMessagePublish(TimeSpan publishDuration); + + /// + /// Records the handling of a single message + /// + /// Received message + void RecordMessageReception(BenchmarkMessage message); + + /// + /// Task that completes when the expected number of messages have been handled + /// + Task HandlingCompleted { get; } + + /// + /// Publish times for all messages, in milliseconds + /// + List PublishTimes { get; } + + /// + /// Handling times for all messages, in milliseconds + /// + List ReceptionTimes { get; } +} + +public class BenchmarkCollector : IBenchmarkCollector +{ + private readonly ConcurrentBag _publishDurations = new(); + private readonly ConcurrentBag _receiveDurations = new(); + private readonly int _expectedNumberOfMessages; + readonly TaskCompletionSource _receivingCompleted; + readonly Stopwatch _stopwatch; + + public BenchmarkCollector(int expectedNumberOfMessages) + { + _expectedNumberOfMessages = expectedNumberOfMessages; + _receivingCompleted = new TaskCompletionSource(); + + _stopwatch = Stopwatch.StartNew(); + } + + /// + public void RecordMessagePublish(TimeSpan publishDuration) + { + _publishDurations.Add(publishDuration); + } + + /// + public void RecordMessageReception(BenchmarkMessage message) + { + _receiveDurations.Add(DateTime.UtcNow - message.SentTime); + + if (_receiveDurations.Count == _expectedNumberOfMessages) + { + _receivingCompleted.TrySetResult(_stopwatch.Elapsed); + } + } + + /// + public Task HandlingCompleted => _receivingCompleted.Task; + + /// + public List PublishTimes => _publishDurations.Select(x => x.TotalMilliseconds).ToList(); + + /// + public List ReceptionTimes => _receiveDurations.Select(x => x.TotalMilliseconds).ToList(); +} diff --git a/test/AWS.Messaging.Benchmarks/BenchmarkMessage.cs b/test/AWS.Messaging.Benchmarks/BenchmarkMessage.cs new file mode 100644 index 0000000..722716a --- /dev/null +++ b/test/AWS.Messaging.Benchmarks/BenchmarkMessage.cs @@ -0,0 +1,9 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +namespace AWS.Messaging.Benchmarks; + +public class BenchmarkMessage +{ + public DateTime SentTime { get; set; } +} diff --git a/test/AWS.Messaging.Benchmarks/BenchmarkMessageHandler.cs b/test/AWS.Messaging.Benchmarks/BenchmarkMessageHandler.cs new file mode 100644 index 0000000..1b2b644 --- /dev/null +++ b/test/AWS.Messaging.Benchmarks/BenchmarkMessageHandler.cs @@ -0,0 +1,21 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +namespace AWS.Messaging.Benchmarks; + +public class BenchmarkMessageHandler : IMessageHandler +{ + private readonly IBenchmarkCollector _benchmarkCollector; + + public BenchmarkMessageHandler(IBenchmarkCollector benchmarkCollector) + { + _benchmarkCollector = benchmarkCollector; + } + + public Task HandleAsync(MessageEnvelope messageEnvelope, CancellationToken token = default) + { + _benchmarkCollector.RecordMessageReception(messageEnvelope.Message); + + return Task.FromResult(MessageProcessStatus.Success()); + } +} diff --git a/test/AWS.Messaging.Benchmarks/Program.cs b/test/AWS.Messaging.Benchmarks/Program.cs new file mode 100644 index 0000000..b3fb3ab --- /dev/null +++ b/test/AWS.Messaging.Benchmarks/Program.cs @@ -0,0 +1,183 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +using System.CommandLine; +using System.Diagnostics; +using Amazon.SQS; +using AWS.Messaging.Publishers.SQS; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Perfolizer.Mathematics.Histograms; +using Perfolizer.Mathematics.QuantileEstimators; + +namespace AWS.Messaging.Benchmarks; + +internal class Program +{ + static async Task Main(string[] args) + { + var queueOption = new Option( + name: "--queueUrl", + description: "SQS queue URL. NOTE: it will be purged prior to executing the test."); + + var numMessagesOption = new Option( + name: "--numMessages", + description: "Number of messages to send.", + getDefaultValue: () => 1000); + + var publishConcurrencyOption = new Option( + name: "--publishConcurrency", + description: "Maximum number of concurrent publishing tasks to run.", + getDefaultValue: () => 10); + + var handlerConcurrencyOption = new Option( + name: "--handerConcurrency", + description: "Maximum number of messages to handle concurrently.", + getDefaultValue: () => 10); + + var publishBeforePollingOption = new Option( + name: "--publishBeforePolling", + description: "Whether all messages should be published prior to starting the poller.", + getDefaultValue: () => false); + + var rootCommand = new RootCommand("Sample app for System.CommandLine"); + rootCommand.AddOption(queueOption); + rootCommand.AddOption(numMessagesOption); + rootCommand.AddOption(publishConcurrencyOption); + rootCommand.AddOption(handlerConcurrencyOption); + rootCommand.AddOption(publishBeforePollingOption); + + rootCommand.SetHandler(async (queueUrl, numberOfMessages, publishConcurrency, handlerConcurrency, publishBeforePolling) => + { + await RunBenchmark(queueUrl, numberOfMessages, publishConcurrency, handlerConcurrency, publishBeforePolling); + }, queueOption, numMessagesOption, publishConcurrencyOption, handlerConcurrencyOption, publishBeforePollingOption); + + return await rootCommand.InvokeAsync(args); + } + + /// + /// Executes a single benchmark run and prints the results + /// + /// SQS queue URL + /// Number of messages to send + /// Maximum number of concurrent publishing tasks to run + /// Maximum number of messages to handle concurrently + public static async Task RunBenchmark(string queueUrl, int numberOfMessages, int publishConcurrency, int handlerConcurrency, bool publishBeforePolling) + { + ArgumentNullException.ThrowIfNull(queueUrl); + + // Purge the queue before starting + var client = new AmazonSQSClient(); + await client.PurgeQueueAsync(queueUrl); + + var benchmarkCollector = new BenchmarkCollector(numberOfMessages); + + var host = Host.CreateDefaultBuilder() + .ConfigureLogging(logging => + { + logging.AddConsole(); + logging.SetMinimumLevel(LogLevel.Error); + }) + .ConfigureServices((context, services) => + { + services.AddSingleton(benchmarkCollector); + services.AddAWSMessageBus(builder => + { + builder.AddSQSPublisher(queueUrl); + builder.AddMessageHandler(); + builder.AddSQSPoller(queueUrl, options => + { + options.MaxNumberOfConcurrentMessages = handlerConcurrency; + }); + }); + }).Build(); + + Console.WriteLine("Running single poller test with: "); + Console.WriteLine($" Queue URL: {queueUrl}"); + Console.WriteLine($" Number of messages: {numberOfMessages}"); + Console.WriteLine($" Publish concurrency: {publishConcurrency}"); + Console.WriteLine($" Handler concurrency: {handlerConcurrency}"); + Console.WriteLine(publishBeforePolling ? + $" All messages will be published before starting the poller." : + $" The poller will be started before publishing, so messages are handled as they are published."); + + var publisher = host.Services.GetRequiredService(); + var cts = new CancellationTokenSource(); + TimeSpan publishElapsedTime; + TimeSpan handlingElapsedTime; + + if (publishBeforePolling) // await the publishing of all messages, then start the poller + { + publishElapsedTime = await PublishMessages(publisher, benchmarkCollector, numberOfMessages, publishConcurrency); + _ = host.StartAsync(cts.Token); + } + else // Start the poller first, then publish + { + _ = host.StartAsync(cts.Token); + publishElapsedTime = await PublishMessages(publisher, benchmarkCollector, numberOfMessages, publishConcurrency); + } + + // This will complete once the the exected number of messages have been handled + handlingElapsedTime = await benchmarkCollector.HandlingCompleted; + cts.Cancel(); // then stop the poller + + // Print the results + DisplayData(benchmarkCollector.PublishTimes, publishElapsedTime, numberOfMessages, "Publishing"); + DisplayData(benchmarkCollector.ReceptionTimes, handlingElapsedTime, numberOfMessages, "Receiving"); + } + + /// + /// Publishes the specified number of messages + /// + /// SQS publisher + /// + /// Number of messages to publish + /// Number of concurrent publishing tasks + /// Total elapsed time to send all messages + private static async Task PublishMessages(ISQSPublisher publisher, IBenchmarkCollector benchmarkCollector, int messageCount, int maxDegreeOfParallelism) + { + var options = new ParallelOptions() + { + MaxDegreeOfParallelism = maxDegreeOfParallelism + }; + + var stopwatch = new Stopwatch(); + stopwatch.Start(); + + await Parallel.ForEachAsync(Enumerable.Range(0, messageCount), options, async (messageNumber, token) => + { + var start = stopwatch.Elapsed; + await publisher.PublishAsync(new BenchmarkMessage { SentTime = DateTime.UtcNow }, null, token); + var publishDuration = stopwatch.Elapsed - start; + + benchmarkCollector.RecordMessagePublish(publishDuration); + }); + + stopwatch.Stop(); + return stopwatch.Elapsed; + } + + /// + /// Prints the data for a "phase" of the benchmark + /// + /// Times for each message, in milliseconds + /// Total elapsed time for this phase + /// Total number of messages + /// Header for the section (such as "Publishing") + private static void DisplayData(List messageTimes, TimeSpan totalElapsedTime, int numberOfMessages, string header) + { + var quartiles = Quartiles.Create(messageTimes); + + Console.WriteLine($"{header}: "); + Console.WriteLine($" Total time: {totalElapsedTime.ToString("mm':'ss':'fff")}"); + Console.WriteLine($" Rate: {Math.Round(numberOfMessages / totalElapsedTime.TotalSeconds, 2)} msgs/second"); + Console.WriteLine($" Min: {Math.Round(quartiles.Min, 2)} ms"); + Console.WriteLine($" P25: {Math.Round(quartiles.Q1, 2)} ms"); + Console.WriteLine($" P50: {Math.Round(quartiles.Q2, 2)} ms"); + Console.WriteLine($" P75: {Math.Round(quartiles.Q3, 2)} ms"); + Console.WriteLine($" P99: {Math.Round(SimpleQuantileEstimator.Instance.Quantile(new Perfolizer.Common.Sample(messageTimes), 0.99), 2)} ms"); + Console.WriteLine($" Max: {Math.Round(quartiles.Max, 2)} ms"); + Console.WriteLine(HistogramBuilder.Simple.Build(messageTimes).ToString()); + } +} diff --git a/test/AWS.Messaging.Benchmarks/Properties/launchSettings.json b/test/AWS.Messaging.Benchmarks/Properties/launchSettings.json new file mode 100644 index 0000000..7c041f9 --- /dev/null +++ b/test/AWS.Messaging.Benchmarks/Properties/launchSettings.json @@ -0,0 +1,8 @@ +{ + "profiles": { + "Default Benchmark": { + "commandName": "Project", + "commandLineArgs": "--queueUrl https://sqs.us-east-1.amazonaws.com/012345678910/benchmark-queue" + } + } +}