Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove TODO #1516

Merged
merged 1 commit into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -919,7 +919,7 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
~RabbitMQ.Client.IChannel.TxRollbackAsync() -> System.Threading.Tasks.Task
~RabbitMQ.Client.IChannel.TxSelectAsync() -> System.Threading.Tasks.Task
~RabbitMQ.Client.IConnection.CloseAsync(ushort reasonCode, string reasonText, System.TimeSpan timeout, bool abort, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~RabbitMQ.Client.IConnection.CreateChannelAsync() -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel>
~RabbitMQ.Client.IConnection.CreateChannelAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel>
~RabbitMQ.Client.IConnection.UpdateSecretAsync(string newSecret, string reason) -> System.Threading.Tasks.Task
~RabbitMQ.Client.IConnectionFactory.CreateConnectionAsync(string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IConnection>
~RabbitMQ.Client.IConnectionFactory.CreateConnectionAsync(System.Collections.Generic.IEnumerable<RabbitMQ.Client.AmqpTcpEndpoint> endpoints, string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IConnection>
Expand Down
9 changes: 5 additions & 4 deletions projects/RabbitMQ.Client/client/api/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -225,13 +225,14 @@ public interface IConnection : INetworkConnection, IDisposable
/// <param name="reasonText">A message indicating the reason for closing the connection.</param>
/// <param name="timeout"></param>
/// <param name="abort">Whether or not this close is an abort (ignores certain exceptions).</param>
/// <param name="cancellationToken"></param>
Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abort, CancellationToken cancellationToken = default);
/// <param name="cancellationToken">Cancellation token</param>
Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abort,
CancellationToken cancellationToken = default);

/// <summary>
/// Asynchronously create and return a fresh channel, session, and channel.
/// </summary>
// TODO cancellation token
Task<IChannel> CreateChannelAsync();
/// <param name="cancellationToken">Cancellation token</param>
Task<IChannel> CreateChannelAsync(CancellationToken cancellationToken = default);
}
}
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public IBasicConsumer DefaultConsumer
public string CurrentQueue => InnerChannel.CurrentQueue;

internal async Task AutomaticallyRecoverAsync(AutorecoveringConnection conn, bool recoverConsumers,
bool recordedEntitiesSemaphoreHeld = false)
bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken)
{
if (false == recordedEntitiesSemaphoreHeld)
{
Expand All @@ -156,7 +156,7 @@ internal async Task AutomaticallyRecoverAsync(AutorecoveringConnection conn, boo
ThrowIfDisposed();
_connection = conn;

RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync()
RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(cancellationToken)
.ConfigureAwait(false);
newChannel.TakeOver(_innerChannel);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -731,15 +731,15 @@ private void RecordChannel(in AutorecoveringChannel channel,
}

private async Task RecordChannelAsync(AutorecoveringChannel channel,
bool channelsSemaphoreHeld = false)
bool channelsSemaphoreHeld, CancellationToken cancellationToken)
{
if (channelsSemaphoreHeld)
{
DoAddRecordedChannel(channel);
}
else
{
await _channelsSemaphore.WaitAsync()
await _channelsSemaphore.WaitAsync(cancellationToken)
.ConfigureAwait(false);
try
{
Expand All @@ -757,6 +757,7 @@ private void DoAddRecordedChannel(AutorecoveringChannel channel)
_channels.Add(channel);
}

// TODO remove this unused method
internal void DeleteRecordedChannel(in AutorecoveringChannel channel,
bool channelsSemaphoreHeld, bool recordedEntitiesSemaphoreHeld)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ await RecoverBindingsAsync(_innerConnection, recordedEntitiesSemaphoreHeld: true
.ConfigureAwait(false);

}
await RecoverChannelsAndItsConsumersAsync(recordedEntitiesSemaphoreHeld: true)
await RecoverChannelsAndItsConsumersAsync(recordedEntitiesSemaphoreHeld: true, cancellationToken: cancellationToken)
.ConfigureAwait(false);
}
finally
Expand Down Expand Up @@ -541,7 +541,7 @@ void UpdateConsumer(string oldTag, string newTag, in RecordedConsumer consumer)
}
}

private async ValueTask RecoverChannelsAndItsConsumersAsync(bool recordedEntitiesSemaphoreHeld = false)
private async ValueTask RecoverChannelsAndItsConsumersAsync(bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken)
{
if (false == recordedEntitiesSemaphoreHeld)
{
Expand All @@ -551,7 +551,8 @@ private async ValueTask RecoverChannelsAndItsConsumersAsync(bool recordedEntitie
foreach (AutorecoveringChannel channel in _channels)
{
await channel.AutomaticallyRecoverAsync(this, _config.TopologyRecoveryEnabled,
recordedEntitiesSemaphoreHeld: recordedEntitiesSemaphoreHeld)
recordedEntitiesSemaphoreHeld: recordedEntitiesSemaphoreHeld,
cancellationToken: cancellationToken)
.ConfigureAwait(false);
}
}
Expand Down
10 changes: 5 additions & 5 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,11 @@ public event EventHandler<RecoveringConsumerEventArgs> RecoveringConsumer

public IProtocol Protocol => Endpoint.Protocol;

public async ValueTask<RecoveryAwareChannel> CreateNonRecoveringChannelAsync()
public async ValueTask<RecoveryAwareChannel> CreateNonRecoveringChannelAsync(CancellationToken cancellationToken)
{
ISession session = InnerConnection.CreateSession();
var result = new RecoveryAwareChannel(_config, session);
return await result.OpenAsync()
return await result.OpenAsync(cancellationToken)
.ConfigureAwait(false) as RecoveryAwareChannel;
}

Expand Down Expand Up @@ -241,13 +241,13 @@ await CloseInnerConnectionAsync()
}
}

public async Task<IChannel> CreateChannelAsync()
public async Task<IChannel> CreateChannelAsync(CancellationToken cancellationToken = default)
{
EnsureIsOpen();
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync()
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(cancellationToken)
.ConfigureAwait(false);
AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel);
await RecordChannelAsync(channel, channelsSemaphoreHeld: false)
await RecordChannelAsync(channel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken)
.ConfigureAwait(false);
return channel;
}
Expand Down
58 changes: 34 additions & 24 deletions projects/RabbitMQ.Client/client/impl/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ internal abstract class ChannelBase : IChannel, IRecoverable
private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue();
private readonly ManualResetEventSlim _flowControlBlock = new ManualResetEventSlim(true);

private readonly object _confirmLock = new object();
private object _confirmLock;
private readonly LinkedList<ulong> _pendingDeliveryTags = new LinkedList<ulong>();

private bool _onlyAcksReceived = true;
Expand Down Expand Up @@ -183,7 +183,6 @@ public IBasicConsumer DefaultConsumer

public bool IsOpen => CloseReason is null;

// TODO add private bool for Confirm mode
public ulong NextPublishSeqNo { get; private set; }

public string CurrentQueue { get; private set; }
Expand Down Expand Up @@ -376,19 +375,22 @@ protected bool Enqueue(IRpcContinuation k)
}
}

internal async Task<IChannel> OpenAsync()
internal async Task<IChannel> OpenAsync(CancellationToken cancellationToken)
{
bool enqueued = false;
var k = new ChannelOpenAsyncRpcContinuation(ContinuationTimeout);

await _rpcSemaphore.WaitAsync(k.CancellationToken)
using CancellationTokenSource lts =
CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, k.CancellationToken);

await _rpcSemaphore.WaitAsync(lts.Token)
.ConfigureAwait(false);
try
{
enqueued = Enqueue(k);

var method = new ChannelOpen();
await ModelSendAsync(method, k.CancellationToken)
await ModelSendAsync(method, lts.Token)
.ConfigureAwait(false);

bool result = await k;
Expand Down Expand Up @@ -416,6 +418,8 @@ internal void FinishClose()
m_connectionStartCell?.TrySetResult(null);
}

private bool ConfirmsAreEnabled => _confirmLock != null;

private async Task HandleCommandAsync(IncomingCommand cmd, CancellationToken cancellationToken)
{
/*
Expand Down Expand Up @@ -475,17 +479,21 @@ private void OnChannelShutdown(ShutdownEventArgs reason)
{
_continuationQueue.HandleChannelShutdown(reason);
_channelShutdownWrapper.Invoke(this, reason);
lock (_confirmLock)

if (ConfirmsAreEnabled)
{
if (_confirmsTaskCompletionSources?.Count > 0)
lock (_confirmLock)
{
var exception = new AlreadyClosedException(reason);
foreach (var confirmsTaskCompletionSource in _confirmsTaskCompletionSources)
if (_confirmsTaskCompletionSources?.Count > 0)
{
confirmsTaskCompletionSource.TrySetException(exception);
}
var exception = new AlreadyClosedException(reason);
foreach (var confirmsTaskCompletionSource in _confirmsTaskCompletionSources)
{
confirmsTaskCompletionSource.TrySetException(exception);
}

_confirmsTaskCompletionSources.Clear();
_confirmsTaskCompletionSources.Clear();
}
}
}

Expand Down Expand Up @@ -581,7 +589,7 @@ protected void HandleBasicNack(in IncomingCommand cmd)
protected void HandleAckNack(ulong deliveryTag, bool multiple, bool isNack)
{
// No need to do this if publisher confirms have never been enabled.
if (NextPublishSeqNo > 0)
if (ConfirmsAreEnabled)
{
// let's take a lock so we can assume that deliveryTags are unique, never duplicated and always sorted
lock (_confirmLock)
Expand Down Expand Up @@ -1017,7 +1025,7 @@ await ModelSendAsync(method, k.CancellationToken)
public async ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
{
if (NextPublishSeqNo > 0)
if (ConfirmsAreEnabled)
{
lock (_confirmLock)
{
Expand Down Expand Up @@ -1047,7 +1055,7 @@ public async ValueTask BasicPublishAsync<TProperties>(string exchange, string ro
}
catch
{
if (NextPublishSeqNo > 0)
if (ConfirmsAreEnabled)
{
lock (_confirmLock)
{
Expand Down Expand Up @@ -1078,7 +1086,7 @@ public async void BasicPublish<TProperties>(CachedString exchange, CachedString
TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
{
if (NextPublishSeqNo > 0)
if (ConfirmsAreEnabled)
{
lock (_confirmLock)
{
Expand Down Expand Up @@ -1109,7 +1117,7 @@ public async void BasicPublish<TProperties>(CachedString exchange, CachedString
}
catch
{
if (NextPublishSeqNo > 0)
if (ConfirmsAreEnabled)
{
lock (_confirmLock)
{
Expand All @@ -1126,7 +1134,7 @@ public async ValueTask BasicPublishAsync<TProperties>(CachedString exchange, Cac
TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
{
if (NextPublishSeqNo > 0)
if (ConfirmsAreEnabled)
{
lock (_confirmLock)
{
Expand Down Expand Up @@ -1157,7 +1165,7 @@ public async ValueTask BasicPublishAsync<TProperties>(CachedString exchange, Cac
}
catch
{
if (NextPublishSeqNo > 0)
if (ConfirmsAreEnabled)
{
lock (_confirmLock)
{
Expand Down Expand Up @@ -1263,6 +1271,10 @@ await ModelSendAsync(method, k.CancellationToken)
bool result = await k;
Debug.Assert(result);

// Note:
// Non-null means confirms are enabled
_confirmLock = new object();

return;
}
finally
Expand Down Expand Up @@ -1747,7 +1759,7 @@ await ModelSendAsync(method, k.CancellationToken)

public Task<bool> WaitForConfirmsAsync(CancellationToken token = default)
{
if (NextPublishSeqNo == 0UL)
if (false == ConfirmsAreEnabled)
{
throw new InvalidOperationException("Confirms not selected");
}
Expand Down Expand Up @@ -1821,17 +1833,15 @@ public async Task WaitForConfirmsOrDieAsync(CancellationToken token = default)
await CloseAsync(ea, false)
.ConfigureAwait(false);
}
catch (TaskCanceledException)
catch (OperationCanceledException ex)
{
const string msg = "timed out waiting for acks";

var ex = new IOException(msg);
var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.ReplySuccess, msg, ex);

await CloseAsync(ea, false)
.ConfigureAwait(false);

throw ex;
throw;
}
}

Expand Down
7 changes: 4 additions & 3 deletions projects/RabbitMQ.Client/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -256,12 +256,12 @@ await CloseAsync(ea, true,
}
}

public Task<IChannel> CreateChannelAsync()
public Task<IChannel> CreateChannelAsync(CancellationToken cancellationToken = default)
{
EnsureIsOpen();
ISession session = CreateSession();
var channel = new Channel(_config, session);
return channel.OpenAsync();
return channel.OpenAsync(cancellationToken);
}

internal ISession CreateSession()
Expand All @@ -285,7 +285,8 @@ internal void EnsureIsOpen()
}

///<summary>Asynchronous API-side invocation of connection.close with timeout.</summary>
public Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abort, CancellationToken cancellationToken)
public Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abort,
CancellationToken cancellationToken = default)
{
var reason = new ShutdownEventArgs(ShutdownInitiator.Application, reasonCode, reasonText);
return CloseAsync(reason, abort, timeout, cancellationToken);
Expand Down
10 changes: 8 additions & 2 deletions projects/RabbitMQ.Client/client/impl/Frame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,14 @@ internal static bool TryReadFrame(ref ReadOnlySequence<byte> buffer, uint maxMes
return false;
}

// TODO check this?
// buffer.IsSingleSegment;
/*
* Note:
* The use of buffer.Slice seems to take all segments into account, thus there appears to be no need to check IsSingleSegment
* Debug.Assert(buffer.IsSingleSegment);
* In addition, the TestBasicRoundtripConcurrentManyMessages asserts that the consumed message bodies are equivalent to
* the published bodies, and if there were an issue parsing frames, it would show up there for sure.
* https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1516#issuecomment-1991943017
*/

byte firstByte = buffer.First.Span[0];
if (firstByte == 'A')
Expand Down
1 change: 0 additions & 1 deletion projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,6 @@ await tcpClient.ConnectAsync(endpoint.Address, endpoint.Port, linkedTokenSource.
{
if (timeoutTokenSource.Token.IsCancellationRequested)
{
// TODO maybe do not use System.TimeoutException here
var timeoutException = new TimeoutException(msg, e);
throw new ConnectFailureException(msg, timeoutException);
}
Expand Down