From c3df31bcdd90b8f69385b9ab67e399e1a5fe7efb Mon Sep 17 00:00:00 2001 From: David Badura Date: Thu, 25 Apr 2024 09:24:23 +0200 Subject: [PATCH] fix finish flag in subscription engine --- .../Command/SubscriptionBootCommand.php | 2 +- .../Engine/CatchUpSubscriptionEngine.php | 6 +-- .../Engine/DefaultSubscriptionEngine.php | 4 +- src/Subscription/Engine/ProcessedResult.php | 2 +- .../Engine/DefaultSubscriptionEngineTest.php | 42 +++++++++---------- 5 files changed, 28 insertions(+), 28 deletions(-) diff --git a/src/Console/Command/SubscriptionBootCommand.php b/src/Console/Command/SubscriptionBootCommand.php index 951832b5..01e05566 100644 --- a/src/Console/Command/SubscriptionBootCommand.php +++ b/src/Console/Command/SubscriptionBootCommand.php @@ -86,7 +86,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int function (Closure $stop) use ($criteria, $messageLimit, &$finished): void { $result = $this->engine->boot($criteria, $messageLimit); - if (!$result->streamFinished) { + if (!$result->finished) { return; } diff --git a/src/Subscription/Engine/CatchUpSubscriptionEngine.php b/src/Subscription/Engine/CatchUpSubscriptionEngine.php index 8a0b3cdf..118dd6fe 100644 --- a/src/Subscription/Engine/CatchUpSubscriptionEngine.php +++ b/src/Subscription/Engine/CatchUpSubscriptionEngine.php @@ -89,18 +89,18 @@ public function subscriptions(SubscriptionEngineCriteria|null $criteria = null): private function mergeResult(ProcessedResult ...$results): ProcessedResult { $processedMessages = 0; - $streamFinished = false; + $finished = false; $errors = []; foreach ($results as $result) { $processedMessages += $result->processedMessages; - $streamFinished = $result->streamFinished; + $finished = $result->finished; $errors[] = $result->errors; } return new ProcessedResult( $processedMessages, - $streamFinished, + $finished, array_merge(...$errors), ); } diff --git a/src/Subscription/Engine/DefaultSubscriptionEngine.php b/src/Subscription/Engine/DefaultSubscriptionEngine.php index 84f8e349..44dd8231 100644 --- a/src/Subscription/Engine/DefaultSubscriptionEngine.php +++ b/src/Subscription/Engine/DefaultSubscriptionEngine.php @@ -168,7 +168,7 @@ function ($subscriptions) use ($limit): ProcessedResult { if (count($subscriptions) === 0) { $this->logger?->info('Subscription Engine: No subscriptions in booting status, finish booting.'); - return new ProcessedResult(0); + return new ProcessedResult(0, true); } /** @var list $errors */ @@ -334,7 +334,7 @@ function (array $subscriptions) use ($limit): ProcessedResult { if (count($subscriptions) === 0) { $this->logger?->info('Subscription Engine: No subscriptions to process, finish processing.'); - return new ProcessedResult(0); + return new ProcessedResult(0, true); } /** @var list $errors */ diff --git a/src/Subscription/Engine/ProcessedResult.php b/src/Subscription/Engine/ProcessedResult.php index 2dd0423b..d34f6c97 100644 --- a/src/Subscription/Engine/ProcessedResult.php +++ b/src/Subscription/Engine/ProcessedResult.php @@ -9,7 +9,7 @@ final class ProcessedResult /** @param list $errors */ public function __construct( public readonly int $processedMessages, - public readonly bool $streamFinished = false, + public readonly bool $finished = false, public readonly array $errors = [], ) { } diff --git a/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php b/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php index a35ba8b0..96011ae2 100644 --- a/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php +++ b/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php @@ -367,7 +367,7 @@ public function testNothingToBoot(): void $result = $engine->boot(); self::assertEquals(0, $result->processedMessages); - self::assertEquals(false, $result->streamFinished); + self::assertEquals(true, $result->finished); self::assertEquals([], $result->errors); self::assertEquals([], $store->addedSubscriptions); @@ -396,7 +396,7 @@ class { $result = $engine->boot(); self::assertEquals(0, $result->processedMessages); - self::assertEquals(false, $result->streamFinished); + self::assertEquals(true, $result->finished); self::assertEquals([], $result->errors); self::assertEquals([ @@ -449,7 +449,7 @@ public function handle(Message $message): void $result = $engine->boot(); self::assertEquals(1, $result->processedMessages); - self::assertEquals(true, $result->streamFinished); + self::assertEquals(true, $result->finished); self::assertEquals([], $result->errors); self::assertEquals([], $subscriptionStore->addedSubscriptions); @@ -515,7 +515,7 @@ public function handle(Message $message): void $result = $engine->boot(); self::assertEquals(1, $result->processedMessages); - self::assertEquals(true, $result->streamFinished); + self::assertEquals(true, $result->finished); self::assertCount(1, $result->errors); $error = $result->errors[0]; @@ -581,7 +581,7 @@ public function handle(Message $message): void $result = $engine->boot(new SubscriptionEngineCriteria(), 1); self::assertEquals(1, $result->processedMessages); - self::assertEquals(false, $result->streamFinished); + self::assertEquals(false, $result->finished); self::assertEquals([], $result->errors); self::assertEquals([], $subscriptionStore->addedSubscriptions); @@ -656,7 +656,7 @@ public function handle(Message $message): void $result = $engine->boot(); self::assertEquals(1, $result->processedMessages); - self::assertEquals(true, $result->streamFinished); + self::assertEquals(true, $result->finished); self::assertEquals([], $result->errors); self::assertEquals([ @@ -734,7 +734,7 @@ public function handle(Message $message): void $result = $engine->boot(); self::assertEquals(2, $result->processedMessages); - self::assertEquals(true, $result->streamFinished); + self::assertEquals(true, $result->finished); self::assertEquals([], $result->errors); self::assertEquals([ @@ -795,7 +795,7 @@ public function handle(Message $message): void $result = $engine->boot(); self::assertEquals(1, $result->processedMessages); - self::assertEquals(true, $result->streamFinished); + self::assertEquals(true, $result->finished); self::assertEquals([], $result->errors); self::assertEquals([ @@ -900,7 +900,7 @@ public function handle(Message $message): void $result = $engine->boot(limit: 1); self::assertEquals(1, $result->processedMessages); - self::assertEquals(false, $result->streamFinished); + self::assertEquals(false, $result->finished); self::assertEquals([], $result->errors); self::assertEquals([], $subscriptionStore->addedSubscriptions); @@ -921,7 +921,7 @@ public function handle(Message $message): void $result = $engine->boot(); self::assertEquals(0, $result->processedMessages); - self::assertEquals(true, $result->streamFinished); + self::assertEquals(true, $result->finished); self::assertEquals([], $result->errors); self::assertEquals([ @@ -955,7 +955,7 @@ class { $result = $engine->run(); self::assertEquals(0, $result->processedMessages); - self::assertEquals(false, $result->streamFinished); + self::assertEquals(true, $result->finished); self::assertEquals([], $result->errors); self::assertEquals([ @@ -1006,7 +1006,7 @@ public function handle(Message $message): void $result = $engine->run(); self::assertEquals(1, $result->processedMessages); - self::assertEquals(true, $result->streamFinished); + self::assertEquals(true, $result->finished); self::assertEquals([], $result->errors); self::assertEquals([ @@ -1064,7 +1064,7 @@ public function handle(Message $message): void $result = $engine->run(new SubscriptionEngineCriteria(), 1); self::assertEquals(1, $result->processedMessages); - self::assertEquals(false, $result->streamFinished); + self::assertEquals(false, $result->finished); self::assertEquals([], $result->errors); self::assertEquals([ @@ -1137,7 +1137,7 @@ public function handle(Message $message): void $result = $engine->run(); self::assertEquals(1, $result->processedMessages); - self::assertEquals(true, $result->streamFinished); + self::assertEquals(true, $result->finished); self::assertEquals([], $result->errors); self::assertEquals([ @@ -1202,7 +1202,7 @@ public function handle(Message $message): void $result = $engine->run(); self::assertEquals(1, $result->processedMessages); - self::assertEquals(true, $result->streamFinished); + self::assertEquals(true, $result->finished); self::assertCount(1, $result->errors); $error = $result->errors[0]; @@ -1256,7 +1256,7 @@ public function testRunningMarkDetached(): void $result = $engine->run(); self::assertEquals(0, $result->processedMessages); - self::assertEquals(false, $result->streamFinished); + self::assertEquals(true, $result->finished); self::assertEquals([], $result->errors); self::assertEquals([ @@ -1296,7 +1296,7 @@ public function testRunningWithoutActiveSubscribers(): void $result = $engine->run(); self::assertEquals(0, $result->processedMessages); - self::assertEquals(false, $result->streamFinished); + self::assertEquals(true, $result->finished); self::assertEquals([], $result->errors); self::assertEquals([], $subscriptionStore->updatedSubscriptions); @@ -1342,7 +1342,7 @@ public function handle(Message $message): void $result = $engine->run(); self::assertEquals(2, $result->processedMessages); - self::assertEquals(true, $result->streamFinished); + self::assertEquals(true, $result->finished); self::assertEquals([], $result->errors); self::assertEquals([ @@ -1396,7 +1396,7 @@ public function handle(Message $message): void $result = $engine->run(); self::assertEquals(1, $result->processedMessages); - self::assertEquals(true, $result->streamFinished); + self::assertEquals(true, $result->finished); self::assertEquals([], $result->errors); self::assertEquals([ @@ -1501,7 +1501,7 @@ public function handle(Message $message): void $result = $engine->run(limit: 1); self::assertEquals(1, $result->processedMessages); - self::assertEquals(false, $result->streamFinished); + self::assertEquals(false, $result->finished); self::assertEquals([], $result->errors); self::assertEquals([], $subscriptionStore->addedSubscriptions); @@ -1522,7 +1522,7 @@ public function handle(Message $message): void $result = $engine->run(); self::assertEquals(0, $result->processedMessages); - self::assertEquals(true, $result->streamFinished); + self::assertEquals(true, $result->finished); self::assertEquals([], $result->errors); self::assertEquals([], $subscriptionStore->updatedSubscriptions);