diff --git a/RabbitMQDotNetClient.sln b/RabbitMQDotNetClient.sln index 8974d4a9c..5354b8d07 100644 --- a/RabbitMQDotNetClient.sln +++ b/RabbitMQDotNetClient.sln @@ -40,7 +40,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Common", "projects\Test\Com EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ToxiproxyNetCore", "projects\toxiproxy-netcore\src\ToxiproxyNetCore\ToxiproxyNetCore.csproj", "{AB5B7C53-D7EC-4985-A6DE-70178E4B688A}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RabbitMQ.Client.OpenTelemetry", "projects\RabbitMQ.Client.OpenTelemetry\RabbitMQ.Client.OpenTelemetry.csproj", "{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RabbitMQ.Client.OpenTelemetry", "projects\RabbitMQ.Client.OpenTelemetry\RabbitMQ.Client.OpenTelemetry.csproj", "{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GH-1647", "projects\Test\Applications\GH-1647\GH-1647.csproj", "{64ED07BF-4D77-47CD-AF4F-5B4525686FA1}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -96,6 +98,10 @@ Global {16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Debug|Any CPU.Build.0 = Debug|Any CPU {16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Release|Any CPU.ActiveCfg = Release|Any CPU {16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Release|Any CPU.Build.0 = Release|Any CPU + {64ED07BF-4D77-47CD-AF4F-5B4525686FA1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {64ED07BF-4D77-47CD-AF4F-5B4525686FA1}.Debug|Any CPU.Build.0 = Debug|Any CPU + {64ED07BF-4D77-47CD-AF4F-5B4525686FA1}.Release|Any CPU.ActiveCfg = Release|Any CPU + {64ED07BF-4D77-47CD-AF4F-5B4525686FA1}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -110,6 +116,7 @@ Global {F25725D7-2978-45F4-B90F-25D6F8B71C9E} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704} {C11F25F4-7EA1-4874-9E25-DEB42E3A7C67} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704} {AB5B7C53-D7EC-4985-A6DE-70178E4B688A} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704} + {64ED07BF-4D77-47CD-AF4F-5B4525686FA1} = {D21B282C-49E6-4A30-887B-9626D94B8D69} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {3C6A0C44-FA63-4101-BBF9-2598641167D1} diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs index 5dd3047fb..3640d461e 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs @@ -144,7 +144,7 @@ public IAsyncBasicConsumer? DefaultConsumer public string? CurrentQueue => InnerChannel.CurrentQueue; - internal async Task AutomaticallyRecoverAsync(AutorecoveringConnection conn, bool recoverConsumers, + internal async Task AutomaticallyRecoverAsync(AutorecoveringConnection conn, bool recoverConsumers, bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken) { if (false == recordedEntitiesSemaphoreHeld) @@ -152,7 +152,11 @@ internal async Task AutomaticallyRecoverAsync(AutorecoveringConnection conn, boo throw new InvalidOperationException("recordedEntitiesSemaphore must be held"); } - ThrowIfDisposed(); + if (_disposed) + { + return false; + } + _connection = conn; RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(cancellationToken) @@ -189,15 +193,27 @@ await newChannel.TxSelectAsync(cancellationToken) * chance that an invalid Channel will be used to handle a basic.deliver frame, * with the resulting basic.ack never getting sent out. */ - _innerChannel = newChannel; - if (recoverConsumers) + if (_disposed) { - await _connection.RecoverConsumersAsync(this, newChannel, recordedEntitiesSemaphoreHeld) + await newChannel.AbortAsync() .ConfigureAwait(false); + return false; } + else + { + _innerChannel = newChannel; + + if (recoverConsumers) + { + await _connection.RecoverConsumersAsync(this, newChannel, recordedEntitiesSemaphoreHeld) + .ConfigureAwait(false); + } + + _innerChannel.RunRecoveryEventHandlers(this); - _innerChannel.RunRecoveryEventHandlers(this); + return true; + } } public async Task CloseAsync(ushort replyCode, string replyText, bool abort, diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs index e1d47ba87..4d4846363 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System; +using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -583,12 +584,44 @@ private async ValueTask RecoverChannelsAndItsConsumersAsync(bool recordedEntitie throw new InvalidOperationException("recordedEntitiesSemaphore must be held"); } - foreach (AutorecoveringChannel channel in _channels) + var channelsToRecover = new List(); + await _channelsSemaphore.WaitAsync(cancellationToken) + .ConfigureAwait(false); + try + { + channelsToRecover.AddRange(_channels); + } + finally + { + _channelsSemaphore.Release(); + } + + var notRecoveredChannels = new List(); + foreach (AutorecoveringChannel channel in channelsToRecover) { - await channel.AutomaticallyRecoverAsync(this, _config.TopologyRecoveryEnabled, + bool recovered = await channel.AutomaticallyRecoverAsync(this, _config.TopologyRecoveryEnabled, recordedEntitiesSemaphoreHeld: recordedEntitiesSemaphoreHeld, cancellationToken: cancellationToken) .ConfigureAwait(false); + + if (false == recovered) + { + notRecoveredChannels.Add(channel); + } + } + + await _channelsSemaphore.WaitAsync(cancellationToken) + .ConfigureAwait(false); + try + { + foreach (AutorecoveringChannel channel in notRecoveredChannels) + { + _channels.Remove(channel); + } + } + finally + { + _channelsSemaphore.Release(); } } } diff --git a/projects/Test/Applications/GH-1647/GH-1647.csproj b/projects/Test/Applications/GH-1647/GH-1647.csproj new file mode 100644 index 000000000..022441c0e --- /dev/null +++ b/projects/Test/Applications/GH-1647/GH-1647.csproj @@ -0,0 +1,15 @@ + + + + Exe + net6.0 + GH_1647 + enable + enable + + + + + + + diff --git a/projects/Test/Applications/GH-1647/Program.cs b/projects/Test/Applications/GH-1647/Program.cs new file mode 100644 index 000000000..f3cccb3da --- /dev/null +++ b/projects/Test/Applications/GH-1647/Program.cs @@ -0,0 +1,29 @@ +#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task +using System.Text; +using RabbitMQ.Client; + +ConnectionFactory connectionFactory = new() +{ + AutomaticRecoveryEnabled = true, + UserName = "guest", + Password = "guest" +}; + +var props = new BasicProperties(); +byte[] msg = Encoding.UTF8.GetBytes("test"); +using var connection = await connectionFactory.CreateConnectionAsync(); +for (int i = 0; i < 300; i++) +{ + try + { + using var channel = await connection.CreateChannelAsync(); // New channel for each message + await Task.Delay(1000); + await channel.BasicPublishAsync(string.Empty, string.Empty, props, msg); + Console.WriteLine($"Sent message {i}"); + } + catch (Exception ex) + { + Console.WriteLine($"Failed to send message {i}: {ex.Message}"); + await Task.Delay(1000); + } +}