Skip to content

Commit

Permalink
move archive logic into store
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Apr 26, 2024
1 parent 38c677c commit e316dd3
Show file tree
Hide file tree
Showing 6 changed files with 371 additions and 198 deletions.
48 changes: 40 additions & 8 deletions baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -272,14 +272,46 @@
<code><![CDATA[addMethods]]></code>
</DeprecatedMethod>
<InternalMethod>
<code><![CDATA[new DefaultSelectSQLBuilder($abstractPlatform->reveal(), 'FOR UPDATE', 'SKIP LOCKED')]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder($abstractPlatform->reveal(), 'FOR UPDATE', 'SKIP LOCKED')]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder($abstractPlatform->reveal(), 'FOR UPDATE', 'SKIP LOCKED')]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder($abstractPlatform->reveal(), 'FOR UPDATE', 'SKIP LOCKED')]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder($abstractPlatform->reveal(), 'FOR UPDATE', 'SKIP LOCKED')]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder($abstractPlatform->reveal(), 'FOR UPDATE', 'SKIP LOCKED')]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder($abstractPlatform->reveal(), 'FOR UPDATE', 'SKIP LOCKED')]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder($abstractPlatform->reveal(), 'FOR UPDATE', 'SKIP LOCKED')]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
<code><![CDATA[new DefaultSelectSQLBuilder(
$abstractPlatform->reveal(),
'FOR UPDATE',
'SKIP LOCKED',
)]]></code>
</InternalMethod>
</file>
<file src="tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php">
Expand Down
75 changes: 16 additions & 59 deletions src/Repository/DefaultRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@
use Patchlevel\EventSourcing\Snapshot\SnapshotNotFound;
use Patchlevel\EventSourcing\Snapshot\SnapshotStore;
use Patchlevel\EventSourcing\Snapshot\SnapshotVersionInvalid;
use Patchlevel\EventSourcing\Store\ArchivableStore;
use Patchlevel\EventSourcing\Store\Criteria\CriteriaBuilder;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Store\Stream;
use Patchlevel\EventSourcing\Store\StreamStartHeader;
use Patchlevel\EventSourcing\Store\UniqueConstraintViolation;
use Psr\Clock\ClockInterface;
use Psr\Log\LoggerInterface;
Expand Down Expand Up @@ -238,35 +236,31 @@ static function (object $event) use ($aggregateName, $aggregateId, &$playhead, $
$events,
);

$this->store->transactional(function () use ($messages, $aggregate, $aggregateId, $newAggregate): void {
try {
$this->store->save(...$messages);
} catch (UniqueConstraintViolation) {
if ($newAggregate) {
$this->logger->error(
sprintf(
'Repository: Aggregate "%s" with the id "%s" already exists.',
$aggregate::class,
$aggregateId,
),
);

throw new AggregateAlreadyExists($aggregate::class, $aggregate->aggregateRootId());
}

try {
$this->store->save(...$messages);
} catch (UniqueConstraintViolation) {
if ($newAggregate) {
$this->logger->error(
sprintf(
'Repository: Aggregate "%s" with the id "%s" is outdated.',
'Repository: Aggregate "%s" with the id "%s" already exists.',
$aggregate::class,
$aggregateId,
),
);

throw new AggregateOutdated($aggregate::class, $aggregate->aggregateRootId());
throw new AggregateAlreadyExists($aggregate::class, $aggregate->aggregateRootId());
}

$this->archive(...$messages);
});
$this->logger->error(
sprintf(
'Repository: Aggregate "%s" with the id "%s" is outdated.',
$aggregate::class,
$aggregateId,
),
);

throw new AggregateOutdated($aggregate::class, $aggregate->aggregateRootId());
}

$this->aggregateIsValid[$aggregate] = true;

Expand Down Expand Up @@ -368,43 +362,6 @@ private function assertValidAggregate(AggregateRoot $aggregate): void
}
}

private function archive(Message ...$messages): void
{
if (!$this->store instanceof ArchivableStore) {
return;
}

$lastMessageWithNewStreamStart = null;

foreach ($messages as $message) {
if (!$message->hasHeader(StreamStartHeader::class)) {
continue;
}

$lastMessageWithNewStreamStart = $message;
}

if ($lastMessageWithNewStreamStart === null) {
return;
}

$aggregateHeader = $lastMessageWithNewStreamStart->header(AggregateHeader::class);
$this->store->archiveMessages(
$aggregateHeader->aggregateName,
$aggregateHeader->aggregateId,
$aggregateHeader->playhead,
);

$this->logger->debug(
sprintf(
'Repository: Archive messages for aggregate "%s" with the id "%s" until playhead "%d".',
$aggregateHeader->aggregateName,
$aggregateHeader->aggregateId,
$aggregateHeader->playhead,
),
);
}

/** @return Traversable<object> */
private function unpack(Stream $stream): Traversable
{
Expand Down
10 changes: 0 additions & 10 deletions src/Store/ArchivableStore.php

This file was deleted.

60 changes: 36 additions & 24 deletions src/Store/DoctrineDbalStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
use function is_string;
use function sprintf;

final class DoctrineDbalStore implements Store, ArchivableStore, SubscriptionStore, DoctrineSchemaConfigurator
final class DoctrineDbalStore implements Store, SubscriptionStore, DoctrineSchemaConfigurator
{
/**
* PostgreSQL has a limit of 65535 parameters in a single query.
Expand Down Expand Up @@ -157,6 +157,9 @@ public function save(Message ...$messages): void

$this->connection->transactional(
function (Connection $connection) use ($messages): void {
/** @var array<string, int> $achievedUntilPlayhead */
$achievedUntilPlayhead = [];

$booleanType = Type::getType(Types::BOOLEAN);
$dateTimeType = Type::getType(Types::DATETIMETZ_IMMUTABLE);

Expand Down Expand Up @@ -203,7 +206,14 @@ function (Connection $connection) use ($messages): void {
$parameters[] = $aggregateHeader->recordedOn;
$types[$offset + 5] = $dateTimeType;

$parameters[] = $message->hasHeader(StreamStartHeader::class);
$streamStart = $message->hasHeader(StreamStartHeader::class);

if ($streamStart) {
$key = $aggregateHeader->aggregateName . '/' . $aggregateHeader->aggregateId;
$achievedUntilPlayhead[$key] = $aggregateHeader->playhead;
}

$parameters[] = $streamStart;
$types[$offset + 6] = $booleanType;

$parameters[] = $message->hasHeader(ArchivedHeader::class);
Expand All @@ -226,11 +236,32 @@ function (Connection $connection) use ($messages): void {
$position = 0;
}

if ($position === 0) {
return;
if ($position !== 0) {
$this->executeSave($columns, $placeholders, $parameters, $types, $connection);
}

$this->executeSave($columns, $placeholders, $parameters, $types, $connection);
foreach ($achievedUntilPlayhead as $key => $playhead) {
[$aggregateName, $aggregateId] = explode('/', $key);

$connection->executeStatement(
sprintf(
<<<'SQL'
UPDATE %s
SET archived = true
WHERE aggregate = :aggregate
AND aggregate_id = :aggregate_id
AND playhead < :playhead
AND archived = false
SQL,
$this->config['table_name'],
),
[
'aggregate' => $aggregateName,
'aggregate_id' => $aggregateId,
'playhead' => $playhead,
],
);
}
},
);
}
Expand All @@ -245,25 +276,6 @@ public function transactional(Closure $function): void
$this->connection->transactional($function);
}

public function archiveMessages(string $aggregateName, string $aggregateId, int $untilPlayhead): void
{
$statement = $this->connection->prepare(sprintf(
'UPDATE %s
SET archived = true
WHERE aggregate = :aggregate
AND aggregate_id = :aggregate_id
AND playhead < :playhead
AND archived = false',
$this->config['table_name'],
));

$statement->bindValue('aggregate', $aggregateName);
$statement->bindValue('aggregate_id', $aggregateId);
$statement->bindValue('playhead', $untilPlayhead);

$statement->executeQuery();
}

public function configureSchema(Schema $schema, Connection $connection): void
{
if ($this->connection !== $connection) {
Expand Down
47 changes: 0 additions & 47 deletions tests/Unit/Repository/DefaultRepositoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
use Patchlevel\EventSourcing\Repository\WrongAggregate;
use Patchlevel\EventSourcing\Snapshot\SnapshotNotFound;
use Patchlevel\EventSourcing\Snapshot\SnapshotStore;
use Patchlevel\EventSourcing\Store\ArchivableStore;
use Patchlevel\EventSourcing\Store\ArchivedHeader;
use Patchlevel\EventSourcing\Store\ArrayStream;
use Patchlevel\EventSourcing\Store\Criteria\AggregateIdCriterion;
Expand Down Expand Up @@ -75,11 +74,6 @@ public function testSaveAggregate(): void
}),
)->shouldBeCalled();

$store->transactional(Argument::any())->will(
/** @param array{0: callable} $args */
static fn (array $args): mixed => $args[0](),
);

$repository = new DefaultRepository(
$store->reveal(),
Profile::metadata(),
Expand Down Expand Up @@ -126,11 +120,6 @@ public function testUpdateAggregate(): void
}),
)->shouldBeCalled();

$store->transactional(Argument::any())->will(
/** @param array{0: callable} $args */
static fn (array $args): mixed => $args[0](),
);

$repository = new DefaultRepository(
$store->reveal(),
Profile::metadata(),
Expand Down Expand Up @@ -179,11 +168,6 @@ public function testEventBus(): void
}),
)->shouldBeCalled();

$store->transactional(Argument::any())->will(
/** @param array{0: callable} $args */
static fn (array $args): mixed => $args[0](),
);

$eventBus = $this->prophesize(EventBus::class);
$eventBus->dispatch(
Argument::that(static function (Message $message) {
Expand Down Expand Up @@ -252,11 +236,6 @@ public function testDecorator(): void
}),
)->shouldBeCalled();

$store->transactional(Argument::any())->will(
/** @param array{0: callable} $args */
static fn (array $args): mixed => $args[0](),
);

$decorator = new class implements MessageDecorator {
public function __invoke(Message $message): Message
{
Expand Down Expand Up @@ -317,11 +296,6 @@ public function testSaveAggregateWithEmptyEventStream(): void
}),
)->shouldBeCalledOnce();

$store->transactional(Argument::any())->will(
/** @param array{0: callable} $args */
static fn (array $args): mixed => $args[0](),
);

$repository = new DefaultRepository(
$store->reveal(),
Profile::metadata(),
Expand All @@ -343,11 +317,6 @@ public function testDetachedException(): void
Argument::type(Message::class),
)->willThrow(new RuntimeException());

$store->transactional(Argument::any())->will(
/** @param array{0: callable} $args */
static fn (array $args): mixed => $args[0](),
);

$repository = new DefaultRepository(
$store->reveal(),
Profile::metadata(),
Expand Down Expand Up @@ -404,11 +373,6 @@ public function testDuplicate(): void
Argument::type(Message::class),
)->willThrow(new UniqueConstraintViolation());

$store->transactional(Argument::any())->will(
/** @param array{0: callable} $args */
static fn (array $args): mixed => $args[0](),
);

$repository = new DefaultRepository(
$store->reveal(),
Profile::metadata(),
Expand Down Expand Up @@ -440,11 +404,6 @@ public function testOutdated(): void
}),
)->willThrow(new UniqueConstraintViolation());

$store->transactional(Argument::any())->will(
/** @param array{0: callable} $args */
static fn (array $args): mixed => $args[0](),
);

$repository = new DefaultRepository(
$store->reveal(),
Profile::metadata(),
Expand All @@ -465,7 +424,6 @@ public function testOutdated(): void
public function testSaveAggregateWithSplitStream(): void
{
$store = $this->prophesize(Store::class);
$store->willImplement(ArchivableStore::class);
$store->save(
Argument::that(static function (Message $message) {
if ($message->header(AggregateHeader::class)->aggregateName !== 'profile') {
Expand Down Expand Up @@ -501,11 +459,6 @@ public function testSaveAggregateWithSplitStream(): void
return $message->header(AggregateHeader::class)->playhead === 3;
}),
)->shouldBeCalled();
$store->archiveMessages('profile', '1', 3)->shouldBeCalledOnce();
$store->transactional(Argument::any())->will(
/** @param array{0: callable} $args */
static fn (array $args): mixed => $args[0](),
);

$repository = new DefaultRepository(
$store->reveal(),
Expand Down
Loading

0 comments on commit e316dd3

Please sign in to comment.