From 254d3f123418e473cd9e3d61b46958ad6727e2a8 Mon Sep 17 00:00:00 2001 From: David Badura Date: Tue, 6 Feb 2024 17:09:20 +0100 Subject: [PATCH] rewrite outbox store --- .../Serializer}/MessageSerializer.php | 2 +- .../Serializer/PhpNativeMessageSerializer.php | 28 +++++ src/Outbox/DoctrineOutboxStore.php | 79 ++++--------- .../PhpNativeMessageSerializer.php | 52 -------- src/WatchServer/SocketWatchServer.php | 1 + src/WatchServer/SocketWatchServerClient.php | 1 + tests/Integration/Outbox/OutboxTest.php | 3 +- .../PhpNativeMessageSerializerTest.php | 70 +++++++++++ tests/Unit/Outbox/DoctrineOutboxStoreTest.php | 111 ++++-------------- .../PhpNativeMessageSerializerTest.php | 96 --------------- 10 files changed, 149 insertions(+), 294 deletions(-) rename src/{WatchServer => EventBus/Serializer}/MessageSerializer.php (80%) create mode 100644 src/EventBus/Serializer/PhpNativeMessageSerializer.php delete mode 100644 src/WatchServer/PhpNativeMessageSerializer.php create mode 100644 tests/Unit/EventBus/Serializer/PhpNativeMessageSerializerTest.php delete mode 100644 tests/Unit/WatchServer/PhpNativeMessageSerializerTest.php diff --git a/src/WatchServer/MessageSerializer.php b/src/EventBus/Serializer/MessageSerializer.php similarity index 80% rename from src/WatchServer/MessageSerializer.php rename to src/EventBus/Serializer/MessageSerializer.php index 3787051fc..0cfca701d 100644 --- a/src/WatchServer/MessageSerializer.php +++ b/src/EventBus/Serializer/MessageSerializer.php @@ -2,7 +2,7 @@ declare(strict_types=1); -namespace Patchlevel\EventSourcing\WatchServer; +namespace Patchlevel\EventSourcing\EventBus\Serializer; use Patchlevel\EventSourcing\EventBus\Message; diff --git a/src/EventBus/Serializer/PhpNativeMessageSerializer.php b/src/EventBus/Serializer/PhpNativeMessageSerializer.php new file mode 100644 index 000000000..e36a727e6 --- /dev/null +++ b/src/EventBus/Serializer/PhpNativeMessageSerializer.php @@ -0,0 +1,28 @@ + true], + ); + } +} diff --git a/src/Outbox/DoctrineOutboxStore.php b/src/Outbox/DoctrineOutboxStore.php index b24e53346..593ed549f 100644 --- a/src/Outbox/DoctrineOutboxStore.php +++ b/src/Outbox/DoctrineOutboxStore.php @@ -8,10 +8,8 @@ use Doctrine\DBAL\Schema\Schema; use Doctrine\DBAL\Types\Types; use Patchlevel\EventSourcing\EventBus\Message; +use Patchlevel\EventSourcing\EventBus\Serializer\MessageSerializer; use Patchlevel\EventSourcing\Schema\SchemaConfigurator; -use Patchlevel\EventSourcing\Serializer\EventSerializer; -use Patchlevel\EventSourcing\Serializer\SerializedEvent; -use Patchlevel\EventSourcing\Store\DoctrineHelper; use Patchlevel\EventSourcing\Store\WrongQueryResult; use function array_map; @@ -20,9 +18,11 @@ final class DoctrineOutboxStore implements OutboxStore, SchemaConfigurator { + public const HEADER_OUTBOX_IDENTIFIER = 'outboxIdentifier'; + public function __construct( private readonly Connection $connection, - private readonly EventSerializer $serializer, + private readonly MessageSerializer $messageSerializer, private readonly string $outboxTable = 'outbox', ) { } @@ -32,24 +32,10 @@ public function saveOutboxMessage(Message ...$messages): void $this->connection->transactional( function (Connection $connection) use ($messages): void { foreach ($messages as $message) { - $event = $message->event(); - - $data = $this->serializer->serialize($event); - $connection->insert( $this->outboxTable, [ - 'aggregate' => $message->aggregateName(), - 'aggregate_id' => $message->aggregateId(), - 'playhead' => $message->playhead(), - 'event' => $data->name, - 'payload' => $data->payload, - 'recorded_on' => $message->recordedOn(), - 'custom_headers' => $message->customHeaders(), - ], - [ - 'recorded_on' => Types::DATETIMETZ_IMMUTABLE, - 'custom_headers' => Types::JSON, + 'message' => $this->messageSerializer->serialize($message), ], ); } @@ -66,20 +52,15 @@ public function retrieveOutboxMessages(int|null $limit = null): array ->setMaxResults($limit) ->getSQL(); - /** @var list $result */ + /** @var list $result */ $result = $this->connection->fetchAllAssociative($sql); - $platform = $this->connection->getDatabasePlatform(); return array_map( - function (array $data) use ($platform) { - $event = $this->serializer->deserialize(new SerializedEvent($data['event'], $data['payload'])); - - return Message::create($event) - ->withAggregateName($data['aggregate']) - ->withAggregateId($data['aggregate_id']) - ->withPlayhead(DoctrineHelper::normalizePlayhead($data['playhead'], $platform)) - ->withRecordedOn(DoctrineHelper::normalizeRecordedOn($data['recorded_on'], $platform)) - ->withCustomHeaders(DoctrineHelper::normalizeCustomHeaders($data['custom_headers'], $platform)); + function (array $data) { + $message = $this->messageSerializer->deserialize($data['message']); + + return $message + ->withCustomHeader(self::HEADER_OUTBOX_IDENTIFIER, $data['id']); }, $result, ); @@ -90,14 +71,9 @@ public function markOutboxMessageConsumed(Message ...$messages): void $this->connection->transactional( function (Connection $connection) use ($messages): void { foreach ($messages as $message) { - $connection->delete( - $this->outboxTable, - [ - 'aggregate' => $message->aggregateName(), - 'aggregate_id' => $message->aggregateId(), - 'playhead' => $message->playhead(), - ], - ); + $id = $message->customHeader(self::HEADER_OUTBOX_IDENTIFIER); + + $connection->delete($this->outboxTable, ['id' => $id]); } }, ); @@ -123,24 +99,13 @@ public function configureSchema(Schema $schema, Connection $connection): void { $table = $schema->createTable($this->outboxTable); - $table->addColumn('aggregate', Types::STRING) - ->setLength(255) - ->setNotnull(true); - $table->addColumn('aggregate_id', Types::STRING) - ->setLength(32) - ->setNotnull(true); - $table->addColumn('playhead', Types::INTEGER) - ->setNotnull(true); - $table->addColumn('event', Types::STRING) - ->setLength(255) - ->setNotnull(true); - $table->addColumn('payload', Types::JSON) - ->setNotnull(true); - $table->addColumn('recorded_on', Types::DATETIMETZ_IMMUTABLE) - ->setNotnull(false); - $table->addColumn('custom_headers', Types::JSON) - ->setNotnull(true); - - $table->setPrimaryKey(['aggregate', 'aggregate_id', 'playhead']); + $table->addColumn('id', Types::INTEGER) + ->setNotnull(true) + ->setAutoincrement(true); + $table->addColumn('message', Types::STRING) + ->setNotnull(true) + ->setLength(16_000); + + $table->setPrimaryKey(['id']); } } diff --git a/src/WatchServer/PhpNativeMessageSerializer.php b/src/WatchServer/PhpNativeMessageSerializer.php deleted file mode 100644 index b89d8fd13..000000000 --- a/src/WatchServer/PhpNativeMessageSerializer.php +++ /dev/null @@ -1,52 +0,0 @@ -event(); - $serializedEvent = $this->serializer->serialize($event); - - $data = [ - 'event' => $serializedEvent->name, - 'payload' => $serializedEvent->payload, - 'headers' => $message->headers(), - ]; - - return base64_encode(serialize($data)); - } - - public function deserialize(string $content): Message - { - /** @var array{event: class-string, payload: string, headers: Headers} $data */ - $data = unserialize( - base64_decode($content), - ['allowed_classes' => [DateTimeImmutable::class]], - ); - - return Message::createWithHeaders( - $this->serializer->deserialize(new SerializedEvent($data['event'], $data['payload'])), - $data['headers'], - ); - } -} diff --git a/src/WatchServer/SocketWatchServer.php b/src/WatchServer/SocketWatchServer.php index e8b9f1c99..1f4862abc 100644 --- a/src/WatchServer/SocketWatchServer.php +++ b/src/WatchServer/SocketWatchServer.php @@ -5,6 +5,7 @@ namespace Patchlevel\EventSourcing\WatchServer; use Closure; +use Patchlevel\EventSourcing\EventBus\Serializer\MessageSerializer; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; use RuntimeException; diff --git a/src/WatchServer/SocketWatchServerClient.php b/src/WatchServer/SocketWatchServerClient.php index 1a2e5b34e..6ea61f55e 100644 --- a/src/WatchServer/SocketWatchServerClient.php +++ b/src/WatchServer/SocketWatchServerClient.php @@ -5,6 +5,7 @@ namespace Patchlevel\EventSourcing\WatchServer; use Patchlevel\EventSourcing\EventBus\Message; +use Patchlevel\EventSourcing\EventBus\Serializer\MessageSerializer; use function fclose; use function restore_error_handler; diff --git a/tests/Integration/Outbox/OutboxTest.php b/tests/Integration/Outbox/OutboxTest.php index 6146901b0..97f412c66 100644 --- a/tests/Integration/Outbox/OutboxTest.php +++ b/tests/Integration/Outbox/OutboxTest.php @@ -7,6 +7,7 @@ use Doctrine\DBAL\Connection; use Patchlevel\EventSourcing\EventBus\ChainEventBus; use Patchlevel\EventSourcing\EventBus\DefaultConsumer; +use Patchlevel\EventSourcing\EventBus\Serializer\PhpNativeMessageSerializer; use Patchlevel\EventSourcing\Outbox\DoctrineOutboxStore; use Patchlevel\EventSourcing\Outbox\EventBusPublisher; use Patchlevel\EventSourcing\Outbox\OutboxEventBus; @@ -58,7 +59,7 @@ public function testSuccessful(): void $outboxStore = new DoctrineOutboxStore( $this->connection, - $serializer, + new PhpNativeMessageSerializer(), 'outbox', ); diff --git a/tests/Unit/EventBus/Serializer/PhpNativeMessageSerializerTest.php b/tests/Unit/EventBus/Serializer/PhpNativeMessageSerializerTest.php new file mode 100644 index 000000000..19c421ab9 --- /dev/null +++ b/tests/Unit/EventBus/Serializer/PhpNativeMessageSerializerTest.php @@ -0,0 +1,70 @@ +withRecordedOn(new DateTimeImmutable('2020-01-01T20:00:00.000000+0100')); + + $nativeSerializer = new PhpNativeMessageSerializer(); + + $content = $nativeSerializer->serialize($message); + + self::assertEquals('Tzo0MToiUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXEV2ZW50QnVzXE1lc3NhZ2UiOjg6e3M6NTY6IgBQYXRjaGxldmVsXEV2ZW50U291cmNpbmdcRXZlbnRCdXNcTWVzc2FnZQBhZ2dyZWdhdGVOYW1lIjtOO3M6NTQ6IgBQYXRjaGxldmVsXEV2ZW50U291cmNpbmdcRXZlbnRCdXNcTWVzc2FnZQBhZ2dyZWdhdGVJZCI7TjtzOjUxOiIAUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXEV2ZW50QnVzXE1lc3NhZ2UAcGxheWhlYWQiO047czo1MzoiAFBhdGNobGV2ZWxcRXZlbnRTb3VyY2luZ1xFdmVudEJ1c1xNZXNzYWdlAHJlY29yZGVkT24iO086MTc6IkRhdGVUaW1lSW1tdXRhYmxlIjozOntzOjQ6ImRhdGUiO3M6MjY6IjIwMjAtMDEtMDEgMjA6MDA6MDAuMDAwMDAwIjtzOjEzOiJ0aW1lem9uZV90eXBlIjtpOjE7czo4OiJ0aW1lem9uZSI7czo2OiIrMDE6MDAiO31zOjU3OiIAUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXEV2ZW50QnVzXE1lc3NhZ2UAbmV3U3RyZWFtU3RhcnQiO2I6MDtzOjUxOiIAUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXEV2ZW50QnVzXE1lc3NhZ2UAYXJjaGl2ZWQiO2I6MDtzOjU2OiIAUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXEV2ZW50QnVzXE1lc3NhZ2UAY3VzdG9tSGVhZGVycyI7YTowOnt9czo0ODoiAFBhdGNobGV2ZWxcRXZlbnRTb3VyY2luZ1xFdmVudEJ1c1xNZXNzYWdlAGV2ZW50IjtPOjU4OiJQYXRjaGxldmVsXEV2ZW50U291cmNpbmdcVGVzdHNcVW5pdFxGaXh0dXJlXFByb2ZpbGVWaXNpdGVkIjoxOntzOjk6InZpc2l0b3JJZCI7Tzo1MzoiUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXFRlc3RzXFVuaXRcRml4dHVyZVxQcm9maWxlSWQiOjE6e3M6NTc6IgBQYXRjaGxldmVsXEV2ZW50U291cmNpbmdcVGVzdHNcVW5pdFxGaXh0dXJlXFByb2ZpbGVJZABpZCI7czozOiJmb28iO319fQ==', $content); + } + + public function testDeserialize(): void + { + $event = new ProfileVisited( + ProfileId::fromString('foo'), + ); + + $nativeSerializer = new PhpNativeMessageSerializer(); + + $message = $nativeSerializer->deserialize('Tzo0MToiUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXEV2ZW50QnVzXE1lc3NhZ2UiOjg6e3M6NTY6IgBQYXRjaGxldmVsXEV2ZW50U291cmNpbmdcRXZlbnRCdXNcTWVzc2FnZQBhZ2dyZWdhdGVOYW1lIjtOO3M6NTQ6IgBQYXRjaGxldmVsXEV2ZW50U291cmNpbmdcRXZlbnRCdXNcTWVzc2FnZQBhZ2dyZWdhdGVJZCI7TjtzOjUxOiIAUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXEV2ZW50QnVzXE1lc3NhZ2UAcGxheWhlYWQiO047czo1MzoiAFBhdGNobGV2ZWxcRXZlbnRTb3VyY2luZ1xFdmVudEJ1c1xNZXNzYWdlAHJlY29yZGVkT24iO086MTc6IkRhdGVUaW1lSW1tdXRhYmxlIjozOntzOjQ6ImRhdGUiO3M6MjY6IjIwMjAtMDEtMDEgMjA6MDA6MDAuMDAwMDAwIjtzOjEzOiJ0aW1lem9uZV90eXBlIjtpOjE7czo4OiJ0aW1lem9uZSI7czo2OiIrMDE6MDAiO31zOjU3OiIAUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXEV2ZW50QnVzXE1lc3NhZ2UAbmV3U3RyZWFtU3RhcnQiO2I6MDtzOjUxOiIAUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXEV2ZW50QnVzXE1lc3NhZ2UAYXJjaGl2ZWQiO2I6MDtzOjU2OiIAUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXEV2ZW50QnVzXE1lc3NhZ2UAY3VzdG9tSGVhZGVycyI7YTowOnt9czo0ODoiAFBhdGNobGV2ZWxcRXZlbnRTb3VyY2luZ1xFdmVudEJ1c1xNZXNzYWdlAGV2ZW50IjtPOjU4OiJQYXRjaGxldmVsXEV2ZW50U291cmNpbmdcVGVzdHNcVW5pdFxGaXh0dXJlXFByb2ZpbGVWaXNpdGVkIjoxOntzOjk6InZpc2l0b3JJZCI7Tzo1MzoiUGF0Y2hsZXZlbFxFdmVudFNvdXJjaW5nXFRlc3RzXFVuaXRcRml4dHVyZVxQcm9maWxlSWQiOjE6e3M6NTc6IgBQYXRjaGxldmVsXEV2ZW50U291cmNpbmdcVGVzdHNcVW5pdFxGaXh0dXJlXFByb2ZpbGVJZABpZCI7czozOiJmb28iO319fQ=='); + + self::assertEquals($event, $message->event()); + self::assertEquals([ + 'recordedOn' => new DateTimeImmutable('2020-01-01T20:00:00.000000+0100'), + 'newStreamStart' => false, + 'archived' => false, + ], $message->headers()); + } + + public function testEquals(): void + { + $event = new ProfileVisited( + ProfileId::fromString('foo'), + ); + + $message = Message::create($event) + ->withRecordedOn(new DateTimeImmutable('2020-01-01T20:00:00.000000+0100')); + + $nativeSerializer = new PhpNativeMessageSerializer(); + + $content = $nativeSerializer->serialize($message); + $clonedMessage = $nativeSerializer->deserialize($content); + + self::assertEquals($message, $clonedMessage); + } +} diff --git a/tests/Unit/Outbox/DoctrineOutboxStoreTest.php b/tests/Unit/Outbox/DoctrineOutboxStoreTest.php index 1929f1d54..83d7429f8 100644 --- a/tests/Unit/Outbox/DoctrineOutboxStoreTest.php +++ b/tests/Unit/Outbox/DoctrineOutboxStoreTest.php @@ -7,16 +7,14 @@ use DateTimeImmutable; use Doctrine\DBAL\Connection; use Doctrine\DBAL\Driver; -use Doctrine\DBAL\Platforms\AbstractPlatform; use Doctrine\DBAL\Query\QueryBuilder; use Doctrine\DBAL\Schema\Column; use Doctrine\DBAL\Schema\Schema; use Doctrine\DBAL\Schema\Table; use Doctrine\DBAL\Types\Types; use Patchlevel\EventSourcing\EventBus\Message; +use Patchlevel\EventSourcing\EventBus\Serializer\MessageSerializer; use Patchlevel\EventSourcing\Outbox\DoctrineOutboxStore; -use Patchlevel\EventSourcing\Serializer\EventSerializer; -use Patchlevel\EventSourcing\Serializer\SerializedEvent; use Patchlevel\EventSourcing\Store\WrongQueryResult; use Patchlevel\EventSourcing\Tests\Unit\Fixture\Email; use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileCreated; @@ -44,16 +42,7 @@ public function testSaveOutboxMessage(): void $innerMockedConnection = $this->prophesize(Connection::class); $innerMockedConnection->insert( 'outbox', - [ - 'aggregate' => 'profile', - 'aggregate_id' => '1', - 'playhead' => 1, - 'event' => 'profile_created', - 'payload' => '', - 'recorded_on' => $recordedOn, - 'custom_headers' => [], - ], - ['recorded_on' => 'datetimetz_immutable', 'custom_headers' => 'json'], + ['message' => 'serialized'], )->shouldBeCalledOnce(); $driver = $this->prophesize(Driver::class); @@ -65,8 +54,8 @@ public function testSaveOutboxMessage(): void static fn (array $args): mixed => $args[0]($innerMockedConnection->reveal()) ); - $serializer = $this->prophesize(EventSerializer::class); - $serializer->serialize($message->event())->shouldBeCalledOnce()->willReturn(new SerializedEvent('profile_created', '')); + $serializer = $this->prophesize(MessageSerializer::class); + $serializer->serialize($message)->shouldBeCalledOnce()->willReturn('serialized'); $doctrineOutboxStore = new DoctrineOutboxStore( $mockedConnection->reveal(), @@ -85,12 +74,13 @@ public function testMarkOutboxMessageConsumed(): void ->withPlayhead(1) ->withRecordedOn($recordedOn) ->withNewStreamStart(false) - ->withArchived(false); + ->withArchived(false) + ->withCustomHeader(DoctrineOutboxStore::HEADER_OUTBOX_IDENTIFIER, 42); $innerMockedConnection = $this->prophesize(Connection::class); $innerMockedConnection->delete( 'outbox', - ['aggregate' => 'profile', 'aggregate_id' => '1', 'playhead' => 1], + ['id' => 42], )->shouldBeCalledOnce(); $driver = $this->prophesize(Driver::class); @@ -102,7 +92,7 @@ public function testMarkOutboxMessageConsumed(): void static fn (array $args): mixed => $args[0]($innerMockedConnection->reveal()) ); - $serializer = $this->prophesize(EventSerializer::class); + $serializer = $this->prophesize(MessageSerializer::class); $doctrineOutboxStore = new DoctrineOutboxStore( $mockedConnection->reveal(), @@ -123,7 +113,7 @@ public function testCountOutboxMessages(): void $connection->createQueryBuilder()->shouldBeCalledOnce()->willReturn($queryBuilder->reveal()); $connection->fetchOne('this sql')->shouldBeCalledOnce()->willReturn('1'); - $serializer = $this->prophesize(EventSerializer::class); + $serializer = $this->prophesize(MessageSerializer::class); $doctrineOutboxStore = new DoctrineOutboxStore( $connection->reveal(), @@ -145,7 +135,7 @@ public function testCountOutboxMessagesFailure(): void $connection->createQueryBuilder()->shouldBeCalledOnce()->willReturn($queryBuilder->reveal()); $connection->fetchOne('this sql')->shouldBeCalledOnce()->willReturn([]); - $serializer = $this->prophesize(EventSerializer::class); + $serializer = $this->prophesize(MessageSerializer::class); $doctrineOutboxStore = new DoctrineOutboxStore( $connection->reveal(), @@ -168,11 +158,7 @@ public function testRetrieveOutboxMessagesNoResult(): void $connection->createQueryBuilder()->shouldBeCalledOnce()->willReturn($queryBuilder->reveal()); $connection->fetchAllAssociative('this sql')->shouldBeCalledOnce()->willReturn([]); - $platform = $this->prophesize(AbstractPlatform::class); - $connection->getDatabasePlatform()->shouldBeCalledOnce()->willReturn($platform->reveal()); - - $serializer = $this->prophesize(EventSerializer::class); - $serializer->deserialize(Argument::type(SerializedEvent::class))->shouldNotBeCalled(); + $serializer = $this->prophesize(MessageSerializer::class); $doctrineOutboxStore = new DoctrineOutboxStore( $connection->reveal(), @@ -193,7 +179,8 @@ public function testRetrieveOutboxMessages(): void ->withPlayhead(1) ->withRecordedOn($recordedOn) ->withNewStreamStart(false) - ->withArchived(false); + ->withArchived(false) + ->withCustomHeader(DoctrineOutboxStore::HEADER_OUTBOX_IDENTIFIER, 42); $queryBuilder = $this->prophesize(QueryBuilder::class); $queryBuilder->select('*')->shouldBeCalledOnce()->willReturn($queryBuilder->reveal()); @@ -205,23 +192,13 @@ public function testRetrieveOutboxMessages(): void $connection->createQueryBuilder()->shouldBeCalledOnce()->willReturn($queryBuilder->reveal()); $connection->fetchAllAssociative('this sql')->shouldBeCalledOnce()->willReturn([ [ - 'event' => 'profile_created', - 'payload' => '{"profile_id": "1", "email": "s"}', - 'aggregate' => 'profile', - 'aggregate_id' => '1', - 'playhead' => 1, - 'recorded_on' => $recordedOn->format('Y-m-d\TH:i:s.ue'), - 'custom_headers' => '{}', + 'id' => 42, + 'message' => 'serialized', ], ]); - $platform = $this->prophesize(AbstractPlatform::class); - $platform->getDateTimeTzFormatString()->shouldBeCalledOnce()->willReturn('Y-m-d\TH:i:s.ue'); - - $connection->getDatabasePlatform()->shouldBeCalledOnce()->willReturn($platform->reveal()); - - $serializer = $this->prophesize(EventSerializer::class); - $serializer->deserialize(new SerializedEvent('profile_created', '{"profile_id": "1", "email": "s"}'))->shouldBeCalledOnce()->willReturn($event); + $serializer = $this->prophesize(MessageSerializer::class); + $serializer->deserialize('serialized')->shouldBeCalledOnce()->willReturn($message); $doctrineOutboxStore = new DoctrineOutboxStore( $connection->reveal(), @@ -235,7 +212,8 @@ public function testRetrieveOutboxMessages(): void public function testConfigureSchema(): void { $connection = $this->prophesize(Connection::class); - $serializer = $this->prophesize(EventSerializer::class); + + $serializer = $this->prophesize(MessageSerializer::class); $doctrineOutboxStore = new DoctrineOutboxStore( $connection->reveal(), @@ -244,65 +222,24 @@ public function testConfigureSchema(): void $table = $this->prophesize(Table::class); $column = $this->prophesize(Column::class); - $table->addColumn('aggregate', Types::STRING)->shouldBeCalledOnce() - ->willReturn( - $column - ->setNotnull(true)->shouldBeCalledOnce() - ->getObjectProphecy()->setLength(255)->shouldBeCalledOnce()->willReturn($column->reveal()) - ->getObjectProphecy()->reveal(), - ); - - $column = $this->prophesize(Column::class); - $table->addColumn('aggregate_id', Types::STRING)->shouldBeCalledOnce() - ->willReturn( - $column - ->setNotnull(true)->shouldBeCalledOnce() - ->getObjectProphecy()->setLength(32)->shouldBeCalledOnce()->willReturn($column->reveal()) - ->getObjectProphecy()->reveal(), - ); - - $column = $this->prophesize(Column::class); - $table->addColumn('playhead', Types::INTEGER)->shouldBeCalledOnce() + $table->addColumn('id', Types::INTEGER)->shouldBeCalledOnce() ->willReturn( $column ->setNotnull(true)->shouldBeCalledOnce() + ->getObjectProphecy()->setAutoincrement(true)->shouldBeCalledOnce()->willReturn($column->reveal()) ->getObjectProphecy()->reveal(), ); $column = $this->prophesize(Column::class); - $table->addColumn('event', Types::STRING)->shouldBeCalledOnce() - ->willReturn( - $column - ->setNotnull(true)->shouldBeCalledOnce() - ->getObjectProphecy()->setLength(255)->shouldBeCalledOnce()->willReturn($column->reveal()) - ->getObjectProphecy()->reveal(), - ); - - $column = $this->prophesize(Column::class); - $table->addColumn('payload', Types::JSON)->shouldBeCalledOnce() - ->willReturn( - $column - ->setNotnull(true)->shouldBeCalledOnce() - ->getObjectProphecy()->reveal(), - ); - - $column = $this->prophesize(Column::class); - $table->addColumn('recorded_on', Types::DATETIMETZ_IMMUTABLE)->shouldBeCalledOnce() - ->willReturn( - $column - ->setNotnull(false)->shouldBeCalledOnce() - ->getObjectProphecy()->reveal(), - ); - - $column = $this->prophesize(Column::class); - $table->addColumn('custom_headers', Types::JSON)->shouldBeCalledOnce() + $table->addColumn('message', Types::TEXT)->shouldBeCalledOnce() ->willReturn( $column ->setNotnull(true)->shouldBeCalledOnce() + ->getObjectProphecy()->setLength(16_000)->shouldBeCalledOnce()->willReturn($column->reveal()) ->getObjectProphecy()->reveal(), ); - $table->setPrimaryKey(['aggregate', 'aggregate_id', 'playhead'])->shouldBeCalledOnce(); + $table->setPrimaryKey(['id'])->shouldBeCalledOnce(); $schema = $this->prophesize(Schema::class); $schema->createTable('outbox')->shouldBeCalledOnce()->willReturn($table->reveal()); diff --git a/tests/Unit/WatchServer/PhpNativeMessageSerializerTest.php b/tests/Unit/WatchServer/PhpNativeMessageSerializerTest.php deleted file mode 100644 index 23df1a614..000000000 --- a/tests/Unit/WatchServer/PhpNativeMessageSerializerTest.php +++ /dev/null @@ -1,96 +0,0 @@ -withRecordedOn(new DateTimeImmutable('2020-01-01T20:00:00.000000+0100')); - - $eventSerializer = $this->prophesize(EventSerializer::class); - $eventSerializer - ->serialize($event) - ->willReturn(new SerializedEvent('profile.visited', '{"profileId": "foo"}')) - ->shouldBeCalledOnce(); - - $nativeSerializer = new PhpNativeMessageSerializer($eventSerializer->reveal()); - - $content = $nativeSerializer->serialize($message); - - self::assertEquals('YTozOntzOjU6ImV2ZW50IjtzOjE1OiJwcm9maWxlLnZpc2l0ZWQiO3M6NzoicGF5bG9hZCI7czoyMDoieyJwcm9maWxlSWQiOiAiZm9vIn0iO3M6NzoiaGVhZGVycyI7YTozOntzOjEwOiJyZWNvcmRlZE9uIjtPOjE3OiJEYXRlVGltZUltbXV0YWJsZSI6Mzp7czo0OiJkYXRlIjtzOjI2OiIyMDIwLTAxLTAxIDIwOjAwOjAwLjAwMDAwMCI7czoxMzoidGltZXpvbmVfdHlwZSI7aToxO3M6ODoidGltZXpvbmUiO3M6NjoiKzAxOjAwIjt9czoxNDoibmV3U3RyZWFtU3RhcnQiO2I6MDtzOjg6ImFyY2hpdmVkIjtiOjA7fX0=', $content); - } - - public function testDeserialize(): void - { - $event = new ProfileVisited( - ProfileId::fromString('foo'), - ); - - $eventSerializer = $this->prophesize(EventSerializer::class); - $eventSerializer - ->deserialize(new SerializedEvent('profile.visited', '{"profileId": "foo"}')) - ->willReturn($event) - ->shouldBeCalledOnce(); - - $nativeSerializer = new PhpNativeMessageSerializer($eventSerializer->reveal()); - - $message = $nativeSerializer->deserialize('YTozOntzOjU6ImV2ZW50IjtzOjE1OiJwcm9maWxlLnZpc2l0ZWQiO3M6NzoicGF5bG9hZCI7czoyMDoieyJwcm9maWxlSWQiOiAiZm9vIn0iO3M6NzoiaGVhZGVycyI7YTozOntzOjEwOiJyZWNvcmRlZE9uIjtPOjE3OiJEYXRlVGltZUltbXV0YWJsZSI6Mzp7czo0OiJkYXRlIjtzOjI2OiIyMDIwLTAxLTAxIDIwOjAwOjAwLjAwMDAwMCI7czoxMzoidGltZXpvbmVfdHlwZSI7aToxO3M6ODoidGltZXpvbmUiO3M6NjoiKzAxOjAwIjt9czoxNDoibmV3U3RyZWFtU3RhcnQiO2I6MDtzOjg6ImFyY2hpdmVkIjtiOjA7fX0='); - - self::assertEquals($event, $message->event()); - self::assertEquals([ - 'recordedOn' => new DateTimeImmutable('2020-01-01T20:00:00.000000+0100'), - 'newStreamStart' => false, - 'archived' => false, - ], $message->headers()); - } - - public function testEquals(): void - { - $event = new ProfileVisited( - ProfileId::fromString('foo'), - ); - - $message = Message::create($event) - ->withRecordedOn(new DateTimeImmutable('2020-01-01T20:00:00.000000+0100')); - - $eventSerializer = $this->prophesize(EventSerializer::class); - - $eventSerializer - ->serialize($event) - ->willReturn(new SerializedEvent('profile.visited', '{"profileId": "foo"}')) - ->shouldBeCalledOnce(); - - $eventSerializer - ->deserialize(new SerializedEvent('profile.visited', '{"profileId": "foo"}')) - ->willReturn($event) - ->shouldBeCalledOnce(); - - $nativeSerializer = new PhpNativeMessageSerializer($eventSerializer->reveal()); - - $content = $nativeSerializer->serialize($message); - $clonedMessage = $nativeSerializer->deserialize($content); - - self::assertEquals($message, $clonedMessage); - } -}