Skip to content

Commit

Permalink
make repository rollback safe
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed May 3, 2022
1 parent f0537f7 commit c5801bd
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 225 deletions.
45 changes: 14 additions & 31 deletions src/Repository/DefaultRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@

use Patchlevel\EventSourcing\Aggregate\AggregateRoot;
use Patchlevel\EventSourcing\EventBus\EventBus;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootMetadata;
use Patchlevel\EventSourcing\Snapshot\SnapshotNotFound;
use Patchlevel\EventSourcing\Snapshot\SnapshotStore;
use Patchlevel\EventSourcing\Store\Store;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Throwable;

use function array_key_exists;
use function assert;
use function count;
use function sprintf;
Expand All @@ -30,13 +30,10 @@ final class DefaultRepository implements Repository
/** @var class-string<T> */
private string $aggregateClass;

/** @var array<string, T> */
private array $instances = [];

private ?SnapshotStore $snapshotStore;
private LoggerInterface $logger;

private ?bool $snapshotConfigured = null;
private AggregateRootMetadata $metadata;

/**
* @param class-string<T> $aggregateClass
Expand All @@ -53,20 +50,17 @@ public function __construct(
$this->aggregateClass = $aggregateClass;
$this->snapshotStore = $snapshotStore;
$this->logger = $logger ?? new NullLogger();
$this->metadata = $aggregateClass::metadata();
}

/**
* @return T
*/
public function load(string $id): AggregateRoot
{
if (array_key_exists($id, $this->instances)) {
return $this->instances[$id];
}

$aggregateClass = $this->aggregateClass;

if ($this->snapshotStore && $this->snapshotConfigured()) {
if ($this->snapshotStore && $this->metadata->snapshotStore) {
try {
return $this->loadFromSnapshot($aggregateClass, $id);
} catch (SnapshotRebuildFailed $exception) {
Expand All @@ -88,15 +82,11 @@ public function load(string $id): AggregateRoot
throw new AggregateNotFound($aggregateClass, $id);
}

return $this->instances[$id] = $aggregateClass::createFromMessages($messages);
return $aggregateClass::createFromMessages($messages);
}

public function has(string $id): bool
{
if (array_key_exists($id, $this->instances)) {
return true;
}

return $this->store->has($this->aggregateClass, $id);
}

Expand All @@ -115,12 +105,6 @@ public function save(AggregateRoot $aggregate): void

$this->store->save(...$messages);
$this->eventBus->dispatch(...$messages);

if (!$this->snapshotStore || !$this->snapshotConfigured()) {
return;
}

$this->snapshotStore->save($aggregate);
}

/**
Expand All @@ -135,24 +119,23 @@ private function loadFromSnapshot(string $aggregateClass, string $id): Aggregate
$aggregate = $this->snapshotStore->load($aggregateClass, $id);
$messages = $this->store->load($aggregateClass, $id, $aggregate->playhead());

if ($messages === []) {
return $aggregate;
}

try {
$aggregate->catchUp($messages);

return $aggregate;
} catch (Throwable $exception) {
throw new SnapshotRebuildFailed($aggregateClass, $id, $exception);
}
}

private function snapshotConfigured(): bool
{
if ($this->snapshotConfigured !== null) {
return $this->snapshotConfigured;
}
$batchSize = $this->metadata->snapshotBatch ?: 1;

$aggregateClass = $this->aggregateClass;
if (count($messages) >= $batchSize) {
$this->snapshotStore->save($aggregate);
}

return $this->snapshotConfigured = $aggregateClass::metadata()->snapshotStore !== null;
return $aggregate;
}

private function assertRightAggregate(AggregateRoot $aggregate): void
Expand Down
35 changes: 1 addition & 34 deletions src/Snapshot/DefaultSnapshotStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ final class DefaultSnapshotStore implements SnapshotStore

private AggregateRootHydrator $hydrator;

/** @var array<string, int> */
private array $playheadCache = [];

/**
* @param array<string, SnapshotAdapter> $snapshotAdapters
*/
Expand All @@ -37,18 +34,12 @@ public function save(AggregateRoot $aggregateRoot): void
$aggregateClass = $aggregateRoot::class;
$key = $this->key($aggregateClass, $aggregateRoot->aggregateRootId());

if (!$this->shouldBeSaved($aggregateRoot, $key)) {
return;
}

$adapter = $this->adapter($aggregateClass);

$adapter->save(
$key,
$this->hydrator->extract($aggregateRoot),
);

$this->playheadCache[$key] = $aggregateRoot->playhead();
}

/**
Expand All @@ -71,16 +62,7 @@ public function load(string $aggregateClass, string $id): AggregateRoot
throw new SnapshotNotFound($aggregateClass, $id, $exception);
}

$aggregate = $this->hydrator->hydrate($aggregateClass, $data);

$this->playheadCache[$key] = $aggregate->playhead();

return $aggregate;
}

public function freeMemory(): void
{
$this->playheadCache = [];
return $this->hydrator->hydrate($aggregateClass, $data);
}

/**
Expand Down Expand Up @@ -110,19 +92,4 @@ private function key(string $aggregateClass, string $aggregateId): string

return sprintf('%s-%s', $aggregateName, $aggregateId);
}

private function shouldBeSaved(AggregateRoot $aggregateRoot, string $key): bool
{
$batchSize = $aggregateRoot::metadata()->snapshotBatch;

if (!$batchSize) {
return true;
}

$beforePlayhead = $this->playheadCache[$key] ?? 0;

$diff = $aggregateRoot->playhead() - $beforePlayhead;

return $diff >= $batchSize;
}
}
122 changes: 61 additions & 61 deletions tests/Unit/Repository/DefaultRepositoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use Patchlevel\EventSourcing\Tests\Unit\Fixture\Profile;
use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileCreated;
use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileId;
use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileVisited;
use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileWithSnapshot;
use PHPUnit\Framework\TestCase;
use Prophecy\Argument;
Expand Down Expand Up @@ -131,7 +132,7 @@ public function testLoadAggregate(): void
self::assertEquals(Email::fromString('hallo@patchlevel.de'), $aggregate->email());
}

public function testLoadAggregateCached(): void
public function testLoadAggregateTwice(): void
{
$store = $this->prophesize(Store::class);
$store->load(
Expand All @@ -157,14 +158,11 @@ public function testLoadAggregateCached(): void
Profile::class,
);

$aggregate = $repository->load('1');

self::assertInstanceOf(Profile::class, $aggregate);
self::assertSame(1, $aggregate->playhead());
self::assertEquals(ProfileId::fromString('1'), $aggregate->id());
self::assertEquals(Email::fromString('hallo@patchlevel.de'), $aggregate->email());
$aggregate1 = $repository->load('1');
$aggregate2 = $repository->load('1');

self::assertSame($aggregate, $repository->load('1'));
self::assertEquals($aggregate1, $aggregate2);
self::assertNotSame($aggregate1, $aggregate2);
}

public function testAggregateNotFound(): void
Expand Down Expand Up @@ -207,42 +205,6 @@ public function testHasAggregate(): void
self::assertTrue($repository->has('1'));
}

public function testHasAggregateCached(): void
{
$store = $this->prophesize(Store::class);
$store->load(Profile::class, '1')
->willReturn([
new Message(
Profile::class,
'1',
1,
new ProfileCreated(
ProfileId::fromString('1'),
Email::fromString('hallo@patchlevel.de')
)
),
]);

$store->has(Profile::class, '1')->shouldNotBeCalled();

$eventBus = $this->prophesize(EventBus::class);

$repository = new DefaultRepository(
$store->reveal(),
$eventBus->reveal(),
Profile::class,
);

$aggregate = $repository->load('1');

self::assertInstanceOf(Profile::class, $aggregate);
self::assertSame(1, $aggregate->playhead());
self::assertEquals(ProfileId::fromString('1'), $aggregate->id());
self::assertEquals(Email::fromString('hallo@patchlevel.de'), $aggregate->email());

self::assertTrue($repository->has('1'));
}

public function testNotHasAggregate(): void
{
$store = $this->prophesize(Store::class);
Expand All @@ -262,23 +224,29 @@ public function testNotHasAggregate(): void
self::assertFalse($repository->has('1'));
}

public function testSaveAggregateWithSnapshot(): void
public function testLoadAggregateWithSnapshot(): void
{
$aggregate = ProfileWithSnapshot::createProfile(
$profile = ProfileWithSnapshot::createProfile(
ProfileId::fromString('1'),
Email::fromString('hallo@patchlevel.de')
);

$store = $this->prophesize(Store::class);
$store->save(
Argument::type(Message::class)
)->shouldBeCalled();
$store->load(
ProfileWithSnapshot::class,
'1',
1
)->willReturn([]);

$eventBus = $this->prophesize(EventBus::class);
$eventBus->dispatch(Argument::type(Message::class))->shouldBeCalled();

$snapshotStore = $this->prophesize(SnapshotStore::class);
$snapshotStore->save($aggregate)->shouldBeCalled();
$snapshotStore->load(
ProfileWithSnapshot::class,
'1'
)->willReturn($profile);

//$snapshotStore->save($profile)->shouldBeCalled();

$repository = new DefaultRepository(
$store->reveal(),
Expand All @@ -287,30 +255,62 @@ public function testSaveAggregateWithSnapshot(): void
$snapshotStore->reveal()
);

$repository->save($aggregate);
$aggregate = $repository->load('1');

self::assertInstanceOf(ProfileWithSnapshot::class, $aggregate);
self::assertSame(1, $aggregate->playhead());
self::assertEquals(ProfileId::fromString('1'), $aggregate->id());
self::assertEquals(Email::fromString('hallo@patchlevel.de'), $aggregate->email());
}

public function testLoadAggregateWithSnapshot(): void
public function testLoadAggregateWithSnapshotAndSaveNewVersion(): void
{
$profile = ProfileWithSnapshot::createProfile(
ProfileId::fromString('1'),
Email::fromString('hallo@patchlevel.de')
);

$store = $this->prophesize(Store::class);
$store->load(
ProfileWithSnapshot::class,
'1',
1
)->willReturn([]);
)->willReturn([
new Message(
ProfileWithSnapshot::class,
'1',
2,
new ProfileVisited(
ProfileId::fromString('1'),
)
),
new Message(
ProfileWithSnapshot::class,
'1',
3,
new ProfileVisited(
ProfileId::fromString('1'),
)
),
new Message(
ProfileWithSnapshot::class,
'1',
4,
new ProfileVisited(
ProfileId::fromString('1'),
)
),
]);

$eventBus = $this->prophesize(EventBus::class);

$snapshotStore = $this->prophesize(SnapshotStore::class);
$snapshotStore->load(
ProfileWithSnapshot::class,
'1'
)->willReturn(
ProfileWithSnapshot::createProfile(
ProfileId::fromString('1'),
Email::fromString('hallo@patchlevel.de')
)
);
)->willReturn($profile);

$snapshotStore->save($profile)->shouldBeCalled();

$repository = new DefaultRepository(
$store->reveal(),
Expand All @@ -322,7 +322,7 @@ public function testLoadAggregateWithSnapshot(): void
$aggregate = $repository->load('1');

self::assertInstanceOf(ProfileWithSnapshot::class, $aggregate);
self::assertSame(1, $aggregate->playhead());
self::assertSame(4, $aggregate->playhead());
self::assertEquals(ProfileId::fromString('1'), $aggregate->id());
self::assertEquals(Email::fromString('hallo@patchlevel.de'), $aggregate->email());
}
Expand Down
Loading

0 comments on commit c5801bd

Please sign in to comment.