From d7eccdb50fa13c7717917faee39d50b4de27f1b2 Mon Sep 17 00:00:00 2001 From: David Badura Date: Mon, 22 Apr 2024 11:46:39 +0200 Subject: [PATCH] add trigger subscription engine event bus --- deptrac.yaml | 1 + docs/pages/event_bus.md | 39 ++ docs/pages/getting_started.md | 10 +- docs/pages/subscription.md | 29 +- src/EventBus/ChainEventBus.php | 28 ++ src/Subscription/Engine/AlreadyProcessing.php | 15 + .../Engine/DefaultSubscriptionEngine.php | 448 +++++++++--------- .../Engine/RunSubscriptionEngineEventBus.php | 39 ++ .../Engine/SubscriptionEngine.php | 2 + .../BasicIntegrationTest.php | 32 +- .../SendEmailProcessor.php} | 6 +- tests/Unit/EventBus/ChainEventBusTest.php | 43 ++ .../RunSubscriptionEngineEventBusTest.php | 39 ++ 13 files changed, 488 insertions(+), 243 deletions(-) create mode 100644 src/EventBus/ChainEventBus.php create mode 100644 src/Subscription/Engine/AlreadyProcessing.php create mode 100644 src/Subscription/Engine/RunSubscriptionEngineEventBus.php rename tests/Integration/BasicImplementation/{Listener/SendEmailListener.php => Processor/SendEmailProcessor.php} (80%) create mode 100644 tests/Unit/EventBus/ChainEventBusTest.php create mode 100644 tests/Unit/Subscription/Engine/RunSubscriptionEngineEventBusTest.php diff --git a/deptrac.yaml b/deptrac.yaml index 186f3d827..64d9917d6 100644 --- a/deptrac.yaml +++ b/deptrac.yaml @@ -149,6 +149,7 @@ deptrac: - Aggregate - Attribute - Clock + - EventBus - Message - MetadataSubscriber - Schema diff --git a/docs/pages/event_bus.md b/docs/pages/event_bus.md index 9294561c7..746cbe958 100644 --- a/docs/pages/event_bus.md +++ b/docs/pages/event_bus.md @@ -121,6 +121,31 @@ final class WelcomeSubscriber } } ``` +## Run Subscription Engine Event Bus + +If you want that the subscriptions run after the events are stored in the store, +you can use the `RunSubscriptionEngineEventBus`. +This means that a worker to run the subscriptions are not needed. + +```php +use Patchlevel\EventSourcing\Subscription\Engine\RunSubscriptionEngineEventBus; + +/** @var SubscriptionEngine $subscriptionEngine */ +$eventBus = new RunSubscriptionEngineEventBus($subscriptionEngine); +``` +!!! warning + + You need to be careful with this event bus if you use the DoctrineStore. + The subsription engine do not support rollbacks. + +!!! note + + More about the subscription engine can be found [here](subscription.md). + +!!! tip + + You can perfectly use it in development or testing. + ## Psr-14 Event Bus You can also use a [psr-14](https://www.php-fig.org/psr/psr-14/) compatible event bus. @@ -136,6 +161,20 @@ $eventBus = new Psr14EventBus($psr14EventDispatcher); You can't use the `Subscribe` attribute with the psr-14 event bus. +## Chain Event Bus + +You can also use a chain event bus to dispatch events to multiple event buses. + +```php +use Patchlevel\EventSourcing\EventBus\ChainEventBus; +use Patchlevel\EventSourcing\EventBus\EventBus; + +/** + * @var EventBus $eventBus1 + * @var EventBus $eventBus2 + */ +$eventBus = new ChainEventBus($eventBus1, $eventBus2); +``` ## Learn more * [How to use messages](message.md) diff --git a/docs/pages/getting_started.md b/docs/pages/getting_started.md index 022217ee9..fdd77dad4 100644 --- a/docs/pages/getting_started.md +++ b/docs/pages/getting_started.md @@ -275,6 +275,7 @@ use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager; use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer; use Patchlevel\EventSourcing\Store\DoctrineDbalStore; use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; +use Patchlevel\EventSourcing\Subscription\Engine\RunSubscriptionEngineEventBus; use Patchlevel\EventSourcing\Subscription\Store\DoctrineSubscriptionStore; use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository; @@ -311,6 +312,7 @@ $engine = new DefaultSubscriptionEngine( $repositoryManager = new DefaultRepositoryManager( $aggregateRegistry, $eventStore, + new RunSubscriptionEngineEventBus($engine), ); $hotelRepository = $repositoryManager->get(Hotel::class); @@ -361,7 +363,6 @@ We are now ready to use the Event Sourcing System. We can load, change and save ```php use Patchlevel\EventSourcing\Aggregate\Uuid; use Patchlevel\EventSourcing\Repository\Repository; -use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine; $hotel1 = Hotel::create(Uuid::generate(), 'HOTEL'); $hotel1->checkIn('David'); @@ -375,15 +376,8 @@ $hotel2 = $hotelRepository->load(Uuid::fromString('d0d0d0d0-d0d0-d0d0-d0d0-d0d0d $hotel2->checkIn('David'); $hotelRepository->save($hotel2); -/** @var SubscriptionEngine $engine */ -$engine->run(); - $hotels = $hotelProjection->getHotels(); ``` -!!! warning - - You need to run the subscription engine to update the projections and execute the processors. - !!! note You can also use other forms of IDs such as uuid version 6 or a custom format. diff --git a/docs/pages/subscription.md b/docs/pages/subscription.md index adfe32de7..53d26dbcb 100644 --- a/docs/pages/subscription.md +++ b/docs/pages/subscription.md @@ -685,7 +685,7 @@ $catchupSubscriptionEngine = new CatchUpSubscriptionEngine($subscriptionEngine); You can use the `CatchUpSubscriptionEngine` in your tests to process the events immediately. -### Throw by error Subscription Engine +### Throw on error Subscription Engine This is another decorator for the subscription engine. It throws an exception if a subscription is in error state. This is useful for testing or development to get directly feedback if something is wrong. @@ -695,13 +695,38 @@ use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine; use Patchlevel\EventSourcing\Subscription\Engine\ThrowOnErrorSubscriptionEngine; /** @var SubscriptionEngine $subscriptionStore */ -$throwByErrorSubscriptionEngine = new ThrowOnErrorSubscriptionEngine($subscriptionEngine); +$throwOnErrorSubscriptionEngine = new ThrowOnErrorSubscriptionEngine($subscriptionEngine); ``` !!! warning This is only for testing or development. Don't use it in production. The subscription engine has an build in retry strategy to retry subscriptions that have failed. +### Run Subscription Engine after save + +You can trigger the subscription engine after the events are stored in the store. +This means that a worker to run the subscriptions are not needed. + +```php +use Patchlevel\EventSourcing\Subscription\Engine\RunSubscriptionEngineEventBus; + +/** @var SubscriptionEngine $subscriptionEngine */ +$eventBus = new RunSubscriptionEngineEventBus($subscriptionEngine); +``` +!!! warning + + You need to be careful with this event bus if you use the DoctrineStore. + The subsription engine do not support rollbacks. + +!!! note + + More about repository can be found [here](./repository.md). + And more about event bus can be found [here](./event_bus.md). + +!!! tip + + You can perfectly use it in development or testing. + ## Usage The Subscription Engine has a few methods needed to use it effectively. diff --git a/src/EventBus/ChainEventBus.php b/src/EventBus/ChainEventBus.php new file mode 100644 index 000000000..16c3d90c8 --- /dev/null +++ b/src/EventBus/ChainEventBus.php @@ -0,0 +1,28 @@ + */ + private readonly array $eventBus; + + public function __construct( + EventBus ...$eventBus, + ) { + $this->eventBus = array_values($eventBus); + } + + public function dispatch(Message ...$messages): void + { + foreach ($this->eventBus as $eventBus) { + $eventBus->dispatch(...$messages); + } + } +} diff --git a/src/Subscription/Engine/AlreadyProcessing.php b/src/Subscription/Engine/AlreadyProcessing.php new file mode 100644 index 000000000..89f2d3f0e --- /dev/null +++ b/src/Subscription/Engine/AlreadyProcessing.php @@ -0,0 +1,15 @@ +processing) { + throw new AlreadyProcessing(); + } - $this->logger?->info( - 'Subscription Engine: Start booting.', - ); + $this->processing = true; - $this->discoverNewSubscriptions(); - $this->retrySubscriptions($criteria); + try { + $criteria ??= new SubscriptionEngineCriteria(); - return $this->findForUpdate( - new SubscriptionCriteria( - ids: $criteria->ids, - groups: $criteria->groups, - status: [Status::Booting], - ), - function ($subscriptions) use ($limit): ProcessedResult { - if (count($subscriptions) === 0) { - $this->logger?->info('Subscription Engine: No subscriptions in booting status, finish booting.'); + $this->logger?->info( + 'Subscription Engine: Start booting.', + ); - return new ProcessedResult(0); - } + $this->discoverNewSubscriptions(); + $this->retrySubscriptions($criteria); - /** @var list $errors */ - $errors = []; + return $this->findForUpdate( + new SubscriptionCriteria( + ids: $criteria->ids, + groups: $criteria->groups, + status: [Status::Booting], + ), + function ($subscriptions) use ($limit): ProcessedResult { + if (count($subscriptions) === 0) { + $this->logger?->info('Subscription Engine: No subscriptions in booting status, finish booting.'); - $startIndex = $this->lowestSubscriptionPosition($subscriptions); + return new ProcessedResult(0); + } - $this->logger?->debug( - sprintf( - 'Subscription Engine: Event stream is processed for booting from position %s.', - $startIndex, - ), - ); + /** @var list $errors */ + $errors = []; - $stream = null; - $messageCounter = 0; + $startIndex = $this->lowestSubscriptionPosition($subscriptions); - try { - $stream = $this->messageStore->load( - new Criteria(new FromIndexCriterion($startIndex)), + $this->logger?->debug( + sprintf( + 'Subscription Engine: Event stream is processed for booting from position %s.', + $startIndex, + ), ); - foreach ($stream as $message) { - $index = $stream->index(); + $stream = null; + $messageCounter = 0; - if ($index === null) { - throw new UnexpectedError('Stream index is null, this should not happen.'); - } + try { + $stream = $this->messageStore->load( + new Criteria(new FromIndexCriterion($startIndex)), + ); - foreach ($subscriptions as $subscription) { - if (!$subscription->isBooting()) { - continue; + foreach ($stream as $message) { + $index = $stream->index(); + + if ($index === null) { + throw new UnexpectedError('Stream index is null, this should not happen.'); } - if ($subscription->position() >= $index) { - $this->logger?->debug( - sprintf( - 'Subscription Engine: Subscription "%s" is farther than the current position (%d > %d), continue booting.', - $subscription->id(), - $subscription->position(), - $index, - ), - ); + foreach ($subscriptions as $subscription) { + if (!$subscription->isBooting()) { + continue; + } - continue; - } + if ($subscription->position() >= $index) { + $this->logger?->debug( + sprintf( + 'Subscription Engine: Subscription "%s" is farther than the current position (%d > %d), continue booting.', + $subscription->id(), + $subscription->position(), + $index, + ), + ); - $error = $this->handleMessage($index, $message, $subscription); + continue; + } - if (!$error) { - continue; - } + $error = $this->handleMessage($index, $message, $subscription); - $errors[] = $error; - } + if (!$error) { + continue; + } - $messageCounter++; + $errors[] = $error; + } - $this->logger?->debug( - sprintf( - 'Subscription Engine: Current event stream position for booting: %s', - $index, - ), - ); + $messageCounter++; - if ($limit !== null && $messageCounter >= $limit) { - $this->logger?->info( + $this->logger?->debug( sprintf( - 'Subscription Engine: Message limit (%d) reached, finish booting.', - $limit, + 'Subscription Engine: Current event stream position for booting: %s', + $index, ), ); - return new ProcessedResult( - $messageCounter, - false, - $errors, - ); - } - } - } finally { - $stream?->close(); + if ($limit !== null && $messageCounter >= $limit) { + $this->logger?->info( + sprintf( + 'Subscription Engine: Message limit (%d) reached, finish booting.', + $limit, + ), + ); - if ($messageCounter > 0) { - foreach ($subscriptions as $subscription) { - if (!$subscription->isBooting()) { - continue; + return new ProcessedResult( + $messageCounter, + false, + $errors, + ); } + } + } finally { + $stream?->close(); - $this->subscriptionStore->update($subscription); + if ($messageCounter > 0) { + foreach ($subscriptions as $subscription) { + if (!$subscription->isBooting()) { + continue; + } + + $this->subscriptionStore->update($subscription); + } } } - } - $this->logger?->debug('Subscription Engine: End of stream for booting has been reached.'); + $this->logger?->debug('Subscription Engine: End of stream for booting has been reached.'); - foreach ($subscriptions as $subscription) { - if (!$subscription->isBooting()) { - continue; - } + foreach ($subscriptions as $subscription) { + if (!$subscription->isBooting()) { + continue; + } - if ($subscription->runMode() === RunMode::Once) { - $subscription->finished(); + if ($subscription->runMode() === RunMode::Once) { + $subscription->finished(); + $this->subscriptionStore->update($subscription); + + $this->logger?->info(sprintf( + 'Subscription Engine: Subscription "%s" run only once and has been set to finished.', + $subscription->id(), + )); + + continue; + } + + $subscription->active(); $this->subscriptionStore->update($subscription); $this->logger?->info(sprintf( - 'Subscription Engine: Subscription "%s" run only once and has been set to finished.', + 'Subscription Engine: Subscription "%s" has been set to active after booting.', $subscription->id(), )); - - continue; } - $subscription->active(); - $this->subscriptionStore->update($subscription); + $this->logger?->info('Subscription Engine: Finish booting.'); - $this->logger?->info(sprintf( - 'Subscription Engine: Subscription "%s" has been set to active after booting.', - $subscription->id(), - )); - } - - $this->logger?->info('Subscription Engine: Finish booting.'); - - return new ProcessedResult( - $messageCounter, - true, - $errors, - ); - }, - ); + return new ProcessedResult( + $messageCounter, + true, + $errors, + ); + }, + ); + } finally { + $this->processing = false; + } } public function run( SubscriptionEngineCriteria|null $criteria = null, int|null $limit = null, ): ProcessedResult { - $criteria ??= new SubscriptionEngineCriteria(); + if ($this->processing) { + throw new AlreadyProcessing(); + } - $this->logger?->info('Subscription Engine: Start processing.'); + $this->processing = true; - $this->discoverNewSubscriptions(); - $this->markDetachedSubscriptions($criteria); - $this->retrySubscriptions($criteria); + try { + $criteria ??= new SubscriptionEngineCriteria(); - return $this->findForUpdate( - new SubscriptionCriteria( - ids: $criteria->ids, - groups: $criteria->groups, - status: [Status::Active], - ), - function (array $subscriptions) use ($limit): ProcessedResult { - if (count($subscriptions) === 0) { - $this->logger?->info('Subscription Engine: No subscriptions to process, finish processing.'); + $this->logger?->info('Subscription Engine: Start processing.'); - return new ProcessedResult(0); - } + $this->discoverNewSubscriptions(); + $this->markDetachedSubscriptions($criteria); + $this->retrySubscriptions($criteria); - /** @var list $errors */ - $errors = []; + return $this->findForUpdate( + new SubscriptionCriteria( + ids: $criteria->ids, + groups: $criteria->groups, + status: [Status::Active], + ), + function (array $subscriptions) use ($limit): ProcessedResult { + if (count($subscriptions) === 0) { + $this->logger?->info('Subscription Engine: No subscriptions to process, finish processing.'); + + return new ProcessedResult(0); + } - $startIndex = $this->lowestSubscriptionPosition($subscriptions); + /** @var list $errors */ + $errors = []; - $this->logger?->debug( - sprintf( - 'Subscription Engine: Event stream is processed from position %d.', - $startIndex, - ), - ); + $startIndex = $this->lowestSubscriptionPosition($subscriptions); - $stream = null; - $messageCounter = 0; + $this->logger?->debug( + sprintf( + 'Subscription Engine: Event stream is processed from position %d.', + $startIndex, + ), + ); - try { - $criteria = new Criteria(new FromIndexCriterion($startIndex)); - $stream = $this->messageStore->load($criteria); + $stream = null; + $messageCounter = 0; - foreach ($stream as $message) { - $index = $stream->index(); + try { + $criteria = new Criteria(new FromIndexCriterion($startIndex)); + $stream = $this->messageStore->load($criteria); - if ($index === null) { - throw new UnexpectedError('Stream index is null, this should not happen.'); - } + foreach ($stream as $message) { + $index = $stream->index(); - foreach ($subscriptions as $subscription) { - if (!$subscription->isActive()) { - continue; + if ($index === null) { + throw new UnexpectedError('Stream index is null, this should not happen.'); } - if ($subscription->position() >= $index) { - $this->logger?->debug( - sprintf( - 'Subscription Engine: Subscription "%s" is farther than the current position (%d > %d), continue processing.', - $subscription->id(), - $subscription->position(), - $index, - ), - ); + foreach ($subscriptions as $subscription) { + if (!$subscription->isActive()) { + continue; + } - continue; - } + if ($subscription->position() >= $index) { + $this->logger?->debug( + sprintf( + 'Subscription Engine: Subscription "%s" is farther than the current position (%d > %d), continue processing.', + $subscription->id(), + $subscription->position(), + $index, + ), + ); - $error = $this->handleMessage($index, $message, $subscription); + continue; + } - if (!$error) { - continue; - } + $error = $this->handleMessage($index, $message, $subscription); - $errors[] = $error; - } + if (!$error) { + continue; + } - $messageCounter++; + $errors[] = $error; + } - $this->logger?->debug(sprintf( - 'Subscription Engine: Current event stream position: %s', - $index, - )); + $messageCounter++; - if ($limit !== null && $messageCounter >= $limit) { - $this->logger?->info( - sprintf( - 'Subscription Engine: Message limit (%d) reached, finish processing.', - $limit, - ), - ); + $this->logger?->debug(sprintf( + 'Subscription Engine: Current event stream position: %s', + $index, + )); - return new ProcessedResult($messageCounter, false, $errors); - } - } - } finally { - $endIndex = $stream?->index() ?: $startIndex; - $stream?->close(); - - if ($messageCounter > 0) { - foreach ($subscriptions as $subscription) { - if (!$subscription->isActive()) { - continue; + if ($limit !== null && $messageCounter >= $limit) { + $this->logger?->info( + sprintf( + 'Subscription Engine: Message limit (%d) reached, finish processing.', + $limit, + ), + ); + + return new ProcessedResult($messageCounter, false, $errors); } + } + } finally { + $endIndex = $stream?->index() ?: $startIndex; + $stream?->close(); - $this->subscriptionStore->update($subscription); + if ($messageCounter > 0) { + foreach ($subscriptions as $subscription) { + if (!$subscription->isActive()) { + continue; + } + + $this->subscriptionStore->update($subscription); + } } } - } - foreach ($subscriptions as $subscription) { - if (!$subscription->isActive()) { - continue; - } + foreach ($subscriptions as $subscription) { + if (!$subscription->isActive()) { + continue; + } - if ($subscription->runMode() !== RunMode::Once) { - continue; - } + if ($subscription->runMode() !== RunMode::Once) { + continue; + } - $subscription->finished(); - $this->subscriptionStore->update($subscription); + $subscription->finished(); + $this->subscriptionStore->update($subscription); - $this->logger?->info(sprintf( - 'Subscription Engine: Subscription "%s" run only once and has been set to finished.', - $subscription->id(), - )); - } + $this->logger?->info(sprintf( + 'Subscription Engine: Subscription "%s" run only once and has been set to finished.', + $subscription->id(), + )); + } - $this->logger?->info( - sprintf( - 'Subscription Engine: End of stream on position "%d" has been reached, finish processing.', - $endIndex, - ), - ); + $this->logger?->info( + sprintf( + 'Subscription Engine: End of stream on position "%d" has been reached, finish processing.', + $endIndex, + ), + ); - return new ProcessedResult($messageCounter, true, $errors); - }, - ); + return new ProcessedResult($messageCounter, true, $errors); + }, + ); + } finally { + $this->processing = false; + } } public function teardown(SubscriptionEngineCriteria|null $criteria = null): Result @@ -943,7 +965,7 @@ private function findForUpdate(SubscriptionCriteria $criteria, Closure $closure) } return $this->subscriptionStore->inLock( - /** @return T */ + /** @return T */ function () use ($closure, $criteria): mixed { $subscriptions = $this->subscriptionStore->find($criteria); diff --git a/src/Subscription/Engine/RunSubscriptionEngineEventBus.php b/src/Subscription/Engine/RunSubscriptionEngineEventBus.php new file mode 100644 index 000000000..872de070a --- /dev/null +++ b/src/Subscription/Engine/RunSubscriptionEngineEventBus.php @@ -0,0 +1,39 @@ +|null $ids + * @param list|null $groups + * @param positive-int|null $limit + */ + public function __construct( + private readonly SubscriptionEngine $engine, + private readonly array|null $ids = null, + private readonly array|null $groups = null, + private readonly int|null $limit = null, + ) { + } + + public function dispatch(Message ...$messages): void + { + try { + $this->engine->run( + new SubscriptionEngineCriteria( + $this->ids, + $this->groups, + ), + $this->limit, + ); + } catch (AlreadyProcessing) { + // do nothing + } + } +} diff --git a/src/Subscription/Engine/SubscriptionEngine.php b/src/Subscription/Engine/SubscriptionEngine.php index bdba70f12..3e18be635 100644 --- a/src/Subscription/Engine/SubscriptionEngine.php +++ b/src/Subscription/Engine/SubscriptionEngine.php @@ -14,6 +14,7 @@ public function setup(SubscriptionEngineCriteria|null $criteria = null, bool $sk * @param positive-int|null $limit * * @throws SubscriberNotFound + * @throws AlreadyProcessing */ public function boot( SubscriptionEngineCriteria|null $criteria = null, @@ -24,6 +25,7 @@ public function boot( * @param positive-int|null $limit * * @throws SubscriberNotFound + * @throws AlreadyProcessing */ public function run( SubscriptionEngineCriteria|null $criteria = null, diff --git a/tests/Integration/BasicImplementation/BasicIntegrationTest.php b/tests/Integration/BasicImplementation/BasicIntegrationTest.php index b7dc6d68a..f76bb3cb4 100644 --- a/tests/Integration/BasicImplementation/BasicIntegrationTest.php +++ b/tests/Integration/BasicImplementation/BasicIntegrationTest.php @@ -5,7 +5,6 @@ namespace Patchlevel\EventSourcing\Tests\Integration\BasicImplementation; use Doctrine\DBAL\Connection; -use Patchlevel\EventSourcing\EventBus\DefaultEventBus; use Patchlevel\EventSourcing\Message\Serializer\DefaultHeadersSerializer; use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry; use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager; @@ -15,11 +14,12 @@ use Patchlevel\EventSourcing\Snapshot\DefaultSnapshotStore; use Patchlevel\EventSourcing\Store\DoctrineDbalStore; use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; +use Patchlevel\EventSourcing\Subscription\Engine\RunSubscriptionEngineEventBus; use Patchlevel\EventSourcing\Subscription\Store\InMemorySubscriptionStore; use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository; use Patchlevel\EventSourcing\Tests\DbalManager; -use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Listener\SendEmailListener; use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\MessageDecorator\FooMessageDecorator; +use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Processor\SendEmailProcessor; use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Projection\ProfileProjector; use PHPUnit\Framework\TestCase; @@ -54,12 +54,13 @@ public function testSuccessful(): void $engine = new DefaultSubscriptionEngine( $store, new InMemorySubscriptionStore(), - new MetadataSubscriberAccessorRepository([$profileProjector]), + new MetadataSubscriberAccessorRepository([ + $profileProjector, + new SendEmailProcessor(), + ]), ); - $eventBus = DefaultEventBus::create([ - new SendEmailListener(), - ]); + $eventBus = new RunSubscriptionEngineEventBus($engine); $manager = new DefaultRepositoryManager( new AggregateRootRegistry(['profile' => Profile::class]), @@ -76,15 +77,12 @@ public function testSuccessful(): void ); $schemaDirector->create(); - $engine->setup(); - $engine->boot(); + $engine->setup(skipBooting: true); $profileId = ProfileId::generate(); $profile = Profile::create($profileId, 'John'); $repository->save($profile); - $engine->run(); - $result = $this->connection->fetchAssociative( 'SELECT * FROM projection_profile WHERE id = ?', [$profileId->toString()], @@ -125,12 +123,13 @@ public function testSnapshot(): void $engine = new DefaultSubscriptionEngine( $store, new InMemorySubscriptionStore(), - new MetadataSubscriberAccessorRepository([$profileProjection]), + new MetadataSubscriberAccessorRepository([ + $profileProjection, + new SendEmailProcessor(), + ]), ); - $eventBus = DefaultEventBus::create([ - new SendEmailListener(), - ]); + $eventBus = new RunSubscriptionEngineEventBus($engine); $manager = new DefaultRepositoryManager( new AggregateRootRegistry(['profile' => Profile::class]), @@ -147,15 +146,12 @@ public function testSnapshot(): void ); $schemaDirector->create(); - $engine->setup(); - $engine->boot(); + $engine->setup(skipBooting: true); $profileId = ProfileId::generate(); $profile = Profile::create($profileId, 'John'); $repository->save($profile); - $engine->run(); - $result = $this->connection->fetchAssociative( 'SELECT * FROM projection_profile WHERE id = ?', [$profileId->toString()], diff --git a/tests/Integration/BasicImplementation/Listener/SendEmailListener.php b/tests/Integration/BasicImplementation/Processor/SendEmailProcessor.php similarity index 80% rename from tests/Integration/BasicImplementation/Listener/SendEmailListener.php rename to tests/Integration/BasicImplementation/Processor/SendEmailProcessor.php index 2e10d2b9b..855276e3b 100644 --- a/tests/Integration/BasicImplementation/Listener/SendEmailListener.php +++ b/tests/Integration/BasicImplementation/Processor/SendEmailProcessor.php @@ -2,14 +2,16 @@ declare(strict_types=1); -namespace Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Listener; +namespace Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Processor; +use Patchlevel\EventSourcing\Attribute\Processor; use Patchlevel\EventSourcing\Attribute\Subscribe; use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Events\ProfileCreated; use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\SendEmailMock; -final class SendEmailListener +#[Processor('send_email')] +final class SendEmailProcessor { #[Subscribe(ProfileCreated::class)] public function onProfileCreated(Message $message): void diff --git a/tests/Unit/EventBus/ChainEventBusTest.php b/tests/Unit/EventBus/ChainEventBusTest.php new file mode 100644 index 000000000..78d58529c --- /dev/null +++ b/tests/Unit/EventBus/ChainEventBusTest.php @@ -0,0 +1,43 @@ +prophesize(EventBus::class); + $eventBus2 = $this->prophesize(EventBus::class); + + $eventBus1->dispatch($message)->shouldBeCalled(); + $eventBus2->dispatch($message)->shouldBeCalled(); + + $chainEventBus = new ChainEventBus( + $eventBus1->reveal(), + $eventBus2->reveal(), + ); + + $chainEventBus->dispatch($message); + } +} diff --git a/tests/Unit/Subscription/Engine/RunSubscriptionEngineEventBusTest.php b/tests/Unit/Subscription/Engine/RunSubscriptionEngineEventBusTest.php new file mode 100644 index 000000000..dae67acb8 --- /dev/null +++ b/tests/Unit/Subscription/Engine/RunSubscriptionEngineEventBusTest.php @@ -0,0 +1,39 @@ +prophesize(SubscriptionEngine::class); + + $criteria = new SubscriptionEngineCriteria( + ['id1', 'id2'], + ['group1', 'group2'], + ); + + $engine->run($criteria, 42)->willReturn(new ProcessedResult(21))->shouldBeCalledOnce(); + + $eventBus = new RunSubscriptionEngineEventBus( + $engine->reveal(), + ['id1', 'id2'], + ['group1', 'group2'], + 42, + ); + + $eventBus->dispatch(); + } +}