Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TASK: Prevent multiple catchup runs #4751

Open
wants to merge 9 commits into
base: 9.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 40 additions & 27 deletions Neos.ContentRepository.Core/Classes/ContentRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
use Neos\ContentRepository\Core\Projection\CatchUpOptions;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphInterface;
use Neos\ContentRepository\Core\Projection\ContentStream\ContentStreamFinder;
use Neos\ContentRepository\Core\Projection\ProjectionCatchUpLockIdentifier;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\ContentRepository\Core\Projection\ProjectionsAndCatchUpHooks;
use Neos\ContentRepository\Core\Projection\ProjectionStateInterface;
Expand All @@ -44,6 +45,7 @@
use Neos\EventStore\Model\EventStream\VirtualStreamName;
use Neos\EventStore\ProvidesSetupInterface;
use Psr\Clock\ClockInterface;
use Symfony\Component\Lock\LockFactory;

/**
* Main Entry Point to the system. Encapsulates the full event-sourced Content Repository.
Expand All @@ -63,7 +65,6 @@ final class ContentRepository
*/
private array $projectionStateCache;


/**
* @internal use the {@see ContentRepositoryFactory::getOrBuild()} to instantiate
*/
Expand All @@ -79,6 +80,7 @@ public function __construct(
private readonly ContentDimensionSourceInterface $contentDimensionSource,
private readonly UserIdProviderInterface $userIdProvider,
private readonly ClockInterface $clock,
private readonly LockFactory $lockFactory,
) {
}

Expand Down Expand Up @@ -158,39 +160,50 @@ public function projectionState(string $projectionStateClassName): ProjectionSta
*/
public function catchUpProjection(string $projectionClassName, CatchUpOptions $options): void
{
$projection = $this->projectionsAndCatchUpHooks->projections->get($projectionClassName);
$lock = $this->lockFactory->createLock(
ProjectionCatchUpLockIdentifier::createRunning($this->id, $projectionClassName)->value
);
if (!$lock->acquire()) {
return;
}

$catchUpHookFactory = $this->projectionsAndCatchUpHooks->getCatchUpHookFactoryForProjection($projection);
$catchUpHook = $catchUpHookFactory?->build($this);
try {
$projection = $this->projectionsAndCatchUpHooks->projections->get($projectionClassName);

// TODO allow custom stream name per projection
$streamName = VirtualStreamName::all();
$eventStream = $this->eventStore->load($streamName);
if ($options->maximumSequenceNumber !== null) {
$eventStream = $eventStream->withMaximumSequenceNumber($options->maximumSequenceNumber);
}
$catchUpHookFactory = $this->projectionsAndCatchUpHooks->getCatchUpHookFactoryForProjection($projection);
$catchUpHook = $catchUpHookFactory?->build($this);

$eventApplier = function (EventEnvelope $eventEnvelope) use ($projection, $catchUpHook, $options) {
$event = $this->eventNormalizer->denormalize($eventEnvelope->event);
if ($options->progressCallback !== null) {
($options->progressCallback)($event, $eventEnvelope);
}
if (!$projection->canHandle($event)) {
return;
// TODO allow custom stream name per projection
$streamName = VirtualStreamName::all();
$eventStream = $this->eventStore->load($streamName);
if ($options->maximumSequenceNumber !== null) {
$eventStream = $eventStream->withMaximumSequenceNumber($options->maximumSequenceNumber);
}
$catchUpHook?->onBeforeEvent($event, $eventEnvelope);
$projection->apply($event, $eventEnvelope);
$catchUpHook?->onAfterEvent($event, $eventEnvelope);
};

$catchUp = CatchUp::create($eventApplier, $projection->getCheckpointStorage());
$eventApplier = function (EventEnvelope $eventEnvelope) use ($projection, $catchUpHook, $options) {
$event = $this->eventNormalizer->denormalize($eventEnvelope->event);
if ($options->progressCallback !== null) {
($options->progressCallback)($event, $eventEnvelope);
}
if (!$projection->canHandle($event)) {
return;
}
$catchUpHook?->onBeforeEvent($event, $eventEnvelope);
$projection->apply($event, $eventEnvelope);
$catchUpHook?->onAfterEvent($event, $eventEnvelope);
};

if ($catchUpHook !== null) {
$catchUpHook->onBeforeCatchUp();
$catchUp = $catchUp->withOnBeforeBatchCompleted(fn() => $catchUpHook->onBeforeBatchCompleted());
$catchUp = CatchUp::create($eventApplier, $projection->getCheckpointStorage());

if ($catchUpHook !== null) {
$catchUpHook->onBeforeCatchUp();
$catchUp = $catchUp->withOnBeforeBatchCompleted(fn() => $catchUpHook->onBeforeBatchCompleted());
}
$catchUp->run($eventStream);
$catchUpHook?->onAfterCatchUp();
} finally {
$lock->release();
}
$catchUp->run($eventStream);
$catchUpHook?->onAfterCatchUp();
}

public function setUp(): SetupResult
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,15 @@

use Neos\ContentRepository\Core\CommandHandler\CommandResult;
use Neos\ContentRepository\Core\CommandHandler\PendingProjections;
use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface;
use Neos\ContentRepository\Core\Projection\Projections;
use Neos\ContentRepository\Core\Projection\WithMarkStaleInterface;
use Neos\ContentRepositoryRegistry\Service\CatchUpDeduplicationQueue;
use Neos\EventStore\EventStoreInterface;
use Neos\EventStore\Exception\ConcurrencyException;
use Neos\EventStore\Model\Event;
use Neos\EventStore\Model\Event\EventId;
use Neos\EventStore\Model\Event\EventMetadata;
use Neos\EventStore\Model\Events;
use Neos\EventStore\Model\EventStore\CommitResult;

/**
* Internal service to persist {@see EventInterface} with the proper normalization, and triggering the
Expand All @@ -27,7 +26,7 @@ final class EventPersister
{
public function __construct(
private readonly EventStoreInterface $eventStore,
private readonly ProjectionCatchUpTriggerInterface $projectionCatchUpTrigger,
private readonly CatchUpDeduplicationQueue $catchUpDeduplicationQueue,
private readonly EventNormalizer $eventNormalizer,
private readonly Projections $projections,
) {
Expand Down Expand Up @@ -67,7 +66,7 @@ public function publishEvents(EventsToPublish $eventsToPublish): CommandResult
$projection->markStale();
}
}
$this->projectionCatchUpTrigger->triggerCatchUp($pendingProjections->projections);
$this->catchUpDeduplicationQueue->requestCatchUp($pendingProjections->projections);

// The CommandResult can be used to block until projections are up to date.
return new CommandResult($pendingProjections, $commitResult);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
use Neos\ContentRepository\Core\Feature\WorkspaceCommandHandler;
use Neos\ContentRepository\Core\Infrastructure\Property\PropertyConverter;
use Neos\ContentRepository\Core\NodeType\NodeTypeManager;
use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface;
use Neos\ContentRepository\Core\Projection\ProjectionsAndCatchUpHooks;
use Neos\ContentRepository\Core\SharedModel\User\UserIdProviderInterface;
use Neos\ContentRepositoryRegistry\Service\CatchUpDeduplicationQueue;
use Neos\EventStore\EventStoreInterface;
use Psr\Clock\ClockInterface;
use Symfony\Component\Lock\LockFactory;
use Symfony\Component\Lock\PersistingStoreInterface;
use Symfony\Component\Serializer\Serializer;

/**
Expand All @@ -52,9 +54,10 @@ public function __construct(
ContentDimensionSourceInterface $contentDimensionSource,
Serializer $propertySerializer,
ProjectionsAndCatchUpHooksFactory $projectionsAndCatchUpHooksFactory,
private readonly ProjectionCatchUpTriggerInterface $projectionCatchUpTrigger,
private readonly CatchUpDeduplicationQueue $catchUpDeduplicationQueue,
private readonly UserIdProviderInterface $userIdProvider,
private readonly ClockInterface $clock,
private readonly PersistingStoreInterface $lockStorage
) {
$contentDimensionZookeeper = new ContentDimensionZookeeper($contentDimensionSource);
$interDimensionalVariationGraph = new InterDimensionalVariationGraph(
Expand Down Expand Up @@ -100,6 +103,7 @@ public function getOrBuild(): ContentRepository
$this->projectionFactoryDependencies->contentDimensionSource,
$this->userIdProvider,
$this->clock,
new LockFactory($this->lockStorage)
);
}
return $this->contentRepository;
Expand Down Expand Up @@ -164,7 +168,7 @@ private function buildEventPersister(): EventPersister
if (!$this->eventPersister) {
$this->eventPersister = new EventPersister(
$this->projectionFactoryDependencies->eventStore,
$this->projectionCatchUpTrigger,
$this->catchUpDeduplicationQueue,
$this->projectionFactoryDependencies->eventNormalizer,
$this->projectionsAndCatchUpHooks->projections,
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php

namespace Neos\ContentRepository\Core\Projection;

use Neos\ContentRepository\Core\Factory\ContentRepositoryId;

/**
*
*/
final class ProjectionCatchUpLockIdentifier

Check failure on line 10 in Neos.ContentRepository.Core/Classes/Projection/ProjectionCatchUpLockIdentifier.php

View workflow job for this annotation

GitHub Actions / PHP 8.2 Test linting-unit-functionaltests-mysql (deps: highest)

Class needs @api or @internal annotation.

Check failure on line 10 in Neos.ContentRepository.Core/Classes/Projection/ProjectionCatchUpLockIdentifier.php

View workflow job for this annotation

GitHub Actions / PHP 8.3 Test linting-unit-functionaltests-mysql (deps: highest)

Class needs @api or @internal annotation.
{
private function __construct(public string $value)
{
}

private static function generateIdentifier(ContentRepositoryId $contentRepositoryId, string $projectionClassName): string
{
return md5(sprintf('%s_%s', $contentRepositoryId->value, $projectionClassName));
}

public static function createRunning(ContentRepositoryId $contentRepositoryId, string $projectionClassName): self
{
return new self(self::generateIdentifier($contentRepositoryId, $projectionClassName) . 'RUN');
}

public static function createQueued(ContentRepositoryId $contentRepositoryId, string $projectionClassName): self
{
return new self(self::generateIdentifier($contentRepositoryId, $projectionClassName) . 'QUEUE');
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,9 @@ public function getIterator(): \Traversable
{
yield from $this->projections;
}

public function isEmpty(): bool
kitsunet marked this conversation as resolved.
Show resolved Hide resolved
{
return $this->projections === [];
}
}
1 change: 1 addition & 0 deletions Neos.ContentRepository.Core/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"neos/utility-arrays": "*",
"doctrine/dbal": "^2.13",
"symfony/serializer": "^6.3",
"symfony/lock": "^6.0.0",
"psr/clock": "^1",
"behat/transliterator": "~1.0",
"ramsey/uuid": "^3.0 || ^4.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
use Neos\ContentRepositoryRegistry\ContentRepositoryRegistry;
use Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger\SubprocessProjectionCatchUpTrigger;
use Neos\ContentRepository\Core\Factory\ContentRepositoryId;
use Neos\Flow\Annotations as Flow;
use Neos\Flow\Cli\CommandController;

/**
Expand All @@ -18,11 +17,10 @@
*/
class SubprocessProjectionCatchUpCommandController extends CommandController
{
/**
* @Flow\Inject
* @var ContentRepositoryRegistry
*/
protected $contentRepositoryRegistry;
public function __construct(private readonly ContentRepositoryRegistry $contentRepositoryRegistry)
{
parent::__construct();
}

/**
* @param string $contentRepositoryIdentifier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

namespace Neos\ContentRepositoryRegistry;

use Neos\Cache\Frontend\FrontendInterface;
use Neos\ContentRepository\Core\ContentRepository;
use Neos\ContentRepository\Core\Dimension\ContentDimensionSourceInterface;
use Neos\ContentRepository\Core\Factory\ContentRepositoryFactory;
Expand All @@ -14,7 +15,6 @@
use Neos\ContentRepository\Core\Projection\CatchUpHookFactoryInterface;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentSubgraphInterface;
use Neos\ContentRepository\Core\Projection\ContentGraph\Node;
use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface;
use Neos\ContentRepository\Core\Projection\ProjectionFactoryInterface;
use Neos\ContentRepository\Core\SharedModel\User\UserIdProviderInterface;
use Neos\ContentRepositoryRegistry\Exception\ContentRepositoryNotFoundException;
Expand All @@ -25,12 +25,16 @@
use Neos\ContentRepositoryRegistry\Factory\NodeTypeManager\NodeTypeManagerFactoryInterface;
use Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger\ProjectionCatchUpTriggerFactoryInterface;
use Neos\ContentRepositoryRegistry\Factory\UserIdProvider\UserIdProviderFactoryInterface;
use Neos\ContentRepositoryRegistry\Service\CatchUpDeduplicationQueue;
use Neos\EventStore\EventStoreInterface;
use Neos\Flow\Annotations as Flow;
use Neos\Flow\ObjectManagement\ObjectManagerInterface;
use Neos\Utility\Arrays;
use Neos\Utility\PositionalArraySorter;
use Psr\Clock\ClockInterface;
use Symfony\Component\Lock\LockFactory;
use Symfony\Component\Lock\PersistingStoreInterface;
use Symfony\Component\Lock\Store\DoctrineDbalStore;
use Symfony\Component\Serializer\Normalizer\DenormalizerInterface;
use Symfony\Component\Serializer\Normalizer\NormalizerInterface;
use Symfony\Component\Serializer\Serializer;
Expand Down Expand Up @@ -139,9 +143,10 @@
$this->buildContentDimensionSource($contentRepositoryId, $contentRepositorySettings),
$this->buildPropertySerializer($contentRepositoryId, $contentRepositorySettings),
$this->buildProjectionsFactory($contentRepositoryId, $contentRepositorySettings),
$this->buildProjectionCatchUpTrigger($contentRepositoryId, $contentRepositorySettings),
$this->buildCatchUpDeduplicationQueue($contentRepositoryId, $contentRepositorySettings),
$this->buildUserIdProvider($contentRepositoryId, $contentRepositorySettings),
$clock,
$this->objectManager->get('Neos.ContentRepositoryRegistry:QueueLockStorage')

Check failure on line 149 in Neos.ContentRepositoryRegistry/Classes/ContentRepositoryRegistry.php

View workflow job for this annotation

GitHub Actions / PHP 8.2 Test linting-unit-functionaltests-mysql (deps: highest)

Parameter #10 $lockStorage of class Neos\ContentRepository\Core\Factory\ContentRepositoryFactory constructor expects Symfony\Component\Lock\PersistingStoreInterface, object given.

Check failure on line 149 in Neos.ContentRepositoryRegistry/Classes/ContentRepositoryRegistry.php

View workflow job for this annotation

GitHub Actions / PHP 8.3 Test linting-unit-functionaltests-mysql (deps: highest)

Parameter #10 $lockStorage of class Neos\ContentRepository\Core\Factory\ContentRepositoryFactory constructor expects Symfony\Component\Lock\PersistingStoreInterface, object given.
);
} catch (\Exception $exception) {
throw InvalidConfigurationException::fromException($contentRepositoryId, $exception);
Expand Down Expand Up @@ -228,14 +233,31 @@
}

/** @param array<string, mixed> $contentRepositorySettings */
private function buildProjectionCatchUpTrigger(ContentRepositoryId $contentRepositoryId, array $contentRepositorySettings): ProjectionCatchUpTriggerInterface
private function buildCatchUpDeduplicationQueue(ContentRepositoryId $contentRepositoryId, array $contentRepositorySettings): CatchUpDeduplicationQueue
{
isset($contentRepositorySettings['projectionCatchUpTrigger']['factoryObjectName']) || throw InvalidConfigurationException::fromMessage('Content repository "%s" does not have projectionCatchUpTrigger.factoryObjectName configured.', $contentRepositoryId->value);
$projectionCatchUpTriggerFactory = $this->objectManager->get($contentRepositorySettings['projectionCatchUpTrigger']['factoryObjectName']);
if (!$projectionCatchUpTriggerFactory instanceof ProjectionCatchUpTriggerFactoryInterface) {
throw InvalidConfigurationException::fromMessage('projectionCatchUpTrigger.factoryObjectName for content repository "%s" is not an instance of %s but %s.', $contentRepositoryId->value, ProjectionCatchUpTriggerFactoryInterface::class, get_debug_type($projectionCatchUpTriggerFactory));
}
return $projectionCatchUpTriggerFactory->build($contentRepositoryId, $contentRepositorySettings['projectionCatchUpTrigger']['options'] ?? []);
$projectionCatchUpTrigger = $projectionCatchUpTriggerFactory->build($contentRepositoryId, $contentRepositorySettings['projectionCatchUpTrigger']['options'] ?? []);

$catchUpStateLockStorage = $this->objectManager->get('Neos.ContentRepositoryRegistry:QueueLockStorage');
if (!$catchUpStateLockStorage instanceof PersistingStoreInterface) {
throw InvalidConfigurationException::fromMessage('The virtual object "Neos.ContentRepositoryRegistry:QueueLockStorage" must provide a \Symfony\Component\Lock\PersistingStoreInterface, but is "%s".', get_debug_type($catchUpStateLockStorage));
}
if ($catchUpStateLockStorage instanceof DoctrineDbalStore) {
try {
// hack to ensure tables exist for Dbal
$catchUpStateLockStorage->createTable();
} catch (\Doctrine\DBAL\Exception\TableExistsException $_) {}
}

return new CatchUpDeduplicationQueue(
$contentRepositoryId,
$catchUpStateLockStorage,
$projectionCatchUpTrigger
);
}

/** @param array<string, mixed> $contentRepositorySettings */
Expand Down
Loading
Loading