Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RabbitMQ Consumer #248

Merged
merged 15 commits into from
Oct 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -78,5 +78,7 @@
<PackageVersion Include="WorkflowCore.Persistence.SqlServer" Version="3.9.0" />
<PackageVersion Include="xunit" Version="2.5.1" />
<PackageVersion Include="xunit.runner.visualstudio" Version="2.5.1" />
<PackageVersion Include="RabbitMQ.Client" Version="6.6.0" />
<PackageVersion Include="Polly" Version="8.0.0" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
using System.Text;

using Framework.RabbitMq.Consumer.Enums;
using Framework.RabbitMq.Consumer.Interfaces;
using Framework.RabbitMq.Consumer.Settings;
using Framework.RabbitMq.Interfaces;
using Framework.RabbitMq.Settings;

using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

using RabbitMQ.Client;

namespace Framework.RabbitMq.Consumer.BackgroundServices;

public class RabbitMqBackgroundService : BackgroundService
{
private readonly IRabbitMqClient _client;
aleshchev marked this conversation as resolved.
Show resolved Hide resolved

private readonly IRabbitMqConsumerInitializer _consumerInitializer;

private readonly RabbitMqConsumerSettings _consumerSettings;

private readonly ILogger<RabbitMqBackgroundService> _logger;

private readonly IRabbitMqMessageProcessor _messageProcessor;

private readonly RabbitMqServerSettings _serverSettings;

private IModel? _channel;

private IConnection? _connection;

public RabbitMqBackgroundService(
IRabbitMqClient client,
IRabbitMqConsumerInitializer consumerInitializer,
IRabbitMqMessageProcessor messageProcessor,
IOptions<RabbitMqServerSettings> serverOptions,
IOptions<RabbitMqConsumerSettings> consumerOptions,
ILogger<RabbitMqBackgroundService> logger)
{
this._client = client;
this._consumerInitializer = consumerInitializer;
this._messageProcessor = messageProcessor;
this._logger = logger;
this._serverSettings = serverOptions.Value;
this._consumerSettings = consumerOptions.Value;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
this._connection = await this._client.TryConnectAsync(this._consumerSettings.ConnectionAttemptCount, stoppingToken);
if (this._connection == null)
{
this._logger.LogInformation("Listening RabbitMQ events wasn't started");
return;
}

this._channel = this._connection!.CreateModel();
this._consumerInitializer.Initialize(this._channel);

await this.Listen(stoppingToken);
}

private async Task Listen(CancellationToken token)
{
this._logger.LogInformation(
"Listening RabbitMQ events has started on {Address}. Queue name is {Queue}",
this._serverSettings.Address,
this._consumerSettings.Queue);

while (!token.IsCancellationRequested)
{
var result = this._channel!.BasicGet(this._consumerSettings.Queue, false);
if (result is null)
{
await Delay(this._consumerSettings.ReceiveMessageDelayMilliseconds, token);
continue;
}

await this.ProcessAsync(result, token);
}
}

public override void Dispose()
{
this._channel?.Close();
this._connection?.Close();
base.Dispose();
GC.SuppressFinalize(this);
}

private async Task ProcessAsync(BasicGetResult result, CancellationToken token)
{
try
{
var message = Encoding.UTF8.GetString(result.Body.Span);
if (result.Redelivered && result.DeliveryTag > this._consumerSettings.FailedMessageRetryCount)
{
var behaviour = await this._messageProcessor.ProcessDeadLetterAsync(
result.BasicProperties,
result.RoutingKey,
message,
token);
if (behaviour == DeadLetterBehaviour.Skip)
{
this._channel!.BasicAck(result.DeliveryTag, false);
return;
}
}

await this._messageProcessor.ProcessAsync(result.BasicProperties, result.RoutingKey, message, token);
this._channel!.BasicAck(result.DeliveryTag, false);
}
catch (Exception ex)
{
this._logger.LogError(ex, "There was some problem with processing message. Routing key:'{RoutingKey}'", result.RoutingKey);
this._channel!.BasicNack(result.DeliveryTag, false, true);
await Delay(this._consumerSettings.RejectMessageDelayMilliseconds, token);
}
}

private static Task Delay(int value, CancellationToken token) => Task.Delay(TimeSpan.FromMilliseconds(value), token);
}
20 changes: 20 additions & 0 deletions src/Framework.RabbitMq.Consumer/DependencyInjection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using Framework.RabbitMq.Consumer.BackgroundServices;
using Framework.RabbitMq.Consumer.Interfaces;
using Framework.RabbitMq.Consumer.Services;
using Framework.RabbitMq.Consumer.Settings;

using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

namespace Framework.RabbitMq.Consumer;

public static class DependencyInjection
{
public static IServiceCollection AddRabbitMqConsumer<TMessageProcessor>(this IServiceCollection services, IConfiguration configuration)
where TMessageProcessor : class, IRabbitMqMessageProcessor =>
services
.Configure<RabbitMqConsumerSettings>(configuration.GetSection("RabbitMQ:Consumer"))
.AddSingleton<IRabbitMqMessageProcessor, TMessageProcessor>()
.AddSingleton<IRabbitMqConsumerInitializer, RabbitMqConsumerInitializer>()
.AddHostedService<RabbitMqBackgroundService>();
}
8 changes: 8 additions & 0 deletions src/Framework.RabbitMq.Consumer/Enums/DeadLetterBehaviour.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Framework.RabbitMq.Consumer.Enums;

public enum DeadLetterBehaviour
{
ForeverRetry = 1,

Skip = 2
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<PackageId>Luxoft.Framework.RabbitMq.Consumer</PackageId>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\Framework.RabbitMq\Framework.RabbitMq.csproj" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
using RabbitMQ.Client;

namespace Framework.RabbitMq.Consumer.Interfaces;

public interface IRabbitMqConsumerInitializer
{
void Initialize(IModel model);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using Framework.RabbitMq.Consumer.Enums;

using RabbitMQ.Client;

namespace Framework.RabbitMq.Consumer.Interfaces;

public interface IRabbitMqMessageProcessor
{
Task ProcessAsync(IBasicProperties properties, string routingKey, string message, CancellationToken token);

Task<DeadLetterBehaviour> ProcessDeadLetterAsync(
IBasicProperties properties,
string routingKey,
string message,
CancellationToken token) =>
Task.FromResult(DeadLetterBehaviour.ForeverRetry);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using Framework.RabbitMq.Consumer.Interfaces;
using Framework.RabbitMq.Consumer.Settings;

using Microsoft.Extensions.Options;

using RabbitMQ.Client;

namespace Framework.RabbitMq.Consumer.Services;

public record RabbitMqConsumerInitializer(IOptions<RabbitMqConsumerSettings> Options) : IRabbitMqConsumerInitializer
{
public void Initialize(IModel model)
{
var consumerSettings = this.Options.Value;

model.ExchangeDeclare(consumerSettings.Exchange, ExchangeType.Topic, true);
model.QueueDeclare(consumerSettings.Queue, true, false, false, null);

foreach (var routingKey in consumerSettings.RoutingKeys)
model.QueueBind(consumerSettings.Queue, consumerSettings.Exchange, routingKey);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
namespace Framework.RabbitMq.Consumer.Settings;

public class RabbitMqConsumerSettings
{
public int ReceiveMessageDelayMilliseconds { get; set; } = 1000;

public int RejectMessageDelayMilliseconds { get; set; } = 3000;

public ulong FailedMessageRetryCount { get; set; } = 3;

public int ConnectionAttemptCount { get; set; } = 3;

public string Exchange { get; set; } = default!;

public string Queue { get; set; } = default!;

public string[] RoutingKeys { get; set; } = Array.Empty<string>();
}
16 changes: 16 additions & 0 deletions src/Framework.RabbitMq/DependencyInjection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using Framework.RabbitMq.Interfaces;
using Framework.RabbitMq.Services;
using Framework.RabbitMq.Settings;

using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

namespace Framework.RabbitMq;

public static class DependencyInjection
{
public static IServiceCollection AddRabbitMqClient(this IServiceCollection services, IConfiguration configuration) =>
services
.Configure<RabbitMqServerSettings>(configuration.GetSection("RabbitMQ:Server"))
.AddSingleton<IRabbitMqClient, RabbitMqClient>();
}
15 changes: 15 additions & 0 deletions src/Framework.RabbitMq/Framework.RabbitMq.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<PackageId>Luxoft.Framework.RabbitMq</PackageId>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" />
<PackageReference Include="Microsoft.Extensions.Hosting" />
<PackageReference Include="Polly" />
<PackageReference Include="RabbitMQ.Client" />
</ItemGroup>

</Project>
8 changes: 8 additions & 0 deletions src/Framework.RabbitMq/Interfaces/IRabbitMqClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
using RabbitMQ.Client;

namespace Framework.RabbitMq.Interfaces;

public interface IRabbitMqClient
{
Task<IConnection?> TryConnectAsync(int attempts, CancellationToken token = default);
}
51 changes: 51 additions & 0 deletions src/Framework.RabbitMq/Services/RabbitMqClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using Framework.RabbitMq.Interfaces;
using Framework.RabbitMq.Settings;

using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

using Polly;
using Polly.Retry;

using RabbitMQ.Client;

namespace Framework.RabbitMq.Services;

public record RabbitMqClient(IOptions<RabbitMqServerSettings> Options, ILogger<RabbitMqClient> Logger) : IRabbitMqClient
{
private const int RetryConnectDelay = 5000;

public async Task<IConnection?> TryConnectAsync(int attempts, CancellationToken token = default)
{
var serverSettings = this.Options.Value;
var factory = new ConnectionFactory
{
HostName = serverSettings.Host,
Port = serverSettings.Port,
UserName = serverSettings.UserName,
Password = serverSettings.Secret,
VirtualHost = serverSettings.VirtualHost,
AutomaticRecoveryEnabled = true
};

var policy = this.CreateRetryPolicy(attempts);
try
{
return await policy.ExecuteAsync(_ => Task.FromResult(factory.CreateConnection()), token);
}
catch (Exception ex)
{
this.LogConnectionError(ex);
return null;
}
}

private AsyncRetryPolicy CreateRetryPolicy(int attempts) =>
Policy.Handle<Exception>()
.WaitAndRetryAsync(
attempts,
_ => TimeSpan.FromMilliseconds(RetryConnectDelay),
(ex, _) => this.LogConnectionError(ex));

private void LogConnectionError(Exception exception) => this.Logger.LogError(exception, "Could not connect to RabbitMQ server");
}
16 changes: 16 additions & 0 deletions src/Framework.RabbitMq/Settings/RabbitMqServerSettings.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
namespace Framework.RabbitMq.Settings;

public class RabbitMqServerSettings
{
public string Host { get; set; } = default!;

public int Port { get; set; } = 5672;

public string UserName { get; set; } = default!;

public string Secret { get; set; } = default!;

public string VirtualHost { get; set; } = default!;

public Uri Address => new($"{this.Host}:{this.Port}", UriKind.Absolute);
}
Loading
Loading