Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: move domain event fanout from app to bus #1406

Merged
merged 10 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using Digdir.Domain.Dialogporten.Domain.Common.EventPublisher;
using MediatR;

namespace Digdir.Domain.Dialogporten.Application.Common;

public static class ApplicationEventHandlerUtils
{
public static HandlerEventMapping[] GetHandlerEventMaps()
{
var openNotificationHandler = typeof(INotificationHandler<>);
var domainEventType = typeof(IDomainEvent);
return ApplicationAssemblyMarker.Assembly.DefinedTypes
.Where(x => x is { IsInterface: false, IsAbstract: false })
.SelectMany(x => x
.GetInterfaces()
.Where(@interface =>
@interface.IsGenericType
&& @interface.GetGenericTypeDefinition() == openNotificationHandler
&& @interface.GetGenericArguments().Single().IsAssignableTo(domainEventType))
.Select(@interface => (@class: x, @event: @interface.GetGenericArguments().Single()))
.Select(x => new HandlerEventMapping(x.@class.AsType(), x.@event, EndpointNameAttribute.GetName(x.@class, x.@event))))
.ToArray();
}
}

public record struct HandlerEventMapping(Type HandlerType, Type EventType, string EndpointName);
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using System.Reflection;
using Digdir.Domain.Dialogporten.Domain.Common.EventPublisher;
using MediatR;

namespace Digdir.Domain.Dialogporten.Application.Common;

/// <summary>
/// Attribute to specify which endpoint name MassTransit will use when wrapping <see cref="INotificationHandler{T}.Handle(T, CancellationToken)"/>.
/// </summary>
/// <remarks>
/// <list type="bullet">
/// <item>Will default to "{handlerType.Name}_{eventType.Name}" if not specified.</item>
/// <item>MassTransit will only wrap <see cref="INotificationHandler{TNotification}"/> where TNotification implements <see cref="IDomainEvent"/>.</item>
/// </list>
/// </remarks>
[AttributeUsage(AttributeTargets.Method)]
internal sealed class EndpointNameAttribute : Attribute
{
public string Name { get; }

public EndpointNameAttribute(string name)
{
if (string.IsNullOrWhiteSpace(name))
{
throw new ArgumentException("Value cannot be null or whitespace.", nameof(name));
}

Name = name;
}

public static string GetName(Type handlerType, Type eventType)
=> handlerType
.GetInterfaceMap(typeof(INotificationHandler<>).MakeGenericType(eventType))
.TargetMethods.Single()
.GetCustomAttribute<EndpointNameAttribute>()?
.Name ?? DefaultName(handlerType, eventType);
MagnusSandgren marked this conversation as resolved.
Show resolved Hide resolved

private static string DefaultName(Type handlerType, Type eventType)
=> $"{handlerType.Name}_{eventType.Name}";
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using Digdir.Domain.Dialogporten.Application.Common;
using Digdir.Domain.Dialogporten.Application.Externals;
using Digdir.Domain.Dialogporten.Domain.Dialogs.Events.Activities;
using MediatR;
Expand All @@ -11,6 +12,7 @@ internal sealed class DialogActivityEventToAltinnForwarder : DomainEventToAltinn
public DialogActivityEventToAltinnForwarder(ICloudEventBus cloudEventBus, IOptions<ApplicationSettings> settings)
: base(cloudEventBus, settings) { }

[EndpointName("DialogEventToAltinnForwarder_DialogActivityCreatedDomainEvent")]
public async Task Handle(DialogActivityCreatedDomainEvent domainEvent, CancellationToken cancellationToken)
{
var cloudEvent = new CloudEvent
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using Digdir.Domain.Dialogporten.Application.Common;
using Digdir.Domain.Dialogporten.Application.Externals;
using Digdir.Domain.Dialogporten.Domain.Dialogs.Events;
using MediatR;
Expand All @@ -14,6 +15,7 @@ internal sealed class DialogEventToAltinnForwarder : DomainEventToAltinnForwarde
public DialogEventToAltinnForwarder(ICloudEventBus cloudEventBus, IOptions<ApplicationSettings> settings)
: base(cloudEventBus, settings) { }

[EndpointName("DialogEventToAltinnForwarder_DialogCreatedDomainEvent")]
public async Task Handle(DialogCreatedDomainEvent domainEvent, CancellationToken cancellationToken)
{
var cloudEvent = new CloudEvent
Expand All @@ -30,6 +32,7 @@ public async Task Handle(DialogCreatedDomainEvent domainEvent, CancellationToken
await CloudEventBus.Publish(cloudEvent, cancellationToken);
}

[EndpointName("DialogEventToAltinnForwarder_DialogUpdatedDomainEvent")]
public async Task Handle(DialogUpdatedDomainEvent domainEvent, CancellationToken cancellationToken)
{
var cloudEvent = new CloudEvent
Expand All @@ -47,6 +50,7 @@ public async Task Handle(DialogUpdatedDomainEvent domainEvent, CancellationToken
await CloudEventBus.Publish(cloudEvent, cancellationToken);
}

[EndpointName("DialogEventToAltinnForwarder_DialogSeenDomainEvent")]
public async Task Handle(DialogSeenDomainEvent domainEvent, CancellationToken cancellationToken)
{
var cloudEvent = new CloudEvent
Expand All @@ -64,6 +68,7 @@ public async Task Handle(DialogSeenDomainEvent domainEvent, CancellationToken ca
await CloudEventBus.Publish(cloudEvent, cancellationToken);
}

[EndpointName("DialogEventToAltinnForwarder_DialogDeletedDomainEvent")]
public async Task Handle(DialogDeletedDomainEvent domainEvent, CancellationToken cancellationToken)
{
var cloudEvent = new CloudEvent
Expand Down
3 changes: 1 addition & 2 deletions src/Digdir.Domain.Dialogporten.Domain/DomainExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ namespace Digdir.Domain.Dialogporten.Domain;
public static class DomainExtensions
{
public static IEnumerable<Type> GetDomainEventTypes()
=> DomainAssemblyMarker.Assembly
.GetTypes()
=> DomainAssemblyMarker.Assembly.DefinedTypes
.Where(x => !x.IsAbstract && !x.IsInterface && !x.IsGenericType)
.Where(x => x.IsAssignableTo(typeof(IDomainEvent)));
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ public async Task<INotificationProcessingContext> CreateContext(
bool isFirstAttempt = false,
CancellationToken cancellationToken = default)
{
var transaction = GetOrAddContext(domainEvent.EventId);
var context = GetOrAddContext(domainEvent.EventId);
try
{
await transaction.Initialize(isFirstAttempt, cancellationToken);
return transaction;
await context.Initialize(isFirstAttempt, cancellationToken);
return context;
}
catch (Exception)
{
Expand All @@ -63,24 +63,32 @@ public void Dispose()

private NotificationProcessingContext GetOrAddContext(Guid eventId)
{
// We keep a strong reference to the context while it's being
// created to avoid it being garbage collected prematurely
// before we can extract it from the weak reference.
NotificationProcessingContext? context;
var weakContext = _contextByEventId.AddOrUpdate(eventId,
addValueFactory: eventId => new(new(_serviceScopeFactory, eventId, onDispose: RemoveContext)),
addValueFactory: eventId => new(context = new(_serviceScopeFactory, eventId, onDispose: RemoveContext)),
// Should the context, for whatever reason, be garbage collected or
// disposed but still remain in the dictionary, we should recreate it.
updateValueFactory: (eventId, old) => TryGetLiveContext(old, out _) ? old
: new(new(_serviceScopeFactory, eventId, onDispose: RemoveContext)));
updateValueFactory: (eventId, old) => TryGetLiveContext(old, out context) ? old
: new(context = new(_serviceScopeFactory, eventId, onDispose: RemoveContext)));

return TryGetLiveContext(weakContext, out var context) ? context
// Although we have a strong reference to __a__ context, it may
// not be __the__ context in a multithreaded scenario. We
// know that the actual context is in the week reference, so
// we extract it before returning.
return TryGetLiveContext(weakContext, out context) ? context
: throw new UnreachableException("The context should be alive at this point in time.");
}

private void RemoveContext(Guid eventId) => _contextByEventId.TryRemove(eventId, out _);

private async Task ContextHousekeeping()
{
try
while (await WaitForNextTickSafeAsync())
{
while (await _cleanupTimer.WaitForNextTickAsync(_cleanupCts.Token))
try
{
foreach (var key in _contextByEventId.Keys)
{
Expand All @@ -91,14 +99,26 @@ private async Task ContextHousekeeping()
}
}
}
catch (OperationCanceledException)
{
// Ignore
}
catch (Exception e)
{
_logger.LogWarning(e, "An unhandled exception occurred in the notification processing context cleanup task. This may lead to memory leaks.");
}
}
catch (OperationCanceledException)
}

private async ValueTask<bool> WaitForNextTickSafeAsync()
{
try
{
// Ignore
return await _cleanupTimer.WaitForNextTickAsync(_cleanupCts.Token);
}
catch (Exception e)
catch (OperationCanceledException)
{
_logger.LogWarning(e, "An unhandled exception occurred in the notification processing context cleanup task. This may lead to memory leaks.");
return false;
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using Digdir.Domain.Dialogporten.Application.Common;
using Digdir.Domain.Dialogporten.Service.Consumers;

namespace Digdir.Domain.Dialogporten.Service.Common;

internal static class MassTransitApplicationUtils
{
public static ApplicationConsumerMapping[] GetApplicationConsumerMaps()
{
var openDomainEventConsumer = typeof(DomainEventConsumer<,>);
var openDomainEventConsumerDefinition = typeof(DomainEventConsumerDefinition<,>);
return ApplicationEventHandlerUtils
.GetHandlerEventMaps()
.Select(x => new ApplicationConsumerMapping(
AppConsumerType: x.HandlerType,
BusConsumerType: openDomainEventConsumer.MakeGenericType(x.HandlerType, x.EventType),
BusDefinitionType: openDomainEventConsumerDefinition.MakeGenericType(x.HandlerType, x.EventType),
EndpointName: x.EndpointName))
.ToArray();
}
}

internal record struct ApplicationConsumerMapping(Type AppConsumerType, Type BusConsumerType, Type BusDefinitionType, string EndpointName);
Original file line number Diff line number Diff line change
@@ -1,41 +1,24 @@
using Digdir.Domain.Dialogporten.Domain.Common.EventPublisher;
using Digdir.Domain.Dialogporten.Infrastructure.Persistence.IdempotentNotifications;
using MassTransit;
using MediatR;

namespace Digdir.Domain.Dialogporten.Service.Consumers;

public sealed class DomainEventConsumer<T> : IConsumer<T>
where T : class, IDomainEvent
public sealed class DomainEventConsumer<THandler, TEvent>(THandler handler) : IConsumer<TEvent>
where THandler : INotificationHandler<TEvent>
where TEvent : class, IDomainEvent
{
private readonly IPublisher _publisher;
private readonly INotificationProcessingContextFactory _notificationProcessingContextFactory;

public DomainEventConsumer(IPublisher publisher, INotificationProcessingContextFactory notificationProcessingContextFactory)
{
_publisher = publisher ?? throw new ArgumentNullException(nameof(publisher));
_notificationProcessingContextFactory = notificationProcessingContextFactory ?? throw new ArgumentNullException(nameof(notificationProcessingContextFactory));
}

public async Task Consume(ConsumeContext<T> context)
{
var isFirstAttempt = IsFirstAttempt(context);
await using var notificationContext = await _notificationProcessingContextFactory
.CreateContext(context.Message, isFirstAttempt, context.CancellationToken);
await _publisher.Publish(context.Message, context.CancellationToken);
await notificationContext.Ack(context.CancellationToken);
}

private static bool IsFirstAttempt(ConsumeContext<T> context)
=> (context.GetRetryAttempt() + context.GetRedeliveryCount()) == 0;
private readonly THandler _handler = handler ?? throw new ArgumentNullException(nameof(handler));
public Task Consume(ConsumeContext<TEvent> context) => _handler.Handle(context.Message, context.CancellationToken);
}

public sealed class DomainEventConsumerDefinition<T> : ConsumerDefinition<DomainEventConsumer<T>>
where T : class, IDomainEvent
public sealed class DomainEventConsumerDefinition<THandler, TEvent> : ConsumerDefinition<DomainEventConsumer<THandler, TEvent>>
where THandler : INotificationHandler<TEvent>
where TEvent : class, IDomainEvent
{
protected override void ConfigureConsumer(
IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<DomainEventConsumer<T>> consumerConfigurator,
IConsumerConfigurator<DomainEventConsumer<THandler, TEvent>> consumerConfigurator,
IRegistrationContext context)
{
endpointConfigurator.UseDelayedRedelivery(r => r.Intervals(
Expand Down
23 changes: 6 additions & 17 deletions src/Digdir.Domain.Dialogporten.Service/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
using Microsoft.ApplicationInsights.Extensibility;
using Serilog;
using Digdir.Domain.Dialogporten.Application.Externals.Presentation;
using Digdir.Domain.Dialogporten.Domain;
using Digdir.Domain.Dialogporten.Service;
using Digdir.Domain.Dialogporten.Service.Consumers;
using Digdir.Domain.Dialogporten.Service.Common;
using Digdir.Library.Utils.AspNet;
using MassTransit;
using Microsoft.Extensions.DependencyInjection.Extensions;

// Using two-stage initialization to catch startup errors.
var telemetryConfiguration = TelemetryConfiguration.CreateDefault();
Expand Down Expand Up @@ -49,19 +49,6 @@ static void BuildAndRun(string[] args, TelemetryConfiguration telemetryConfigura
.AddAzureConfiguration(builder.Environment.EnvironmentName)
.AddLocalConfiguration(builder.Environment);

// Generic consumers are not registered through MassTransits assembly
// scanning, so we need to create domain event handlers for all
// domain events and register them manually
var openDomainEventConsumer = typeof(DomainEventConsumer<>);
var openDomainEventConsumerDefinition = typeof(DomainEventConsumerDefinition<>);
var domainEventConsumers = DomainExtensions.GetDomainEventTypes()
.Select(x =>
(
consumerType: openDomainEventConsumer.MakeGenericType(x),
definitionType: openDomainEventConsumerDefinition.MakeGenericType(x))
)
.ToArray();

builder.ConfigureTelemetry();

builder.Services
Expand All @@ -71,9 +58,11 @@ static void BuildAndRun(string[] args, TelemetryConfiguration telemetryConfigura
.WithPubSubCapabilities<ServiceAssemblyMarker>()
.AndBusConfiguration(x =>
{
foreach (var (consumer, definition) in domainEventConsumers)
foreach (var map in MassTransitApplicationUtils.GetApplicationConsumerMaps())
{
x.AddConsumer(consumer, definition);
x.TryAddTransient(map.AppConsumerType);
x.AddConsumer(map.BusConsumerType, map.BusDefinitionType)
.Endpoint(x => x.Name = map.EndpointName);
}
})
.Build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
</PackageReference>
<PackageReference Include="FluentAssertions" Version="6.12.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.11.1" />
<PackageReference Include="Verify.Xunit" Version="28.1.3" />
<PackageReference Include="NSubstitute" Version="5.3.0"/>
<PackageReference Include="xunit" Version="2.9.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.8.2">
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[
DialogEventToAltinnForwarder_DialogActivityCreatedDomainEvent,
DialogEventToAltinnForwarder_DialogCreatedDomainEvent,
DialogEventToAltinnForwarder_DialogDeletedDomainEvent,
DialogEventToAltinnForwarder_DialogSeenDomainEvent,
DialogEventToAltinnForwarder_DialogUpdatedDomainEvent
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[
Digdir.Domain.Dialogporten.Domain.Dialogs.Events.Activities.DialogActivityCreatedDomainEvent,
Digdir.Domain.Dialogporten.Domain.Dialogs.Events.DialogCreatedDomainEvent,
Digdir.Domain.Dialogporten.Domain.Dialogs.Events.DialogDeletedDomainEvent,
Digdir.Domain.Dialogporten.Domain.Dialogs.Events.DialogSeenDomainEvent,
Digdir.Domain.Dialogporten.Domain.Dialogs.Events.DialogUpdatedDomainEvent
]
Loading
Loading