Skip to content

Commit

Permalink
Merge pull request #562 from patchlevel/add-has-header
Browse files Browse the repository at this point in the history
add has header and add only header if used
  • Loading branch information
DavidBadura authored Apr 3, 2024
2 parents df276b2 + e3c4560 commit e21a37f
Show file tree
Hide file tree
Showing 26 changed files with 142 additions and 306 deletions.
8 changes: 4 additions & 4 deletions docs/pages/pipeline.md
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,9 @@ $middleware = new FilterEventTranslator(static function (AggregateChanged $event
With this middleware you can exclude archived events.

```php
use Patchlevel\EventSourcing\Message\Translator\ExcludeArchivedEventTranslator;
use Patchlevel\EventSourcing\Message\Translator\ExcludeEventWithHeaderTranslator;

$middleware = new ExcludeArchivedEventTranslator();
$middleware = new ExcludeEventWithHeaderTranslator();
```
!!! warning

Expand All @@ -258,9 +258,9 @@ $middleware = new ExcludeArchivedEventTranslator();
With this middleware you can only allow archived events.

```php
use Patchlevel\EventSourcing\Message\Translator\OnlyArchivedEventTranslator;
use Patchlevel\EventSourcing\Message\Translator\IncludeEventWithHeaderTranslator;

$middleware = new OnlyArchivedEventTranslator();
$middleware = new IncludeEventWithHeaderTranslator();
```
!!! warning

Expand Down
6 changes: 6 additions & 0 deletions src/Message/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ public function header(string $name): object
return $header;
}

/** @param class-string $name */
public function hasHeader(string $name): bool
{
return array_key_exists($name, $this->headers);
}

public function withHeader(object $header): self
{
$message = clone $this;
Expand Down
27 changes: 0 additions & 27 deletions src/Message/Translator/ExcludeArchivedEventTranslator.php

This file was deleted.

26 changes: 26 additions & 0 deletions src/Message/Translator/ExcludeEventWithHeaderTranslator.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Message\Translator;

use Patchlevel\EventSourcing\Message\Message;

final class ExcludeEventWithHeaderTranslator implements Translator
{
/** @param class-string $header */
public function __construct(
private readonly string $header,
) {
}

/** @return list<Message> */
public function __invoke(Message $message): array
{
if ($message->hasHeader($this->header)) {
return [];
}

return [$message];
}
}
26 changes: 26 additions & 0 deletions src/Message/Translator/IncludeEventWithHeaderTranslator.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Message\Translator;

use Patchlevel\EventSourcing\Message\Message;

final class IncludeEventWithHeaderTranslator implements Translator
{
/** @param class-string $header */
public function __construct(
private readonly string $header,
) {
}

/** @return list<Message> */
public function __invoke(Message $message): array
{
if ($message->hasHeader($this->header)) {
return [$message];
}

return [];
}
}
27 changes: 0 additions & 27 deletions src/Message/Translator/OnlyArchivedEventTranslator.php

This file was deleted.

4 changes: 2 additions & 2 deletions src/Metadata/Message/MessageHeaderRegistry.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use Patchlevel\EventSourcing\Aggregate\AggregateHeader;
use Patchlevel\EventSourcing\Debug\Trace\TraceHeader;
use Patchlevel\EventSourcing\Store\ArchivedHeader;
use Patchlevel\EventSourcing\Store\NewStreamStartHeader;
use Patchlevel\EventSourcing\Store\StreamStartHeader;

use function array_flip;
use function array_key_exists;
Expand Down Expand Up @@ -76,7 +76,7 @@ public static function createWithInternalHeaders(array $headerNameToClassMap = [
'aggregate' => AggregateHeader::class,
'trace' => TraceHeader::class,
'archived' => ArchivedHeader::class,
'newStreamStart' => NewStreamStartHeader::class,
'newStreamStart' => StreamStartHeader::class,
];

return new self($headerNameToClassMap + $internalHeaders);
Expand Down
11 changes: 2 additions & 9 deletions src/Repository/DefaultRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
use Patchlevel\EventSourcing\Aggregate\AggregateRootId;
use Patchlevel\EventSourcing\Clock\SystemClock;
use Patchlevel\EventSourcing\EventBus\EventBus;
use Patchlevel\EventSourcing\Message\HeaderNotFound;
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootMetadata;
use Patchlevel\EventSourcing\Repository\MessageDecorator\MessageDecorator;
Expand All @@ -18,9 +17,9 @@
use Patchlevel\EventSourcing\Snapshot\SnapshotVersionInvalid;
use Patchlevel\EventSourcing\Store\ArchivableStore;
use Patchlevel\EventSourcing\Store\CriteriaBuilder;
use Patchlevel\EventSourcing\Store\NewStreamStartHeader;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Store\Stream;
use Patchlevel\EventSourcing\Store\StreamStartHeader;
use Patchlevel\EventSourcing\Store\UniqueConstraintViolation;
use Psr\Clock\ClockInterface;
use Psr\Log\LoggerInterface;
Expand Down Expand Up @@ -377,13 +376,7 @@ private function archive(Message ...$messages): void
$lastMessageWithNewStreamStart = null;

foreach ($messages as $message) {
try {
$newStreamStartHeader = $message->header(NewStreamStartHeader::class);
} catch (HeaderNotFound) {
continue;
}

if (!$newStreamStartHeader->newStreamStart) {
if (!$message->hasHeader(StreamStartHeader::class)) {
continue;
}

Expand Down
8 changes: 6 additions & 2 deletions src/Repository/MessageDecorator/SplitStreamDecorator.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Metadata\Event\EventMetadataFactory;
use Patchlevel\EventSourcing\Store\NewStreamStartHeader;
use Patchlevel\EventSourcing\Store\StreamStartHeader;

final class SplitStreamDecorator implements MessageDecorator
{
Expand All @@ -20,6 +20,10 @@ public function __invoke(Message $message): Message
$event = $message->event();
$metadata = $this->eventMetadataFactory->metadata($event::class);

return $message->withHeader(new NewStreamStartHeader($metadata->splitStream));
if (!$metadata->splitStream) {
return $message;
}

return $message->withHeader(new StreamStartHeader());
}
}
4 changes: 0 additions & 4 deletions src/Store/ArchivedHeader.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,4 @@
/** @psalm-immutable */
final class ArchivedHeader
{
public function __construct(
public readonly bool $archived,
) {
}
}
34 changes: 11 additions & 23 deletions src/Store/DoctrineDbalStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -174,35 +174,23 @@ function (Connection $connection) use ($messages): void {

try {
$aggregateHeader = $message->header(AggregateHeader::class);

$parameters[] = $aggregateHeader->aggregateName;
$parameters[] = $aggregateHeader->aggregateId;
$parameters[] = $aggregateHeader->playhead;
$parameters[] = $data->name;
$parameters[] = $data->payload;

$parameters[] = $aggregateHeader->recordedOn;
$types[$offset + 5] = $dateTimeType;
} catch (HeaderNotFound $e) {
throw new MissingDataForStorage($e->name, $e);
}

try {
$newStreamStart = $message->header(NewStreamStartHeader::class)->newStreamStart;
} catch (HeaderNotFound) {
$newStreamStart = false;
}
$parameters[] = $aggregateHeader->aggregateName;
$parameters[] = $aggregateHeader->aggregateId;
$parameters[] = $aggregateHeader->playhead;
$parameters[] = $data->name;
$parameters[] = $data->payload;

$parameters[] = $newStreamStart;
$types[$offset + 6] = $booleanType;
$parameters[] = $aggregateHeader->recordedOn;
$types[$offset + 5] = $dateTimeType;

try {
$archived = $message->header(ArchivedHeader::class)->archived;
} catch (HeaderNotFound) {
$archived = false;
}
$parameters[] = $message->hasHeader(StreamStartHeader::class);
$types[$offset + 6] = $booleanType;

$parameters[] = $archived;
$parameters[] = $message->hasHeader(ArchivedHeader::class);
$types[$offset + 7] = $booleanType;

$parameters[] = $this->headersSerializer->serialize($this->getCustomHeaders($message));
Expand Down Expand Up @@ -305,7 +293,7 @@ private function getCustomHeaders(Message $message): array
{
$filteredHeaders = [
AggregateHeader::class,
NewStreamStartHeader::class,
StreamStartHeader::class,
ArchivedHeader::class,
];

Expand Down
21 changes: 14 additions & 7 deletions src/Store/DoctrineDbalStoreStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -120,18 +120,25 @@ private function buildGenerator(
$this->index = $data['id'];
$event = $eventSerializer->deserialize(new SerializedEvent($data['event'], $data['payload']));

$customHeaders = $headersSerializer->deserialize(DoctrineHelper::normalizeCustomHeaders($data['custom_headers'], $platform));

yield Message::create($event)
$message = Message::create($event)
->withHeader(new AggregateHeader(
$data['aggregate'],
$data['aggregate_id'],
DoctrineHelper::normalizePlayhead($data['playhead'], $platform),
DoctrineHelper::normalizeRecordedOn($data['recorded_on'], $platform),
))
->withHeader(new ArchivedHeader(DoctrineHelper::normalizeArchived($data['archived'], $platform)))
->withHeader(new NewStreamStartHeader(DoctrineHelper::normalizeNewStreamStart($data['new_stream_start'], $platform)))
->withHeaders($customHeaders);
));

if ($data['archived']) {
$message = $message->withHeader(new ArchivedHeader());
}

if ($data['new_stream_start']) {
$message = $message->withHeader(new StreamStartHeader());
}

$customHeaders = $headersSerializer->deserialize(DoctrineHelper::normalizeCustomHeaders($data['custom_headers'], $platform));

yield $message->withHeaders($customHeaders);
}
}

Expand Down
23 changes: 0 additions & 23 deletions src/Store/DoctrineHelper.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
use Doctrine\DBAL\Types\Types;

use function is_array;
use function is_bool;
use function is_int;

final class DoctrineHelper
Expand Down Expand Up @@ -49,26 +48,4 @@ public static function normalizeCustomHeaders(string $customHeaders, AbstractPla

return $normalizedCustomHeaders;
}

public static function normalizeArchived(mixed $value, AbstractPlatform $platform): bool
{
$normalizedValue = Type::getType(Types::BOOLEAN)->convertToPHPValue($value, $platform);

if (!is_bool($normalizedValue)) {
throw new InvalidType('archived', 'boolean');
}

return $normalizedValue;
}

public static function normalizeNewStreamStart(mixed $value, AbstractPlatform $platform): bool
{
$normalizedValue = Type::getType(Types::BOOLEAN)->convertToPHPValue($value, $platform);

if (!is_bool($normalizedValue)) {
throw new InvalidType('new_stream_start', 'boolean');
}

return $normalizedValue;
}
}
14 changes: 0 additions & 14 deletions src/Store/NewStreamStartHeader.php

This file was deleted.

10 changes: 10 additions & 0 deletions src/Store/StreamStartHeader.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Store;

/** @psalm-immutable */
final class StreamStartHeader
{
}
Loading

0 comments on commit e21a37f

Please sign in to comment.