Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rename: Enqueue -> EnqueueAsync #512

Merged
merged 4 commits into from
Mar 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Masa.Framework.sln
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Masa.BuildingBlocks.StackSd
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Masa.Contrib.StackSdks.Alert.Tests", "src\Contrib\StackSdks\Tests\Masa.Contrib.StackSdks.Alert.Tests\Masa.Contrib.StackSdks.Alert.Tests.csproj", "{1221F32F-7310-49FF-94DD-2BCF570E03F2}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Masa.Contrib.Dispatcher.Events.Tests.Scenes.IntegrationEvent", "src\Contrib\Dispatcher\Tests\Scenes\Masa.Contrib.Dispatcher.Events.Tests.Scenes.IntegrationEvent\Masa.Contrib.Dispatcher.Events.Tests.Scenes.IntegrationEvent.csproj", "{D1BC6F22-430A-4C5A-BDA8-5F8CC49D8E71}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -2523,6 +2525,14 @@ Global
{1221F32F-7310-49FF-94DD-2BCF570E03F2}.Release|Any CPU.Build.0 = Release|Any CPU
{1221F32F-7310-49FF-94DD-2BCF570E03F2}.Release|x64.ActiveCfg = Release|Any CPU
{1221F32F-7310-49FF-94DD-2BCF570E03F2}.Release|x64.Build.0 = Release|Any CPU
{D1BC6F22-430A-4C5A-BDA8-5F8CC49D8E71}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{D1BC6F22-430A-4C5A-BDA8-5F8CC49D8E71}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D1BC6F22-430A-4C5A-BDA8-5F8CC49D8E71}.Debug|x64.ActiveCfg = Debug|Any CPU
{D1BC6F22-430A-4C5A-BDA8-5F8CC49D8E71}.Debug|x64.Build.0 = Debug|Any CPU
{D1BC6F22-430A-4C5A-BDA8-5F8CC49D8E71}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D1BC6F22-430A-4C5A-BDA8-5F8CC49D8E71}.Release|Any CPU.Build.0 = Release|Any CPU
{D1BC6F22-430A-4C5A-BDA8-5F8CC49D8E71}.Release|x64.ActiveCfg = Release|Any CPU
{D1BC6F22-430A-4C5A-BDA8-5F8CC49D8E71}.Release|x64.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -2868,6 +2878,7 @@ Global
{30A143AF-4E9D-457E-BDAE-DFBC5A262F4A} = {383995FF-B661-4E15-A830-640FC5BA8A1F}
{EEACDE24-9D4A-4F65-B13A-644E89A7918D} = {8A9DBB76-6618-4982-87D7-6CBD8375EB15}
{1221F32F-7310-49FF-94DD-2BCF570E03F2} = {EC7A08E9-3355-486B-BA30-41A1F8CAC5F5}
{D1BC6F22-430A-4C5A-BDA8-5F8CC49D8E71} = {08B138B5-2599-4F42-9584-6AE736673882}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {40383055-CC50-4600-AD9A-53C14F620D03}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,14 @@ namespace Masa.BuildingBlocks.Ddd.Domain.Events;

public interface IDomainEventBus : IEventBus
{
#pragma warning disable S1133
[Obsolete("Enqueue has expired, please use EnqueueAsync instead, it will be removed in 1.0")]
Task Enqueue<TDomainEvent>(TDomainEvent @event)
where TDomainEvent : IDomainEvent;
#pragma warning restore S1133

Task EnqueueAsync<TDomainEvent>(TDomainEvent @event)
where TDomainEvent : IDomainEvent;

Task PublishQueueAsync();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ bool IsAssignableFromDomainQuery(Type? type)
}

public Task Enqueue<TDomainEvent>(TDomainEvent @event) where TDomainEvent : IDomainEvent
=> EnqueueAsync(@event);

public Task EnqueueAsync<TDomainEvent>(TDomainEvent @event)
where TDomainEvent : IDomainEvent
{
_eventQueue.Enqueue(@event);
return Task.CompletedTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,14 @@ internal static IServiceCollection TryAddIntegrationEventBus(
serviceProvider => Microsoft.Extensions.Options.Options.Create(dispatcherOptions));

LocalQueueProcessor.SetLogger(services);
services.AddScoped<IIntegrationEventBus, IntegrationEventBus>();
services.AddScoped<IIntegrationEventBus>(serviceProvider => new IntegrationEventBus(
new Lazy<IEventBus?>(serviceProvider.GetService<IEventBus>),
serviceProvider.GetRequiredService<IPublisher>(),
serviceProvider.GetService<IIntegrationEventLogService>(),
serviceProvider.GetService<IOptionsMonitor<MasaAppConfigureOptions>>(),
serviceProvider.GetService<ILogger<IntegrationEventBus>>(),
serviceProvider.GetService<IUnitOfWork>()
));
action?.Invoke();

if (services.Any(d => d.ServiceType == typeof(IIntegrationEventLogService)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,29 @@ namespace Masa.Contrib.Dispatcher.IntegrationEvents;

public class IntegrationEventBus : IIntegrationEventBus
{
private readonly Lazy<IEventBus?> _lazyEventBus;

private IEventBus? EventBus => _lazyEventBus.Value;

private readonly IPublisher _publisher;
private readonly ILogger<IntegrationEventBus>? _logger;
private readonly IIntegrationEventLogService? _eventLogService;
private readonly IOptionsMonitor<MasaAppConfigureOptions>? _masaAppConfigureOptions;
private readonly IEventBus? _eventBus;
private readonly IUnitOfWork? _unitOfWork;

public IntegrationEventBus(IPublisher publisher,
public IntegrationEventBus(
Lazy<IEventBus?> eventBusLazy,
IPublisher publisher,
IIntegrationEventLogService? eventLogService = null,
IOptionsMonitor<MasaAppConfigureOptions>? masaAppConfigureOptions = null,
ILogger<IntegrationEventBus>? logger = null,
IEventBus? eventBus = null,
IUnitOfWork? unitOfWork = null)
{
_lazyEventBus = eventBusLazy;
_publisher = publisher;
_eventLogService = eventLogService;
_masaAppConfigureOptions = masaAppConfigureOptions;
_logger = logger;
_eventBus = eventBus;
_unitOfWork = unitOfWork;
}

Expand All @@ -34,9 +38,9 @@ public async Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancella
{
await PublishIntegrationAsync(integrationEvent, cancellationToken);
}
else if (_eventBus != null)
else if (EventBus != null)
{
await _eventBus.PublishAsync(@event, cancellationToken);
await EventBus.PublishAsync(@event, cancellationToken);
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,15 @@ public void Initialize()
public void TestDispatcherOption()
{
var services = new ServiceCollection();
var options = new IntegrationEventOptions(services, new[] { typeof(IntegrationEventBusTest).Assembly });
var options = new IntegrationEventOptions(services, new[]
{
typeof(IntegrationEventBusTest).Assembly
});
Assert.IsTrue(options.Services.Equals(services));
var allEventTypes = new[] { typeof(IntegrationEventBusTest).Assembly }.SelectMany(assembly => assembly.GetTypes())
var allEventTypes = new[]
{
typeof(IntegrationEventBusTest).Assembly
}.SelectMany(assembly => assembly.GetTypes())
.Where(type => type.IsClass && type != typeof(IntegrationEvent) && typeof(IEvent).IsAssignableFrom(type)).ToList();
Assert.IsTrue(options.AllEventTypes.Count == allEventTypes.Count());
}
Expand All @@ -61,11 +67,11 @@ public void TestDispatcherOption()
public async Task TestPublishIntegrationEventAsync(bool useLogger)
{
var integrationEventBus = new IntegrationEventBus(
new Lazy<IEventBus?>(_eventBus.Object),
_publisher.Object,
_eventLog.Object,
_masaAppConfigureOptions.Object,
useLogger ? _logger.Object : null,
_eventBus.Object,
_uoW.Object);
RegisterUserIntegrationEvent @event = new RegisterUserIntegrationEvent()
{
Expand All @@ -90,11 +96,11 @@ public async Task TestPublishIntegrationEventAsync(bool useLogger)
public async Task TestPublishIntegrationEventAndNotUoWAsync(bool useLogger)
{
var integrationEventBus = new IntegrationEventBus(
new Lazy<IEventBus?>(_eventBus.Object),
_publisher.Object,
_eventLog.Object,
_masaAppConfigureOptions.Object,
useLogger ? _logger.Object : null,
_eventBus.Object);
useLogger ? _logger.Object : null);
RegisterUserIntegrationEvent @event = new RegisterUserIntegrationEvent()
{
Account = "masa",
Expand All @@ -119,11 +125,11 @@ public async Task TestNotUseTransactionAsync(bool useLogger)
{
_uoW.Setup(uoW => uoW.UseTransaction).Returns(false);
var integrationEventBus = new IntegrationEventBus(
new Lazy<IEventBus?>(_eventBus.Object),
_publisher.Object,
_eventLog.Object,
_masaAppConfigureOptions.Object,
useLogger ? _logger.Object : null,
_eventBus.Object,
_uoW.Object);
RegisterUserIntegrationEvent @event = new RegisterUserIntegrationEvent()
{
Expand All @@ -150,11 +156,11 @@ public async Task TestSaveEventFailedAsync(bool useLogger)
_eventLog.Setup(eventLog => eventLog.SaveEventAsync(It.IsAny<IIntegrationEvent>(), null!, default))
.Callback(() => throw new Exception("custom exception"));
var integrationEventBus = new IntegrationEventBus(
new Lazy<IEventBus?>(_eventBus.Object),
_publisher.Object,
_eventLog.Object,
_masaAppConfigureOptions.Object,
useLogger ? _logger.Object : null,
_eventBus.Object,
_uoW.Object);
RegisterUserIntegrationEvent @event = new RegisterUserIntegrationEvent()
{
Expand All @@ -169,11 +175,11 @@ public async Task TestPublishLocalEventAsync()
{
_eventBus.Setup(eventBus => eventBus.PublishAsync(It.IsAny<CreateUserEvent>(), default)).Verifiable();
var integrationEventBus = new IntegrationEventBus(
new Lazy<IEventBus?>(_eventBus.Object),
_publisher.Object,
_eventLog.Object,
_masaAppConfigureOptions.Object,
_logger.Object,
_eventBus.Object,
_uoW.Object);
CreateUserEvent @event = new CreateUserEvent()
{
Expand All @@ -188,31 +194,29 @@ public async Task TestPublishLocalEventAsync()
public async Task TestPublishEventAndNotEventBusAsync()
{
var integrationEventBus = new IntegrationEventBus(
new Lazy<IEventBus?>(_eventBus.Object),
_publisher.Object,
_eventLog.Object,
_masaAppConfigureOptions.Object,
_logger.Object,
null,
_uoW.Object);
CreateUserEvent @event = new CreateUserEvent()
var @event = new CreateUserEvent()
{
Name = "Tom"
};
await Assert.ThrowsExceptionAsync<NotSupportedException>(async () =>
{
await integrationEventBus.PublishAsync(@event);
});
await integrationEventBus.PublishAsync(@event);
_eventBus.Verify(bus=>bus.PublishAsync(It.IsAny<CreateUserEvent>(),It.IsAny<CancellationToken>()), Times.Once);
}

[TestMethod]
public async Task TestCommitAsync()
{
var integrationEventBus = new IntegrationEventBus(
new Lazy<IEventBus?>(_eventBus.Object),
_publisher.Object,
_eventLog.Object,
_masaAppConfigureOptions.Object,
_logger.Object,
_eventBus.Object,
_uoW.Object);

await integrationEventBus.CommitAsync(default);
Expand All @@ -223,11 +227,11 @@ public async Task TestCommitAsync()
public async Task TestNotUseUowCommitAsync()
{
var integrationEventBus = new IntegrationEventBus(
new Lazy<IEventBus?>(_eventBus.Object),
_publisher.Object,
_eventLog.Object,
_masaAppConfigureOptions.Object,
_logger.Object,
_eventBus.Object,
null);

await Assert.ThrowsExceptionAsync<ArgumentNullException>(async () => await integrationEventBus.CommitAsync());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ public static IServiceCollection AddEventBus(

services.AddSingleton<EventBusProvider>();

services.TryAddEnumerable(new ServiceDescriptor(typeof(IEventMiddleware<>), typeof(TransactionEventMiddleware<>), ServiceLifetime.Transient));
services.TryAddEnumerable(new ServiceDescriptor(typeof(IEventMiddleware<>), typeof(TransactionEventMiddleware<>),
ServiceLifetime.Transient));

var builder = new EventBusBuilder(services);
eventBusBuilder?.Invoke(builder);
Expand All @@ -45,7 +46,11 @@ public static IServiceCollection AddEventBus(
services.TryAddSingleton<IExceptionStrategyProvider, DefaultExceptionStrategyProvider>();
services.TryAdd(typeof(IExecutionStrategy), typeof(ExecutionStrategy), ServiceLifetime.Singleton);
services.TryAddScoped<IInitializeServiceProvider, InitializeServiceProvider>();
services.AddScoped(typeof(IEventBus), typeof(EventBus));

services.AddScoped<ILocalEventBus, LocalEventBus>();
services.AddScoped<IEventBus>(serviceProvider => new EventBus(
serviceProvider.GetRequiredService<ILocalEventBus>(),
new Lazy<IIntegrationEventBus?>(serviceProvider.GetService<IIntegrationEventBus>())));
MasaApp.TrySetServiceCollection(services);
return services;
}
Expand Down Expand Up @@ -75,7 +80,7 @@ public static IServiceCollection AddTestEventBus(
services.TryAdd(typeof(IExecutionStrategy), typeof(ExecutionStrategy), ServiceLifetime.Singleton);
services.TryAddScoped<IInitializeServiceProvider, InitializeServiceProvider>();
services.AddTransient(typeof(IEventMiddleware<>), typeof(TransactionEventMiddleware<>));
services.AddScoped(typeof(IEventBus), typeof(EventBus));
services.AddScoped(typeof(IEventBus), typeof(LocalEventBus));

return services;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright (c) MASA Stack All rights reserved.
// Licensed under the MIT License. See LICENSE.txt in the project root for license information.

[assembly: InternalsVisibleTo("Masa.Contrib.Dispatcher.Events.Tests.Scenes.IntegrationEvent")]

// ReSharper disable once CheckNamespace

namespace Masa.Contrib.Dispatcher.Events;

internal class EventBus : IEventBus
{
private readonly ILocalEventBus _localEventBus;

private readonly Lazy<IIntegrationEventBus?> _lazyIntegrationEventBus;
private IIntegrationEventBus? IntegrationEventBus => _lazyIntegrationEventBus.Value;

public EventBus(ILocalEventBus localEventBus, Lazy<IIntegrationEventBus?> integrationEventBusLazy)
{
_localEventBus = localEventBus;
_lazyIntegrationEventBus = integrationEventBusLazy;
}

public Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default) where TEvent : IEvent
{
if (@event is IIntegrationEvent _)
{
if (IntegrationEventBus == null)
throw new NotSupportedException("Integration events are not supported, please ensure integration events are registered");

return IntegrationEventBus.PublishAsync(@event, cancellationToken);
}

return _localEventBus.PublishAsync(@event, cancellationToken);
}

public Task CommitAsync(CancellationToken cancellationToken = default)
=> _localEventBus.CommitAsync(cancellationToken);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) MASA Stack All rights reserved.
// Licensed under the MIT License. See LICENSE.txt in the project root for license information.

[assembly: InternalsVisibleTo("Masa.Contrib.Dispatcher.Events.Tests.Scenes.IntegrationEvent")]
[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")]

// ReSharper disable once CheckNamespace

namespace Masa.Contrib.Dispatcher.Events;

internal interface ILocalEventBus : IEventBus
{

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

namespace Masa.Contrib.Dispatcher.Events;

public class EventBus : IEventBus
public class LocalEventBus : ILocalEventBus
{
private readonly IServiceProvider _serviceProvider;

Expand All @@ -19,7 +19,7 @@ public class EventBus : IEventBus

private readonly IInitializeServiceProvider _initializeServiceProvider;

public EventBus(IServiceProvider serviceProvider,
public LocalEventBus(IServiceProvider serviceProvider,
IOptions<DispatcherOptions> options,
IInitializeServiceProvider initializeServiceProvider,
IUnitOfWork? unitOfWork = null)
Expand Down Expand Up @@ -61,11 +61,13 @@ public async Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancella
await middlewares.Reverse().Aggregate(eventHandlerDelegate, (next, middleware) => () => middleware.HandleAsync(@event, next))();
}

#pragma warning disable S3928
public async Task CommitAsync(CancellationToken cancellationToken = default)
{
if (_unitOfWork is null)
throw new ArgumentNullException("You need to UseUoW when adding services");
throw new ArgumentNullException(nameof(IUnitOfWork), "You need to UseUoW when adding services");

await _unitOfWork.CommitAsync(cancellationToken);
}
#pragma warning restore S3928
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@
global using Microsoft.Extensions.Options;
global using System.Linq.Expressions;
global using System.Reflection;
global using System.Runtime.CompilerServices;
global using System.Runtime.ExceptionServices;
Loading