Skip to content

Commit

Permalink
test: Add initial benchmark application
Browse files Browse the repository at this point in the history
  • Loading branch information
ashovlin committed Oct 17, 2023
1 parent 0d7033d commit cd45667
Show file tree
Hide file tree
Showing 7 changed files with 335 additions and 0 deletions.
7 changes: 7 additions & 0 deletions AWS.Messaging.sln
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AWS.Messaging.Lambda", "src
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "LambdaMessaging", "sampleapps\LambdaMessaging\LambdaMessaging.csproj", "{F74A4CF0-D814-426E-8149-46758E86AFE3}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AWS.Messaging.Benchmarks", "test\AWS.Messaging.Benchmarks\AWS.Messaging.Benchmarks.csproj", "{143DC3E0-A1C6-4670-86F4-E7CD4C8F52CB}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -69,6 +71,10 @@ Global
{F74A4CF0-D814-426E-8149-46758E86AFE3}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F74A4CF0-D814-426E-8149-46758E86AFE3}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F74A4CF0-D814-426E-8149-46758E86AFE3}.Release|Any CPU.Build.0 = Release|Any CPU
{143DC3E0-A1C6-4670-86F4-E7CD4C8F52CB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{143DC3E0-A1C6-4670-86F4-E7CD4C8F52CB}.Debug|Any CPU.Build.0 = Debug|Any CPU
{143DC3E0-A1C6-4670-86F4-E7CD4C8F52CB}.Release|Any CPU.ActiveCfg = Release|Any CPU
{143DC3E0-A1C6-4670-86F4-E7CD4C8F52CB}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -82,6 +88,7 @@ Global
{A174942B-AF9C-4935-AD7B-AF651BACE63C} = {80DB2C77-6ADD-4A60-B27D-763BDF9659D3}
{24FA3671-8C2B-4B64-865C-68FB6237E34D} = {2D0A561B-0B97-4259-8603-3AF5437BB652}
{F74A4CF0-D814-426E-8149-46758E86AFE3} = {1AA8985B-897C-4BD5-9735-FD8B33FEBFFB}
{143DC3E0-A1C6-4670-86F4-E7CD4C8F52CB} = {80DB2C77-6ADD-4A60-B27D-763BDF9659D3}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {7B2B759D-6455-4089-8173-3F1619567B36}
Expand Down
20 changes: 20 additions & 0 deletions test/AWS.Messaging.Benchmarks/AWS.Messaging.Benchmarks.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.1" />
<PackageReference Include="Perfolizer" Version="0.3.5" />
<PackageReference Include="System.CommandLine" Version="2.0.0-beta4.22272.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\AWS.Messaging\AWS.Messaging.csproj" />
</ItemGroup>

</Project>
87 changes: 87 additions & 0 deletions test/AWS.Messaging.Benchmarks/BenchmarkCollector.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

using System.Collections.Concurrent;
using System.Diagnostics;

namespace AWS.Messaging.Benchmarks;

/// <summary>
/// Aggregates the data for each message that is published and received during a benchmark run
/// </summary>
/// <remarks>
/// Inspired by a similar technique in https://github.com/MassTransit/MassTransit-Benchmark/ and
/// https://github.com/justeattakeaway/JustSaying/tree/main/tests/JustSaying.Benchmark
/// </remarks>
public interface IBenchmarkCollector
{
/// <summary>
/// Records the publishing of a single message
/// </summary>
/// <param name="publishDuration">How long the message took to publish</param>
void RecordMessagePublish(TimeSpan publishDuration);

/// <summary>
/// Records the handling of a single message
/// </summary>
/// <param name="message">Received message</param>
void RecordMessageReception(BenchmarkMessage message);

/// <summary>
/// Task that completes when the expected number of messages have been handled
/// </summary>
Task<TimeSpan> HandlingCompleted { get; }

/// <summary>
/// Publish times for all messages, in milliseconds
/// </summary>
List<double> PublishTimes { get; }

/// <summary>
/// Handling times for all messages, in milliseconds
/// </summary>
List<double> ReceptionTimes { get; }
}

public class BenchmarkCollector : IBenchmarkCollector
{
private readonly ConcurrentBag<TimeSpan> _publishDurations = new();
private readonly ConcurrentBag<TimeSpan> _receiveDurations = new();
private readonly int _expectedNumberOfMessages;
readonly TaskCompletionSource<TimeSpan> _receivingCompleted;
readonly Stopwatch _stopwatch;

public BenchmarkCollector(int expectedNumberOfMessages)
{
_expectedNumberOfMessages = expectedNumberOfMessages;
_receivingCompleted = new TaskCompletionSource<TimeSpan>();

_stopwatch = Stopwatch.StartNew();
}

/// <inheritdoc/>
public void RecordMessagePublish(TimeSpan publishDuration)
{
_publishDurations.Add(publishDuration);
}

/// <inheritdoc/>
public void RecordMessageReception(BenchmarkMessage message)
{
_receiveDurations.Add(DateTime.UtcNow - message.SentTime);

if (_receiveDurations.Count == _expectedNumberOfMessages)
{
_receivingCompleted.TrySetResult(_stopwatch.Elapsed);
}
}

/// <inheritdoc/>
public Task<TimeSpan> HandlingCompleted => _receivingCompleted.Task;

/// <inheritdoc/>
public List<double> PublishTimes => _publishDurations.Select(x => x.TotalMilliseconds).ToList();

/// <inheritdoc/>
public List<double> ReceptionTimes => _receiveDurations.Select(x => x.TotalMilliseconds).ToList();
}
9 changes: 9 additions & 0 deletions test/AWS.Messaging.Benchmarks/BenchmarkMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

namespace AWS.Messaging.Benchmarks;

public class BenchmarkMessage
{
public DateTime SentTime { get; set; }
}
21 changes: 21 additions & 0 deletions test/AWS.Messaging.Benchmarks/BenchmarkMessageHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

namespace AWS.Messaging.Benchmarks;

public class BenchmarkMessageHandler : IMessageHandler<BenchmarkMessage>
{
private readonly IBenchmarkCollector _benchmarkCollector;

public BenchmarkMessageHandler(IBenchmarkCollector benchmarkCollector)
{
_benchmarkCollector = benchmarkCollector;
}

public Task<MessageProcessStatus> HandleAsync(MessageEnvelope<BenchmarkMessage> messageEnvelope, CancellationToken token = default)
{
_benchmarkCollector.RecordMessageReception(messageEnvelope.Message);

return Task.FromResult(MessageProcessStatus.Success());
}
}
183 changes: 183 additions & 0 deletions test/AWS.Messaging.Benchmarks/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

using System.CommandLine;
using System.Diagnostics;
using Amazon.SQS;
using AWS.Messaging.Publishers.SQS;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Perfolizer.Mathematics.Histograms;
using Perfolizer.Mathematics.QuantileEstimators;

namespace AWS.Messaging.Benchmarks;

internal class Program
{
static async Task<int> Main(string[] args)
{
var queueOption = new Option<string>(
name: "--queueUrl",
description: "SQS queue URL. NOTE: it will be purged prior to executing the test.");

var numMessagesOption = new Option<int>(
name: "--numMessages",
description: "Number of messages to send.",
getDefaultValue: () => 1000);

var publishConcurrencyOption = new Option<int>(
name: "--publishConcurrency",
description: "Maximum number of concurrent publishing tasks to run.",
getDefaultValue: () => 10);

var handlerConcurrencyOption = new Option<int>(
name: "--handerConcurrency",
description: "Maximum number of messages to handle concurrently.",
getDefaultValue: () => 10);

var publishBeforePollingOption = new Option<bool>(
name: "--publishBeforePolling",
description: "Whether all messages should be published prior to starting the poller.",
getDefaultValue: () => false);

var rootCommand = new RootCommand("Sample app for System.CommandLine");
rootCommand.AddOption(queueOption);
rootCommand.AddOption(numMessagesOption);
rootCommand.AddOption(publishConcurrencyOption);
rootCommand.AddOption(handlerConcurrencyOption);
rootCommand.AddOption(publishBeforePollingOption);

rootCommand.SetHandler(async (queueUrl, numberOfMessages, publishConcurrency, handlerConcurrency, publishBeforePolling) =>
{
await RunBenchmark(queueUrl, numberOfMessages, publishConcurrency, handlerConcurrency, publishBeforePolling);
}, queueOption, numMessagesOption, publishConcurrencyOption, handlerConcurrencyOption, publishBeforePollingOption);

return await rootCommand.InvokeAsync(args);
}

/// <summary>
/// Executes a single benchmark run and prints the results
/// </summary>
/// <param name="queueUrl">SQS queue URL</param>
/// <param name="numberOfMessages">Number of messages to send</param>
/// <param name="publishConcurrency">Maximum number of concurrent publishing tasks to run</param>
/// <param name="handlerConcurrency">Maximum number of messages to handle concurrently</param>
public static async Task RunBenchmark(string queueUrl, int numberOfMessages, int publishConcurrency, int handlerConcurrency, bool publishBeforePolling)
{
ArgumentNullException.ThrowIfNull(queueUrl);

// Purge the queue before starting
var client = new AmazonSQSClient();
await client.PurgeQueueAsync(queueUrl);

var benchmarkCollector = new BenchmarkCollector(numberOfMessages);

var host = Host.CreateDefaultBuilder()
.ConfigureLogging(logging =>
{
logging.AddConsole();
logging.SetMinimumLevel(LogLevel.Error);
})
.ConfigureServices((context, services) =>
{
services.AddSingleton<IBenchmarkCollector>(benchmarkCollector);
services.AddAWSMessageBus(builder =>
{
builder.AddSQSPublisher<BenchmarkMessage>(queueUrl);
builder.AddMessageHandler<BenchmarkMessageHandler, BenchmarkMessage>();
builder.AddSQSPoller(queueUrl, options =>
{
options.MaxNumberOfConcurrentMessages = handlerConcurrency;
});
});
}).Build();

Console.WriteLine("Running single poller test with: ");
Console.WriteLine($" Queue URL: {queueUrl}");
Console.WriteLine($" Number of messages: {numberOfMessages}");
Console.WriteLine($" Publish concurrency: {publishConcurrency}");
Console.WriteLine($" Handler concurrency: {handlerConcurrency}");
Console.WriteLine(publishBeforePolling ?
$" All messages will be published before starting the poller." :
$" The poller will be started before publishing, so messages are handled as they are published.");

var publisher = host.Services.GetRequiredService<ISQSPublisher>();
var cts = new CancellationTokenSource();
TimeSpan publishElapsedTime;
TimeSpan handlingElapsedTime;

if (publishBeforePolling) // await the publishing of all messages, then start the poller
{
publishElapsedTime = await PublishMessages(publisher, benchmarkCollector, numberOfMessages, publishConcurrency);
_ = host.StartAsync(cts.Token);
}
else // Start the poller first, then publish
{
_ = host.StartAsync(cts.Token);
publishElapsedTime = await PublishMessages(publisher, benchmarkCollector, numberOfMessages, publishConcurrency);
}

// This will complete once the the exected number of messages have been handled
handlingElapsedTime = await benchmarkCollector.HandlingCompleted;
cts.Cancel(); // then stop the poller

// Print the results
DisplayData(benchmarkCollector.PublishTimes, publishElapsedTime, numberOfMessages, "Publishing");
DisplayData(benchmarkCollector.ReceptionTimes, handlingElapsedTime, numberOfMessages, "Receiving");
}

/// <summary>
/// Publishes the specified number of messages
/// </summary>
/// <param name="publisher">SQS publisher</param>
/// <param name="benchmarkCollector"></param>
/// <param name="messageCount">Number of messages to publish</param>
/// <param name="numberOfThreads">Number of concurrent publishing tasks</param>
/// <returns>Total elapsed time to send all messages</returns>
private static async Task<TimeSpan> PublishMessages(ISQSPublisher publisher, IBenchmarkCollector benchmarkCollector, int messageCount, int maxDegreeOfParallelism)
{
var options = new ParallelOptions()
{
MaxDegreeOfParallelism = maxDegreeOfParallelism
};

var stopwatch = new Stopwatch();
stopwatch.Start();

await Parallel.ForEachAsync(Enumerable.Range(0, messageCount), options, async (messageNumber, token) =>
{
var start = stopwatch.Elapsed;
await publisher.PublishAsync(new BenchmarkMessage { SentTime = DateTime.UtcNow }, null, token);
var publishDuration = stopwatch.Elapsed - start;
benchmarkCollector.RecordMessagePublish(publishDuration);
});

stopwatch.Stop();
return stopwatch.Elapsed;
}

/// <summary>
/// Prints the data for a "phase" of the benchmark
/// </summary>
/// <param name="messageTimes">Times for each message, in milliseconds</param>
/// <param name="totalElapsedTime">Total elapsed time for this phase</param>
/// <param name="numberOfMessages">Total number of messages</param>
/// <param name="header">Header for the section (such as "Publishing")</param>
private static void DisplayData(List<double> messageTimes, TimeSpan totalElapsedTime, int numberOfMessages, string header)
{
var quartiles = Quartiles.Create(messageTimes);

Console.WriteLine($"{header}: ");
Console.WriteLine($" Total time: {totalElapsedTime.ToString("mm':'ss':'fff")}");
Console.WriteLine($" Rate: {Math.Round(numberOfMessages / totalElapsedTime.TotalSeconds, 2)} msgs/second");
Console.WriteLine($" Min: {Math.Round(quartiles.Min, 2)} ms");
Console.WriteLine($" P25: {Math.Round(quartiles.Q1, 2)} ms");
Console.WriteLine($" P50: {Math.Round(quartiles.Q2, 2)} ms");
Console.WriteLine($" P75: {Math.Round(quartiles.Q3, 2)} ms");
Console.WriteLine($" P99: {Math.Round(SimpleQuantileEstimator.Instance.Quantile(new Perfolizer.Common.Sample(messageTimes), 0.99), 2)} ms");
Console.WriteLine($" Max: {Math.Round(quartiles.Max, 2)} ms");
Console.WriteLine(HistogramBuilder.Simple.Build(messageTimes).ToString());
}
}
8 changes: 8 additions & 0 deletions test/AWS.Messaging.Benchmarks/Properties/launchSettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"profiles": {
"Default Benchmark": {
"commandName": "Project",
"commandLineArgs": "--queueUrl https://sqs.us-east-1.amazonaws.com/012345678910/benchmark-queue"
}
}
}

0 comments on commit cd45667

Please sign in to comment.