From b5c83a0b34fd3108b231a2fdd9dd1b6fa27dc182 Mon Sep 17 00:00:00 2001 From: Phong Nguyen Date: Sat, 9 Sep 2023 09:58:00 +0700 Subject: [PATCH] Message Bus Consumers + OutBox Event Publishers --- .../EventLogs/Commands/PublishEventsCommand.cs | 1 + .../FileEntryOutBoxEventPublisher.cs | 8 +++++++- .../MessageBrokers/IOutBoxEventPublisher.cs | 4 ++++ .../Infrastructure/MessageBrokers/MessageBus.cs | 13 ++++++++----- 4 files changed, 20 insertions(+), 6 deletions(-) diff --git a/src/Monolith/ClassifiedAds.Application/EventLogs/Commands/PublishEventsCommand.cs b/src/Monolith/ClassifiedAds.Application/EventLogs/Commands/PublishEventsCommand.cs index 6b914aaf8..44d5de537 100644 --- a/src/Monolith/ClassifiedAds.Application/EventLogs/Commands/PublishEventsCommand.cs +++ b/src/Monolith/ClassifiedAds.Application/EventLogs/Commands/PublishEventsCommand.cs @@ -47,6 +47,7 @@ public async Task HandleAsync(PublishEventsCommand command, CancellationToken ca { Id = eventLog.Id.ToString(), EventType = eventLog.EventType, + EventSource = typeof(PublishEventsCommand).Assembly.GetName().Name, Payload = eventLog.Message, }; diff --git a/src/Monolith/ClassifiedAds.BackgroundServer/OutBoxEventPublishers/FileEntryOutBoxEventPublisher.cs b/src/Monolith/ClassifiedAds.BackgroundServer/OutBoxEventPublishers/FileEntryOutBoxEventPublisher.cs index 968cab48a..d0abde48d 100644 --- a/src/Monolith/ClassifiedAds.BackgroundServer/OutBoxEventPublishers/FileEntryOutBoxEventPublisher.cs +++ b/src/Monolith/ClassifiedAds.BackgroundServer/OutBoxEventPublishers/FileEntryOutBoxEventPublisher.cs @@ -1,4 +1,5 @@ -using ClassifiedAds.Application.FileEntries.DTOs; +using ClassifiedAds.Application.EventLogs.Commands; +using ClassifiedAds.Application.FileEntries.DTOs; using ClassifiedAds.Domain.Constants; using ClassifiedAds.Domain.Entities; using ClassifiedAds.Domain.Infrastructure.MessageBrokers; @@ -17,6 +18,11 @@ public static string[] CanHandleEventTypes() return new string[] { EventTypeConstants.FileEntryCreated, EventTypeConstants.FileEntryDeleted }; } + public static string CanHandleEventSource() + { + return typeof(PublishEventsCommand).Assembly.GetName().Name; + } + public FileEntryOutBoxEventPublisher(IMessageBus messageBus) { _messageBus = messageBus; diff --git a/src/Monolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IOutBoxEventPublisher.cs b/src/Monolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IOutBoxEventPublisher.cs index 5dc03f6a1..1c9ef5f0f 100644 --- a/src/Monolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IOutBoxEventPublisher.cs +++ b/src/Monolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/IOutBoxEventPublisher.cs @@ -7,6 +7,8 @@ public interface IOutBoxEventPublisher { static abstract string[] CanHandleEventTypes(); + static abstract string CanHandleEventSource(); + Task HandleAsync(PublishingOutBoxEvent outbox, CancellationToken cancellationToken = default); } @@ -16,5 +18,7 @@ public class PublishingOutBoxEvent public string EventType { get; set; } + public string EventSource { get; set; } + public string Payload { get; set; } } \ No newline at end of file diff --git a/src/Monolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/MessageBus.cs b/src/Monolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/MessageBus.cs index 3169e37d6..8a4478651 100644 --- a/src/Monolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/MessageBus.cs +++ b/src/Monolith/ClassifiedAds.Domain/Infrastructure/MessageBrokers/MessageBus.cs @@ -13,7 +13,7 @@ public class MessageBus : IMessageBus { private readonly IServiceProvider _serviceProvider; private static List _consumers = new List(); - private static Dictionary> _outboxEventHandlers = new (); + private static Dictionary> _outboxEventHandlers = new(); internal static void AddConsumers(Assembly assembly, IServiceCollection services) { @@ -43,15 +43,17 @@ internal static void AddOutboxEventPublishers(Assembly assembly, IServiceCollect foreach (var item in types) { var canHandlerEventTypes = (string[])item.InvokeMember(nameof(IOutBoxEventPublisher.CanHandleEventTypes), BindingFlags.InvokeMethod, null, null, null, CultureInfo.CurrentCulture); + var eventSource = (string)item.InvokeMember(nameof(IOutBoxEventPublisher.CanHandleEventSource), BindingFlags.InvokeMethod, null, null, null, CultureInfo.CurrentCulture); foreach (var eventType in canHandlerEventTypes) { - if (!_outboxEventHandlers.ContainsKey(eventType)) + var key = eventSource + ":" + eventType; + if (!_outboxEventHandlers.ContainsKey(key)) { - _outboxEventHandlers[eventType] = new List(); + _outboxEventHandlers[key] = new List(); } - _outboxEventHandlers[eventType].Add(item); + _outboxEventHandlers[key].Add(item); } } } @@ -97,7 +99,8 @@ await _serviceProvider.GetRequiredService>().Rece public async Task SendAsync(PublishingOutBoxEvent outbox, CancellationToken cancellationToken = default) { - var handlerTypes = _outboxEventHandlers.ContainsKey(outbox.EventType) ? _outboxEventHandlers[outbox.EventType] : null; + var key = outbox.EventSource + ":" + outbox.EventType; + var handlerTypes = _outboxEventHandlers.ContainsKey(key) ? _outboxEventHandlers[key] : null; if (handlerTypes == null) {