-
Notifications
You must be signed in to change notification settings - Fork 116
/
Copy pathDomainEventBus.cs
78 lines (66 loc) · 2.43 KB
/
DomainEventBus.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// Copyright (c) MASA Stack All rights reserved.
// Licensed under the MIT License. See LICENSE.txt in the project root for license information.
namespace Masa.Contrib.Ddd.Domain;
/// <summary>
/// <see cref="IDomainEventBus"/> implementation that forwards events to either the
/// in-process <see cref="IEventBus"/> or the <see cref="IIntegrationEventBus"/>, and keeps
/// a queue of domain events that can be published later via <see cref="PublishQueueAsync"/>.
/// </summary>
public class DomainEventBus : IDomainEventBus
{
    private readonly IEventBus _eventBus;
    private readonly IIntegrationEventBus _integrationEventBus;
    private readonly IUnitOfWork _unitOfWork;

    // Domain events waiting to be published by PublishQueueAsync.
    private readonly ConcurrentQueue<IDomainEvent> _eventQueue = new();

    /// <summary>
    /// Creates the bus from the two underlying buses and the unit of work that is
    /// attached to published events and committed by <see cref="CommitAsync"/>.
    /// </summary>
    /// <param name="eventBus">Bus used for every event that is not an <see cref="IIntegrationEvent"/>.</param>
    /// <param name="integrationEventBus">Bus used for events that are <see cref="IIntegrationEvent"/>.</param>
    /// <param name="unitOfWork">Unit of work assigned to events and committed by <see cref="CommitAsync"/>.</param>
    public DomainEventBus(
        IEventBus eventBus,
        IIntegrationEventBus integrationEventBus,
        IUnitOfWork unitOfWork)
    {
        _eventBus = eventBus;
        _integrationEventBus = integrationEventBus;
        _unitOfWork = unitOfWork;
    }

    /// <summary>
    /// Publishes <paramref name="event"/> through the integration bus when it is an
    /// <see cref="IIntegrationEvent"/>, otherwise through the in-process event bus.
    /// </summary>
    /// <typeparam name="TEvent">Static type of the event being published.</typeparam>
    /// <param name="event">The event to publish.</param>
    /// <param name="cancellationToken">Token forwarded to the underlying bus.</param>
    /// <returns>A task that completes when the underlying bus has finished publishing.</returns>
    public async Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default) where TEvent : IEvent
    {
        // Domain events receive this bus's unit of work, except those recognised as domain queries.
        // NOTE(review): presumably queries must not be given a unit of work - confirm against IDomainQuery<>.
        if (@event is IDomainEvent domainEvent && !IsAssignableFromDomainQuery(@event.GetType()))
        {
            domainEvent.UnitOfWork = _unitOfWork;
        }

        if (@event is IIntegrationEvent integrationEvent)
        {
            // Keep a unit of work that was already set on the integration event.
            integrationEvent.UnitOfWork ??= _unitOfWork;
            await _integrationEventBus.PublishAsync(integrationEvent, cancellationToken);
        }
        else
        {
            await _eventBus.PublishAsync(@event, cancellationToken);
        }

        // Returns true when the nearest generic type in the inheritance chain of `type`
        // (the type itself or one of its base types) implements IDomainQuery<>.
        // NOTE(review): a type with no generic type anywhere in its chain is reported as
        // "not a query" even if it implements IDomainQuery<> directly; this mirrors the
        // previous behavior and is left unchanged.
        static bool IsAssignableFromDomainQuery(Type? type)
        {
            Type? current = type;
            while (current is not null && !current.IsGenericType)
            {
                current = current.BaseType;
            }

            if (current is null)
            {
                return false;
            }

            // GetGenericTypeDefinition throws InvalidOperationException for non-generic
            // interfaces, so each candidate must be checked with IsGenericType first.
            return current.GetInterfaces().Any(candidate =>
                candidate.IsGenericType && candidate.GetGenericTypeDefinition() == typeof(IDomainQuery<>));
        }
    }

    /// <summary>
    /// Adds <paramref name="event"/> to the pending queue; forwards to <see cref="EnqueueAsync{TDomainEvent}"/>.
    /// </summary>
    /// <typeparam name="TDomainEvent">Static type of the domain event.</typeparam>
    /// <param name="event">The domain event to queue.</param>
    /// <returns>An already completed task.</returns>
    public Task Enqueue<TDomainEvent>(TDomainEvent @event) where TDomainEvent : IDomainEvent
        => EnqueueAsync(@event);

    /// <summary>
    /// Adds <paramref name="event"/> to the pending queue without publishing it.
    /// </summary>
    /// <typeparam name="TDomainEvent">Static type of the domain event.</typeparam>
    /// <param name="event">The domain event to queue.</param>
    /// <returns>An already completed task.</returns>
    public Task EnqueueAsync<TDomainEvent>(TDomainEvent @event)
        where TDomainEvent : IDomainEvent
    {
        _eventQueue.Enqueue(@event);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Dequeues and publishes queued domain events one at a time until the queue is empty.
    /// Events that are enqueued while this loop is running are picked up by the same loop.
    /// </summary>
    /// <returns>A task that completes once the queue has been drained.</returns>
    public async Task PublishQueueAsync()
    {
        while (_eventQueue.TryDequeue(out IDomainEvent? @event))
        {
            // No token is available here, so PublishAsync runs with its default CancellationToken.
            await PublishAsync(@event);
        }
    }

    /// <summary>
    /// Reports whether at least one domain event is waiting in the queue.
    /// </summary>
    /// <returns>A completed task whose result is <c>true</c> when the queue is not empty.</returns>
    public Task<bool> AnyQueueAsync()
    {
        // IsEmpty answers the same question as Count > 0 without computing the full count.
        return Task.FromResult(!_eventQueue.IsEmpty);
    }

    /// <summary>
    /// Commits the unit of work this bus was constructed with.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to the unit of work.</param>
    /// <returns>A task that completes when the commit has finished.</returns>
    public async Task CommitAsync(CancellationToken cancellationToken = default)
        => await _unitOfWork.CommitAsync(cancellationToken);
}