Skip to content

feat: subscribe integration events by event handlers #30

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions;

/// <summary>
/// The empty interface as a generic type constraint
/// </summary>
public interface IEventBusHandler
{
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
using MediatR;
using MediatR;

namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions;

/// <summary>
/// 集成事件处理器。
/// </summary>
/// <typeparam name="TEvent">集成事件。</typeparam>
public interface IIntegrationEventHandler<TEvent> : INotificationHandler<TEvent>
public interface IIntegrationEventHandler<TEvent> : INotificationHandler<TEvent>, IEventBusHandler
where TEvent : IntegrationEvent
{
}
}
120 changes: 91 additions & 29 deletions src/Cnblogs.Architecture.Ddd.EventBus.Dapr/EndPointExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Reflection;
using System.Reflection;
using Cnblogs.Architecture.Ddd.EventBus.Abstractions;
using Cnblogs.Architecture.Ddd.EventBus.Dapr;
using MediatR;
using Microsoft.AspNetCore.Routing;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
Expand All @@ -19,20 +20,11 @@ public static class EndPointExtensions
/// <param name="builder"><see cref="IEndpointRouteBuilder"/></param>
/// <typeparam name="TEvent">事件类型。</typeparam>
/// <returns><see cref="IEndpointConventionBuilder"/></returns>
public static IEndpointConventionBuilder Subscribe<TEvent>(this IEndpointRouteBuilder builder)
public static IEndpointRouteBuilder Subscribe<TEvent>(this IEndpointRouteBuilder builder)
where TEvent : IntegrationEvent
{
var attr = typeof(TEvent).Assembly
.GetCustomAttributes(typeof(AssemblyAppNameAttribute), false)
.Cast<AssemblyAppNameAttribute>()
.FirstOrDefault();
if (attr is null || string.IsNullOrEmpty(attr.Name))
{
throw new InvalidOperationException(
$"No AppName was configured in assembly for event: {typeof(TEvent).Name}, either use Subscribe<TEvent>(string appName) method to set AppName manually or add [assembly:AssemblyAppName()] to the Assembly that {typeof(TEvent).Name} belongs to");
}

return builder.Subscribe<TEvent>(attr.Name);
var appName = typeof(TEvent).Assembly.GetAppName();
return builder.Subscribe<TEvent>(appName);
}

/// <summary>
Expand All @@ -42,7 +34,7 @@ public static IEndpointConventionBuilder Subscribe<TEvent>(this IEndpointRouteBu
/// <param name="appName">事件隶属名称。</param>
/// <typeparam name="TEvent">事件类型。</typeparam>
/// <returns></returns>
public static IEndpointConventionBuilder Subscribe<TEvent>(this IEndpointRouteBuilder builder, string appName)
public static IEndpointRouteBuilder Subscribe<TEvent>(this IEndpointRouteBuilder builder, string appName)
where TEvent : IntegrationEvent
{
var eventName = typeof(TEvent).Name;
Expand All @@ -57,7 +49,7 @@ public static IEndpointConventionBuilder Subscribe<TEvent>(this IEndpointRouteBu
/// <param name="appName">应用名称。</param>
/// <typeparam name="TEvent">事件类型。</typeparam>
/// <returns></returns>
public static IEndpointConventionBuilder Subscribe<TEvent>(
public static IEndpointRouteBuilder Subscribe<TEvent>(
this IEndpointRouteBuilder builder,
string route,
string appName)
Expand All @@ -68,36 +60,78 @@ public static IEndpointConventionBuilder Subscribe<TEvent>(
var result = builder
.MapPost(route, (TEvent receivedEvent, IEventBus eventBus) => eventBus.ReceiveAsync(receivedEvent))
.WithTopic(DaprOptions.PubSubName, DaprUtils.GetDaprTopicName<TEvent>(appName));
return result;

return builder;
}

/// <summary>
/// 订阅 Assembly 中的全部事件。
/// </summary>
/// <param name="builder"><see cref="IEndpointRouteBuilder"/></param>
/// <param name="assemblies"><see cref="Assembly"/></param>
public static void Subscribe(this IEndpointRouteBuilder builder, params Assembly[] assemblies)
public static IEndpointRouteBuilder Subscribe(this IEndpointRouteBuilder builder, params Assembly[] assemblies)
{
builder.EnsureDaprEventBus();

var method = typeof(EndPointExtensions).GetMethod(
nameof(Subscribe),
new[] { typeof(IEndpointRouteBuilder), typeof(string) })!;
var method = GetSubscribeMethod();

foreach (var assembly in assemblies)
{
var events = assembly.GetTypes().Where(x => x.IsSubclassOf(typeof(IntegrationEvent))).ToList();
var attr = assembly
.GetCustomAttributes(typeof(AssemblyAppNameAttribute), false)
.Cast<AssemblyAppNameAttribute>()
.FirstOrDefault();
if (attr is null || string.IsNullOrEmpty(attr.Name))
var appName = assembly.GetAppName();
events.ForEach(e => method.InvokeSubscribe(e, builder, appName));
}

return builder;
}

/// <summary>
/// Subscribes integration events that the TEventHandler implements
/// </summary>
/// <typeparam name="TEventHandler">The integration event handler that implements <![CDATA[IIntegrationEventHandler<TEvent>]]></typeparam>
/// <param name="builder"><see cref="IEndpointRouteBuilder"/></param>
public static IEndpointRouteBuilder SubscribeByEventHandler<TEventHandler>(this IEndpointRouteBuilder builder)
where TEventHandler : IEventBusHandler
{
return builder.SubscribeByEventHandler(typeof(TEventHandler));
}

/// <summary>
/// Subscribes integration events that event handlers implement in assemblies
/// </summary>
/// <param name="builder"><see cref="IEndpointRouteBuilder"/></param>
/// <param name="assemblies">assemblies that event handlers reside</param>
/// <returns></returns>
public static IEndpointRouteBuilder SubscribeByEventHandler(this IEndpointRouteBuilder builder, params Assembly[] assemblies)
{
foreach (var assembly in assemblies)
{
foreach (Type type in assembly.GetTypes())
{
throw new InvalidOperationException(
$"No AppName was configured in assembly: {assembly.FullName}, either use Subscribe<TEvent>(string appName) method to set AppName manually or add [assembly:AssemblyAppName()] to the Assembly");
builder.SubscribeByEventHandler(type);
}
}

return builder;
}

events.ForEach(e => method.MakeGenericMethod(e).Invoke(null, new object[] { builder, attr.Name }));
private static IEndpointRouteBuilder SubscribeByEventHandler(this IEndpointRouteBuilder builder, Type type)
{
var interfaces = type.GetInterfaces()
.Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IIntegrationEventHandler<>));

foreach (var handlerInterface in interfaces)
{
var eventType = handlerInterface.GetGenericArguments().FirstOrDefault();
if (eventType != null)
{
var assembly = eventType.Assembly;
var appName = assembly.GetAppName();
GetSubscribeMethod().InvokeSubscribe(eventType, builder, appName);
}
}

return builder;
}

private static void EnsureEventBusRegistered(this IEndpointRouteBuilder builder, DaprOptions daprOptions)
Expand Down Expand Up @@ -142,4 +176,32 @@ private static void EnsureDaprEventBus(this IEndpointRouteBuilder builder)
builder.EnsureDaprSubscribeHandlerMapped(options);
builder.EnsureEventBusRegistered(options);
}
}

private static MethodInfo GetSubscribeMethod()
{
return typeof(EndPointExtensions).GetMethod(
nameof(Subscribe),
new[] { typeof(IEndpointRouteBuilder), typeof(string) })!;
}

private static void InvokeSubscribe(this MethodInfo method, Type eventType, IEndpointRouteBuilder builder, string appName)
{
method.MakeGenericMethod(eventType).Invoke(null, new object[] { builder, appName });
}

private static string GetAppName(this Assembly assembly)
{
var appName = assembly
.GetCustomAttributes(typeof(AssemblyAppNameAttribute), false)
.Cast<AssemblyAppNameAttribute>()
.FirstOrDefault()?.Name;

if (string.IsNullOrEmpty(appName))
{
throw new InvalidOperationException(
$"No AppName was configured in assembly: {assembly.FullName}, either use Subscribe<TEvent>(string appName) method to set AppName manually or add [assembly:AssemblyAppName()] to the Assembly");
}

return appName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

namespace Cnblogs.Architecture.IntegrationTestProject.EventHandlers;

public class TestIntegrationEventHandler : IIntegrationEventHandler<TestIntegrationEvent>
public class TestIntegrationEventHandler : IIntegrationEventHandler<TestIntegrationEvent>,
IIntegrationEventHandler<BlogPostCreatedIntegrationEvent>
{
private readonly ILogger _logger;

Expand All @@ -19,4 +20,11 @@ public Task Handle(TestIntegrationEvent notification, CancellationToken cancella

return Task.CompletedTask;
}

public Task Handle(BlogPostCreatedIntegrationEvent notification, CancellationToken cancellationToken)
{
_logger.LogInformation(LogTemplates.HandledIntegratonEvent, notification);

return Task.CompletedTask;
}
}
24 changes: 19 additions & 5 deletions test/Cnblogs.Architecture.IntegrationTests/DaprTests.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Diagnostics;
using System.Net;
using Cnblogs.Architecture.IntegrationTestProject.EventHandlers;
using Cnblogs.Architecture.TestIntegrationEvents;
using FluentAssertions;
using Microsoft.AspNetCore.Builder;
Expand All @@ -10,16 +11,29 @@ namespace Cnblogs.Architecture.IntegrationTests;

public class DaprTests
{
[Fact]
public async Task Dapr_SubscribeEndpoint_OkAsync()
[Theory]
[InlineData(SubscribeType.ByEvent)]
[InlineData(SubscribeType.ByEventAssemblies)]
[InlineData(SubscribeType.ByEventHandler)]
[InlineData(SubscribeType.ByEventHandlerAssemblies)]
public async Task Dapr_SubscribeEndpoint_OkAsync(SubscribeType subscribeType)
{
// Arrange
var builder = WebApplication.CreateBuilder();
builder.Services.AddDaprEventBus(nameof(DaprTests));
builder.WebHost.UseTestServer();

var app = builder.Build();
app.Subscribe<TestIntegrationEvent>();
using var app = builder.Build();

_ = subscribeType switch
{
SubscribeType.ByEvent => app.Subscribe<TestIntegrationEvent>().Subscribe<BlogPostCreatedIntegrationEvent>(),
SubscribeType.ByEventAssemblies => app.Subscribe(typeof(TestIntegrationEvent).Assembly),
SubscribeType.ByEventHandler => app.SubscribeByEventHandler<TestIntegrationEventHandler>(),
SubscribeType.ByEventHandlerAssemblies => app.SubscribeByEventHandler(typeof(TestIntegrationEventHandler).Assembly),
_ => app
};

await app.StartAsync();
var httpClient = app.GetTestClient();

Expand All @@ -29,8 +43,8 @@ public async Task Dapr_SubscribeEndpoint_OkAsync()
// Assert
response.Should().BeSuccessful();
var responseText = await response.Content.ReadAsStringAsync();
Debug.WriteLine(responseText);
responseText.Should().Contain(nameof(TestIntegrationEvent));
responseText.Should().Contain(nameof(BlogPostCreatedIntegrationEvent));
}

[Fact]
Expand Down
10 changes: 10 additions & 0 deletions test/Cnblogs.Architecture.IntegrationTests/SubscribeType.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
namespace Cnblogs.Architecture.IntegrationTests;

public enum SubscribeType
{
None,
ByEvent,
ByEventAssemblies,
ByEventHandler,
ByEventHandlerAssemblies,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
using Cnblogs.Architecture.Ddd.EventBus.Abstractions;

namespace Cnblogs.Architecture.TestIntegrationEvents;

public record BlogPostCreatedIntegrationEvent(Guid Id, DateTimeOffset CreatedTime, string Title) : IntegrationEvent(Id, CreatedTime);