Skip to content

Commit

Permalink
add header for messages
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed May 6, 2022
1 parent 4551fdf commit b10a20b
Show file tree
Hide file tree
Showing 42 changed files with 638 additions and 524 deletions.
11 changes: 7 additions & 4 deletions baseline.xml
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<files psalm-version="4.22.0@fc2c6ab4d5fa5d644d8617089f012f3bb84b8703">
<files psalm-version="4.23.0@f1fe6ff483bf325c803df9f510d09a03fd796f88">
<file src="src/EventBus/Message.php">
<PropertyTypeCoercion occurrences="1">
<code>$new-&gt;headers</code>
</PropertyTypeCoercion>
</file>
<file src="src/Repository/DefaultRepository.php">
<MixedArgument occurrences="1">
<code>++$playhead</code>
</MixedArgument>
<ArgumentTypeCoercion occurrences="1"/>
<MixedAssignment occurrences="1">
<code>$playhead</code>
</MixedAssignment>
Expand Down
318 changes: 201 additions & 117 deletions composer.lock

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions phpstan-baseline.neon
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,21 @@ parameters:
count: 1
path: src/Console/DoctrineHelper.php

-
message: "#^Parameter \\#1 \\$params of static method Doctrine\\\\DBAL\\\\DriverManager\\:\\:getConnection\\(\\) expects array\\{charset\\?\\: string, dbname\\?\\: string, default_dbname\\?\\: string, driver\\?\\: 'ibm_db2'\\|'mysqli'\\|'oci8'\\|'pdo_mysql'\\|'pdo_oci'\\|'pdo_pgsql'\\|'pdo_sqlite'\\|'pdo_sqlsrv'\\|'sqlsrv', driverClass\\?\\: class\\-string\\<Doctrine\\\\DBAL\\\\Driver\\>, driverOptions\\?\\: array, host\\?\\: string, keepSlave\\?\\: bool, \\.\\.\\.\\}, array\\<'charset'\\|'default_dbname'\\|'defaultTableOptions'\\|'driver'\\|'driverClass'\\|'driverOptions'\\|'host'\\|'keepReplica'\\|'keepSlave'\\|'master'\\|'memory'\\|'password'\\|'pdo'\\|'platform'\\|'port'\\|'primary'\\|'replica'\\|'serverVersion'\\|'sharding'\\|'slaves'\\|'unix_socket'\\|'user'\\|'wrapperClass', array\\|bool\\|Doctrine\\\\DBAL\\\\Platforms\\\\AbstractPlatform\\|int\\|PDO\\|string\\> given\\.$#"
count: 1
path: src/Console/DoctrineHelper.php

-
message: "#^Offset T of string does not exist on array\\{aggregateClass\\?\\: class\\-string\\<Patchlevel\\\\EventSourcing\\\\Aggregate\\\\AggregateRoot\\>, aggregateId\\?\\: string, playhead\\?\\: int, recordedOn\\?\\: DateTimeImmutable\\}\\.$#"
count: 1
path: src/EventBus/Message.php

-
message: "#^Property Patchlevel\\\\EventSourcing\\\\EventBus\\\\Message\\:\\:\\$headers \\(array\\{aggregateClass\\?\\: class\\-string\\<Patchlevel\\\\EventSourcing\\\\Aggregate\\\\AggregateRoot\\>, aggregateId\\?\\: string, playhead\\?\\: int, recordedOn\\?\\: DateTimeImmutable\\}\\) does not accept non\\-empty\\-array\\<T of string, mixed\\>\\.$#"
count: 1
path: src/EventBus/Message.php

-
message: "#^While loop condition is always true\\.$#"
count: 1
Expand Down
17 changes: 17 additions & 0 deletions src/EventBus/HeaderNotFound.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\EventBus;

use RuntimeException;

use function sprintf;

class HeaderNotFound extends RuntimeException
{
public function __construct(string $name)
{
parent::__construct(sprintf('message header "%s" is not defined', $name));
}
}
109 changes: 82 additions & 27 deletions src/EventBus/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,66 +6,121 @@

use DateTimeImmutable;
use Patchlevel\EventSourcing\Aggregate\AggregateRoot;
use Patchlevel\EventSourcing\Clock;

use function array_key_exists;

/**
* @psalm-immutable
*/
final class Message
{
/** @var class-string<AggregateRoot> */
private string $aggregateClass;

private string $aggregateId;

private int $playhead;
public const HEADER_AGGREGATE_CLASS = 'aggregateClass';
public const HEADER_AGGREGATE_ID = 'aggregateId';
public const HEADER_PLAYHEAD = 'playhead';
public const HEADER_RECORDED_ON = 'recordedOn';

private object $event;

private DateTimeImmutable $recordedOn;
/** @var array{aggregateClass?: class-string<AggregateRoot>, aggregateId?:string, playhead?:int, recordedOn?: DateTimeImmutable} */
private array $headers;

/**
* @param class-string<AggregateRoot> $aggregateClass
* @param array{aggregateClass?: class-string<AggregateRoot>, aggregateId?:string, playhead?:int, recordedOn?: DateTimeImmutable} $headers
*/
public function __construct(
string $aggregateClass,
string $aggregateId,
int $playhead,
object $event,
?DateTimeImmutable $recordedOn = null
) {
$this->aggregateClass = $aggregateClass;
$this->aggregateId = $aggregateId;
$this->playhead = $playhead;
public function __construct(object $event, array $headers = [])
{
$this->event = $event;
$this->recordedOn = $recordedOn ?? Clock::createDateTimeImmutable();
$this->headers = $headers;
}

public function event(): object
{
return $this->event;
}

/**
* @return array{aggregateClass?: class-string<AggregateRoot>, aggregateId?:string, playhead?:int, recordedOn?: DateTimeImmutable}
*/
public function headers(): array
{
return $this->headers;
}

/**
* @return class-string<AggregateRoot>
*/
public function aggregateClass(): string
{
return $this->aggregateClass;
return $this->header(self::HEADER_AGGREGATE_CLASS);
}

public function aggregateId(): string
{
return $this->aggregateId;
return $this->header(self::HEADER_AGGREGATE_ID);
}

public function playhead(): int
{
return $this->playhead;
return $this->header(self::HEADER_PLAYHEAD);
}

public function event(): object
public function recordedOn(): DateTimeImmutable
{
return $this->event;
return $this->header(self::HEADER_RECORDED_ON);
}

public function recordedOn(): DateTimeImmutable
/**
* @param T $name
*
* @template T as string
* @psalm-param (
* T is self::HEADER_AGGREGATE_CLASS
* ? class-string<AggregateRoot>
* : (T is self::HEADER_AGGREGATE_ID
* ? string
* : (T is self::HEADER_PLAYHEAD
* ? int
* : (T is self::HEADER_RECORDED_ON
* ? DateTimeImmutable
* : mixed
* )
* )
* )
* ) $value
*/
public function withHeader(string $name, mixed $value): self
{
return $this->recordedOn;
$new = clone $this;
$new->headers[$name] = $value;

return $new;
}

/**
* @param T $name
*
* @template T as string
* @psalm-return (
* T is self::HEADER_AGGREGATE_CLASS
* ? class-string<AggregateRoot>
* : (T is self::HEADER_AGGREGATE_ID
* ? string
* : (T is self::HEADER_PLAYHEAD
* ? int
* : (T is self::HEADER_RECORDED_ON
* ? DateTimeImmutable
* : mixed
* )
* )
* )
* )
*/
public function header(string $name): mixed
{
if (!array_key_exists($name, $this->headers)) {
throw new HeaderNotFound($name);
}

return $this->headers[$name];
}
}
8 changes: 1 addition & 7 deletions src/Pipeline/Middleware/RecalculatePlayheadMiddleware.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,7 @@ public function __invoke(Message $message): array
}

return [
new Message(
$message->aggregateClass(),
$message->aggregateId(),
$playhead,
$message->event(),
$message->recordedOn()
),
$message->withHeader(Message::HEADER_PLAYHEAD, $playhead),
];
}

Expand Down
5 changes: 1 addition & 4 deletions src/Pipeline/Middleware/ReplaceEventMiddleware.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,8 @@ public function __invoke(Message $message): array

return [
new Message(
$message->aggregateClass(),
$message->aggregateId(),
$message->playhead(),
$newEvent,
$message->recordedOn()
$message->headers()
),
];
}
Expand Down
12 changes: 8 additions & 4 deletions src/Repository/DefaultRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Patchlevel\EventSourcing\Repository;

use Patchlevel\EventSourcing\Aggregate\AggregateRoot;
use Patchlevel\EventSourcing\Clock;
use Patchlevel\EventSourcing\EventBus\EventBus;
use Patchlevel\EventSourcing\EventBus\Message;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootMetadata;
Expand Down Expand Up @@ -121,10 +122,13 @@ public function save(AggregateRoot $aggregate): void
$messages = array_map(
static function (object $event) use ($aggregate, &$playhead) {
return new Message(
$aggregate::class,
$aggregate->aggregateRootId(),
++$playhead,
$event
$event,
[
Message::HEADER_AGGREGATE_CLASS => $aggregate::class,
Message::HEADER_AGGREGATE_ID => $aggregate->aggregateRootId(),
Message::HEADER_PLAYHEAD => ++$playhead,
Message::HEADER_RECORDED_ON => Clock::createDateTimeImmutable(),
]
);
},
$events
Expand Down
10 changes: 6 additions & 4 deletions src/Store/DoctrineStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,13 @@ public function retrieveOutboxMessages(?int $limit = null): array
return array_map(
function (array $data) use ($platform) {
return new Message(
$this->aggregateRootRegistry->aggregateClass($data['aggregate']),
$data['aggregate_id'],
self::normalizePlayhead($data['playhead'], $platform),
$this->serializer->deserialize(new SerializedEvent($data['event'], $data['payload'])),
self::normalizeRecordedOn($data['recorded_on'], $platform)
[
Message::HEADER_AGGREGATE_CLASS => $this->aggregateRootRegistry->aggregateClass($data['aggregate']),
Message::HEADER_AGGREGATE_ID => $data['aggregate_id'],
Message::HEADER_PLAYHEAD => self::normalizePlayhead($data['playhead'], $platform),
Message::HEADER_RECORDED_ON => self::normalizeRecordedOn($data['recorded_on'], $platform),
]
);
},
$result
Expand Down
20 changes: 12 additions & 8 deletions src/Store/MultiTableStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,13 @@ public function load(string $aggregate, string $id, int $fromPlayhead = 0): arra
return array_map(
function (array $data) use ($platform, $aggregate): Message {
return new Message(
$aggregate,
$data['aggregate_id'],
self::normalizePlayhead($data['playhead'], $platform),
$this->serializer->deserialize(new SerializedEvent($data['event'], $data['payload'])),
self::normalizeRecordedOn($data['recorded_on'], $platform)
[
Message::HEADER_AGGREGATE_CLASS => $aggregate,
Message::HEADER_AGGREGATE_ID => $data['aggregate_id'],
Message::HEADER_PLAYHEAD => self::normalizePlayhead($data['playhead'], $platform),
Message::HEADER_RECORDED_ON => self::normalizeRecordedOn($data['recorded_on'], $platform),
]
);
},
$result
Expand Down Expand Up @@ -193,11 +195,13 @@ public function stream(int $fromIndex = 0): Generator
}

yield new Message(
$this->aggregateRootRegistry->aggregateClass($name),
$eventData['aggregate_id'],
self::normalizePlayhead($eventData['playhead'], $platform),
$this->serializer->deserialize(new SerializedEvent($eventData['event'], $eventData['payload'])),
self::normalizeRecordedOn($eventData['recorded_on'], $platform)
[
Message::HEADER_AGGREGATE_CLASS => $this->aggregateRootRegistry->aggregateClass($name),
Message::HEADER_AGGREGATE_ID => $eventData['aggregate_id'],
Message::HEADER_PLAYHEAD => self::normalizePlayhead($eventData['playhead'], $platform),
Message::HEADER_RECORDED_ON => self::normalizeRecordedOn($eventData['recorded_on'], $platform),
]
);
}
}
Expand Down
20 changes: 12 additions & 8 deletions src/Store/SingleTableStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,13 @@ public function load(string $aggregate, string $id, int $fromPlayhead = 0): arra
return array_map(
function (array $data) use ($platform, $aggregate) {
return new Message(
$aggregate,
$data['aggregate_id'],
self::normalizePlayhead($data['playhead'], $platform),
$this->serializer->deserialize(new SerializedEvent($data['event'], $data['payload'])),
self::normalizeRecordedOn($data['recorded_on'], $platform)
[
Message::HEADER_AGGREGATE_CLASS => $aggregate,
Message::HEADER_AGGREGATE_ID => $data['aggregate_id'],
Message::HEADER_PLAYHEAD => self::normalizePlayhead($data['playhead'], $platform),
Message::HEADER_RECORDED_ON => self::normalizeRecordedOn($data['recorded_on'], $platform),
]
);
},
$result
Expand Down Expand Up @@ -159,11 +161,13 @@ public function stream(int $fromIndex = 0): Generator

foreach ($result as $data) {
yield new Message(
$this->aggregateRootRegistry->aggregateClass($data['aggregate']),
$data['aggregate_id'],
self::normalizePlayhead($data['playhead'], $platform),
$this->serializer->deserialize(new SerializedEvent($data['event'], $data['payload'])),
self::normalizeRecordedOn($data['recorded_on'], $platform)
[
Message::HEADER_AGGREGATE_CLASS => $this->aggregateRootRegistry->aggregateClass($data['aggregate']),
Message::HEADER_AGGREGATE_ID => $data['aggregate_id'],
Message::HEADER_PLAYHEAD => self::normalizePlayhead($data['playhead'], $platform),
Message::HEADER_RECORDED_ON => self::normalizeRecordedOn($data['recorded_on'], $platform),
]
);
}
}
Expand Down
Loading

0 comments on commit b10a20b

Please sign in to comment.