Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make connection events async #1677

Merged
merged 18 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
830b1e4
Make connection events async
danielmarbach Sep 17, 2024
7337fad
Rename three internal async methods to use `Async` suffix.
lukebakken Sep 17, 2024
8c7f8f9
Rename `CallbackException` event to `CallbackExceptionAsync`
lukebakken Sep 17, 2024
ebb8e88
Rename `ConnectionBlocked` to `ConnectionBlockedAsync`
lukebakken Sep 17, 2024
5082dc6
Rename `ConnectionShutdown` to `ConnectionShutdownAsync`
lukebakken Sep 17, 2024
209fc7d
Rename `RecoverySucceeded` to `RecoverySucceededAsync`
lukebakken Sep 17, 2024
2d8bc9a
Rename `ConnectionRecoveryError` to `ConnectionRecoveryErrorAsync`
lukebakken Sep 17, 2024
a48789a
Rename `ConsumerTagChangeAfterRecovery` to `ConsumerTagChangeAfterRec…
lukebakken Sep 17, 2024
a4ebfc2
Rename `QueueNameChangedAfterRecovery` to `QueueNameChangedAfterRecov…
lukebakken Sep 17, 2024
4af30b0
Rename `RecoveringConsumer` to `RecoveringConsumerAsync`
lukebakken Sep 17, 2024
9015943
Ensure that `_recoveringConsumerAsyncWrapper` is initialized as expec…
lukebakken Sep 17, 2024
31e9ccc
Rename `ConnectionUnblocked` to `ConnectionUnblockedAsync`
lukebakken Sep 17, 2024
b82ae27
Rename `HandleMainLoopException` to `HandleMainLoopExceptionAsync`
lukebakken Sep 17, 2024
275134e
Rename `HandleConnectionShutdown` to `HandleConnectionShutdownAsync`
lukebakken Sep 17, 2024
213f0da
Rename `ClosedViaPeer` to `ClosedViaPeerAsync`
lukebakken Sep 17, 2024
5d97fa1
Rename `onException` to `onExceptionAsync`
lukebakken Sep 17, 2024
1deeae3
Add `_consumerAboutToBeRecoveredAsyncWrapper` to `TakeOver`
lukebakken Sep 17, 2024
5b6956b
Remove unused `virtual` keyword.
lukebakken Sep 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions projects/RabbitMQ.Client/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -445,24 +445,15 @@ RabbitMQ.Client.IChannel.IsClosed.get -> bool
RabbitMQ.Client.IChannel.IsOpen.get -> bool
RabbitMQ.Client.IChannelExtensions
RabbitMQ.Client.IConnection
RabbitMQ.Client.IConnection.CallbackException -> System.EventHandler<RabbitMQ.Client.Events.CallbackExceptionEventArgs>
RabbitMQ.Client.IConnection.ChannelMax.get -> ushort
RabbitMQ.Client.IConnection.ClientProperties.get -> System.Collections.Generic.IDictionary<string, object>
RabbitMQ.Client.IConnection.ClientProvidedName.get -> string
RabbitMQ.Client.IConnection.CloseReason.get -> RabbitMQ.Client.ShutdownEventArgs
RabbitMQ.Client.IConnection.ConnectionBlocked -> System.EventHandler<RabbitMQ.Client.Events.ConnectionBlockedEventArgs>
RabbitMQ.Client.IConnection.ConnectionRecoveryError -> System.EventHandler<RabbitMQ.Client.Events.ConnectionRecoveryErrorEventArgs>
RabbitMQ.Client.IConnection.ConnectionShutdown -> System.EventHandler<RabbitMQ.Client.ShutdownEventArgs>
RabbitMQ.Client.IConnection.ConnectionUnblocked -> System.EventHandler<System.EventArgs>
RabbitMQ.Client.IConnection.ConsumerTagChangeAfterRecovery -> System.EventHandler<RabbitMQ.Client.Events.ConsumerTagChangedAfterRecoveryEventArgs>
RabbitMQ.Client.IConnection.Endpoint.get -> RabbitMQ.Client.AmqpTcpEndpoint
RabbitMQ.Client.IConnection.FrameMax.get -> uint
RabbitMQ.Client.IConnection.Heartbeat.get -> System.TimeSpan
RabbitMQ.Client.IConnection.IsOpen.get -> bool
RabbitMQ.Client.IConnection.Protocol.get -> RabbitMQ.Client.IProtocol
RabbitMQ.Client.IConnection.QueueNameChangedAfterRecovery -> System.EventHandler<RabbitMQ.Client.Events.QueueNameChangedAfterRecoveryEventArgs>
RabbitMQ.Client.IConnection.RecoveringConsumer -> System.EventHandler<RabbitMQ.Client.Events.RecoveringConsumerEventArgs>
RabbitMQ.Client.IConnection.RecoverySucceeded -> System.EventHandler<System.EventArgs>
RabbitMQ.Client.IConnection.ServerProperties.get -> System.Collections.Generic.IDictionary<string, object>
RabbitMQ.Client.IConnection.ShutdownReport.get -> System.Collections.Generic.IEnumerable<RabbitMQ.Client.ShutdownReportEntry>
RabbitMQ.Client.IConnectionExtensions
Expand Down Expand Up @@ -895,3 +886,12 @@ RabbitMQ.Client.IChannel.ConfirmSelectAsync(bool trackConfirmations = true, Syst
const RabbitMQ.Client.Constants.DefaultConsumerDispatchConcurrency = 1 -> ushort
readonly RabbitMQ.Client.ConnectionConfig.ConsumerDispatchConcurrency -> ushort
RabbitMQ.Client.IConnection.CreateChannelAsync(ushort? consumerDispatchConcurrency = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
RabbitMQ.Client.IConnection.CallbackExceptionAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.CallbackExceptionEventArgs!>!
RabbitMQ.Client.IConnection.ConnectionBlockedAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.ConnectionBlockedEventArgs!>!
RabbitMQ.Client.IConnection.ConnectionShutdownAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.ShutdownEventArgs!>!
RabbitMQ.Client.IConnection.RecoverySucceededAsync -> RabbitMQ.Client.Events.AsyncEventHandler<System.EventArgs!>!
RabbitMQ.Client.IConnection.ConnectionRecoveryErrorAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.ConnectionRecoveryErrorEventArgs!>!
RabbitMQ.Client.IConnection.ConsumerTagChangeAfterRecoveryAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.ConsumerTagChangedAfterRecoveryEventArgs!>!
RabbitMQ.Client.IConnection.QueueNameChangedAfterRecoveryAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.QueueNameChangedAfterRecoveryEventArgs!>!
RabbitMQ.Client.IConnection.RecoveringConsumerAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.RecoveringConsumerEventArgs!>!
RabbitMQ.Client.IConnection.ConnectionUnblockedAsync -> RabbitMQ.Client.Events.AsyncEventHandler<System.EventArgs!>!
26 changes: 16 additions & 10 deletions projects/RabbitMQ.Client/client/api/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,7 @@ public interface IConnection : INetworkConnection, IDisposable
/// <see cref="IConnection"/>, then this event will be signalled whenever one
/// of those event handlers throws an exception, as well.
/// </remarks>
event EventHandler<CallbackExceptionEventArgs> CallbackException;

event EventHandler<ConnectionBlockedEventArgs> ConnectionBlocked;
event AsyncEventHandler<CallbackExceptionEventArgs> CallbackExceptionAsync;

/// <summary>
/// Raised when the connection is destroyed.
Expand All @@ -155,15 +153,15 @@ public interface IConnection : INetworkConnection, IDisposable
/// event handler is added to this event, the event handler
/// will be fired immediately.
/// </remarks>
event EventHandler<ShutdownEventArgs> ConnectionShutdown;
event AsyncEventHandler<ShutdownEventArgs> ConnectionShutdownAsync;

/// <summary>
/// Raised when the connection completes recovery.
/// </summary>
/// <remarks>
/// This event will never fire for connections that disable automatic recovery.
/// </remarks>
event EventHandler<EventArgs> RecoverySucceeded;
event AsyncEventHandler<EventArgs> RecoverySucceededAsync;

/// <summary>
/// Raised when the connection recovery fails, e.g. because reconnection or topology
Expand All @@ -172,7 +170,7 @@ public interface IConnection : INetworkConnection, IDisposable
/// <remarks>
/// This event will never fire for connections that disable automatic recovery.
/// </remarks>
event EventHandler<ConnectionRecoveryErrorEventArgs> ConnectionRecoveryError;
event AsyncEventHandler<ConnectionRecoveryErrorEventArgs> ConnectionRecoveryErrorAsync;

/// <summary>
/// Raised when the server-generated tag of a consumer registered on this connection changes during
Expand All @@ -182,7 +180,7 @@ public interface IConnection : INetworkConnection, IDisposable
/// <remarks>
/// This event will never fire for connections that disable automatic recovery.
/// </remarks>
event EventHandler<ConsumerTagChangedAfterRecoveryEventArgs> ConsumerTagChangeAfterRecovery;
event AsyncEventHandler<ConsumerTagChangedAfterRecoveryEventArgs> ConsumerTagChangeAfterRecoveryAsync;

/// <summary>
/// Raised when the name of a server-named queue declared on this connection changes during
Expand All @@ -192,7 +190,7 @@ public interface IConnection : INetworkConnection, IDisposable
/// <remarks>
/// This event will never fire for connections that disable automatic recovery.
/// </remarks>
event EventHandler<QueueNameChangedAfterRecoveryEventArgs> QueueNameChangedAfterRecovery;
event AsyncEventHandler<QueueNameChangedAfterRecoveryEventArgs> QueueNameChangedAfterRecoveryAsync;

/// <summary>
/// Raised when a consumer is about to be recovered. This event raises when topology recovery
Expand All @@ -204,9 +202,17 @@ public interface IConnection : INetworkConnection, IDisposable
/// <remarks>
/// This event will never fire for connections that disable automatic recovery.
/// </remarks>
public event EventHandler<RecoveringConsumerEventArgs> RecoveringConsumer;
public event AsyncEventHandler<RecoveringConsumerEventArgs> RecoveringConsumerAsync;

/// <summary>
/// Raised when a connection is blocked by the AMQP broker.
/// </summary>
event AsyncEventHandler<ConnectionBlockedEventArgs> ConnectionBlockedAsync;

event EventHandler<EventArgs> ConnectionUnblocked;
/// <summary>
/// Raised when a connection is unblocked by the AMQP broker.
/// </summary>
event AsyncEventHandler<EventArgs> ConnectionUnblockedAsync;

/// <summary>
/// This method updates the secret used to authenticate this connection.
Expand Down
12 changes: 6 additions & 6 deletions projects/RabbitMQ.Client/client/framing/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
}
case ProtocolCommandId.ConnectionBlocked:
{
HandleConnectionBlocked(cmd);
return Task.FromResult(true);
// Note: always returns true
return HandleConnectionBlockedAsync(cmd, cancellationToken);
}
case ProtocolCommandId.ConnectionClose:
{
Expand All @@ -128,7 +128,7 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
case ProtocolCommandId.ConnectionSecure:
{
// Note: always returns true
return HandleConnectionSecureAsync(cmd);
return HandleConnectionSecureAsync(cmd, cancellationToken);
}
case ProtocolCommandId.ConnectionStart:
{
Expand All @@ -138,12 +138,12 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
case ProtocolCommandId.ConnectionTune:
{
// Note: always returns true
return HandleConnectionTuneAsync(cmd);
return HandleConnectionTuneAsync(cmd, cancellationToken);
}
case ProtocolCommandId.ConnectionUnblocked:
{
HandleConnectionUnblocked();
return Task.FromResult(true);
// Note: always returns true
return HandleConnectionUnblockedAsync(cancellationToken);
}
default:
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ internal sealed partial class AutorecoveringConnection
private Task? _recoveryTask;
private readonly CancellationTokenSource _recoveryCancellationTokenSource = new CancellationTokenSource();

private void HandleConnectionShutdown(object? _, ShutdownEventArgs args)
private Task HandleConnectionShutdownAsync(object? _, ShutdownEventArgs args)
{
if (ShouldTriggerConnectionRecovery(args))
{
Expand All @@ -57,6 +57,8 @@ private void HandleConnectionShutdown(object? _, ShutdownEventArgs args)
}
}

return Task.CompletedTask;

static bool ShouldTriggerConnectionRecovery(ShutdownEventArgs args)
{
if (args.Initiator == ShutdownInitiator.Peer)
Expand Down Expand Up @@ -204,7 +206,8 @@ await RecoverChannelsAndItsConsumersAsync(recordedEntitiesSemaphoreHeld: true, c

ESLog.Info("Connection recovery completed");
ThrowIfDisposed();
_recoverySucceededWrapper.Invoke(this, EventArgs.Empty);
await _recoverySucceededAsyncWrapper.InvokeAsync(this, EventArgs.Empty)
.ConfigureAwait(false);

return true;
}
Expand Down Expand Up @@ -266,10 +269,11 @@ await maybeNewInnerConnection.OpenAsync(cancellationToken)
{
ESLog.Error("Connection recovery exception.", e);
// Trigger recovery error events
if (!_connectionRecoveryErrorWrapper.IsEmpty)
if (!_connectionRecoveryErrorAsyncWrapper.IsEmpty)
{
// Note: recordedEntities semaphore is _NOT_ held at this point
_connectionRecoveryErrorWrapper.Invoke(this, new ConnectionRecoveryErrorEventArgs(e));
await _connectionRecoveryErrorAsyncWrapper.InvokeAsync(this, new ConnectionRecoveryErrorEventArgs(e))
.ConfigureAwait(false);
}

maybeNewInnerConnection?.Dispose();
Expand Down Expand Up @@ -377,12 +381,13 @@ await RecordQueueAsync(new RecordedQueue(newName, recordedQueue),
recordedEntitiesSemaphoreHeld: recordedEntitiesSemaphoreHeld, cancellationToken)
.ConfigureAwait(false);

if (!_queueNameChangedAfterRecoveryWrapper.IsEmpty)
if (!_queueNameChangedAfterRecoveryAsyncWrapper.IsEmpty)
{
try
{
_recordedEntitiesSemaphore.Release();
_queueNameChangedAfterRecoveryWrapper.Invoke(this, new QueueNameChangedAfterRecoveryEventArgs(oldName, newName));
await _queueNameChangedAfterRecoveryAsyncWrapper.InvokeAsync(this, new QueueNameChangedAfterRecoveryEventArgs(oldName, newName))
.ConfigureAwait(false);
}
finally
{
Expand Down Expand Up @@ -515,7 +520,8 @@ internal async ValueTask RecoverConsumersAsync(AutorecoveringChannel channelToRe
try
{
_recordedEntitiesSemaphore.Release();
_consumerAboutToBeRecovered.Invoke(this, new RecoveringConsumerEventArgs(consumer.ConsumerTag, consumer.Arguments));
await _recoveringConsumerAsyncWrapper.InvokeAsync(this, new RecoveringConsumerEventArgs(consumer.ConsumerTag, consumer.Arguments))
.ConfigureAwait(false);
}
finally
{
Expand All @@ -531,12 +537,13 @@ await _recordedEntitiesSemaphore.WaitAsync(cancellationToken)
RecordedConsumer consumerWithNewConsumerTag = RecordedConsumer.WithNewConsumerTag(newTag, consumer);
UpdateConsumer(oldTag, newTag, consumerWithNewConsumerTag);

if (!_consumerTagChangeAfterRecoveryWrapper.IsEmpty)
if (!_consumerTagChangeAfterRecoveryAsyncWrapper.IsEmpty)
{
try
{
_recordedEntitiesSemaphore.Release();
_consumerTagChangeAfterRecoveryWrapper.Invoke(this, new ConsumerTagChangedAfterRecoveryEventArgs(oldTag, newTag));
await _consumerTagChangeAfterRecoveryAsyncWrapper.InvokeAsync(this, new ConsumerTagChangedAfterRecoveryEventArgs(oldTag, newTag))
.ConfigureAwait(false);
}
finally
{
Expand Down
Loading