From e316dd3f786f501eca2f94bf520dafb08c94d2fd Mon Sep 17 00:00:00 2001 From: David Badura Date: Fri, 26 Apr 2024 11:13:22 +0200 Subject: [PATCH] move archive logic into store --- baseline.xml | 48 ++- src/Repository/DefaultRepository.php | 75 +--- src/Store/ArchivableStore.php | 10 - src/Store/DoctrineDbalStore.php | 60 ++-- .../Unit/Repository/DefaultRepositoryTest.php | 47 --- tests/Unit/Store/DoctrineDbalStoreTest.php | 329 +++++++++++++++--- 6 files changed, 371 insertions(+), 198 deletions(-) delete mode 100644 src/Store/ArchivableStore.php diff --git a/baseline.xml b/baseline.xml index e8fda31e..f10e0c84 100644 --- a/baseline.xml +++ b/baseline.xml @@ -272,14 +272,46 @@ - reveal(), 'FOR UPDATE', 'SKIP LOCKED')]]> - reveal(), 'FOR UPDATE', 'SKIP LOCKED')]]> - reveal(), 'FOR UPDATE', 'SKIP LOCKED')]]> - reveal(), 'FOR UPDATE', 'SKIP LOCKED')]]> - reveal(), 'FOR UPDATE', 'SKIP LOCKED')]]> - reveal(), 'FOR UPDATE', 'SKIP LOCKED')]]> - reveal(), 'FOR UPDATE', 'SKIP LOCKED')]]> - reveal(), 'FOR UPDATE', 'SKIP LOCKED')]]> + reveal(), + 'FOR UPDATE', + 'SKIP LOCKED', + )]]> + reveal(), + 'FOR UPDATE', + 'SKIP LOCKED', + )]]> + reveal(), + 'FOR UPDATE', + 'SKIP LOCKED', + )]]> + reveal(), + 'FOR UPDATE', + 'SKIP LOCKED', + )]]> + reveal(), + 'FOR UPDATE', + 'SKIP LOCKED', + )]]> + reveal(), + 'FOR UPDATE', + 'SKIP LOCKED', + )]]> + reveal(), + 'FOR UPDATE', + 'SKIP LOCKED', + )]]> + reveal(), + 'FOR UPDATE', + 'SKIP LOCKED', + )]]> diff --git a/src/Repository/DefaultRepository.php b/src/Repository/DefaultRepository.php index ea8e1fc0..d45d90f7 100644 --- a/src/Repository/DefaultRepository.php +++ b/src/Repository/DefaultRepository.php @@ -15,11 +15,9 @@ use Patchlevel\EventSourcing\Snapshot\SnapshotNotFound; use Patchlevel\EventSourcing\Snapshot\SnapshotStore; use Patchlevel\EventSourcing\Snapshot\SnapshotVersionInvalid; -use Patchlevel\EventSourcing\Store\ArchivableStore; use Patchlevel\EventSourcing\Store\Criteria\CriteriaBuilder; use Patchlevel\EventSourcing\Store\Store; use Patchlevel\EventSourcing\Store\Stream; -use Patchlevel\EventSourcing\Store\StreamStartHeader; use Patchlevel\EventSourcing\Store\UniqueConstraintViolation; use Psr\Clock\ClockInterface; use Psr\Log\LoggerInterface; @@ -238,35 +236,31 @@ static function (object $event) use ($aggregateName, $aggregateId, &$playhead, $ $events, ); - $this->store->transactional(function () use ($messages, $aggregate, $aggregateId, $newAggregate): void { - try { - $this->store->save(...$messages); - } catch (UniqueConstraintViolation) { - if ($newAggregate) { - $this->logger->error( - sprintf( - 'Repository: Aggregate "%s" with the id "%s" already exists.', - $aggregate::class, - $aggregateId, - ), - ); - - throw new AggregateAlreadyExists($aggregate::class, $aggregate->aggregateRootId()); - } - + try { + $this->store->save(...$messages); + } catch (UniqueConstraintViolation) { + if ($newAggregate) { $this->logger->error( sprintf( - 'Repository: Aggregate "%s" with the id "%s" is outdated.', + 'Repository: Aggregate "%s" with the id "%s" already exists.', $aggregate::class, $aggregateId, ), ); - throw new AggregateOutdated($aggregate::class, $aggregate->aggregateRootId()); + throw new AggregateAlreadyExists($aggregate::class, $aggregate->aggregateRootId()); } - $this->archive(...$messages); - }); + $this->logger->error( + sprintf( + 'Repository: Aggregate "%s" with the id "%s" is outdated.', + $aggregate::class, + $aggregateId, + ), + ); + + throw new AggregateOutdated($aggregate::class, $aggregate->aggregateRootId()); + } $this->aggregateIsValid[$aggregate] = true; @@ -368,43 +362,6 @@ private function assertValidAggregate(AggregateRoot $aggregate): void } } - private function archive(Message ...$messages): void - { - if (!$this->store instanceof ArchivableStore) { - return; - } - - $lastMessageWithNewStreamStart = null; - - foreach ($messages as $message) { - if (!$message->hasHeader(StreamStartHeader::class)) { - continue; - } - - $lastMessageWithNewStreamStart = $message; - } - - if ($lastMessageWithNewStreamStart === null) { - return; - } - - $aggregateHeader = $lastMessageWithNewStreamStart->header(AggregateHeader::class); - $this->store->archiveMessages( - $aggregateHeader->aggregateName, - $aggregateHeader->aggregateId, - $aggregateHeader->playhead, - ); - - $this->logger->debug( - sprintf( - 'Repository: Archive messages for aggregate "%s" with the id "%s" until playhead "%d".', - $aggregateHeader->aggregateName, - $aggregateHeader->aggregateId, - $aggregateHeader->playhead, - ), - ); - } - /** @return Traversable */ private function unpack(Stream $stream): Traversable { diff --git a/src/Store/ArchivableStore.php b/src/Store/ArchivableStore.php deleted file mode 100644 index fc498128..00000000 --- a/src/Store/ArchivableStore.php +++ /dev/null @@ -1,10 +0,0 @@ -connection->transactional( function (Connection $connection) use ($messages): void { + /** @var array $achievedUntilPlayhead */ + $achievedUntilPlayhead = []; + $booleanType = Type::getType(Types::BOOLEAN); $dateTimeType = Type::getType(Types::DATETIMETZ_IMMUTABLE); @@ -203,7 +206,14 @@ function (Connection $connection) use ($messages): void { $parameters[] = $aggregateHeader->recordedOn; $types[$offset + 5] = $dateTimeType; - $parameters[] = $message->hasHeader(StreamStartHeader::class); + $streamStart = $message->hasHeader(StreamStartHeader::class); + + if ($streamStart) { + $key = $aggregateHeader->aggregateName . '/' . $aggregateHeader->aggregateId; + $achievedUntilPlayhead[$key] = $aggregateHeader->playhead; + } + + $parameters[] = $streamStart; $types[$offset + 6] = $booleanType; $parameters[] = $message->hasHeader(ArchivedHeader::class); @@ -226,11 +236,32 @@ function (Connection $connection) use ($messages): void { $position = 0; } - if ($position === 0) { - return; + if ($position !== 0) { + $this->executeSave($columns, $placeholders, $parameters, $types, $connection); } - $this->executeSave($columns, $placeholders, $parameters, $types, $connection); + foreach ($achievedUntilPlayhead as $key => $playhead) { + [$aggregateName, $aggregateId] = explode('/', $key); + + $connection->executeStatement( + sprintf( + <<<'SQL' + UPDATE %s + SET archived = true + WHERE aggregate = :aggregate + AND aggregate_id = :aggregate_id + AND playhead < :playhead + AND archived = false + SQL, + $this->config['table_name'], + ), + [ + 'aggregate' => $aggregateName, + 'aggregate_id' => $aggregateId, + 'playhead' => $playhead, + ], + ); + } }, ); } @@ -245,25 +276,6 @@ public function transactional(Closure $function): void $this->connection->transactional($function); } - public function archiveMessages(string $aggregateName, string $aggregateId, int $untilPlayhead): void - { - $statement = $this->connection->prepare(sprintf( - 'UPDATE %s - SET archived = true - WHERE aggregate = :aggregate - AND aggregate_id = :aggregate_id - AND playhead < :playhead - AND archived = false', - $this->config['table_name'], - )); - - $statement->bindValue('aggregate', $aggregateName); - $statement->bindValue('aggregate_id', $aggregateId); - $statement->bindValue('playhead', $untilPlayhead); - - $statement->executeQuery(); - } - public function configureSchema(Schema $schema, Connection $connection): void { if ($this->connection !== $connection) { diff --git a/tests/Unit/Repository/DefaultRepositoryTest.php b/tests/Unit/Repository/DefaultRepositoryTest.php index 2e5460d6..300de0df 100644 --- a/tests/Unit/Repository/DefaultRepositoryTest.php +++ b/tests/Unit/Repository/DefaultRepositoryTest.php @@ -20,7 +20,6 @@ use Patchlevel\EventSourcing\Repository\WrongAggregate; use Patchlevel\EventSourcing\Snapshot\SnapshotNotFound; use Patchlevel\EventSourcing\Snapshot\SnapshotStore; -use Patchlevel\EventSourcing\Store\ArchivableStore; use Patchlevel\EventSourcing\Store\ArchivedHeader; use Patchlevel\EventSourcing\Store\ArrayStream; use Patchlevel\EventSourcing\Store\Criteria\AggregateIdCriterion; @@ -75,11 +74,6 @@ public function testSaveAggregate(): void }), )->shouldBeCalled(); - $store->transactional(Argument::any())->will( - /** @param array{0: callable} $args */ - static fn (array $args): mixed => $args[0](), - ); - $repository = new DefaultRepository( $store->reveal(), Profile::metadata(), @@ -126,11 +120,6 @@ public function testUpdateAggregate(): void }), )->shouldBeCalled(); - $store->transactional(Argument::any())->will( - /** @param array{0: callable} $args */ - static fn (array $args): mixed => $args[0](), - ); - $repository = new DefaultRepository( $store->reveal(), Profile::metadata(), @@ -179,11 +168,6 @@ public function testEventBus(): void }), )->shouldBeCalled(); - $store->transactional(Argument::any())->will( - /** @param array{0: callable} $args */ - static fn (array $args): mixed => $args[0](), - ); - $eventBus = $this->prophesize(EventBus::class); $eventBus->dispatch( Argument::that(static function (Message $message) { @@ -252,11 +236,6 @@ public function testDecorator(): void }), )->shouldBeCalled(); - $store->transactional(Argument::any())->will( - /** @param array{0: callable} $args */ - static fn (array $args): mixed => $args[0](), - ); - $decorator = new class implements MessageDecorator { public function __invoke(Message $message): Message { @@ -317,11 +296,6 @@ public function testSaveAggregateWithEmptyEventStream(): void }), )->shouldBeCalledOnce(); - $store->transactional(Argument::any())->will( - /** @param array{0: callable} $args */ - static fn (array $args): mixed => $args[0](), - ); - $repository = new DefaultRepository( $store->reveal(), Profile::metadata(), @@ -343,11 +317,6 @@ public function testDetachedException(): void Argument::type(Message::class), )->willThrow(new RuntimeException()); - $store->transactional(Argument::any())->will( - /** @param array{0: callable} $args */ - static fn (array $args): mixed => $args[0](), - ); - $repository = new DefaultRepository( $store->reveal(), Profile::metadata(), @@ -404,11 +373,6 @@ public function testDuplicate(): void Argument::type(Message::class), )->willThrow(new UniqueConstraintViolation()); - $store->transactional(Argument::any())->will( - /** @param array{0: callable} $args */ - static fn (array $args): mixed => $args[0](), - ); - $repository = new DefaultRepository( $store->reveal(), Profile::metadata(), @@ -440,11 +404,6 @@ public function testOutdated(): void }), )->willThrow(new UniqueConstraintViolation()); - $store->transactional(Argument::any())->will( - /** @param array{0: callable} $args */ - static fn (array $args): mixed => $args[0](), - ); - $repository = new DefaultRepository( $store->reveal(), Profile::metadata(), @@ -465,7 +424,6 @@ public function testOutdated(): void public function testSaveAggregateWithSplitStream(): void { $store = $this->prophesize(Store::class); - $store->willImplement(ArchivableStore::class); $store->save( Argument::that(static function (Message $message) { if ($message->header(AggregateHeader::class)->aggregateName !== 'profile') { @@ -501,11 +459,6 @@ public function testSaveAggregateWithSplitStream(): void return $message->header(AggregateHeader::class)->playhead === 3; }), )->shouldBeCalled(); - $store->archiveMessages('profile', '1', 3)->shouldBeCalledOnce(); - $store->transactional(Argument::any())->will( - /** @param array{0: callable} $args */ - static fn (array $args): mixed => $args[0](), - ); $repository = new DefaultRepository( $store->reveal(), diff --git a/tests/Unit/Store/DoctrineDbalStoreTest.php b/tests/Unit/Store/DoctrineDbalStoreTest.php index e373d6d5..86c84878 100644 --- a/tests/Unit/Store/DoctrineDbalStoreTest.php +++ b/tests/Unit/Store/DoctrineDbalStoreTest.php @@ -7,7 +7,6 @@ use ArrayIterator; use DateTimeImmutable; use Doctrine\DBAL\Connection; -use Doctrine\DBAL\Driver; use Doctrine\DBAL\Exception\UniqueConstraintViolationException; use Doctrine\DBAL\Platforms\AbstractPlatform; use Doctrine\DBAL\Platforms\PostgreSQLPlatform; @@ -15,7 +14,6 @@ use Doctrine\DBAL\Result; use Doctrine\DBAL\Schema\Schema; use Doctrine\DBAL\SQL\Builder\DefaultSelectSQLBuilder; -use Doctrine\DBAL\Statement; use Doctrine\DBAL\Types\Type; use Doctrine\DBAL\Types\Types; use EmptyIterator; @@ -27,6 +25,7 @@ use Patchlevel\EventSourcing\Store\Criteria\CriteriaBuilder; use Patchlevel\EventSourcing\Store\DoctrineDbalStore; use Patchlevel\EventSourcing\Store\MissingDataForStorage; +use Patchlevel\EventSourcing\Store\StreamStartHeader; use Patchlevel\EventSourcing\Store\UniqueConstraintViolation; use Patchlevel\EventSourcing\Store\WrongQueryResult; use Patchlevel\EventSourcing\Tests\Unit\Fixture\Email; @@ -67,7 +66,11 @@ public function testLoadWithNoEvents(): void )->willReturn($result->reveal()); $abstractPlatform = $this->prophesize(AbstractPlatform::class); - $abstractPlatform->createSelectSQLBuilder()->shouldBeCalledOnce()->willReturn(new DefaultSelectSQLBuilder($abstractPlatform->reveal(), 'FOR UPDATE', 'SKIP LOCKED')); + $abstractPlatform->createSelectSQLBuilder()->shouldBeCalledOnce()->willReturn(new DefaultSelectSQLBuilder( + $abstractPlatform->reveal(), + 'FOR UPDATE', + 'SKIP LOCKED', + )); $connection->getDatabasePlatform()->willReturn($abstractPlatform->reveal()); $queryBuilder = new QueryBuilder($connection->reveal()); @@ -113,7 +116,11 @@ public function testLoadWithLimit(): void )->willReturn($result->reveal()); $abstractPlatform = $this->prophesize(AbstractPlatform::class); - $abstractPlatform->createSelectSQLBuilder()->shouldBeCalledOnce()->willReturn(new DefaultSelectSQLBuilder($abstractPlatform->reveal(), 'FOR UPDATE', 'SKIP LOCKED')); + $abstractPlatform->createSelectSQLBuilder()->shouldBeCalledOnce()->willReturn(new DefaultSelectSQLBuilder( + $abstractPlatform->reveal(), + 'FOR UPDATE', + 'SKIP LOCKED', + )); $connection->getDatabasePlatform()->willReturn($abstractPlatform->reveal()); $queryBuilder = new QueryBuilder($connection->reveal()); @@ -164,7 +171,11 @@ public function testLoadWithOffset(): void )->willReturn($result->reveal()); $abstractPlatform = $this->prophesize(AbstractPlatform::class); - $abstractPlatform->createSelectSQLBuilder()->shouldBeCalledOnce()->willReturn(new DefaultSelectSQLBuilder($abstractPlatform->reveal(), 'FOR UPDATE', 'SKIP LOCKED')); + $abstractPlatform->createSelectSQLBuilder()->shouldBeCalledOnce()->willReturn(new DefaultSelectSQLBuilder( + $abstractPlatform->reveal(), + 'FOR UPDATE', + 'SKIP LOCKED', + )); $connection->getDatabasePlatform()->willReturn($abstractPlatform->reveal()); $queryBuilder = new QueryBuilder($connection->reveal()); @@ -212,7 +223,11 @@ public function testLoadWithIndex(): void )->willReturn($result->reveal()); $abstractPlatform = $this->prophesize(AbstractPlatform::class); - $abstractPlatform->createSelectSQLBuilder()->shouldBeCalledOnce()->willReturn(new DefaultSelectSQLBuilder($abstractPlatform->reveal(), 'FOR UPDATE', 'SKIP LOCKED')); + $abstractPlatform->createSelectSQLBuilder()->shouldBeCalledOnce()->willReturn(new DefaultSelectSQLBuilder( + $abstractPlatform->reveal(), + 'FOR UPDATE', + 'SKIP LOCKED', + )); $connection->getDatabasePlatform()->willReturn($abstractPlatform->reveal()); $queryBuilder = new QueryBuilder($connection->reveal()); @@ -275,7 +290,11 @@ public function testLoadWithOneEvent(): void $abstractPlatform = $this->prophesize(AbstractPlatform::class); - $abstractPlatform->createSelectSQLBuilder()->shouldBeCalledOnce()->willReturn(new DefaultSelectSQLBuilder($abstractPlatform->reveal(), 'FOR UPDATE', 'SKIP LOCKED')); + $abstractPlatform->createSelectSQLBuilder()->shouldBeCalledOnce()->willReturn(new DefaultSelectSQLBuilder( + $abstractPlatform->reveal(), + 'FOR UPDATE', + 'SKIP LOCKED', + )); $abstractPlatform->getDateTimeTzFormatString()->shouldBeCalledOnce()->willReturn('Y-m-d H:i:s'); $connection->getDatabasePlatform()->willReturn($abstractPlatform->reveal()); @@ -318,7 +337,10 @@ public function testLoadWithOneEvent(): void self::assertInstanceOf(ProfileCreated::class, $message->event()); self::assertSame('1', $message->header(AggregateHeader::class)->aggregateId); self::assertSame(1, $message->header(AggregateHeader::class)->playhead); - self::assertEquals(new DateTimeImmutable('2021-02-17 10:00:00'), $message->header(AggregateHeader::class)->recordedOn); + self::assertEquals( + new DateTimeImmutable('2021-02-17 10:00:00'), + $message->header(AggregateHeader::class)->recordedOn, + ); iterator_to_array($stream); @@ -371,7 +393,11 @@ public function testLoadWithTwoEvents(): void )->willReturn($result->reveal()); $abstractPlatform = $this->prophesize(AbstractPlatform::class); - $abstractPlatform->createSelectSQLBuilder()->shouldBeCalledOnce()->willReturn(new DefaultSelectSQLBuilder($abstractPlatform->reveal(), 'FOR UPDATE', 'SKIP LOCKED')); + $abstractPlatform->createSelectSQLBuilder()->shouldBeCalledOnce()->willReturn(new DefaultSelectSQLBuilder( + $abstractPlatform->reveal(), + 'FOR UPDATE', + 'SKIP LOCKED', + )); $abstractPlatform->getDateTimeTzFormatString()->shouldBeCalledTimes(2)->willReturn('Y-m-d H:i:s'); $connection->getDatabasePlatform()->willReturn($abstractPlatform->reveal()); @@ -417,7 +443,10 @@ public function testLoadWithTwoEvents(): void self::assertInstanceOf(ProfileCreated::class, $message->event()); self::assertSame('1', $message->header(AggregateHeader::class)->aggregateId); self::assertSame(1, $message->header(AggregateHeader::class)->playhead); - self::assertEquals(new DateTimeImmutable('2021-02-17 10:00:00'), $message->header(AggregateHeader::class)->recordedOn); + self::assertEquals( + new DateTimeImmutable('2021-02-17 10:00:00'), + $message->header(AggregateHeader::class)->recordedOn, + ); $stream->next(); $message = $stream->current(); @@ -429,7 +458,10 @@ public function testLoadWithTwoEvents(): void self::assertInstanceOf(ProfileEmailChanged::class, $message->event()); self::assertSame('1', $message->header(AggregateHeader::class)->aggregateId); self::assertSame(2, $message->header(AggregateHeader::class)->playhead); - self::assertEquals(new DateTimeImmutable('2021-02-17 11:00:00'), $message->header(AggregateHeader::class)->recordedOn); + self::assertEquals( + new DateTimeImmutable('2021-02-17 11:00:00'), + $message->header(AggregateHeader::class)->recordedOn, + ); } public function testTransactional(): void @@ -475,11 +507,11 @@ public function testSaveWithOneEvent(): void ], )->shouldBeCalledOnce(); - $driver = $this->prophesize(Driver::class); - $driver->connect(Argument::any())->willReturn($innerMockedConnection->reveal()); - $eventSerializer = $this->prophesize(EventSerializer::class); - $eventSerializer->serialize($message->event())->shouldBeCalledOnce()->willReturn(new SerializedEvent('profile_created', '')); + $eventSerializer->serialize($message->event())->shouldBeCalledOnce()->willReturn(new SerializedEvent( + 'profile_created', + '', + )); $headersSerializer = $this->prophesize(HeadersSerializer::class); $headersSerializer->serialize([])->willReturn('[]'); @@ -515,11 +547,11 @@ public function testSaveWithoutAggregateHeader(): void ], )->shouldNotBeCalled(); - $driver = $this->prophesize(Driver::class); - $driver->connect(Argument::any())->willReturn($innerMockedConnection->reveal()); - $eventSerializer = $this->prophesize(EventSerializer::class); - $eventSerializer->serialize($message->event())->shouldBeCalledOnce()->willReturn(new SerializedEvent('profile_created', '')); + $eventSerializer->serialize($message->event())->shouldBeCalledOnce()->willReturn(new SerializedEvent( + 'profile_created', + '', + )); $headersSerializer = $this->prophesize(HeadersSerializer::class); $headersSerializer->serialize([])->willReturn('[]'); @@ -592,12 +624,15 @@ public function testSaveWithTwoEvents(): void ], )->shouldBeCalledOnce(); - $driver = $this->prophesize(Driver::class); - $driver->connect(Argument::any())->willReturn($innerMockedConnection->reveal()); - $eventSerializer = $this->prophesize(EventSerializer::class); - $eventSerializer->serialize($message1->event())->shouldBeCalledOnce()->willReturn(new SerializedEvent('profile_created', '')); - $eventSerializer->serialize($message2->event())->shouldBeCalledOnce()->willReturn(new SerializedEvent('profile_email_changed', '')); + $eventSerializer->serialize($message1->event())->shouldBeCalledOnce()->willReturn(new SerializedEvent( + 'profile_created', + '', + )); + $eventSerializer->serialize($message2->event())->shouldBeCalledOnce()->willReturn(new SerializedEvent( + 'profile_email_changed', + '', + )); $headersSerializer = $this->prophesize(HeadersSerializer::class); $headersSerializer->serialize([])->willReturn('[]'); @@ -668,11 +703,11 @@ public function testSaveWithUniqueConstraintViolation(): void ], )->shouldBeCalledOnce()->willThrow(UniqueConstraintViolationException::class); - $driver = $this->prophesize(Driver::class); - $driver->connect(Argument::any())->willReturn($innerMockedConnection->reveal()); - $eventSerializer = $this->prophesize(EventSerializer::class); - $eventSerializer->serialize($message1->event())->shouldBeCalledTimes(2)->willReturn(new SerializedEvent('profile_created', '')); + $eventSerializer->serialize($message1->event())->shouldBeCalledTimes(2)->willReturn(new SerializedEvent( + 'profile_created', + '', + )); $headersSerializer = $this->prophesize(HeadersSerializer::class); $headersSerializer->serialize([])->willReturn('[]'); @@ -713,11 +748,11 @@ public function testSaveWithThousandEvents(): void $innerMockedConnection->executeStatement(Argument::any(), Argument::any(), Argument::any()) ->shouldBeCalledTimes(2); - $driver = $this->prophesize(Driver::class); - $driver->connect(Argument::any())->willReturn($innerMockedConnection->reveal()); - $eventSerializer = $this->prophesize(EventSerializer::class); - $eventSerializer->serialize($messages[0]->event())->shouldBeCalledTimes(10000)->willReturn(new SerializedEvent('profile_email_changed', '')); + $eventSerializer->serialize($messages[0]->event())->shouldBeCalledTimes(10000)->willReturn(new SerializedEvent( + 'profile_email_changed', + '', + )); $headersSerializer = $this->prophesize(HeadersSerializer::class); $headersSerializer->serialize([])->willReturn('[]'); @@ -765,11 +800,11 @@ public function testSaveWithCustomHeaders(): void ], )->shouldBeCalledOnce(); - $driver = $this->prophesize(Driver::class); - $driver->connect(Argument::any())->willReturn($innerMockedConnection->reveal()); - $eventSerializer = $this->prophesize(EventSerializer::class); - $eventSerializer->serialize($message->event())->shouldBeCalledOnce()->willReturn(new SerializedEvent('profile_created', '')); + $eventSerializer->serialize($message->event())->shouldBeCalledOnce()->willReturn(new SerializedEvent( + 'profile_created', + '', + )); $headersSerializer = $this->prophesize(HeadersSerializer::class); $headersSerializer->serialize($customHeaders)->willReturn('{foo: "foo", baz: "baz"}'); @@ -803,7 +838,11 @@ public function testCount(): void )->willReturn('1'); $abstractPlatform = $this->prophesize(AbstractPlatform::class); - $abstractPlatform->createSelectSQLBuilder()->shouldBeCalledOnce()->willReturn(new DefaultSelectSQLBuilder($abstractPlatform->reveal(), 'FOR UPDATE', 'SKIP LOCKED')); + $abstractPlatform->createSelectSQLBuilder()->shouldBeCalledOnce()->willReturn(new DefaultSelectSQLBuilder( + $abstractPlatform->reveal(), + 'FOR UPDATE', + 'SKIP LOCKED', + )); $connection->getDatabasePlatform()->willReturn($abstractPlatform->reveal()); $queryBuilder = new QueryBuilder($connection->reveal()); @@ -845,7 +884,11 @@ public function testCountWrongResult(): void )->willReturn([]); $abstractPlatform = $this->prophesize(AbstractPlatform::class); - $abstractPlatform->createSelectSQLBuilder()->shouldBeCalledOnce()->willReturn(new DefaultSelectSQLBuilder($abstractPlatform->reveal(), 'FOR UPDATE', 'SKIP LOCKED')); + $abstractPlatform->createSelectSQLBuilder()->shouldBeCalledOnce()->willReturn(new DefaultSelectSQLBuilder( + $abstractPlatform->reveal(), + 'FOR UPDATE', + 'SKIP LOCKED', + )); $connection->getDatabasePlatform()->willReturn($abstractPlatform->reveal()); $queryBuilder = new QueryBuilder($connection->reveal()); @@ -1123,32 +1166,218 @@ public function testConfigureSchemaWithStringAsAggregateIdType(): void } #[RequiresPhp('>= 8.2')] - public function testArchiveMessages(): void + public function testArchiveMessagesDifferentAggregates(): void { + $recordedOn = new DateTimeImmutable(); + $message1 = Message::create(new ProfileCreated(ProfileId::fromString('1'), Email::fromString('s'))) + ->withHeader(new AggregateHeader( + 'profile', + '1', + 5, + $recordedOn, + )) + ->withHeader(new StreamStartHeader()); + + $message2 = Message::create(new ProfileEmailChanged(ProfileId::fromString('2'), Email::fromString('d'))) + ->withHeader(new AggregateHeader( + 'profile', + '2', + 42, + $recordedOn, + )) + ->withHeader(new StreamStartHeader()); + + $innerMockedConnection = $this->prophesize(Connection::class); + + $innerMockedConnection->executeStatement( + "INSERT INTO eventstore (aggregate, aggregate_id, playhead, event, payload, recorded_on, new_stream_start, archived, custom_headers) VALUES\n(?, ?, ?, ?, ?, ?, ?, ?, ?),\n(?, ?, ?, ?, ?, ?, ?, ?, ?)", + [ + 'profile', + '1', + 5, + 'profile_created', + '', + $recordedOn, + true, + false, + '[]', + 'profile', + '2', + 42, + 'profile_email_changed', + '', + $recordedOn, + true, + false, + '[]', + ], + [ + 5 => Type::getType(Types::DATETIMETZ_IMMUTABLE), + 6 => Type::getType(Types::BOOLEAN), + 7 => Type::getType(Types::BOOLEAN), + 14 => Type::getType(Types::DATETIMETZ_IMMUTABLE), + 15 => Type::getType(Types::BOOLEAN), + 16 => Type::getType(Types::BOOLEAN), + ], + )->shouldBeCalledOnce(); + + $innerMockedConnection->executeStatement( + <<<'SQL' + UPDATE eventstore + SET archived = true + WHERE aggregate = :aggregate + AND aggregate_id = :aggregate_id + AND playhead < :playhead + AND archived = false + SQL, + [ + 'aggregate' => 'profile', + 'aggregate_id' => '1', + 'playhead' => 5, + ], + )->shouldBeCalledOnce(); + + $innerMockedConnection->executeStatement( + <<<'SQL' + UPDATE eventstore + SET archived = true + WHERE aggregate = :aggregate + AND aggregate_id = :aggregate_id + AND playhead < :playhead + AND archived = false + SQL, + [ + 'aggregate' => 'profile', + 'aggregate_id' => '2', + 'playhead' => 42, + ], + )->shouldBeCalledOnce(); + $eventSerializer = $this->prophesize(EventSerializer::class); - $headersSerializer = $this->prophesize(HeadersSerializer::class); + $eventSerializer->serialize($message1->event())->shouldBeCalledOnce()->willReturn(new SerializedEvent( + 'profile_created', + '', + )); + $eventSerializer->serialize($message2->event())->shouldBeCalledOnce()->willReturn(new SerializedEvent( + 'profile_email_changed', + '', + )); - $statement = $this->prophesize(Statement::class); - $statement->bindValue('aggregate', 'profile')->shouldBeCalledOnce(); - $statement->bindValue('aggregate_id', '1')->shouldBeCalledOnce(); - $statement->bindValue('playhead', 1)->shouldBeCalledOnce(); - $statement->executeQuery()->shouldBeCalledOnce(); + $headersSerializer = $this->prophesize(HeadersSerializer::class); + $headersSerializer->serialize([])->willReturn('[]'); $mockedConnection = $this->prophesize(Connection::class); - $mockedConnection->prepare( - 'UPDATE eventstore + $mockedConnection->transactional(Argument::any())->will( + /** @param array{0: callable} $args */ + static fn (array $args): mixed => $args[0]($innerMockedConnection->reveal()), + ); + + $singleTableStore = new DoctrineDbalStore( + $mockedConnection->reveal(), + $eventSerializer->reveal(), + $headersSerializer->reveal(), + ); + + $singleTableStore->save($message1, $message2); + } + + #[RequiresPhp('>= 8.2')] + public function testArchiveMessagesSameAggregate(): void + { + $recordedOn = new DateTimeImmutable(); + $message1 = Message::create(new ProfileCreated(ProfileId::fromString('1'), Email::fromString('s'))) + ->withHeader(new AggregateHeader( + 'profile', + '1', + 5, + $recordedOn, + )) + ->withHeader(new StreamStartHeader()); + + $message2 = Message::create(new ProfileEmailChanged(ProfileId::fromString('1'), Email::fromString('d'))) + ->withHeader(new AggregateHeader( + 'profile', + '1', + 42, + $recordedOn, + )) + ->withHeader(new StreamStartHeader()); + + $innerMockedConnection = $this->prophesize(Connection::class); + + $innerMockedConnection->executeStatement( + "INSERT INTO eventstore (aggregate, aggregate_id, playhead, event, payload, recorded_on, new_stream_start, archived, custom_headers) VALUES\n(?, ?, ?, ?, ?, ?, ?, ?, ?),\n(?, ?, ?, ?, ?, ?, ?, ?, ?)", + [ + 'profile', + '1', + 5, + 'profile_created', + '', + $recordedOn, + true, + false, + '[]', + 'profile', + '1', + 42, + 'profile_email_changed', + '', + $recordedOn, + true, + false, + '[]', + ], + [ + 5 => Type::getType(Types::DATETIMETZ_IMMUTABLE), + 6 => Type::getType(Types::BOOLEAN), + 7 => Type::getType(Types::BOOLEAN), + 14 => Type::getType(Types::DATETIMETZ_IMMUTABLE), + 15 => Type::getType(Types::BOOLEAN), + 16 => Type::getType(Types::BOOLEAN), + ], + )->shouldBeCalledOnce(); + + $innerMockedConnection->executeStatement( + <<<'SQL' + UPDATE eventstore SET archived = true WHERE aggregate = :aggregate AND aggregate_id = :aggregate_id AND playhead < :playhead - AND archived = false', - )->shouldBeCalledOnce()->willReturn($statement->reveal()); + AND archived = false + SQL, + [ + 'aggregate' => 'profile', + 'aggregate_id' => '1', + 'playhead' => 42, + ], + )->shouldBeCalledOnce(); + + $eventSerializer = $this->prophesize(EventSerializer::class); + $eventSerializer->serialize($message1->event())->shouldBeCalledOnce()->willReturn(new SerializedEvent( + 'profile_created', + '', + )); + $eventSerializer->serialize($message2->event())->shouldBeCalledOnce()->willReturn(new SerializedEvent( + 'profile_email_changed', + '', + )); + + $headersSerializer = $this->prophesize(HeadersSerializer::class); + $headersSerializer->serialize([])->willReturn('[]'); + + $mockedConnection = $this->prophesize(Connection::class); + $mockedConnection->transactional(Argument::any())->will( + /** @param array{0: callable} $args */ + static fn (array $args): mixed => $args[0]($innerMockedConnection->reveal()), + ); $singleTableStore = new DoctrineDbalStore( $mockedConnection->reveal(), $eventSerializer->reveal(), $headersSerializer->reveal(), ); - $singleTableStore->archiveMessages('profile', '1', 1); + + $singleTableStore->save($message1, $message2); } }