Skip to content

Commit

Permalink
TASK: Block ContentRepository::handle() by default
Browse files Browse the repository at this point in the history
  • Loading branch information
mhsdesign committed May 12, 2024
1 parent 7c5784f commit 2f0039c
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,75 +5,23 @@
namespace Neos\ContentRepository\Core\CommandHandler;

use Neos\ContentRepository\Core\ContentRepository;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\ContentRepository\Core\Projection\ProjectionStateInterface;
use Neos\EventStore\Model\Event\SequenceNumber;
use Neos\EventStore\Model\Event\Version;
use Neos\EventStore\Model\EventStore\CommitResult;

/**
* Result of the {@see ContentRepository::handle()} method to be able to block until the projections were updated.
* Was the result of the {@see ContentRepository::handle()} method.
* Previously one would need this to be able to block until the projections were updated.
*
* {@see PendingProjections} for a detailed explanation how the blocking works.
* This will no longer be required in the future see https://github.com/neos/neos-development-collection/pull/4988
*
* @deprecated this b/c layer will be removed with the next beta or before Neos 9 final release
* @api
*/
final readonly class CommandResult
{
public function __construct(
private PendingProjections $pendingProjections,
public CommitResult $commitResult,
) {
}

/**
* an empty command result which should not result in projection updates
* @return self
*/
public static function empty(): self
{
return new self(
PendingProjections::empty(),
new CommitResult(
Version::first(),
SequenceNumber::none()
)
);
}

/**
* Wait until all projections are up to date; i.e. have processed the events.
*
* @return void
* @api
* We block by default thus you must not call this method or use this legacy stub
* @deprecated this b/c layer will be removed with the next beta or before Neos 9 final release
*/
public function block(): void
{
foreach ($this->pendingProjections->projections as $pendingProjection) {
$expectedSequenceNumber = $this->pendingProjections->getExpectedSequenceNumber($pendingProjection);
$this->blockProjection($pendingProjection, $expectedSequenceNumber);
}
}

/**
* @param ProjectionInterface<ProjectionStateInterface> $projection
*/
private function blockProjection(ProjectionInterface $projection, SequenceNumber $expectedSequenceNumber): void
{
$attempts = 0;
while ($projection->getCheckpointStorage()->getHighestAppliedSequenceNumber()->value < $expectedSequenceNumber->value) {
usleep(50000); // 50000μs = 50ms
if (++$attempts > 100) { // 5 seconds
throw new \RuntimeException(
sprintf(
'TIMEOUT while waiting for projection "%s" to catch up to sequence number %d ' .
'- check the error logs for details.',
$projection::class,
$expectedSequenceNumber->value
),
1550232279
);
}
}
}
}
46 changes: 41 additions & 5 deletions Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@
use Neos\ContentRepository\Core\CommandHandler\CommandResult;
use Neos\ContentRepository\Core\CommandHandler\PendingProjections;
use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface;
use Neos\ContentRepository\Core\Projection\ProjectionInterface;
use Neos\ContentRepository\Core\Projection\Projections;
use Neos\ContentRepository\Core\Projection\ProjectionStateInterface;
use Neos\ContentRepository\Core\Projection\WithMarkStaleInterface;
use Neos\EventStore\EventStoreInterface;
use Neos\EventStore\Exception\ConcurrencyException;
use Neos\EventStore\Model\Event;
use Neos\EventStore\Model\Event\EventId;
use Neos\EventStore\Model\Event\SequenceNumber;
use Neos\EventStore\Model\Events;

/**
Expand All @@ -39,7 +40,7 @@ public function __construct(
public function publishEvents(EventsToPublish $eventsToPublish): CommandResult
{
if ($eventsToPublish->events->isEmpty()) {
return CommandResult::empty();
return new CommandResult();
}
// the following logic could also be done in an AppEventStore::commit method (being called
// directly from the individual Command Handlers).
Expand Down Expand Up @@ -67,7 +68,42 @@ public function publishEvents(EventsToPublish $eventsToPublish): CommandResult
}
$this->projectionCatchUpTrigger->triggerCatchUp($pendingProjections->projections);

// The CommandResult can be used to block until projections are up to date.
return new CommandResult($pendingProjections, $commitResult);
$this->block($pendingProjections);
return new CommandResult();
}

/**
* Wait until all projections are up to date; i.e. have processed the events.
*
* @return void
*/
private function block(PendingProjections $pendingProjections): void
{
foreach ($pendingProjections->projections as $pendingProjection) {
$expectedSequenceNumber = $pendingProjections->getExpectedSequenceNumber($pendingProjection);
$this->blockProjection($pendingProjection, $expectedSequenceNumber);
}
}

/**
* @param ProjectionInterface<ProjectionStateInterface> $projection
*/
private function blockProjection(ProjectionInterface $projection, SequenceNumber $expectedSequenceNumber): void
{
$attempts = 0;
while ($projection->getCheckpointStorage()->getHighestAppliedSequenceNumber()->value < $expectedSequenceNumber->value) {
usleep(50000); // 50000μs = 50ms
if (++$attempts > 100) { // 5 seconds
throw new \RuntimeException(
sprintf(
'TIMEOUT while waiting for projection "%s" to catch up to sequence number %d ' .
'- check the error logs for details.',
$projection::class,
$expectedSequenceNumber->value
),
1550232279
);
}
}
}
}

0 comments on commit 2f0039c

Please sign in to comment.