Skip to content

Commit 93869bf

Browse files
committed
feat(IntegrationEventBus): Support does not specify local message table service
1 parent 7b0d3a5 commit 93869bf

File tree

7 files changed

+75
-9
lines changed

7 files changed

+75
-9
lines changed

src/Dispatcher/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EF/DispatcherOptionsExtensions.cs

+2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ public static IDispatcherOptions UseEventLog<TDbContext>(
2222

2323
options.Services.AddSingleton<EventLogProvider>();
2424

25+
options.Services.TryAddScoped<IIntegrationEventLogService, IntegrationEventLogService>();
26+
2527
//Add local message table model mapping
2628
options.Services.TryAddEnumerable(new ServiceDescriptor(typeof(IModelCreatingProvider),
2729
typeof(IntegrationEventLogModelCreatingProvider), ServiceLifetime.Singleton));

src/Dispatcher/Masa.Contrib.Dispatcher.IntegrationEvents/DispatcherOptionsExtensions.cs

+12-4
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,25 @@ namespace Masa.Contrib.Dispatcher.IntegrationEvents;
55

66
public static class DispatcherOptionsExtensions
77
{
8+
public static IDistributedDispatcherOptions UseIntegrationEventBus(
9+
this IDistributedDispatcherOptions dispatcherOptions,
10+
Action<DispatcherOptions>? optionAction = null)
11+
{
12+
ArgumentNullException.ThrowIfNull(dispatcherOptions.Services, nameof(dispatcherOptions.Services));
13+
14+
dispatcherOptions.Services.TryAddIntegrationEventBus(dispatcherOptions.Assemblies, option => optionAction?.Invoke(option));
15+
return dispatcherOptions;
16+
}
17+
818
public static IDistributedDispatcherOptions UseIntegrationEventBus<TIntegrationEventLogService>(
919
this IDistributedDispatcherOptions dispatcherOptions,
1020
Action<DispatcherOptions>? optionAction = null)
1121
where TIntegrationEventLogService : class, IIntegrationEventLogService
1222
{
1323
ArgumentNullException.ThrowIfNull(dispatcherOptions.Services, nameof(dispatcherOptions.Services));
1424

15-
dispatcherOptions.Services.TryAddIntegrationEventBus<TIntegrationEventLogService>(dispatcherOptions.Assemblies, option =>
16-
{
17-
optionAction?.Invoke(option);
18-
});
25+
dispatcherOptions.Services.TryAddIntegrationEventBus<TIntegrationEventLogService>(dispatcherOptions.Assemblies,
26+
option => optionAction?.Invoke(option));
1927
return dispatcherOptions;
2028
}
2129
}

src/Dispatcher/Masa.Contrib.Dispatcher.IntegrationEvents/IntegrationEventBus.cs

+3-3
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@ public class IntegrationEventBus : IIntegrationEventBus
88
private readonly DispatcherOptions _dispatcherOptions;
99
private readonly IPublisher _publisher;
1010
private readonly ILogger<IntegrationEventBus>? _logger;
11-
private readonly IIntegrationEventLogService _eventLogService;
11+
private readonly IIntegrationEventLogService? _eventLogService;
1212
private readonly IOptionsMonitor<AppConfig>? _appConfig;
1313
private readonly IEventBus? _eventBus;
1414
private readonly IUnitOfWork? _unitOfWork;
1515

1616
public IntegrationEventBus(IOptions<DispatcherOptions> options,
1717
IPublisher publisher,
18-
IIntegrationEventLogService eventLogService,
18+
IIntegrationEventLogService? eventLogService = null,
1919
IOptionsMonitor<AppConfig>? appConfig = null,
2020
ILogger<IntegrationEventBus>? logger = null,
2121
IEventBus? eventBus = null,
@@ -59,7 +59,7 @@ private async Task PublishIntegrationAsync<TEvent>(TEvent @event)
5959
@event.UnitOfWork = _unitOfWork;
6060

6161
var topicName = @event.Topic;
62-
if (@event.UnitOfWork is { UseTransaction: true })
62+
if (@event.UnitOfWork is { UseTransaction: true } && _eventLogService != null)
6363
{
6464
try
6565
{

src/Dispatcher/Masa.Contrib.Dispatcher.IntegrationEvents/Servers/DefaultHostedService.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public DefaultHostedService(IServiceProvider serviceProvider, IEnumerable<IProce
1616

1717
public Task ExecuteAsync(CancellationToken stoppingToken)
1818
{
19-
if (_serviceProvider.GetService<IUnitOfWorkManager>() == null)
19+
if (_serviceProvider.GetService<IUnitOfWorkManager>() == null || _serviceProvider.GetService<IIntegrationEventLogService>() == null)
2020
return Task.CompletedTask;
2121

2222
var processorTasks = _processors.Select(processor => new InfiniteLoopProcessor(_serviceProvider, processor))

src/Dispatcher/Masa.Contrib.Dispatcher.IntegrationEvents/ServiceCollectionExtensions.cs

+24-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,17 @@ namespace Microsoft.Extensions.DependencyInjection;
55

66
public static class ServiceCollectionExtensions
77
{
8+
public static IServiceCollection AddIntegrationEventBus(
9+
this IServiceCollection services,
10+
Action<DispatcherOptions>? options = null)
11+
=> services.AddIntegrationEventBus(AppDomain.CurrentDomain.GetAssemblies(), options);
12+
13+
public static IServiceCollection AddIntegrationEventBus(
14+
this IServiceCollection services,
15+
Assembly[] assemblies,
16+
Action<DispatcherOptions>? options = null)
17+
=> services.TryAddIntegrationEventBus(assemblies, options);
18+
819
public static IServiceCollection AddIntegrationEventBus<TIntegrationEventLogService>(
920
this IServiceCollection services,
1021
Action<DispatcherOptions>? options = null)
@@ -23,6 +34,18 @@ internal static IServiceCollection TryAddIntegrationEventBus<TIntegrationEventLo
2334
Assembly[] assemblies,
2435
Action<DispatcherOptions>? options)
2536
where TIntegrationEventLogService : class, IIntegrationEventLogService
37+
{
38+
return services.TryAddIntegrationEventBus(assemblies, options, () =>
39+
{
40+
services.AddScoped<IIntegrationEventLogService, TIntegrationEventLogService>();
41+
});
42+
}
43+
44+
internal static IServiceCollection TryAddIntegrationEventBus(
45+
this IServiceCollection services,
46+
Assembly[] assemblies,
47+
Action<DispatcherOptions>? options,
48+
Action? action = null)
2649
{
2750
if (services.Any(service => service.ImplementationType == typeof(IntegrationEventBusProvider)))
2851
return services;
@@ -37,7 +60,7 @@ internal static IServiceCollection TryAddIntegrationEventBus<TIntegrationEventLo
3760

3861
LocalQueueProcessor.SetLogger(services);
3962
services.AddScoped<IIntegrationEventBus, IntegrationEventBus>();
40-
services.AddScoped<IIntegrationEventLogService, TIntegrationEventLogService>();
63+
action?.Invoke();
4164
services.AddSingleton<IProcessor, RetryByDataProcessor>();
4265
services.AddSingleton<IProcessor, RetryByLocalQueueProcessor>();
4366
services.AddSingleton<IProcessor, DeletePublishedExpireEventProcessor>();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
// Copyright (c) MASA Stack All rights reserved.
2+
// Licensed under the MIT License. See LICENSE.txt in the project root for license information.
3+
4+
namespace Masa.Contrib.Dispatcher.Tests.Application.Events;
5+
6+
public record AddGoodsIntegrationEvent : IntegrationEvent
7+
{
8+
public Guid Id { get; set; }
9+
10+
public string Name { get; set; }
11+
12+
public int Count { get; set; }
13+
14+
public decimal Price { get; set; }
15+
16+
public override string Topic { get; set; } = nameof(AddGoodsIntegrationEvent);
17+
}

test/Masa.Contrib.Dispatcher.Tests/TestDispatcher.cs

+16
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,22 @@ await dbContext.Set<User>().AddAsync(new User()
6363

6464
Assert.IsTrue(RecordMiddleware<UserAgeQuery>.Time == 1);
6565
Assert.IsTrue(RecordMiddleware<CheckUserQuery>.Time == 0);
66+
}
67+
68+
[TestMethod]
69+
public async Task TestIntegrationEventAndNotUseEventLogServiceReturnNoError()
70+
{
71+
var services = new ServiceCollection();
72+
services.AddIntegrationEventBus(option => option.UseTestPub().UseEventBus());
73+
var serviceProvider = services.BuildServiceProvider();
6674

75+
var integrationEventBus = serviceProvider.GetRequiredService<IIntegrationEventBus>();
76+
await integrationEventBus.PublishAsync(new AddGoodsIntegrationEvent()
77+
{
78+
Name = "Apple",
79+
Count = 1,
80+
Id = Guid.NewGuid(),
81+
Price = 9.9m,
82+
});
6783
}
6884
}

0 commit comments

Comments
 (0)