Skip to content

Commit

Permalink
[Host.RabbitMq] Multiple consumers on same queue with varying concurr…
Browse files Browse the repository at this point in the history
…ency separated by routing key

Signed-off-by: Tomasz Maruszak <maruszaktomasz@gmail.com>
  • Loading branch information
zarusz committed Dec 25, 2024
1 parent 2716054 commit 7b81940
Show file tree
Hide file tree
Showing 20 changed files with 480 additions and 195 deletions.
2 changes: 1 addition & 1 deletion src/Host.Plugin.Properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<Import Project="Common.NuGet.Properties.xml" />

<PropertyGroup>
<Version>2.6.1</Version>
<Version>2.6.2-rc13</Version>
</PropertyGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ public static RabbitMqMessageRoutingKeyProvider<object> GetMessageRoutingKeyProv
public static RabbitMqMessagePropertiesModifier<object> GetMessagePropertiesModifier(this HasProviderExtensions p, HasProviderExtensions settings = null)
=> p.GetOrDefault<RabbitMqMessagePropertiesModifier<object>>(RabbitMqProperties.MessagePropertiesModifier, settings, null);

public static string GetQueueName(this AbstractConsumerSettings p)
=> p.GetOrDefault<string>(RabbitMqProperties.QueueName, null);
public static string GetQueueName(this AbstractConsumerSettings c)
=> c.GetOrDefault<string>(RabbitMqProperties.QueueName, null);

public static string GetQueueName(this RequestResponseSettings p)
=> p.GetOrDefault<string>(RabbitMqProperties.QueueName, null);
public static string GetBindingRoutingKey(this AbstractConsumerSettings c, HasProviderExtensions settings = null)
=> c.GetOrDefault<string>(RabbitMqProperties.BindingRoutingKey, settings, null);

public static string GetExchageType(this ProducerSettings p, HasProviderExtensions settings = null)
=> p.GetOrDefault<string>(RabbitMqProperties.ExchangeType, settings, null);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace SlimMessageBus.Host.RabbitMQ;

using System;

public static class RabbitMqMessageBusSettingsExtensions
{
/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public abstract class AbstractRabbitMqConsumer : AbstractConsumer
private AsyncEventingBasicConsumer _consumer;
private string _consumerTag;

protected string QueueName { get; }
public string QueueName { get; }
protected abstract RabbitMqMessageAcknowledgementMode AcknowledgementMode { get; }

protected AbstractRabbitMqConsumer(ILogger logger, IRabbitMqChannel channel, string queueName, IHeaderValueConverter headerValueConverter)
Expand Down Expand Up @@ -82,7 +82,7 @@ protected async Task OnMessageReceived(object sender, BasicDeliverEventArgs @eve

protected abstract Task<Exception> OnMessageReceived(Dictionary<string, object> messageHeaders, BasicDeliverEventArgs transportMessage);

protected void NackMessage(BasicDeliverEventArgs @event, bool requeue)
public void NackMessage(BasicDeliverEventArgs @event, bool requeue)
{
lock (_channel.ChannelLock)
{
Expand All @@ -91,7 +91,7 @@ protected void NackMessage(BasicDeliverEventArgs @event, bool requeue)
}
}

protected void AckMessage(BasicDeliverEventArgs @event)
public void AckMessage(BasicDeliverEventArgs @event)
{
lock (_channel.ChannelLock)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
namespace SlimMessageBus.Host.RabbitMQ;

/// <summary>
/// Decorator for see <see cref="IMessageProcessor{TMessage}"/> that automatically acknowledges the message after processing.
/// </summary>
/// <param name="target"></param>
/// <param name="logger"></param>
/// <param name="acknowledgementMode"></param>
/// <param name="consumer"></param>
internal sealed class RabbitMqAutoAcknowledgeMessageProcessor(IMessageProcessor<BasicDeliverEventArgs> target,
ILogger logger,
RabbitMqMessageAcknowledgementMode acknowledgementMode,
IRabbitMqConsumer consumer)
: IMessageProcessor<BasicDeliverEventArgs>, IDisposable
{
public IReadOnlyCollection<AbstractConsumerSettings> ConsumerSettings => target.ConsumerSettings;

public void Dispose()
{
if (target is IDisposable targetDisposable)
{
targetDisposable.Dispose();
}
}

public async Task<ProcessMessageResult> ProcessMessage(BasicDeliverEventArgs transportMessage, IReadOnlyDictionary<string, object> messageHeaders, IDictionary<string, object> consumerContextProperties = null, IServiceProvider currentServiceProvider = null, CancellationToken cancellationToken = default)
{
var r = await target.ProcessMessage(transportMessage, messageHeaders: messageHeaders, consumerContextProperties: consumerContextProperties, cancellationToken: cancellationToken);

if (acknowledgementMode == RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade)
{
// Acknowledge after processing
var confirmOption = r.Exception != null
? RabbitMqMessageConfirmOptions.Nack // NAck after processing when message fails (unless the user already acknowledged in any way).
: RabbitMqMessageConfirmOptions.Ack; // Acknowledge after processing

consumer.ConfirmMessage(transportMessage, confirmOption, consumerContextProperties);
}

if (r.Exception != null)
{
// We rely on the IMessageProcessor to execute the ConsumerErrorHandler<T>, but if it's not registered in the DI, it fails, or there is another fatal error then the message will be lost.
logger.LogError(r.Exception, "Exchange {Exchange} - Queue {Queue}: Error processing message {Message}, delivery tag {DeliveryTag}", transportMessage.Exchange, consumer.QueueName, transportMessage, transportMessage.DeliveryTag);
}
return r;
}
}
102 changes: 77 additions & 25 deletions src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,29 +1,74 @@
namespace SlimMessageBus.Host.RabbitMQ;

using Microsoft.Extensions.Logging;
public interface IRabbitMqConsumer
{
string QueueName { get; }
void ConfirmMessage(BasicDeliverEventArgs transportMessage, RabbitMqMessageConfirmOptions option, IDictionary<string, object> properties, bool warnIfAlreadyConfirmed = false);
}

public class RabbitMqConsumer : AbstractRabbitMqConsumer
public class RabbitMqConsumer : AbstractRabbitMqConsumer, IRabbitMqConsumer
{
public static readonly string ContextProperty_MessageConfirmed = "RabbitMq_MessageConfirmed";

private readonly RabbitMqMessageAcknowledgementMode _acknowledgementMode;
private readonly IMessageProcessor<BasicDeliverEventArgs> _messageProcessor;
private readonly IDictionary<string, IMessageProcessor<BasicDeliverEventArgs>> _messageProcessorByRoutingKey;

protected override RabbitMqMessageAcknowledgementMode AcknowledgementMode => _acknowledgementMode;

public RabbitMqConsumer(ILoggerFactory loggerFactory, IRabbitMqChannel channel, string queueName, IList<ConsumerSettings> consumers, IMessageSerializer serializer, MessageBusBase messageBus, IHeaderValueConverter headerValueConverter)
: base(loggerFactory.CreateLogger<RabbitMqConsumer>(), channel, queueName, headerValueConverter)
: base(loggerFactory.CreateLogger<RabbitMqConsumer>(), channel, queueName: queueName, headerValueConverter)
{
_acknowledgementMode = consumers.Select(x => x.GetOrDefault<RabbitMqMessageAcknowledgementMode?>(RabbitMqProperties.MessageAcknowledgementMode, messageBus.Settings)).FirstOrDefault(x => x != null)
?? RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade; // be default choose the safer acknowledgement mode
_messageProcessor = new MessageProcessor<BasicDeliverEventArgs>(
consumers,
messageBus,
path: queueName,
responseProducer: messageBus,
messageProvider: (messageType, m) => serializer.Deserialize(messageType, m.Body.ToArray()),
consumerContextInitializer: InitializeConsumerContext,
consumerErrorHandlerOpenGenericType: typeof(IRabbitMqConsumerErrorHandler<>));

IMessageProcessor<BasicDeliverEventArgs> CreateMessageProcessor(IEnumerable<ConsumerSettings> consumers)
{
IMessageProcessor<BasicDeliverEventArgs> messageProcessor = new MessageProcessor<BasicDeliverEventArgs>(
consumers,
messageBus,
path: queueName,
responseProducer: messageBus,
messageProvider: (messageType, m) => serializer.Deserialize(messageType, m.Body.ToArray()),
consumerContextInitializer: InitializeConsumerContext,
consumerErrorHandlerOpenGenericType: typeof(IRabbitMqConsumerErrorHandler<>));

messageProcessor = new RabbitMqAutoAcknowledgeMessageProcessor(messageProcessor, Logger, _acknowledgementMode, this);

// pick the maximum number of instances
var instances = consumers.Max(x => x.Instances);
// For a given rabbit channel, there is only 1 task that dispatches messages. We want to be be able to let each SMB consume process within its own task (1 or more)
messageProcessor = new ConcurrentMessageProcessorDecorator<BasicDeliverEventArgs>(instances, loggerFactory, messageProcessor);

return messageProcessor;
}

_messageProcessorByRoutingKey = consumers
.GroupBy(x => x.GetBindingRoutingKey() ?? string.Empty)
.ToDictionary(x => x.Key, CreateMessageProcessor);

_messageProcessor = _messageProcessorByRoutingKey.Count == 1 && _messageProcessorByRoutingKey.TryGetValue(string.Empty, out var value)
? value : null;
}

protected override async Task OnStop()
{
try
{
// Wait max 5 seconds for all background processing tasks to complete
using var taskCancellationSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var backgrounProcessingTasks = _messageProcessorByRoutingKey.Values
.OfType<ConcurrentMessageProcessorDecorator<BasicDeliverEventArgs>>()
.Select(x => x.WaitAll(taskCancellationSource.Token));

await Task.WhenAll(backgrounProcessingTasks);
}
catch (Exception e)
{
Logger.LogError(e, "Error occurred while waiting for background processing tasks to complete");
}

await base.OnStop();
}

private void InitializeConsumerContext(BasicDeliverEventArgs transportMessage, ConsumerContext consumerContext)
Expand All @@ -40,14 +85,14 @@ private void InitializeConsumerContext(BasicDeliverEventArgs transportMessage, C
consumerContext.SetConfirmAction(option => ConfirmMessage(transportMessage, option, consumerContext.Properties, warnIfAlreadyConfirmed: true));
}

private void ConfirmMessage(BasicDeliverEventArgs transportMessage, RabbitMqMessageConfirmOptions option, IDictionary<string, object> properties, bool warnIfAlreadyConfirmed = false)
public void ConfirmMessage(BasicDeliverEventArgs transportMessage, RabbitMqMessageConfirmOptions option, IDictionary<string, object> properties, bool warnIfAlreadyConfirmed = false)
{
if (properties.TryGetValue(ContextProperty_MessageConfirmed, out var confirmed) && confirmed is true)
{
// Note: We want to makes sure the 1st message confirmation is handled
if (warnIfAlreadyConfirmed)
{
Logger.LogWarning("The message (delivery tag {MessageDeliveryTag}, queue name {QueueName}) was already confirmed, subsequent message confirmation will have no effect", transportMessage.DeliveryTag, QueueName);
Logger.LogWarning("Exchange {Exchange} - Queue {Queue}: The message (delivery tag {MessageDeliveryTag}) was already confirmed, subsequent message confirmation will have no effect", transportMessage.Exchange, QueueName, transportMessage.DeliveryTag);
}
return;
}
Expand Down Expand Up @@ -80,23 +125,30 @@ protected override async Task<Exception> OnMessageReceived(Dictionary<string, ob
ConfirmMessage(transportMessage, RabbitMqMessageConfirmOptions.Ack, consumerContextProperties);
}

var r = await _messageProcessor.ProcessMessage(transportMessage, messageHeaders: messageHeaders, consumerContextProperties: consumerContextProperties, cancellationToken: CancellationToken);

if (_acknowledgementMode == RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade)
var messageProcessor = _messageProcessor;
if (messageProcessor != null || _messageProcessorByRoutingKey.TryGetValue(transportMessage.RoutingKey, out messageProcessor))
{
// Acknowledge after processing
var confirmOption = r.Exception != null
? RabbitMqMessageConfirmOptions.Nack // NAck after processing when message fails (unless the user already acknowledged in any way).
: RabbitMqMessageConfirmOptions.Ack; // Acknowledge after processing
ConfirmMessage(transportMessage, confirmOption, consumerContextProperties);
await messageProcessor.ProcessMessage(transportMessage, messageHeaders: messageHeaders, consumerContextProperties: consumerContextProperties, cancellationToken: CancellationToken);
}

if (r.Exception != null)
else
{
// We rely on the IMessageProcessor to execute the ConsumerErrorHandler<T>, but if it's not registered in the DI, it fails, or there is another fatal error then the message will be lost.
Logger.LogError(r.Exception, "Error processing message {Message} from exchange {Exchange}, delivery tag {DeliveryTag}", transportMessage, transportMessage.Exchange, transportMessage.DeliveryTag);
Logger.LogDebug("Exchange {Exchange} - Queue {Queue}: No message processor found for routing key {RoutingKey}", transportMessage.Exchange, QueueName, transportMessage.RoutingKey);
}

// error handling happens in the message processor
return null;
}

protected override async ValueTask DisposeAsyncCore()
{
await base.DisposeAsyncCore();

foreach (var messageProcessor in _messageProcessorByRoutingKey.Values)
{
if (messageProcessor is IDisposable disposable)
{
disposable.Dispose();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ public class RabbitMqResponseConsumer : AbstractRabbitMqConsumer
protected override RabbitMqMessageAcknowledgementMode AcknowledgementMode => RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade;

public RabbitMqResponseConsumer(ILoggerFactory loggerFactory, IRabbitMqChannel channel, string queueName, RequestResponseSettings requestResponseSettings, MessageBusBase messageBus, IHeaderValueConverter headerValueConverter)
: base(loggerFactory.CreateLogger<RabbitMqConsumer>(), channel, queueName, headerValueConverter)
: base(loggerFactory.CreateLogger<RabbitMqConsumer>(), channel, queueName: queueName, headerValueConverter)
{
_messageProcessor = new ResponseMessageProcessor<BasicDeliverEventArgs>(loggerFactory, requestResponseSettings, messageBus, m => m.Body.ToArray());
}
Expand Down
26 changes: 16 additions & 10 deletions src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,24 +62,30 @@ private async Task CreateConnection()
{
try
{
var retryCount = 3;
for (var retry = 0; _connection == null && retry < retryCount; retry++)
{
try
const int retryCount = 3;
#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
await Retry.WithDelay(operation: async (cancellationTask) =>
{
// See https://www.rabbitmq.com/client-libraries/dotnet-api-guide#connection-recovery
ProviderSettings.ConnectionFactory.AutomaticRecoveryEnabled = true;
ProviderSettings.ConnectionFactory.DispatchConsumersAsync = true;

_connection = ProviderSettings.Endpoints != null && ProviderSettings.Endpoints.Count > 0
? ProviderSettings.ConnectionFactory.CreateConnection(ProviderSettings.Endpoints)
: ProviderSettings.ConnectionFactory.CreateConnection();
}
catch (global::RabbitMQ.Client.Exceptions.BrokerUnreachableException e)
},
shouldRetry: (ex, attempt) =>
{
_logger.LogInformation(e, "Retrying {Retry} of {RetryCount} connection to RabbitMQ...", retry, retryCount);
await Task.Delay(ProviderSettings.ConnectionFactory.NetworkRecoveryInterval);
}
}
if (ex is global::RabbitMQ.Client.Exceptions.BrokerUnreachableException && attempt < retryCount)
{
_logger.LogInformation(ex, "Retrying {Retry} of {RetryCount} connection to RabbitMQ...", attempt, retryCount);
return true;
}
return false;
},
delay: ProviderSettings.ConnectionFactory.NetworkRecoveryInterval
);
#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously

lock (_channelLock)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ public string ConnectionString

public ConnectionFactory ConnectionFactory { get; set; } = new()
{
NetworkRecoveryInterval = TimeSpan.FromSeconds(5)
NetworkRecoveryInterval = TimeSpan.FromSeconds(5),
// By default the consumer dispatch is single threaded, we can increase it to the number of consumers by applying the .Instances(10) setting
ConsumerDispatchConcurrency = 1
};

public IList<AmqpTcpEndpoint> Endpoints { get; set; } = new List<AmqpTcpEndpoint>();
public IList<AmqpTcpEndpoint> Endpoints { get; set; } = [];

/// <summary>
/// Allows to set a custom header values converter between SMB and the underlying RabbitMq client.
Expand Down
4 changes: 2 additions & 2 deletions src/SlimMessageBus.Host.RabbitMQ/RabbitMqTopologyService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ private void ProvisionConsumers()
}
}

private void DeclareQueueBinding(HasProviderExtensions settings, string bindingExchangeName, string queueName)
private void DeclareQueueBinding(AbstractConsumerSettings settings, string bindingExchangeName, string queueName)
{
var bindingRoutingKey = settings.GetOrDefault(RabbitMqProperties.BindingRoutingKey, _providerSettings, string.Empty);
var bindingRoutingKey = settings.GetBindingRoutingKey(_providerSettings) ?? string.Empty;

_logger.LogInformation("Binding queue {QueueName} to exchange {ExchangeName} using routing key {RoutingKey}", queueName, bindingExchangeName, bindingRoutingKey);
try
Expand Down
2 changes: 1 addition & 1 deletion src/SlimMessageBus.Host.Redis/RedisMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ void AddTopicConsumer(string topic, ISubscriber subscriber, IMessageProcessor<Me
}

// When it was requested to have more than once concurrent instances working then we need to fan out the incoming Redis consumption tasks
processor = new ConcurrencyIncreasingMessageProcessorDecorator<MessageWithHeaders>(instances, this, processor);
processor = new ConcurrentMessageProcessorDecorator<MessageWithHeaders>(instances, LoggerFactory, processor);
}

_logger.LogInformation("Creating consumer for redis {PathKind} {Path}", GetPathKindString(pathKind), path);
Expand Down
Loading

0 comments on commit 7b81940

Please sign in to comment.