Skip to content

Commit

Permalink
Message Bus Consumers + OutBox Event Publishers
Browse files Browse the repository at this point in the history
  • Loading branch information
phongnguyend committed Sep 9, 2023
1 parent 9953e5a commit b5c83a0
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public async Task HandleAsync(PublishEventsCommand command, CancellationToken ca
{
Id = eventLog.Id.ToString(),
EventType = eventLog.EventType,
EventSource = typeof(PublishEventsCommand).Assembly.GetName().Name,
Payload = eventLog.Message,
};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using ClassifiedAds.Application.FileEntries.DTOs;
using ClassifiedAds.Application.EventLogs.Commands;
using ClassifiedAds.Application.FileEntries.DTOs;
using ClassifiedAds.Domain.Constants;
using ClassifiedAds.Domain.Entities;
using ClassifiedAds.Domain.Infrastructure.MessageBrokers;
Expand All @@ -17,6 +18,11 @@ public static string[] CanHandleEventTypes()
return new string[] { EventTypeConstants.FileEntryCreated, EventTypeConstants.FileEntryDeleted };
}

public static string CanHandleEventSource()
{
return typeof(PublishEventsCommand).Assembly.GetName().Name;
}

public FileEntryOutBoxEventPublisher(IMessageBus messageBus)
{
_messageBus = messageBus;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ public interface IOutBoxEventPublisher
{
static abstract string[] CanHandleEventTypes();

static abstract string CanHandleEventSource();

Task HandleAsync(PublishingOutBoxEvent outbox, CancellationToken cancellationToken = default);
}

Expand All @@ -16,5 +18,7 @@ public class PublishingOutBoxEvent

public string EventType { get; set; }

public string EventSource { get; set; }

public string Payload { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class MessageBus : IMessageBus
{
private readonly IServiceProvider _serviceProvider;
private static List<Type> _consumers = new List<Type>();
private static Dictionary<string, List<Type>> _outboxEventHandlers = new ();
private static Dictionary<string, List<Type>> _outboxEventHandlers = new();

internal static void AddConsumers(Assembly assembly, IServiceCollection services)
{
Expand Down Expand Up @@ -43,15 +43,17 @@ internal static void AddOutboxEventPublishers(Assembly assembly, IServiceCollect
foreach (var item in types)
{
var canHandlerEventTypes = (string[])item.InvokeMember(nameof(IOutBoxEventPublisher.CanHandleEventTypes), BindingFlags.InvokeMethod, null, null, null, CultureInfo.CurrentCulture);
var eventSource = (string)item.InvokeMember(nameof(IOutBoxEventPublisher.CanHandleEventSource), BindingFlags.InvokeMethod, null, null, null, CultureInfo.CurrentCulture);

foreach (var eventType in canHandlerEventTypes)
{
if (!_outboxEventHandlers.ContainsKey(eventType))
var key = eventSource + ":" + eventType;
if (!_outboxEventHandlers.ContainsKey(key))
{
_outboxEventHandlers[eventType] = new List<Type>();
_outboxEventHandlers[key] = new List<Type>();
}

_outboxEventHandlers[eventType].Add(item);
_outboxEventHandlers[key].Add(item);
}
}
}
Expand Down Expand Up @@ -97,7 +99,8 @@ await _serviceProvider.GetRequiredService<IMessageReceiver<TConsumer, T>>().Rece

public async Task SendAsync(PublishingOutBoxEvent outbox, CancellationToken cancellationToken = default)
{
var handlerTypes = _outboxEventHandlers.ContainsKey(outbox.EventType) ? _outboxEventHandlers[outbox.EventType] : null;
var key = outbox.EventSource + ":" + outbox.EventType;
var handlerTypes = _outboxEventHandlers.ContainsKey(key) ? _outboxEventHandlers[key] : null;

if (handlerTypes == null)
{
Expand Down

0 comments on commit b5c83a0

Please sign in to comment.