diff --git a/src/Exceptions/AlreadyProcessingException.php b/src/Exceptions/AlreadyProcessingException.php deleted file mode 100644 index 2a07996..0000000 --- a/src/Exceptions/AlreadyProcessingException.php +++ /dev/null @@ -1,15 +0,0 @@ -processing) { - throw new AlreadyProcessingException(); - } $criteria ??= SubscriptionEngineCriteria::noConstraints(); - // TODO acquire lock - $this->processing = true; - $subscriptionCriteria = SubscriptionCriteria::create( ids: $criteria->ids, groups: $criteria->groups, - status: [Status::NEW] + status: [Status::NEW], ); - - try { - $this->runInternal($subscriptionCriteria, 'setup', $limit); - } finally { - $this->processing = false; - // TODO release lock - } + $this->runInternal($subscriptionCriteria, 'setup', $limit); } @@ -66,24 +52,33 @@ public function run( SubscriptionEngineCriteria $criteria = null, int $limit = null, ): void { - if ($this->processing) { - throw new AlreadyProcessingException(); - } $criteria ??= SubscriptionEngineCriteria::noConstraints(); - // TODO acquire lock - $this->processing = true; $subscriptionCriteria = SubscriptionCriteria::create( ids: $criteria->ids, groups: $criteria->groups, - status: [Status::ACTIVE] + status: [Status::ACTIVE], ); + $this->runInternal($subscriptionCriteria, 'run', $limit); + } - try { - $this->runInternal($subscriptionCriteria, 'run', $limit); - } finally { - $this->processing = false; - // TODO release lock + private function lockSubscriptions(Subscriptions $subscriptions): void + { + foreach ($subscriptions as $subscription) { + $sT = microtime(true); + while (!$this->subscriptionStore->acquireLock($subscription->id)) { + if (microtime(true) - $sT > 5) { + // TODO better exception handling + throw new \RuntimeException(sprintf('Failed to acquire lock for subscription "%s"', $subscription->id->value), 1721895494); + } + } + } + } + + private function releaseSubscriptions(Subscriptions $subscriptions): void + { + foreach ($subscriptions as $subscription) { + $this->subscriptionStore->releaseLock($subscription->id); } } @@ -91,13 +86,16 @@ private function runInternal(SubscriptionCriteria $criteria, string $process, in { $this->logger?->info(sprintf('Subscription Engine: %s: Start.', $process)); $this->discoverNewSubscriptions(); - // TODO $this->discoverDetachedSubscriptions($criteria); - // TODO $this->retrySubscriptions($criteria); + //$this->discoverDetachedSubscriptions($criteria); + $this->retrySubscriptions($criteria); $subscriptions = $this->subscriptionStore->findByCriteria($criteria); if ($subscriptions->isEmpty()) { $this->logger?->info(sprintf('Subscription Engine: %s: No subscriptions to process, finishing', $process)); return;// new ProcessedResult(0, true); } + + $this->lockSubscriptions($subscriptions); + $startSequenceNumber = $this->lowestSubscriptionPosition($subscriptions)->next(); $this->logger?->debug( sprintf( @@ -151,6 +149,7 @@ private function runInternal(SubscriptionCriteria $criteria, string $process, in $limit, ), ); + $this->releaseSubscriptions($subscriptions); return;// new ProcessedResult($messageCounter, false, $errors); } @@ -179,6 +178,7 @@ private function runInternal(SubscriptionCriteria $criteria, string $process, in $lastSequenceNumber?->value ?: $startSequenceNumber->value, ), ); + $this->releaseSubscriptions($subscriptions); return;// new ProcessedResult($messageCounter, true, $errors); } @@ -253,6 +253,47 @@ private function discoverNewSubscriptions(): void } } + private function retrySubscriptions(SubscriptionCriteria $criteria): void + { + $failedSubscriptions = $this->subscriptionStore->findByCriteria( + SubscriptionCriteria::create( + ids: $criteria->ids, + groups: $criteria->groups, + status: [Status::ERROR], + ) + ); + foreach ($failedSubscriptions as $subscription) { + if ($subscription->error === null) { + continue; + } + $retryable = in_array( + $subscription->error->previousStatus, + [Status::NEW, Status::BOOTING, Status::ACTIVE], + true, + ); + if (!$retryable) { + continue; + } + if (!$this->retryStrategy->shouldRetry($subscription)) { + continue; + } + $this->subscriptionStore->update($subscription->id, static fn(Subscription $subscription) => $subscription->with( + status: $subscription->error->previousStatus, + retryAttempt: $subscription->retryAttempt + 1, + )->withoutError()); + + $this->logger?->info( + sprintf( + 'Subscription Engine: Retry subscription "%s" (%d) and set back to %s.', + $subscription->id->value, + $subscription->retryAttempt + 1, + $subscription->error->previousStatus->name, + ), + ); + } + } + + private function lastSequenceNumber(): SequenceNumber { $events = $this->eventStore->readAll(ReadOptions::create(backwards: true)); diff --git a/src/Subscription/RetryStrategy/ClockBasedRetryStrategy.php b/src/Subscription/RetryStrategy/ClockBasedRetryStrategy.php new file mode 100644 index 0000000..f025cf6 --- /dev/null +++ b/src/Subscription/RetryStrategy/ClockBasedRetryStrategy.php @@ -0,0 +1,61 @@ +retryAttempt >= $this->maxAttempts) { + return false; + } + + $lastSavedAt = $subscription->lastSavedAt; + + if ($lastSavedAt === null) { + return false; + } + + $nextRetryDate = $this->calculateNextRetryDate($lastSavedAt, $subscription->retryAttempt); + + return $nextRetryDate <= $this->clock->now(); + } + + private function calculateNextRetryDate(DateTimeImmutable $lastDate, int $attempt): DateTimeImmutable + { + $nextDate = $lastDate->modify(sprintf('+%d seconds', $this->calculateDelay($attempt))); + + if ($nextDate === false) { + throw new \RuntimeException('Could not calculate next retry date.', 1721897113); + } + + return $nextDate; + } + + private function calculateDelay(int $attempt): int + { + return (int)round($this->baseDelay * ($this->delayFactor ** $attempt)); + } +} \ No newline at end of file diff --git a/src/Subscription/RetryStrategy/NoRetryStrategy.php b/src/Subscription/RetryStrategy/NoRetryStrategy.php new file mode 100644 index 0000000..5f746ef --- /dev/null +++ b/src/Subscription/RetryStrategy/NoRetryStrategy.php @@ -0,0 +1,15 @@ +runMode, $status ?? $this->status, $position ?? $this->position, + $this->locked, $this->error, $retryAttempt ?? $this->retryAttempt, $this->lastSavedAt, @@ -66,25 +68,12 @@ public function withError(Throwable|string $throwableOrMessage): self $this->runMode, Status::ERROR, $this->position, + $this->locked, $error, $this->retryAttempt, $this->lastSavedAt, ); } - - public function withLastSavedAt(DateTimeImmutable $lastSavedAt): self - { - return new self( - $this->id, - $this->group, - $this->runMode, - $this->status, - $this->position, - $this->error, - $this->retryAttempt, - $lastSavedAt, - ); - } // // public function doRetry(): void // { @@ -104,6 +93,7 @@ public function withoutError(): self $this->runMode, $this->status, $this->position, + $this->locked, null, $this->retryAttempt, $this->lastSavedAt,