diff --git a/docs/pages/getting_started.md b/docs/pages/getting_started.md index 022217ee9..dc06d4f18 100644 --- a/docs/pages/getting_started.md +++ b/docs/pages/getting_started.md @@ -275,6 +275,7 @@ use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager; use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer; use Patchlevel\EventSourcing\Store\DoctrineDbalStore; use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; +use Patchlevel\EventSourcing\Subscription\Engine\RunSubscriptionEngineEventBus; use Patchlevel\EventSourcing\Subscription\Store\DoctrineSubscriptionStore; use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository; @@ -311,6 +312,7 @@ $engine = new DefaultSubscriptionEngine( $repositoryManager = new DefaultRepositoryManager( $aggregateRegistry, $eventStore, + new RunSubscriptionEngineEventBus($engine), ); $hotelRepository = $repositoryManager->get(Hotel::class); @@ -375,15 +377,8 @@ $hotel2 = $hotelRepository->load(Uuid::fromString('d0d0d0d0-d0d0-d0d0-d0d0-d0d0d $hotel2->checkIn('David'); $hotelRepository->save($hotel2); -/** @var SubscriptionEngine $engine */ -$engine->run(); - $hotels = $hotelProjection->getHotels(); ``` -!!! warning - - You need to run the subscription engine to update the projections and execute the processors. - !!! note You can also use other forms of IDs such as uuid version 6 or a custom format. diff --git a/src/EventBus/ChainEventBus.php b/src/EventBus/ChainEventBus.php new file mode 100644 index 000000000..16c3d90c8 --- /dev/null +++ b/src/EventBus/ChainEventBus.php @@ -0,0 +1,28 @@ + */ + private readonly array $eventBus; + + public function __construct( + EventBus ...$eventBus, + ) { + $this->eventBus = array_values($eventBus); + } + + public function dispatch(Message ...$messages): void + { + foreach ($this->eventBus as $eventBus) { + $eventBus->dispatch(...$messages); + } + } +} diff --git a/src/Subscription/Engine/AlreadyProcessing.php b/src/Subscription/Engine/AlreadyProcessing.php new file mode 100644 index 000000000..89f2d3f0e --- /dev/null +++ b/src/Subscription/Engine/AlreadyProcessing.php @@ -0,0 +1,15 @@ +processing) { + throw new AlreadyProcessing(); + } - $this->logger?->info( - 'Subscription Engine: Start booting.', - ); + $this->processing = true; - $this->discoverNewSubscriptions(); - $this->retrySubscriptions($criteria); + try { + $criteria ??= new SubscriptionEngineCriteria(); - return $this->findForUpdate( - new SubscriptionCriteria( - ids: $criteria->ids, - groups: $criteria->groups, - status: [Status::Booting], - ), - function ($subscriptions) use ($limit): ProcessedResult { - if (count($subscriptions) === 0) { - $this->logger?->info('Subscription Engine: No subscriptions in booting status, finish booting.'); + $this->logger?->info( + 'Subscription Engine: Start booting.', + ); - return new ProcessedResult(0); - } + $this->discoverNewSubscriptions(); + $this->retrySubscriptions($criteria); - /** @var list $errors */ - $errors = []; + return $this->findForUpdate( + new SubscriptionCriteria( + ids: $criteria->ids, + groups: $criteria->groups, + status: [Status::Booting], + ), + function ($subscriptions) use ($limit): ProcessedResult { + if (count($subscriptions) === 0) { + $this->logger?->info('Subscription Engine: No subscriptions in booting status, finish booting.'); - $startIndex = $this->lowestSubscriptionPosition($subscriptions); + return new ProcessedResult(0); + } - $this->logger?->debug( - sprintf( - 'Subscription Engine: Event stream is processed for booting from position %s.', - $startIndex, - ), - ); + /** @var list $errors */ + $errors = []; - $stream = null; - $messageCounter = 0; + $startIndex = $this->lowestSubscriptionPosition($subscriptions); - try { - $stream = $this->messageStore->load( - new Criteria(new FromIndexCriterion($startIndex)), + $this->logger?->debug( + sprintf( + 'Subscription Engine: Event stream is processed for booting from position %s.', + $startIndex, + ), ); - foreach ($stream as $message) { - $index = $stream->index(); + $stream = null; + $messageCounter = 0; - if ($index === null) { - throw new UnexpectedError('Stream index is null, this should not happen.'); - } + try { + $stream = $this->messageStore->load( + new Criteria(new FromIndexCriterion($startIndex)), + ); - foreach ($subscriptions as $subscription) { - if (!$subscription->isBooting()) { - continue; + foreach ($stream as $message) { + $index = $stream->index(); + + if ($index === null) { + throw new UnexpectedError('Stream index is null, this should not happen.'); } - if ($subscription->position() >= $index) { - $this->logger?->debug( - sprintf( - 'Subscription Engine: Subscription "%s" is farther than the current position (%d > %d), continue booting.', - $subscription->id(), - $subscription->position(), - $index, - ), - ); + foreach ($subscriptions as $subscription) { + if (!$subscription->isBooting()) { + continue; + } - continue; - } + if ($subscription->position() >= $index) { + $this->logger?->debug( + sprintf( + 'Subscription Engine: Subscription "%s" is farther than the current position (%d > %d), continue booting.', + $subscription->id(), + $subscription->position(), + $index, + ), + ); - $error = $this->handleMessage($index, $message, $subscription); + continue; + } - if (!$error) { - continue; - } + $error = $this->handleMessage($index, $message, $subscription); - $errors[] = $error; - } + if (!$error) { + continue; + } - $messageCounter++; + $errors[] = $error; + } - $this->logger?->debug( - sprintf( - 'Subscription Engine: Current event stream position for booting: %s', - $index, - ), - ); + $messageCounter++; - if ($limit !== null && $messageCounter >= $limit) { - $this->logger?->info( + $this->logger?->debug( sprintf( - 'Subscription Engine: Message limit (%d) reached, finish booting.', - $limit, + 'Subscription Engine: Current event stream position for booting: %s', + $index, ), ); - return new ProcessedResult( - $messageCounter, - false, - $errors, - ); - } - } - } finally { - $stream?->close(); + if ($limit !== null && $messageCounter >= $limit) { + $this->logger?->info( + sprintf( + 'Subscription Engine: Message limit (%d) reached, finish booting.', + $limit, + ), + ); - if ($messageCounter > 0) { - foreach ($subscriptions as $subscription) { - if (!$subscription->isBooting()) { - continue; + return new ProcessedResult( + $messageCounter, + false, + $errors, + ); } + } + } finally { + $stream?->close(); - $this->subscriptionStore->update($subscription); + if ($messageCounter > 0) { + foreach ($subscriptions as $subscription) { + if (!$subscription->isBooting()) { + continue; + } + + $this->subscriptionStore->update($subscription); + } } } - } - $this->logger?->debug('Subscription Engine: End of stream for booting has been reached.'); + $this->logger?->debug('Subscription Engine: End of stream for booting has been reached.'); - foreach ($subscriptions as $subscription) { - if (!$subscription->isBooting()) { - continue; - } + foreach ($subscriptions as $subscription) { + if (!$subscription->isBooting()) { + continue; + } - if ($subscription->runMode() === RunMode::Once) { - $subscription->finished(); + if ($subscription->runMode() === RunMode::Once) { + $subscription->finished(); + $this->subscriptionStore->update($subscription); + + $this->logger?->info(sprintf( + 'Subscription Engine: Subscription "%s" run only once and has been set to finished.', + $subscription->id(), + )); + + continue; + } + + $subscription->active(); $this->subscriptionStore->update($subscription); $this->logger?->info(sprintf( - 'Subscription Engine: Subscription "%s" run only once and has been set to finished.', + 'Subscription Engine: Subscription "%s" has been set to active after booting.', $subscription->id(), )); - - continue; } - $subscription->active(); - $this->subscriptionStore->update($subscription); + $this->logger?->info('Subscription Engine: Finish booting.'); - $this->logger?->info(sprintf( - 'Subscription Engine: Subscription "%s" has been set to active after booting.', - $subscription->id(), - )); - } - - $this->logger?->info('Subscription Engine: Finish booting.'); - - return new ProcessedResult( - $messageCounter, - true, - $errors, - ); - }, - ); + return new ProcessedResult( + $messageCounter, + true, + $errors, + ); + }, + ); + } finally { + $this->processing = false; + } } public function run( SubscriptionEngineCriteria|null $criteria = null, int|null $limit = null, ): ProcessedResult { - $criteria ??= new SubscriptionEngineCriteria(); + if ($this->processing) { + throw new AlreadyProcessing(); + } - $this->logger?->info('Subscription Engine: Start processing.'); + $this->processing = true; - $this->discoverNewSubscriptions(); - $this->markDetachedSubscriptions($criteria); - $this->retrySubscriptions($criteria); + try { + $criteria ??= new SubscriptionEngineCriteria(); - return $this->findForUpdate( - new SubscriptionCriteria( - ids: $criteria->ids, - groups: $criteria->groups, - status: [Status::Active], - ), - function (array $subscriptions) use ($limit): ProcessedResult { - if (count($subscriptions) === 0) { - $this->logger?->info('Subscription Engine: No subscriptions to process, finish processing.'); + $this->logger?->info('Subscription Engine: Start processing.'); - return new ProcessedResult(0); - } + $this->discoverNewSubscriptions(); + $this->markDetachedSubscriptions($criteria); + $this->retrySubscriptions($criteria); - /** @var list $errors */ - $errors = []; + return $this->findForUpdate( + new SubscriptionCriteria( + ids: $criteria->ids, + groups: $criteria->groups, + status: [Status::Active], + ), + function (array $subscriptions) use ($limit): ProcessedResult { + if (count($subscriptions) === 0) { + $this->logger?->info('Subscription Engine: No subscriptions to process, finish processing.'); + + return new ProcessedResult(0); + } - $startIndex = $this->lowestSubscriptionPosition($subscriptions); + /** @var list $errors */ + $errors = []; - $this->logger?->debug( - sprintf( - 'Subscription Engine: Event stream is processed from position %d.', - $startIndex, - ), - ); + $startIndex = $this->lowestSubscriptionPosition($subscriptions); - $stream = null; - $messageCounter = 0; + $this->logger?->debug( + sprintf( + 'Subscription Engine: Event stream is processed from position %d.', + $startIndex, + ), + ); - try { - $criteria = new Criteria(new FromIndexCriterion($startIndex)); - $stream = $this->messageStore->load($criteria); + $stream = null; + $messageCounter = 0; - foreach ($stream as $message) { - $index = $stream->index(); + try { + $criteria = new Criteria(new FromIndexCriterion($startIndex)); + $stream = $this->messageStore->load($criteria); - if ($index === null) { - throw new UnexpectedError('Stream index is null, this should not happen.'); - } + foreach ($stream as $message) { + $index = $stream->index(); - foreach ($subscriptions as $subscription) { - if (!$subscription->isActive()) { - continue; + if ($index === null) { + throw new UnexpectedError('Stream index is null, this should not happen.'); } - if ($subscription->position() >= $index) { - $this->logger?->debug( - sprintf( - 'Subscription Engine: Subscription "%s" is farther than the current position (%d > %d), continue processing.', - $subscription->id(), - $subscription->position(), - $index, - ), - ); + foreach ($subscriptions as $subscription) { + if (!$subscription->isActive()) { + continue; + } - continue; - } + if ($subscription->position() >= $index) { + $this->logger?->debug( + sprintf( + 'Subscription Engine: Subscription "%s" is farther than the current position (%d > %d), continue processing.', + $subscription->id(), + $subscription->position(), + $index, + ), + ); - $error = $this->handleMessage($index, $message, $subscription); + continue; + } - if (!$error) { - continue; - } + $error = $this->handleMessage($index, $message, $subscription); - $errors[] = $error; - } + if (!$error) { + continue; + } - $messageCounter++; + $errors[] = $error; + } - $this->logger?->debug(sprintf( - 'Subscription Engine: Current event stream position: %s', - $index, - )); + $messageCounter++; - if ($limit !== null && $messageCounter >= $limit) { - $this->logger?->info( - sprintf( - 'Subscription Engine: Message limit (%d) reached, finish processing.', - $limit, - ), - ); + $this->logger?->debug(sprintf( + 'Subscription Engine: Current event stream position: %s', + $index, + )); - return new ProcessedResult($messageCounter, false, $errors); - } - } - } finally { - $endIndex = $stream?->index() ?: $startIndex; - $stream?->close(); - - if ($messageCounter > 0) { - foreach ($subscriptions as $subscription) { - if (!$subscription->isActive()) { - continue; + if ($limit !== null && $messageCounter >= $limit) { + $this->logger?->info( + sprintf( + 'Subscription Engine: Message limit (%d) reached, finish processing.', + $limit, + ), + ); + + return new ProcessedResult($messageCounter, false, $errors); } + } + } finally { + $endIndex = $stream?->index() ?: $startIndex; + $stream?->close(); - $this->subscriptionStore->update($subscription); + if ($messageCounter > 0) { + foreach ($subscriptions as $subscription) { + if (!$subscription->isActive()) { + continue; + } + + $this->subscriptionStore->update($subscription); + } } } - } - foreach ($subscriptions as $subscription) { - if (!$subscription->isActive()) { - continue; - } + foreach ($subscriptions as $subscription) { + if (!$subscription->isActive()) { + continue; + } - if ($subscription->runMode() !== RunMode::Once) { - continue; - } + if ($subscription->runMode() !== RunMode::Once) { + continue; + } - $subscription->finished(); - $this->subscriptionStore->update($subscription); + $subscription->finished(); + $this->subscriptionStore->update($subscription); - $this->logger?->info(sprintf( - 'Subscription Engine: Subscription "%s" run only once and has been set to finished.', - $subscription->id(), - )); - } + $this->logger?->info(sprintf( + 'Subscription Engine: Subscription "%s" run only once and has been set to finished.', + $subscription->id(), + )); + } - $this->logger?->info( - sprintf( - 'Subscription Engine: End of stream on position "%d" has been reached, finish processing.', - $endIndex, - ), - ); + $this->logger?->info( + sprintf( + 'Subscription Engine: End of stream on position "%d" has been reached, finish processing.', + $endIndex, + ), + ); - return new ProcessedResult($messageCounter, true, $errors); - }, - ); + return new ProcessedResult($messageCounter, true, $errors); + }, + ); + } finally { + $this->processing = false; + } } public function teardown(SubscriptionEngineCriteria|null $criteria = null): Result @@ -943,7 +965,7 @@ private function findForUpdate(SubscriptionCriteria $criteria, Closure $closure) } return $this->subscriptionStore->inLock( - /** @return T */ + /** @return T */ function () use ($closure, $criteria): mixed { $subscriptions = $this->subscriptionStore->find($criteria); diff --git a/src/Subscription/Engine/RunSubscriptionEngineEventBus.php b/src/Subscription/Engine/RunSubscriptionEngineEventBus.php new file mode 100644 index 000000000..872de070a --- /dev/null +++ b/src/Subscription/Engine/RunSubscriptionEngineEventBus.php @@ -0,0 +1,39 @@ +|null $ids + * @param list|null $groups + * @param positive-int|null $limit + */ + public function __construct( + private readonly SubscriptionEngine $engine, + private readonly array|null $ids = null, + private readonly array|null $groups = null, + private readonly int|null $limit = null, + ) { + } + + public function dispatch(Message ...$messages): void + { + try { + $this->engine->run( + new SubscriptionEngineCriteria( + $this->ids, + $this->groups, + ), + $this->limit, + ); + } catch (AlreadyProcessing) { + // do nothing + } + } +} diff --git a/src/Subscription/Engine/SubscriptionEngine.php b/src/Subscription/Engine/SubscriptionEngine.php index bdba70f12..3e18be635 100644 --- a/src/Subscription/Engine/SubscriptionEngine.php +++ b/src/Subscription/Engine/SubscriptionEngine.php @@ -14,6 +14,7 @@ public function setup(SubscriptionEngineCriteria|null $criteria = null, bool $sk * @param positive-int|null $limit * * @throws SubscriberNotFound + * @throws AlreadyProcessing */ public function boot( SubscriptionEngineCriteria|null $criteria = null, @@ -24,6 +25,7 @@ public function boot( * @param positive-int|null $limit * * @throws SubscriberNotFound + * @throws AlreadyProcessing */ public function run( SubscriptionEngineCriteria|null $criteria = null, diff --git a/tests/Integration/BasicImplementation/BasicIntegrationTest.php b/tests/Integration/BasicImplementation/BasicIntegrationTest.php index b7dc6d68a..f76bb3cb4 100644 --- a/tests/Integration/BasicImplementation/BasicIntegrationTest.php +++ b/tests/Integration/BasicImplementation/BasicIntegrationTest.php @@ -5,7 +5,6 @@ namespace Patchlevel\EventSourcing\Tests\Integration\BasicImplementation; use Doctrine\DBAL\Connection; -use Patchlevel\EventSourcing\EventBus\DefaultEventBus; use Patchlevel\EventSourcing\Message\Serializer\DefaultHeadersSerializer; use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry; use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager; @@ -15,11 +14,12 @@ use Patchlevel\EventSourcing\Snapshot\DefaultSnapshotStore; use Patchlevel\EventSourcing\Store\DoctrineDbalStore; use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; +use Patchlevel\EventSourcing\Subscription\Engine\RunSubscriptionEngineEventBus; use Patchlevel\EventSourcing\Subscription\Store\InMemorySubscriptionStore; use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository; use Patchlevel\EventSourcing\Tests\DbalManager; -use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Listener\SendEmailListener; use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\MessageDecorator\FooMessageDecorator; +use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Processor\SendEmailProcessor; use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Projection\ProfileProjector; use PHPUnit\Framework\TestCase; @@ -54,12 +54,13 @@ public function testSuccessful(): void $engine = new DefaultSubscriptionEngine( $store, new InMemorySubscriptionStore(), - new MetadataSubscriberAccessorRepository([$profileProjector]), + new MetadataSubscriberAccessorRepository([ + $profileProjector, + new SendEmailProcessor(), + ]), ); - $eventBus = DefaultEventBus::create([ - new SendEmailListener(), - ]); + $eventBus = new RunSubscriptionEngineEventBus($engine); $manager = new DefaultRepositoryManager( new AggregateRootRegistry(['profile' => Profile::class]), @@ -76,15 +77,12 @@ public function testSuccessful(): void ); $schemaDirector->create(); - $engine->setup(); - $engine->boot(); + $engine->setup(skipBooting: true); $profileId = ProfileId::generate(); $profile = Profile::create($profileId, 'John'); $repository->save($profile); - $engine->run(); - $result = $this->connection->fetchAssociative( 'SELECT * FROM projection_profile WHERE id = ?', [$profileId->toString()], @@ -125,12 +123,13 @@ public function testSnapshot(): void $engine = new DefaultSubscriptionEngine( $store, new InMemorySubscriptionStore(), - new MetadataSubscriberAccessorRepository([$profileProjection]), + new MetadataSubscriberAccessorRepository([ + $profileProjection, + new SendEmailProcessor(), + ]), ); - $eventBus = DefaultEventBus::create([ - new SendEmailListener(), - ]); + $eventBus = new RunSubscriptionEngineEventBus($engine); $manager = new DefaultRepositoryManager( new AggregateRootRegistry(['profile' => Profile::class]), @@ -147,15 +146,12 @@ public function testSnapshot(): void ); $schemaDirector->create(); - $engine->setup(); - $engine->boot(); + $engine->setup(skipBooting: true); $profileId = ProfileId::generate(); $profile = Profile::create($profileId, 'John'); $repository->save($profile); - $engine->run(); - $result = $this->connection->fetchAssociative( 'SELECT * FROM projection_profile WHERE id = ?', [$profileId->toString()], diff --git a/tests/Integration/BasicImplementation/Listener/SendEmailListener.php b/tests/Integration/BasicImplementation/Processor/SendEmailProcessor.php similarity index 80% rename from tests/Integration/BasicImplementation/Listener/SendEmailListener.php rename to tests/Integration/BasicImplementation/Processor/SendEmailProcessor.php index 2e10d2b9b..855276e3b 100644 --- a/tests/Integration/BasicImplementation/Listener/SendEmailListener.php +++ b/tests/Integration/BasicImplementation/Processor/SendEmailProcessor.php @@ -2,14 +2,16 @@ declare(strict_types=1); -namespace Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Listener; +namespace Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Processor; +use Patchlevel\EventSourcing\Attribute\Processor; use Patchlevel\EventSourcing\Attribute\Subscribe; use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Events\ProfileCreated; use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\SendEmailMock; -final class SendEmailListener +#[Processor('send_email')] +final class SendEmailProcessor { #[Subscribe(ProfileCreated::class)] public function onProfileCreated(Message $message): void diff --git a/tests/Unit/EventBus/ChainEventBusTest.php b/tests/Unit/EventBus/ChainEventBusTest.php new file mode 100644 index 000000000..78d58529c --- /dev/null +++ b/tests/Unit/EventBus/ChainEventBusTest.php @@ -0,0 +1,43 @@ +prophesize(EventBus::class); + $eventBus2 = $this->prophesize(EventBus::class); + + $eventBus1->dispatch($message)->shouldBeCalled(); + $eventBus2->dispatch($message)->shouldBeCalled(); + + $chainEventBus = new ChainEventBus( + $eventBus1->reveal(), + $eventBus2->reveal(), + ); + + $chainEventBus->dispatch($message); + } +} diff --git a/tests/Unit/Subscription/Engine/RunSubscriptionEngineEventBusTest.php b/tests/Unit/Subscription/Engine/RunSubscriptionEngineEventBusTest.php new file mode 100644 index 000000000..dae67acb8 --- /dev/null +++ b/tests/Unit/Subscription/Engine/RunSubscriptionEngineEventBusTest.php @@ -0,0 +1,39 @@ +prophesize(SubscriptionEngine::class); + + $criteria = new SubscriptionEngineCriteria( + ['id1', 'id2'], + ['group1', 'group2'], + ); + + $engine->run($criteria, 42)->willReturn(new ProcessedResult(21))->shouldBeCalledOnce(); + + $eventBus = new RunSubscriptionEngineEventBus( + $engine->reveal(), + ['id1', 'id2'], + ['group1', 'group2'], + 42, + ); + + $eventBus->dispatch(); + } +}