From 773908de143eb6ac80e1069901f126303037a6d8 Mon Sep 17 00:00:00 2001 From: David Badura Date: Mon, 22 Apr 2024 11:46:39 +0200 Subject: [PATCH] add run subscription engine repository and repository manager --- deptrac.yaml | 1 + docs/pages/getting_started.md | 18 +- docs/pages/subscription.md | 41 +- src/Subscription/Engine/AlreadyProcessing.php | 15 + .../Engine/DefaultSubscriptionEngine.php | 448 +++++++++--------- .../Engine/SubscriptionEngine.php | 2 + .../RunSubscriptionEngineRepository.php | 63 +++ ...RunSubscriptionEngineRepositoryManager.php | 45 ++ .../BasicIntegrationTest.php | 74 ++- .../SendEmailProcessor.php} | 6 +- .../Subscription/DummySubscriptionStore.php | 7 + .../Engine/DefaultSubscriptionEngineTest.php | 230 ++++++++- ...ubscriptionEngineRepositoryManagerTest.php | 39 ++ .../RunSubscriptionEngineRepositoryTest.php | 126 +++++ 14 files changed, 844 insertions(+), 271 deletions(-) create mode 100644 src/Subscription/Engine/AlreadyProcessing.php create mode 100644 src/Subscription/Repository/RunSubscriptionEngineRepository.php create mode 100644 src/Subscription/Repository/RunSubscriptionEngineRepositoryManager.php rename tests/Integration/BasicImplementation/{Listener/SendEmailListener.php => Processor/SendEmailProcessor.php} (80%) create mode 100644 tests/Unit/Subscription/Repository/RunSubscriptionEngineRepositoryManagerTest.php create mode 100644 tests/Unit/Subscription/Repository/RunSubscriptionEngineRepositoryTest.php diff --git a/deptrac.yaml b/deptrac.yaml index 186f3d82..703db335 100644 --- a/deptrac.yaml +++ b/deptrac.yaml @@ -151,6 +151,7 @@ deptrac: - Clock - Message - MetadataSubscriber + - Repository - Schema - Store Repository: diff --git a/docs/pages/getting_started.md b/docs/pages/getting_started.md index 022217ee..36bdb359 100644 --- a/docs/pages/getting_started.md +++ b/docs/pages/getting_started.md @@ -275,6 +275,7 @@ use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager; use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer; use Patchlevel\EventSourcing\Store\DoctrineDbalStore; use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; +use Patchlevel\EventSourcing\Subscription\Repository\RunSubscriptionEngineRepositoryManager; use Patchlevel\EventSourcing\Subscription\Store\DoctrineSubscriptionStore; use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository; @@ -308,9 +309,12 @@ $engine = new DefaultSubscriptionEngine( $subscriberRepository, ); -$repositoryManager = new DefaultRepositoryManager( - $aggregateRegistry, - $eventStore, +$repositoryManager = new RunSubscriptionEngineRepositoryManager( + new DefaultRepositoryManager( + $aggregateRegistry, + $eventStore, + ), + $engine, ); $hotelRepository = $repositoryManager->get(Hotel::class); @@ -361,7 +365,6 @@ We are now ready to use the Event Sourcing System. We can load, change and save ```php use Patchlevel\EventSourcing\Aggregate\Uuid; use Patchlevel\EventSourcing\Repository\Repository; -use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine; $hotel1 = Hotel::create(Uuid::generate(), 'HOTEL'); $hotel1->checkIn('David'); @@ -375,15 +378,8 @@ $hotel2 = $hotelRepository->load(Uuid::fromString('d0d0d0d0-d0d0-d0d0-d0d0-d0d0d $hotel2->checkIn('David'); $hotelRepository->save($hotel2); -/** @var SubscriptionEngine $engine */ -$engine->run(); - $hotels = $hotelProjection->getHotels(); ``` -!!! warning - - You need to run the subscription engine to update the projections and execute the processors. - !!! note You can also use other forms of IDs such as uuid version 6 or a custom format. diff --git a/docs/pages/subscription.md b/docs/pages/subscription.md index f5d6a28f..cd61b8c6 100644 --- a/docs/pages/subscription.md +++ b/docs/pages/subscription.md @@ -681,7 +681,7 @@ $catchupSubscriptionEngine = new CatchUpSubscriptionEngine($subscriptionEngine); You can use the `CatchUpSubscriptionEngine` in your tests to process the events immediately. -### Throw by error Subscription Engine +### Throw on error Subscription Engine This is another decorator for the subscription engine. It throws an exception if a subscription is in error state. This is useful for testing or development to get directly feedback if something is wrong. @@ -691,13 +691,50 @@ use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine; use Patchlevel\EventSourcing\Subscription\Engine\ThrowOnErrorSubscriptionEngine; /** @var SubscriptionEngine $subscriptionStore */ -$throwByErrorSubscriptionEngine = new ThrowOnErrorSubscriptionEngine($subscriptionEngine); +$throwOnErrorSubscriptionEngine = new ThrowOnErrorSubscriptionEngine($subscriptionEngine); ``` !!! warning This is only for testing or development. Don't use it in production. The subscription engine has an build in retry strategy to retry subscriptions that have failed. +### Run Subscription Engine after save + +You can trigger the subscription engine after calling the `save` method on the repository. +This means that a worker to run the subscriptions are not needed. + +```php +use Patchlevel\EventSourcing\Repository\RepositoryManager; +use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine; +use Patchlevel\EventSourcing\Subscription\Repository\RunSubscriptionEngineRepositoryManager; + +/** + * @var SubscriptionEngine $subscriptionEngine + * @var RepositoryManager $defaultRepositoryManager + */ +$eventBus = new RunSubscriptionEngineRepositoryManager( + $defaultRepositoryManager, + $subscriptionEngine, + ['id1', 'id2'], // filter subscribers by id + ['group1', 'group2'], // filter subscribers by group + 100, // limit the number of messages +); +``` +!!! danger + + By using this, you can't wrap the repository in a transaction. + A rollback is not supported and can break the subscription engine. + Internally, the events are saved in a transaction to ensure data consistency. + +!!! note + + More about repository manager and repository can be found [here](./repository.md). + +!!! tip + + You can perfectly use it in development or testing. + Especially in combination with the `CatchUpSubscriptionEngine` and `ThrowOnErrorSubscriptionEngine` decorators. + ## Usage The Subscription Engine has a few methods needed to use it effectively. diff --git a/src/Subscription/Engine/AlreadyProcessing.php b/src/Subscription/Engine/AlreadyProcessing.php new file mode 100644 index 00000000..89f2d3f0 --- /dev/null +++ b/src/Subscription/Engine/AlreadyProcessing.php @@ -0,0 +1,15 @@ +processing) { + throw new AlreadyProcessing(); + } - $this->logger?->info( - 'Subscription Engine: Start booting.', - ); + $this->processing = true; - $this->discoverNewSubscriptions(); - $this->retrySubscriptions($criteria); + try { + $criteria ??= new SubscriptionEngineCriteria(); - return $this->findForUpdate( - new SubscriptionCriteria( - ids: $criteria->ids, - groups: $criteria->groups, - status: [Status::Booting], - ), - function ($subscriptions) use ($limit): ProcessedResult { - if (count($subscriptions) === 0) { - $this->logger?->info('Subscription Engine: No subscriptions in booting status, finish booting.'); + $this->logger?->info( + 'Subscription Engine: Start booting.', + ); - return new ProcessedResult(0); - } + $this->discoverNewSubscriptions(); + $this->retrySubscriptions($criteria); - /** @var list $errors */ - $errors = []; + return $this->findForUpdate( + new SubscriptionCriteria( + ids: $criteria->ids, + groups: $criteria->groups, + status: [Status::Booting], + ), + function ($subscriptions) use ($limit): ProcessedResult { + if (count($subscriptions) === 0) { + $this->logger?->info('Subscription Engine: No subscriptions in booting status, finish booting.'); - $startIndex = $this->lowestSubscriptionPosition($subscriptions); + return new ProcessedResult(0); + } - $this->logger?->debug( - sprintf( - 'Subscription Engine: Event stream is processed for booting from position %s.', - $startIndex, - ), - ); + /** @var list $errors */ + $errors = []; - $stream = null; - $messageCounter = 0; + $startIndex = $this->lowestSubscriptionPosition($subscriptions); - try { - $stream = $this->messageStore->load( - new Criteria(new FromIndexCriterion($startIndex)), + $this->logger?->debug( + sprintf( + 'Subscription Engine: Event stream is processed for booting from position %s.', + $startIndex, + ), ); - foreach ($stream as $message) { - $index = $stream->index(); + $stream = null; + $messageCounter = 0; - if ($index === null) { - throw new UnexpectedError('Stream index is null, this should not happen.'); - } + try { + $stream = $this->messageStore->load( + new Criteria(new FromIndexCriterion($startIndex)), + ); - foreach ($subscriptions as $subscription) { - if (!$subscription->isBooting()) { - continue; + foreach ($stream as $message) { + $index = $stream->index(); + + if ($index === null) { + throw new UnexpectedError('Stream index is null, this should not happen.'); } - if ($subscription->position() >= $index) { - $this->logger?->debug( - sprintf( - 'Subscription Engine: Subscription "%s" is farther than the current position (%d > %d), continue booting.', - $subscription->id(), - $subscription->position(), - $index, - ), - ); + foreach ($subscriptions as $subscription) { + if (!$subscription->isBooting()) { + continue; + } - continue; - } + if ($subscription->position() >= $index) { + $this->logger?->debug( + sprintf( + 'Subscription Engine: Subscription "%s" is farther than the current position (%d > %d), continue booting.', + $subscription->id(), + $subscription->position(), + $index, + ), + ); - $error = $this->handleMessage($index, $message, $subscription); + continue; + } - if (!$error) { - continue; - } + $error = $this->handleMessage($index, $message, $subscription); - $errors[] = $error; - } + if (!$error) { + continue; + } - $messageCounter++; + $errors[] = $error; + } - $this->logger?->debug( - sprintf( - 'Subscription Engine: Current event stream position for booting: %s', - $index, - ), - ); + $messageCounter++; - if ($limit !== null && $messageCounter >= $limit) { - $this->logger?->info( + $this->logger?->debug( sprintf( - 'Subscription Engine: Message limit (%d) reached, finish booting.', - $limit, + 'Subscription Engine: Current event stream position for booting: %s', + $index, ), ); - return new ProcessedResult( - $messageCounter, - false, - $errors, - ); - } - } - } finally { - $stream?->close(); + if ($limit !== null && $messageCounter >= $limit) { + $this->logger?->info( + sprintf( + 'Subscription Engine: Message limit (%d) reached, finish booting.', + $limit, + ), + ); - if ($messageCounter > 0) { - foreach ($subscriptions as $subscription) { - if (!$subscription->isBooting()) { - continue; + return new ProcessedResult( + $messageCounter, + false, + $errors, + ); } + } + } finally { + $stream?->close(); - $this->subscriptionStore->update($subscription); + if ($messageCounter > 0) { + foreach ($subscriptions as $subscription) { + if (!$subscription->isBooting()) { + continue; + } + + $this->subscriptionStore->update($subscription); + } } } - } - $this->logger?->debug('Subscription Engine: End of stream for booting has been reached.'); + $this->logger?->debug('Subscription Engine: End of stream for booting has been reached.'); - foreach ($subscriptions as $subscription) { - if (!$subscription->isBooting()) { - continue; - } + foreach ($subscriptions as $subscription) { + if (!$subscription->isBooting()) { + continue; + } - if ($subscription->runMode() === RunMode::Once) { - $subscription->finished(); + if ($subscription->runMode() === RunMode::Once) { + $subscription->finished(); + $this->subscriptionStore->update($subscription); + + $this->logger?->info(sprintf( + 'Subscription Engine: Subscription "%s" run only once and has been set to finished.', + $subscription->id(), + )); + + continue; + } + + $subscription->active(); $this->subscriptionStore->update($subscription); $this->logger?->info(sprintf( - 'Subscription Engine: Subscription "%s" run only once and has been set to finished.', + 'Subscription Engine: Subscription "%s" has been set to active after booting.', $subscription->id(), )); - - continue; } - $subscription->active(); - $this->subscriptionStore->update($subscription); + $this->logger?->info('Subscription Engine: Finish booting.'); - $this->logger?->info(sprintf( - 'Subscription Engine: Subscription "%s" has been set to active after booting.', - $subscription->id(), - )); - } - - $this->logger?->info('Subscription Engine: Finish booting.'); - - return new ProcessedResult( - $messageCounter, - true, - $errors, - ); - }, - ); + return new ProcessedResult( + $messageCounter, + true, + $errors, + ); + }, + ); + } finally { + $this->processing = false; + } } public function run( SubscriptionEngineCriteria|null $criteria = null, int|null $limit = null, ): ProcessedResult { - $criteria ??= new SubscriptionEngineCriteria(); + if ($this->processing) { + throw new AlreadyProcessing(); + } - $this->logger?->info('Subscription Engine: Start processing.'); + $this->processing = true; - $this->discoverNewSubscriptions(); - $this->markDetachedSubscriptions($criteria); - $this->retrySubscriptions($criteria); + try { + $criteria ??= new SubscriptionEngineCriteria(); - return $this->findForUpdate( - new SubscriptionCriteria( - ids: $criteria->ids, - groups: $criteria->groups, - status: [Status::Active], - ), - function (array $subscriptions) use ($limit): ProcessedResult { - if (count($subscriptions) === 0) { - $this->logger?->info('Subscription Engine: No subscriptions to process, finish processing.'); + $this->logger?->info('Subscription Engine: Start processing.'); - return new ProcessedResult(0); - } + $this->discoverNewSubscriptions(); + $this->markDetachedSubscriptions($criteria); + $this->retrySubscriptions($criteria); - /** @var list $errors */ - $errors = []; + return $this->findForUpdate( + new SubscriptionCriteria( + ids: $criteria->ids, + groups: $criteria->groups, + status: [Status::Active], + ), + function (array $subscriptions) use ($limit): ProcessedResult { + if (count($subscriptions) === 0) { + $this->logger?->info('Subscription Engine: No subscriptions to process, finish processing.'); + + return new ProcessedResult(0); + } - $startIndex = $this->lowestSubscriptionPosition($subscriptions); + /** @var list $errors */ + $errors = []; - $this->logger?->debug( - sprintf( - 'Subscription Engine: Event stream is processed from position %d.', - $startIndex, - ), - ); + $startIndex = $this->lowestSubscriptionPosition($subscriptions); - $stream = null; - $messageCounter = 0; + $this->logger?->debug( + sprintf( + 'Subscription Engine: Event stream is processed from position %d.', + $startIndex, + ), + ); - try { - $criteria = new Criteria(new FromIndexCriterion($startIndex)); - $stream = $this->messageStore->load($criteria); + $stream = null; + $messageCounter = 0; - foreach ($stream as $message) { - $index = $stream->index(); + try { + $criteria = new Criteria(new FromIndexCriterion($startIndex)); + $stream = $this->messageStore->load($criteria); - if ($index === null) { - throw new UnexpectedError('Stream index is null, this should not happen.'); - } + foreach ($stream as $message) { + $index = $stream->index(); - foreach ($subscriptions as $subscription) { - if (!$subscription->isActive()) { - continue; + if ($index === null) { + throw new UnexpectedError('Stream index is null, this should not happen.'); } - if ($subscription->position() >= $index) { - $this->logger?->debug( - sprintf( - 'Subscription Engine: Subscription "%s" is farther than the current position (%d > %d), continue processing.', - $subscription->id(), - $subscription->position(), - $index, - ), - ); + foreach ($subscriptions as $subscription) { + if (!$subscription->isActive()) { + continue; + } - continue; - } + if ($subscription->position() >= $index) { + $this->logger?->debug( + sprintf( + 'Subscription Engine: Subscription "%s" is farther than the current position (%d > %d), continue processing.', + $subscription->id(), + $subscription->position(), + $index, + ), + ); - $error = $this->handleMessage($index, $message, $subscription); + continue; + } - if (!$error) { - continue; - } + $error = $this->handleMessage($index, $message, $subscription); - $errors[] = $error; - } + if (!$error) { + continue; + } - $messageCounter++; + $errors[] = $error; + } - $this->logger?->debug(sprintf( - 'Subscription Engine: Current event stream position: %s', - $index, - )); + $messageCounter++; - if ($limit !== null && $messageCounter >= $limit) { - $this->logger?->info( - sprintf( - 'Subscription Engine: Message limit (%d) reached, finish processing.', - $limit, - ), - ); + $this->logger?->debug(sprintf( + 'Subscription Engine: Current event stream position: %s', + $index, + )); - return new ProcessedResult($messageCounter, false, $errors); - } - } - } finally { - $endIndex = $stream?->index() ?: $startIndex; - $stream?->close(); - - if ($messageCounter > 0) { - foreach ($subscriptions as $subscription) { - if (!$subscription->isActive()) { - continue; + if ($limit !== null && $messageCounter >= $limit) { + $this->logger?->info( + sprintf( + 'Subscription Engine: Message limit (%d) reached, finish processing.', + $limit, + ), + ); + + return new ProcessedResult($messageCounter, false, $errors); } + } + } finally { + $endIndex = $stream?->index() ?: $startIndex; + $stream?->close(); - $this->subscriptionStore->update($subscription); + if ($messageCounter > 0) { + foreach ($subscriptions as $subscription) { + if (!$subscription->isActive()) { + continue; + } + + $this->subscriptionStore->update($subscription); + } } } - } - foreach ($subscriptions as $subscription) { - if (!$subscription->isActive()) { - continue; - } + foreach ($subscriptions as $subscription) { + if (!$subscription->isActive()) { + continue; + } - if ($subscription->runMode() !== RunMode::Once) { - continue; - } + if ($subscription->runMode() !== RunMode::Once) { + continue; + } - $subscription->finished(); - $this->subscriptionStore->update($subscription); + $subscription->finished(); + $this->subscriptionStore->update($subscription); - $this->logger?->info(sprintf( - 'Subscription Engine: Subscription "%s" run only once and has been set to finished.', - $subscription->id(), - )); - } + $this->logger?->info(sprintf( + 'Subscription Engine: Subscription "%s" run only once and has been set to finished.', + $subscription->id(), + )); + } - $this->logger?->info( - sprintf( - 'Subscription Engine: End of stream on position "%d" has been reached, finish processing.', - $endIndex, - ), - ); + $this->logger?->info( + sprintf( + 'Subscription Engine: End of stream on position "%d" has been reached, finish processing.', + $endIndex, + ), + ); - return new ProcessedResult($messageCounter, true, $errors); - }, - ); + return new ProcessedResult($messageCounter, true, $errors); + }, + ); + } finally { + $this->processing = false; + } } public function teardown(SubscriptionEngineCriteria|null $criteria = null): Result @@ -943,7 +965,7 @@ private function findForUpdate(SubscriptionCriteria $criteria, Closure $closure) } return $this->subscriptionStore->inLock( - /** @return T */ + /** @return T */ function () use ($closure, $criteria): mixed { $subscriptions = $this->subscriptionStore->find($criteria); diff --git a/src/Subscription/Engine/SubscriptionEngine.php b/src/Subscription/Engine/SubscriptionEngine.php index bdba70f1..3e18be63 100644 --- a/src/Subscription/Engine/SubscriptionEngine.php +++ b/src/Subscription/Engine/SubscriptionEngine.php @@ -14,6 +14,7 @@ public function setup(SubscriptionEngineCriteria|null $criteria = null, bool $sk * @param positive-int|null $limit * * @throws SubscriberNotFound + * @throws AlreadyProcessing */ public function boot( SubscriptionEngineCriteria|null $criteria = null, @@ -24,6 +25,7 @@ public function boot( * @param positive-int|null $limit * * @throws SubscriberNotFound + * @throws AlreadyProcessing */ public function run( SubscriptionEngineCriteria|null $criteria = null, diff --git a/src/Subscription/Repository/RunSubscriptionEngineRepository.php b/src/Subscription/Repository/RunSubscriptionEngineRepository.php new file mode 100644 index 00000000..4da7c605 --- /dev/null +++ b/src/Subscription/Repository/RunSubscriptionEngineRepository.php @@ -0,0 +1,63 @@ + + */ +final class RunSubscriptionEngineRepository implements Repository +{ + /** + * @param Repository $repository + * @param list|null $ids + * @param list|null $groups + * @param positive-int|null $limit + */ + public function __construct( + private readonly Repository $repository, + private readonly SubscriptionEngine $engine, + private readonly array|null $ids = null, + private readonly array|null $groups = null, + private readonly int|null $limit = null, + ) { + } + + /** @return T */ + public function load(AggregateRootId $id): AggregateRoot + { + return $this->repository->load($id); + } + + public function has(AggregateRootId $id): bool + { + return $this->repository->has($id); + } + + /** @param T $aggregate */ + public function save(AggregateRoot $aggregate): void + { + $this->repository->save($aggregate); + + try { + $this->engine->run( + new SubscriptionEngineCriteria( + $this->ids, + $this->groups, + ), + $this->limit, + ); + } catch (AlreadyProcessing) { + // do nothing + } + } +} diff --git a/src/Subscription/Repository/RunSubscriptionEngineRepositoryManager.php b/src/Subscription/Repository/RunSubscriptionEngineRepositoryManager.php new file mode 100644 index 00000000..455f6682 --- /dev/null +++ b/src/Subscription/Repository/RunSubscriptionEngineRepositoryManager.php @@ -0,0 +1,45 @@ +|null $ids + * @param list|null $groups + * @param positive-int|null $limit + */ + public function __construct( + private readonly RepositoryManager $repositoryManager, + private readonly SubscriptionEngine $engine, + private readonly array|null $ids = null, + private readonly array|null $groups = null, + private readonly int|null $limit = null, + ) { + } + + /** + * @param class-string $aggregateClass + * + * @return Repository + * + * @template T of AggregateRoot + */ + public function get(string $aggregateClass): Repository + { + return new RunSubscriptionEngineRepository( + $this->repositoryManager->get($aggregateClass), + $this->engine, + $this->ids, + $this->groups, + $this->limit, + ); + } +} diff --git a/tests/Integration/BasicImplementation/BasicIntegrationTest.php b/tests/Integration/BasicImplementation/BasicIntegrationTest.php index b7dc6d68..532dba18 100644 --- a/tests/Integration/BasicImplementation/BasicIntegrationTest.php +++ b/tests/Integration/BasicImplementation/BasicIntegrationTest.php @@ -5,7 +5,6 @@ namespace Patchlevel\EventSourcing\Tests\Integration\BasicImplementation; use Doctrine\DBAL\Connection; -use Patchlevel\EventSourcing\EventBus\DefaultEventBus; use Patchlevel\EventSourcing\Message\Serializer\DefaultHeadersSerializer; use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry; use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager; @@ -15,11 +14,12 @@ use Patchlevel\EventSourcing\Snapshot\DefaultSnapshotStore; use Patchlevel\EventSourcing\Store\DoctrineDbalStore; use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; +use Patchlevel\EventSourcing\Subscription\Repository\RunSubscriptionEngineRepositoryManager; use Patchlevel\EventSourcing\Subscription\Store\InMemorySubscriptionStore; use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository; use Patchlevel\EventSourcing\Tests\DbalManager; -use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Listener\SendEmailListener; use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\MessageDecorator\FooMessageDecorator; +use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Processor\SendEmailProcessor; use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Projection\ProfileProjector; use PHPUnit\Framework\TestCase; @@ -54,20 +54,23 @@ public function testSuccessful(): void $engine = new DefaultSubscriptionEngine( $store, new InMemorySubscriptionStore(), - new MetadataSubscriberAccessorRepository([$profileProjector]), + new MetadataSubscriberAccessorRepository([ + $profileProjector, + new SendEmailProcessor(), + ]), ); - $eventBus = DefaultEventBus::create([ - new SendEmailListener(), - ]); - - $manager = new DefaultRepositoryManager( - new AggregateRootRegistry(['profile' => Profile::class]), - $store, - $eventBus, - null, - new FooMessageDecorator(), + $manager = new RunSubscriptionEngineRepositoryManager( + new DefaultRepositoryManager( + new AggregateRootRegistry(['profile' => Profile::class]), + $store, + null, + null, + new FooMessageDecorator(), + ), + $engine, ); + $repository = $manager->get(Profile::class); $schemaDirector = new DoctrineSchemaDirector( @@ -76,15 +79,12 @@ public function testSuccessful(): void ); $schemaDirector->create(); - $engine->setup(); - $engine->boot(); + $engine->setup(skipBooting: true); $profileId = ProfileId::generate(); $profile = Profile::create($profileId, 'John'); $repository->save($profile); - $engine->run(); - $result = $this->connection->fetchAssociative( 'SELECT * FROM projection_profile WHERE id = ?', [$profileId->toString()], @@ -95,11 +95,6 @@ public function testSuccessful(): void self::assertSame($profileId->toString(), $result['id']); self::assertSame('John', $result['name']); - $manager = new DefaultRepositoryManager( - new AggregateRootRegistry(['profile' => Profile::class]), - $store, - $eventBus, - ); $repository = $manager->get(Profile::class); $profile = $repository->load($profileId); @@ -125,20 +120,23 @@ public function testSnapshot(): void $engine = new DefaultSubscriptionEngine( $store, new InMemorySubscriptionStore(), - new MetadataSubscriberAccessorRepository([$profileProjection]), + new MetadataSubscriberAccessorRepository([ + $profileProjection, + new SendEmailProcessor(), + ]), ); - $eventBus = DefaultEventBus::create([ - new SendEmailListener(), - ]); - - $manager = new DefaultRepositoryManager( - new AggregateRootRegistry(['profile' => Profile::class]), - $store, - $eventBus, - new DefaultSnapshotStore(['default' => new InMemorySnapshotAdapter()]), - new FooMessageDecorator(), + $manager = new RunSubscriptionEngineRepositoryManager( + new DefaultRepositoryManager( + new AggregateRootRegistry(['profile' => Profile::class]), + $store, + null, + new DefaultSnapshotStore(['default' => new InMemorySnapshotAdapter()]), + new FooMessageDecorator(), + ), + $engine, ); + $repository = $manager->get(Profile::class); $schemaDirector = new DoctrineSchemaDirector( @@ -147,15 +145,12 @@ public function testSnapshot(): void ); $schemaDirector->create(); - $engine->setup(); - $engine->boot(); + $engine->setup(skipBooting: true); $profileId = ProfileId::generate(); $profile = Profile::create($profileId, 'John'); $repository->save($profile); - $engine->run(); - $result = $this->connection->fetchAssociative( 'SELECT * FROM projection_profile WHERE id = ?', [$profileId->toString()], @@ -166,11 +161,6 @@ public function testSnapshot(): void self::assertSame($profileId->toString(), $result['id']); self::assertSame('John', $result['name']); - $manager = new DefaultRepositoryManager( - new AggregateRootRegistry(['profile' => Profile::class]), - $store, - $eventBus, - ); $repository = $manager->get(Profile::class); $profile = $repository->load($profileId); diff --git a/tests/Integration/BasicImplementation/Listener/SendEmailListener.php b/tests/Integration/BasicImplementation/Processor/SendEmailProcessor.php similarity index 80% rename from tests/Integration/BasicImplementation/Listener/SendEmailListener.php rename to tests/Integration/BasicImplementation/Processor/SendEmailProcessor.php index 2e10d2b9..855276e3 100644 --- a/tests/Integration/BasicImplementation/Listener/SendEmailListener.php +++ b/tests/Integration/BasicImplementation/Processor/SendEmailProcessor.php @@ -2,14 +2,16 @@ declare(strict_types=1); -namespace Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Listener; +namespace Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Processor; +use Patchlevel\EventSourcing\Attribute\Processor; use Patchlevel\EventSourcing\Attribute\Subscribe; use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Events\ProfileCreated; use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\SendEmailMock; -final class SendEmailListener +#[Processor('send_email')] +final class SendEmailProcessor { #[Subscribe(ProfileCreated::class)] public function onProfileCreated(Message $message): void diff --git a/tests/Unit/Subscription/DummySubscriptionStore.php b/tests/Unit/Subscription/DummySubscriptionStore.php index caf41c80..da97c6d4 100644 --- a/tests/Unit/Subscription/DummySubscriptionStore.php +++ b/tests/Unit/Subscription/DummySubscriptionStore.php @@ -56,4 +56,11 @@ public function remove(Subscription $subscription): void $this->parentStore->remove($subscription); $this->removedSubscriptions[] = clone $subscription; } + + public function reset(): void + { + $this->addedSubscriptions = []; + $this->updatedSubscriptions = []; + $this->removedSubscriptions = []; + } } diff --git a/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php b/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php index 99a58915..a35ba8b0 100644 --- a/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php +++ b/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php @@ -15,7 +15,9 @@ use Patchlevel\EventSourcing\Store\Criteria\Criteria; use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion; use Patchlevel\EventSourcing\Store\Store; +use Patchlevel\EventSourcing\Subscription\Engine\AlreadyProcessing; use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; +use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine; use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria; use Patchlevel\EventSourcing\Subscription\RetryStrategy\RetryStrategy; use Patchlevel\EventSourcing\Subscription\RunMode; @@ -816,6 +818,123 @@ public function handle(Message $message): void self::assertEquals($message1, $subscriber->message); } + public function testBootAlreadyProcessing(): void + { + $subscriptionId = 'test'; + $subscriber = new #[Subscriber('test', RunMode::FromBeginning)] + class { + public SubscriptionEngine|null $engine = null; + + #[Subscribe(ProfileVisited::class)] + public function handle(): void + { + $this->engine?->boot(); + } + }; + + $subscriptionStore = new DummySubscriptionStore([ + new Subscription( + $subscriptionId, + Subscription::DEFAULT_GROUP, + RunMode::FromBeginning, + Status::Booting, + ), + ]); + + $message1 = new Message(new ProfileVisited(ProfileId::fromString('test'))); + + $streamableStore = $this->prophesize(Store::class); + $streamableStore->load($this->criteria())->willReturn(new ArrayStream([$message1]))->shouldBeCalledOnce(); + + $engine = new DefaultSubscriptionEngine( + $streamableStore->reveal(), + $subscriptionStore, + new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), + ); + + $subscriber->engine = $engine; + + $result = $engine->boot(); + + self::assertCount(1, $result->errors); + self::assertInstanceOf(AlreadyProcessing::class, $result->errors[0]->throwable); + } + + public function testBootTwice(): void + { + $subscriptionId = 'test'; + $subscriber = new #[Subscriber('test', RunMode::FromBeginning)] + class { + public Message|null $message = null; + + #[Subscribe(ProfileVisited::class)] + public function handle(Message $message): void + { + $this->message = $message; + } + }; + + $subscriptionStore = new DummySubscriptionStore([ + new Subscription( + $subscriptionId, + Subscription::DEFAULT_GROUP, + RunMode::FromBeginning, + Status::Booting, + ), + ]); + + $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); + + $streamableStore = $this->prophesize(Store::class); + $streamableStore->load($this->criteria())->willReturn(new ArrayStream([$message]))->shouldBeCalledOnce(); + $streamableStore->load($this->criteria(1))->willReturn(new ArrayStream([]))->shouldBeCalledOnce(); + + $engine = new DefaultSubscriptionEngine( + $streamableStore->reveal(), + $subscriptionStore, + new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), + ); + + $result = $engine->boot(limit: 1); + + self::assertEquals(1, $result->processedMessages); + self::assertEquals(false, $result->streamFinished); + self::assertEquals([], $result->errors); + + self::assertEquals([], $subscriptionStore->addedSubscriptions); + + self::assertEquals([ + new Subscription( + $subscriptionId, + Subscription::DEFAULT_GROUP, + RunMode::FromBeginning, + Status::Booting, + 1, + ), + ], $subscriptionStore->updatedSubscriptions); + + self::assertSame($message, $subscriber->message); + + $subscriptionStore->reset(); + $result = $engine->boot(); + + self::assertEquals(0, $result->processedMessages); + self::assertEquals(true, $result->streamFinished); + self::assertEquals([], $result->errors); + + self::assertEquals([ + new Subscription( + $subscriptionId, + Subscription::DEFAULT_GROUP, + RunMode::FromBeginning, + Status::Active, + 1, + ), + ], $subscriptionStore->updatedSubscriptions); + } + public function testRunDiscoverNewSubscribers(): void { $subscriptionId = 'test'; @@ -1239,7 +1358,7 @@ public function handle(Message $message): void self::assertSame([$message1, $message2], $subscriber->messages); } - public function testRunnningWithOnlyOnce(): void + public function testRunningWithOnlyOnce(): void { $subscriptionId = 'test'; $subscriber = new #[Subscriber('test', RunMode::Once)] @@ -1300,6 +1419,115 @@ public function handle(Message $message): void self::assertEquals($message1, $subscriber->message); } + public function testRunningAlreadyProcessing(): void + { + $subscriptionId = 'test'; + $subscriber = new #[Subscriber('test', RunMode::FromBeginning)] + class { + public SubscriptionEngine|null $engine = null; + + #[Subscribe(ProfileVisited::class)] + public function handle(): void + { + $this->engine?->run(); + } + }; + + $subscriptionStore = new DummySubscriptionStore([ + new Subscription( + $subscriptionId, + Subscription::DEFAULT_GROUP, + RunMode::FromBeginning, + Status::Active, + ), + ]); + + $message1 = new Message(new ProfileVisited(ProfileId::fromString('test'))); + + $streamableStore = $this->prophesize(Store::class); + $streamableStore->load($this->criteria())->willReturn(new ArrayStream([$message1]))->shouldBeCalledOnce(); + + $engine = new DefaultSubscriptionEngine( + $streamableStore->reveal(), + $subscriptionStore, + new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), + ); + + $subscriber->engine = $engine; + + $result = $engine->run(); + + self::assertCount(1, $result->errors); + self::assertInstanceOf(AlreadyProcessing::class, $result->errors[0]->throwable); + } + + public function testRunningTwice(): void + { + $subscriptionId = 'test'; + $subscriber = new #[Subscriber('test', RunMode::FromBeginning)] + class { + public Message|null $message = null; + + #[Subscribe(ProfileVisited::class)] + public function handle(Message $message): void + { + $this->message = $message; + } + }; + + $subscriptionStore = new DummySubscriptionStore([ + new Subscription( + $subscriptionId, + Subscription::DEFAULT_GROUP, + RunMode::FromBeginning, + Status::Active, + ), + ]); + + $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); + + $streamableStore = $this->prophesize(Store::class); + $streamableStore->load($this->criteria())->willReturn(new ArrayStream([$message]))->shouldBeCalledOnce(); + $streamableStore->load($this->criteria(1))->willReturn(new ArrayStream([]))->shouldBeCalledOnce(); + + $engine = new DefaultSubscriptionEngine( + $streamableStore->reveal(), + $subscriptionStore, + new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), + ); + + $result = $engine->run(limit: 1); + + self::assertEquals(1, $result->processedMessages); + self::assertEquals(false, $result->streamFinished); + self::assertEquals([], $result->errors); + + self::assertEquals([], $subscriptionStore->addedSubscriptions); + + self::assertEquals([ + new Subscription( + $subscriptionId, + Subscription::DEFAULT_GROUP, + RunMode::FromBeginning, + Status::Active, + 1, + ), + ], $subscriptionStore->updatedSubscriptions); + + self::assertSame($message, $subscriber->message); + + $subscriptionStore->reset(); + $result = $engine->run(); + + self::assertEquals(0, $result->processedMessages); + self::assertEquals(true, $result->streamFinished); + self::assertEquals([], $result->errors); + + self::assertEquals([], $subscriptionStore->updatedSubscriptions); + } + public function testTeardownDiscoverNewSubscribers(): void { $subscriptionId = 'test'; diff --git a/tests/Unit/Subscription/Repository/RunSubscriptionEngineRepositoryManagerTest.php b/tests/Unit/Subscription/Repository/RunSubscriptionEngineRepositoryManagerTest.php new file mode 100644 index 00000000..dfa021ca --- /dev/null +++ b/tests/Unit/Subscription/Repository/RunSubscriptionEngineRepositoryManagerTest.php @@ -0,0 +1,39 @@ +prophesize(Repository::class)->reveal(); + + $defaultRepositoryManager = $this->prophesize(RepositoryManager::class); + $defaultRepositoryManager->get(Profile::class)->willReturn($defaultRepository)->shouldBeCalledOnce(); + + $engine = $this->prophesize(SubscriptionEngine::class); + + $repository = new RunSubscriptionEngineRepositoryManager( + $defaultRepositoryManager->reveal(), + $engine->reveal(), + ['id1', 'id2'], + ['group1', 'group2'], + 42, + ); + + $repository->get(Profile::class); + } +} diff --git a/tests/Unit/Subscription/Repository/RunSubscriptionEngineRepositoryTest.php b/tests/Unit/Subscription/Repository/RunSubscriptionEngineRepositoryTest.php new file mode 100644 index 00000000..f71c99e5 --- /dev/null +++ b/tests/Unit/Subscription/Repository/RunSubscriptionEngineRepositoryTest.php @@ -0,0 +1,126 @@ +prophesize(Repository::class); + $defaultRepository->load($profileId)->willReturn($aggregate)->shouldBeCalledOnce(); + + $engine = $this->prophesize(SubscriptionEngine::class); + + $repository = new RunSubscriptionEngineRepository( + $defaultRepository->reveal(), + $engine->reveal(), + ['id1', 'id2'], + ['group1', 'group2'], + 42, + ); + + self::assertSame($aggregate, $repository->load($profileId)); + } + + public function testHas(): void + { + $profileId = ProfileId::fromString('id1'); + + $defaultRepository = $this->prophesize(Repository::class); + $defaultRepository->has($profileId)->willReturn(true)->shouldBeCalledOnce(); + + $engine = $this->prophesize(SubscriptionEngine::class); + + $repository = new RunSubscriptionEngineRepository( + $defaultRepository->reveal(), + $engine->reveal(), + ['id1', 'id2'], + ['group1', 'group2'], + 42, + ); + + self::assertSame(true, $repository->has($profileId)); + } + + public function testSave(): void + { + $criteria = new SubscriptionEngineCriteria( + ['id1', 'id2'], + ['group1', 'group2'], + ); + + $aggregate = Profile::createProfile( + ProfileId::fromString('id1'), + Email::fromString('info@patchlevel.de'), + ); + + $defaultRepository = $this->prophesize(Repository::class); + $defaultRepository->save($aggregate)->shouldBeCalledOnce(); + + $engine = $this->prophesize(SubscriptionEngine::class); + $engine->run($criteria, 42)->willReturn(new ProcessedResult(21))->shouldBeCalledOnce(); + + $repository = new RunSubscriptionEngineRepository( + $defaultRepository->reveal(), + $engine->reveal(), + ['id1', 'id2'], + ['group1', 'group2'], + 42, + ); + + $repository->save($aggregate); + } + + public function testSaveWithAlreadyProcessing(): void + { + $criteria = new SubscriptionEngineCriteria( + ['id1', 'id2'], + ['group1', 'group2'], + ); + + $aggregate = Profile::createProfile( + ProfileId::fromString('id1'), + Email::fromString('info@patchlevel.de'), + ); + + $defaultRepository = $this->prophesize(Repository::class); + $defaultRepository->save($aggregate)->shouldBeCalledOnce(); + + $engine = $this->prophesize(SubscriptionEngine::class); + $engine->run($criteria, 42)->willThrow(new AlreadyProcessing())->shouldBeCalledOnce(); + + $repository = new RunSubscriptionEngineRepository( + $defaultRepository->reveal(), + $engine->reveal(), + ['id1', 'id2'], + ['group1', 'group2'], + 42, + ); + + $repository->save($aggregate); + } +}