Skip to content

Commit

Permalink
Merge pull request #206 from patchlevel/cleanup-save-api
Browse files Browse the repository at this point in the history
cleanup save method in store
  • Loading branch information
DavidBadura authored Mar 28, 2022
2 parents 65d7c33 + 5267016 commit f37ab56
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 96 deletions.
2 changes: 1 addition & 1 deletion src/Repository/DefaultRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public function save(AggregateRoot $aggregate): void
return;
}

$this->store->saveBatch($messageStream);
$this->store->save(...$messageStream);

foreach ($messageStream as $message) {
$this->eventStream->dispatch($message);
Expand Down
12 changes: 5 additions & 7 deletions src/Repository/SnapshotRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,8 @@ public function has(string $id): bool

public function save(AggregateRoot $aggregate): void
{
$class = $aggregate::class;

if (!$aggregate instanceof $this->aggregateClass) {
throw new WrongAggregate($class, $this->aggregateClass);
throw new WrongAggregate($aggregate::class, $this->aggregateClass);
}

$messages = $aggregate->releaseMessages();
Expand All @@ -99,13 +97,13 @@ public function save(AggregateRoot $aggregate): void
return;
}

$this->store->saveBatch($messages);

$snapshot = $aggregate->toSnapshot();
$this->snapshotStore->save($snapshot);
$this->store->save(...$messages);

foreach ($messages as $message) {
$this->eventStream->dispatch($message);
}

$snapshot = $aggregate->toSnapshot();
$this->snapshotStore->save($snapshot);
}
}
72 changes: 26 additions & 46 deletions src/Store/MultiTableStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -109,18 +109,36 @@ public function has(string $aggregate, string $id): bool
return ((int)$result) > 0;
}

/**
* @param list<Message> $messages
*/
public function saveBatch(array $messages): void
public function save(Message ...$messages): void
{
$this->connection->transactional(
function (Connection $connection) use ($messages): void {
foreach ($messages as $message) {
$this->saveMessage(
$connection,
$this->tableName($message->aggregateClass()),
$message
$event = $message->event();
$aggregateName = $this->tableName($message->aggregateClass());

$connection->insert(
$this->metadataTableName,
[
'aggregate' => $aggregateName,
'aggregate_id' => $message->aggregateId(),
'playhead' => $message->playhead(),
],
);

$connection->insert(
$aggregateName,
[
'id' => (int)$connection->lastInsertId(),
'aggregate_id' => $message->aggregateId(),
'playhead' => $message->playhead(),
'event' => $event::class,
'payload' => $this->serializer->serialize($event),
'recorded_on' => $message->recordedOn(),
],
[
'recorded_on' => Types::DATETIMETZ_IMMUTABLE,
]
);
}
}
Expand Down Expand Up @@ -213,44 +231,6 @@ public function count(int $fromIndex = 0): int
return (int)$result;
}

public function save(Message $message): void
{
$this->saveMessage(
$this->connection,
$this->tableName($message->aggregateClass()),
$message
);
}

private function saveMessage(Connection $connection, string $aggregateName, Message $message): void
{
$event = $message->event();

$connection->insert(
$this->metadataTableName,
[
'aggregate' => $aggregateName,
'aggregate_id' => $message->aggregateId(),
'playhead' => $message->playhead(),
],
);

$connection->insert(
$aggregateName,
[
'id' => (int)$connection->lastInsertId(),
'aggregate_id' => $message->aggregateId(),
'playhead' => $message->playhead(),
'event' => $event::class,
'payload' => $this->serializer->serialize($event),
'recorded_on' => $message->recordedOn(),
],
[
'recorded_on' => Types::DATETIMETZ_IMMUTABLE,
]
);
}

public function schema(): Schema
{
$schema = new Schema([], [], $this->connection->createSchemaManager()->createSchemaConfig());
Expand Down
31 changes: 3 additions & 28 deletions src/Store/SingleTableStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -112,20 +112,15 @@ public function has(string $aggregate, string $id): bool
return ((int)$result) > 0;
}

/**
* @param list<Message> $messages
*/
public function saveBatch(array $messages): void
public function save(Message ...$messages): void
{
$storeTableName = $this->storeTableName;

$this->connection->transactional(
function (Connection $connection) use ($messages, $storeTableName): void {
function (Connection $connection) use ($messages): void {
foreach ($messages as $message) {
$event = $message->event();

$connection->insert(
$storeTableName,
$this->storeTableName,
[
'aggregate' => $this->shortName($message->aggregateClass()),
'aggregate_id' => $message->aggregateId(),
Expand Down Expand Up @@ -205,26 +200,6 @@ public function count(int $fromIndex = 0): int
return (int)$result;
}

public function save(Message $message): void
{
$event = $message->event();

$this->connection->insert(
$this->storeTableName,
[
'aggregate' => $this->shortName($message->aggregateClass()),
'aggregate_id' => $message->aggregateId(),
'playhead' => $message->playhead(),
'event' => $event::class,
'payload' => $this->serializer->serialize($event),
'recorded_on' => $message->recordedOn(),
],
[
'recorded_on' => Types::DATETIMETZ_IMMUTABLE,
]
);
}

public function schema(): Schema
{
$schema = new Schema([], [], $this->connection->createSchemaManager()->createSchemaConfig());
Expand Down
7 changes: 1 addition & 6 deletions src/Store/Store.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,5 @@ public function load(string $aggregate, string $id, int $fromPlayhead = 0): arra
*/
public function has(string $aggregate, string $id): bool;

public function save(Message $message): void;

/**
* @param list<Message> $messages
*/
public function saveBatch(array $messages): void;
public function save(Message ...$messages): void;
}
8 changes: 4 additions & 4 deletions tests/Unit/Repository/DefaultRepositoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ public function testInstantiateWithWrongClass(): void
public function testSaveAggregate(): void
{
$store = $this->prophesize(Store::class);
$store->saveBatch(
Argument::size(1)
$store->save(
Argument::type(Message::class)
)->shouldBeCalled();

$eventBus = $this->prophesize(EventBus::class);
Expand Down Expand Up @@ -87,8 +87,8 @@ public function testSaveWrongAggregate(): void
public function testSaveAggregateWithEmptyEventStream(): void
{
$store = $this->prophesize(Store::class);
$store->saveBatch(
Argument::size(1)
$store->save(
Argument::type(Message::class)
)->shouldNotBeCalled();

$eventBus = $this->prophesize(EventBus::class);
Expand Down
8 changes: 4 additions & 4 deletions tests/Unit/Repository/SnapshotRepositoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ public function testSaveWrongAggregate(): void
public function testSaveAggregateWithEmptyEventStream(): void
{
$store = $this->prophesize(Store::class);
$store->saveBatch(
Argument::size(1)
$store->save(
Argument::type(Message::class)
)->shouldNotBeCalled();

$eventBus = $this->prophesize(EventBus::class);
Expand All @@ -98,8 +98,8 @@ public function testSaveAggregateWithEmptyEventStream(): void
public function testSaveAggregate(): void
{
$store = $this->prophesize(Store::class);
$store->saveBatch(
Argument::size(1)
$store->save(
Argument::type(Message::class)
)->shouldBeCalled();

$eventBus = $this->prophesize(EventBus::class);
Expand Down

0 comments on commit f37ab56

Please sign in to comment.