From 4d1d9022c39facfb3dfd36cf1925baf11bb0dc42 Mon Sep 17 00:00:00 2001 From: Leshchev Artem Date: Sat, 18 May 2024 20:26:55 +0200 Subject: [PATCH] Add domain & integration events (#11) Co-authored-by: Artem Leshchev --- .../Bss.Platform.Events.Abstractions.csproj | 8 ++ .../IDomainEvent.cs | 5 ++ .../IDomainEventPublisher.cs | 6 ++ .../IIntegrationEvent.cs | 5 ++ .../IIntegrationEventPublisher.cs | 6 ++ .../Bss.Platform.Events.csproj | 14 ++++ .../CapConsumerExecutor.cs | 10 +++ .../CapConsumerServiceSelector.cs | 53 ++++++++++++ .../DependencyInjection.cs | 80 +++++++++++++++++++ .../Interfaces/IIntegrationEventProcessor.cs | 8 ++ .../IntegrationEventsMessageQueueOptions.cs | 18 +++++ .../Models/IntegrationEventsOptions.cs | 24 ++++++ .../IntegrationEventsSqlServerOptions.cs | 8 ++ .../Publishers/DomainEventPublisher.cs | 10 +++ .../Publishers/IntegrationEventPublisher.cs | 19 +++++ src/Bss.Platform.sln | 16 ++++ src/Directory.Packages.props | 5 ++ src/__SolutionItems/CommonAssemblyInfo.cs | 6 +- 18 files changed, 298 insertions(+), 3 deletions(-) create mode 100644 src/Bss.Platform.Events.Abstractions/Bss.Platform.Events.Abstractions.csproj create mode 100644 src/Bss.Platform.Events.Abstractions/IDomainEvent.cs create mode 100644 src/Bss.Platform.Events.Abstractions/IDomainEventPublisher.cs create mode 100644 src/Bss.Platform.Events.Abstractions/IIntegrationEvent.cs create mode 100644 src/Bss.Platform.Events.Abstractions/IIntegrationEventPublisher.cs create mode 100644 src/Bss.Platform.Events/Bss.Platform.Events.csproj create mode 100644 src/Bss.Platform.Events/CapConsumerExecutor.cs create mode 100644 src/Bss.Platform.Events/CapConsumerServiceSelector.cs create mode 100644 src/Bss.Platform.Events/DependencyInjection.cs create mode 100644 src/Bss.Platform.Events/Interfaces/IIntegrationEventProcessor.cs create mode 100644 src/Bss.Platform.Events/Models/IntegrationEventsMessageQueueOptions.cs create mode 100644 src/Bss.Platform.Events/Models/IntegrationEventsOptions.cs create mode 100644 src/Bss.Platform.Events/Models/IntegrationEventsSqlServerOptions.cs create mode 100644 src/Bss.Platform.Events/Publishers/DomainEventPublisher.cs create mode 100644 src/Bss.Platform.Events/Publishers/IntegrationEventPublisher.cs diff --git a/src/Bss.Platform.Events.Abstractions/Bss.Platform.Events.Abstractions.csproj b/src/Bss.Platform.Events.Abstractions/Bss.Platform.Events.Abstractions.csproj new file mode 100644 index 0000000..b0a2923 --- /dev/null +++ b/src/Bss.Platform.Events.Abstractions/Bss.Platform.Events.Abstractions.csproj @@ -0,0 +1,8 @@ + + + Luxoft.Bss.Platform.Events.Abstractions + + + + + diff --git a/src/Bss.Platform.Events.Abstractions/IDomainEvent.cs b/src/Bss.Platform.Events.Abstractions/IDomainEvent.cs new file mode 100644 index 0000000..82e2c5d --- /dev/null +++ b/src/Bss.Platform.Events.Abstractions/IDomainEvent.cs @@ -0,0 +1,5 @@ +using MediatR; + +namespace Bss.Platform.Events.Abstractions; + +public interface IDomainEvent : INotification; diff --git a/src/Bss.Platform.Events.Abstractions/IDomainEventPublisher.cs b/src/Bss.Platform.Events.Abstractions/IDomainEventPublisher.cs new file mode 100644 index 0000000..241730d --- /dev/null +++ b/src/Bss.Platform.Events.Abstractions/IDomainEventPublisher.cs @@ -0,0 +1,6 @@ +namespace Bss.Platform.Events.Abstractions; + +public interface IDomainEventPublisher +{ + Task PublishAsync(IDomainEvent @event, CancellationToken cancellationToken); +} diff --git a/src/Bss.Platform.Events.Abstractions/IIntegrationEvent.cs b/src/Bss.Platform.Events.Abstractions/IIntegrationEvent.cs new file mode 100644 index 0000000..256cbdc --- /dev/null +++ b/src/Bss.Platform.Events.Abstractions/IIntegrationEvent.cs @@ -0,0 +1,5 @@ +using MediatR; + +namespace Bss.Platform.Events.Abstractions; + +public interface IIntegrationEvent : INotification; diff --git a/src/Bss.Platform.Events.Abstractions/IIntegrationEventPublisher.cs b/src/Bss.Platform.Events.Abstractions/IIntegrationEventPublisher.cs new file mode 100644 index 0000000..1e6cb80 --- /dev/null +++ b/src/Bss.Platform.Events.Abstractions/IIntegrationEventPublisher.cs @@ -0,0 +1,6 @@ +namespace Bss.Platform.Events.Abstractions; + +public interface IIntegrationEventPublisher +{ + Task PublishAsync(IIntegrationEvent @event, CancellationToken cancellationToken); +} diff --git a/src/Bss.Platform.Events/Bss.Platform.Events.csproj b/src/Bss.Platform.Events/Bss.Platform.Events.csproj new file mode 100644 index 0000000..0ad9700 --- /dev/null +++ b/src/Bss.Platform.Events/Bss.Platform.Events.csproj @@ -0,0 +1,14 @@ + + + Luxoft.Bss.Platform.Events + + + + + + + + + + + diff --git a/src/Bss.Platform.Events/CapConsumerExecutor.cs b/src/Bss.Platform.Events/CapConsumerExecutor.cs new file mode 100644 index 0000000..ca29eb7 --- /dev/null +++ b/src/Bss.Platform.Events/CapConsumerExecutor.cs @@ -0,0 +1,10 @@ +using Bss.Platform.Events.Abstractions; +using Bss.Platform.Events.Interfaces; + +namespace Bss.Platform.Events; + +internal class CapConsumerExecutor(IIntegrationEventProcessor eventProcessor) + where TEvent : IIntegrationEvent +{ + public Task HandleAsync(TEvent @event, CancellationToken cancellationToken) => eventProcessor.ProcessAsync(@event, cancellationToken); +} diff --git a/src/Bss.Platform.Events/CapConsumerServiceSelector.cs b/src/Bss.Platform.Events/CapConsumerServiceSelector.cs new file mode 100644 index 0000000..dd33ba5 --- /dev/null +++ b/src/Bss.Platform.Events/CapConsumerServiceSelector.cs @@ -0,0 +1,53 @@ +using System.Reflection; + +using Bss.Platform.Events.Abstractions; + +using DotNetCore.CAP; +using DotNetCore.CAP.Internal; + +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; + +namespace Bss.Platform.Events; + +public class CapConsumerServiceSelector(IServiceProvider serviceProvider, Assembly assembly) + : ConsumerServiceSelector(serviceProvider) +{ + protected override IEnumerable FindConsumersFromControllerTypes() => []; + + protected override IEnumerable FindConsumersFromInterfaceTypes(IServiceProvider provider) + { + var namePrefix = provider.GetRequiredService>().Value.TopicNamePrefix; + + return assembly + .ExportedTypes + .Where(x => typeof(IIntegrationEvent).IsAssignableFrom(x) && x is { IsInterface: false, IsAbstract: false }) + .Select(x => this.CreateExecutorDescriptor(typeof(CapConsumerExecutor<>).MakeGenericType(x), x, namePrefix)); + } + + private ConsumerExecutorDescriptor CreateExecutorDescriptor(Type executor, Type @event, string? namePrefix) + { + var subscribeAttribute = new CapSubscribeAttribute(@event.Name); + this.SetSubscribeAttribute(subscribeAttribute); + + var methodInfo = executor + .GetRuntimeMethods() + .Single(x => x.Name.Contains(nameof(CapConsumerExecutor.HandleAsync))); + + var methodParameters = methodInfo.GetParameters(); + return new ConsumerExecutorDescriptor + { + Attribute = subscribeAttribute, + ClassAttribute = null, + MethodInfo = methodInfo, + ImplTypeInfo = executor.GetTypeInfo(), + ServiceTypeInfo = null, + TopicNamePrefix = namePrefix, + Parameters = new List + { + new() { ParameterType = methodParameters[0].ParameterType, IsFromCap = false }, + new() { ParameterType = methodParameters[1].ParameterType, IsFromCap = true } + } + }; + } +} diff --git a/src/Bss.Platform.Events/DependencyInjection.cs b/src/Bss.Platform.Events/DependencyInjection.cs new file mode 100644 index 0000000..3e46b77 --- /dev/null +++ b/src/Bss.Platform.Events/DependencyInjection.cs @@ -0,0 +1,80 @@ +using System.Data; +using System.Reflection; + +using Bss.Platform.Events.Abstractions; +using Bss.Platform.Events.Interfaces; +using Bss.Platform.Events.Models; +using Bss.Platform.Events.Publishers; + +using DotNetCore.CAP; +using DotNetCore.CAP.Internal; + +using Microsoft.Extensions.DependencyInjection; + +using Savorboard.CAP.InMemoryMessageQueue; + +namespace Bss.Platform.Events; + +public static class DependencyInjection +{ + public static IServiceCollection AddPlatformDomainEvents(this IServiceCollection services) => + services.AddScoped(); + + public static IServiceCollection AddPlatformIntegrationEvents( + this IServiceCollection services, + Assembly eventsAssembly, + Action? setup = null) + where TEventProcessor : class, IIntegrationEventProcessor + { + services + .AddSingleton() + .AddSingleton(x => new CapConsumerServiceSelector(x, eventsAssembly)) + .AddScoped() + .AddScoped( + serviceProvider => + { + var capTransaction = ActivatorUtilities.CreateInstance(serviceProvider); + capTransaction.DbTransaction = serviceProvider.GetRequiredService(); + return capTransaction; + }) + .AddCap( + x => + { + var eventsOptions = IntegrationEventsOptions.Default; + setup?.Invoke(eventsOptions); + + x.FailedRetryCount = eventsOptions.FailedRetryCount; + x.SucceedMessageExpiredAfter = (int)TimeSpan.FromDays(eventsOptions.RetentionDays).TotalSeconds; + + x.UseSqlServer( + o => + { + o.ConnectionString = eventsOptions.SqlServer.ConnectionString; + o.Schema = eventsOptions.SqlServer.Schema; + }); + + x.UseDashboard(o => o.PathMatch = eventsOptions.DashboardPath); + + if (!eventsOptions.MessageQueue.Enable) + { + x.UseInMemoryMessageQueue(); + return; + } + + x.DefaultGroupName = eventsOptions.MessageQueue.ExchangeName; + x.UseRabbitMQ( + o => + { + o.HostName = eventsOptions.MessageQueue.Host; + o.Port = eventsOptions.MessageQueue.Port; + o.VirtualHost = eventsOptions.MessageQueue.VirtualHost; + o.Password = eventsOptions.MessageQueue.Secret; + o.UserName = eventsOptions.MessageQueue.UserName; + o.ExchangeName = eventsOptions.MessageQueue.ExchangeName; + o.BasicQosOptions = new RabbitMQOptions.BasicQos(1, true); + }); + }); + + return services; + } +} diff --git a/src/Bss.Platform.Events/Interfaces/IIntegrationEventProcessor.cs b/src/Bss.Platform.Events/Interfaces/IIntegrationEventProcessor.cs new file mode 100644 index 0000000..25af453 --- /dev/null +++ b/src/Bss.Platform.Events/Interfaces/IIntegrationEventProcessor.cs @@ -0,0 +1,8 @@ +using Bss.Platform.Events.Abstractions; + +namespace Bss.Platform.Events.Interfaces; + +public interface IIntegrationEventProcessor +{ + Task ProcessAsync(IIntegrationEvent @event, CancellationToken token); +} diff --git a/src/Bss.Platform.Events/Models/IntegrationEventsMessageQueueOptions.cs b/src/Bss.Platform.Events/Models/IntegrationEventsMessageQueueOptions.cs new file mode 100644 index 0000000..5f1a5fa --- /dev/null +++ b/src/Bss.Platform.Events/Models/IntegrationEventsMessageQueueOptions.cs @@ -0,0 +1,18 @@ +namespace Bss.Platform.Events.Models; + +public class IntegrationEventsMessageQueueOptions +{ + public bool Enable { get; set; } + + public string Host { get; set; } = default!; + + public int Port { get; set; } + + public string UserName { get; set; } = default!; + + public string Secret { get; set; } = default!; + + public string VirtualHost { get; set; } = default!; + + public string ExchangeName { get; set; } = default!; +} diff --git a/src/Bss.Platform.Events/Models/IntegrationEventsOptions.cs b/src/Bss.Platform.Events/Models/IntegrationEventsOptions.cs new file mode 100644 index 0000000..5ea18fe --- /dev/null +++ b/src/Bss.Platform.Events/Models/IntegrationEventsOptions.cs @@ -0,0 +1,24 @@ +namespace Bss.Platform.Events.Models; + +public class IntegrationEventsOptions +{ + public string DashboardPath { get; set; } = default!; + + public int FailedRetryCount { get; set; } + + public int RetentionDays { get; set; } + + public IntegrationEventsSqlServerOptions SqlServer { get; set; } = default!; + + public IntegrationEventsMessageQueueOptions MessageQueue { get; set; } = default!; + + public static IntegrationEventsOptions Default => + new() + { + DashboardPath = "/admin/events", + FailedRetryCount = 5, + RetentionDays = 15, + SqlServer = new IntegrationEventsSqlServerOptions { Schema = "events" }, + MessageQueue = new IntegrationEventsMessageQueueOptions { Enable = true } + }; +} diff --git a/src/Bss.Platform.Events/Models/IntegrationEventsSqlServerOptions.cs b/src/Bss.Platform.Events/Models/IntegrationEventsSqlServerOptions.cs new file mode 100644 index 0000000..f38a179 --- /dev/null +++ b/src/Bss.Platform.Events/Models/IntegrationEventsSqlServerOptions.cs @@ -0,0 +1,8 @@ +namespace Bss.Platform.Events.Models; + +public class IntegrationEventsSqlServerOptions +{ + public string ConnectionString { get; set; } = default!; + + public string Schema { get; set; } = default!; +} diff --git a/src/Bss.Platform.Events/Publishers/DomainEventPublisher.cs b/src/Bss.Platform.Events/Publishers/DomainEventPublisher.cs new file mode 100644 index 0000000..a3b9072 --- /dev/null +++ b/src/Bss.Platform.Events/Publishers/DomainEventPublisher.cs @@ -0,0 +1,10 @@ +using Bss.Platform.Events.Abstractions; + +using MediatR; + +namespace Bss.Platform.Events.Publishers; + +public class DomainEventPublisher(IMediator mediator) : IDomainEventPublisher +{ + public Task PublishAsync(IDomainEvent @event, CancellationToken cancellationToken) => mediator.Publish(@event, cancellationToken); +} diff --git a/src/Bss.Platform.Events/Publishers/IntegrationEventPublisher.cs b/src/Bss.Platform.Events/Publishers/IntegrationEventPublisher.cs new file mode 100644 index 0000000..1d0273c --- /dev/null +++ b/src/Bss.Platform.Events/Publishers/IntegrationEventPublisher.cs @@ -0,0 +1,19 @@ +using Bss.Platform.Events.Abstractions; + +using DotNetCore.CAP; + +namespace Bss.Platform.Events.Publishers; + +public class IntegrationEventPublisher(ICapPublisher capPublisher, ICapTransaction capTransaction) : IIntegrationEventPublisher +{ + public Task PublishAsync(IIntegrationEvent @event, CancellationToken cancellationToken) + { + if (capPublisher.Transaction is not null && capPublisher.Transaction != capTransaction) + { + throw new Exception("There cannot be different CAP transactions within the same scope"); + } + + capPublisher.Transaction = capTransaction; + return capPublisher.PublishAsync(@event.GetType().Name, @event, cancellationToken: cancellationToken); + } +} diff --git a/src/Bss.Platform.sln b/src/Bss.Platform.sln index b5cf3c5..8ae9840 100644 --- a/src/Bss.Platform.sln +++ b/src/Bss.Platform.sln @@ -28,6 +28,12 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Bss.Platform.Kubernetes", " EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Bss.Platform.Api.Middlewares", "Bss.Platform.Api.Middlewares\Bss.Platform.Api.Middlewares.csproj", "{285EBB7B-6B2A-453E-98F6-30AC38317A76}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Events", "Events", "{80AA8C01-842D-4A5F-BCBE-48E0361783A0}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Bss.Platform.Events", "Bss.Platform.Events\Bss.Platform.Events.csproj", "{51668BE6-C4A4-4E62-9BEC-529C33A434DB}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Bss.Platform.Events.Abstractions", "Bss.Platform.Events.Abstractions\Bss.Platform.Events.Abstractions.csproj", "{BE2E219F-3F3B-48E8-B13B-A71321BB6372}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -42,6 +48,8 @@ Global {0005DAE1-3F66-4A98-A11B-DEFDF2BD7EA3} = {8F22FE4E-8FD4-4309-8B47-C33AA6E52052} {1BD00077-8005-42E9-AEFE-62A21E057858} = {51F57F48-6CDC-43E2-90DA-B73C12EA2185} {285EBB7B-6B2A-453E-98F6-30AC38317A76} = {8F22FE4E-8FD4-4309-8B47-C33AA6E52052} + {51668BE6-C4A4-4E62-9BEC-529C33A434DB} = {80AA8C01-842D-4A5F-BCBE-48E0361783A0} + {BE2E219F-3F3B-48E8-B13B-A71321BB6372} = {80AA8C01-842D-4A5F-BCBE-48E0361783A0} EndGlobalSection GlobalSection(ProjectConfigurationPlatforms) = postSolution {860BFBD8-26EA-44F9-980E-21B828FC8F72}.Debug|Any CPU.ActiveCfg = Debug|Any CPU @@ -76,5 +84,13 @@ Global {285EBB7B-6B2A-453E-98F6-30AC38317A76}.Debug|Any CPU.Build.0 = Debug|Any CPU {285EBB7B-6B2A-453E-98F6-30AC38317A76}.Release|Any CPU.ActiveCfg = Release|Any CPU {285EBB7B-6B2A-453E-98F6-30AC38317A76}.Release|Any CPU.Build.0 = Release|Any CPU + {51668BE6-C4A4-4E62-9BEC-529C33A434DB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {51668BE6-C4A4-4E62-9BEC-529C33A434DB}.Debug|Any CPU.Build.0 = Debug|Any CPU + {51668BE6-C4A4-4E62-9BEC-529C33A434DB}.Release|Any CPU.ActiveCfg = Release|Any CPU + {51668BE6-C4A4-4E62-9BEC-529C33A434DB}.Release|Any CPU.Build.0 = Release|Any CPU + {BE2E219F-3F3B-48E8-B13B-A71321BB6372}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {BE2E219F-3F3B-48E8-B13B-A71321BB6372}.Debug|Any CPU.Build.0 = Debug|Any CPU + {BE2E219F-3F3B-48E8-B13B-A71321BB6372}.Release|Any CPU.ActiveCfg = Release|Any CPU + {BE2E219F-3F3B-48E8-B13B-A71321BB6372}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection EndGlobal diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props index cd896b7..3f372e3 100644 --- a/src/Directory.Packages.props +++ b/src/Directory.Packages.props @@ -4,6 +4,11 @@ + + + + + diff --git a/src/__SolutionItems/CommonAssemblyInfo.cs b/src/__SolutionItems/CommonAssemblyInfo.cs index d789c80..5779d70 100644 --- a/src/__SolutionItems/CommonAssemblyInfo.cs +++ b/src/__SolutionItems/CommonAssemblyInfo.cs @@ -4,9 +4,9 @@ [assembly: AssemblyCompany("Luxoft")] [assembly: AssemblyCopyright("Copyright © Luxoft 2024")] -[assembly: AssemblyVersion("1.3.1.0")] -[assembly: AssemblyFileVersion("1.3.1.0")] -[assembly: AssemblyInformationalVersion("1.3.1.0")] +[assembly: AssemblyVersion("1.4.0.0")] +[assembly: AssemblyFileVersion("1.4.0.0")] +[assembly: AssemblyInformationalVersion("1.4.0.0")] #if DEBUG [assembly: AssemblyConfiguration("Debug")]