Skip to content

Commit

Permalink
Merge pull request #899 from danielmarbach/processing-concurrency
Browse files Browse the repository at this point in the history
Pull ProcessingConcurrency into connection factory interface
  • Loading branch information
michaelklishin authored Jul 8, 2020
2 parents 849b9b8 + 970fdbe commit 7b4a308
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 3 deletions.
10 changes: 10 additions & 0 deletions projects/RabbitMQ.Client/client/api/IConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -180,5 +180,15 @@ public interface IConnectionFactory
/// timing out.
/// </summary>
TimeSpan ContinuationTimeout { get; set; }

/// <summary>
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IBasicConsumer"/>
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
/// Defaults to 1.
/// </summary>
/// <remarks>For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
/// In addition to that consumers need to be thread/concurrency safe.</remarks>
int ProcessingConcurrency { get; set; }
}
}
5 changes: 2 additions & 3 deletions projects/RabbitMQ.Client/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,13 @@ public Connection(IConnectionFactory factory, bool insist, IFrameHandler frameHa
_factory = factory;
_frameHandler = frameHandler;

int processingConcurrency = (factory as ConnectionFactory)?.ProcessingConcurrency ?? 1;
if (factory is IAsyncConnectionFactory asyncConnectionFactory && asyncConnectionFactory.DispatchConsumersAsync)
{
ConsumerWorkService = new AsyncConsumerWorkService(processingConcurrency);
ConsumerWorkService = new AsyncConsumerWorkService(factory.ProcessingConcurrency);
}
else
{
ConsumerWorkService = new ConsumerWorkService(processingConcurrency);
ConsumerWorkService = new ConsumerWorkService(factory.ProcessingConcurrency);
}

_sessionManager = new SessionManager(this, 0);
Expand Down
1 change: 1 addition & 0 deletions projects/Unit/APIApproval.Approve.verified.txt
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ namespace RabbitMQ.Client
System.TimeSpan ContinuationTimeout { get; set; }
System.TimeSpan HandshakeContinuationTimeout { get; set; }
string Password { get; set; }
int ProcessingConcurrency { get; set; }
ushort RequestedChannelMax { get; set; }
uint RequestedFrameMax { get; set; }
System.TimeSpan RequestedHeartbeat { get; set; }
Expand Down

0 comments on commit 7b4a308

Please sign in to comment.