Skip to content

Commit 424c3d4

Browse files
authored
Merge pull request #1740 from danielmarbach/token-event-args
Ensure Connection and Channel cancellation token properly float into handlers
2 parents b14bd95 + 1f73259 commit 424c3d4

22 files changed

+415
-101
lines changed

projects/RabbitMQ.Client/ConsumerDispatching/AsyncConsumerDispatcher.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,20 +34,20 @@ protected override async Task ProcessChannelAsync()
3434
{
3535
await work.Consumer.HandleBasicDeliverAsync(
3636
work.ConsumerTag!, work.DeliveryTag, work.Redelivered,
37-
work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory)
37+
work.Exchange!, work.RoutingKey!, work.BasicProperties!, work.Body.Memory, work.CancellationToken)
3838
.ConfigureAwait(false);
3939
}
4040
break;
4141
case WorkType.Cancel:
42-
await work.Consumer.HandleBasicCancelAsync(work.ConsumerTag!)
42+
await work.Consumer.HandleBasicCancelAsync(work.ConsumerTag!, work.CancellationToken)
4343
.ConfigureAwait(false);
4444
break;
4545
case WorkType.CancelOk:
46-
await work.Consumer.HandleBasicCancelOkAsync(work.ConsumerTag!)
46+
await work.Consumer.HandleBasicCancelOkAsync(work.ConsumerTag!, work.CancellationToken)
4747
.ConfigureAwait(false);
4848
break;
4949
case WorkType.ConsumeOk:
50-
await work.Consumer.HandleBasicConsumeOkAsync(work.ConsumerTag!)
50+
await work.Consumer.HandleBasicConsumeOkAsync(work.ConsumerTag!, work.CancellationToken)
5151
.ConfigureAwait(false);
5252
break;
5353
case WorkType.Shutdown:

projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs

Lines changed: 70 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ internal abstract class ConsumerDispatcherChannelBase : ConsumerDispatcherBase,
4646
private readonly ushort _concurrency;
4747
private long _isQuiescing;
4848
private bool _disposed;
49+
private readonly CancellationTokenSource _shutdownCts = new CancellationTokenSource();
4950

5051
internal ConsumerDispatcherChannelBase(Impl.Channel channel, ushort concurrency)
5152
{
@@ -92,7 +93,7 @@ public async ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, s
9293
try
9394
{
9495
AddConsumer(consumer, consumerTag);
95-
WorkStruct work = WorkStruct.CreateConsumeOk(consumer, consumerTag);
96+
WorkStruct work = WorkStruct.CreateConsumeOk(consumer, consumerTag, _shutdownCts);
9697
await _writer.WriteAsync(work, cancellationToken)
9798
.ConfigureAwait(false);
9899
}
@@ -113,7 +114,7 @@ public async ValueTask HandleBasicDeliverAsync(string consumerTag, ulong deliver
113114
if (false == _disposed && false == IsQuiescing)
114115
{
115116
IAsyncBasicConsumer consumer = GetConsumerOrDefault(consumerTag);
116-
var work = WorkStruct.CreateDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body);
117+
var work = WorkStruct.CreateDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body, _shutdownCts);
117118
await _writer.WriteAsync(work, cancellationToken)
118119
.ConfigureAwait(false);
119120
}
@@ -126,7 +127,7 @@ public async ValueTask HandleBasicCancelOkAsync(string consumerTag, Cancellation
126127
if (false == _disposed && false == IsQuiescing)
127128
{
128129
IAsyncBasicConsumer consumer = GetAndRemoveConsumer(consumerTag);
129-
WorkStruct work = WorkStruct.CreateCancelOk(consumer, consumerTag);
130+
WorkStruct work = WorkStruct.CreateCancelOk(consumer, consumerTag, _shutdownCts);
130131
await _writer.WriteAsync(work, cancellationToken)
131132
.ConfigureAwait(false);
132133
}
@@ -139,18 +140,31 @@ public async ValueTask HandleBasicCancelAsync(string consumerTag, CancellationTo
139140
if (false == _disposed && false == IsQuiescing)
140141
{
141142
IAsyncBasicConsumer consumer = GetAndRemoveConsumer(consumerTag);
142-
WorkStruct work = WorkStruct.CreateCancel(consumer, consumerTag);
143+
WorkStruct work = WorkStruct.CreateCancel(consumer, consumerTag, _shutdownCts);
143144
await _writer.WriteAsync(work, cancellationToken)
144145
.ConfigureAwait(false);
145146
}
146147
}
147148

148149
public void Quiesce()
149150
{
151+
if (IsQuiescing)
152+
{
153+
return;
154+
}
155+
150156
Interlocked.Exchange(ref _isQuiescing, 1);
157+
try
158+
{
159+
_shutdownCts.Cancel();
160+
}
161+
catch
162+
{
163+
// ignore
164+
}
151165
}
152166

153-
public async Task WaitForShutdownAsync()
167+
public async Task WaitForShutdownAsync(CancellationToken cancellationToken)
154168
{
155169
if (_disposed)
156170
{
@@ -169,7 +183,7 @@ public async Task WaitForShutdownAsync()
169183
*
170184
* await _reader.Completion.ConfigureAwait(false);
171185
*/
172-
await _worker
186+
await _worker.WaitAsync(cancellationToken)
173187
.ConfigureAwait(false);
174188
}
175189
catch (AggregateException aex)
@@ -203,18 +217,13 @@ protected bool IsQuiescing
203217
{
204218
get
205219
{
206-
if (Interlocked.Read(ref _isQuiescing) == 1)
207-
{
208-
return true;
209-
}
210-
211-
return false;
220+
return Interlocked.Read(ref _isQuiescing) == 1;
212221
}
213222
}
214223

215224
protected sealed override void ShutdownConsumer(IAsyncBasicConsumer consumer, ShutdownEventArgs reason)
216225
{
217-
_writer.TryWrite(WorkStruct.CreateShutdown(consumer, reason));
226+
_writer.TryWrite(WorkStruct.CreateShutdown(consumer, reason, _shutdownCts));
218227
}
219228

220229
protected override Task InternalShutdownAsync()
@@ -237,25 +246,32 @@ protected override Task InternalShutdownAsync()
237246
public readonly RentedMemory Body;
238247
public readonly ShutdownEventArgs? Reason;
239248
public readonly WorkType WorkType;
249+
public readonly CancellationToken CancellationToken;
250+
private readonly CancellationTokenSource? _cancellationTokenSource;
240251

241-
private WorkStruct(WorkType type, IAsyncBasicConsumer consumer, string consumerTag)
252+
private WorkStruct(WorkType type, IAsyncBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken)
242253
: this()
243254
{
244255
WorkType = type;
245256
Consumer = consumer;
246257
ConsumerTag = consumerTag;
258+
CancellationToken = cancellationToken;
259+
_cancellationTokenSource = null;
247260
}
248261

249-
private WorkStruct(IAsyncBasicConsumer consumer, ShutdownEventArgs reason)
262+
private WorkStruct(IAsyncBasicConsumer consumer, ShutdownEventArgs reason, CancellationTokenSource? cancellationTokenSource)
250263
: this()
251264
{
252265
WorkType = WorkType.Shutdown;
253266
Consumer = consumer;
254267
Reason = reason;
268+
CancellationToken = cancellationTokenSource?.Token ?? CancellationToken.None;
269+
this._cancellationTokenSource = cancellationTokenSource;
255270
}
256271

257272
private WorkStruct(IAsyncBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered,
258-
string exchange, string routingKey, IReadOnlyBasicProperties basicProperties, RentedMemory body)
273+
string exchange, string routingKey, IReadOnlyBasicProperties basicProperties, RentedMemory body,
274+
CancellationToken cancellationToken)
259275
{
260276
WorkType = WorkType.Deliver;
261277
Consumer = consumer;
@@ -266,37 +282,62 @@ private WorkStruct(IAsyncBasicConsumer consumer, string consumerTag, ulong deliv
266282
RoutingKey = routingKey;
267283
BasicProperties = basicProperties;
268284
Body = body;
269-
Reason = default;
285+
Reason = null;
286+
CancellationToken = cancellationToken;
287+
_cancellationTokenSource = null;
270288
}
271289

272-
public static WorkStruct CreateCancel(IAsyncBasicConsumer consumer, string consumerTag)
290+
public static WorkStruct CreateCancel(IAsyncBasicConsumer consumer, string consumerTag, CancellationTokenSource cancellationTokenSource)
273291
{
274-
return new WorkStruct(WorkType.Cancel, consumer, consumerTag);
292+
return new WorkStruct(WorkType.Cancel, consumer, consumerTag, cancellationTokenSource.Token);
275293
}
276294

277-
public static WorkStruct CreateCancelOk(IAsyncBasicConsumer consumer, string consumerTag)
295+
public static WorkStruct CreateCancelOk(IAsyncBasicConsumer consumer, string consumerTag, CancellationTokenSource cancellationTokenSource)
278296
{
279-
return new WorkStruct(WorkType.CancelOk, consumer, consumerTag);
297+
return new WorkStruct(WorkType.CancelOk, consumer, consumerTag, cancellationTokenSource.Token);
280298
}
281299

282-
public static WorkStruct CreateConsumeOk(IAsyncBasicConsumer consumer, string consumerTag)
300+
public static WorkStruct CreateConsumeOk(IAsyncBasicConsumer consumer, string consumerTag, CancellationTokenSource cancellationTokenSource)
283301
{
284-
return new WorkStruct(WorkType.ConsumeOk, consumer, consumerTag);
302+
return new WorkStruct(WorkType.ConsumeOk, consumer, consumerTag, cancellationTokenSource.Token);
285303
}
286304

287-
public static WorkStruct CreateShutdown(IAsyncBasicConsumer consumer, ShutdownEventArgs reason)
305+
public static WorkStruct CreateShutdown(IAsyncBasicConsumer consumer, ShutdownEventArgs reason, CancellationTokenSource cancellationTokenSource)
288306
{
289-
return new WorkStruct(consumer, reason);
307+
// Create a linked CTS so the shutdown args token reflects both dispatcher cancellation and any upstream token.
308+
CancellationTokenSource? linked = null;
309+
try
310+
{
311+
if (reason.CancellationToken.CanBeCanceled)
312+
{
313+
linked = CancellationTokenSource.CreateLinkedTokenSource(cancellationTokenSource.Token, reason.CancellationToken);
314+
}
315+
}
316+
catch
317+
{
318+
linked = null;
319+
}
320+
321+
CancellationToken token = linked?.Token ?? cancellationTokenSource.Token;
322+
ShutdownEventArgs argsWithToken = reason.Exception != null ?
323+
new ShutdownEventArgs(reason.Initiator, reason.ReplyCode, reason.ReplyText, reason.Exception, token) :
324+
new ShutdownEventArgs(reason.Initiator, reason.ReplyCode, reason.ReplyText, reason.ClassId, reason.MethodId, reason.Cause, token);
325+
326+
return new WorkStruct(consumer, argsWithToken, linked);
290327
}
291328

292329
public static WorkStruct CreateDeliver(IAsyncBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered,
293-
string exchange, string routingKey, IReadOnlyBasicProperties basicProperties, RentedMemory body)
330+
string exchange, string routingKey, IReadOnlyBasicProperties basicProperties, RentedMemory body, CancellationTokenSource cancellationTokenSource)
294331
{
295332
return new WorkStruct(consumer, consumerTag, deliveryTag, redelivered,
296-
exchange, routingKey, basicProperties, body);
333+
exchange, routingKey, basicProperties, body, cancellationTokenSource.Token);
297334
}
298335

299-
public void Dispose() => Body.Dispose();
336+
public void Dispose()
337+
{
338+
Body.Dispose();
339+
_cancellationTokenSource?.Dispose();
340+
}
300341
}
301342

302343
protected enum WorkType : byte
@@ -317,6 +358,7 @@ protected virtual void Dispose(bool disposing)
317358
if (disposing)
318359
{
319360
Quiesce();
361+
_shutdownCts.Dispose();
320362
}
321363
}
322364
catch

projects/RabbitMQ.Client/ConsumerDispatching/IConsumerDispatcher.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,6 @@ ValueTask HandleBasicDeliverAsync(string consumerTag,
6464
void Quiesce();
6565

6666
Task ShutdownAsync(ShutdownEventArgs reason);
67-
Task WaitForShutdownAsync();
67+
Task WaitForShutdownAsync(CancellationToken cancellationToken);
6868
}
6969
}

projects/RabbitMQ.Client/IChannel.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,13 +257,22 @@ ValueTask BasicRejectAsync(ulong deliveryTag, bool requeue,
257257
Task CloseAsync(ushort replyCode, string replyText, bool abort,
258258
CancellationToken cancellationToken = default);
259259

260+
/// <summary>
261+
/// Asynchronously close this session.
262+
/// </summary>
263+
/// <param name="reason">The <see cref="ShutdownEventArgs"/> instance containing the close data.</param>
264+
/// <param name="abort">Whether or not the close is an abort (ignoring certain exceptions).</param>
265+
/// <returns></returns>
266+
Task CloseAsync(ShutdownEventArgs reason, bool abort);
267+
260268
/// <summary>
261269
/// Asynchronously close this session.
262270
/// </summary>
263271
/// <param name="reason">The <see cref="ShutdownEventArgs"/> instance containing the close data.</param>
264272
/// <param name="abort">Whether or not the close is an abort (ignoring certain exceptions).</param>
265273
/// <param name="cancellationToken">CancellationToken for this operation.</param>
266274
/// <returns></returns>
275+
[Obsolete("7.2.0 - cancellationToken is ignored")]
267276
Task CloseAsync(ShutdownEventArgs reason, bool abort,
268277
CancellationToken cancellationToken = default);
269278

projects/RabbitMQ.Client/IConnectionExtensions.cs

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -93,21 +93,36 @@ public static Task CloseAsync(this IConnection connection, ushort reasonCode, st
9393
/// </summary>
9494
/// <remarks>
9595
/// Note that all active channels and sessions will be closed if this method is called.
96-
/// In comparison to normal <see cref="CloseAsync(IConnection, CancellationToken)"/> method, <see cref="AbortAsync(IConnection)"/> will not throw
96+
/// In comparison to normal <see cref="CloseAsync(IConnection, CancellationToken)"/> method, <see cref="AbortAsync(IConnection, CancellationToken)"/> will not throw
9797
/// <see cref="IOException"/> during closing connection.
9898
///This method waits infinitely for the in-progress close operation to complete.
9999
/// </remarks>
100100
public static Task AbortAsync(this IConnection connection)
101101
{
102-
return connection.CloseAsync(Constants.ReplySuccess, "Connection close forced", InternalConstants.DefaultConnectionAbortTimeout, true,
103-
CancellationToken.None);
102+
return connection.CloseAsync(Constants.ReplySuccess, "Connection close forced",
103+
InternalConstants.DefaultConnectionAbortTimeout, true, default);
104+
}
105+
106+
/// <summary>
107+
/// Asynchronously abort this connection and all its channels.
108+
/// </summary>
109+
/// <remarks>
110+
/// Note that all active channels and sessions will be closed if this method is called.
111+
/// In comparison to normal <see cref="CloseAsync(IConnection, CancellationToken)"/> method, <see cref="AbortAsync(IConnection, CancellationToken)"/> will not throw
112+
/// <see cref="IOException"/> during closing connection.
113+
///This method waits infinitely for the in-progress close operation to complete.
114+
/// </remarks>
115+
public static Task AbortAsync(this IConnection connection, CancellationToken cancellationToken = default)
116+
{
117+
return connection.CloseAsync(Constants.ReplySuccess, "Connection close forced",
118+
InternalConstants.DefaultConnectionAbortTimeout, true, cancellationToken);
104119
}
105120

106121
/// <summary>
107122
/// Asynchronously abort this connection and all its channels.
108123
/// </summary>
109124
/// <remarks>
110-
/// The method behaves in the same way as <see cref="AbortAsync(IConnection)"/>, with the only
125+
/// The method behaves in the same way as <see cref="AbortAsync(IConnection, CancellationToken)"/>, with the only
111126
/// difference that the connection is closed with the given connection close code and message.
112127
/// <para>
113128
/// The close code (See under "Reply Codes" in the AMQP 0-9-1 specification)
@@ -118,16 +133,35 @@ public static Task AbortAsync(this IConnection connection)
118133
/// </remarks>
119134
public static Task AbortAsync(this IConnection connection, ushort reasonCode, string reasonText)
120135
{
121-
return connection.CloseAsync(reasonCode, reasonText, InternalConstants.DefaultConnectionAbortTimeout, true,
122-
CancellationToken.None);
136+
return connection.CloseAsync(reasonCode, reasonText,
137+
InternalConstants.DefaultConnectionAbortTimeout, true, default);
138+
}
139+
140+
/// <summary>
141+
/// Asynchronously abort this connection and all its channels.
142+
/// </summary>
143+
/// <remarks>
144+
/// The method behaves in the same way as <see cref="AbortAsync(IConnection, CancellationToken)"/>, with the only
145+
/// difference that the connection is closed with the given connection close code and message.
146+
/// <para>
147+
/// The close code (See under "Reply Codes" in the AMQP 0-9-1 specification)
148+
/// </para>
149+
/// <para>
150+
/// A message indicating the reason for closing the connection
151+
/// </para>
152+
/// </remarks>
153+
public static Task AbortAsync(this IConnection connection, ushort reasonCode, string reasonText, CancellationToken cancellationToken = default)
154+
{
155+
return connection.CloseAsync(reasonCode, reasonText,
156+
InternalConstants.DefaultConnectionAbortTimeout, true, cancellationToken);
123157
}
124158

125159
/// <summary>
126160
/// Asynchronously abort this connection and all its channels and wait with a
127161
/// timeout for all the in-progress close operations to complete.
128162
/// </summary>
129163
/// <remarks>
130-
/// This method, behaves in a similar way as method <see cref="AbortAsync(IConnection)"/> with the
164+
/// This method, behaves in a similar way as method <see cref="AbortAsync(IConnection, CancellationToken)"/> with the
131165
/// only difference that it explicitly specifies a timeout given
132166
/// for all the in-progress close operations to complete.
133167
/// If timeout is reached and the close operations haven't finished, then socket is forced to close.

projects/RabbitMQ.Client/Impl/AsyncEventingWrapper.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@ private readonly async Task InternalInvoke(Delegate[] handlers, object sender, T
6161
await action(sender, @event)
6262
.ConfigureAwait(false);
6363
}
64+
catch (OperationCanceledException)
65+
{
66+
// Ignore cancellation exceptions
67+
}
6468
catch (Exception exception)
6569
{
6670
if (_onException != null)

0 commit comments

Comments
 (0)