Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add has header and add only header if used #562

Merged
merged 1 commit into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions docs/pages/pipeline.md
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,9 @@ $middleware = new FilterEventTranslator(static function (AggregateChanged $event
With this middleware you can exclude archived events.

```php
use Patchlevel\EventSourcing\Message\Translator\ExcludeArchivedEventTranslator;
use Patchlevel\EventSourcing\Message\Translator\ExcludeEventWithHeaderTranslator;

$middleware = new ExcludeArchivedEventTranslator();
$middleware = new ExcludeEventWithHeaderTranslator();
```
!!! warning

Expand All @@ -258,9 +258,9 @@ $middleware = new ExcludeArchivedEventTranslator();
With this middleware you can only allow archived events.

```php
use Patchlevel\EventSourcing\Message\Translator\OnlyArchivedEventTranslator;
use Patchlevel\EventSourcing\Message\Translator\IncludeEventWithHeaderTranslator;

$middleware = new OnlyArchivedEventTranslator();
$middleware = new IncludeEventWithHeaderTranslator();
```
!!! warning

Expand Down
6 changes: 6 additions & 0 deletions src/Message/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ public function header(string $name): object
return $header;
}

/** @param class-string $name */
public function hasHeader(string $name): bool
{
return array_key_exists($name, $this->headers);
}

public function withHeader(object $header): self
{
$message = clone $this;
Expand Down
27 changes: 0 additions & 27 deletions src/Message/Translator/ExcludeArchivedEventTranslator.php

This file was deleted.

26 changes: 26 additions & 0 deletions src/Message/Translator/ExcludeEventWithHeaderTranslator.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Message\Translator;

use Patchlevel\EventSourcing\Message\Message;

final class ExcludeEventWithHeaderTranslator implements Translator
{
/** @param class-string $header */
public function __construct(
private readonly string $header,
) {
}

/** @return list<Message> */
public function __invoke(Message $message): array
{
if ($message->hasHeader($this->header)) {
return [];
}

return [$message];
}
}
26 changes: 26 additions & 0 deletions src/Message/Translator/IncludeEventWithHeaderTranslator.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Message\Translator;

use Patchlevel\EventSourcing\Message\Message;

final class IncludeEventWithHeaderTranslator implements Translator
{
/** @param class-string $header */
public function __construct(
private readonly string $header,
) {
}

/** @return list<Message> */
public function __invoke(Message $message): array
{
if ($message->hasHeader($this->header)) {
return [$message];
}

return [];
}
}
27 changes: 0 additions & 27 deletions src/Message/Translator/OnlyArchivedEventTranslator.php

This file was deleted.

4 changes: 2 additions & 2 deletions src/Metadata/Message/MessageHeaderRegistry.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use Patchlevel\EventSourcing\Aggregate\AggregateHeader;
use Patchlevel\EventSourcing\Debug\Trace\TraceHeader;
use Patchlevel\EventSourcing\Store\ArchivedHeader;
use Patchlevel\EventSourcing\Store\NewStreamStartHeader;
use Patchlevel\EventSourcing\Store\StreamStartHeader;

use function array_flip;
use function array_key_exists;
Expand Down Expand Up @@ -76,7 +76,7 @@ public static function createWithInternalHeaders(array $headerNameToClassMap = [
'aggregate' => AggregateHeader::class,
'trace' => TraceHeader::class,
'archived' => ArchivedHeader::class,
'newStreamStart' => NewStreamStartHeader::class,
'newStreamStart' => StreamStartHeader::class,
];

return new self($headerNameToClassMap + $internalHeaders);
Expand Down
11 changes: 2 additions & 9 deletions src/Repository/DefaultRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
use Patchlevel\EventSourcing\Aggregate\AggregateRootId;
use Patchlevel\EventSourcing\Clock\SystemClock;
use Patchlevel\EventSourcing\EventBus\EventBus;
use Patchlevel\EventSourcing\Message\HeaderNotFound;
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootMetadata;
use Patchlevel\EventSourcing\Repository\MessageDecorator\MessageDecorator;
Expand All @@ -18,9 +17,9 @@
use Patchlevel\EventSourcing\Snapshot\SnapshotVersionInvalid;
use Patchlevel\EventSourcing\Store\ArchivableStore;
use Patchlevel\EventSourcing\Store\CriteriaBuilder;
use Patchlevel\EventSourcing\Store\NewStreamStartHeader;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Store\Stream;
use Patchlevel\EventSourcing\Store\StreamStartHeader;
use Patchlevel\EventSourcing\Store\UniqueConstraintViolation;
use Psr\Clock\ClockInterface;
use Psr\Log\LoggerInterface;
Expand Down Expand Up @@ -377,13 +376,7 @@ private function archive(Message ...$messages): void
$lastMessageWithNewStreamStart = null;

foreach ($messages as $message) {
try {
$newStreamStartHeader = $message->header(NewStreamStartHeader::class);
} catch (HeaderNotFound) {
continue;
}

if (!$newStreamStartHeader->newStreamStart) {
if (!$message->hasHeader(StreamStartHeader::class)) {
continue;
}

Expand Down
8 changes: 6 additions & 2 deletions src/Repository/MessageDecorator/SplitStreamDecorator.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Metadata\Event\EventMetadataFactory;
use Patchlevel\EventSourcing\Store\NewStreamStartHeader;
use Patchlevel\EventSourcing\Store\StreamStartHeader;

final class SplitStreamDecorator implements MessageDecorator
{
Expand All @@ -20,6 +20,10 @@ public function __invoke(Message $message): Message
$event = $message->event();
$metadata = $this->eventMetadataFactory->metadata($event::class);

return $message->withHeader(new NewStreamStartHeader($metadata->splitStream));
if (!$metadata->splitStream) {
return $message;
}

return $message->withHeader(new StreamStartHeader());
}
}
4 changes: 0 additions & 4 deletions src/Store/ArchivedHeader.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,4 @@
/** @psalm-immutable */
final class ArchivedHeader
{
public function __construct(
public readonly bool $archived,
) {
}
}
34 changes: 11 additions & 23 deletions src/Store/DoctrineDbalStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -174,35 +174,23 @@ function (Connection $connection) use ($messages): void {

try {
$aggregateHeader = $message->header(AggregateHeader::class);

$parameters[] = $aggregateHeader->aggregateName;
$parameters[] = $aggregateHeader->aggregateId;
$parameters[] = $aggregateHeader->playhead;
$parameters[] = $data->name;
$parameters[] = $data->payload;

$parameters[] = $aggregateHeader->recordedOn;
$types[$offset + 5] = $dateTimeType;
} catch (HeaderNotFound $e) {
throw new MissingDataForStorage($e->name, $e);
}

try {
$newStreamStart = $message->header(NewStreamStartHeader::class)->newStreamStart;
} catch (HeaderNotFound) {
$newStreamStart = false;
}
$parameters[] = $aggregateHeader->aggregateName;
$parameters[] = $aggregateHeader->aggregateId;
$parameters[] = $aggregateHeader->playhead;
$parameters[] = $data->name;
$parameters[] = $data->payload;

$parameters[] = $newStreamStart;
$types[$offset + 6] = $booleanType;
$parameters[] = $aggregateHeader->recordedOn;
$types[$offset + 5] = $dateTimeType;

try {
$archived = $message->header(ArchivedHeader::class)->archived;
} catch (HeaderNotFound) {
$archived = false;
}
$parameters[] = $message->hasHeader(StreamStartHeader::class);
$types[$offset + 6] = $booleanType;

$parameters[] = $archived;
$parameters[] = $message->hasHeader(ArchivedHeader::class);
$types[$offset + 7] = $booleanType;

$parameters[] = $this->headersSerializer->serialize($this->getCustomHeaders($message));
Expand Down Expand Up @@ -305,7 +293,7 @@ private function getCustomHeaders(Message $message): array
{
$filteredHeaders = [
AggregateHeader::class,
NewStreamStartHeader::class,
StreamStartHeader::class,
ArchivedHeader::class,
];

Expand Down
21 changes: 14 additions & 7 deletions src/Store/DoctrineDbalStoreStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -120,18 +120,25 @@ private function buildGenerator(
$this->index = $data['id'];
$event = $eventSerializer->deserialize(new SerializedEvent($data['event'], $data['payload']));

$customHeaders = $headersSerializer->deserialize(DoctrineHelper::normalizeCustomHeaders($data['custom_headers'], $platform));

yield Message::create($event)
$message = Message::create($event)
->withHeader(new AggregateHeader(
$data['aggregate'],
$data['aggregate_id'],
DoctrineHelper::normalizePlayhead($data['playhead'], $platform),
DoctrineHelper::normalizeRecordedOn($data['recorded_on'], $platform),
))
->withHeader(new ArchivedHeader(DoctrineHelper::normalizeArchived($data['archived'], $platform)))
->withHeader(new NewStreamStartHeader(DoctrineHelper::normalizeNewStreamStart($data['new_stream_start'], $platform)))
->withHeaders($customHeaders);
));

if ($data['archived']) {
$message = $message->withHeader(new ArchivedHeader());
}

if ($data['new_stream_start']) {
$message = $message->withHeader(new StreamStartHeader());
}

$customHeaders = $headersSerializer->deserialize(DoctrineHelper::normalizeCustomHeaders($data['custom_headers'], $platform));

yield $message->withHeaders($customHeaders);
}
}

Expand Down
23 changes: 0 additions & 23 deletions src/Store/DoctrineHelper.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
use Doctrine\DBAL\Types\Types;

use function is_array;
use function is_bool;
use function is_int;

final class DoctrineHelper
Expand Down Expand Up @@ -49,26 +48,4 @@ public static function normalizeCustomHeaders(string $customHeaders, AbstractPla

return $normalizedCustomHeaders;
}

public static function normalizeArchived(mixed $value, AbstractPlatform $platform): bool
{
$normalizedValue = Type::getType(Types::BOOLEAN)->convertToPHPValue($value, $platform);

if (!is_bool($normalizedValue)) {
throw new InvalidType('archived', 'boolean');
}

return $normalizedValue;
}

public static function normalizeNewStreamStart(mixed $value, AbstractPlatform $platform): bool
{
$normalizedValue = Type::getType(Types::BOOLEAN)->convertToPHPValue($value, $platform);

if (!is_bool($normalizedValue)) {
throw new InvalidType('new_stream_start', 'boolean');
}

return $normalizedValue;
}
}
14 changes: 0 additions & 14 deletions src/Store/NewStreamStartHeader.php

This file was deleted.

10 changes: 10 additions & 0 deletions src/Store/StreamStartHeader.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Store;

/** @psalm-immutable */
final class StreamStartHeader
{
}
Loading
Loading