Skip to content

Commit

Permalink
rewrite outbox store
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Feb 6, 2024
1 parent 6c6317f commit 254d3f1
Show file tree
Hide file tree
Showing 10 changed files with 149 additions and 294 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

declare(strict_types=1);

namespace Patchlevel\EventSourcing\WatchServer;
namespace Patchlevel\EventSourcing\EventBus\Serializer;

use Patchlevel\EventSourcing\EventBus\Message;

Expand Down
28 changes: 28 additions & 0 deletions src/EventBus/Serializer/PhpNativeMessageSerializer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\EventBus\Serializer;

use Patchlevel\EventSourcing\EventBus\Message;

use function base64_decode;
use function base64_encode;
use function serialize;
use function unserialize;

final class PhpNativeMessageSerializer implements MessageSerializer
{
public function serialize(Message $message): string
{
return base64_encode(serialize($message));
}

public function deserialize(string $content): Message

Check failure on line 21 in src/EventBus/Serializer/PhpNativeMessageSerializer.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

MixedInferredReturnType

src/EventBus/Serializer/PhpNativeMessageSerializer.php:21:51: MixedInferredReturnType: Could not verify return type 'Patchlevel\EventSourcing\EventBus\Message' for Patchlevel\EventSourcing\EventBus\Serializer\PhpNativeMessageSerializer::deserialize (see https://psalm.dev/047)
{
return unserialize(

Check failure on line 23 in src/EventBus/Serializer/PhpNativeMessageSerializer.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

MixedReturnStatement

src/EventBus/Serializer/PhpNativeMessageSerializer.php:23:16: MixedReturnStatement: Could not infer a return type (see https://psalm.dev/138)

Check failure on line 23 in src/EventBus/Serializer/PhpNativeMessageSerializer.php

View workflow job for this annotation

GitHub Actions / Static Analysis by PHPStan (locked, 8.3, ubuntu-latest)

Method Patchlevel\EventSourcing\EventBus\Serializer\PhpNativeMessageSerializer::deserialize() should return Patchlevel\EventSourcing\EventBus\Message but returns mixed.
base64_decode($content),
['allowed_classes' => true],
);
}
}
79 changes: 22 additions & 57 deletions src/Outbox/DoctrineOutboxStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Types\Types;
use Patchlevel\EventSourcing\EventBus\Message;
use Patchlevel\EventSourcing\EventBus\Serializer\MessageSerializer;
use Patchlevel\EventSourcing\Schema\SchemaConfigurator;
use Patchlevel\EventSourcing\Serializer\EventSerializer;
use Patchlevel\EventSourcing\Serializer\SerializedEvent;
use Patchlevel\EventSourcing\Store\DoctrineHelper;
use Patchlevel\EventSourcing\Store\WrongQueryResult;

use function array_map;
Expand All @@ -20,9 +18,11 @@

final class DoctrineOutboxStore implements OutboxStore, SchemaConfigurator
{
public const HEADER_OUTBOX_IDENTIFIER = 'outboxIdentifier';

public function __construct(
private readonly Connection $connection,
private readonly EventSerializer $serializer,
private readonly MessageSerializer $messageSerializer,
private readonly string $outboxTable = 'outbox',
) {
}
Expand All @@ -32,24 +32,10 @@ public function saveOutboxMessage(Message ...$messages): void
$this->connection->transactional(
function (Connection $connection) use ($messages): void {
foreach ($messages as $message) {
$event = $message->event();

$data = $this->serializer->serialize($event);

$connection->insert(
$this->outboxTable,
[
'aggregate' => $message->aggregateName(),
'aggregate_id' => $message->aggregateId(),
'playhead' => $message->playhead(),
'event' => $data->name,
'payload' => $data->payload,
'recorded_on' => $message->recordedOn(),
'custom_headers' => $message->customHeaders(),
],
[
'recorded_on' => Types::DATETIMETZ_IMMUTABLE,
'custom_headers' => Types::JSON,
'message' => $this->messageSerializer->serialize($message),
],
);
}
Expand All @@ -66,20 +52,15 @@ public function retrieveOutboxMessages(int|null $limit = null): array
->setMaxResults($limit)
->getSQL();

/** @var list<array{aggregate: string, aggregate_id: string, playhead: string|int, event: string, payload: string, recorded_on: string, custom_headers: string}> $result */
/** @var list<array{id: int, message: string}> $result */
$result = $this->connection->fetchAllAssociative($sql);
$platform = $this->connection->getDatabasePlatform();

return array_map(
function (array $data) use ($platform) {
$event = $this->serializer->deserialize(new SerializedEvent($data['event'], $data['payload']));

return Message::create($event)
->withAggregateName($data['aggregate'])
->withAggregateId($data['aggregate_id'])
->withPlayhead(DoctrineHelper::normalizePlayhead($data['playhead'], $platform))
->withRecordedOn(DoctrineHelper::normalizeRecordedOn($data['recorded_on'], $platform))
->withCustomHeaders(DoctrineHelper::normalizeCustomHeaders($data['custom_headers'], $platform));
function (array $data) {
$message = $this->messageSerializer->deserialize($data['message']);

return $message
->withCustomHeader(self::HEADER_OUTBOX_IDENTIFIER, $data['id']);
},
$result,
);
Expand All @@ -90,14 +71,9 @@ public function markOutboxMessageConsumed(Message ...$messages): void
$this->connection->transactional(
function (Connection $connection) use ($messages): void {
foreach ($messages as $message) {
$connection->delete(
$this->outboxTable,
[
'aggregate' => $message->aggregateName(),
'aggregate_id' => $message->aggregateId(),
'playhead' => $message->playhead(),
],
);
$id = $message->customHeader(self::HEADER_OUTBOX_IDENTIFIER);

Check failure on line 74 in src/Outbox/DoctrineOutboxStore.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

MixedAssignment

src/Outbox/DoctrineOutboxStore.php:74:21: MixedAssignment: Unable to determine the type that $id is being assigned to (see https://psalm.dev/032)

$connection->delete($this->outboxTable, ['id' => $id]);
}
},
);
Expand All @@ -123,24 +99,13 @@ public function configureSchema(Schema $schema, Connection $connection): void
{
$table = $schema->createTable($this->outboxTable);

$table->addColumn('aggregate', Types::STRING)
->setLength(255)
->setNotnull(true);
$table->addColumn('aggregate_id', Types::STRING)
->setLength(32)
->setNotnull(true);
$table->addColumn('playhead', Types::INTEGER)
->setNotnull(true);
$table->addColumn('event', Types::STRING)
->setLength(255)
->setNotnull(true);
$table->addColumn('payload', Types::JSON)
->setNotnull(true);
$table->addColumn('recorded_on', Types::DATETIMETZ_IMMUTABLE)
->setNotnull(false);
$table->addColumn('custom_headers', Types::JSON)
->setNotnull(true);

$table->setPrimaryKey(['aggregate', 'aggregate_id', 'playhead']);
$table->addColumn('id', Types::INTEGER)
->setNotnull(true)
->setAutoincrement(true);
$table->addColumn('message', Types::STRING)
->setNotnull(true)
->setLength(16_000);

$table->setPrimaryKey(['id']);
}
}
52 changes: 0 additions & 52 deletions src/WatchServer/PhpNativeMessageSerializer.php

This file was deleted.

1 change: 1 addition & 0 deletions src/WatchServer/SocketWatchServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Patchlevel\EventSourcing\WatchServer;

use Closure;
use Patchlevel\EventSourcing\EventBus\Serializer\MessageSerializer;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use RuntimeException;
Expand Down
1 change: 1 addition & 0 deletions src/WatchServer/SocketWatchServerClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Patchlevel\EventSourcing\WatchServer;

use Patchlevel\EventSourcing\EventBus\Message;
use Patchlevel\EventSourcing\EventBus\Serializer\MessageSerializer;

use function fclose;
use function restore_error_handler;
Expand Down
3 changes: 2 additions & 1 deletion tests/Integration/Outbox/OutboxTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\EventBus\ChainEventBus;
use Patchlevel\EventSourcing\EventBus\DefaultConsumer;
use Patchlevel\EventSourcing\EventBus\Serializer\PhpNativeMessageSerializer;
use Patchlevel\EventSourcing\Outbox\DoctrineOutboxStore;
use Patchlevel\EventSourcing\Outbox\EventBusPublisher;
use Patchlevel\EventSourcing\Outbox\OutboxEventBus;
Expand Down Expand Up @@ -58,7 +59,7 @@ public function testSuccessful(): void

$outboxStore = new DoctrineOutboxStore(
$this->connection,
$serializer,
new PhpNativeMessageSerializer(),
'outbox',
);

Expand Down
70 changes: 70 additions & 0 deletions tests/Unit/EventBus/Serializer/PhpNativeMessageSerializerTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Tests\Unit\EventBus\Serializer;

use DateTimeImmutable;
use Patchlevel\EventSourcing\EventBus\Message;
use Patchlevel\EventSourcing\EventBus\Serializer\PhpNativeMessageSerializer;
use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileId;
use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileVisited;
use PHPUnit\Framework\TestCase;
use Prophecy\PhpUnit\ProphecyTrait;

/** @covers \Patchlevel\EventSourcing\EventBus\Serializer\PhpNativeMessageSerializer */
final class PhpNativeMessageSerializerTest extends TestCase
{
use ProphecyTrait;

public function testSerialize(): void
{
$event = new ProfileVisited(
ProfileId::fromString('foo'),
);

$message = Message::create($event)
->withRecordedOn(new DateTimeImmutable('2020-01-01T20:00:00.000000+0100'));

$nativeSerializer = new PhpNativeMessageSerializer();

$content = $nativeSerializer->serialize($message);

self::assertEquals('Tzo0MToiUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXEV2ZW50QnVzXE1lc3NhZ2UiOjg6e3M6NTY6IgBQYXRjaGxldmVsXEV2ZW50U291cmNpbmdcRXZlbnRCdXNcTWVzc2FnZQBhZ2dyZWdhdGVOYW1lIjtOO3M6NTQ6IgBQYXRjaGxldmVsXEV2ZW50U291cmNpbmdcRXZlbnRCdXNcTWVzc2FnZQBhZ2dyZWdhdGVJZCI7TjtzOjUxOiIAUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXEV2ZW50QnVzXE1lc3NhZ2UAcGxheWhlYWQiO047czo1MzoiAFBhdGNobGV2ZWxcRXZlbnRTb3VyY2luZ1xFdmVudEJ1c1xNZXNzYWdlAHJlY29yZGVkT24iO086MTc6IkRhdGVUaW1lSW1tdXRhYmxlIjozOntzOjQ6ImRhdGUiO3M6MjY6IjIwMjAtMDEtMDEgMjA6MDA6MDAuMDAwMDAwIjtzOjEzOiJ0aW1lem9uZV90eXBlIjtpOjE7czo4OiJ0aW1lem9uZSI7czo2OiIrMDE6MDAiO31zOjU3OiIAUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXEV2ZW50QnVzXE1lc3NhZ2UAbmV3U3RyZWFtU3RhcnQiO2I6MDtzOjUxOiIAUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXEV2ZW50QnVzXE1lc3NhZ2UAYXJjaGl2ZWQiO2I6MDtzOjU2OiIAUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXEV2ZW50QnVzXE1lc3NhZ2UAY3VzdG9tSGVhZGVycyI7YTowOnt9czo0ODoiAFBhdGNobGV2ZWxcRXZlbnRTb3VyY2luZ1xFdmVudEJ1c1xNZXNzYWdlAGV2ZW50IjtPOjU4OiJQYXRjaGxldmVsXEV2ZW50U291cmNpbmdcVGVzdHNcVW5pdFxGaXh0dXJlXFByb2ZpbGVWaXNpdGVkIjoxOntzOjk6InZpc2l0b3JJZCI7Tzo1MzoiUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXFRlc3RzXFVuaXRcRml4dHVyZVxQcm9maWxlSWQiOjE6e3M6NTc6IgBQYXRjaGxldmVsXEV2ZW50U291cmNpbmdcVGVzdHNcVW5pdFxGaXh0dXJlXFByb2ZpbGVJZABpZCI7czozOiJmb28iO319fQ==', $content);
}

public function testDeserialize(): void
{
$event = new ProfileVisited(
ProfileId::fromString('foo'),
);

$nativeSerializer = new PhpNativeMessageSerializer();

$message = $nativeSerializer->deserialize('Tzo0MToiUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXEV2ZW50QnVzXE1lc3NhZ2UiOjg6e3M6NTY6IgBQYXRjaGxldmVsXEV2ZW50U291cmNpbmdcRXZlbnRCdXNcTWVzc2FnZQBhZ2dyZWdhdGVOYW1lIjtOO3M6NTQ6IgBQYXRjaGxldmVsXEV2ZW50U291cmNpbmdcRXZlbnRCdXNcTWVzc2FnZQBhZ2dyZWdhdGVJZCI7TjtzOjUxOiIAUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXEV2ZW50QnVzXE1lc3NhZ2UAcGxheWhlYWQiO047czo1MzoiAFBhdGNobGV2ZWxcRXZlbnRTb3VyY2luZ1xFdmVudEJ1c1xNZXNzYWdlAHJlY29yZGVkT24iO086MTc6IkRhdGVUaW1lSW1tdXRhYmxlIjozOntzOjQ6ImRhdGUiO3M6MjY6IjIwMjAtMDEtMDEgMjA6MDA6MDAuMDAwMDAwIjtzOjEzOiJ0aW1lem9uZV90eXBlIjtpOjE7czo4OiJ0aW1lem9uZSI7czo2OiIrMDE6MDAiO31zOjU3OiIAUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXEV2ZW50QnVzXE1lc3NhZ2UAbmV3U3RyZWFtU3RhcnQiO2I6MDtzOjUxOiIAUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXEV2ZW50QnVzXE1lc3NhZ2UAYXJjaGl2ZWQiO2I6MDtzOjU2OiIAUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXEV2ZW50QnVzXE1lc3NhZ2UAY3VzdG9tSGVhZGVycyI7YTowOnt9czo0ODoiAFBhdGNobGV2ZWxcRXZlbnRTb3VyY2luZ1xFdmVudEJ1c1xNZXNzYWdlAGV2ZW50IjtPOjU4OiJQYXRjaGxldmVsXEV2ZW50U291cmNpbmdcVGVzdHNcVW5pdFxGaXh0dXJlXFByb2ZpbGVWaXNpdGVkIjoxOntzOjk6InZpc2l0b3JJZCI7Tzo1MzoiUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXFRlc3RzXFVuaXRcRml4dHVyZVxQcm9maWxlSWQiOjE6e3M6NTc6IgBQYXRjaGxldmVsXEV2ZW50U291cmNpbmdcVGVzdHNcVW5pdFxGaXh0dXJlXFByb2ZpbGVJZABpZCI7czozOiJmb28iO319fQ==');

self::assertEquals($event, $message->event());
self::assertEquals([
'recordedOn' => new DateTimeImmutable('2020-01-01T20:00:00.000000+0100'),
'newStreamStart' => false,
'archived' => false,
], $message->headers());
}

public function testEquals(): void
{
$event = new ProfileVisited(
ProfileId::fromString('foo'),
);

$message = Message::create($event)
->withRecordedOn(new DateTimeImmutable('2020-01-01T20:00:00.000000+0100'));

$nativeSerializer = new PhpNativeMessageSerializer();

$content = $nativeSerializer->serialize($message);
$clonedMessage = $nativeSerializer->deserialize($content);

self::assertEquals($message, $clonedMessage);
}
}
Loading

0 comments on commit 254d3f1

Please sign in to comment.