diff --git a/Makefile b/Makefile index 35e00a69..87dff0db 100644 --- a/Makefile +++ b/Makefile @@ -1,9 +1,9 @@ help: ## shows this help @awk 'BEGIN {FS = ":.*?## "} /^[a-zA-Z_\-\.]+:.*?## / {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}' $(MAKEFILE_LIST) -vendor: composer.json composer.lock +vendor: composer.lock composer install - + .PHONY: php-cs-check php-cs-check: vendor ## run cs fixer (dry-run) vendor/bin/php-cs-fixer fix --diff --dry-run @@ -29,7 +29,7 @@ static: php-cs-fix phpstan psalm .PHONY: test test: vendor - phpunit ## run tests + vendor/bin/phpunit ## run tests .PHONY: dev dev: static test ## run dev tools diff --git a/src/Store/MysqlMultiTableStore.php b/src/Store/MysqlMultiTableStore.php new file mode 100644 index 00000000..725fe272 --- /dev/null +++ b/src/Store/MysqlMultiTableStore.php @@ -0,0 +1,167 @@ + + */ + private array $aggregates; + + /** + * @param array $aggregates + */ + public function __construct(Connection $eventConnection, array $aggregates) + { + $this->connection = $eventConnection; + $this->aggregates = $aggregates; + } + + /** + * @param class-string $aggregate + * + * @return AggregateChanged[] + */ + public function load(string $aggregate, string $id): array + { + $tableName = self::tableName($aggregate); + + $result = $this->connection->fetchAllAssociative( + " + SELECT * + FROM $tableName + WHERE aggregateId = :id + ", + [ + 'id' => $id, + ] + ); + + return array_map( + /** @param array $data */ + static function (array $data) { + return AggregateChanged::deserialize($data); + }, + $result + ); + } + + /** + * @param class-string $aggregate + */ + public function has(string $aggregate, string $id): bool + { + $tableName = self::tableName($aggregate); + + $result = (int)$this->connection->fetchOne( + " + SELECT COUNT(*) + FROM $tableName + WHERE aggregateId = :id + LIMIT 1 + ", + [ + 'id' => $id, + ] + ); + + return $result > 0; + } + + /** + * @param class-string $aggregate + * @param AggregateChanged[] $events + */ + public function saveBatch(string $aggregate, string $id, array $events): void + { + $tableName = self::tableName($aggregate); + + $this->connection->transactional( + static function (Connection $connection) use ($tableName, $id, $events): void { + foreach ($events as $event) { + if ($event->aggregateId() !== $id) { + throw new StoreException('id missmatch'); + } + + $data = $event->serialize(); + $connection->insert($tableName, $data); + } + } + ); + } + + public function prepare(): void + { + foreach ($this->aggregates as $aggregate) { + $this->createTableForAggregate($aggregate); + } + } + + public function drop(): void + { + foreach ($this->aggregates as $aggregate) { + $this->dropTableForAggregate($aggregate); + } + } + + /** + * @param class-string $aggregate + */ + public function createTableForAggregate(string $aggregate): void + { + $tableName = self::tableName($aggregate); + + $this->connection->executeQuery(" + CREATE TABLE IF NOT EXISTS $tableName ( + id INT AUTO_INCREMENT PRIMARY KEY, + aggregateId VARCHAR(255) NOT NULL, + playhead INT NOT NULL, + event VARCHAR(255) NOT NULL, + payload JSON NOT NULL, + recordedOn DATETIME NOT NULL, + UNIQUE KEY aggregate_key (aggregateId, playhead) + ) + "); + } + + /** + * @param class-string $aggregate + */ + public function dropTableForAggregate(string $aggregate): void + { + $tableName = self::tableName($aggregate); + + $this->connection->executeQuery("DROP TABLE IF EXISTS $tableName;"); + } + + /** + * @param class-string $name + */ + private static function tableName(string $name): string + { + $parts = explode('\\', $name); + $shortName = array_pop($parts); + + if (!$shortName) { + throw new RuntimeException(sprintf('%s is not a valid classname', $name)); + } + + $string = (string)preg_replace('/(?<=[a-z])([A-Z])/', '_$1', $shortName); + + if (!$string) { + throw new RuntimeException(sprintf('%s is not a valid table name', $string)); + } + + return strtolower($string); + } +} diff --git a/src/Store/MysqlSingleTableStore.php b/src/Store/MysqlSingleTableStore.php index 2b390775..dccd11ee 100644 --- a/src/Store/MysqlSingleTableStore.php +++ b/src/Store/MysqlSingleTableStore.php @@ -94,15 +94,17 @@ public function count(): int */ public function saveBatch(string $aggregate, string $id, array $events): void { + $shortName = self::shortName($aggregate); + $this->connection->transactional( - static function (Connection $connection) use ($aggregate, $id, $events): void { + static function (Connection $connection) use ($shortName, $id, $events): void { foreach ($events as $event) { if ($event->aggregateId() !== $id) { throw new StoreException('id missmatch'); } $data = $event->serialize(); - $data['aggregate'] = self::shortName($aggregate); + $data['aggregate'] = $shortName; $connection->insert('eventstore', $data); } diff --git a/src/Store/Store.php b/src/Store/Store.php index df69a2ab..823e1385 100644 --- a/src/Store/Store.php +++ b/src/Store/Store.php @@ -4,7 +4,6 @@ namespace Patchlevel\EventSourcing\Store; -use Generator; use Patchlevel\EventSourcing\Aggregate\AggregateChanged; interface Store @@ -16,18 +15,11 @@ interface Store */ public function load(string $aggregate, string $id): array; - /** - * @return Generator - */ - public function loadAll(): Generator; - /** * @param class-string $aggregate */ public function has(string $aggregate, string $id): bool; - public function count(): int; - /** * @param class-string $aggregate * @param AggregateChanged[] $events