Skip to content

Commit

Permalink
Lockable subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
bwaidelich committed Jul 25, 2024
1 parent b1bee09 commit b779408
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 60 deletions.
15 changes: 0 additions & 15 deletions src/Exceptions/AlreadyProcessingException.php

This file was deleted.

103 changes: 72 additions & 31 deletions src/Subscription/Engine/DefaultSubscriptionEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,25 @@
use Wwwision\DCBEventStore\Types\SequenceNumber;
use Wwwision\DCBLibrary\DomainEvent;
use Wwwision\DCBLibrary\EventSerializer;
use Wwwision\DCBLibrary\Exceptions\AlreadyProcessingException;
use Wwwision\DCBLibrary\ProvidesSetup;
use Wwwision\DCBLibrary\Subscription\RetryStrategy\RetryStrategy;
use Wwwision\DCBLibrary\Subscription\RunMode;
use Wwwision\DCBLibrary\Subscription\Status;
use Wwwision\DCBLibrary\Subscription\Store\SubscriptionCriteria;
use Wwwision\DCBLibrary\Subscription\Store\SubscriptionStore;
use Wwwision\DCBLibrary\Subscription\Subscriber\Subscribers;
use Wwwision\DCBLibrary\Subscription\Subscription;
use Wwwision\DCBLibrary\Subscription\SubscriptionId;
use Wwwision\DCBLibrary\Subscription\Subscriptions;

final class DefaultSubscriptionEngine implements SubscriptionEngine
{

private bool $processing = false;

public function __construct(
private readonly EventStore $eventStore,
private readonly SubscriptionStore $subscriptionStore,
private readonly Subscribers $subscribers,
private readonly EventSerializer $eventSerializer,
private readonly RetryStrategy $retryStrategy,
private readonly LoggerInterface|null $logger = null,
) {
}
Expand All @@ -40,64 +38,64 @@ public function setup(
SubscriptionEngineCriteria $criteria = null,
int $limit = null,
): void {
if ($this->processing) {
throw new AlreadyProcessingException();
}
$criteria ??= SubscriptionEngineCriteria::noConstraints();
// TODO acquire lock
$this->processing = true;

$subscriptionCriteria = SubscriptionCriteria::create(
ids: $criteria->ids,
groups: $criteria->groups,
status: [Status::NEW]
status: [Status::NEW],
);

try {
$this->runInternal($subscriptionCriteria, 'setup', $limit);
} finally {
$this->processing = false;
// TODO release lock
}
$this->runInternal($subscriptionCriteria, 'setup', $limit);
}


public function run(
SubscriptionEngineCriteria $criteria = null,
int $limit = null,
): void {
if ($this->processing) {
throw new AlreadyProcessingException();
}
$criteria ??= SubscriptionEngineCriteria::noConstraints();
// TODO acquire lock
$this->processing = true;

$subscriptionCriteria = SubscriptionCriteria::create(
ids: $criteria->ids,
groups: $criteria->groups,
status: [Status::ACTIVE]
status: [Status::ACTIVE],
);
$this->runInternal($subscriptionCriteria, 'run', $limit);
}

try {
$this->runInternal($subscriptionCriteria, 'run', $limit);
} finally {
$this->processing = false;
// TODO release lock
private function lockSubscriptions(Subscriptions $subscriptions): void
{
foreach ($subscriptions as $subscription) {
$sT = microtime(true);
while (!$this->subscriptionStore->acquireLock($subscription->id)) {
if (microtime(true) - $sT > 5) {
// TODO better exception handling
throw new \RuntimeException(sprintf('Failed to acquire lock for subscription "%s"', $subscription->id->value), 1721895494);
}
}
}
}

private function releaseSubscriptions(Subscriptions $subscriptions): void
{
foreach ($subscriptions as $subscription) {
$this->subscriptionStore->releaseLock($subscription->id);
}
}

private function runInternal(SubscriptionCriteria $criteria, string $process, int|null $limit): void
{
$this->logger?->info(sprintf('Subscription Engine: %s: Start.', $process));
$this->discoverNewSubscriptions();
// TODO $this->discoverDetachedSubscriptions($criteria);
// TODO $this->retrySubscriptions($criteria);
//$this->discoverDetachedSubscriptions($criteria);
$this->retrySubscriptions($criteria);
$subscriptions = $this->subscriptionStore->findByCriteria($criteria);
if ($subscriptions->isEmpty()) {
$this->logger?->info(sprintf('Subscription Engine: %s: No subscriptions to process, finishing', $process));
return;// new ProcessedResult(0, true);
}

$this->lockSubscriptions($subscriptions);

$startSequenceNumber = $this->lowestSubscriptionPosition($subscriptions)->next();
$this->logger?->debug(
sprintf(
Expand Down Expand Up @@ -151,6 +149,7 @@ private function runInternal(SubscriptionCriteria $criteria, string $process, in
$limit,
),
);
$this->releaseSubscriptions($subscriptions);

return;// new ProcessedResult($messageCounter, false, $errors);
}
Expand Down Expand Up @@ -179,6 +178,7 @@ private function runInternal(SubscriptionCriteria $criteria, string $process, in
$lastSequenceNumber?->value ?: $startSequenceNumber->value,
),
);
$this->releaseSubscriptions($subscriptions);

return;// new ProcessedResult($messageCounter, true, $errors);
}
Expand Down Expand Up @@ -253,6 +253,47 @@ private function discoverNewSubscriptions(): void
}
}

private function retrySubscriptions(SubscriptionCriteria $criteria): void
{
$failedSubscriptions = $this->subscriptionStore->findByCriteria(
SubscriptionCriteria::create(
ids: $criteria->ids,
groups: $criteria->groups,
status: [Status::ERROR],
)
);
foreach ($failedSubscriptions as $subscription) {
if ($subscription->error === null) {
continue;
}
$retryable = in_array(
$subscription->error->previousStatus,
[Status::NEW, Status::BOOTING, Status::ACTIVE],
true,
);
if (!$retryable) {
continue;
}
if (!$this->retryStrategy->shouldRetry($subscription)) {
continue;
}
$this->subscriptionStore->update($subscription->id, static fn(Subscription $subscription) => $subscription->with(
status: $subscription->error->previousStatus,

Check failure on line 281 in src/Subscription/Engine/DefaultSubscriptionEngine.php

View workflow job for this annotation

GitHub Actions / build (8.2)

Cannot access property $previousStatus on Wwwision\DCBLibrary\Subscription\SubscriptionError|null.

Check failure on line 281 in src/Subscription/Engine/DefaultSubscriptionEngine.php

View workflow job for this annotation

GitHub Actions / build (8.3)

Cannot access property $previousStatus on Wwwision\DCBLibrary\Subscription\SubscriptionError|null.
retryAttempt: $subscription->retryAttempt + 1,
)->withoutError());

$this->logger?->info(
sprintf(
'Subscription Engine: Retry subscription "%s" (%d) and set back to %s.',
$subscription->id->value,
$subscription->retryAttempt + 1,
$subscription->error->previousStatus->name,
),
);
}
}


private function lastSequenceNumber(): SequenceNumber
{
$events = $this->eventStore->readAll(ReadOptions::create(backwards: true));
Expand Down
61 changes: 61 additions & 0 deletions src/Subscription/RetryStrategy/ClockBasedRetryStrategy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
<?php

declare(strict_types=1);

namespace Wwwision\DCBLibrary\Subscription\RetryStrategy;

use DateTimeImmutable;
use Psr\Clock\ClockInterface;
use Wwwision\DCBLibrary\Subscription\Subscription;

final class ClockBasedRetryStrategy implements RetryStrategy
{
public const DEFAULT_BASE_DELAY = 5;
public const DEFAULT_DELAY_FACTOR = 2;
public const DEFAULT_MAX_ATTEMPTS = 5;

/**
* @param int $baseDelay in seconds
* @param positive-int $maxAttempts
*/
public function __construct(
private readonly ClockInterface $clock,
private readonly int $baseDelay = self::DEFAULT_BASE_DELAY,
private readonly float $delayFactor = self::DEFAULT_DELAY_FACTOR,
private readonly int $maxAttempts = self::DEFAULT_MAX_ATTEMPTS,
) {
}

public function shouldRetry(Subscription $subscription): bool
{
if ($subscription->retryAttempt >= $this->maxAttempts) {
return false;
}

$lastSavedAt = $subscription->lastSavedAt;

if ($lastSavedAt === null) {
return false;
}

$nextRetryDate = $this->calculateNextRetryDate($lastSavedAt, $subscription->retryAttempt);

return $nextRetryDate <= $this->clock->now();
}

private function calculateNextRetryDate(DateTimeImmutable $lastDate, int $attempt): DateTimeImmutable
{
$nextDate = $lastDate->modify(sprintf('+%d seconds', $this->calculateDelay($attempt)));

if ($nextDate === false) {
throw new \RuntimeException('Could not calculate next retry date.', 1721897113);
}

return $nextDate;
}

private function calculateDelay(int $attempt): int
{
return (int)round($this->baseDelay * ($this->delayFactor ** $attempt));
}
}
15 changes: 15 additions & 0 deletions src/Subscription/RetryStrategy/NoRetryStrategy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace Wwwision\DCBLibrary\Subscription\RetryStrategy;

use Wwwision\DCBLibrary\Subscription\Subscription;

final class NoRetryStrategy implements RetryStrategy
{
public function shouldRetry(Subscription $subscription): bool
{
return false;
}
}
12 changes: 12 additions & 0 deletions src/Subscription/RetryStrategy/RetryStrategy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Wwwision\DCBLibrary\Subscription\RetryStrategy;

use Wwwision\DCBLibrary\Subscription\Subscription;

interface RetryStrategy
{
public function shouldRetry(Subscription $subscription): bool;
}
4 changes: 4 additions & 0 deletions src/Subscription/Store/SubscriptionStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ public function findOneById(SubscriptionId $subscriptionId): ?Subscription;

public function findByCriteria(SubscriptionCriteria $criteria): Subscriptions;

public function acquireLock(SubscriptionId $subscriptionId): bool;

public function releaseLock(SubscriptionId $subscriptionId): void;

public function add(Subscription $subscription): void;

/**
Expand Down
18 changes: 4 additions & 14 deletions src/Subscription/Subscription.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public function __construct(
public readonly RunMode $runMode,
public readonly Status $status,
public readonly SequenceNumber $position,
public readonly bool $locked = false,
public readonly SubscriptionError|null $error = null,
public readonly int $retryAttempt = 0,
public readonly DateTimeImmutable|null $lastSavedAt = null,
Expand Down Expand Up @@ -47,6 +48,7 @@ public function with(
$this->runMode,
$status ?? $this->status,
$position ?? $this->position,
$this->locked,
$this->error,
$retryAttempt ?? $this->retryAttempt,
$this->lastSavedAt,
Expand All @@ -66,25 +68,12 @@ public function withError(Throwable|string $throwableOrMessage): self
$this->runMode,
Status::ERROR,
$this->position,
$this->locked,
$error,
$this->retryAttempt,
$this->lastSavedAt,
);
}

public function withLastSavedAt(DateTimeImmutable $lastSavedAt): self
{
return new self(
$this->id,
$this->group,
$this->runMode,
$this->status,
$this->position,
$this->error,
$this->retryAttempt,
$lastSavedAt,
);
}
//
// public function doRetry(): void
// {
Expand All @@ -104,6 +93,7 @@ public function withoutError(): self
$this->runMode,
$this->status,
$this->position,
$this->locked,
null,
$this->retryAttempt,
$this->lastSavedAt,
Expand Down

0 comments on commit b779408

Please sign in to comment.