Skip to content

Commit 2adb30d

Browse files
authoredMay 18, 2023
fix: fix issues-594, Event bus and unit of work are optional (#618)
1 parent b11b77a commit 2adb30d

File tree

2 files changed

+84
-7
lines changed

2 files changed

+84
-7
lines changed
 

‎src/Contrib/Ddd/Domain/Masa.Contrib.Ddd.Domain/DomainEventBus.cs

+15-7
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,16 @@ namespace Masa.Contrib.Ddd.Domain;
55

66
public class DomainEventBus : IDomainEventBus
77
{
8-
private readonly IEventBus _eventBus;
9-
private readonly IIntegrationEventBus _integrationEventBus;
10-
private readonly IUnitOfWork _unitOfWork;
8+
private readonly IEventBus? _eventBus;
9+
private readonly IIntegrationEventBus? _integrationEventBus;
10+
private readonly IUnitOfWork? _unitOfWork;
1111

1212
private readonly ConcurrentQueue<IDomainEvent> _eventQueue = new();
1313

1414
public DomainEventBus(
15-
IEventBus eventBus,
16-
IIntegrationEventBus integrationEventBus,
17-
IUnitOfWork unitOfWork)
15+
IEventBus? eventBus = null,
16+
IIntegrationEventBus? integrationEventBus = null,
17+
IUnitOfWork? unitOfWork = null)
1818
{
1919
_eventBus = eventBus;
2020
_integrationEventBus = integrationEventBus;
@@ -30,10 +30,15 @@ public async Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancella
3030
if (@event is IIntegrationEvent integrationEvent)
3131
{
3232
integrationEvent.UnitOfWork ??= _unitOfWork;
33+
34+
MasaArgumentException.ThrowIfNull(_integrationEventBus);
35+
3336
await _integrationEventBus.PublishAsync(integrationEvent, cancellationToken);
3437
}
3538
else
3639
{
40+
MasaArgumentException.ThrowIfNull(_eventBus);
41+
3742
await _eventBus.PublishAsync(@event, cancellationToken);
3843
}
3944

@@ -74,5 +79,8 @@ public Task<bool> AnyQueueAsync()
7479
}
7580

7681
public async Task CommitAsync(CancellationToken cancellationToken = default)
77-
=> await _unitOfWork.CommitAsync(cancellationToken);
82+
{
83+
MasaArgumentException.ThrowIfNull(_unitOfWork);
84+
await _unitOfWork.CommitAsync(cancellationToken);
85+
}
7886
}

‎src/Contrib/Ddd/Domain/Tests/Masa.Contrib.Ddd.Domain.Tests/DomainEventBusTest.cs

+69
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,75 @@ public void TestCheckRequiredService(
159159
Assert.AreEqual(expectedTimer, ExecuteTimer);
160160
}
161161

162+
[DataRow(true, true, true, true, false, false)]
163+
[DataRow(true, true, true, false, false, false)]
164+
[DataRow(true, true, false, false, false, true)]
165+
[DataRow(true, false, true, true, false, false)]
166+
[DataRow(true, false, false, true, false, true)]
167+
[DataRow(true, false, false, true, false, true)]
168+
[DataRow(false, true, true, true, true, false)]
169+
[DataRow(true, false, true, false, true, false)]
170+
[DataTestMethod]
171+
public async Task PublishAndUnitOfWorkAsync(
172+
bool isAddEventBus,
173+
bool isAddIntegrationEventBus,
174+
bool isAddUnitOfWork,
175+
bool isLocalEvent,
176+
bool expectedThrowExceptionByPublish,
177+
bool expectedThrowExceptionByCommit)
178+
{
179+
IEventBus? eventBus = isAddEventBus ? _eventBus.Object : null;
180+
IIntegrationEventBus? integrationEventBus = isAddIntegrationEventBus ? _integrationEventBus.Object : null;
181+
var unitOfWorkMock = new Mock<IUnitOfWork>();
182+
IUnitOfWork? unitOfWork = isAddUnitOfWork ? unitOfWorkMock.Object : null;
183+
184+
var domainEventBus = new DomainEventBus(eventBus, integrationEventBus, unitOfWork);
185+
186+
if (isLocalEvent)
187+
{
188+
var registerUserEvent = new RegisterUserEvent();
189+
190+
if (expectedThrowExceptionByPublish)
191+
{
192+
await Assert.ThrowsExceptionAsync<MasaArgumentException>(() => domainEventBus.PublishAsync(registerUserEvent));
193+
}
194+
else
195+
{
196+
await domainEventBus.PublishAsync(registerUserEvent);
197+
198+
_integrationEventBus.Verify(bus => bus.PublishAsync(It.IsAny<IIntegrationEvent>(), default), Times.Never);
199+
_eventBus.Verify(bus => bus.PublishAsync(It.IsAny<IEvent>(), default), Times.Once);
200+
}
201+
}
202+
else
203+
{
204+
var changeOrderStateIntegrationEvent = new ChangeOrderStateIntegrationEvent();
205+
206+
if (expectedThrowExceptionByPublish)
207+
{
208+
await Assert.ThrowsExceptionAsync<MasaArgumentException>(()
209+
=> domainEventBus.PublishAsync(changeOrderStateIntegrationEvent));
210+
}
211+
else
212+
{
213+
await _domainEventBus.PublishAsync(changeOrderStateIntegrationEvent);
214+
215+
_integrationEventBus.Verify(bus => bus.PublishAsync(It.IsAny<IIntegrationEvent>(), default), Times.Once);
216+
_eventBus.Verify(bus => bus.PublishAsync(It.IsAny<IEvent>(), default), Times.Never);
217+
}
218+
}
219+
220+
if (expectedThrowExceptionByCommit)
221+
{
222+
await Assert.ThrowsExceptionAsync<MasaArgumentException>(() => domainEventBus.CommitAsync());
223+
}
224+
else
225+
{
226+
await domainEventBus.CommitAsync();
227+
unitOfWorkMock.Verify(uow => uow.CommitAsync(default), Times.Once);
228+
}
229+
}
230+
162231
private static ConcurrentQueue<IDomainEvent> GetEventQueue(DomainEventBus domainEventBus)
163232
{
164233
var eventQueue = EventQueueFileInfo.GetValue(domainEventBus) as ConcurrentQueue<IDomainEvent>;

0 commit comments

Comments
 (0)