Skip to content

Commit a88f54f

Browse files
committed
Pass on CancellationToken
1 parent af594e1 commit a88f54f

File tree

9 files changed

+110
-70
lines changed

9 files changed

+110
-70
lines changed

projects/RabbitMQ.Client/client/api/ConnectionConfig.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
using System;
3333
using System.Collections.Generic;
34+
using System.Threading;
3435
using System.Threading.Tasks;
3536
using RabbitMQ.Client.Impl;
3637

@@ -142,7 +143,7 @@ public sealed class ConnectionConfig
142143
/// </summary>
143144
public readonly int DispatchConsumerConcurrency;
144145

145-
internal readonly Func<AmqpTcpEndpoint, Task<IFrameHandler>> FrameHandlerFactory;
146+
internal readonly Func<AmqpTcpEndpoint, CancellationToken, Task<IFrameHandler>> FrameHandlerFactoryAsync;
146147

147148
internal ConnectionConfig(string virtualHost, string userName, string password,
148149
ICredentialsProvider credentialsProvider, ICredentialsRefresher credentialsRefresher,
@@ -152,7 +153,7 @@ internal ConnectionConfig(string virtualHost, string userName, string password,
152153
TopologyRecoveryFilter topologyRecoveryFilter, TopologyRecoveryExceptionHandler topologyRecoveryExceptionHandler,
153154
TimeSpan networkRecoveryInterval, TimeSpan heartbeatInterval, TimeSpan continuationTimeout, TimeSpan handshakeContinuationTimeout, TimeSpan requestedConnectionTimeout,
154155
bool dispatchConsumersAsync, int dispatchConsumerConcurrency,
155-
Func<AmqpTcpEndpoint, Task<IFrameHandler>> frameHandlerFactory)
156+
Func<AmqpTcpEndpoint, CancellationToken, Task<IFrameHandler>> frameHandlerFactoryAsync)
156157
{
157158
VirtualHost = virtualHost;
158159
UserName = userName;
@@ -174,7 +175,7 @@ internal ConnectionConfig(string virtualHost, string userName, string password,
174175
RequestedConnectionTimeout = requestedConnectionTimeout;
175176
DispatchConsumersAsync = dispatchConsumersAsync;
176177
DispatchConsumerConcurrency = dispatchConsumerConcurrency;
177-
FrameHandlerFactory = frameHandlerFactory;
178+
FrameHandlerFactoryAsync = frameHandlerFactoryAsync;
178179
}
179180
}
180181
}

projects/RabbitMQ.Client/client/api/ConnectionFactory.cs

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -421,12 +421,14 @@ public IConnection CreateConnection()
421421
/// returned by the EndpointResolverFactory. By default the configured
422422
/// hostname and port are used.
423423
/// </summary>
424+
/// <param name="cancellationToken">Cancellation token for this connection</param>
424425
/// <exception cref="BrokerUnreachableException">
425426
/// When the configured hostname was not reachable.
426427
/// </exception>
427-
public ValueTask<IConnection> CreateConnectionAsync()
428+
public ValueTask<IConnection> CreateConnectionAsync(
429+
CancellationToken cancellationToken = default(CancellationToken))
428430
{
429-
return CreateConnectionAsync(ClientProvidedName);
431+
return CreateConnectionAsync(ClientProvidedName, cancellationToken);
430432
}
431433

432434
/// <summary>
@@ -459,12 +461,14 @@ public IConnection CreateConnection(string clientProvidedName)
459461
/// be used as a connection identifier, e.g. in HTTP API requests.
460462
/// This value is supposed to be human-readable.
461463
/// </param>
464+
/// <param name="cancellationToken">Cancellation token for this connection</param>
462465
/// <exception cref="BrokerUnreachableException">
463466
/// When the configured hostname was not reachable.
464467
/// </exception>
465-
public ValueTask<IConnection> CreateConnectionAsync(string clientProvidedName)
468+
public ValueTask<IConnection> CreateConnectionAsync(string clientProvidedName,
469+
CancellationToken cancellationToken = default(CancellationToken))
466470
{
467-
return CreateConnectionAsync(EndpointResolverFactory(LocalEndpoints()), clientProvidedName);
471+
return CreateConnectionAsync(EndpointResolverFactory(LocalEndpoints()), clientProvidedName, cancellationToken);
468472
}
469473

470474
/// <summary>
@@ -496,13 +500,15 @@ public IConnection CreateConnection(IEnumerable<string> hostnames)
496500
/// List of hostnames to use for the initial
497501
/// connection and recovery.
498502
/// </param>
503+
/// <param name="cancellationToken">Cancellation token for this connection</param>
499504
/// <returns>Open connection</returns>
500505
/// <exception cref="BrokerUnreachableException">
501506
/// When no hostname was reachable.
502507
/// </exception>
503-
public ValueTask<IConnection> CreateConnectionAsync(IEnumerable<string> hostnames)
508+
public ValueTask<IConnection> CreateConnectionAsync(IEnumerable<string> hostnames,
509+
CancellationToken cancellationToken = default(CancellationToken))
504510
{
505-
return CreateConnectionAsync(hostnames, ClientProvidedName);
511+
return CreateConnectionAsync(hostnames, ClientProvidedName, cancellationToken);
506512
}
507513

508514
/// <summary>
@@ -547,14 +553,16 @@ public IConnection CreateConnection(IEnumerable<string> hostnames, string client
547553
/// be used as a connection identifier, e.g. in HTTP API requests.
548554
/// This value is supposed to be human-readable.
549555
/// </param>
556+
/// <param name="cancellationToken">Cancellation token for this connection</param>
550557
/// <returns>Open connection</returns>
551558
/// <exception cref="BrokerUnreachableException">
552559
/// When no hostname was reachable.
553560
/// </exception>
554-
public ValueTask<IConnection> CreateConnectionAsync(IEnumerable<string> hostnames, string clientProvidedName)
561+
public ValueTask<IConnection> CreateConnectionAsync(IEnumerable<string> hostnames, string clientProvidedName,
562+
CancellationToken cancellationToken = default(CancellationToken))
555563
{
556564
IEnumerable<AmqpTcpEndpoint> endpoints = hostnames.Select(h => new AmqpTcpEndpoint(h, Port, Ssl, MaxMessageSize));
557-
return CreateConnectionAsync(EndpointResolverFactory(endpoints), clientProvidedName);
565+
return CreateConnectionAsync(EndpointResolverFactory(endpoints), clientProvidedName, cancellationToken);
558566
}
559567

560568
/// <summary>
@@ -584,13 +592,15 @@ public IConnection CreateConnection(IEnumerable<AmqpTcpEndpoint> endpoints)
584592
/// List of endpoints to use for the initial
585593
/// connection and recovery.
586594
/// </param>
595+
/// <param name="cancellationToken">Cancellation token for this connection</param>
587596
/// <returns>Open connection</returns>
588597
/// <exception cref="BrokerUnreachableException">
589598
/// When no hostname was reachable.
590599
/// </exception>
591-
public ValueTask<IConnection> CreateConnectionAsync(IEnumerable<AmqpTcpEndpoint> endpoints)
600+
public ValueTask<IConnection> CreateConnectionAsync(IEnumerable<AmqpTcpEndpoint> endpoints,
601+
CancellationToken cancellationToken = default(CancellationToken))
592602
{
593-
return CreateConnectionAsync(endpoints, ClientProvidedName);
603+
return CreateConnectionAsync(endpoints, ClientProvidedName, cancellationToken);
594604
}
595605

596606
/// <summary>
@@ -632,13 +642,15 @@ public IConnection CreateConnection(IEnumerable<AmqpTcpEndpoint> endpoints, stri
632642
/// be used as a connection identifier, e.g. in HTTP API requests.
633643
/// This value is supposed to be human-readable.
634644
/// </param>
645+
/// <param name="cancellationToken">Cancellation token for this connection</param>
635646
/// <returns>Open connection</returns>
636647
/// <exception cref="BrokerUnreachableException">
637648
/// When no hostname was reachable.
638649
/// </exception>
639-
public ValueTask<IConnection> CreateConnectionAsync(IEnumerable<AmqpTcpEndpoint> endpoints, string clientProvidedName)
650+
public ValueTask<IConnection> CreateConnectionAsync(IEnumerable<AmqpTcpEndpoint> endpoints, string clientProvidedName,
651+
CancellationToken cancellationToken = default(CancellationToken))
640652
{
641-
return CreateConnectionAsync(EndpointResolverFactory(endpoints), clientProvidedName);
653+
return CreateConnectionAsync(EndpointResolverFactory(endpoints), clientProvidedName, cancellationToken);
642654
}
643655

644656
/// <summary>
@@ -669,7 +681,8 @@ public IConnection CreateConnection(IEndpointResolver endpointResolver, string c
669681
}
670682
else
671683
{
672-
IFrameHandler frameHandler = endpointResolver.SelectOne(CreateFrameHandlerAsync).EnsureCompleted();
684+
IFrameHandler frameHandler = endpointResolver.SelectOneAsync(
685+
CreateFrameHandlerAsync, CancellationToken.None).EnsureCompleted();
673686
var c = new Connection(config, frameHandler);
674687
return (Connection)c.Open();
675688
}
@@ -692,27 +705,29 @@ public IConnection CreateConnection(IEndpointResolver endpointResolver, string c
692705
/// be used as a connection identifier, e.g. in HTTP API requests.
693706
/// This value is supposed to be human-readable.
694707
/// </param>
708+
/// <param name="cancellationToken">Cancellation token for this connection</param>
695709
/// <returns>Open connection</returns>
696710
/// <exception cref="BrokerUnreachableException">
697711
/// When no hostname was reachable.
698712
/// </exception>
699-
public async ValueTask<IConnection> CreateConnectionAsync(IEndpointResolver endpointResolver, string clientProvidedName)
713+
public async ValueTask<IConnection> CreateConnectionAsync(IEndpointResolver endpointResolver, string clientProvidedName,
714+
CancellationToken cancellationToken = default(CancellationToken))
700715
{
701716
ConnectionConfig config = CreateConfig(clientProvidedName);
702717
try
703718
{
704719
if (AutomaticRecoveryEnabled)
705720
{
706721
var c = new AutorecoveringConnection(config, endpointResolver);
707-
return await c.OpenAsync()
722+
return await c.OpenAsync(cancellationToken)
708723
.ConfigureAwait(false);
709724
}
710725
else
711726
{
712-
IFrameHandler frameHandler = await endpointResolver.SelectOne(CreateFrameHandlerAsync)
727+
IFrameHandler frameHandler = await endpointResolver.SelectOneAsync(CreateFrameHandlerAsync, cancellationToken)
713728
.ConfigureAwait(false);
714729
var c = new Connection(config, frameHandler);
715-
return await c.OpenAsync()
730+
return await c.OpenAsync(cancellationToken)
716731
.ConfigureAwait(false);
717732
}
718733
}
@@ -748,11 +763,11 @@ private ConnectionConfig CreateConfig(string clientProvidedName)
748763
CreateFrameHandlerAsync);
749764
}
750765

751-
internal async Task<IFrameHandler> CreateFrameHandlerAsync(AmqpTcpEndpoint endpoint)
766+
internal async Task<IFrameHandler> CreateFrameHandlerAsync(
767+
AmqpTcpEndpoint endpoint, CancellationToken cancellationToken)
752768
{
753769
IFrameHandler fh = new SocketFrameHandler(endpoint, SocketFactory, RequestedConnectionTimeout, SocketReadTimeout, SocketWriteTimeout);
754-
// TODO cancellation token
755-
await fh.ConnectAsync(default(CancellationToken))
770+
await fh.ConnectAsync(cancellationToken)
756771
.ConfigureAwait(false);
757772
return ConfigureFrameHandler(fh);
758773
}

projects/RabbitMQ.Client/client/api/IConnectionFactory.cs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
using System;
3333
using System.Collections.Generic;
34+
using System.Threading;
3435
using System.Threading.Tasks;
3536
using RabbitMQ.Client.Exceptions;
3637

@@ -105,7 +106,8 @@ public interface IConnectionFactory
105106
/// <summary>
106107
/// Asynchronously create a connection to the specified endpoint.
107108
/// </summary>
108-
ValueTask<IConnection> CreateConnectionAsync();
109+
/// <param name="cancellationToken">Cancellation token for this connection</param>
110+
ValueTask<IConnection> CreateConnectionAsync(CancellationToken cancellationToken);
109111

110112
/// <summary>
111113
/// Create a connection to the specified endpoint.
@@ -128,8 +130,9 @@ public interface IConnectionFactory
128130
/// be used as a connection identifier, e.g. in HTTP API requests.
129131
/// This value is supposed to be human-readable.
130132
/// </param>
133+
/// <param name="cancellationToken">Cancellation token for this connection</param>
131134
/// <returns>Open connection</returns>
132-
ValueTask<IConnection> CreateConnectionAsync(string clientProvidedName);
135+
ValueTask<IConnection> CreateConnectionAsync(string clientProvidedName, CancellationToken cancellationToken);
133136

134137
/// <summary>
135138
/// Connects to the first reachable hostname from the list.
@@ -142,8 +145,9 @@ public interface IConnectionFactory
142145
/// Asynchronously connects to the first reachable hostname from the list.
143146
/// </summary>
144147
/// <param name="hostnames">List of host names to use</param>
148+
/// <param name="cancellationToken">Cancellation token for this connection</param>
145149
/// <returns>Open connection</returns>
146-
ValueTask<IConnection> CreateConnectionAsync(IEnumerable<string> hostnames);
150+
ValueTask<IConnection> CreateConnectionAsync(IEnumerable<string> hostnames, CancellationToken cancellationToken);
147151

148152
/// <summary>
149153
/// Connects to the first reachable hostname from the list.
@@ -168,8 +172,10 @@ public interface IConnectionFactory
168172
/// be used as a connection identifier, e.g. in HTTP API requests.
169173
/// This value is supposed to be human-readable.
170174
/// </param>
175+
/// <param name="cancellationToken">Cancellation token for this connection</param>
171176
/// <returns>Open connection</returns>
172-
ValueTask<IConnection> CreateConnectionAsync(IEnumerable<string> hostnames, string clientProvidedName);
177+
ValueTask<IConnection> CreateConnectionAsync(IEnumerable<string> hostnames, string clientProvidedName,
178+
CancellationToken cancellationToken);
173179

174180
/// <summary>
175181
/// Create a connection using a list of endpoints.
@@ -193,11 +199,12 @@ public interface IConnectionFactory
193199
/// List of endpoints to use for the initial
194200
/// connection and recovery.
195201
/// </param>
202+
/// <param name="cancellationToken">Cancellation token for this connection</param>
196203
/// <returns>Open connection</returns>
197204
/// <exception cref="BrokerUnreachableException">
198205
/// When no hostname was reachable.
199206
/// </exception>
200-
ValueTask<IConnection> CreateConnectionAsync(IEnumerable<AmqpTcpEndpoint> endpoints);
207+
ValueTask<IConnection> CreateConnectionAsync(IEnumerable<AmqpTcpEndpoint> endpoints, CancellationToken cancellationToken);
201208

202209
/// <summary>
203210
/// Create a connection using a list of endpoints.
@@ -233,11 +240,13 @@ public interface IConnectionFactory
233240
/// be used as a connection identifier, e.g. in HTTP API requests.
234241
/// This value is supposed to be human-readable.
235242
/// </param>
243+
/// <param name="cancellationToken">Cancellation token for this connection</param>
236244
/// <returns>Open connection</returns>
237245
/// <exception cref="BrokerUnreachableException">
238246
/// When no hostname was reachable.
239247
/// </exception>
240-
ValueTask<IConnection> CreateConnectionAsync(IEnumerable<AmqpTcpEndpoint> endpoints, string clientProvidedName);
248+
ValueTask<IConnection> CreateConnectionAsync(IEnumerable<AmqpTcpEndpoint> endpoints, string clientProvidedName,
249+
CancellationToken cancellationToken);
241250

242251
/// <summary>
243252
/// Amount of time protocol handshake operations are allowed to take before

projects/RabbitMQ.Client/client/api/IEndpointResolverExtensions.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,21 +31,23 @@
3131

3232
using System;
3333
using System.Collections.Generic;
34+
using System.Threading;
3435
using System.Threading.Tasks;
3536

3637
namespace RabbitMQ.Client
3738
{
3839
public static class EndpointResolverExtensions
3940
{
40-
public static async Task<T> SelectOne<T>(this IEndpointResolver resolver, Func<AmqpTcpEndpoint, Task<T>> selector)
41+
public static async Task<T> SelectOneAsync<T>(this IEndpointResolver resolver,
42+
Func<AmqpTcpEndpoint, CancellationToken, Task<T>> selector, CancellationToken cancellationToken)
4143
{
4244
var t = default(T);
4345
List<Exception> exceptions = null;
4446
foreach (AmqpTcpEndpoint ep in resolver.All())
4547
{
4648
try
4749
{
48-
t = await selector(ep).ConfigureAwait(false);
50+
t = await selector(ep, cancellationToken).ConfigureAwait(false);
4951
if (t.Equals(default(T)) == false)
5052
{
5153
return t;

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -151,16 +151,16 @@ private static void HandleTopologyRecoveryException(TopologyRecoveryException e)
151151
}
152152

153153
// TODO propagate token
154-
private async ValueTask<bool> TryPerformAutomaticRecoveryAsync(CancellationToken token)
154+
private async ValueTask<bool> TryPerformAutomaticRecoveryAsync(CancellationToken cancellationToken)
155155
{
156156
ESLog.Info("Performing automatic recovery");
157157

158158
try
159159
{
160160
ThrowIfDisposed();
161-
if (await TryRecoverConnectionDelegate().ConfigureAwait(false))
161+
if (await TryRecoverConnectionDelegateAsync(cancellationToken).ConfigureAwait(false))
162162
{
163-
await _recordedEntitiesSemaphore.WaitAsync(token)
163+
await _recordedEntitiesSemaphore.WaitAsync(cancellationToken)
164164
.ConfigureAwait(false);
165165
try
166166
{
@@ -226,16 +226,16 @@ await RecoverChannelsAndItsConsumersAsync(recordedEntitiesSemaphoreHeld: true)
226226
return false;
227227
}
228228

229-
private async ValueTask<bool> TryRecoverConnectionDelegate()
229+
// TODO cancellation token
230+
private async ValueTask<bool> TryRecoverConnectionDelegateAsync(CancellationToken cancellationToken)
230231
{
231232
try
232233
{
233234
Connection defunctConnection = _innerConnection;
234-
IFrameHandler fh = await _endpoints.SelectOne(_config.FrameHandlerFactory)
235+
IFrameHandler fh = await _endpoints.SelectOneAsync(_config.FrameHandlerFactoryAsync, cancellationToken)
235236
.ConfigureAwait(false);
236237
_innerConnection = new Connection(_config, fh);
237-
// TODO cancellation token
238-
await _innerConnection.OpenAsync()
238+
await _innerConnection.OpenAsync(cancellationToken)
239239
.ConfigureAwait(false);
240240
_innerConnection.TakeOver(defunctConnection);
241241
return true;

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
using System;
3333
using System.Collections.Generic;
3434
using System.Runtime.CompilerServices;
35+
using System.Threading;
3536
using System.Threading.Tasks;
3637
using RabbitMQ.Client.Events;
3738
using RabbitMQ.Client.Impl;
@@ -66,18 +67,18 @@ internal AutorecoveringConnection(ConnectionConfig config, IEndpointResolver end
6667

6768
internal IConnection Open()
6869
{
69-
IFrameHandler fh = _endpoints.SelectOne(_config.FrameHandlerFactory).EnsureCompleted();
70+
IFrameHandler fh = _endpoints.SelectOneAsync(_config.FrameHandlerFactoryAsync, CancellationToken.None).EnsureCompleted();
7071
CreateInnerConnection(fh);
7172
_innerConnection.Open();
7273
return this;
7374
}
7475

75-
internal async ValueTask<IConnection> OpenAsync()
76+
internal async ValueTask<IConnection> OpenAsync(CancellationToken cancellationToken)
7677
{
77-
IFrameHandler fh = await _endpoints.SelectOne(_config.FrameHandlerFactory)
78+
IFrameHandler fh = await _endpoints.SelectOneAsync(_config.FrameHandlerFactoryAsync, cancellationToken)
7879
.ConfigureAwait(false);
7980
CreateInnerConnection(fh);
80-
await _innerConnection.OpenAsync()
81+
await _innerConnection.OpenAsync(cancellationToken)
8182
.ConfigureAwait(false);
8283
return this;
8384
}

0 commit comments

Comments
 (0)