diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props index b2bb5efc7..3df789ae3 100644 --- a/src/Directory.Packages.props +++ b/src/Directory.Packages.props @@ -78,5 +78,7 @@ + + \ No newline at end of file diff --git a/src/Framework.RabbitMq.Consumer/BackgroundServices/RabbitMqBackgroundService.cs b/src/Framework.RabbitMq.Consumer/BackgroundServices/RabbitMqBackgroundService.cs new file mode 100644 index 000000000..43486fbd2 --- /dev/null +++ b/src/Framework.RabbitMq.Consumer/BackgroundServices/RabbitMqBackgroundService.cs @@ -0,0 +1,125 @@ +using System.Text; + +using Framework.RabbitMq.Consumer.Enums; +using Framework.RabbitMq.Consumer.Interfaces; +using Framework.RabbitMq.Consumer.Settings; +using Framework.RabbitMq.Interfaces; +using Framework.RabbitMq.Settings; + +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +using RabbitMQ.Client; + +namespace Framework.RabbitMq.Consumer.BackgroundServices; + +public class RabbitMqBackgroundService : BackgroundService +{ + private readonly IRabbitMqClient _client; + + private readonly IRabbitMqConsumerInitializer _consumerInitializer; + + private readonly RabbitMqConsumerSettings _consumerSettings; + + private readonly ILogger _logger; + + private readonly IRabbitMqMessageProcessor _messageProcessor; + + private readonly RabbitMqServerSettings _serverSettings; + + private IModel? _channel; + + private IConnection? _connection; + + public RabbitMqBackgroundService( + IRabbitMqClient client, + IRabbitMqConsumerInitializer consumerInitializer, + IRabbitMqMessageProcessor messageProcessor, + IOptions serverOptions, + IOptions consumerOptions, + ILogger logger) + { + this._client = client; + this._consumerInitializer = consumerInitializer; + this._messageProcessor = messageProcessor; + this._logger = logger; + this._serverSettings = serverOptions.Value; + this._consumerSettings = consumerOptions.Value; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + this._connection = await this._client.TryConnectAsync(this._consumerSettings.ConnectionAttemptCount, stoppingToken); + if (this._connection == null) + { + this._logger.LogInformation("Listening RabbitMQ events wasn't started"); + return; + } + + this._channel = this._connection!.CreateModel(); + this._consumerInitializer.Initialize(this._channel); + + await this.Listen(stoppingToken); + } + + private async Task Listen(CancellationToken token) + { + this._logger.LogInformation( + "Listening RabbitMQ events has started on {Address}. Queue name is {Queue}", + this._serverSettings.Address, + this._consumerSettings.Queue); + + while (!token.IsCancellationRequested) + { + var result = this._channel!.BasicGet(this._consumerSettings.Queue, false); + if (result is null) + { + await Delay(this._consumerSettings.ReceiveMessageDelayMilliseconds, token); + continue; + } + + await this.ProcessAsync(result, token); + } + } + + public override void Dispose() + { + this._channel?.Close(); + this._connection?.Close(); + base.Dispose(); + GC.SuppressFinalize(this); + } + + private async Task ProcessAsync(BasicGetResult result, CancellationToken token) + { + try + { + var message = Encoding.UTF8.GetString(result.Body.Span); + if (result.Redelivered && result.DeliveryTag > this._consumerSettings.FailedMessageRetryCount) + { + var behaviour = await this._messageProcessor.ProcessDeadLetterAsync( + result.BasicProperties, + result.RoutingKey, + message, + token); + if (behaviour == DeadLetterBehaviour.Skip) + { + this._channel!.BasicAck(result.DeliveryTag, false); + return; + } + } + + await this._messageProcessor.ProcessAsync(result.BasicProperties, result.RoutingKey, message, token); + this._channel!.BasicAck(result.DeliveryTag, false); + } + catch (Exception ex) + { + this._logger.LogError(ex, "There was some problem with processing message. Routing key:'{RoutingKey}'", result.RoutingKey); + this._channel!.BasicNack(result.DeliveryTag, false, true); + await Delay(this._consumerSettings.RejectMessageDelayMilliseconds, token); + } + } + + private static Task Delay(int value, CancellationToken token) => Task.Delay(TimeSpan.FromMilliseconds(value), token); +} diff --git a/src/Framework.RabbitMq.Consumer/DependencyInjection.cs b/src/Framework.RabbitMq.Consumer/DependencyInjection.cs new file mode 100644 index 000000000..699bdfcd7 --- /dev/null +++ b/src/Framework.RabbitMq.Consumer/DependencyInjection.cs @@ -0,0 +1,20 @@ +using Framework.RabbitMq.Consumer.BackgroundServices; +using Framework.RabbitMq.Consumer.Interfaces; +using Framework.RabbitMq.Consumer.Services; +using Framework.RabbitMq.Consumer.Settings; + +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; + +namespace Framework.RabbitMq.Consumer; + +public static class DependencyInjection +{ + public static IServiceCollection AddRabbitMqConsumer(this IServiceCollection services, IConfiguration configuration) + where TMessageProcessor : class, IRabbitMqMessageProcessor => + services + .Configure(configuration.GetSection("RabbitMQ:Consumer")) + .AddSingleton() + .AddSingleton() + .AddHostedService(); +} diff --git a/src/Framework.RabbitMq.Consumer/Enums/DeadLetterBehaviour.cs b/src/Framework.RabbitMq.Consumer/Enums/DeadLetterBehaviour.cs new file mode 100644 index 000000000..b977e87a2 --- /dev/null +++ b/src/Framework.RabbitMq.Consumer/Enums/DeadLetterBehaviour.cs @@ -0,0 +1,8 @@ +namespace Framework.RabbitMq.Consumer.Enums; + +public enum DeadLetterBehaviour +{ + ForeverRetry = 1, + + Skip = 2 +} diff --git a/src/Framework.RabbitMq.Consumer/Framework.RabbitMq.Consumer.csproj b/src/Framework.RabbitMq.Consumer/Framework.RabbitMq.Consumer.csproj new file mode 100644 index 000000000..15222e9d4 --- /dev/null +++ b/src/Framework.RabbitMq.Consumer/Framework.RabbitMq.Consumer.csproj @@ -0,0 +1,9 @@ + + + Luxoft.Framework.RabbitMq.Consumer + enable + + + + + diff --git a/src/Framework.RabbitMq.Consumer/Interfaces/IRabbitMqConsumerInitializer.cs b/src/Framework.RabbitMq.Consumer/Interfaces/IRabbitMqConsumerInitializer.cs new file mode 100644 index 000000000..c02f6bbcd --- /dev/null +++ b/src/Framework.RabbitMq.Consumer/Interfaces/IRabbitMqConsumerInitializer.cs @@ -0,0 +1,8 @@ +using RabbitMQ.Client; + +namespace Framework.RabbitMq.Consumer.Interfaces; + +public interface IRabbitMqConsumerInitializer +{ + void Initialize(IModel model); +} diff --git a/src/Framework.RabbitMq.Consumer/Interfaces/IRabbitMqMessageProcessor.cs b/src/Framework.RabbitMq.Consumer/Interfaces/IRabbitMqMessageProcessor.cs new file mode 100644 index 000000000..39b559bb6 --- /dev/null +++ b/src/Framework.RabbitMq.Consumer/Interfaces/IRabbitMqMessageProcessor.cs @@ -0,0 +1,17 @@ +using Framework.RabbitMq.Consumer.Enums; + +using RabbitMQ.Client; + +namespace Framework.RabbitMq.Consumer.Interfaces; + +public interface IRabbitMqMessageProcessor +{ + Task ProcessAsync(IBasicProperties properties, string routingKey, string message, CancellationToken token); + + Task ProcessDeadLetterAsync( + IBasicProperties properties, + string routingKey, + string message, + CancellationToken token) => + Task.FromResult(DeadLetterBehaviour.ForeverRetry); +} diff --git a/src/Framework.RabbitMq.Consumer/Services/RabbitMqConsumerInitializer.cs b/src/Framework.RabbitMq.Consumer/Services/RabbitMqConsumerInitializer.cs new file mode 100644 index 000000000..1eb9962e4 --- /dev/null +++ b/src/Framework.RabbitMq.Consumer/Services/RabbitMqConsumerInitializer.cs @@ -0,0 +1,22 @@ +using Framework.RabbitMq.Consumer.Interfaces; +using Framework.RabbitMq.Consumer.Settings; + +using Microsoft.Extensions.Options; + +using RabbitMQ.Client; + +namespace Framework.RabbitMq.Consumer.Services; + +public record RabbitMqConsumerInitializer(IOptions Options) : IRabbitMqConsumerInitializer +{ + public void Initialize(IModel model) + { + var consumerSettings = this.Options.Value; + + model.ExchangeDeclare(consumerSettings.Exchange, ExchangeType.Topic, true); + model.QueueDeclare(consumerSettings.Queue, true, false, false, null); + + foreach (var routingKey in consumerSettings.RoutingKeys) + model.QueueBind(consumerSettings.Queue, consumerSettings.Exchange, routingKey); + } +} diff --git a/src/Framework.RabbitMq.Consumer/Settings/RabbitMqConsumerSettings.cs b/src/Framework.RabbitMq.Consumer/Settings/RabbitMqConsumerSettings.cs new file mode 100644 index 000000000..3ae0c76fb --- /dev/null +++ b/src/Framework.RabbitMq.Consumer/Settings/RabbitMqConsumerSettings.cs @@ -0,0 +1,18 @@ +namespace Framework.RabbitMq.Consumer.Settings; + +public class RabbitMqConsumerSettings +{ + public int ReceiveMessageDelayMilliseconds { get; set; } = 1000; + + public int RejectMessageDelayMilliseconds { get; set; } = 3000; + + public ulong FailedMessageRetryCount { get; set; } = 3; + + public int ConnectionAttemptCount { get; set; } = 3; + + public string Exchange { get; set; } = default!; + + public string Queue { get; set; } = default!; + + public string[] RoutingKeys { get; set; } = Array.Empty(); +} diff --git a/src/Framework.RabbitMq/DependencyInjection.cs b/src/Framework.RabbitMq/DependencyInjection.cs new file mode 100644 index 000000000..6cd3f3667 --- /dev/null +++ b/src/Framework.RabbitMq/DependencyInjection.cs @@ -0,0 +1,16 @@ +using Framework.RabbitMq.Interfaces; +using Framework.RabbitMq.Services; +using Framework.RabbitMq.Settings; + +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; + +namespace Framework.RabbitMq; + +public static class DependencyInjection +{ + public static IServiceCollection AddRabbitMqClient(this IServiceCollection services, IConfiguration configuration) => + services + .Configure(configuration.GetSection("RabbitMQ:Server")) + .AddSingleton(); +} diff --git a/src/Framework.RabbitMq/Framework.RabbitMq.csproj b/src/Framework.RabbitMq/Framework.RabbitMq.csproj new file mode 100644 index 000000000..3fc15acc3 --- /dev/null +++ b/src/Framework.RabbitMq/Framework.RabbitMq.csproj @@ -0,0 +1,15 @@ + + + Luxoft.Framework.RabbitMq + enable + + + + + + + + + + + diff --git a/src/Framework.RabbitMq/Interfaces/IRabbitMqClient.cs b/src/Framework.RabbitMq/Interfaces/IRabbitMqClient.cs new file mode 100644 index 000000000..bfe634a48 --- /dev/null +++ b/src/Framework.RabbitMq/Interfaces/IRabbitMqClient.cs @@ -0,0 +1,8 @@ +using RabbitMQ.Client; + +namespace Framework.RabbitMq.Interfaces; + +public interface IRabbitMqClient +{ + Task TryConnectAsync(int attempts, CancellationToken token = default); +} diff --git a/src/Framework.RabbitMq/Services/RabbitMqClient.cs b/src/Framework.RabbitMq/Services/RabbitMqClient.cs new file mode 100644 index 000000000..7d2a07c48 --- /dev/null +++ b/src/Framework.RabbitMq/Services/RabbitMqClient.cs @@ -0,0 +1,51 @@ +using Framework.RabbitMq.Interfaces; +using Framework.RabbitMq.Settings; + +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +using Polly; +using Polly.Retry; + +using RabbitMQ.Client; + +namespace Framework.RabbitMq.Services; + +public record RabbitMqClient(IOptions Options, ILogger Logger) : IRabbitMqClient +{ + private const int RetryConnectDelay = 5000; + + public async Task TryConnectAsync(int attempts, CancellationToken token = default) + { + var serverSettings = this.Options.Value; + var factory = new ConnectionFactory + { + HostName = serverSettings.Host, + Port = serverSettings.Port, + UserName = serverSettings.UserName, + Password = serverSettings.Secret, + VirtualHost = serverSettings.VirtualHost, + AutomaticRecoveryEnabled = true + }; + + var policy = this.CreateRetryPolicy(attempts); + try + { + return await policy.ExecuteAsync(_ => Task.FromResult(factory.CreateConnection()), token); + } + catch (Exception ex) + { + this.LogConnectionError(ex); + return null; + } + } + + private AsyncRetryPolicy CreateRetryPolicy(int attempts) => + Policy.Handle() + .WaitAndRetryAsync( + attempts, + _ => TimeSpan.FromMilliseconds(RetryConnectDelay), + (ex, _) => this.LogConnectionError(ex)); + + private void LogConnectionError(Exception exception) => this.Logger.LogError(exception, "Could not connect to RabbitMQ server"); +} diff --git a/src/Framework.RabbitMq/Settings/RabbitMqServerSettings.cs b/src/Framework.RabbitMq/Settings/RabbitMqServerSettings.cs new file mode 100644 index 000000000..f01631af3 --- /dev/null +++ b/src/Framework.RabbitMq/Settings/RabbitMqServerSettings.cs @@ -0,0 +1,16 @@ +namespace Framework.RabbitMq.Settings; + +public class RabbitMqServerSettings +{ + public string Host { get; set; } = default!; + + public int Port { get; set; } = 5672; + + public string UserName { get; set; } = default!; + + public string Secret { get; set; } = default!; + + public string VirtualHost { get; set; } = default!; + + public Uri Address => new($"{this.Host}:{this.Port}", UriKind.Absolute); +} diff --git a/src/IAD.Framework.sln b/src/IAD.Framework.sln index 8ab639494..4f03e3289 100644 --- a/src/IAD.Framework.sln +++ b/src/IAD.Framework.sln @@ -373,6 +373,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Framework.Authorization.Env EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Framework.AutomationCore.Unit", "Framework.AutomationCore.Unit\Framework.AutomationCore.Unit.csproj", "{7A4F922E-A66E-4DB6-A07F-DC3780DCF492}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Framework.RabbitMq.Consumer", "Framework.RabbitMq.Consumer\Framework.RabbitMq.Consumer.csproj", "{E583C4A7-F168-47ED-AA63-F1BCD4B87F34}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Framework.RabbitMq", "Framework.RabbitMq\Framework.RabbitMq.csproj", "{AEB54B82-AD22-4BF8-863E-824435EC4352}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -3457,6 +3461,46 @@ Global {7A4F922E-A66E-4DB6-A07F-DC3780DCF492}.Release|x64.Build.0 = Release|Any CPU {7A4F922E-A66E-4DB6-A07F-DC3780DCF492}.Release|x86.ActiveCfg = Release|Any CPU {7A4F922E-A66E-4DB6-A07F-DC3780DCF492}.Release|x86.Build.0 = Release|Any CPU + {E583C4A7-F168-47ED-AA63-F1BCD4B87F34}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {E583C4A7-F168-47ED-AA63-F1BCD4B87F34}.Debug|Any CPU.Build.0 = Debug|Any CPU + {E583C4A7-F168-47ED-AA63-F1BCD4B87F34}.Debug|ARM.ActiveCfg = Debug|Any CPU + {E583C4A7-F168-47ED-AA63-F1BCD4B87F34}.Debug|ARM.Build.0 = Debug|Any CPU + {E583C4A7-F168-47ED-AA63-F1BCD4B87F34}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU + {E583C4A7-F168-47ED-AA63-F1BCD4B87F34}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU + {E583C4A7-F168-47ED-AA63-F1BCD4B87F34}.Debug|x64.ActiveCfg = Debug|Any CPU + {E583C4A7-F168-47ED-AA63-F1BCD4B87F34}.Debug|x64.Build.0 = Debug|Any CPU + {E583C4A7-F168-47ED-AA63-F1BCD4B87F34}.Debug|x86.ActiveCfg = Debug|Any CPU + {E583C4A7-F168-47ED-AA63-F1BCD4B87F34}.Debug|x86.Build.0 = Debug|Any CPU + {E583C4A7-F168-47ED-AA63-F1BCD4B87F34}.Release|Any CPU.ActiveCfg = Release|Any CPU + {E583C4A7-F168-47ED-AA63-F1BCD4B87F34}.Release|Any CPU.Build.0 = Release|Any CPU + {E583C4A7-F168-47ED-AA63-F1BCD4B87F34}.Release|ARM.ActiveCfg = Release|Any CPU + {E583C4A7-F168-47ED-AA63-F1BCD4B87F34}.Release|ARM.Build.0 = Release|Any CPU + {E583C4A7-F168-47ED-AA63-F1BCD4B87F34}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU + {E583C4A7-F168-47ED-AA63-F1BCD4B87F34}.Release|Mixed Platforms.Build.0 = Release|Any CPU + {E583C4A7-F168-47ED-AA63-F1BCD4B87F34}.Release|x64.ActiveCfg = Release|Any CPU + {E583C4A7-F168-47ED-AA63-F1BCD4B87F34}.Release|x64.Build.0 = Release|Any CPU + {E583C4A7-F168-47ED-AA63-F1BCD4B87F34}.Release|x86.ActiveCfg = Release|Any CPU + {E583C4A7-F168-47ED-AA63-F1BCD4B87F34}.Release|x86.Build.0 = Release|Any CPU + {AEB54B82-AD22-4BF8-863E-824435EC4352}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {AEB54B82-AD22-4BF8-863E-824435EC4352}.Debug|Any CPU.Build.0 = Debug|Any CPU + {AEB54B82-AD22-4BF8-863E-824435EC4352}.Debug|ARM.ActiveCfg = Debug|Any CPU + {AEB54B82-AD22-4BF8-863E-824435EC4352}.Debug|ARM.Build.0 = Debug|Any CPU + {AEB54B82-AD22-4BF8-863E-824435EC4352}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU + {AEB54B82-AD22-4BF8-863E-824435EC4352}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU + {AEB54B82-AD22-4BF8-863E-824435EC4352}.Debug|x64.ActiveCfg = Debug|Any CPU + {AEB54B82-AD22-4BF8-863E-824435EC4352}.Debug|x64.Build.0 = Debug|Any CPU + {AEB54B82-AD22-4BF8-863E-824435EC4352}.Debug|x86.ActiveCfg = Debug|Any CPU + {AEB54B82-AD22-4BF8-863E-824435EC4352}.Debug|x86.Build.0 = Debug|Any CPU + {AEB54B82-AD22-4BF8-863E-824435EC4352}.Release|Any CPU.ActiveCfg = Release|Any CPU + {AEB54B82-AD22-4BF8-863E-824435EC4352}.Release|Any CPU.Build.0 = Release|Any CPU + {AEB54B82-AD22-4BF8-863E-824435EC4352}.Release|ARM.ActiveCfg = Release|Any CPU + {AEB54B82-AD22-4BF8-863E-824435EC4352}.Release|ARM.Build.0 = Release|Any CPU + {AEB54B82-AD22-4BF8-863E-824435EC4352}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU + {AEB54B82-AD22-4BF8-863E-824435EC4352}.Release|Mixed Platforms.Build.0 = Release|Any CPU + {AEB54B82-AD22-4BF8-863E-824435EC4352}.Release|x64.ActiveCfg = Release|Any CPU + {AEB54B82-AD22-4BF8-863E-824435EC4352}.Release|x64.Build.0 = Release|Any CPU + {AEB54B82-AD22-4BF8-863E-824435EC4352}.Release|x86.ActiveCfg = Release|Any CPU + {AEB54B82-AD22-4BF8-863E-824435EC4352}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/src/__SolutionItems/CommonAssemblyInfo.cs b/src/__SolutionItems/CommonAssemblyInfo.cs index a654d0226..9844d95a4 100644 --- a/src/__SolutionItems/CommonAssemblyInfo.cs +++ b/src/__SolutionItems/CommonAssemblyInfo.cs @@ -4,9 +4,9 @@ [assembly: AssemblyCompany("Luxoft")] [assembly: AssemblyCopyright("Copyright © Luxoft 2009-2023")] -[assembly: AssemblyVersion("19.0.14.0")] -[assembly: AssemblyFileVersion("19.0.14.0")] -[assembly: AssemblyInformationalVersion("19.0.14")] +[assembly: AssemblyVersion("19.0.15.0")] +[assembly: AssemblyFileVersion("19.0.15.0")] +[assembly: AssemblyInformationalVersion("19.0.15")] #if DEBUG [assembly: AssemblyConfiguration("Debug")]