From a748041eeeabc3d523546d2ef97bdeac2644f0dc Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Mon, 3 Nov 2025 15:45:07 -0800 Subject: [PATCH] Do not handle publisher confirms when disposed Fixes #1865 This code will set an exception if any outstanding confirm TCSs remain upon disposal. --- RabbitMQDotNetClient.sln | 7 + projects/Applications/GH-1865/GH-1865.csproj | 15 ++ projects/Applications/GH-1865/Program.cs | 148 ++++++++++++++++++ .../Impl/Channel.PublisherConfirms.cs | 38 +++-- projects/RabbitMQ.Client/Impl/Channel.cs | 17 +- 5 files changed, 207 insertions(+), 18 deletions(-) create mode 100644 projects/Applications/GH-1865/GH-1865.csproj create mode 100644 projects/Applications/GH-1865/Program.cs diff --git a/RabbitMQDotNetClient.sln b/RabbitMQDotNetClient.sln index 6d1baca81..d7f8cb41f 100644 --- a/RabbitMQDotNetClient.sln +++ b/RabbitMQDotNetClient.sln @@ -48,6 +48,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PublisherConfirms", "projec EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GH-1749", "projects\Applications\GH-1749\GH-1749.csproj", "{B3F17265-91A8-4BE1-AE64-132CB8BB6CDF}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GH-1865", "projects\Applications\GH-1865\GH-1865.csproj", "{38CE721E-2801-AED1-DDF8-DC5F888C6C05}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -114,6 +116,10 @@ Global {B3F17265-91A8-4BE1-AE64-132CB8BB6CDF}.Debug|Any CPU.Build.0 = Debug|Any CPU {B3F17265-91A8-4BE1-AE64-132CB8BB6CDF}.Release|Any CPU.ActiveCfg = Release|Any CPU {B3F17265-91A8-4BE1-AE64-132CB8BB6CDF}.Release|Any CPU.Build.0 = Release|Any CPU + {38CE721E-2801-AED1-DDF8-DC5F888C6C05}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {38CE721E-2801-AED1-DDF8-DC5F888C6C05}.Debug|Any CPU.Build.0 = Debug|Any CPU + {38CE721E-2801-AED1-DDF8-DC5F888C6C05}.Release|Any CPU.ActiveCfg = Release|Any CPU + {38CE721E-2801-AED1-DDF8-DC5F888C6C05}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -130,6 +136,7 @@ Global {64ED07BF-4D77-47CD-AF4F-5B4525686FA1} = {D21B282C-49E6-4A30-887B-9626D94B8D69} {13149F73-2CDB-4ECF-BF2C-403860045751} = {D21B282C-49E6-4A30-887B-9626D94B8D69} {B3F17265-91A8-4BE1-AE64-132CB8BB6CDF} = {D21B282C-49E6-4A30-887B-9626D94B8D69} + {38CE721E-2801-AED1-DDF8-DC5F888C6C05} = {D21B282C-49E6-4A30-887B-9626D94B8D69} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {3C6A0C44-FA63-4101-BBF9-2598641167D1} diff --git a/projects/Applications/GH-1865/GH-1865.csproj b/projects/Applications/GH-1865/GH-1865.csproj new file mode 100644 index 000000000..135fd9829 --- /dev/null +++ b/projects/Applications/GH-1865/GH-1865.csproj @@ -0,0 +1,15 @@ + + + + Exe + net8.0 + GH_1865 + enable + enable + + + + + + + diff --git a/projects/Applications/GH-1865/Program.cs b/projects/Applications/GH-1865/Program.cs new file mode 100644 index 000000000..5e291ebc5 --- /dev/null +++ b/projects/Applications/GH-1865/Program.cs @@ -0,0 +1,148 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +//--------------------------------------------------------------------------- + +#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task + +using System.Diagnostics; +using System.Globalization; +using RabbitMQ.Client; + +class Program +{ + static int _channelsProcessed; + static readonly TaskCompletionSource s_tcs = new(); + static readonly ThreadLocal s_rng = new(() => new Random()); + + private static string Now => DateTime.UtcNow.ToString("s", CultureInfo.InvariantCulture); + + static async Task Main(string[] args) + { + const int Repeats = 3; + const int ChannelsToOpen = 20; + + var connectionFactory = new ConnectionFactory + { + HostName = "localhost", + Port = 5672, + UserName = "guest", + Password = "guest", + VirtualHost = "/", + RequestedConnectionTimeout = TimeSpan.FromMilliseconds(60000), + RequestedHeartbeat = TimeSpan.FromSeconds(600), + AutomaticRecoveryEnabled = false, + TopologyRecoveryEnabled = false, + ContinuationTimeout = TimeSpan.FromMilliseconds(1000) + }; + await using IConnection connection = await connectionFactory.CreateConnectionAsync(); + + var watch = Stopwatch.StartNew(); + _ = Task.Run(async () => + { + for (int i = 0; i < Repeats; i++) + { + try + { + var tasks = new Task[ChannelsToOpen]; + for (int j = 0; j < ChannelsToOpen; j++) + { + tasks[j] = Task.Run(async () => + { + try + { + IChannel channel = await connection.CreateChannelAsync( + new CreateChannelOptions(true, true)); + var cts = new CancellationTokenSource(); + int cancelAfterMs = s_rng.Value!.Next(1, 10000); // upper bound exclusive + cts.CancelAfter(cancelAfterMs); + var tcs = new TaskCompletionSource(); + channel.ChannelShutdownAsync += async (sender, args) => + { + await Task.Delay(100); + tcs.TrySetResult(1); + }; + try + { + await channel.CloseAsync(); + } + catch (TaskCanceledException ex) + { + Console.WriteLine( + $"{Now} CloseAsync canceled after {cancelAfterMs} ms " + + $"{ex.Message}"); + } + catch (OperationCanceledException ex) + { + Console.WriteLine( + $"{Now} CloseAsync canceled after {cancelAfterMs} ms" + + $"{ex.Message}"); + } + catch (Exception exClose) + { + Console.WriteLine($"{Now} CloseAsync error: {exClose.GetType().Name} {exClose.Message}"); + } + + // Wait a bit for the ChannelShutdown event to fire + var delayTask = Task.Delay(15000); + await Task.WhenAny(tcs.Task, delayTask); + await channel.DisposeAsync(); + cts.Dispose(); + } + catch (Exception exOuter) + { + Console.WriteLine($"{Now} outer error: {exOuter.GetType().Name} {exOuter.Message}"); + } + finally + { + Interlocked.Increment(ref _channelsProcessed); + } + }); + } + await Task.WhenAll(tasks); + } + catch (Exception ex) + { + Console.WriteLine($"{Now} connection error: {ex.GetType().Name} {ex.Message}"); + } + } + + s_tcs.SetResult(true); + }); + + Console.WriteLine($"{Repeats} times opening {ChannelsToOpen} channels on a connection. => Total channel open/close: {Repeats * ChannelsToOpen}"); + Console.WriteLine(); + Console.WriteLine("Opened"); + while (false == s_tcs.Task.IsCompleted) + { + await Task.Delay(500); + Console.WriteLine($"{_channelsProcessed,5}"); + } + watch.Stop(); + Console.WriteLine($"{_channelsProcessed,5}"); + Console.WriteLine(); + Console.WriteLine($"Took {watch.Elapsed.TotalMilliseconds} ms"); + } +} diff --git a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs index 3e8fc6347..e9e2711ec 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs @@ -268,22 +268,18 @@ private void HandleReturn(BasicReturnEventArgs basicReturnEvent) [MethodImpl(MethodImplOptions.AggressiveInlining)] private async Task MaybeHandlePublisherConfirmationTcsOnChannelShutdownAsync(ShutdownEventArgs reason) { + if (_disposed) + { + return; + } + if (_publisherConfirmationsEnabled) { await _confirmSemaphore.WaitAsync(reason.CancellationToken) .ConfigureAwait(false); try { - if (!_confirmsTaskCompletionSources.IsEmpty) - { - var exception = new AlreadyClosedException(reason); - foreach (TaskCompletionSource confirmsTaskCompletionSource in _confirmsTaskCompletionSources.Values) - { - confirmsTaskCompletionSource.TrySetException(exception); - } - - _confirmsTaskCompletionSources.Clear(); - } + MaybeSetExceptionOnConfirmsTcs(reason); } finally { @@ -404,5 +400,27 @@ await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken) } } } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void MaybeSetExceptionOnConfirmsTcs(ShutdownEventArgs? reason = null) + { + if (!_confirmsTaskCompletionSources.IsEmpty) + { + Exception ex; + if (reason is not null) + { + ex = new AlreadyClosedException(reason); + } + else + { + ex = new OperationInterruptedException(); + } + foreach (TaskCompletionSource confirmsTaskCompletionSource in _confirmsTaskCompletionSources.Values) + { + confirmsTaskCompletionSource.TrySetException(ex); + } + _confirmsTaskCompletionSources.Clear(); + } + } } } diff --git a/projects/RabbitMQ.Client/Impl/Channel.cs b/projects/RabbitMQ.Client/Impl/Channel.cs index 53bb47c80..a8ee311a0 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.cs @@ -591,6 +591,7 @@ protected virtual void Dispose(bool disposing) { _rpcSemaphore.Dispose(); _confirmSemaphore.Dispose(); + MaybeSetExceptionOnConfirmsTcs(); } catch { @@ -608,13 +609,13 @@ public async ValueTask DisposeAsync() return; } - await DisposeAsyncCore() + await DisposeAsyncCoreAsync() .ConfigureAwait(false); Dispose(false); } - protected virtual async ValueTask DisposeAsyncCore() + protected virtual async ValueTask DisposeAsyncCoreAsync() { if (_disposed) { @@ -669,7 +670,7 @@ public Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heart return ModelSendAsync(in method, cancellationToken).AsTask(); } - protected async Task HandleBasicAck(IncomingCommand cmd, + protected async Task HandleBasicAckAsync(IncomingCommand cmd, CancellationToken cancellationToken = default) { var ack = new BasicAck(cmd.MethodSpan); @@ -685,7 +686,7 @@ await _basicAcksAsyncWrapper.InvokeAsync(this, args) return true; } - protected async Task HandleBasicNack(IncomingCommand cmd, + protected async Task HandleBasicNackAsync(IncomingCommand cmd, CancellationToken cancellationToken = default) { var nack = new BasicNack(cmd.MethodSpan); @@ -702,7 +703,7 @@ await _basicNacksAsyncWrapper.InvokeAsync(this, args) return true; } - protected async Task HandleBasicReturn(IncomingCommand cmd, CancellationToken cancellationToken) + protected async Task HandleBasicReturnAsync(IncomingCommand cmd, CancellationToken cancellationToken) { var basicReturn = new BasicReturn(cmd.MethodSpan); @@ -1750,16 +1751,16 @@ private Task DispatchCommandAsync(IncomingCommand cmd, CancellationToken c } case ProtocolCommandId.BasicAck: { - return HandleBasicAck(cmd, cancellationToken); + return HandleBasicAckAsync(cmd, cancellationToken); } case ProtocolCommandId.BasicNack: { - return HandleBasicNack(cmd, cancellationToken); + return HandleBasicNackAsync(cmd, cancellationToken); } case ProtocolCommandId.BasicReturn: { // Note: always returns true - return HandleBasicReturn(cmd, cancellationToken); + return HandleBasicReturnAsync(cmd, cancellationToken); } case ProtocolCommandId.ChannelClose: {