Skip to content

Commit

Permalink
Merge pull request #1305 from rabbitmq/recoving-consumer-event
Browse files Browse the repository at this point in the history
Port #1304 - Add event for recovering consumer
  • Loading branch information
michaelklishin authored Feb 23, 2023
2 parents 85c53be + e4974f6 commit cbe6a8c
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 1 deletion.
12 changes: 12 additions & 0 deletions projects/RabbitMQ.Client/client/api/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,18 @@ public interface IConnection : INetworkConnection, IDisposable
/// </remarks>
event EventHandler<QueueNameChangedAfterRecoveryEventArgs> QueueNameChangeAfterRecovery;

/// <summary>
/// Raised when a consumer is about to be recovered. This event raises when topology recovery
/// is enabled, and just before the consumer is recovered. This allows applications to update
/// the consumer arguments before the consumer is recovered. It could be particularly useful
/// when consuming from a stream queue, as it allows to update the consumer offset argument
/// just before the consumer is recovered.
/// </summary>
/// <remarks>
/// This event will never fire for connections that disable automatic recovery.
/// </remarks>
public event EventHandler<RecoveringConsumerEventArgs> RecoveringConsumer;

event EventHandler<EventArgs> ConnectionUnblocked;

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
//
// The APL v2.0:
//
//---------------------------------------------------------------------------
// Copyright (c) 2007-2023 VMware, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//---------------------------------------------------------------------------
//
// The MPL v2.0:
//
//---------------------------------------------------------------------------
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2007-2023 VMware, Inc. All rights reserved.
//---------------------------------------------------------------------------

using System.Collections.Generic;

namespace RabbitMQ.Client.Events
{
/// <summary>
/// Event related to consumer recovery, during automatic recovery.
/// </summary>
public class RecoveringConsumerEventArgs
{
/// <summary>
/// Constructs an event containing the consumer arguments and consumer
/// tag of the consumer this event relates to.
/// </summary>
/// <param name="consumerTag">Consumer arguments of the consumer for this event</param>
/// <param name="consumerArguments">Consumer tag of the consumer for this event</param>
public RecoveringConsumerEventArgs(string consumerTag, IDictionary<string, object> consumerArguments)
{
ConsumerTag = consumerTag;
ConsumerArguments = consumerArguments;
}

/// <summary>
/// Access the consumer arguments of the consumer this event relates to.
/// </summary>
public string ConsumerTag { get; }

/// <summary>
/// Access the consumer tag of the consumer this event relates to.
/// </summary>
public IDictionary<string, object> ConsumerArguments { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,8 @@ internal void RecoverConsumers(AutorecoveringModel channelToRecover, IModel chan
continue;
}

_consumerAboutToBeRecovered.Invoke(this, new RecoveringConsumerEventArgs(consumer.ConsumerTag, consumer.Arguments));

var oldTag = consumer.ConsumerTag;
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,13 @@ public event EventHandler<QueueNameChangedAfterRecoveryEventArgs> QueueNameChang
}
private EventingWrapper<QueueNameChangedAfterRecoveryEventArgs> _queueNameChangeAfterRecoveryWrapper;

public event EventHandler<RecoveringConsumerEventArgs> RecoveringConsumer
{
add => _consumerAboutToBeRecovered.AddHandler(value);
remove => _consumerAboutToBeRecovered.RemoveHandler(value);
}
private EventingWrapper<RecoveringConsumerEventArgs> _consumerAboutToBeRecovered;

public string ClientProvidedName => _config.ClientProvidedName;

public ushort ChannelMax => InnerConnection.ChannelMax;
Expand Down
7 changes: 7 additions & 0 deletions projects/RabbitMQ.Client/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,13 @@ public event EventHandler<EventArgs> ConnectionUnblocked
}
private EventingWrapper<EventArgs> _connectionUnblockedWrapper;

public event EventHandler<RecoveringConsumerEventArgs> RecoveringConsumer
{
add => _consumerAboutToBeRecovered.AddHandler(value);
remove => _consumerAboutToBeRecovered.RemoveHandler(value);
}
private EventingWrapper<RecoveringConsumerEventArgs> _consumerAboutToBeRecovered;

public event EventHandler<ShutdownEventArgs> ConnectionShutdown
{
add
Expand Down
9 changes: 8 additions & 1 deletion projects/Unit/APIApproval.Approve.verified.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[assembly: System.Runtime.CompilerServices.InternalsVisibleTo(@"Benchmarks, PublicKey=00240000048000009400000006020000002400005253413100040000010001008d20ec856aeeb8c3153a77faa2d80e6e43b5db93224a20cc7ae384f65f142e89730e2ff0fcc5d578bbe96fa98a7196c77329efdee4579b3814c0789e5a39b51df6edd75b602a33ceabdfcf19a3feb832f31d8254168cd7ba5700dfbca301fbf8db614ba41ba18474de0a5f4c2d51c995bc3636c641c8cbe76f45717bfcb943b5")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleTo(@"Benchmarks, PublicKey=00240000048000009400000006020000002400005253413100040000010001008d20ec856aeeb8c3153a77faa2d80e6e43b5db93224a20cc7ae384f65f142e89730e2ff0fcc5d578bbe96fa98a7196c77329efdee4579b3814c0789e5a39b51df6edd75b602a33ceabdfcf19a3feb832f31d8254168cd7ba5700dfbca301fbf8db614ba41ba18474de0a5f4c2d51c995bc3636c641c8cbe76f45717bfcb943b5")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleTo(@"Unit, PublicKey=00240000048000009400000006020000002400005253413100040000010001008d20ec856aeeb8c3153a77faa2d80e6e43b5db93224a20cc7ae384f65f142e89730e2ff0fcc5d578bbe96fa98a7196c77329efdee4579b3814c0789e5a39b51df6edd75b602a33ceabdfcf19a3feb832f31d8254168cd7ba5700dfbca301fbf8db614ba41ba18474de0a5f4c2d51c995bc3636c641c8cbe76f45717bfcb943b5")]
namespace RabbitMQ.Client
{
Expand Down Expand Up @@ -391,6 +391,7 @@ namespace RabbitMQ.Client
event System.EventHandler<System.EventArgs> ConnectionUnblocked;
event System.EventHandler<RabbitMQ.Client.Events.ConsumerTagChangedAfterRecoveryEventArgs> ConsumerTagChangeAfterRecovery;
event System.EventHandler<RabbitMQ.Client.Events.QueueNameChangedAfterRecoveryEventArgs> QueueNameChangeAfterRecovery;
event System.EventHandler<RabbitMQ.Client.Events.RecoveringConsumerEventArgs> RecoveringConsumer;
event System.EventHandler<System.EventArgs> RecoverySucceeded;
void Close(ushort reasonCode, string reasonText, System.TimeSpan timeout, bool abort);
RabbitMQ.Client.IModel CreateModel();
Expand Down Expand Up @@ -795,6 +796,12 @@ namespace RabbitMQ.Client.Events
public string NameAfter { get; }
public string NameBefore { get; }
}
public class RecoveringConsumerEventArgs
{
public RecoveringConsumerEventArgs(string consumerTag, System.Collections.Generic.IDictionary<string, object> consumerArguments) { }
public System.Collections.Generic.IDictionary<string, object> ConsumerArguments { get; }
public string ConsumerTag { get; }
}
}
namespace RabbitMQ.Client.Exceptions
{
Expand Down
47 changes: 47 additions & 0 deletions projects/Unit/TestConnectionRecovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,53 @@ public void TestRecoveryEventHandlersOnModel()
Assert.True(counter >= 3);
}

[Theory]
[InlineData(1)]
[InlineData(3)]
public void TestRecoveringConsumerHandlerOnConnection(int iterations)
{
string q = _model.QueueDeclare(GenerateQueueName(), false, false, false, null).QueueName;
var cons = new EventingBasicConsumer(_model);
_model.BasicConsume(q, true, cons);

int counter = 0;
((AutorecoveringConnection)_conn).RecoveringConsumer += (sender, args) => Interlocked.Increment(ref counter);

for (int i = 0; i < iterations; i++)
{
CloseAndWaitForRecovery();
}

Assert.Equal(iterations, counter);
}

[Fact]
public void TestRecoveringConsumerHandlerOnConnection_EventArgumentsArePassedDown()
{
var myArgs = new Dictionary<string, object> { { "first-argument", "some-value" } };
string q = _model.QueueDeclare(GenerateQueueName(), false, false, false, null).QueueName;
var cons = new EventingBasicConsumer(_model);
string expectedCTag = _model.BasicConsume(cons, q, arguments: myArgs);

bool ctagMatches = false;
bool consumerArgumentMatches = false;
((AutorecoveringConnection)_conn).RecoveringConsumer += (sender, args) =>
{
// We cannot assert here because NUnit throws when an assertion fails. This exception is caught and
// passed to a CallbackExceptionHandler, instead of failing the test. Instead, we have to do this trick
// and assert in the test function.
ctagMatches = args.ConsumerTag == expectedCTag;
consumerArgumentMatches = (string)args.ConsumerArguments["first-argument"] == "some-value";
args.ConsumerArguments["first-argument"] = "event-handler-set-this-value";
};

CloseAndWaitForRecovery();
Assert.True(ctagMatches, "expected consumer tag to match");
Assert.True(consumerArgumentMatches, "expected consumer arguments to match");
string actualVal = (string)Assert.Contains("first-argument", myArgs as IDictionary<string, object>);
Assert.Equal("event-handler-set-this-value", actualVal);
}

[Fact]
public void TestRecoveryWithTopologyDisabled()
{
Expand Down

0 comments on commit cbe6a8c

Please sign in to comment.