Skip to content

Commit

Permalink
Merge pull request #5288 from mhsdesign/task/cleanup-ProjectionCatchU…
Browse files Browse the repository at this point in the history
…pTrigger-extensibility

TASK: Cleanup projection catch-up trigger extensibility
  • Loading branch information
mhsdesign authored Oct 22, 2024
2 parents 0160656 + e5889e0 commit 339987c
Show file tree
Hide file tree
Showing 20 changed files with 39 additions and 398 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public function createNodesForPerformanceTest(int $nodesPerLevel, int $levels):
NodeAggregateClassification::CLASSIFICATION_ROOT,
);

$this->eventPersister->publishEvents(new EventsToPublish(
$this->eventPersister->publishEvents($this->contentRepository, new EventsToPublish(
$this->contentStreamEventStream->getEventStreamName(),
Events::with($rootNodeAggregateWasCreated),
ExpectedVersion::ANY()
Expand All @@ -102,7 +102,7 @@ public function createNodesForPerformanceTest(int $nodesPerLevel, int $levels):
$sumSoFar = 0;
$events = [];
$this->createHierarchy($rootNodeAggregateId, 1, $levels, $nodesPerLevel, $sumSoFar, $events);
$this->eventPersister->publishEvents(new EventsToPublish(
$this->eventPersister->publishEvents($this->contentRepository, new EventsToPublish(
$this->contentStreamEventStream->getEventStreamName(),
Events::fromArray($events),
ExpectedVersion::ANY()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
use Neos\ContentRepository\BehavioralTests\ProjectionRaceConditionTester\Dto\TraceEntryType;
use Neos\ContentRepository\Core\EventStore\EventInterface;
use Neos\ContentRepository\Core\Projection\CatchUpHookInterface;
use Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger\SubprocessProjectionCatchUpTrigger;
use Neos\EventStore\Model\EventEnvelope;
use Neos\Flow\Annotations as Flow;

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ final class CommandHandlingDependencies
*/
private array $overriddenContentGraphInstances = [];

public function __construct(private readonly ContentRepository $contentRepository)
{
public function __construct(
private readonly ContentRepository $contentRepository
) {
}

public function handle(CommandInterface $command): void
Expand Down Expand Up @@ -110,4 +111,12 @@ public function overrideContentStreamId(WorkspaceName $workspaceName, ContentStr
unset($this->overriddenContentGraphInstances[$workspaceName->value]);
}
}

/**
* Fixme only required to build the possible catchup hooks
*/
public function getContentRepository(): ContentRepository
{
return $this->contentRepository;
}
}
11 changes: 10 additions & 1 deletion Neos.ContentRepository.Core/Classes/ContentRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public function handle(CommandInterface $command): void
$eventsToPublish->expectedVersion,
);

$this->eventPersister->publishEvents($eventsToPublish);
$this->eventPersister->publishEvents($this, $eventsToPublish);
}


Expand Down Expand Up @@ -199,6 +199,15 @@ public function catchUpProjection(string $projectionClassName, CatchUpOptions $o
$catchUpHook?->onAfterCatchUp();
}

public function catchupProjections(): void
{
foreach ($this->projectionsAndCatchUpHooks->projections as $projection) {
// FIXME optimise by only loading required events once and not per projection
// see https://github.com/neos/neos-development-collection/pull/4988/
$this->catchUpProjection($projection::class, CatchUpOptions::create());
}
}

public function setUp(): void
{
$this->eventStore->setup();
Expand Down
20 changes: 4 additions & 16 deletions Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@

namespace Neos\ContentRepository\Core\EventStore;

use Neos\ContentRepository\Core\CommandHandler\PendingProjections;
use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface;
use Neos\ContentRepository\Core\Projection\Projections;
use Neos\ContentRepository\Core\ContentRepository;
use Neos\EventStore\EventStoreInterface;
use Neos\EventStore\Exception\ConcurrencyException;
use Neos\EventStore\Model\Events;
Expand All @@ -21,17 +19,15 @@
{
public function __construct(
private EventStoreInterface $eventStore,
private ProjectionCatchUpTriggerInterface $projectionCatchUpTrigger,
private EventNormalizer $eventNormalizer,
private Projections $projections,
) {
}

/**
* @param EventsToPublish $eventsToPublish
* @throws ConcurrencyException in case the expectedVersion does not match
*/
public function publishEvents(EventsToPublish $eventsToPublish): void
public function publishEvents(ContentRepository $contentRepository, EventsToPublish $eventsToPublish): void
{
if ($eventsToPublish->events->isEmpty()) {
return;
Expand All @@ -41,20 +37,12 @@ public function publishEvents(EventsToPublish $eventsToPublish): void
$normalizedEvents = Events::fromArray(
$eventsToPublish->events->map($this->eventNormalizer->normalize(...))
);
$commitResult = $this->eventStore->commit(
$this->eventStore->commit(
$eventsToPublish->streamName,
$normalizedEvents,
$eventsToPublish->expectedVersion
);
// for performance reasons, we do not want to update ALL projections all the time; but instead only
// the projections which are interested in the events from above.
// Further details can be found in the docs of PendingProjections.
$pendingProjections = PendingProjections::fromProjectionsAndEventsAndSequenceNumber(
$this->projections,
$eventsToPublish->events,
$commitResult->highestCommittedSequenceNumber
);

$this->projectionCatchUpTrigger->triggerCatchUp($pendingProjections->projections);
$contentRepository->catchUpProjections();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
use Neos\ContentRepository\Core\Feature\WorkspaceCommandHandler;
use Neos\ContentRepository\Core\Infrastructure\Property\PropertyConverter;
use Neos\ContentRepository\Core\NodeType\NodeTypeManager;
use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface;
use Neos\ContentRepository\Core\Projection\ProjectionsAndCatchUpHooks;
use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryId;
use Neos\ContentRepository\Core\SharedModel\User\UserIdProviderInterface;
Expand All @@ -53,7 +52,6 @@ public function __construct(
ContentDimensionSourceInterface $contentDimensionSource,
Serializer $propertySerializer,
ProjectionsAndCatchUpHooksFactory $projectionsAndCatchUpHooksFactory,
private readonly ProjectionCatchUpTriggerInterface $projectionCatchUpTrigger,
private readonly UserIdProviderInterface $userIdProvider,
private readonly ClockInterface $clock,
) {
Expand Down Expand Up @@ -166,9 +164,7 @@ private function buildEventPersister(): EventPersister
if (!$this->eventPersister) {
$this->eventPersister = new EventPersister(
$this->projectionFactoryDependencies->eventStore,
$this->projectionCatchUpTrigger,
$this->projectionFactoryDependencies->eventNormalizer,
$this->projectionsAndCatchUpHooks->projections,
);
}
return $this->eventPersister;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ private function handlePublishWorkspace(
$baseWorkspace = $this->requireBaseWorkspace($workspace, $commandHandlingDependencies);

$this->publishContentStream(
$commandHandlingDependencies,
$workspace->currentContentStreamId,
$baseWorkspace->workspaceName,
$baseWorkspace->currentContentStreamId
Expand Down Expand Up @@ -237,6 +238,7 @@ private function handlePublishWorkspace(
* @throws \Exception
*/
private function publishContentStream(
CommandHandlingDependencies $commandHandlingDependencies,
ContentStreamId $contentStreamId,
WorkspaceName $baseWorkspaceName,
ContentStreamId $baseContentStreamId,
Expand Down Expand Up @@ -289,6 +291,7 @@ private function publishContentStream(
}
try {
$this->eventPersister->publishEvents(
$commandHandlingDependencies->getContentRepository(),
new EventsToPublish(
$baseWorkspaceContentStreamName->getEventStreamName(),
Events::fromArray($events),
Expand Down Expand Up @@ -500,6 +503,7 @@ function () use ($matchingCommands, $commandHandlingDependencies, $baseWorkspace

// 5) take EVENTS(MATCHING) and apply them to base WS.
$this->publishContentStream(
$commandHandlingDependencies,
$command->contentStreamIdForMatchingPart,
$baseWorkspace->workspaceName,
$baseWorkspace->currentContentStreamId
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Neos\ContentRepository\Core\Projection;

use Neos\ContentRepository\Core\ContentRepository;
use Neos\ContentRepository\Core\EventStore\EventPersister;

/**
Expand All @@ -17,10 +18,10 @@
interface WithMarkStaleInterface
{
/**
* Triggered directly before {@see ProjectionCatchUpTriggerInterface::triggerCatchUp()} is called;
* by the {@see EventPersister::publishEvents()} method.
* Triggered during catching up after applying events
* {@see ContentRepository::catchUpProjection()}
*
* Can be f.e. used to disable caches inside the Projection State.
* Can be f.e. used to flush caches inside the Projection State.
*
* @return void
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class StructureAdjustmentService implements ContentRepositoryServiceInterface
private readonly ContentGraphInterface $liveContentGraph;

public function __construct(
ContentRepository $contentRepository,
private readonly ContentRepository $contentRepository,
private readonly EventPersister $eventPersister,
NodeTypeManager $nodeTypeManager,
InterDimensionalVariationGraph $interDimensionalVariationGraph,
Expand Down Expand Up @@ -102,7 +102,7 @@ public function fixError(StructureAdjustment $adjustment): void
$remediation = $adjustment->remediation;
$eventsToPublish = $remediation();
assert($eventsToPublish instanceof EventsToPublish);
$this->eventPersister->publishEvents($eventsToPublish);
$this->eventPersister->publishEvents($this->contentRepository, $eventsToPublish);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ protected function publishEvent(string $eventType, StreamName $streamName, array
->getValue($eventPersister);
$event = $eventNormalizer->denormalize($artificiallyConstructedEvent);

$eventPersister->publishEvents(new EventsToPublish(
$eventPersister->publishEvents($this->currentContentRepository, new EventsToPublish(
$streamName,
Events::with($event),
ExpectedVersion::ANY()
Expand Down
Loading

0 comments on commit 339987c

Please sign in to comment.