diff --git a/src/Testing/BackPressureTests/BackPressureTests.csproj b/src/Testing/BackPressureTests/BackPressureTests.csproj new file mode 100644 index 000000000..c68fd66bd --- /dev/null +++ b/src/Testing/BackPressureTests/BackPressureTests.csproj @@ -0,0 +1,28 @@ + + + + false + + + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + + + + + + Servers.cs + + + diff --git a/src/Testing/BackPressureTests/Harness.cs b/src/Testing/BackPressureTests/Harness.cs new file mode 100644 index 000000000..e176cacd4 --- /dev/null +++ b/src/Testing/BackPressureTests/Harness.cs @@ -0,0 +1,160 @@ +using IntegrationTests; +using JasperFx.Core; +using JasperFx.Core.Reflection; +using Marten; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Wolverine; +using Wolverine.Marten; +using Wolverine.RabbitMQ; +using Wolverine.Runtime; +using Wolverine.Tracking; +using Xunit; +using Xunit.Abstractions; + + +namespace BackPressureTests; + +public class MassSender(IHost sender) +{ + private readonly CancellationTokenSource _cancellation = new(); + private Task _task; + + public void Cancel() + { + _cancellation.Cancel(); + } + + public void StartPublishing(int maximum = 5000, TimeSpan? time = null) + { + if (time != null) + { + _cancellation.CancelAfter(time.Value); + } + + var runtime = sender.GetRuntime(); + + _task = Task.Run(async () => + { + for (int i = 0; i < maximum; i++) + { + if (_cancellation.IsCancellationRequested) return; + var bus = new MessageBus(runtime); + await bus.PublishAsync(new Message1(Guid.NewGuid())); + await bus.PublishAsync(new Message2(Guid.NewGuid())); + await bus.PublishAsync(new Message3(Guid.NewGuid())); + await bus.PublishAsync(new Message4(Guid.NewGuid())); + } + }); + } +} + +public class Harness : IAsyncLifetime, IWolverineActivator +{ + private IHost _sender; + private XUnitObserver theObserver; + private IHost _receiver; + public static bool GoSlow { get; set; } = true; + + public Harness(ITestOutputHelper output) + { + theObserver = new XUnitObserver(output); + } + + void IWolverineActivator.Apply(IWolverineRuntime runtime) + { + runtime.Observer = theObserver; + } + + public async Task InitializeAsync() + { + _sender = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseRabbitMq().AutoProvision().AutoPurgeOnStartup(); + opts.Discovery.DisableConventionalDiscovery(); + opts.PublishAllMessages().ToRabbitQueue("bp"); + }).StartAsync(); + + _receiver = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + // for callbacks + opts.Services.AddSingleton(this); + + opts.Services.AddMarten(m => + { + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = "bp"; + m.DisableNpgsqlLogging = true; + }).IntegrateWithWolverine(); + + opts.ListenToRabbitQueue("bp").UseDurableInbox(); + opts.Policies.AutoApplyTransactions(); + }).StartAsync(); + + _receiver.GetRuntime().Observer = theObserver; + } + + public async Task DisposeAsync() + { + await _sender.StopAsync(); + await _receiver.StopAsync(); + } + + [Fact] + public async Task lets_see_if_we_can_trip_off_back_pressure_and_see_it_lifted() + { + Harness.GoSlow = true; + var sender = new MassSender(_sender); + sender.StartPublishing(20000); + + await theObserver.Triggered.Task.TimeoutAfterAsync(90000); + + Harness.GoSlow = false; + + sender.Cancel(); + + await theObserver.Lifted.Task.TimeoutAfterAsync(90000); + } +} + +public record Message1(Guid Id); +public record Message2(Guid Id); +public record Message3(Guid Id); +public record Message4(Guid Id); + +public static class MessageHandler +{ + public static async Task HandleAsync(Message1 m) + { + if (Harness.GoSlow) + { + await Task.Delay(Random.Shared.Next(100, 500)); + } + } + + public static async Task HandleAsync(Message2 m) + { + if (Harness.GoSlow) + { + await Task.Delay(Random.Shared.Next(100, 500)); + } + } + + public static async Task HandleAsync(Message3 m) + { + if (Harness.GoSlow) + { + await Task.Delay(Random.Shared.Next(100, 500)); + } + } + + public static async Task HandleAsync(Message4 m) + { + if (Harness.GoSlow) + { + await Task.Delay(Random.Shared.Next(100, 500)); + } + } +} \ No newline at end of file diff --git a/src/Testing/BackPressureTests/XUnitObserver.cs b/src/Testing/BackPressureTests/XUnitObserver.cs new file mode 100644 index 000000000..87175c08f --- /dev/null +++ b/src/Testing/BackPressureTests/XUnitObserver.cs @@ -0,0 +1,84 @@ +using Wolverine.Configuration; +using Wolverine.Runtime.Agents; +using Wolverine.Runtime.Routing; +using Wolverine.Transports; +using Xunit.Abstractions; + +namespace BackPressureTests; + +public class XUnitObserver(ITestOutputHelper Output) : IWolverineObserver +{ + public TaskCompletionSource Triggered { get; set; } = new(); + public TaskCompletionSource Lifted { get; set; } = new(); + + public void Reset() + { + Triggered = new(); + Lifted = new(); + } + + public Task AssumedLeadership() + { + return Task.CompletedTask; + } + + public Task NodeStarted() + { + return Task.CompletedTask; + } + + public Task NodeStopped() + { + return Task.CompletedTask; + } + + public Task AgentStarted(Uri agentUri) + { + return Task.CompletedTask; + } + + public Task AgentStopped(Uri agentUri) + { + return Task.CompletedTask; + } + + public Task AssignmentsChanged(AssignmentGrid grid, AgentCommands commands) + { + return Task.CompletedTask; + } + + public Task StaleNodes(IReadOnlyList staleNodes) + { + return Task.CompletedTask; + } + + public Task RuntimeIsFullyStarted() + { + Output.WriteLine("The WolverineRuntime is fully started"); + return Task.CompletedTask; + } + + public void EndpointAdded(Endpoint endpoint) + { + + } + + public void MessageRouted(Type messageType, IMessageRouter router) + { + } + + public Task BackPressureTriggered(Endpoint endpoint, IListeningAgent agent) + { + Output.WriteLine("Back Pressure was Triggerd!"); + Triggered?.TrySetResult(); + return Task.CompletedTask; + + } + + public Task BackPressureLifted(Endpoint endpoint) + { + Output.WriteLine("Back Pressure was Lifted!"); + Lifted?.TrySetResult(); + return Task.CompletedTask; + } +} \ No newline at end of file diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/BatchedAzureServiceBusListener.cs b/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/BatchedAzureServiceBusListener.cs index f89fb3e8d..c142c449a 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/BatchedAzureServiceBusListener.cs +++ b/src/Transports/Azure/Wolverine.AzureServiceBus/Internal/BatchedAzureServiceBusListener.cs @@ -86,7 +86,7 @@ public ValueTask DisposeAsync() _complete.SafeDispose(); _defer.SafeDispose(); _deadLetter.SafeDispose(); - return _receiver.DisposeAsync(); + return new ValueTask(); } public Uri Address => _endpoint.Uri; diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqListener.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqListener.cs index b7af6a1d7..deaa21b27 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqListener.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqListener.cs @@ -102,7 +102,6 @@ public async ValueTask StopAsync() public override async ValueTask DisposeAsync() { - _receiver.Dispose(); await base.DisposeAsync(); if (_sender.IsValueCreated && _sender.Value is IAsyncDisposable ad) diff --git a/src/Wolverine/Transports/SharedMemory/SharedMemorySubscription.cs b/src/Wolverine/Transports/SharedMemory/SharedMemorySubscription.cs index eba3d8b39..e578b04e7 100644 --- a/src/Wolverine/Transports/SharedMemory/SharedMemorySubscription.cs +++ b/src/Wolverine/Transports/SharedMemory/SharedMemorySubscription.cs @@ -61,9 +61,9 @@ public ValueTask DeferAsync(Envelope envelope) return _receiver.PostAsync(envelope); } - public async ValueTask DisposeAsync() + public ValueTask DisposeAsync() { - await _receiver.DisposeAsync(); + return new ValueTask(); } public Uri Address => Uri; diff --git a/wolverine.sln b/wolverine.sln index 7173cce5a..8cd49aaac 100644 --- a/wolverine.sln +++ b/wolverine.sln @@ -323,9 +323,9 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OtelWebApiWolverineMarten", EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Wolverine.MySql", "src\Persistence\MySql\Wolverine.MySql\Wolverine.MySql.csproj", "{738DB46A-B1B5-4843-A536-A5918918DEB5}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MySqlTests", "src\Persistence\MySql\MySqlTests\MySqlTests.csproj", "{9B9FD22A-4B63-4D93-944E-4A0A0E1D9CF7}" +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MySqlTests", "src\Persistence\MySql\MySqlTests\MySqlTests.csproj", "{382BD656-89CD-4899-A30F-1589578B639F}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MySqlTests", "src\Persistence\MySql\MySqlTests\MySqlTests.csproj", "{162630BD-6192-4888-BD52-D257C75F3E52}" +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BackPressureTests", "src\Testing\BackPressureTests\BackPressureTests.csproj", "{02F5459A-A96B-42AB-9E4E-CF6B067238FB}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -1813,30 +1813,30 @@ Global {738DB46A-B1B5-4843-A536-A5918918DEB5}.Release|x64.Build.0 = Release|Any CPU {738DB46A-B1B5-4843-A536-A5918918DEB5}.Release|x86.ActiveCfg = Release|Any CPU {738DB46A-B1B5-4843-A536-A5918918DEB5}.Release|x86.Build.0 = Release|Any CPU - {9B9FD22A-4B63-4D93-944E-4A0A0E1D9CF7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {9B9FD22A-4B63-4D93-944E-4A0A0E1D9CF7}.Debug|Any CPU.Build.0 = Debug|Any CPU - {9B9FD22A-4B63-4D93-944E-4A0A0E1D9CF7}.Debug|x64.ActiveCfg = Debug|Any CPU - {9B9FD22A-4B63-4D93-944E-4A0A0E1D9CF7}.Debug|x64.Build.0 = Debug|Any CPU - {9B9FD22A-4B63-4D93-944E-4A0A0E1D9CF7}.Debug|x86.ActiveCfg = Debug|Any CPU - {9B9FD22A-4B63-4D93-944E-4A0A0E1D9CF7}.Debug|x86.Build.0 = Debug|Any CPU - {9B9FD22A-4B63-4D93-944E-4A0A0E1D9CF7}.Release|Any CPU.ActiveCfg = Release|Any CPU - {9B9FD22A-4B63-4D93-944E-4A0A0E1D9CF7}.Release|Any CPU.Build.0 = Release|Any CPU - {9B9FD22A-4B63-4D93-944E-4A0A0E1D9CF7}.Release|x64.ActiveCfg = Release|Any CPU - {9B9FD22A-4B63-4D93-944E-4A0A0E1D9CF7}.Release|x64.Build.0 = Release|Any CPU - {9B9FD22A-4B63-4D93-944E-4A0A0E1D9CF7}.Release|x86.ActiveCfg = Release|Any CPU - {9B9FD22A-4B63-4D93-944E-4A0A0E1D9CF7}.Release|x86.Build.0 = Release|Any CPU - {162630BD-6192-4888-BD52-D257C75F3E52}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {162630BD-6192-4888-BD52-D257C75F3E52}.Debug|Any CPU.Build.0 = Debug|Any CPU - {162630BD-6192-4888-BD52-D257C75F3E52}.Debug|x64.ActiveCfg = Debug|Any CPU - {162630BD-6192-4888-BD52-D257C75F3E52}.Debug|x64.Build.0 = Debug|Any CPU - {162630BD-6192-4888-BD52-D257C75F3E52}.Debug|x86.ActiveCfg = Debug|Any CPU - {162630BD-6192-4888-BD52-D257C75F3E52}.Debug|x86.Build.0 = Debug|Any CPU - {162630BD-6192-4888-BD52-D257C75F3E52}.Release|Any CPU.ActiveCfg = Release|Any CPU - {162630BD-6192-4888-BD52-D257C75F3E52}.Release|Any CPU.Build.0 = Release|Any CPU - {162630BD-6192-4888-BD52-D257C75F3E52}.Release|x64.ActiveCfg = Release|Any CPU - {162630BD-6192-4888-BD52-D257C75F3E52}.Release|x64.Build.0 = Release|Any CPU - {162630BD-6192-4888-BD52-D257C75F3E52}.Release|x86.ActiveCfg = Release|Any CPU - {162630BD-6192-4888-BD52-D257C75F3E52}.Release|x86.Build.0 = Release|Any CPU + {382BD656-89CD-4899-A30F-1589578B639F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {382BD656-89CD-4899-A30F-1589578B639F}.Debug|Any CPU.Build.0 = Debug|Any CPU + {382BD656-89CD-4899-A30F-1589578B639F}.Debug|x64.ActiveCfg = Debug|Any CPU + {382BD656-89CD-4899-A30F-1589578B639F}.Debug|x64.Build.0 = Debug|Any CPU + {382BD656-89CD-4899-A30F-1589578B639F}.Debug|x86.ActiveCfg = Debug|Any CPU + {382BD656-89CD-4899-A30F-1589578B639F}.Debug|x86.Build.0 = Debug|Any CPU + {382BD656-89CD-4899-A30F-1589578B639F}.Release|Any CPU.ActiveCfg = Release|Any CPU + {382BD656-89CD-4899-A30F-1589578B639F}.Release|Any CPU.Build.0 = Release|Any CPU + {382BD656-89CD-4899-A30F-1589578B639F}.Release|x64.ActiveCfg = Release|Any CPU + {382BD656-89CD-4899-A30F-1589578B639F}.Release|x64.Build.0 = Release|Any CPU + {382BD656-89CD-4899-A30F-1589578B639F}.Release|x86.ActiveCfg = Release|Any CPU + {382BD656-89CD-4899-A30F-1589578B639F}.Release|x86.Build.0 = Release|Any CPU + {02F5459A-A96B-42AB-9E4E-CF6B067238FB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {02F5459A-A96B-42AB-9E4E-CF6B067238FB}.Debug|Any CPU.Build.0 = Debug|Any CPU + {02F5459A-A96B-42AB-9E4E-CF6B067238FB}.Debug|x64.ActiveCfg = Debug|Any CPU + {02F5459A-A96B-42AB-9E4E-CF6B067238FB}.Debug|x64.Build.0 = Debug|Any CPU + {02F5459A-A96B-42AB-9E4E-CF6B067238FB}.Debug|x86.ActiveCfg = Debug|Any CPU + {02F5459A-A96B-42AB-9E4E-CF6B067238FB}.Debug|x86.Build.0 = Debug|Any CPU + {02F5459A-A96B-42AB-9E4E-CF6B067238FB}.Release|Any CPU.ActiveCfg = Release|Any CPU + {02F5459A-A96B-42AB-9E4E-CF6B067238FB}.Release|Any CPU.Build.0 = Release|Any CPU + {02F5459A-A96B-42AB-9E4E-CF6B067238FB}.Release|x64.ActiveCfg = Release|Any CPU + {02F5459A-A96B-42AB-9E4E-CF6B067238FB}.Release|x64.Build.0 = Release|Any CPU + {02F5459A-A96B-42AB-9E4E-CF6B067238FB}.Release|x86.ActiveCfg = Release|Any CPU + {02F5459A-A96B-42AB-9E4E-CF6B067238FB}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -1986,8 +1986,8 @@ Global {0FD02607-BF12-4201-90F9-3FA88BFCDFBC} = {F429686D-BB41-4E1C-A84E-518F8A289AEF} {AC643465-CD1E-4E9E-9860-DDAAF956A3DC} = {96119B5E-B5F0-400A-9580-B342EBE26212} {738DB46A-B1B5-4843-A536-A5918918DEB5} = {7A9E0EAE-9ABF-40F6-9DB9-8FB1243F4210} - {9B9FD22A-4B63-4D93-944E-4A0A0E1D9CF7} = {96119B5E-B5F0-400A-9580-B342EBE26212} - {162630BD-6192-4888-BD52-D257C75F3E52} = {7A9E0EAE-9ABF-40F6-9DB9-8FB1243F4210} + {382BD656-89CD-4899-A30F-1589578B639F} = {7A9E0EAE-9ABF-40F6-9DB9-8FB1243F4210} + {02F5459A-A96B-42AB-9E4E-CF6B067238FB} = {96119B5E-B5F0-400A-9580-B342EBE26212} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {30422362-0D90-4DBE-8C97-DD2B5B962768}