diff --git a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt index e8e5583aa6..58d5f93495 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt @@ -919,7 +919,7 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void ~RabbitMQ.Client.IChannel.TxRollbackAsync() -> System.Threading.Tasks.Task ~RabbitMQ.Client.IChannel.TxSelectAsync() -> System.Threading.Tasks.Task ~RabbitMQ.Client.IConnection.CloseAsync(ushort reasonCode, string reasonText, System.TimeSpan timeout, bool abort, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task -~RabbitMQ.Client.IConnection.CreateChannelAsync() -> System.Threading.Tasks.Task +~RabbitMQ.Client.IConnection.CreateChannelAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~RabbitMQ.Client.IConnection.UpdateSecretAsync(string newSecret, string reason) -> System.Threading.Tasks.Task ~RabbitMQ.Client.IConnectionFactory.CreateConnectionAsync(string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task ~RabbitMQ.Client.IConnectionFactory.CreateConnectionAsync(System.Collections.Generic.IEnumerable endpoints, string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task diff --git a/projects/RabbitMQ.Client/client/api/IConnection.cs b/projects/RabbitMQ.Client/client/api/IConnection.cs index 2e3c44bc67..8076bdb30b 100644 --- a/projects/RabbitMQ.Client/client/api/IConnection.cs +++ b/projects/RabbitMQ.Client/client/api/IConnection.cs @@ -225,13 +225,14 @@ public interface IConnection : INetworkConnection, IDisposable /// A message indicating the reason for closing the connection. /// /// Whether or not this close is an abort (ignores certain exceptions). - /// - Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abort, CancellationToken cancellationToken = default); + /// Cancellation token + Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abort, + CancellationToken cancellationToken = default); /// /// Asynchronously create and return a fresh channel, session, and channel. /// - // TODO cancellation token - Task CreateChannelAsync(); + /// Cancellation token + Task CreateChannelAsync(CancellationToken cancellationToken = default); } } diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs index 39593404ff..7224e4bcf6 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs @@ -146,7 +146,7 @@ public IBasicConsumer DefaultConsumer public string CurrentQueue => InnerChannel.CurrentQueue; internal async Task AutomaticallyRecoverAsync(AutorecoveringConnection conn, bool recoverConsumers, - bool recordedEntitiesSemaphoreHeld = false) + bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken) { if (false == recordedEntitiesSemaphoreHeld) { @@ -156,7 +156,7 @@ internal async Task AutomaticallyRecoverAsync(AutorecoveringConnection conn, boo ThrowIfDisposed(); _connection = conn; - RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync() + RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(cancellationToken) .ConfigureAwait(false); newChannel.TakeOver(_innerChannel); diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs index c3c3e11f40..cd048c9428 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs @@ -731,7 +731,7 @@ private void RecordChannel(in AutorecoveringChannel channel, } private async Task RecordChannelAsync(AutorecoveringChannel channel, - bool channelsSemaphoreHeld = false) + bool channelsSemaphoreHeld, CancellationToken cancellationToken) { if (channelsSemaphoreHeld) { @@ -739,7 +739,7 @@ private async Task RecordChannelAsync(AutorecoveringChannel channel, } else { - await _channelsSemaphore.WaitAsync() + await _channelsSemaphore.WaitAsync(cancellationToken) .ConfigureAwait(false); try { @@ -757,6 +757,7 @@ private void DoAddRecordedChannel(AutorecoveringChannel channel) _channels.Add(channel); } + // TODO remove this unused method internal void DeleteRecordedChannel(in AutorecoveringChannel channel, bool channelsSemaphoreHeld, bool recordedEntitiesSemaphoreHeld) { diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs index b310213ce1..0eda2e94e1 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs @@ -184,7 +184,7 @@ await RecoverBindingsAsync(_innerConnection, recordedEntitiesSemaphoreHeld: true .ConfigureAwait(false); } - await RecoverChannelsAndItsConsumersAsync(recordedEntitiesSemaphoreHeld: true) + await RecoverChannelsAndItsConsumersAsync(recordedEntitiesSemaphoreHeld: true, cancellationToken: cancellationToken) .ConfigureAwait(false); } finally @@ -541,7 +541,7 @@ void UpdateConsumer(string oldTag, string newTag, in RecordedConsumer consumer) } } - private async ValueTask RecoverChannelsAndItsConsumersAsync(bool recordedEntitiesSemaphoreHeld = false) + private async ValueTask RecoverChannelsAndItsConsumersAsync(bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken) { if (false == recordedEntitiesSemaphoreHeld) { @@ -551,7 +551,8 @@ private async ValueTask RecoverChannelsAndItsConsumersAsync(bool recordedEntitie foreach (AutorecoveringChannel channel in _channels) { await channel.AutomaticallyRecoverAsync(this, _config.TopologyRecoveryEnabled, - recordedEntitiesSemaphoreHeld: recordedEntitiesSemaphoreHeld) + recordedEntitiesSemaphoreHeld: recordedEntitiesSemaphoreHeld, + cancellationToken: cancellationToken) .ConfigureAwait(false); } } diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs index 546d3e642a..f9fb7b2f1b 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs @@ -175,11 +175,11 @@ public event EventHandler RecoveringConsumer public IProtocol Protocol => Endpoint.Protocol; - public async ValueTask CreateNonRecoveringChannelAsync() + public async ValueTask CreateNonRecoveringChannelAsync(CancellationToken cancellationToken) { ISession session = InnerConnection.CreateSession(); var result = new RecoveryAwareChannel(_config, session); - return await result.OpenAsync() + return await result.OpenAsync(cancellationToken) .ConfigureAwait(false) as RecoveryAwareChannel; } @@ -241,13 +241,13 @@ await CloseInnerConnectionAsync() } } - public async Task CreateChannelAsync() + public async Task CreateChannelAsync(CancellationToken cancellationToken = default) { EnsureIsOpen(); - RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync() + RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(cancellationToken) .ConfigureAwait(false); AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel); - await RecordChannelAsync(channel, channelsSemaphoreHeld: false) + await RecordChannelAsync(channel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken) .ConfigureAwait(false); return channel; } diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index cd53d6c960..514c80aa20 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -59,7 +59,7 @@ internal abstract class ChannelBase : IChannel, IRecoverable private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue(); private readonly ManualResetEventSlim _flowControlBlock = new ManualResetEventSlim(true); - private readonly object _confirmLock = new object(); + private object _confirmLock; private readonly LinkedList _pendingDeliveryTags = new LinkedList(); private bool _onlyAcksReceived = true; @@ -183,7 +183,6 @@ public IBasicConsumer DefaultConsumer public bool IsOpen => CloseReason is null; - // TODO add private bool for Confirm mode public ulong NextPublishSeqNo { get; private set; } public string CurrentQueue { get; private set; } @@ -376,19 +375,22 @@ protected bool Enqueue(IRpcContinuation k) } } - internal async Task OpenAsync() + internal async Task OpenAsync(CancellationToken cancellationToken) { bool enqueued = false; var k = new ChannelOpenAsyncRpcContinuation(ContinuationTimeout); - await _rpcSemaphore.WaitAsync(k.CancellationToken) + using CancellationTokenSource lts = + CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, k.CancellationToken); + + await _rpcSemaphore.WaitAsync(lts.Token) .ConfigureAwait(false); try { enqueued = Enqueue(k); var method = new ChannelOpen(); - await ModelSendAsync(method, k.CancellationToken) + await ModelSendAsync(method, lts.Token) .ConfigureAwait(false); bool result = await k; @@ -416,6 +418,8 @@ internal void FinishClose() m_connectionStartCell?.TrySetResult(null); } + private bool ConfirmsAreEnabled => _confirmLock != null; + private async Task HandleCommandAsync(IncomingCommand cmd, CancellationToken cancellationToken) { /* @@ -475,17 +479,21 @@ private void OnChannelShutdown(ShutdownEventArgs reason) { _continuationQueue.HandleChannelShutdown(reason); _channelShutdownWrapper.Invoke(this, reason); - lock (_confirmLock) + + if (ConfirmsAreEnabled) { - if (_confirmsTaskCompletionSources?.Count > 0) + lock (_confirmLock) { - var exception = new AlreadyClosedException(reason); - foreach (var confirmsTaskCompletionSource in _confirmsTaskCompletionSources) + if (_confirmsTaskCompletionSources?.Count > 0) { - confirmsTaskCompletionSource.TrySetException(exception); - } + var exception = new AlreadyClosedException(reason); + foreach (var confirmsTaskCompletionSource in _confirmsTaskCompletionSources) + { + confirmsTaskCompletionSource.TrySetException(exception); + } - _confirmsTaskCompletionSources.Clear(); + _confirmsTaskCompletionSources.Clear(); + } } } @@ -581,7 +589,7 @@ protected void HandleBasicNack(in IncomingCommand cmd) protected void HandleAckNack(ulong deliveryTag, bool multiple, bool isNack) { // No need to do this if publisher confirms have never been enabled. - if (NextPublishSeqNo > 0) + if (ConfirmsAreEnabled) { // let's take a lock so we can assume that deliveryTags are unique, never duplicated and always sorted lock (_confirmLock) @@ -1017,7 +1025,7 @@ await ModelSendAsync(method, k.CancellationToken) public async ValueTask BasicPublishAsync(string exchange, string routingKey, TProperties basicProperties, ReadOnlyMemory body, bool mandatory) where TProperties : IReadOnlyBasicProperties, IAmqpHeader { - if (NextPublishSeqNo > 0) + if (ConfirmsAreEnabled) { lock (_confirmLock) { @@ -1047,7 +1055,7 @@ public async ValueTask BasicPublishAsync(string exchange, string ro } catch { - if (NextPublishSeqNo > 0) + if (ConfirmsAreEnabled) { lock (_confirmLock) { @@ -1078,7 +1086,7 @@ public async void BasicPublish(CachedString exchange, CachedString TProperties basicProperties, ReadOnlyMemory body, bool mandatory) where TProperties : IReadOnlyBasicProperties, IAmqpHeader { - if (NextPublishSeqNo > 0) + if (ConfirmsAreEnabled) { lock (_confirmLock) { @@ -1109,7 +1117,7 @@ public async void BasicPublish(CachedString exchange, CachedString } catch { - if (NextPublishSeqNo > 0) + if (ConfirmsAreEnabled) { lock (_confirmLock) { @@ -1126,7 +1134,7 @@ public async ValueTask BasicPublishAsync(CachedString exchange, Cac TProperties basicProperties, ReadOnlyMemory body, bool mandatory) where TProperties : IReadOnlyBasicProperties, IAmqpHeader { - if (NextPublishSeqNo > 0) + if (ConfirmsAreEnabled) { lock (_confirmLock) { @@ -1157,7 +1165,7 @@ public async ValueTask BasicPublishAsync(CachedString exchange, Cac } catch { - if (NextPublishSeqNo > 0) + if (ConfirmsAreEnabled) { lock (_confirmLock) { @@ -1263,6 +1271,10 @@ await ModelSendAsync(method, k.CancellationToken) bool result = await k; Debug.Assert(result); + // Note: + // Non-null means confirms are enabled + _confirmLock = new object(); + return; } finally @@ -1747,7 +1759,7 @@ await ModelSendAsync(method, k.CancellationToken) public Task WaitForConfirmsAsync(CancellationToken token = default) { - if (NextPublishSeqNo == 0UL) + if (false == ConfirmsAreEnabled) { throw new InvalidOperationException("Confirms not selected"); } @@ -1821,17 +1833,15 @@ public async Task WaitForConfirmsOrDieAsync(CancellationToken token = default) await CloseAsync(ea, false) .ConfigureAwait(false); } - catch (TaskCanceledException) + catch (OperationCanceledException ex) { const string msg = "timed out waiting for acks"; - - var ex = new IOException(msg); var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.ReplySuccess, msg, ex); await CloseAsync(ea, false) .ConfigureAwait(false); - throw ex; + throw; } } diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs index d8668d3775..6c93d895d3 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.cs @@ -256,12 +256,12 @@ await CloseAsync(ea, true, } } - public Task CreateChannelAsync() + public Task CreateChannelAsync(CancellationToken cancellationToken = default) { EnsureIsOpen(); ISession session = CreateSession(); var channel = new Channel(_config, session); - return channel.OpenAsync(); + return channel.OpenAsync(cancellationToken); } internal ISession CreateSession() @@ -285,7 +285,8 @@ internal void EnsureIsOpen() } ///Asynchronous API-side invocation of connection.close with timeout. - public Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abort, CancellationToken cancellationToken) + public Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abort, + CancellationToken cancellationToken = default) { var reason = new ShutdownEventArgs(ShutdownInitiator.Application, reasonCode, reasonText); return CloseAsync(reason, abort, timeout, cancellationToken); diff --git a/projects/RabbitMQ.Client/client/impl/Frame.cs b/projects/RabbitMQ.Client/client/impl/Frame.cs index 0e1e130c39..faf75727c2 100644 --- a/projects/RabbitMQ.Client/client/impl/Frame.cs +++ b/projects/RabbitMQ.Client/client/impl/Frame.cs @@ -313,8 +313,14 @@ internal static bool TryReadFrame(ref ReadOnlySequence buffer, uint maxMes return false; } - // TODO check this? - // buffer.IsSingleSegment; + /* + * Note: + * The use of buffer.Slice seems to take all segments into account, thus there appears to be no need to check IsSingleSegment + * Debug.Assert(buffer.IsSingleSegment); + * In addition, the TestBasicRoundtripConcurrentManyMessages asserts that the consumed message bodies are equivalent to + * the published bodies, and if there were an issue parsing frames, it would show up there for sure. + * https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1516#issuecomment-1991943017 + */ byte firstByte = buffer.First.Span[0]; if (firstByte == 'A') diff --git a/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs b/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs index 12ab187651..d04fac64c7 100644 --- a/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs +++ b/projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs @@ -428,7 +428,6 @@ await tcpClient.ConnectAsync(endpoint.Address, endpoint.Port, linkedTokenSource. { if (timeoutTokenSource.Token.IsCancellationRequested) { - // TODO maybe do not use System.TimeoutException here var timeoutException = new TimeoutException(msg, e); throw new ConnectFailureException(msg, timeoutException); }