Skip to content

Commit

Permalink
Merge pull request #594 from patchlevel/fix-finished-subscription-engine
Browse files Browse the repository at this point in the history
fix finish flag in subscription engine
  • Loading branch information
DavidBadura authored Apr 25, 2024
2 parents 6f96255 + c3df31b commit 17a4440
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 28 deletions.
2 changes: 1 addition & 1 deletion src/Console/Command/SubscriptionBootCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
function (Closure $stop) use ($criteria, $messageLimit, &$finished): void {
$result = $this->engine->boot($criteria, $messageLimit);

if (!$result->streamFinished) {
if (!$result->finished) {
return;
}

Expand Down
6 changes: 3 additions & 3 deletions src/Subscription/Engine/CatchUpSubscriptionEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -89,18 +89,18 @@ public function subscriptions(SubscriptionEngineCriteria|null $criteria = null):
private function mergeResult(ProcessedResult ...$results): ProcessedResult
{
$processedMessages = 0;
$streamFinished = false;
$finished = false;
$errors = [];

foreach ($results as $result) {
$processedMessages += $result->processedMessages;
$streamFinished = $result->streamFinished;
$finished = $result->finished;
$errors[] = $result->errors;
}

return new ProcessedResult(
$processedMessages,
$streamFinished,
$finished,
array_merge(...$errors),
);
}
Expand Down
4 changes: 2 additions & 2 deletions src/Subscription/Engine/DefaultSubscriptionEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ function ($subscriptions) use ($limit): ProcessedResult {
if (count($subscriptions) === 0) {
$this->logger?->info('Subscription Engine: No subscriptions in booting status, finish booting.');

return new ProcessedResult(0);
return new ProcessedResult(0, true);
}

/** @var list<Error> $errors */
Expand Down Expand Up @@ -334,7 +334,7 @@ function (array $subscriptions) use ($limit): ProcessedResult {
if (count($subscriptions) === 0) {
$this->logger?->info('Subscription Engine: No subscriptions to process, finish processing.');

return new ProcessedResult(0);
return new ProcessedResult(0, true);
}

/** @var list<Error> $errors */
Expand Down
2 changes: 1 addition & 1 deletion src/Subscription/Engine/ProcessedResult.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ final class ProcessedResult
/** @param list<Error> $errors */
public function __construct(
public readonly int $processedMessages,
public readonly bool $streamFinished = false,
public readonly bool $finished = false,
public readonly array $errors = [],
) {
}
Expand Down
42 changes: 21 additions & 21 deletions tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ public function testNothingToBoot(): void
$result = $engine->boot();

self::assertEquals(0, $result->processedMessages);
self::assertEquals(false, $result->streamFinished);
self::assertEquals(true, $result->finished);
self::assertEquals([], $result->errors);

self::assertEquals([], $store->addedSubscriptions);
Expand Down Expand Up @@ -396,7 +396,7 @@ class {
$result = $engine->boot();

self::assertEquals(0, $result->processedMessages);
self::assertEquals(false, $result->streamFinished);
self::assertEquals(true, $result->finished);
self::assertEquals([], $result->errors);

self::assertEquals([
Expand Down Expand Up @@ -449,7 +449,7 @@ public function handle(Message $message): void
$result = $engine->boot();

self::assertEquals(1, $result->processedMessages);
self::assertEquals(true, $result->streamFinished);
self::assertEquals(true, $result->finished);
self::assertEquals([], $result->errors);

self::assertEquals([], $subscriptionStore->addedSubscriptions);
Expand Down Expand Up @@ -515,7 +515,7 @@ public function handle(Message $message): void
$result = $engine->boot();

self::assertEquals(1, $result->processedMessages);
self::assertEquals(true, $result->streamFinished);
self::assertEquals(true, $result->finished);
self::assertCount(1, $result->errors);

$error = $result->errors[0];
Expand Down Expand Up @@ -581,7 +581,7 @@ public function handle(Message $message): void
$result = $engine->boot(new SubscriptionEngineCriteria(), 1);

self::assertEquals(1, $result->processedMessages);
self::assertEquals(false, $result->streamFinished);
self::assertEquals(false, $result->finished);
self::assertEquals([], $result->errors);

self::assertEquals([], $subscriptionStore->addedSubscriptions);
Expand Down Expand Up @@ -656,7 +656,7 @@ public function handle(Message $message): void
$result = $engine->boot();

self::assertEquals(1, $result->processedMessages);
self::assertEquals(true, $result->streamFinished);
self::assertEquals(true, $result->finished);
self::assertEquals([], $result->errors);

self::assertEquals([
Expand Down Expand Up @@ -734,7 +734,7 @@ public function handle(Message $message): void
$result = $engine->boot();

self::assertEquals(2, $result->processedMessages);
self::assertEquals(true, $result->streamFinished);
self::assertEquals(true, $result->finished);
self::assertEquals([], $result->errors);

self::assertEquals([
Expand Down Expand Up @@ -795,7 +795,7 @@ public function handle(Message $message): void
$result = $engine->boot();

self::assertEquals(1, $result->processedMessages);
self::assertEquals(true, $result->streamFinished);
self::assertEquals(true, $result->finished);
self::assertEquals([], $result->errors);

self::assertEquals([
Expand Down Expand Up @@ -900,7 +900,7 @@ public function handle(Message $message): void
$result = $engine->boot(limit: 1);

self::assertEquals(1, $result->processedMessages);
self::assertEquals(false, $result->streamFinished);
self::assertEquals(false, $result->finished);
self::assertEquals([], $result->errors);

self::assertEquals([], $subscriptionStore->addedSubscriptions);
Expand All @@ -921,7 +921,7 @@ public function handle(Message $message): void
$result = $engine->boot();

self::assertEquals(0, $result->processedMessages);
self::assertEquals(true, $result->streamFinished);
self::assertEquals(true, $result->finished);
self::assertEquals([], $result->errors);

self::assertEquals([
Expand Down Expand Up @@ -955,7 +955,7 @@ class {
$result = $engine->run();

self::assertEquals(0, $result->processedMessages);
self::assertEquals(false, $result->streamFinished);
self::assertEquals(true, $result->finished);
self::assertEquals([], $result->errors);

self::assertEquals([
Expand Down Expand Up @@ -1006,7 +1006,7 @@ public function handle(Message $message): void
$result = $engine->run();

self::assertEquals(1, $result->processedMessages);
self::assertEquals(true, $result->streamFinished);
self::assertEquals(true, $result->finished);
self::assertEquals([], $result->errors);

self::assertEquals([
Expand Down Expand Up @@ -1064,7 +1064,7 @@ public function handle(Message $message): void
$result = $engine->run(new SubscriptionEngineCriteria(), 1);

self::assertEquals(1, $result->processedMessages);
self::assertEquals(false, $result->streamFinished);
self::assertEquals(false, $result->finished);
self::assertEquals([], $result->errors);

self::assertEquals([
Expand Down Expand Up @@ -1137,7 +1137,7 @@ public function handle(Message $message): void
$result = $engine->run();

self::assertEquals(1, $result->processedMessages);
self::assertEquals(true, $result->streamFinished);
self::assertEquals(true, $result->finished);
self::assertEquals([], $result->errors);

self::assertEquals([
Expand Down Expand Up @@ -1202,7 +1202,7 @@ public function handle(Message $message): void
$result = $engine->run();

self::assertEquals(1, $result->processedMessages);
self::assertEquals(true, $result->streamFinished);
self::assertEquals(true, $result->finished);
self::assertCount(1, $result->errors);

$error = $result->errors[0];
Expand Down Expand Up @@ -1256,7 +1256,7 @@ public function testRunningMarkDetached(): void
$result = $engine->run();

self::assertEquals(0, $result->processedMessages);
self::assertEquals(false, $result->streamFinished);
self::assertEquals(true, $result->finished);
self::assertEquals([], $result->errors);

self::assertEquals([
Expand Down Expand Up @@ -1296,7 +1296,7 @@ public function testRunningWithoutActiveSubscribers(): void
$result = $engine->run();

self::assertEquals(0, $result->processedMessages);
self::assertEquals(false, $result->streamFinished);
self::assertEquals(true, $result->finished);
self::assertEquals([], $result->errors);

self::assertEquals([], $subscriptionStore->updatedSubscriptions);
Expand Down Expand Up @@ -1342,7 +1342,7 @@ public function handle(Message $message): void
$result = $engine->run();

self::assertEquals(2, $result->processedMessages);
self::assertEquals(true, $result->streamFinished);
self::assertEquals(true, $result->finished);
self::assertEquals([], $result->errors);

self::assertEquals([
Expand Down Expand Up @@ -1396,7 +1396,7 @@ public function handle(Message $message): void
$result = $engine->run();

self::assertEquals(1, $result->processedMessages);
self::assertEquals(true, $result->streamFinished);
self::assertEquals(true, $result->finished);
self::assertEquals([], $result->errors);

self::assertEquals([
Expand Down Expand Up @@ -1501,7 +1501,7 @@ public function handle(Message $message): void
$result = $engine->run(limit: 1);

self::assertEquals(1, $result->processedMessages);
self::assertEquals(false, $result->streamFinished);
self::assertEquals(false, $result->finished);
self::assertEquals([], $result->errors);

self::assertEquals([], $subscriptionStore->addedSubscriptions);
Expand All @@ -1522,7 +1522,7 @@ public function handle(Message $message): void
$result = $engine->run();

self::assertEquals(0, $result->processedMessages);
self::assertEquals(true, $result->streamFinished);
self::assertEquals(true, $result->finished);
self::assertEquals([], $result->errors);

self::assertEquals([], $subscriptionStore->updatedSubscriptions);
Expand Down

0 comments on commit 17a4440

Please sign in to comment.