Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TASK: Yield events to publish in workspace command handler #5315

Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,13 @@ Feature: ForkContentStream Without Dimensions
| propertyValues | {"text": {"value": "original value", "type": "string"}} |
| propertiesToUnset | {} |

Scenario: Try to fork a content stream that is closed:
Scenario: Try to create a workspace with the base workspace referring to a closed content stream
When the command CloseContentStream is executed with payload:
| Key | Value |
| contentStreamId | "cs-identifier" |
When the command ForkContentStream is executed with payload and exceptions are caught:
| Key | Value |
| contentStreamId | "user-cs-identifier" |
| sourceContentStreamId | "cs-identifier" |
When the command CreateWorkspace is executed with payload and exceptions are caught:
| Key | Value |
| workspaceName | "user-test" |
| baseWorkspaceName | "live" |
| newContentStreamId | "user-cs-identifier" |
Then the last command should have thrown an exception of type "ContentStreamIsClosed"
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ Feature: Individual node publication
Scenario: It is possible to publish a single node; and only this one is live.
# create nodes in user WS
Given I am in workspace "user-test"
And I am in workspace "user-test"
And I am in dimension space point {}
And the following CreateNodeAggregateWithNode commands are executed:
| nodeAggregateId | nodeTypeName | parentNodeAggregateId | nodeName | tetheredDescendantNodeAggregateIds |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Neos\ContentRepository\Core\CommandHandlingDependencies;
use Neos\ContentRepository\Core\ContentRepository;
use Neos\ContentRepository\Core\EventStore\EventsToPublish;
use Neos\ContentRepository\Core\EventStore\EventsToPublishFailed;

/**
* Implementation Detail of {@see ContentRepository::handle}, which does the command dispatching to the different
Expand All @@ -26,7 +27,10 @@ public function __construct(CommandHandlerInterface ...$handlers)
$this->handlers = $handlers;
}

public function handle(CommandInterface $command, CommandHandlingDependencies $commandHandlingDependencies): EventsToPublish
/**
* @return EventsToPublish|\Generator<int, EventsToPublish>
*/
public function handle(CommandInterface $command, CommandHandlingDependencies $commandHandlingDependencies): EventsToPublish|\Generator
{
// TODO fail if multiple handlers can handle the same command
foreach ($this->handlers as $handler) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
namespace Neos\ContentRepository\Core\CommandHandler;

use Neos\ContentRepository\Core\CommandHandlingDependencies;
use Neos\ContentRepository\Core\ContentRepository;
use Neos\ContentRepository\Core\EventStore\EventsToPublish;

/**
Expand All @@ -19,5 +18,9 @@
interface CommandHandlerInterface
{
public function canHandle(CommandInterface $command): bool;
public function handle(CommandInterface $command, CommandHandlingDependencies $commandHandlingDependencies): EventsToPublish;

/**
* @return EventsToPublish|\Generator<int, EventsToPublish>
*/
public function handle(CommandInterface $command, CommandHandlingDependencies $commandHandlingDependencies): EventsToPublish|\Generator;
}
96 changes: 64 additions & 32 deletions Neos.ContentRepository.Core/Classes/ContentRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceName;
use Neos\ContentRepository\Core\SharedModel\Workspace\Workspaces;
use Neos\EventStore\EventStoreInterface;
use Neos\EventStore\Exception\ConcurrencyException;
use Neos\EventStore\Model\Event\EventMetadata;
use Neos\EventStore\Model\EventEnvelope;
use Neos\EventStore\Model\EventStream\VirtualStreamName;
Expand Down Expand Up @@ -101,38 +102,36 @@ public function handle(CommandInterface $command): void
{
// the commands only calculate which events they want to have published, but do not do the
// publishing themselves
$eventsToPublish = $this->commandBus->handle($command, $this->commandHandlingDependencies);

// TODO meaningful exception message
$initiatingUserId = $this->userIdProvider->getUserId();
$initiatingTimestamp = $this->clock->now()->format(\DateTimeInterface::ATOM);

// Add "initiatingUserId" and "initiatingTimestamp" metadata to all events.
// This is done in order to keep information about the _original_ metadata when an
// event is re-applied during publishing/rebasing
// "initiatingUserId": The identifier of the user that originally triggered this event. This will never
// be overridden if it is set once.
// "initiatingTimestamp": The timestamp of the original event. The "recordedAt" timestamp will always be
// re-created and reflects the time an event was actually persisted in a stream,
// the "initiatingTimestamp" will be kept and is never overridden again.
// TODO: cleanup
$eventsToPublish = new EventsToPublish(
$eventsToPublish->streamName,
Events::fromArray(
$eventsToPublish->events->map(function (EventInterface|DecoratedEvent $event) use (
$initiatingUserId,
$initiatingTimestamp
) {
$metadata = $event instanceof DecoratedEvent ? $event->eventMetadata?->value ?? [] : [];
$metadata['initiatingUserId'] ??= $initiatingUserId;
$metadata['initiatingTimestamp'] ??= $initiatingTimestamp;
return DecoratedEvent::create($event, metadata: EventMetadata::fromArray($metadata));
})
),
$eventsToPublish->expectedVersion,
);

$this->eventPersister->publishEvents($this, $eventsToPublish);
$eventsToPublishOrGenerator = $this->commandBus->handle($command, $this->commandHandlingDependencies);

if ($eventsToPublishOrGenerator instanceof EventsToPublish) {
$eventsToPublish = $this->enrichEventsToPublishWithMetadata($eventsToPublishOrGenerator);
$this->eventPersister->publishEvents($this, $eventsToPublish);
} else {
foreach ($eventsToPublishOrGenerator as $eventsToPublish) {
assert($eventsToPublish instanceof EventsToPublish); // just for the ide
$eventsToPublish = $this->enrichEventsToPublishWithMetadata($eventsToPublish);
try {
$this->eventPersister->publishEvents($this, $eventsToPublish);
} catch (ConcurrencyException $e) {
// we pass the exception into the generator, so it could be try-caught and reacted upon:
//
// try {
// yield EventsToPublish();
// } catch (ConcurrencyException $e) {
// yield $restoreState();
// throw $e;
// }

$errorStrategy = $eventsToPublishOrGenerator->throw($e);

if ($errorStrategy instanceof EventsToPublish) {
$eventsToPublish = $this->enrichEventsToPublishWithMetadata($errorStrategy);
$this->eventPersister->publishEvents($this, $this->enrichEventsToPublishWithMetadata($eventsToPublish));
}
}
}
}
bwaidelich marked this conversation as resolved.
Show resolved Hide resolved
}


Expand Down Expand Up @@ -305,4 +304,37 @@ public function getContentDimensionSource(): ContentDimensionSourceInterface
{
return $this->contentDimensionSource;
}

/**
* Add "initiatingUserId" and "initiatingTimestamp" metadata to all events.
* This is done in order to keep information about the _original_ metadata when an
* event is re-applied during publishing/rebasing
* "initiatingUserId": The identifier of the user that originally triggered this event. This will never
* be overridden if it is set once.
* "initiatingTimestamp": The timestamp of the original event. The "recordedAt" timestamp will always be
* re-created and reflects the time an event was actually persisted in a stream,
* the "initiatingTimestamp" will be kept and is never overridden again.
*/
private function enrichEventsToPublishWithMetadata(EventsToPublish $eventsToPublish): EventsToPublish
{
$initiatingUserId = $this->userIdProvider->getUserId();
$initiatingTimestamp = $this->clock->now()->format(\DateTimeInterface::ATOM);

return new EventsToPublish(
$eventsToPublish->streamName,
Events::fromArray(
$eventsToPublish->events->map(function (EventInterface|DecoratedEvent $event) use (
$initiatingUserId,
$initiatingTimestamp
) {
$metadata = $event instanceof DecoratedEvent ? $event->eventMetadata?->value ?? [] : [];
$metadata['initiatingUserId'] ??= $initiatingUserId;
$metadata['initiatingTimestamp'] ??= $initiatingTimestamp;

return DecoratedEvent::create($event, metadata: EventMetadata::fromArray($metadata));
})
),
$eventsToPublish->expectedVersion,
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ public function __construct(
}

/**
* @param EventsToPublish $eventsToPublish
* @throws ConcurrencyException in case the expectedVersion does not match
*/
public function publishEvents(ContentRepository $contentRepository, EventsToPublish $eventsToPublish): void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId;

/**
* @internal implementation detail. You must not use this command directly.
* Direct use may lead to hard to revert senseless state in your content repository.
* Please use the higher level workspace commands instead.
* @internal only exposed for testing purposes. You must not use this command directly.
* Direct use will leave your content stream in a closed state.
bwaidelich marked this conversation as resolved.
Show resolved Hide resolved
*/
final readonly class CloseContentStream implements CommandInterface
{
Expand Down
Loading
Loading