diff --git a/src/EventBus/DefaultEventBus.php b/src/EventBus/DefaultEventBus.php index cb5a8c1b..0df7ca60 100644 --- a/src/EventBus/DefaultEventBus.php +++ b/src/EventBus/DefaultEventBus.php @@ -8,7 +8,7 @@ final class DefaultEventBus implements EventBus { - /** @var list */ + /** @var array */ private array $queue; /** @var list */ private array $listeners; @@ -25,9 +25,9 @@ public function __construct(array $listeners = []) $this->processing = false; } - public function dispatch(Message $message): void + public function dispatch(Message ...$messages): void { - $this->queue[] = $message; + $this->queue += $messages; if ($this->processing) { return; diff --git a/src/EventBus/EventBus.php b/src/EventBus/EventBus.php index b45e022e..22af33ec 100644 --- a/src/EventBus/EventBus.php +++ b/src/EventBus/EventBus.php @@ -6,5 +6,5 @@ interface EventBus { - public function dispatch(Message $message): void; + public function dispatch(Message ...$messages): void; } diff --git a/src/EventBus/SymfonyEventBus.php b/src/EventBus/SymfonyEventBus.php index 1f5e2e6d..c78ea709 100644 --- a/src/EventBus/SymfonyEventBus.php +++ b/src/EventBus/SymfonyEventBus.php @@ -20,12 +20,14 @@ public function __construct(MessageBusInterface $bus) $this->bus = $bus; } - public function dispatch(Message $message): void + public function dispatch(Message ...$messages): void { - $envelope = (new Envelope($message)) - ->with(new DispatchAfterCurrentBusStamp()); + foreach ($messages as $message) { + $envelope = (new Envelope($message)) + ->with(new DispatchAfterCurrentBusStamp()); - $this->bus->dispatch($envelope); + $this->bus->dispatch($envelope); + } } /** diff --git a/src/Repository/DefaultRepository.php b/src/Repository/DefaultRepository.php index f2a3e7f9..241a3f94 100644 --- a/src/Repository/DefaultRepository.php +++ b/src/Repository/DefaultRepository.php @@ -15,7 +15,7 @@ final class DefaultRepository implements Repository { private Store $store; - private EventBus $eventStream; + private EventBus $eventBus; /** @var class-string */ private string $aggregateClass; @@ -28,7 +28,7 @@ final class DefaultRepository implements Repository */ public function __construct( Store $store, - EventBus $eventStream, + EventBus $eventBus, string $aggregateClass ) { if (!is_subclass_of($aggregateClass, AggregateRoot::class)) { @@ -36,7 +36,7 @@ public function __construct( } $this->store = $store; - $this->eventStream = $eventStream; + $this->eventBus = $eventBus; $this->aggregateClass = $aggregateClass; } @@ -66,22 +66,17 @@ public function has(string $id): bool public function save(AggregateRoot $aggregate): void { - $class = $aggregate::class; - if (!$aggregate instanceof $this->aggregateClass) { - throw new WrongAggregate($class, $this->aggregateClass); + throw new WrongAggregate($aggregate::class, $this->aggregateClass); } - $messageStream = $aggregate->releaseMessages(); + $messages = $aggregate->releaseMessages(); - if (count($messageStream) === 0) { + if (count($messages) === 0) { return; } - $this->store->save(...$messageStream); - - foreach ($messageStream as $message) { - $this->eventStream->dispatch($message); - } + $this->store->save(...$messages); + $this->eventBus->dispatch(...$messages); } } diff --git a/src/Repository/SnapshotRepository.php b/src/Repository/SnapshotRepository.php index cce653db..b8335550 100644 --- a/src/Repository/SnapshotRepository.php +++ b/src/Repository/SnapshotRepository.php @@ -18,7 +18,7 @@ final class SnapshotRepository implements Repository { private Store $store; - private EventBus $eventStream; + private EventBus $eventBus; /** @var class-string */ private string $aggregateClass; @@ -33,7 +33,7 @@ final class SnapshotRepository implements Repository */ public function __construct( Store $store, - EventBus $eventStream, + EventBus $eventBus, string $aggregateClass, SnapshotStore $snapshotStore ) { @@ -42,7 +42,7 @@ public function __construct( } $this->store = $store; - $this->eventStream = $eventStream; + $this->eventBus = $eventBus; $this->aggregateClass = $aggregateClass; $this->snapshotStore = $snapshotStore; } @@ -98,10 +98,7 @@ public function save(AggregateRoot $aggregate): void } $this->store->save(...$messages); - - foreach ($messages as $message) { - $this->eventStream->dispatch($message); - } + $this->eventBus->dispatch(...$messages); $snapshot = $aggregate->toSnapshot(); $this->snapshotStore->save($snapshot); diff --git a/tests/Unit/EventBus/DefaultEventBusTest.php b/tests/Unit/EventBus/DefaultEventBusTest.php index a928d95f..93089757 100644 --- a/tests/Unit/EventBus/DefaultEventBusTest.php +++ b/tests/Unit/EventBus/DefaultEventBusTest.php @@ -36,7 +36,7 @@ public function __invoke(Message $message): void 1, new ProfileCreated( ProfileId::fromString('1'), - Email::fromString('d.badura@gmx.de') + Email::fromString('info@patchlevel.de') ) ); @@ -46,6 +46,46 @@ public function __invoke(Message $message): void self::assertSame($message, $listener->message); } + public function testDispatchMultipleMessages(): void + { + $listener = new class implements Listener { + /** @var list */ + public array $message = []; + + public function __invoke(Message $message): void + { + $this->message[] = $message; + } + }; + + $message1 = new Message( + Profile::class, + '1', + 1, + new ProfileCreated( + ProfileId::fromString('1'), + Email::fromString('info@patchlevel.de') + ) + ); + + $message2 = new Message( + Profile::class, + '1', + 2, + new ProfileCreated( + ProfileId::fromString('1'), + Email::fromString('info@patchlevel.de') + ) + ); + + $eventBus = new DefaultEventBus([$listener]); + $eventBus->dispatch($message1, $message2); + + self::assertCount(2, $listener->message); + self::assertSame($message1, $listener->message[0]); + self::assertSame($message2, $listener->message[1]); + } + public function testDynamicListener(): void { $listener = new class implements Listener { @@ -63,7 +103,7 @@ public function __invoke(Message $message): void 1, new ProfileCreated( ProfileId::fromString('1'), - Email::fromString('d.badura@gmx.de') + Email::fromString('info@patchlevel.de') ) ); @@ -82,7 +122,7 @@ public function testSynchroneEvents(): void 1, new ProfileCreated( ProfileId::fromString('1'), - Email::fromString('d.badura@gmx.de') + Email::fromString('info@patchlevel.de') ) ); diff --git a/tests/Unit/EventBus/SymfonyEventBusTest.php b/tests/Unit/EventBus/SymfonyEventBusTest.php index 20e4e286..d590098b 100644 --- a/tests/Unit/EventBus/SymfonyEventBusTest.php +++ b/tests/Unit/EventBus/SymfonyEventBusTest.php @@ -30,7 +30,7 @@ public function testDispatchEvent(): void 1, new ProfileCreated( ProfileId::fromString('1'), - Email::fromString('d.badura@gmx.de') + Email::fromString('info@patchlevel.de') ) ); @@ -49,6 +49,53 @@ public function testDispatchEvent(): void $eventBus->dispatch($message); } + public function testDispatchMultipleMessages(): void + { + $message1 = new Message( + Profile::class, + '1', + 1, + new ProfileCreated( + ProfileId::fromString('1'), + Email::fromString('info@patchlevel.de') + ) + ); + + $message2 = new Message( + Profile::class, + '1', + 1, + new ProfileCreated( + ProfileId::fromString('1'), + Email::fromString('info@patchlevel.de') + ) + ); + + $envelope1 = new Envelope($message1); + + $symfony = $this->prophesize(MessageBusInterface::class); + $symfony->dispatch(Argument::that(static function ($envelope1) use ($message1) { + if (!$envelope1 instanceof Envelope) { + return false; + } + + return $envelope1->getMessage() === $message1; + }))->willReturn($envelope1)->shouldBeCalled(); + + $envelope2 = new Envelope($message2); + + $symfony->dispatch(Argument::that(static function ($envelope2) use ($message2) { + if (!$envelope2 instanceof Envelope) { + return false; + } + + return $envelope2->getMessage() === $message2; + }))->willReturn($envelope2)->shouldBeCalled(); + + $eventBus = new SymfonyEventBus($symfony->reveal()); + $eventBus->dispatch($message1, $message2); + } + public function testDefaultEventBus(): void { $listener = new class implements Listener { @@ -66,7 +113,7 @@ public function __invoke(Message $message): void 1, new ProfileCreated( ProfileId::fromString('1'), - Email::fromString('d.badura@gmx.de') + Email::fromString('info@patchlevel.de') ) );