Skip to content

Commit

Permalink
Follow-up to #1669 - per-channel dispatch concurrency
Browse files Browse the repository at this point in the history
PR #1669 by @danielmarbach adds the ability to configure consumer
dispatch on a per-channel basis.

* Test that consumer dispatch concurrency is set on the dispatcher.
  • Loading branch information
lukebakken committed Sep 12, 2024
1 parent 624cf2e commit b9797d4
Show file tree
Hide file tree
Showing 19 changed files with 106 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ namespace Benchmarks.Networking
[MemoryDiagnoser]
public class Networking_BasicDeliver_Commons
{
public static async Task Publish_Hello_World(IConnection connection, uint messageCount, byte[] body)
public static async Task Publish_Hello_World(IConnection connection,
uint messageCount, byte[] body, ushort consumerDispatchConcurrency = 1)
{
using (IChannel channel = await connection.CreateChannelAsync())
using (IChannel channel = await connection.CreateChannelAsync(consumerDispatchConcurrency))
{
QueueDeclareOk queue = await channel.QueueDeclareAsync();
var consumer = new CountingConsumer(channel, messageCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,17 @@ public void GlobalCleanup()
[Benchmark(Baseline = true)]
public async Task Publish_Hello_World()
{
var cf = new ConnectionFactory { ConsumerDispatchConcurrency = 2 };
var cf = new ConnectionFactory();
using (IConnection connection = await cf.CreateConnectionAsync())
{
await Publish_Hello_World(connection);
await Publish_Hello_World(connection, 2);
}
}

public static async Task Publish_Hello_World(IConnection connection)
public static async Task Publish_Hello_World(IConnection connection, ushort consumerDispatchConcurrency)
{
await Networking_BasicDeliver_Commons.Publish_Hello_World(connection, messageCount, _body);
await Networking_BasicDeliver_Commons.Publish_Hello_World(connection, messageCount, _body,
consumerDispatchConcurrency);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ public class Networking_BasicDeliver_LongLivedConnection
public void GlobalSetup()
{
_container = RabbitMQBroker.Start();

var cf = new ConnectionFactory { ConsumerDispatchConcurrency = 2 };
var cf = new ConnectionFactory();
// NOTE: https://github.com/dotnet/BenchmarkDotNet/issues/1738
_connection = EnsureCompleted(cf.CreateConnectionAsync());
}
Expand All @@ -35,7 +34,8 @@ public void GlobalCleanup()
[Benchmark(Baseline = true)]
public Task Publish_Hello_World()
{
return Networking_BasicDeliver_Commons.Publish_Hello_World(_connection, messageCount, _body);
return Networking_BasicDeliver_Commons.Publish_Hello_World(_connection, messageCount, _body,
consumerDispatchConcurrency: 2);
}

private static T EnsureCompleted<T>(Task<T> task) => task.GetAwaiter().GetResult();
Expand Down
7 changes: 3 additions & 4 deletions projects/RabbitMQ.Client/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,7 @@ readonly RabbitMQ.Client.ConnectionConfig.AuthMechanisms -> System.Collections.G
readonly RabbitMQ.Client.ConnectionConfig.ClientProperties -> System.Collections.Generic.IDictionary<string, object>
readonly RabbitMQ.Client.ConnectionConfig.ClientProvidedName -> string
readonly RabbitMQ.Client.ConnectionConfig.ContinuationTimeout -> System.TimeSpan
readonly RabbitMQ.Client.ConnectionConfig.ConsumerDispatchConcurrency -> ushort
readonly RabbitMQ.Client.ConnectionConfig.HandshakeContinuationTimeout -> System.TimeSpan
readonly RabbitMQ.Client.ConnectionConfig.HeartbeatInterval -> System.TimeSpan
readonly RabbitMQ.Client.ConnectionConfig.MaxChannelCount -> ushort
Expand Down Expand Up @@ -891,8 +892,6 @@ static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, bool mandatory, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
const RabbitMQ.Client.Constants.DefaultConsumerDispatchConcurrency = 1 -> ushort
RabbitMQ.Client.IChannel.ConfirmSelectAsync(bool trackConfirmations = true, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
const RabbitMQ.Client.ConnectionFactory.DefaultConsumerDispatchConcurrency = 1 -> ushort
RabbitMQ.Client.IConnection.CreateChannelAsync(ushort consumerDispatchConcurrency, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
static RabbitMQ.Client.IConnectionExtensions.CreateChannelAsync(this RabbitMQ.Client.IConnection! connection, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
readonly RabbitMQ.Client.ConnectionConfig.ConsumerDispatchConcurrency -> ushort
RabbitMQ.Client.IConnection.CreateChannelAsync(ushort consumerDispatchConcurrency = 1, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
3 changes: 2 additions & 1 deletion projects/RabbitMQ.Client/client/api/ConnectionConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ internal ConnectionConfig(string virtualHost, string userName, string password,
ushort maxChannelCount, uint maxFrameSize, uint maxInboundMessageBodySize, bool topologyRecoveryEnabled,
TopologyRecoveryFilter topologyRecoveryFilter, TopologyRecoveryExceptionHandler topologyRecoveryExceptionHandler,
TimeSpan networkRecoveryInterval, TimeSpan heartbeatInterval, TimeSpan continuationTimeout, TimeSpan handshakeContinuationTimeout, TimeSpan requestedConnectionTimeout,
ushort consumerDispatchConcurrency, Func<AmqpTcpEndpoint, CancellationToken, Task<IFrameHandler>> frameHandlerFactoryAsync)
ushort consumerDispatchConcurrency,
Func<AmqpTcpEndpoint, CancellationToken, Task<IFrameHandler>> frameHandlerFactoryAsync)
{
VirtualHost = virtualHost;
UserName = userName;
Expand Down
8 changes: 2 additions & 6 deletions projects/RabbitMQ.Client/client/api/ConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,6 @@ namespace RabbitMQ.Client
///hosts with an empty name are not addressable. </para></remarks>
public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactory
{
/// <summary>
/// Default value for consumer dispatch concurrency.
/// </summary>
public const ushort DefaultConsumerDispatchConcurrency = 1;

/// <summary>
/// Default value for the desired maximum channel number. Default: 2047.
/// </summary>
Expand Down Expand Up @@ -180,7 +175,8 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor
/// </summary>
/// <remarks>For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
/// In addition to that consumers need to be thread/concurrency safe.</remarks>
public ushort ConsumerDispatchConcurrency { get; set; } = DefaultConsumerDispatchConcurrency;
public ushort ConsumerDispatchConcurrency { get; set; } = Constants.DefaultConsumerDispatchConcurrency;


/// <summary>The host to connect to.</summary>
public string HostName { get; set; } = "localhost";
Expand Down
5 changes: 3 additions & 2 deletions projects/RabbitMQ.Client/client/api/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -239,12 +239,13 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
///
/// Defaults to <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>.
/// The default value is 1.
///
/// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
/// In addition to that consumers need to be thread/concurrency safe.
/// </param>
/// <param name="cancellationToken">Cancellation token</param>
Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default);
Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency,
CancellationToken cancellationToken = default);
}
}
6 changes: 0 additions & 6 deletions projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,6 @@ namespace RabbitMQ.Client
{
public static class IConnectionExtensions
{
/// <summary>
/// Asynchronously create and return a fresh channel, session, and channel.
/// </summary>
public static Task<IChannel> CreateChannelAsync(this IConnection connection, CancellationToken cancellationToken = default) =>
connection.CreateChannelAsync(ConnectionFactory.DefaultConsumerDispatchConcurrency, cancellationToken);

/// <summary>
/// Asynchronously close this connection and all its channels.
/// </summary>
Expand Down
3 changes: 2 additions & 1 deletion projects/RabbitMQ.Client/client/framing/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ namespace RabbitMQ.Client.Framing.Impl
{
internal class Channel : ChannelBase
{
public Channel(ConnectionConfig config, ISession session, ushort consumerDispatchConcurrency)
public Channel(ConnectionConfig config, ISession session,
ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency)
: base(config, session, consumerDispatchConcurrency)
{
}
Expand Down
8 changes: 8 additions & 0 deletions projects/RabbitMQ.Client/client/framing/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,13 @@ public static class Constants
public const int NotImplemented = 540;
///<summary>(= 541)</summary>
public const int InternalError = 541;

/// <summary>
/// The default consumer dispatch concurrency. See <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>
/// to set this value for every channel created on a connection,
/// and <see cref="IConnection.CreateChannelAsync(ushort, System.Threading.CancellationToken)"/>
/// for setting this value for a particular channel.
/// </summary>
public const ushort DefaultConsumerDispatchConcurrency = 1;
}
}
18 changes: 15 additions & 3 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -240,12 +240,14 @@ await CloseInnerConnectionAsync()
}
}

public async Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default)
public async Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency,
CancellationToken cancellationToken = default)
{
EnsureIsOpen();
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(consumerDispatchConcurrency, cancellationToken)
ushort cdc = DetermineConsumerDispatchConcurrency(_config, consumerDispatchConcurrency);
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(cdc, cancellationToken)
.ConfigureAwait(false);
AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel, consumerDispatchConcurrency);
AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel, cdc);
await RecordChannelAsync(channel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken)
.ConfigureAwait(false);
return channel;
Expand Down Expand Up @@ -279,6 +281,16 @@ public void Dispose()
private void EnsureIsOpen()
=> InnerConnection.EnsureIsOpen();

private static ushort DetermineConsumerDispatchConcurrency(ConnectionConfig config, ushort perChannelConsumerDispatchConcurrency)
{
ushort cdc = config.ConsumerDispatchConcurrency;
if (perChannelConsumerDispatchConcurrency > Constants.DefaultConsumerDispatchConcurrency)
{
cdc = perChannelConsumerDispatchConcurrency;
}
return cdc;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void ThrowIfDisposed()
{
Expand Down
16 changes: 14 additions & 2 deletions projects/RabbitMQ.Client/client/impl/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,12 @@ internal abstract class ChannelBase : IChannel, IRecoverable

internal readonly IConsumerDispatcher ConsumerDispatcher;

protected ChannelBase(ConnectionConfig config, ISession session, ushort consumerDispatchConcurrency)
protected ChannelBase(ConnectionConfig config, ISession session,
ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency)
{
ContinuationTimeout = config.ContinuationTimeout;
ConsumerDispatcher = new AsyncConsumerDispatcher(this, consumerDispatchConcurrency);
ConsumerDispatcher = BuildConsumerDispatcher(config, consumerDispatchConcurrency);

Action<Exception, string> onException = (exception, context) =>
OnCallbackException(CallbackExceptionEventArgs.Build(exception, context));
_basicAcksWrapper = new EventingWrapper<BasicAckEventArgs>("OnBasicAck", onException);
Expand All @@ -92,6 +94,16 @@ protected ChannelBase(ConnectionConfig config, ISession session, ushort consumer
Session = session;
}

private IConsumerDispatcher BuildConsumerDispatcher(ConnectionConfig config, ushort perChannelConsumerDispatchConcurrency)
{
ushort cdc = config.ConsumerDispatchConcurrency;
if (perChannelConsumerDispatchConcurrency > Constants.DefaultConsumerDispatchConcurrency)
{
cdc = perChannelConsumerDispatchConcurrency;
}
return new AsyncConsumerDispatcher(this, cdc);
}

internal TimeSpan HandshakeContinuationTimeout { get; set; } = TimeSpan.FromSeconds(10);
public TimeSpan ContinuationTimeout { get; set; }

Expand Down
5 changes: 3 additions & 2 deletions projects/RabbitMQ.Client/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler)

_sessionManager = new SessionManager(this, 0, config.MaxInboundMessageBodySize);
_session0 = new MainSession(this, config.MaxInboundMessageBodySize);
_channel0 = new Channel(_config, _session0, ConnectionFactory.DefaultConsumerDispatchConcurrency); ;
_channel0 = new Channel(_config, _session0);

ClientProperties = new Dictionary<string, object?>(_config.ClientProperties)
{
Expand Down Expand Up @@ -253,7 +253,8 @@ await CloseAsync(ea, true,
}
}

public Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default)
public Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency,
CancellationToken cancellationToken = default)
{
EnsureIsOpen();
ISession session = CreateSession();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ internal abstract class ConsumerDispatcherChannelBase : ConsumerDispatcherBase,
private readonly Task _worker;
private bool _quiesce = false;
private bool _disposed;
private ushort _concurrency;

internal ConsumerDispatcherChannelBase(ChannelBase channel, ushort concurrency)
{
_channel = channel;
_concurrency = concurrency;
var workChannel = Channel.CreateUnbounded<WorkStruct>(new UnboundedChannelOptions
{
SingleReader = concurrency == 1,
SingleReader = _concurrency == 1,
SingleWriter = false,
AllowSynchronousContinuations = false
});
Expand All @@ -36,22 +38,18 @@ internal ConsumerDispatcherChannelBase(ChannelBase channel, ushort concurrency)
}
else
{
var tasks = new Task[concurrency];
for (int i = 0; i < concurrency; i++)
var tasks = new Task[_concurrency];
for (int i = 0; i < _concurrency; i++)
{
tasks[i] = Task.Run(loopStart);
}
_worker = Task.WhenAll(tasks);
}
}

public bool IsShutdown
{
get
{
return _quiesce;
}
}
public bool IsShutdown => _quiesce;

public ushort Concurrency => _concurrency;

public ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ internal interface IConsumerDispatcher : IDisposable

bool IsShutdown { get; }

ushort Concurrency { get; }

IAsyncBasicConsumer GetAndRemoveConsumer(string tag);

ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken);
Expand Down
8 changes: 4 additions & 4 deletions projects/Test/Common/IntegrationFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public abstract class IntegrationFixture : IAsyncLifetime
protected readonly ITestOutputHelper _output;
protected readonly string _testDisplayName;

protected readonly ushort _consumerDispatchConcurrency = 1;
protected readonly ushort _consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency;
protected readonly bool _openChannel = true;

public static readonly TimeSpan ShortSpan;
Expand Down Expand Up @@ -109,7 +109,7 @@ static IntegrationFixture()
}

public IntegrationFixture(ITestOutputHelper output,
ushort consumerDispatchConcurrency = 1,
ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency,
bool openChannel = true)
{
_consumerDispatchConcurrency = consumerDispatchConcurrency;
Expand Down Expand Up @@ -144,7 +144,6 @@ public virtual async Task InitializeAsync()
if (_connFactory == null)
{
_connFactory = CreateConnectionFactory();
_connFactory.ConsumerDispatchConcurrency = _consumerDispatchConcurrency;
}

if (_conn == null)
Expand All @@ -153,7 +152,8 @@ public virtual async Task InitializeAsync()

if (_openChannel)
{
_channel = await _conn.CreateChannelAsync();
_channel = await _conn.CreateChannelAsync(
consumerDispatchConcurrency: _consumerDispatchConcurrency);
}

if (IsVerbose)
Expand Down
Loading

0 comments on commit b9797d4

Please sign in to comment.