diff --git a/projects/RabbitMQ.Client/client/api/IAutorecoveringConnection.cs b/projects/RabbitMQ.Client/client/api/IAutorecoveringConnection.cs index be4cb04877..7de4bca6f1 100644 --- a/projects/RabbitMQ.Client/client/api/IAutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/client/api/IAutorecoveringConnection.cs @@ -45,5 +45,6 @@ public interface IAutorecoveringConnection : IConnection event EventHandler ConsumerTagChangeAfterRecovery; event EventHandler QueueNameChangeAfterRecovery; + event EventHandler RecoveringConsumer; } } diff --git a/projects/RabbitMQ.Client/client/events/RecoveringConsumerEventArgs.cs b/projects/RabbitMQ.Client/client/events/RecoveringConsumerEventArgs.cs new file mode 100644 index 0000000000..9a20a04225 --- /dev/null +++ b/projects/RabbitMQ.Client/client/events/RecoveringConsumerEventArgs.cs @@ -0,0 +1,64 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// +// The APL v2.0: +// +//--------------------------------------------------------------------------- +// Copyright (c) 2007-2023 VMware, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//--------------------------------------------------------------------------- +// +// The MPL v2.0: +// +//--------------------------------------------------------------------------- +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright (c) 2007-2023 VMware, Inc. All rights reserved. +//--------------------------------------------------------------------------- + +using System; +using System.Collections.Generic; + +namespace RabbitMQ.Client.Events +{ + /// + /// Event related to consumer recovery, during automatic recovery. + /// + public class RecoveringConsumerEventArgs : EventArgs + { + /// + /// Constructs an event containing the consumer arguments and consumer + /// tag of the consumer this event relates to. + /// + /// Consumer arguments of the consumer for this event + /// Consumer tag of the consumer for this event + public RecoveringConsumerEventArgs(IDictionary consumerArguments, string consumerTag) + { + ConsumerArguments = consumerArguments; + ConsumerTag = consumerTag; + } + + /// + /// Access the consumer arguments of the consumer this event relates to. + /// + public IDictionary ConsumerArguments { get; } + + /// + /// Access the consumer tag of the consumer this event relates to. + /// + public string ConsumerTag { get; } + } +} diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs index 57fd363193..958073b656 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs @@ -198,6 +198,7 @@ public event EventHandler ConnectionUnblocked } } + public event EventHandler RecoveringConsumer; public event EventHandler ConsumerTagChangeAfterRecovery; public event EventHandler QueueNameChangeAfterRecovery; @@ -1099,6 +1100,20 @@ internal void RecoverConsumers(AutorecoveringModel modelToRecover, IModel channe string tag = pair.Key; try { + foreach (EventHandler eh in RecoveringConsumer?.GetInvocationList() ?? Array.Empty()) + { + try + { + var eventArgs = new RecoveringConsumerEventArgs(cons.Arguments, cons.ConsumerTag); + eh(this, eventArgs); + } + catch (Exception e) + { + var args = new CallbackExceptionEventArgs(e) { Detail = { ["context"] = "OnBeforeRecoveringConsumer" } }; + _delegate.OnCallbackException(args); + } + } + string newTag = cons.Recover(channelToUse); lock (_recordedConsumers) { diff --git a/projects/Unit/APIApproval.Approve.verified.txt b/projects/Unit/APIApproval.Approve.verified.txt index 934f6ef618..997db0a5c8 100644 --- a/projects/Unit/APIApproval.Approve.verified.txt +++ b/projects/Unit/APIApproval.Approve.verified.txt @@ -243,6 +243,7 @@ namespace RabbitMQ.Client event System.EventHandler ConnectionRecoveryError; event System.EventHandler ConsumerTagChangeAfterRecovery; event System.EventHandler QueueNameChangeAfterRecovery; + event System.EventHandler RecoveringConsumer; event System.EventHandler RecoverySucceeded; } public interface IBasicConsumer @@ -689,6 +690,12 @@ namespace RabbitMQ.Client.Events public string NameAfter { get; } public string NameBefore { get; } } + public class RecoveringConsumerEventArgs : System.EventArgs + { + public RecoveringConsumerEventArgs(System.Collections.Generic.IDictionary consumerArguments, string consumerTag) { } + public System.Collections.Generic.IDictionary ConsumerArguments { get; } + public string ConsumerTag { get; } + } public class RecoveryExceptionEventArgs : RabbitMQ.Client.Events.BaseExceptionEventArgs { public RecoveryExceptionEventArgs(System.Exception e) { } diff --git a/projects/Unit/TestConnectionRecovery.cs b/projects/Unit/TestConnectionRecovery.cs index 7795caaa0d..a7dbcbf7bd 100644 --- a/projects/Unit/TestConnectionRecovery.cs +++ b/projects/Unit/TestConnectionRecovery.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System; +using System.Collections; using System.Collections.Generic; using System.Threading; @@ -688,6 +689,52 @@ public void TestRecoveryEventHandlersOnConnection() Assert.IsTrue(counter >= 3); } + [Test] + public void TestRecoveringConsumerHandlerOnConnection() + { + string q = Model.QueueDeclare(GenerateQueueName(), false, false, false, null).QueueName; + var cons = new EventingBasicConsumer(Model); + Model.BasicConsume(q, true, cons); + + int counter = 0; + ((AutorecoveringConnection)Conn).RecoveringConsumer += (sender, args) => Interlocked.Increment(ref counter); + + CloseAndWaitForRecovery(); + Assert.IsTrue(Conn.IsOpen, "expected connection to be open"); + Assert.AreEqual(1, counter); + + CloseAndWaitForRecovery(); + CloseAndWaitForRecovery(); + Assert.IsTrue(Conn.IsOpen, "expected connection to be open"); + Assert.AreEqual(3, counter); + } + + [Test] + public void TestRecoveringConsumerHandlerOnConnection_EventArgumentsArePassedDown() + { + var myArgs = new Dictionary { { "first-argument", "some-value" } }; + string q = Model.QueueDeclare(GenerateQueueName(), false, false, false, null).QueueName; + var cons = new EventingBasicConsumer(Model); + string expectedCTag = Model.BasicConsume(cons, q, arguments: myArgs); + + bool ctagMatches = false; + bool consumerArgumentMatches = false; + ((AutorecoveringConnection)Conn).RecoveringConsumer += (sender, args) => + { + // We cannot assert here because NUnit throws when an assertion fails. This exception is caught and + // passed to a CallbackExceptionHandler, instead of failing the test. Instead, we have to do this trick + // and assert in the test function. + ctagMatches = args.ConsumerTag == expectedCTag; + consumerArgumentMatches = (string)args.ConsumerArguments["first-argument"] == "some-value"; + args.ConsumerArguments["first-argument"] = "event-handler-set-this-value"; + }; + + CloseAndWaitForRecovery(); + Assert.That(ctagMatches, Is.True, "expected consumer tag to match"); + Assert.That(consumerArgumentMatches, Is.True, "expected consumer arguments to match"); + Assert.That(myArgs, Does.ContainKey("first-argument").WithValue("event-handler-set-this-value")); + } + [Test] public void TestRecoveryEventHandlersOnModel() {