Skip to content

Commit

Permalink
Rewrite to central deduplication queue
Browse files Browse the repository at this point in the history
  • Loading branch information
kitsunet committed Nov 29, 2023
1 parent 1a7990e commit 242b5f6
Show file tree
Hide file tree
Showing 10 changed files with 209 additions and 149 deletions.
3 changes: 3 additions & 0 deletions Neos.ContentRepository.Core/Classes/ContentRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
use Neos\ContentRepository\Core\Projection\ProjectionStateInterface;
use Neos\ContentRepository\Core\Projection\Workspace\WorkspaceFinder;
use Neos\ContentRepository\Core\SharedModel\User\UserIdProviderInterface;
use Neos\ContentRepositoryRegistry\Service\CatchUpDeduplicationQueue;
use Neos\EventStore\CatchUp\CatchUp;
use Neos\EventStore\EventStoreInterface;
use Neos\EventStore\Model\Event\EventMetadata;
Expand Down Expand Up @@ -79,6 +80,7 @@ public function __construct(
private readonly ContentDimensionSourceInterface $contentDimensionSource,
private readonly UserIdProviderInterface $userIdProvider,
private readonly ClockInterface $clock,
private readonly CatchUpDeduplicationQueue $catchUpDeduplicationQueue,
) {
}

Expand Down Expand Up @@ -191,6 +193,7 @@ public function catchUpProjection(string $projectionClassName, CatchUpOptions $o
}
$catchUp->run($eventStream);
$catchUpHook?->onAfterCatchUp();
$this->catchUpDeduplicationQueue->releaseCatchUpLock($projectionClassName);
}

public function setUp(): SetupResult
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,15 @@

use Neos\ContentRepository\Core\CommandHandler\CommandResult;
use Neos\ContentRepository\Core\CommandHandler\PendingProjections;
use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface;
use Neos\ContentRepository\Core\Projection\Projections;
use Neos\ContentRepository\Core\Projection\WithMarkStaleInterface;
use Neos\ContentRepositoryRegistry\Service\CatchUpDeduplicationQueue;
use Neos\EventStore\EventStoreInterface;
use Neos\EventStore\Exception\ConcurrencyException;
use Neos\EventStore\Model\Event;
use Neos\EventStore\Model\Event\EventId;
use Neos\EventStore\Model\Event\EventMetadata;
use Neos\EventStore\Model\Events;
use Neos\EventStore\Model\EventStore\CommitResult;

/**
* Internal service to persist {@see EventInterface} with the proper normalization, and triggering the
Expand All @@ -27,7 +26,7 @@ final class EventPersister
{
public function __construct(
private readonly EventStoreInterface $eventStore,
private readonly ProjectionCatchUpTriggerInterface $projectionCatchUpTrigger,
private readonly CatchUpDeduplicationQueue $catchUpDeduplicationQueue,
private readonly EventNormalizer $eventNormalizer,
private readonly Projections $projections,
) {
Expand Down Expand Up @@ -67,7 +66,7 @@ public function publishEvents(EventsToPublish $eventsToPublish): CommandResult
$projection->markStale();
}
}
$this->projectionCatchUpTrigger->triggerCatchUp($pendingProjections->projections);
$this->catchUpDeduplicationQueue->requestCatchUp($pendingProjections->projections);

// The CommandResult can be used to block until projections are up to date.
return new CommandResult($pendingProjections, $commitResult);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
use Neos\ContentRepository\Core\Feature\WorkspaceCommandHandler;
use Neos\ContentRepository\Core\Infrastructure\Property\PropertyConverter;
use Neos\ContentRepository\Core\NodeType\NodeTypeManager;
use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface;
use Neos\ContentRepository\Core\Projection\ProjectionsAndCatchUpHooks;
use Neos\ContentRepository\Core\SharedModel\User\UserIdProviderInterface;
use Neos\ContentRepositoryRegistry\Service\CatchUpDeduplicationQueue;
use Neos\EventStore\EventStoreInterface;
use Psr\Clock\ClockInterface;
use Symfony\Component\Serializer\Serializer;
Expand All @@ -52,7 +52,7 @@ public function __construct(
ContentDimensionSourceInterface $contentDimensionSource,
Serializer $propertySerializer,
ProjectionsAndCatchUpHooksFactory $projectionsAndCatchUpHooksFactory,
private readonly ProjectionCatchUpTriggerInterface $projectionCatchUpTrigger,
private readonly CatchUpDeduplicationQueue $catchUpDeduplicationQueue,
private readonly UserIdProviderInterface $userIdProvider,
private readonly ClockInterface $clock,
) {
Expand Down Expand Up @@ -100,6 +100,7 @@ public function getOrBuild(): ContentRepository
$this->projectionFactoryDependencies->contentDimensionSource,
$this->userIdProvider,
$this->clock,
$this->catchUpDeduplicationQueue
);
}
return $this->contentRepository;
Expand Down Expand Up @@ -164,7 +165,7 @@ private function buildEventPersister(): EventPersister
if (!$this->eventPersister) {
$this->eventPersister = new EventPersister(
$this->projectionFactoryDependencies->eventStore,
$this->projectionCatchUpTrigger,
$this->catchUpDeduplicationQueue,
$this->projectionFactoryDependencies->eventNormalizer,
$this->projectionsAndCatchUpHooks->projections,
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,8 @@ public function getIterator(): \Traversable
{
yield from $this->projections;
}
public function isEmpty(): bool
{
return count($this->projections) === 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
use Neos\ContentRepositoryRegistry\ContentRepositoryRegistry;
use Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger\SubprocessProjectionCatchUpTrigger;
use Neos\ContentRepository\Core\Factory\ContentRepositoryId;
use Neos\ContentRepositoryRegistry\Service\AsynchronousCatchUpRunnerState;
use Neos\Flow\Annotations as Flow;
use Neos\Flow\Cli\CommandController;

Expand Down Expand Up @@ -41,9 +40,7 @@ class SubprocessProjectionCatchUpCommandController extends CommandController
public function catchupCommand(string $contentRepositoryIdentifier, string $projectionClassName): void
{
$contentRepositoryId = ContentRepositoryId::fromString($contentRepositoryIdentifier);
$runnerState = AsynchronousCatchUpRunnerState::create($contentRepositoryId, $projectionClassName, $this->catchUpStatesCache);
$contentRepository = $this->contentRepositoryRegistry->get($contentRepositoryId);
$contentRepository->catchUpProjection($projectionClassName, CatchUpOptions::create());
$runnerState->setStopped();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

namespace Neos\ContentRepositoryRegistry;

use Neos\Cache\Frontend\FrontendInterface;
use Neos\ContentRepository\Core\ContentRepository;
use Neos\ContentRepository\Core\Dimension\ContentDimensionSourceInterface;
use Neos\ContentRepository\Core\Factory\ContentRepositoryFactory;
Expand All @@ -14,7 +15,6 @@
use Neos\ContentRepository\Core\Projection\CatchUpHookFactoryInterface;
use Neos\ContentRepository\Core\Projection\ContentGraph\ContentSubgraphInterface;
use Neos\ContentRepository\Core\Projection\ContentGraph\Node;
use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface;
use Neos\ContentRepository\Core\Projection\ProjectionFactoryInterface;
use Neos\ContentRepository\Core\SharedModel\User\UserIdProviderInterface;
use Neos\ContentRepositoryRegistry\Exception\ContentRepositoryNotFoundException;
Expand All @@ -25,6 +25,7 @@
use Neos\ContentRepositoryRegistry\Factory\NodeTypeManager\NodeTypeManagerFactoryInterface;
use Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger\ProjectionCatchUpTriggerFactoryInterface;
use Neos\ContentRepositoryRegistry\Factory\UserIdProvider\UserIdProviderFactoryInterface;
use Neos\ContentRepositoryRegistry\Service\CatchUpDeduplicationQueue;
use Neos\EventStore\EventStoreInterface;
use Neos\Flow\Annotations as Flow;
use Neos\Flow\ObjectManagement\ObjectManagerInterface;
Expand Down Expand Up @@ -139,7 +140,7 @@ private function buildFactory(ContentRepositoryId $contentRepositoryId): Content
$this->buildContentDimensionSource($contentRepositoryId, $contentRepositorySettings),
$this->buildPropertySerializer($contentRepositoryId, $contentRepositorySettings),
$this->buildProjectionsFactory($contentRepositoryId, $contentRepositorySettings),
$this->buildProjectionCatchUpTrigger($contentRepositoryId, $contentRepositorySettings),
$this->buildCatchUpDeduplicationQueue($contentRepositoryId, $contentRepositorySettings),
$this->buildUserIdProvider($contentRepositoryId, $contentRepositorySettings),
$clock,
);
Expand Down Expand Up @@ -228,14 +229,25 @@ private function buildProjectionsFactory(ContentRepositoryId $contentRepositoryI
}

/** @param array<string, mixed> $contentRepositorySettings */
private function buildProjectionCatchUpTrigger(ContentRepositoryId $contentRepositoryId, array $contentRepositorySettings): ProjectionCatchUpTriggerInterface
private function buildCatchUpDeduplicationQueue(ContentRepositoryId $contentRepositoryId, array $contentRepositorySettings): CatchUpDeduplicationQueue
{
isset($contentRepositorySettings['projectionCatchUpTrigger']['factoryObjectName']) || throw InvalidConfigurationException::fromMessage('Content repository "%s" does not have projectionCatchUpTrigger.factoryObjectName configured.', $contentRepositoryId->value);
$projectionCatchUpTriggerFactory = $this->objectManager->get($contentRepositorySettings['projectionCatchUpTrigger']['factoryObjectName']);
if (!$projectionCatchUpTriggerFactory instanceof ProjectionCatchUpTriggerFactoryInterface) {
throw InvalidConfigurationException::fromMessage('projectionCatchUpTrigger.factoryObjectName for content repository "%s" is not an instance of %s but %s.', $contentRepositoryId->value, ProjectionCatchUpTriggerFactoryInterface::class, get_debug_type($projectionCatchUpTriggerFactory));
}
return $projectionCatchUpTriggerFactory->build($contentRepositoryId, $contentRepositorySettings['projectionCatchUpTrigger']['options'] ?? []);
$projectionCatchUpTrigger = $projectionCatchUpTriggerFactory->build($contentRepositoryId, $contentRepositorySettings['projectionCatchUpTrigger']['options'] ?? []);

$catchUpStateCache = $this->objectManager->get('Neos.ContentRepositoryRegistry:CacheCatchUpStates');
if (!$catchUpStateCache instanceof FrontendInterface) {
throw InvalidConfigurationException::fromMessage('The virtual object "Neos.ContentRepositoryRegistry:CacheCatchUpStates" must provide a Cache Frontend, but is "%s".', get_debug_type($catchUpStateCache));
}

return new CatchUpDeduplicationQueue(
$contentRepositoryId,
$catchUpStateCache,
$projectionCatchUpTrigger
);
}

/** @param array<string, mixed> $contentRepositorySettings */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@
declare(strict_types=1);
namespace Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger;

use Neos\Cache\Frontend\VariableFrontend;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\ContentRepository\Core\Projection\ProjectionStateInterface;
use Neos\ContentRepositoryRegistry\Service\AsynchronousCatchUpRunnerState;
use Neos\Flow\Annotations as Flow;
use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface;
use Neos\ContentRepository\Core\Projection\Projections;
Expand All @@ -24,12 +20,6 @@ class SubprocessProjectionCatchUpTrigger implements ProjectionCatchUpTriggerInte
*/
protected $flowSettings;

/**
* @Flow\Inject(name="Neos.ContentRepositoryRegistry:CacheCatchUpStates")
* @var VariableFrontend
*/
protected $catchUpStatesCache;

public function __construct(
private readonly ContentRepositoryId $contentRepositoryId
) {
Expand All @@ -39,71 +29,15 @@ public function triggerCatchUp(Projections $projections): void
{
// modelled after https://github.com/neos/Neos.EventSourcing/blob/master/Classes/EventPublisher/JobQueueEventPublisher.php#L103
// and https://github.com/Flowpack/jobqueue-common/blob/master/Classes/Queue/FakeQueue.php
$queuedProjections = [];
foreach ($projections as $projection) {
$runnerState = AsynchronousCatchUpRunnerState::create($this->contentRepositoryId, $projection::class, $this->catchUpStatesCache);
if (!$runnerState->isRunning()) {
$this->startCatchUp($projection, $runnerState);
continue;
}

if (!$runnerState->isQueued()) {
$runnerState->queue();
$queuedProjections[] = [$projection, $runnerState];
}
}

for ($attempts = 0; $attempts < 50 && !empty($queuedProjections); $attempts++) {
// Incremental back off with some randomness to get a wide spread between processes.
usleep(random_int(100, 25000) + ($attempts * $attempts * 10)); // 50000μs = 50ms
$queuedProjections = $this->recheckQueuedProjections($queuedProjections);
}
}

/**
* @param array<array{ProjectionInterface<ProjectionStateInterface>, AsynchronousCatchUpRunnerState}> $queuedProjections
* @return array<array{ProjectionInterface<ProjectionStateInterface>, AsynchronousCatchUpRunnerState}>
*/
private function recheckQueuedProjections(array $queuedProjections): array
{
$nextQueuedProjections = [];
/**
* @var ProjectionInterface<ProjectionStateInterface> $projection
* @var AsynchronousCatchUpRunnerState $runnerState
*/
foreach ($queuedProjections as [$projection, $runnerState]) {
// another process has started a catchUp and cleared the queue while we waited, our queue has become irrelevant
if ($runnerState->isQueued() === false) {
continue;
}

if ($runnerState->isRunning() === false) {
$this->startCatchUp($projection, $runnerState);
}

$nextQueuedProjections[] = [$projection, $runnerState];
Scripts::executeCommandAsync(
'neos.contentrepositoryregistry:subprocessprojectioncatchup:catchup',
$this->flowSettings,
[
'contentRepositoryIdentifier' => $this->contentRepositoryId->value,
'projectionClassName' => get_class($projection)
]
);
}

return $nextQueuedProjections;
}

/**
* @param ProjectionInterface<ProjectionStateInterface> $projection
* @param AsynchronousCatchUpRunnerState $runnerState
* @return void
*/
private function startCatchUp(ProjectionInterface $projection, AsynchronousCatchUpRunnerState $runnerState): void
{
$runnerState->run();
// We are about to start a catchUp and can therefore discard any queue that exists right now, apparently someone else is waiting for it.
$runnerState->dequeue();
Scripts::executeCommandAsync(
'neos.contentrepositoryregistry:subprocessprojectioncatchup:catchup',
$this->flowSettings,
[
'contentRepositoryIdentifier' => $this->contentRepositoryId->value,
'projectionClassName' => $projection::class
]
);
}
}

This file was deleted.

Loading

0 comments on commit 242b5f6

Please sign in to comment.