Skip to content

Commit

Permalink
Merge pull request #11 from patchlevel/add-multi-table-support
Browse files Browse the repository at this point in the history
add multi table support
  • Loading branch information
DavidBadura authored Jan 8, 2021
2 parents 2ca0be8 + ea63c56 commit af87dca
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 13 deletions.
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
help: ## shows this help
@awk 'BEGIN {FS = ":.*?## "} /^[a-zA-Z_\-\.]+:.*?## / {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}' $(MAKEFILE_LIST)

vendor: composer.json composer.lock
vendor: composer.lock
composer install

.PHONY: php-cs-check
php-cs-check: vendor ## run cs fixer (dry-run)
vendor/bin/php-cs-fixer fix --diff --dry-run
Expand All @@ -29,7 +29,7 @@ static: php-cs-fix phpstan psalm

.PHONY: test
test: vendor
phpunit ## run tests
vendor/bin/phpunit ## run tests

.PHONY: dev
dev: static test ## run dev tools
167 changes: 167 additions & 0 deletions src/Store/MysqlMultiTableStore.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Store;

use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\Aggregate\AggregateChanged;
use RuntimeException;
use function sprintf;

final class MysqlMultiTableStore implements Store
{
private Connection $connection;

/**
* @var array<class-string>
*/
private array $aggregates;

/**
* @param array<class-string> $aggregates
*/
public function __construct(Connection $eventConnection, array $aggregates)
{
$this->connection = $eventConnection;
$this->aggregates = $aggregates;
}

/**
* @param class-string $aggregate
*
* @return AggregateChanged[]
*/
public function load(string $aggregate, string $id): array
{
$tableName = self::tableName($aggregate);

$result = $this->connection->fetchAllAssociative(
"
SELECT *
FROM $tableName
WHERE aggregateId = :id
",
[
'id' => $id,
]
);

return array_map(
/** @param array<string, mixed> $data */
static function (array $data) {
return AggregateChanged::deserialize($data);
},
$result
);
}

/**
* @param class-string $aggregate
*/
public function has(string $aggregate, string $id): bool
{
$tableName = self::tableName($aggregate);

$result = (int)$this->connection->fetchOne(
"
SELECT COUNT(*)
FROM $tableName
WHERE aggregateId = :id
LIMIT 1
",
[
'id' => $id,
]
);

return $result > 0;
}

/**
* @param class-string $aggregate
* @param AggregateChanged[] $events
*/
public function saveBatch(string $aggregate, string $id, array $events): void
{
$tableName = self::tableName($aggregate);

$this->connection->transactional(
static function (Connection $connection) use ($tableName, $id, $events): void {
foreach ($events as $event) {
if ($event->aggregateId() !== $id) {
throw new StoreException('id missmatch');
}

$data = $event->serialize();
$connection->insert($tableName, $data);
}
}
);
}

public function prepare(): void
{
foreach ($this->aggregates as $aggregate) {
$this->createTableForAggregate($aggregate);
}
}

public function drop(): void
{
foreach ($this->aggregates as $aggregate) {
$this->dropTableForAggregate($aggregate);
}
}

/**
* @param class-string $aggregate
*/
public function createTableForAggregate(string $aggregate): void
{
$tableName = self::tableName($aggregate);

$this->connection->executeQuery("
CREATE TABLE IF NOT EXISTS $tableName (
id INT AUTO_INCREMENT PRIMARY KEY,
aggregateId VARCHAR(255) NOT NULL,
playhead INT NOT NULL,
event VARCHAR(255) NOT NULL,
payload JSON NOT NULL,
recordedOn DATETIME NOT NULL,
UNIQUE KEY aggregate_key (aggregateId, playhead)
)
");
}

/**
* @param class-string $aggregate
*/
public function dropTableForAggregate(string $aggregate): void
{
$tableName = self::tableName($aggregate);

$this->connection->executeQuery("DROP TABLE IF EXISTS $tableName;");
}

/**
* @param class-string $name
*/
private static function tableName(string $name): string
{
$parts = explode('\\', $name);
$shortName = array_pop($parts);

if (!$shortName) {
throw new RuntimeException(sprintf('%s is not a valid classname', $name));
}

$string = (string)preg_replace('/(?<=[a-z])([A-Z])/', '_$1', $shortName);

if (!$string) {
throw new RuntimeException(sprintf('%s is not a valid table name', $string));
}

return strtolower($string);
}
}
6 changes: 4 additions & 2 deletions src/Store/MysqlSingleTableStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,17 @@ public function count(): int
*/
public function saveBatch(string $aggregate, string $id, array $events): void
{
$shortName = self::shortName($aggregate);

$this->connection->transactional(
static function (Connection $connection) use ($aggregate, $id, $events): void {
static function (Connection $connection) use ($shortName, $id, $events): void {
foreach ($events as $event) {
if ($event->aggregateId() !== $id) {
throw new StoreException('id missmatch');
}

$data = $event->serialize();
$data['aggregate'] = self::shortName($aggregate);
$data['aggregate'] = $shortName;

$connection->insert('eventstore', $data);
}
Expand Down
8 changes: 0 additions & 8 deletions src/Store/Store.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

namespace Patchlevel\EventSourcing\Store;

use Generator;
use Patchlevel\EventSourcing\Aggregate\AggregateChanged;

interface Store
Expand All @@ -16,18 +15,11 @@ interface Store
*/
public function load(string $aggregate, string $id): array;

/**
* @return Generator<AggregateChanged>
*/
public function loadAll(): Generator;

/**
* @param class-string $aggregate
*/
public function has(string $aggregate, string $id): bool;

public function count(): int;

/**
* @param class-string $aggregate
* @param AggregateChanged[] $events
Expand Down

0 comments on commit af87dca

Please sign in to comment.