diff --git a/docs/pages/cli.md b/docs/pages/cli.md index a2516ff83..a1ac7320d 100644 --- a/docs/pages/cli.md +++ b/docs/pages/cli.md @@ -80,7 +80,7 @@ $schemaDirector = new DoctrineSchemaDirector($connection, $store); $cli->addCommands([ new Command\DatabaseCreateCommand($connection, $doctrineHelper), new Command\DatabaseDropCommand($connection, $doctrineHelper), - new Command\SubscriptionBootCommand($subscriptionEngine, $store), + new Command\SubscriptionBootCommand($subscriptionEngine), new Command\SubscriptionPauseCommand($subscriptionEngine), new Command\SubscriptionRunCommand($subscriptionEngine, $store), new Command\SubscriptionTeardownCommand($subscriptionEngine), diff --git a/src/Console/Command/SubscriptionBootCommand.php b/src/Console/Command/SubscriptionBootCommand.php index c0e42b106..951832b5b 100644 --- a/src/Console/Command/SubscriptionBootCommand.php +++ b/src/Console/Command/SubscriptionBootCommand.php @@ -6,10 +6,6 @@ use Closure; use Patchlevel\EventSourcing\Console\InputHelper; -use Patchlevel\EventSourcing\Store\Store; -use Patchlevel\EventSourcing\Store\SubscriptionStore; -use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine; -use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria; use Patchlevel\Worker\DefaultWorker; use Symfony\Component\Console\Attribute\AsCommand; use Symfony\Component\Console\Input\InputInterface; @@ -23,13 +19,6 @@ )] final class SubscriptionBootCommand extends SubscriptionCommand { - public function __construct( - SubscriptionEngine $engine, - private readonly Store $store, - ) { - parent::__construct($engine); - } - public function configure(): void { parent::configure(); @@ -86,34 +75,23 @@ protected function execute(InputInterface $input, OutputInterface $output): int $criteria = $this->subscriptionEngineCriteria($input); $criteria = $this->resolveCriteriaIntoCriteriaWithOnlyIds($criteria); - if ($this->store instanceof SubscriptionStore) { - $this->store->setupSubscription(); - } - if ($setup) { $this->engine->setup($criteria); } $logger = new ConsoleLogger($output); - $finished = false; $worker = DefaultWorker::create( - function (Closure $stop) use ($criteria, $messageLimit, &$finished, $sleep): void { - $this->engine->boot($criteria, $messageLimit); - - if ($this->isBootingFinished($criteria)) { - $finished = true; - $stop(); + function (Closure $stop) use ($criteria, $messageLimit, &$finished): void { + $result = $this->engine->boot($criteria, $messageLimit); + if (!$result->streamFinished) { return; } - if (!$this->store instanceof SubscriptionStore) { - return; - } - - $this->store->wait($sleep); + $finished = true; + $stop(); }, [ 'runLimit' => $runLimit, @@ -123,22 +101,8 @@ function (Closure $stop) use ($criteria, $messageLimit, &$finished, $sleep): voi $logger, ); - $supportSubscription = $this->store instanceof SubscriptionStore && $this->store->supportSubscription(); - $worker->run($supportSubscription ? 0 : $sleep); + $worker->run($sleep); return $finished ? 0 : 1; } - - private function isBootingFinished(SubscriptionEngineCriteria $criteria): bool - { - $subscriptions = $this->engine->subscriptions($criteria); - - foreach ($subscriptions as $subscription) { - if ($subscription->isBooting()) { - return false; - } - } - - return true; - } } diff --git a/src/Subscription/Engine/DefaultSubscriptionEngine.php b/src/Subscription/Engine/DefaultSubscriptionEngine.php index c1fbddd9b..662123d82 100644 --- a/src/Subscription/Engine/DefaultSubscriptionEngine.php +++ b/src/Subscription/Engine/DefaultSubscriptionEngine.php @@ -36,7 +36,7 @@ public function __construct( ) { } - public function setup(SubscriptionEngineCriteria|null $criteria = null, bool $skipBooting = false): void + public function setup(SubscriptionEngineCriteria|null $criteria = null, bool $skipBooting = false): Result { $criteria ??= new SubscriptionEngineCriteria(); @@ -47,19 +47,22 @@ public function setup(SubscriptionEngineCriteria|null $criteria = null, bool $sk $this->discoverNewSubscriptions(); $this->retrySubscriptions($criteria); - $this->findForUpdate( + return $this->findForUpdate( new SubscriptionCriteria( ids: $criteria->ids, groups: $criteria->groups, status: [Status::New], ), - function (array $subscriptions) use ($skipBooting): void { + function (array $subscriptions) use ($skipBooting): Result { if (count($subscriptions) === 0) { $this->logger?->info('Subscription Engine: No subscriptions to setup, finish setup.'); - return; + return new Result(); } + /** @var list $errors */ + $errors = []; + $latestIndex = $this->latestIndex(); foreach ($subscriptions as $subscription) { @@ -118,8 +121,16 @@ function (array $subscriptions) use ($skipBooting): void { )); $this->handleError($subscription, $e); + + $errors[] = new Error( + $subscription->id(), + $e->getMessage(), + $e, + ); } } + + return new Result($errors); }, ); } @@ -127,7 +138,7 @@ function (array $subscriptions) use ($skipBooting): void { public function boot( SubscriptionEngineCriteria|null $criteria = null, int|null $limit = null, - ): void { + ): ProcessedResult { $criteria ??= new SubscriptionEngineCriteria(); $this->logger?->info( @@ -137,19 +148,22 @@ public function boot( $this->discoverNewSubscriptions(); $this->retrySubscriptions($criteria); - $this->findForUpdate( + return $this->findForUpdate( new SubscriptionCriteria( ids: $criteria->ids, groups: $criteria->groups, status: [Status::Booting], ), - function ($subscriptions) use ($limit): void { + function ($subscriptions) use ($limit): ProcessedResult { if (count($subscriptions) === 0) { $this->logger?->info('Subscription Engine: No subscriptions in booting status, finish booting.'); - return; + return new ProcessedResult(0); } + /** @var list $errors */ + $errors = []; + $startIndex = $this->lowestSubscriptionPosition($subscriptions); $this->logger?->debug( @@ -192,7 +206,13 @@ function ($subscriptions) use ($limit): void { continue; } - $this->handleMessage($index, $message, $subscription); + $error = $this->handleMessage($index, $message, $subscription); + + if (!$error) { + continue; + } + + $errors[] = $error; } $messageCounter++; @@ -212,7 +232,11 @@ function ($subscriptions) use ($limit): void { ), ); - return; + return new ProcessedResult( + $messageCounter, + false, + $errors, + ); } } } finally { @@ -258,6 +282,12 @@ function ($subscriptions) use ($limit): void { } $this->logger?->info('Subscription Engine: Finish booting.'); + + return new ProcessedResult( + $messageCounter, + true, + $errors, + ); }, ); } @@ -265,7 +295,7 @@ function ($subscriptions) use ($limit): void { public function run( SubscriptionEngineCriteria|null $criteria = null, int|null $limit = null, - ): void { + ): ProcessedResult { $criteria ??= new SubscriptionEngineCriteria(); $this->logger?->info('Subscription Engine: Start processing.'); @@ -274,19 +304,22 @@ public function run( $this->markDetachedSubscriptions($criteria); $this->retrySubscriptions($criteria); - $this->findForUpdate( + return $this->findForUpdate( new SubscriptionCriteria( ids: $criteria->ids, groups: $criteria->groups, status: [Status::Active], ), - function (array $subscriptions) use ($limit): void { + function (array $subscriptions) use ($limit): ProcessedResult { if (count($subscriptions) === 0) { $this->logger?->info('Subscription Engine: No subscriptions to process, finish processing.'); - return; + return new ProcessedResult(0); } + /** @var list $errors */ + $errors = []; + $startIndex = $this->lowestSubscriptionPosition($subscriptions); $this->logger?->debug( @@ -328,12 +361,21 @@ function (array $subscriptions) use ($limit): void { continue; } - $this->handleMessage($index, $message, $subscription); + $error = $this->handleMessage($index, $message, $subscription); + + if (!$error) { + continue; + } + + $errors[] = $error; } $messageCounter++; - $this->logger?->debug(sprintf('Subscription Engine: Current event stream position: %s', $index)); + $this->logger?->debug(sprintf( + 'Subscription Engine: Current event stream position: %s', + $index, + )); if ($limit !== null && $messageCounter >= $limit) { $this->logger?->info( @@ -343,7 +385,7 @@ function (array $subscriptions) use ($limit): void { ), ); - return; + return new ProcessedResult($messageCounter, false, $errors); } } } finally { @@ -385,11 +427,13 @@ function (array $subscriptions) use ($limit): void { $endIndex, ), ); + + return new ProcessedResult($messageCounter, true, $errors); }, ); } - public function teardown(SubscriptionEngineCriteria|null $criteria = null): void + public function teardown(SubscriptionEngineCriteria|null $criteria = null): Result { $criteria ??= new SubscriptionEngineCriteria(); @@ -397,13 +441,16 @@ public function teardown(SubscriptionEngineCriteria|null $criteria = null): void $this->logger?->info('Subscription Engine: Start teardown detached subscriptions.'); - $this->findForUpdate( + return $this->findForUpdate( new SubscriptionCriteria( ids: $criteria->ids, groups: $criteria->groups, status: [Status::Detached], ), - function (array $subscriptions): void { + function (array $subscriptions): Result { + /** @var list $errors */ + $errors = []; + foreach ($subscriptions as $subscription) { $subscriber = $this->subscriber($subscription->id()); @@ -451,6 +498,13 @@ function (array $subscriptions): void { $e->getMessage(), ), ); + + $errors[] = new Error( + $subscription->id(), + $e->getMessage(), + $e, + ); + continue; } @@ -465,22 +519,27 @@ function (array $subscriptions): void { } $this->logger?->info('Subscription Engine: Finish teardown.'); + + return new Result($errors); }, ); } - public function remove(SubscriptionEngineCriteria|null $criteria = null): void + public function remove(SubscriptionEngineCriteria|null $criteria = null): Result { $criteria ??= new SubscriptionEngineCriteria(); $this->discoverNewSubscriptions(); - $this->findForUpdate( + return $this->findForUpdate( new SubscriptionCriteria( ids: $criteria->ids, groups: $criteria->groups, ), - function (array $subscriptions): void { + function (array $subscriptions): Result { + /** @var list $errors */ + $errors = []; + foreach ($subscriptions as $subscription) { $subscriber = $this->subscriber($subscription->id()); @@ -519,6 +578,12 @@ function (array $subscriptions): void { $e->getMessage(), ), ); + + $errors[] = new Error( + $subscription->id(), + $e->getMessage(), + $e, + ); } $this->subscriptionStore->remove($subscription); @@ -527,17 +592,19 @@ function (array $subscriptions): void { sprintf('Subscription Engine: Subscription "%s" removed.', $subscription->id()), ); } + + return new Result($errors); }, ); } - public function reactivate(SubscriptionEngineCriteria|null $criteria = null): void + public function reactivate(SubscriptionEngineCriteria|null $criteria = null): Result { $criteria ??= new SubscriptionEngineCriteria(); $this->discoverNewSubscriptions(); - $this->findForUpdate( + return $this->findForUpdate( new SubscriptionCriteria( ids: $criteria->ids, groups: $criteria->groups, @@ -548,14 +615,16 @@ public function reactivate(SubscriptionEngineCriteria|null $criteria = null): vo Status::Finished, ], ), - function (array $subscriptions): void { - /** @var Subscription $subscription */ + function (array $subscriptions): Result { foreach ($subscriptions as $subscription) { $subscriber = $this->subscriber($subscription->id()); if (!$subscriber) { $this->logger?->debug( - sprintf('Subscription Engine: Subscriber for "%s" not found, skipped.', $subscription->id()), + sprintf( + 'Subscription Engine: Subscriber for "%s" not found, skipped.', + $subscription->id(), + ), ); continue; @@ -587,17 +656,19 @@ function (array $subscriptions): void { $subscription->id(), )); } + + return new Result(); }, ); } - public function pause(SubscriptionEngineCriteria|null $criteria = null): void + public function pause(SubscriptionEngineCriteria|null $criteria = null): Result { $criteria ??= new SubscriptionEngineCriteria(); $this->discoverNewSubscriptions(); - $this->findForUpdate( + return $this->findForUpdate( new SubscriptionCriteria( ids: $criteria->ids, groups: $criteria->groups, @@ -607,14 +678,17 @@ public function pause(SubscriptionEngineCriteria|null $criteria = null): void Status::Error, ], ), - function (array $subscriptions): void { + function (array $subscriptions): Result { /** @var Subscription $subscription */ foreach ($subscriptions as $subscription) { $subscriber = $this->subscriber($subscription->id()); if (!$subscriber) { $this->logger?->debug( - sprintf('Subscription Engine: Subscriber for "%s" not found, skipped.', $subscription->id()), + sprintf( + 'Subscription Engine: Subscriber for "%s" not found, skipped.', + $subscription->id(), + ), ); continue; @@ -629,6 +703,8 @@ function (array $subscriptions): void { $subscription->id(), )); } + + return new Result(); }, ); } @@ -648,7 +724,7 @@ public function subscriptions(SubscriptionEngineCriteria|null $criteria = null): ); } - private function handleMessage(int $index, Message $message, Subscription $subscription): void + private function handleMessage(int $index, Message $message, Subscription $subscription): Error|null { $subscriber = $this->subscriber($subscription->id()); @@ -670,7 +746,7 @@ private function handleMessage(int $index, Message $message, Subscription $subsc ), ); - return; + return null; } try { @@ -690,7 +766,11 @@ private function handleMessage(int $index, Message $message, Subscription $subsc $this->handleError($subscription, $e); - return; + return new Error( + $subscription->id(), + $e->getMessage(), + $e, + ); } $subscription->changePosition($index); @@ -704,6 +784,8 @@ private function handleMessage(int $index, Message $message, Subscription $subsc $message->event()::class, ), ); + + return null; } private function subscriber(string $subscriberId): SubscriberAccessor|null @@ -846,20 +928,27 @@ private function lowestSubscriptionPosition(array $subscriptions): int return $min; } - /** @param Closure(list):void $closure */ - private function findForUpdate(SubscriptionCriteria $criteria, Closure $closure): void + /** + * @param Closure(list):T $closure + * + * @return T + * + * @template T + */ + private function findForUpdate(SubscriptionCriteria $criteria, Closure $closure): mixed { if (!$this->subscriptionStore instanceof LockableSubscriptionStore) { - $closure($this->subscriptionStore->find($criteria)); - - return; + return $closure($this->subscriptionStore->find($criteria)); } - $this->subscriptionStore->inLock(function () use ($closure, $criteria): void { - $subscriptions = $this->subscriptionStore->find($criteria); + return $this->subscriptionStore->inLock( + /** @return T */ + function () use ($closure, $criteria): mixed { + $subscriptions = $this->subscriptionStore->find($criteria); - $closure($subscriptions); - }); + return $closure($subscriptions); + }, + ); } private function handleError(Subscription $subscription, Throwable $throwable): void diff --git a/src/Subscription/Engine/Error.php b/src/Subscription/Engine/Error.php new file mode 100644 index 000000000..c28021a6a --- /dev/null +++ b/src/Subscription/Engine/Error.php @@ -0,0 +1,17 @@ + $errors */ + public function __construct( + public readonly int $processedMessages, + public readonly bool $streamFinished = false, + public readonly array $errors = [], + ) { + } +} diff --git a/src/Subscription/Engine/Result.php b/src/Subscription/Engine/Result.php new file mode 100644 index 000000000..d644bb17d --- /dev/null +++ b/src/Subscription/Engine/Result.php @@ -0,0 +1,14 @@ + $errors */ + public function __construct( + public readonly array $errors = [], + ) { + } +} diff --git a/src/Subscription/Engine/SubscriptionEngine.php b/src/Subscription/Engine/SubscriptionEngine.php index 0e4229eaa..bdba70f12 100644 --- a/src/Subscription/Engine/SubscriptionEngine.php +++ b/src/Subscription/Engine/SubscriptionEngine.php @@ -8,7 +8,7 @@ interface SubscriptionEngine { - public function setup(SubscriptionEngineCriteria|null $criteria = null, bool $skipBooting = false): void; + public function setup(SubscriptionEngineCriteria|null $criteria = null, bool $skipBooting = false): Result; /** * @param positive-int|null $limit @@ -18,7 +18,7 @@ public function setup(SubscriptionEngineCriteria|null $criteria = null, bool $sk public function boot( SubscriptionEngineCriteria|null $criteria = null, int|null $limit = null, - ): void; + ): ProcessedResult; /** * @param positive-int|null $limit @@ -28,15 +28,15 @@ public function boot( public function run( SubscriptionEngineCriteria|null $criteria = null, int|null $limit = null, - ): void; + ): ProcessedResult; - public function teardown(SubscriptionEngineCriteria|null $criteria = null): void; + public function teardown(SubscriptionEngineCriteria|null $criteria = null): Result; - public function remove(SubscriptionEngineCriteria|null $criteria = null): void; + public function remove(SubscriptionEngineCriteria|null $criteria = null): Result; - public function reactivate(SubscriptionEngineCriteria|null $criteria = null): void; + public function reactivate(SubscriptionEngineCriteria|null $criteria = null): Result; - public function pause(SubscriptionEngineCriteria|null $criteria = null): void; + public function pause(SubscriptionEngineCriteria|null $criteria = null): Result; /** @return list */ public function subscriptions(SubscriptionEngineCriteria|null $criteria = null): array; diff --git a/src/Subscription/Store/DoctrineSubscriptionStore.php b/src/Subscription/Store/DoctrineSubscriptionStore.php index 63a6d5c44..661d833d9 100644 --- a/src/Subscription/Store/DoctrineSubscriptionStore.php +++ b/src/Subscription/Store/DoctrineSubscriptionStore.php @@ -182,12 +182,21 @@ public function remove(Subscription $subscription): void $this->connection->delete($this->tableName, ['id' => $subscription->id()]); } - public function inLock(Closure $closure): void + /** + * @param Closure():T $closure + * + * @return T + * + * @throws TransactionCommitNotPossible + * + * @template T + */ + public function inLock(Closure $closure): mixed { $this->connection->beginTransaction(); try { - $closure(); + return $closure(); } finally { try { $this->connection->commit(); diff --git a/src/Subscription/Store/LockableSubscriptionStore.php b/src/Subscription/Store/LockableSubscriptionStore.php index 48d62366e..2163aa6a7 100644 --- a/src/Subscription/Store/LockableSubscriptionStore.php +++ b/src/Subscription/Store/LockableSubscriptionStore.php @@ -8,5 +8,14 @@ interface LockableSubscriptionStore extends SubscriptionStore { - public function inLock(Closure $closure): void; + /** + * @param Closure():T $closure + * + * @return T + * + * @throws TransactionCommitNotPossible + * + * @template T + */ + public function inLock(Closure $closure): mixed; } diff --git a/tests/Integration/Subscription/SubscriptionTest.php b/tests/Integration/Subscription/SubscriptionTest.php index 603dd9bce..290bb8761 100644 --- a/tests/Integration/Subscription/SubscriptionTest.php +++ b/tests/Integration/Subscription/SubscriptionTest.php @@ -105,8 +105,14 @@ public function testHappyPath(): void $engine->subscriptions(), ); - $engine->setup(); - $engine->boot(); + $result = $engine->setup(); + + self::assertEquals([], $result->errors); + + $result = $engine->boot(); + + self::assertEquals(0, $result->processedMessages); + self::assertEquals([], $result->errors); self::assertEquals( [ @@ -124,7 +130,10 @@ public function testHappyPath(): void $profile = Profile::create(ProfileId::fromString('1'), 'John'); $repository->save($profile); - $engine->run(); + $result = $engine->run(); + + self::assertEquals(1, $result->processedMessages); + self::assertEquals([], $result->errors); self::assertEquals( [ @@ -150,7 +159,8 @@ public function testHappyPath(): void self::assertSame('1', $result['id']); self::assertSame('John', $result['name']); - $engine->remove(); + $result = $engine->remove(); + self::assertEquals([], $result->errors); self::assertEquals( [ @@ -213,8 +223,12 @@ public function testErrorHandling(): void ), ); - $engine->setup(); - $engine->boot(); + $result = $engine->setup(); + self::assertEquals([], $result->errors); + + $result = $engine->boot(); + self::assertEquals(0, $result->processedMessages); + self::assertEquals([], $result->errors); $subscription = self::findSubscription($engine->subscriptions(), 'error_producer'); @@ -228,7 +242,16 @@ public function testErrorHandling(): void $repository->save($profile); $subscriber->subscribeError = true; - $engine->run(); + + $result = $engine->run(); + + self::assertEquals(1, $result->processedMessages); + self::assertCount(1, $result->errors); + + $error = $result->errors[0]; + + self::assertEquals('error_producer', $error->subscriptionId); + self::assertEquals('subscribe error', $error->message); $subscription = self::findSubscription($engine->subscriptions(), 'error_producer'); @@ -237,7 +260,10 @@ public function testErrorHandling(): void self::assertEquals(Status::Active, $subscription->subscriptionError()?->previousStatus); self::assertEquals(0, $subscription->retryAttempt()); - $engine->run(); + $result = $engine->run(); + + self::assertEquals(0, $result->processedMessages); + self::assertEquals([], $result->errors); $subscription = self::findSubscription($engine->subscriptions(), 'error_producer'); @@ -248,7 +274,15 @@ public function testErrorHandling(): void $clock->sleep(5); - $engine->run(); + $result = $engine->run(); + + self::assertEquals(1, $result->processedMessages); + self::assertCount(1, $result->errors); + + $error = $result->errors[0]; + + self::assertEquals('error_producer', $error->subscriptionId); + self::assertEquals('subscribe error', $error->message); $subscription = self::findSubscription($engine->subscriptions(), 'error_producer'); @@ -259,7 +293,15 @@ public function testErrorHandling(): void $clock->sleep(10); - $engine->run(); + $result = $engine->run(); + + self::assertEquals(1, $result->processedMessages); + self::assertCount(1, $result->errors); + + $error = $result->errors[0]; + + self::assertEquals('error_producer', $error->subscriptionId); + self::assertEquals('subscribe error', $error->message); $subscription = self::findSubscription($engine->subscriptions(), 'error_producer'); @@ -278,7 +320,15 @@ public function testErrorHandling(): void self::assertEquals(null, $subscription->subscriptionError()); self::assertEquals(0, $subscription->retryAttempt()); - $engine->run(); + $result = $engine->run(); + + self::assertEquals(1, $result->processedMessages); + self::assertCount(1, $result->errors); + + $error = $result->errors[0]; + + self::assertEquals('error_producer', $error->subscriptionId); + self::assertEquals('subscribe error', $error->message); $subscription = self::findSubscription($engine->subscriptions(), 'error_producer'); @@ -290,7 +340,10 @@ public function testErrorHandling(): void $clock->sleep(5); $subscriber->subscribeError = false; - $engine->run(); + $result = $engine->run(); + + self::assertEquals(1, $result->processedMessages); + self::assertEquals([], $result->errors); $subscription = self::findSubscription($engine->subscriptions(), 'error_producer'); diff --git a/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php b/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php index e6ce5068c..076affab4 100644 --- a/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php +++ b/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php @@ -55,10 +55,11 @@ public function testNothingToSetup(): void logger: new NullLogger(), ); - $engine->setup(); + $result = $engine->setup(); self::assertEquals([], $store->addedSubscriptions); self::assertEquals([], $store->updatedSubscriptions); + self::assertEquals([], $result->errors); } public function testSetupWithoutCreateMethod(): void @@ -82,7 +83,9 @@ class { logger: new NullLogger(), ); - $engine->setup(); + $result = $engine->setup(); + + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -131,7 +134,9 @@ public function create(): void logger: new NullLogger(), ); - $engine->setup(); + $result = $engine->setup(); + + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -187,7 +192,15 @@ public function create(): void logger: new NullLogger(), ); - $engine->setup(); + $result = $engine->setup(); + + self::assertCount(1, $result->errors); + + $error = $result->errors[0]; + + self::assertEquals($subscriptionId, $error->subscriptionId); + self::assertEquals('ERROR', $error->message); + self::assertInstanceOf(RuntimeException::class, $error->throwable); self::assertEquals( [ @@ -236,7 +249,9 @@ class { logger: new NullLogger(), ); - $engine->setup(null, true); + $result = $engine->setup(null, true); + + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -276,7 +291,9 @@ class { logger: new NullLogger(), ); - $engine->setup(); + $result = $engine->setup(); + + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -315,7 +332,9 @@ class { logger: new NullLogger(), ); - $engine->setup(); + $result = $engine->setup(); + + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -342,7 +361,11 @@ public function testNothingToBoot(): void logger: new NullLogger(), ); - $engine->boot(); + $result = $engine->boot(); + + self::assertEquals(0, $result->processedMessages); + self::assertEquals(false, $result->streamFinished); + self::assertEquals([], $result->errors); self::assertEquals([], $store->addedSubscriptions); self::assertEquals([], $store->updatedSubscriptions); @@ -367,7 +390,11 @@ class { logger: new NullLogger(), ); - $engine->boot(); + $result = $engine->boot(); + + self::assertEquals(0, $result->processedMessages); + self::assertEquals(false, $result->streamFinished); + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -416,7 +443,11 @@ public function handle(Message $message): void logger: new NullLogger(), ); - $engine->boot(); + $result = $engine->boot(); + + self::assertEquals(1, $result->processedMessages); + self::assertEquals(true, $result->streamFinished); + self::assertEquals([], $result->errors); self::assertEquals([], $subscriptionStore->addedSubscriptions); @@ -440,6 +471,75 @@ public function handle(Message $message): void self::assertSame($message, $subscriber->message); } + public function testBootWithError(): void + { + $subscriptionId = 'test'; + $subscriber = new #[Subscriber('test', RunMode::FromBeginning)] + class { + public function __construct( + public readonly RuntimeException $exception = new RuntimeException('ERROR'), + ) { + } + + #[Subscribe(ProfileVisited::class)] + public function handle(Message $message): void + { + throw $this->exception; + } + }; + + $subscriptionStore = new DummySubscriptionStore([ + new Subscription( + $subscriptionId, + Subscription::DEFAULT_GROUP, + RunMode::FromBeginning, + Status::Booting, + ), + ]); + + $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); + + $streamableStore = $this->prophesize(Store::class); + $streamableStore->load($this->criteria())->willReturn(new ArrayStream([$message]))->shouldBeCalledOnce(); + + $engine = new DefaultSubscriptionEngine( + $streamableStore->reveal(), + $subscriptionStore, + new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), + ); + + $result = $engine->boot(); + + self::assertEquals(1, $result->processedMessages); + self::assertEquals(true, $result->streamFinished); + self::assertCount(1, $result->errors); + + $error = $result->errors[0]; + + self::assertEquals($subscriptionId, $error->subscriptionId); + self::assertEquals('ERROR', $error->message); + self::assertInstanceOf(RuntimeException::class, $error->throwable); + + self::assertEquals( + [ + new Subscription( + $subscriptionId, + Subscription::DEFAULT_GROUP, + RunMode::FromBeginning, + Status::Error, + 0, + new SubscriptionError( + 'ERROR', + Status::Booting, + ThrowableToErrorContextTransformer::transform($subscriber->exception), + ), + ), + ], + $subscriptionStore->updatedSubscriptions, + ); + } + public function testBootWithLimit(): void { $subscriptionId = 'test'; @@ -475,7 +575,11 @@ public function handle(Message $message): void logger: new NullLogger(), ); - $engine->boot(new SubscriptionEngineCriteria(), 1); + $result = $engine->boot(new SubscriptionEngineCriteria(), 1); + + self::assertEquals(1, $result->processedMessages); + self::assertEquals(false, $result->streamFinished); + self::assertEquals([], $result->errors); self::assertEquals([], $subscriptionStore->addedSubscriptions); @@ -546,7 +650,11 @@ public function handle(Message $message): void logger: new NullLogger(), ); - $engine->boot(); + $result = $engine->boot(); + + self::assertEquals(1, $result->processedMessages); + self::assertEquals(true, $result->streamFinished); + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -620,7 +728,11 @@ public function handle(Message $message): void logger: new NullLogger(), ); - $engine->boot(); + $result = $engine->boot(); + + self::assertEquals(2, $result->processedMessages); + self::assertEquals(true, $result->streamFinished); + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -677,7 +789,11 @@ public function handle(Message $message): void logger: new NullLogger(), ); - $engine->boot(); + $result = $engine->boot(); + + self::assertEquals(1, $result->processedMessages); + self::assertEquals(true, $result->streamFinished); + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -716,7 +832,11 @@ class { logger: new NullLogger(), ); - $engine->run(); + $result = $engine->run(); + + self::assertEquals(0, $result->processedMessages); + self::assertEquals(false, $result->streamFinished); + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -763,7 +883,11 @@ public function handle(Message $message): void logger: new NullLogger(), ); - $engine->run(); + $result = $engine->run(); + + self::assertEquals(1, $result->processedMessages); + self::assertEquals(true, $result->streamFinished); + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -817,7 +941,11 @@ public function handle(Message $message): void logger: new NullLogger(), ); - $engine->run(new SubscriptionEngineCriteria(), 1); + $result = $engine->run(new SubscriptionEngineCriteria(), 1); + + self::assertEquals(1, $result->processedMessages); + self::assertEquals(false, $result->streamFinished); + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -886,7 +1014,11 @@ public function handle(Message $message): void logger: new NullLogger(), ); - $engine->run(); + $result = $engine->run(); + + self::assertEquals(1, $result->processedMessages); + self::assertEquals(true, $result->streamFinished); + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -947,7 +1079,17 @@ public function handle(Message $message): void logger: new NullLogger(), ); - $engine->run(); + $result = $engine->run(); + + self::assertEquals(1, $result->processedMessages); + self::assertEquals(true, $result->streamFinished); + self::assertCount(1, $result->errors); + + $error = $result->errors[0]; + + self::assertEquals($subscriptionId, $error->subscriptionId); + self::assertEquals('ERROR', $error->message); + self::assertInstanceOf(RuntimeException::class, $error->throwable); self::assertEquals( [ @@ -968,7 +1110,7 @@ public function handle(Message $message): void ); } - public function testRunningMarkOutdated(): void + public function testRunningMarkDetached(): void { $subscriptionId = 'test'; @@ -991,7 +1133,11 @@ public function testRunningMarkOutdated(): void logger: new NullLogger(), ); - $engine->run(); + $result = $engine->run(); + + self::assertEquals(0, $result->processedMessages); + self::assertEquals(false, $result->streamFinished); + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -1027,7 +1173,11 @@ public function testRunningWithoutActiveSubscribers(): void logger: new NullLogger(), ); - $engine->run(); + $result = $engine->run(); + + self::assertEquals(0, $result->processedMessages); + self::assertEquals(false, $result->streamFinished); + self::assertEquals([], $result->errors); self::assertEquals([], $subscriptionStore->updatedSubscriptions); } @@ -1069,7 +1219,11 @@ public function handle(Message $message): void logger: new NullLogger(), ); - $engine->run(); + $result = $engine->run(); + + self::assertEquals(2, $result->processedMessages); + self::assertEquals(true, $result->streamFinished); + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -1119,7 +1273,11 @@ public function handle(Message $message): void logger: new NullLogger(), ); - $engine->run(); + $result = $engine->run(); + + self::assertEquals(1, $result->processedMessages); + self::assertEquals(true, $result->streamFinished); + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -1158,7 +1316,9 @@ class { logger: new NullLogger(), ); - $engine->teardown(); + $result = $engine->teardown(); + + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -1195,7 +1355,9 @@ class { logger: new NullLogger(), ); - $engine->teardown(); + $result = $engine->teardown(); + + self::assertEquals([], $result->errors); self::assertEquals([], $subscriptionStore->updatedSubscriptions); self::assertEquals([$subscription], $subscriptionStore->removedSubscriptions); @@ -1234,7 +1396,9 @@ public function drop(): void logger: new NullLogger(), ); - $engine->teardown(); + $result = $engine->teardown(); + + self::assertEquals([], $result->errors); self::assertEquals([], $subscriptionStore->updatedSubscriptions); self::assertEquals([$subscription], $subscriptionStore->removedSubscriptions); @@ -1274,7 +1438,15 @@ public function drop(): void logger: new NullLogger(), ); - $engine->teardown(); + $result = $engine->teardown(); + + self::assertCount(1, $result->errors); + + $error = $result->errors[0]; + + self::assertEquals($subscriptionId, $error->subscriptionId); + self::assertEquals('ERROR', $error->message); + self::assertInstanceOf(RuntimeException::class, $error->throwable); self::assertEquals([], $subscriptionStore->updatedSubscriptions); self::assertEquals([], $subscriptionStore->removedSubscriptions); @@ -1302,7 +1474,9 @@ public function testTeardownWithoutSubscriber(): void logger: new NullLogger(), ); - $engine->teardown(); + $result = $engine->teardown(); + + self::assertEquals([], $result->errors); self::assertEquals([], $subscriptionStore->updatedSubscriptions); self::assertEquals([], $subscriptionStore->removedSubscriptions); @@ -1325,7 +1499,9 @@ class { logger: new NullLogger(), ); - $engine->remove(); + $result = $engine->remove(); + + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -1368,7 +1544,9 @@ public function drop(): void logger: new NullLogger(), ); - $engine->remove(); + $result = $engine->remove(); + + self::assertEquals([], $result->errors); self::assertEquals([], $subscriptionStore->updatedSubscriptions); self::assertEquals([$subscription], $subscriptionStore->removedSubscriptions); @@ -1399,7 +1577,9 @@ class { logger: new NullLogger(), ); - $engine->remove(); + $result = $engine->remove(); + + self::assertEquals([], $result->errors); self::assertEquals([], $subscriptionStore->updatedSubscriptions); self::assertEquals([$subscription], $subscriptionStore->removedSubscriptions); @@ -1436,7 +1616,15 @@ public function drop(): void logger: new NullLogger(), ); - $engine->remove(); + $result = $engine->remove(); + + self::assertCount(1, $result->errors); + + $error = $result->errors[0]; + + self::assertEquals($subscriptionId, $error->subscriptionId); + self::assertEquals('ERROR', $error->message); + self::assertInstanceOf(RuntimeException::class, $error->throwable); self::assertEquals([], $subscriptionStore->updatedSubscriptions); self::assertEquals([$subscription], $subscriptionStore->removedSubscriptions); @@ -1463,7 +1651,9 @@ public function testRemoveWithoutSubscriber(): void logger: new NullLogger(), ); - $engine->remove(); + $result = $engine->remove(); + + self::assertEquals([], $result->errors); self::assertEquals([], $subscriptionStore->updatedSubscriptions); self::assertEquals([$subscription], $subscriptionStore->removedSubscriptions); @@ -1486,7 +1676,9 @@ class { logger: new NullLogger(), ); - $engine->reactivate(); + $result = $engine->reactivate(); + + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -1525,7 +1717,9 @@ class { logger: new NullLogger(), ); - $engine->reactivate(); + $result = $engine->reactivate(); + + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -1538,7 +1732,7 @@ class { ], $subscriptionStore->updatedSubscriptions); } - public function testReactivateOutdated(): void + public function testReactivateDetached(): void { $subscriptionId = 'test'; $subscriber = new #[Subscriber('test', RunMode::FromBeginning)] @@ -1563,7 +1757,9 @@ class { logger: new NullLogger(), ); - $engine->reactivate(); + $result = $engine->reactivate(); + + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -1600,7 +1796,9 @@ class { logger: new NullLogger(), ); - $engine->reactivate(); + $result = $engine->reactivate(); + + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -1637,7 +1835,9 @@ class { logger: new NullLogger(), ); - $engine->reactivate(); + $result = $engine->reactivate(); + + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -1666,7 +1866,9 @@ class { logger: new NullLogger(), ); - $engine->pause(); + $result = $engine->pause(); + + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -1703,7 +1905,9 @@ class { logger: new NullLogger(), ); - $engine->pause(); + $result = $engine->pause(); + + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -1740,7 +1944,9 @@ class { logger: new NullLogger(), ); - $engine->pause(); + $result = $engine->pause(); + + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -1779,7 +1985,9 @@ class { logger: new NullLogger(), ); - $engine->pause(); + $result = $engine->pause(); + + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -1870,7 +2078,16 @@ public function subscribe(): void new NullLogger(), ); - $engine->run(); + $result = $engine->run(); + + self::assertEquals(1, $result->processedMessages); + self::assertCount(1, $result->errors); + + $error = $result->errors[0]; + + self::assertEquals($subscriptionId, $error->subscriptionId); + self::assertEquals('ERROR2', $error->message); + self::assertInstanceOf(RuntimeException::class, $error->throwable); self::assertCount(2, $subscriptionStore->updatedSubscriptions); @@ -1923,7 +2140,10 @@ class { new NullLogger(), ); - $engine->run(); + $result = $engine->run(); + + self::assertEquals(0, $result->processedMessages); + self::assertCount(0, $result->errors); self::assertEquals([], $subscriptionStore->updatedSubscriptions); }