diff --git a/composer.lock b/composer.lock index 4b5da2c3..a64b4c8f 100644 --- a/composer.lock +++ b/composer.lock @@ -479,16 +479,16 @@ }, { "name": "symfony/console", - "version": "v5.2.1", + "version": "v5.2.2", "source": { "type": "git", "url": "https://github.com/symfony/console.git", - "reference": "47c02526c532fb381374dab26df05e7313978976" + "reference": "d62ec79478b55036f65e2602e282822b8eaaff0a" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/console/zipball/47c02526c532fb381374dab26df05e7313978976", - "reference": "47c02526c532fb381374dab26df05e7313978976", + "url": "https://api.github.com/repos/symfony/console/zipball/d62ec79478b55036f65e2602e282822b8eaaff0a", + "reference": "d62ec79478b55036f65e2602e282822b8eaaff0a", "shasum": "" }, "require": { @@ -547,7 +547,7 @@ "homepage": "https://symfony.com/contributors" } ], - "description": "Symfony Console Component", + "description": "Eases the creation of beautiful and testable command line interfaces", "homepage": "https://symfony.com", "keywords": [ "cli", @@ -556,7 +556,7 @@ "terminal" ], "support": { - "source": "https://github.com/symfony/console/tree/v5.2.1" + "source": "https://github.com/symfony/console/tree/v5.2.2" }, "funding": [ { @@ -572,7 +572,7 @@ "type": "tidelift" } ], - "time": "2020-12-18T08:03:05+00:00" + "time": "2021-01-27T10:15:41+00:00" }, { "name": "symfony/polyfill-ctype", @@ -1141,16 +1141,16 @@ }, { "name": "symfony/string", - "version": "v5.2.1", + "version": "v5.2.2", "source": { "type": "git", "url": "https://github.com/symfony/string.git", - "reference": "5bd67751d2e3f7d6f770c9154b8fbcb2aa05f7ed" + "reference": "c95468897f408dd0aca2ff582074423dd0455122" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/string/zipball/5bd67751d2e3f7d6f770c9154b8fbcb2aa05f7ed", - "reference": "5bd67751d2e3f7d6f770c9154b8fbcb2aa05f7ed", + "url": "https://api.github.com/repos/symfony/string/zipball/c95468897f408dd0aca2ff582074423dd0455122", + "reference": "c95468897f408dd0aca2ff582074423dd0455122", "shasum": "" }, "require": { @@ -1193,7 +1193,7 @@ "homepage": "https://symfony.com/contributors" } ], - "description": "Symfony String component", + "description": "Provides an object-oriented API to strings and deals with bytes, UTF-8 code points and grapheme clusters in a unified way", "homepage": "https://symfony.com", "keywords": [ "grapheme", @@ -1204,7 +1204,7 @@ "utf8" ], "support": { - "source": "https://github.com/symfony/string/tree/v5.2.1" + "source": "https://github.com/symfony/string/tree/v5.2.2" }, "funding": [ { @@ -1220,7 +1220,7 @@ "type": "tidelift" } ], - "time": "2020-12-05T07:33:16+00:00" + "time": "2021-01-25T15:14:59+00:00" } ], "packages-dev": [ @@ -2925,16 +2925,16 @@ }, { "name": "phpstan/phpstan", - "version": "0.12.69", + "version": "0.12.70", "source": { "type": "git", "url": "https://github.com/phpstan/phpstan.git", - "reference": "8f436ea35241da33487fd0d38b4bc3e6dfe30ea8" + "reference": "07f0ef37f5f876e8cee44cc8ea0ec3fe80d499ee" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/phpstan/phpstan/zipball/8f436ea35241da33487fd0d38b4bc3e6dfe30ea8", - "reference": "8f436ea35241da33487fd0d38b4bc3e6dfe30ea8", + "url": "https://api.github.com/repos/phpstan/phpstan/zipball/07f0ef37f5f876e8cee44cc8ea0ec3fe80d499ee", + "reference": "07f0ef37f5f876e8cee44cc8ea0ec3fe80d499ee", "shasum": "" }, "require": { @@ -2965,7 +2965,7 @@ "description": "PHPStan - PHP Static Analysis Tool", "support": { "issues": "https://github.com/phpstan/phpstan/issues", - "source": "https://github.com/phpstan/phpstan/tree/0.12.69" + "source": "https://github.com/phpstan/phpstan/tree/0.12.70" }, "funding": [ { @@ -2981,7 +2981,7 @@ "type": "tidelift" } ], - "time": "2021-01-24T14:55:37+00:00" + "time": "2021-01-27T17:06:47+00:00" }, { "name": "phpunit/php-code-coverage", @@ -4665,16 +4665,16 @@ }, { "name": "symfony/amqp-messenger", - "version": "v5.2.1", + "version": "v5.2.2", "source": { "type": "git", "url": "https://github.com/symfony/amqp-messenger.git", - "reference": "a7f681b022f9cfb88febc12face05b12a47232c1" + "reference": "cf309a35ed08caa77886ee6a352b8491c7681424" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/amqp-messenger/zipball/a7f681b022f9cfb88febc12face05b12a47232c1", - "reference": "a7f681b022f9cfb88febc12face05b12a47232c1", + "url": "https://api.github.com/repos/symfony/amqp-messenger/zipball/cf309a35ed08caa77886ee6a352b8491c7681424", + "reference": "cf309a35ed08caa77886ee6a352b8491c7681424", "shasum": "" }, "require": { @@ -4714,7 +4714,7 @@ "description": "Symfony AMQP extension Messenger Bridge", "homepage": "https://symfony.com", "support": { - "source": "https://github.com/symfony/amqp-messenger/tree/v5.2.1" + "source": "https://github.com/symfony/amqp-messenger/tree/v5.2.2" }, "funding": [ { @@ -4730,7 +4730,7 @@ "type": "tidelift" } ], - "time": "2020-12-15T11:52:46+00:00" + "time": "2021-01-27T11:19:04+00:00" }, { "name": "symfony/deprecation-contracts", @@ -4801,16 +4801,16 @@ }, { "name": "symfony/doctrine-messenger", - "version": "v5.2.1", + "version": "v5.2.2", "source": { "type": "git", "url": "https://github.com/symfony/doctrine-messenger.git", - "reference": "8ad222fc5abce88f2b7c9cb91aa53b9f445a5889" + "reference": "e00cd690e49f08b41472e18b0d420c9874d2c707" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/doctrine-messenger/zipball/8ad222fc5abce88f2b7c9cb91aa53b9f445a5889", - "reference": "8ad222fc5abce88f2b7c9cb91aa53b9f445a5889", + "url": "https://api.github.com/repos/symfony/doctrine-messenger/zipball/e00cd690e49f08b41472e18b0d420c9874d2c707", + "reference": "e00cd690e49f08b41472e18b0d420c9874d2c707", "shasum": "" }, "require": { @@ -4854,7 +4854,7 @@ "description": "Symfony Doctrine Messenger Bridge", "homepage": "https://symfony.com", "support": { - "source": "https://github.com/symfony/doctrine-messenger/tree/v5.2.1" + "source": "https://github.com/symfony/doctrine-messenger/tree/v5.2.2" }, "funding": [ { @@ -4870,20 +4870,20 @@ "type": "tidelift" } ], - "time": "2020-12-10T19:16:15+00:00" + "time": "2021-01-27T11:19:04+00:00" }, { "name": "symfony/filesystem", - "version": "v5.2.1", + "version": "v5.2.2", "source": { "type": "git", "url": "https://github.com/symfony/filesystem.git", - "reference": "fa8f8cab6b65e2d99a118e082935344c5ba8c60d" + "reference": "262d033b57c73e8b59cd6e68a45c528318b15038" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/filesystem/zipball/fa8f8cab6b65e2d99a118e082935344c5ba8c60d", - "reference": "fa8f8cab6b65e2d99a118e082935344c5ba8c60d", + "url": "https://api.github.com/repos/symfony/filesystem/zipball/262d033b57c73e8b59cd6e68a45c528318b15038", + "reference": "262d033b57c73e8b59cd6e68a45c528318b15038", "shasum": "" }, "require": { @@ -4913,10 +4913,10 @@ "homepage": "https://symfony.com/contributors" } ], - "description": "Symfony Filesystem Component", + "description": "Provides basic utilities for the filesystem", "homepage": "https://symfony.com", "support": { - "source": "https://github.com/symfony/filesystem/tree/v5.2.1" + "source": "https://github.com/symfony/filesystem/tree/v5.2.2" }, "funding": [ { @@ -4932,20 +4932,20 @@ "type": "tidelift" } ], - "time": "2020-11-30T17:05:38+00:00" + "time": "2021-01-27T10:01:46+00:00" }, { "name": "symfony/finder", - "version": "v5.2.1", + "version": "v5.2.2", "source": { "type": "git", "url": "https://github.com/symfony/finder.git", - "reference": "0b9231a5922fd7287ba5b411893c0ecd2733e5ba" + "reference": "196f45723b5e618bf0e23b97e96d11652696ea9e" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/finder/zipball/0b9231a5922fd7287ba5b411893c0ecd2733e5ba", - "reference": "0b9231a5922fd7287ba5b411893c0ecd2733e5ba", + "url": "https://api.github.com/repos/symfony/finder/zipball/196f45723b5e618bf0e23b97e96d11652696ea9e", + "reference": "196f45723b5e618bf0e23b97e96d11652696ea9e", "shasum": "" }, "require": { @@ -4974,10 +4974,10 @@ "homepage": "https://symfony.com/contributors" } ], - "description": "Symfony Finder Component", + "description": "Finds files and directories via an intuitive fluent interface", "homepage": "https://symfony.com", "support": { - "source": "https://github.com/symfony/finder/tree/v5.2.1" + "source": "https://github.com/symfony/finder/tree/v5.2.2" }, "funding": [ { @@ -4993,20 +4993,20 @@ "type": "tidelift" } ], - "time": "2020-12-08T17:02:38+00:00" + "time": "2021-01-27T10:01:46+00:00" }, { "name": "symfony/messenger", - "version": "v5.2.1", + "version": "v5.2.2", "source": { "type": "git", "url": "https://github.com/symfony/messenger.git", - "reference": "3fc7766f5fa6c21096fa962a01684868b68f1f3c" + "reference": "ce658034cd7884428a8c6f50c2d4f8cf66348c40" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/messenger/zipball/3fc7766f5fa6c21096fa962a01684868b68f1f3c", - "reference": "3fc7766f5fa6c21096fa962a01684868b68f1f3c", + "url": "https://api.github.com/repos/symfony/messenger/zipball/ce658034cd7884428a8c6f50c2d4f8cf66348c40", + "reference": "ce658034cd7884428a8c6f50c2d4f8cf66348c40", "shasum": "" }, "require": { @@ -5062,10 +5062,10 @@ "homepage": "https://symfony.com/contributors" } ], - "description": "Symfony Messenger Component", + "description": "Helps applications send and receive messages to/from other applications or via message queues", "homepage": "https://symfony.com", "support": { - "source": "https://github.com/symfony/messenger/tree/v5.2.1" + "source": "https://github.com/symfony/messenger/tree/v5.2.2" }, "funding": [ { @@ -5081,20 +5081,20 @@ "type": "tidelift" } ], - "time": "2020-12-18T07:27:35+00:00" + "time": "2021-01-27T11:24:50+00:00" }, { "name": "symfony/process", - "version": "v5.2.1", + "version": "v5.2.2", "source": { "type": "git", "url": "https://github.com/symfony/process.git", - "reference": "bd8815b8b6705298beaa384f04fabd459c10bedd" + "reference": "313a38f09c77fbcdc1d223e57d368cea76a2fd2f" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/process/zipball/bd8815b8b6705298beaa384f04fabd459c10bedd", - "reference": "bd8815b8b6705298beaa384f04fabd459c10bedd", + "url": "https://api.github.com/repos/symfony/process/zipball/313a38f09c77fbcdc1d223e57d368cea76a2fd2f", + "reference": "313a38f09c77fbcdc1d223e57d368cea76a2fd2f", "shasum": "" }, "require": { @@ -5124,10 +5124,10 @@ "homepage": "https://symfony.com/contributors" } ], - "description": "Symfony Process Component", + "description": "Executes commands in sub-processes", "homepage": "https://symfony.com", "support": { - "source": "https://github.com/symfony/process/tree/v5.2.1" + "source": "https://github.com/symfony/process/tree/v5.2.2" }, "funding": [ { @@ -5143,20 +5143,20 @@ "type": "tidelift" } ], - "time": "2020-12-08T17:03:37+00:00" + "time": "2021-01-27T10:15:41+00:00" }, { "name": "symfony/redis-messenger", - "version": "v5.2.1", + "version": "v5.2.2", "source": { "type": "git", "url": "https://github.com/symfony/redis-messenger.git", - "reference": "c9ab1c7467657661554e7010a59f47f723f69c93" + "reference": "7e68914bf35cda948ee4d9081b8eaed9fd783fe5" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/redis-messenger/zipball/c9ab1c7467657661554e7010a59f47f723f69c93", - "reference": "c9ab1c7467657661554e7010a59f47f723f69c93", + "url": "https://api.github.com/repos/symfony/redis-messenger/zipball/7e68914bf35cda948ee4d9081b8eaed9fd783fe5", + "reference": "7e68914bf35cda948ee4d9081b8eaed9fd783fe5", "shasum": "" }, "require": { @@ -5194,7 +5194,7 @@ "description": "Symfony Redis extension Messenger Bridge", "homepage": "https://symfony.com", "support": { - "source": "https://github.com/symfony/redis-messenger/tree/v5.2.1" + "source": "https://github.com/symfony/redis-messenger/tree/v5.2.2" }, "funding": [ { @@ -5210,20 +5210,20 @@ "type": "tidelift" } ], - "time": "2020-12-15T11:52:46+00:00" + "time": "2021-01-27T11:24:50+00:00" }, { "name": "symfony/var-dumper", - "version": "v5.2.1", + "version": "v5.2.2", "source": { "type": "git", "url": "https://github.com/symfony/var-dumper.git", - "reference": "13e7e882eaa55863faa7c4ad7c60f12f1a8b5089" + "reference": "72ca213014a92223a5d18651ce79ef441c12b694" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/var-dumper/zipball/13e7e882eaa55863faa7c4ad7c60f12f1a8b5089", - "reference": "13e7e882eaa55863faa7c4ad7c60f12f1a8b5089", + "url": "https://api.github.com/repos/symfony/var-dumper/zipball/72ca213014a92223a5d18651ce79ef441c12b694", + "reference": "72ca213014a92223a5d18651ce79ef441c12b694", "shasum": "" }, "require": { @@ -5239,7 +5239,7 @@ "ext-iconv": "*", "symfony/console": "^4.4|^5.0", "symfony/process": "^4.4|^5.0", - "twig/twig": "^2.4|^3.0" + "twig/twig": "^2.13|^3.0.4" }, "suggest": { "ext-iconv": "To convert non-UTF-8 strings to UTF-8 (or symfony/polyfill-iconv in case ext-iconv cannot be used).", @@ -5275,14 +5275,14 @@ "homepage": "https://symfony.com/contributors" } ], - "description": "Symfony mechanism for exploring and dumping PHP variables", + "description": "Provides mechanisms for walking through any arbitrary PHP variable", "homepage": "https://symfony.com", "keywords": [ "debug", "dump" ], "support": { - "source": "https://github.com/symfony/var-dumper/tree/v5.2.1" + "source": "https://github.com/symfony/var-dumper/tree/v5.2.2" }, "funding": [ { @@ -5298,7 +5298,7 @@ "type": "tidelift" } ], - "time": "2020-12-16T17:02:19+00:00" + "time": "2021-01-27T10:15:41+00:00" }, { "name": "thecodingmachine/safe", diff --git a/src/Console/ProjectionCreateCommand.php b/src/Console/ProjectionCreateCommand.php new file mode 100644 index 00000000..641c8993 --- /dev/null +++ b/src/Console/ProjectionCreateCommand.php @@ -0,0 +1,34 @@ +projectionRepository = $projectionRepository; + } + + protected function configure(): void + { + $this->setName('event-sourcing:projection:create'); + } + + protected function execute(InputInterface $input, OutputInterface $output): int + { + $this->projectionRepository->create(); + + return 0; + } +} diff --git a/src/Console/ProjectionDropCommand.php b/src/Console/ProjectionDropCommand.php new file mode 100644 index 00000000..eac51d82 --- /dev/null +++ b/src/Console/ProjectionDropCommand.php @@ -0,0 +1,34 @@ +projectionRepository = $projectionRepository; + } + + protected function configure(): void + { + $this->setName('event-sourcing:projection:drop'); + } + + protected function execute(InputInterface $input, OutputInterface $output): int + { + $this->projectionRepository->create(); + + return 0; + } +} diff --git a/src/Console/ProjectionRebuildCommand.php b/src/Console/ProjectionRebuildCommand.php new file mode 100644 index 00000000..d94116c1 --- /dev/null +++ b/src/Console/ProjectionRebuildCommand.php @@ -0,0 +1,71 @@ +store = $store; + $this->projectionRepository = $projectionRepository; + } + + protected function configure(): void + { + $this + ->setName('event-sourcing:projection:rebuild') + ->addOption('recreate', 'r', InputOption::VALUE_OPTIONAL, 'drop and create projections', false); + } + + protected function execute(InputInterface $input, OutputInterface $output): int + { + $console = new SymfonyStyle($input, $output); + + $store = $this->store; + + if (!$store instanceof PipelineStore) { + $console->error('store is not supported'); + + return 1; + } + + if ($input->getOption('recreate')) { + $this->projectionRepository->drop(); + $this->projectionRepository->create(); + } + + $pipeline = new Pipeline( + new StoreSource($store), + new ProjectionRepositoryTarget($this->projectionRepository) + ); + + $console->progressStart($pipeline->count()); + + $pipeline->run(static function () use ($console): void { + $console->progressAdvance(); + }); + + $console->progressFinish(); + + return 0; + } +} diff --git a/src/Console/CreateSchemaCommand.php b/src/Console/SchemaCreateCommand.php similarity index 95% rename from src/Console/CreateSchemaCommand.php rename to src/Console/SchemaCreateCommand.php index 90e82c16..22e26e84 100644 --- a/src/Console/CreateSchemaCommand.php +++ b/src/Console/SchemaCreateCommand.php @@ -10,7 +10,7 @@ use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; -class CreateSchemaCommand extends Command +class SchemaCreateCommand extends Command { private Store $store; private SchemaManager $schemaManager; diff --git a/src/Console/DropSchemaCommand.php b/src/Console/SchemaDropCommand.php similarity index 95% rename from src/Console/DropSchemaCommand.php rename to src/Console/SchemaDropCommand.php index 2f4a180b..ec7e6886 100644 --- a/src/Console/DropSchemaCommand.php +++ b/src/Console/SchemaDropCommand.php @@ -10,7 +10,7 @@ use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; -class DropSchemaCommand extends Command +class SchemaDropCommand extends Command { private Store $store; private SchemaManager $schemaManager; diff --git a/src/Console/UpdateSchemaCommand.php b/src/Console/SchemaUpdateCommand.php similarity index 95% rename from src/Console/UpdateSchemaCommand.php rename to src/Console/SchemaUpdateCommand.php index c5336e95..cc4a1014 100644 --- a/src/Console/UpdateSchemaCommand.php +++ b/src/Console/SchemaUpdateCommand.php @@ -10,7 +10,7 @@ use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; -class UpdateSchemaCommand extends Command +class SchemaUpdateCommand extends Command { private Store $store; private SchemaManager $schemaManager; diff --git a/src/Pipeline/EventBucket.php b/src/Pipeline/EventBucket.php new file mode 100644 index 00000000..f4892801 --- /dev/null +++ b/src/Pipeline/EventBucket.php @@ -0,0 +1,37 @@ + */ + private string $aggregateClass; + private AggregateChanged $event; + + /** + * @param class-string $aggregateClass + */ + public function __construct(string $aggregateClass, AggregateChanged $event) + { + $this->aggregateClass = $aggregateClass; + $this->event = $event; + } + + /** + * @return class-string + */ + public function aggregateClass(): string + { + return $this->aggregateClass; + } + + public function event(): AggregateChanged + { + return $this->event; + } +} diff --git a/src/Pipeline/Middleware/DeleteEventMiddleware.php b/src/Pipeline/Middleware/DeleteEventMiddleware.php new file mode 100644 index 00000000..19f97d80 --- /dev/null +++ b/src/Pipeline/Middleware/DeleteEventMiddleware.php @@ -0,0 +1,36 @@ +> */ + private array $classes; + + /** + * @param list> $classes + */ + public function __construct(array $classes) + { + $this->classes = $classes; + } + + /** + * @return list + */ + public function __invoke(EventBucket $bucket): array + { + foreach ($this->classes as $class) { + if ($bucket->event() instanceof $class) { + return []; + } + } + + return [$bucket]; + } +} diff --git a/src/Pipeline/Middleware/FilterEventMiddleware.php b/src/Pipeline/Middleware/FilterEventMiddleware.php new file mode 100644 index 00000000..426bd851 --- /dev/null +++ b/src/Pipeline/Middleware/FilterEventMiddleware.php @@ -0,0 +1,36 @@ +> */ + private array $classes; + + /** + * @param list> $classes + */ + public function __construct(array $classes) + { + $this->classes = $classes; + } + + /** + * @return list + */ + public function __invoke(EventBucket $bucket): array + { + foreach ($this->classes as $class) { + if ($bucket->event() instanceof $class) { + return [$bucket]; + } + } + + return []; + } +} diff --git a/src/Pipeline/Middleware/Middleware.php b/src/Pipeline/Middleware/Middleware.php new file mode 100644 index 00000000..20254925 --- /dev/null +++ b/src/Pipeline/Middleware/Middleware.php @@ -0,0 +1,15 @@ + + */ + public function __invoke(EventBucket $bucket): array; +} diff --git a/src/Pipeline/Middleware/RecalculatePlayheadMiddleware.php b/src/Pipeline/Middleware/RecalculatePlayheadMiddleware.php new file mode 100644 index 00000000..b913264e --- /dev/null +++ b/src/Pipeline/Middleware/RecalculatePlayheadMiddleware.php @@ -0,0 +1,64 @@ +, array> */ + private array $index = []; + + private ReflectionProperty $reflectionProperty; + + public function __construct() + { + $reflectionClass = new ReflectionClass(AggregateChanged::class); + + $this->reflectionProperty = $reflectionClass->getProperty('playhead'); + $this->reflectionProperty->setAccessible(true); + } + + /** + * @return list + */ + public function __invoke(EventBucket $bucket): array + { + $event = $bucket->event(); + $playhead = $this->nextPlayhead($bucket->aggregateClass(), $event->aggregateId()); + + if ($event->playhead() === $playhead) { + return [$bucket]; + } + + $this->reflectionProperty->setValue($event, $playhead); + + return [$bucket]; + } + + /** + * @param class-string $aggregateClass + */ + private function nextPlayhead(string $aggregateClass, string $aggregateId): int + { + if (!array_key_exists($aggregateClass, $this->index)) { + $this->index[$aggregateClass] = []; + } + + if (!array_key_exists($aggregateId, $this->index[$aggregateClass])) { + $this->index[$aggregateClass][$aggregateId] = -1; + } + + $this->index[$aggregateClass][$aggregateId]++; + + return $this->index[$aggregateClass][$aggregateId]; + } +} diff --git a/src/Pipeline/Middleware/ReplaceEventMiddleware.php b/src/Pipeline/Middleware/ReplaceEventMiddleware.php new file mode 100644 index 00000000..772d0b01 --- /dev/null +++ b/src/Pipeline/Middleware/ReplaceEventMiddleware.php @@ -0,0 +1,73 @@ + */ + private string $class; + + /** @var callable(AggregateChanged $event):AggregateChanged */ + private $callable; + + private ReflectionProperty $recoredOnProperty; + private ReflectionProperty $playheadProperty; + + /** + * @param class-string $class + * @param callable(AggregateChanged $event):AggregateChanged $callable + */ + public function __construct(string $class, callable $callable) + { + $this->class = $class; + $this->callable = $callable; + + $reflectionClass = new ReflectionClass(AggregateChanged::class); + + $this->recoredOnProperty = $reflectionClass->getProperty('recordedOn'); + $this->recoredOnProperty->setAccessible(true); + + $this->playheadProperty = $reflectionClass->getProperty('playhead'); + $this->playheadProperty->setAccessible(true); + } + + /** + * @return list + */ + public function __invoke(EventBucket $bucket): array + { + $event = $bucket->event(); + + if (!$event instanceof $this->class) { + return [$bucket]; + } + + $callable = $this->callable; + + $newEvent = $callable($event); + + $this->recoredOnProperty->setValue( + $newEvent, + $this->recoredOnProperty->getValue($event) + ); + + $this->playheadProperty->setValue( + $newEvent, + $this->playheadProperty->getValue($event) + ); + + return [ + new EventBucket( + $bucket->aggregateClass(), + $newEvent + ), + ]; + } +} diff --git a/src/Pipeline/Pipeline.php b/src/Pipeline/Pipeline.php new file mode 100644 index 00000000..5a9775e8 --- /dev/null +++ b/src/Pipeline/Pipeline.php @@ -0,0 +1,81 @@ + */ + private array $middlewares; + + /** + * @param list $middlewares + */ + public function __construct(Source $source, Target $target, array $middlewares = []) + { + $this->source = $source; + $this->target = $target; + $this->middlewares = $middlewares; + } + + /** + * @param callable(EventBucket $event):void|null $observer + */ + public function run(?callable $observer = null): void + { + if ($observer === null) { + $observer = static function (EventBucket $event): void { + }; + } + + foreach ($this->source->load() as $bucket) { + foreach ($this->processMiddlewares($bucket) as $resultBucket) { + $this->target->save($resultBucket); + } + + $observer($bucket); + } + } + + public function count(): int + { + return $this->source->count(); + } + + /** + * @return list + */ + private function processMiddlewares(EventBucket $bucket): array + { + $buckets = [$bucket]; + + foreach ($this->middlewares as $middleware) { + $buckets = $this->processMiddleware($middleware, $buckets); + } + + return $buckets; + } + + /** + * @param list $buckets + * + * @return list + */ + private function processMiddleware(Middleware $middleware, array $buckets): array + { + $result = []; + + foreach ($buckets as $bucket) { + $result += $middleware($bucket); + } + + return $result; + } +} diff --git a/src/Pipeline/Source/InMemorySource.php b/src/Pipeline/Source/InMemorySource.php new file mode 100644 index 00000000..eb4d8af6 --- /dev/null +++ b/src/Pipeline/Source/InMemorySource.php @@ -0,0 +1,39 @@ + */ + private array $events; + + /** + * @param list $events + */ + public function __construct(array $events) + { + $this->events = $events; + } + + /** + * @return Generator + */ + public function load(): Generator + { + foreach ($this->events as $event) { + yield $event; + } + } + + public function count(): int + { + return count($this->events); + } +} diff --git a/src/Pipeline/Source/Source.php b/src/Pipeline/Source/Source.php new file mode 100644 index 00000000..8dffa1dd --- /dev/null +++ b/src/Pipeline/Source/Source.php @@ -0,0 +1,18 @@ + + */ + public function load(): Generator; + + public function count(): int; +} diff --git a/src/Pipeline/Source/StoreSource.php b/src/Pipeline/Source/StoreSource.php new file mode 100644 index 00000000..1ab45ccb --- /dev/null +++ b/src/Pipeline/Source/StoreSource.php @@ -0,0 +1,32 @@ +store = $store; + } + + /** + * @return Generator + */ + public function load(): Generator + { + return $this->store->all(); + } + + public function count(): int + { + return $this->store->count(); + } +} diff --git a/src/Pipeline/Target/InMemoryTarget.php b/src/Pipeline/Target/InMemoryTarget.php new file mode 100644 index 00000000..f5345668 --- /dev/null +++ b/src/Pipeline/Target/InMemoryTarget.php @@ -0,0 +1,26 @@ + */ + private array $buckets = []; + + public function save(EventBucket $bucket): void + { + $this->buckets[] = $bucket; + } + + /** + * @return list + */ + public function buckets(): array + { + return $this->buckets; + } +} diff --git a/src/Pipeline/Target/ProjectionRepositoryTarget.php b/src/Pipeline/Target/ProjectionRepositoryTarget.php new file mode 100644 index 00000000..dc060e6d --- /dev/null +++ b/src/Pipeline/Target/ProjectionRepositoryTarget.php @@ -0,0 +1,23 @@ +projectionRepository = $projectionRepository; + } + + public function save(EventBucket $bucket): void + { + $this->projectionRepository->handle($bucket->event()); + } +} diff --git a/src/Pipeline/Target/ProjectionTarget.php b/src/Pipeline/Target/ProjectionTarget.php new file mode 100644 index 00000000..b0df524c --- /dev/null +++ b/src/Pipeline/Target/ProjectionTarget.php @@ -0,0 +1,24 @@ +projectionRepository = new ProjectionRepository([$projection]); + } + + public function save(EventBucket $bucket): void + { + $this->projectionRepository->handle($bucket->event()); + } +} diff --git a/src/Pipeline/Target/StoreTarget.php b/src/Pipeline/Target/StoreTarget.php new file mode 100644 index 00000000..c5fd6439 --- /dev/null +++ b/src/Pipeline/Target/StoreTarget.php @@ -0,0 +1,23 @@ +store = $store; + } + + public function save(EventBucket $bucket): void + { + $this->store->saveEventBucket($bucket); + } +} diff --git a/src/Pipeline/Target/Target.php b/src/Pipeline/Target/Target.php new file mode 100644 index 00000000..6861126d --- /dev/null +++ b/src/Pipeline/Target/Target.php @@ -0,0 +1,12 @@ +, string> */ public function handledEvents(): iterable; + public function create(): void; + public function drop(): void; } diff --git a/src/Projection/ProjectionRepository.php b/src/Projection/ProjectionRepository.php index 10cbebef..5330b19b 100644 --- a/src/Projection/ProjectionRepository.php +++ b/src/Projection/ProjectionRepository.php @@ -42,6 +42,13 @@ public function handle(AggregateChanged $event): void } } + public function create(): void + { + foreach ($this->projections as $projection) { + $projection->create(); + } + } + public function drop(): void { foreach ($this->projections as $projection) { diff --git a/src/Store/PipelineStore.php b/src/Store/PipelineStore.php new file mode 100644 index 00000000..2f068beb --- /dev/null +++ b/src/Store/PipelineStore.php @@ -0,0 +1,20 @@ + + */ + public function all(): Generator; + + public function count(): int; + + public function saveEventBucket(EventBucket $bucket): void; +} diff --git a/src/Store/SingleTableStore.php b/src/Store/SingleTableStore.php index e62d912d..bc3fe677 100644 --- a/src/Store/SingleTableStore.php +++ b/src/Store/SingleTableStore.php @@ -10,11 +10,13 @@ use Generator; use Patchlevel\EventSourcing\Aggregate\AggregateChanged; use Patchlevel\EventSourcing\Aggregate\AggregateRoot; +use Patchlevel\EventSourcing\Pipeline\EventBucket; +use function array_flip; use function array_key_exists; use function array_map; -final class SingleTableStore extends DoctrineStore +final class SingleTableStore extends DoctrineStore implements PipelineStore { /** @var array, string> */ private array $aggregates; @@ -68,27 +70,6 @@ static function (array $data) use ($platform) { ); } - /** - * @return Generator - */ - public function loadAll(): Generator - { - $sql = $this->connection->createQueryBuilder() - ->select('*') - ->from($this->tableName) - ->getSQL(); - - $result = $this->connection->executeQuery($sql, []); - $platform = $this->connection->getDatabasePlatform(); - - /** @var array $data */ - foreach ($result->iterateAssociative() as $data) { - yield AggregateChanged::deserialize( - self::normalizeResult($platform, $data) - ); - } - } - /** * @param class-string $aggregate */ @@ -114,16 +95,6 @@ public function has(string $aggregate, string $id): bool return $result > 0; } - public function count(): int - { - $sql = $this->connection->createQueryBuilder() - ->select('COUNT(*)') - ->from($this->tableName) - ->getSQL(); - - return (int)$this->connection->fetchOne($sql); - } - /** * @param class-string $aggregate * @param AggregateChanged[] $events @@ -155,6 +126,64 @@ static function (Connection $connection) use ($shortName, $id, $events, $tableNa ); } + /** + * @return Generator + */ + public function all(): Generator + { + $sql = $this->connection->createQueryBuilder() + ->select('*') + ->from($this->tableName) + ->orderBy('id') + ->getSQL(); + + $result = $this->connection->iterateAssociative($sql); + $platform = $this->connection->getDatabasePlatform(); + + /** @var array> $classMap */ + $classMap = array_flip($this->aggregates); + + /** @var array $data */ + foreach ($result as $data) { + $name = (string)$data['aggregate']; + + if (!array_key_exists($name, $classMap)) { + throw new StoreException(); + } + + yield new EventBucket( + $classMap[$name], + AggregateChanged::deserialize( + self::normalizeResult($platform, $data) + ) + ); + } + } + + public function count(): int + { + $sql = $this->connection->createQueryBuilder() + ->select('COUNT(*)') + ->from($this->tableName) + ->getSQL(); + + return (int)$this->connection->fetchOne($sql); + } + + public function saveEventBucket(EventBucket $bucket): void + { + $data = $bucket->event()->serialize(); + $data['aggregate'] = $this->shortName($bucket->aggregateClass()); + + $this->connection->insert( + $this->tableName, + $data, + [ + 'recordedOn' => Types::DATETIMETZ_IMMUTABLE, + ] + ); + } + public function schema(): Schema { $schema = new Schema([], [], $this->connection->getSchemaManager()->createSchemaConfig()); diff --git a/tests/Integration/Pipeline/Aggregate/Profile.php b/tests/Integration/Pipeline/Aggregate/Profile.php new file mode 100644 index 00000000..59d86c61 --- /dev/null +++ b/tests/Integration/Pipeline/Aggregate/Profile.php @@ -0,0 +1,73 @@ +id; + } + + public static function create(string $id): self + { + $self = new self(); + $self->apply(ProfileCreated::raise($id)); + + return $self; + } + + public function visit(): void + { + $this->apply(OldVisited::raise($this->id)); + } + + public function privacy(): void + { + $this->apply(PrivacyAdded::raise($this->id)); + } + + public function isPrivate(): bool + { + return $this->privacy; + } + + public function count(): int + { + return $this->visited; + } + + protected function applyProfileCreated(ProfileCreated $event): void + { + $this->id = $event->profileId(); + $this->privacy = false; + $this->visited = 0; + } + + protected function applyOldVisited(OldVisited $event): void + { + $this->visited++; + } + + protected function applyNewVisited(NewVisited $event): void + { + $this->visited--; + } + + protected function applyPrivacyAdded(PrivacyAdded $event): void + { + $this->privacy = true; + } +} diff --git a/tests/Integration/Pipeline/Events/NewVisited.php b/tests/Integration/Pipeline/Events/NewVisited.php new file mode 100644 index 00000000..57836d9f --- /dev/null +++ b/tests/Integration/Pipeline/Events/NewVisited.php @@ -0,0 +1,20 @@ + $id]); + } + + public function profileId(): string + { + return $this->aggregateId; + } +} diff --git a/tests/Integration/Pipeline/Events/OldVisited.php b/tests/Integration/Pipeline/Events/OldVisited.php new file mode 100644 index 00000000..ce002fa4 --- /dev/null +++ b/tests/Integration/Pipeline/Events/OldVisited.php @@ -0,0 +1,20 @@ + $id]); + } + + public function profileId(): string + { + return $this->aggregateId; + } +} diff --git a/tests/Integration/Pipeline/Events/PrivacyAdded.php b/tests/Integration/Pipeline/Events/PrivacyAdded.php new file mode 100644 index 00000000..b2cc9cdb --- /dev/null +++ b/tests/Integration/Pipeline/Events/PrivacyAdded.php @@ -0,0 +1,20 @@ + $id]); + } + + public function profileId(): string + { + return $this->aggregateId; + } +} diff --git a/tests/Integration/Pipeline/Events/ProfileCreated.php b/tests/Integration/Pipeline/Events/ProfileCreated.php new file mode 100644 index 00000000..8dc093d0 --- /dev/null +++ b/tests/Integration/Pipeline/Events/ProfileCreated.php @@ -0,0 +1,20 @@ + $id]); + } + + public function profileId(): string + { + return $this->aggregateId; + } +} diff --git a/tests/Integration/Pipeline/PipelineChangeStoreTest.php b/tests/Integration/Pipeline/PipelineChangeStoreTest.php new file mode 100644 index 00000000..220bc516 --- /dev/null +++ b/tests/Integration/Pipeline/PipelineChangeStoreTest.php @@ -0,0 +1,122 @@ +connectionOld = DriverManager::getConnection([ + 'driverClass' => Driver::class, + 'path' => self::DB_PATH_OLD, + ]); + + $this->connectionNew = DriverManager::getConnection([ + 'driverClass' => Driver::class, + 'path' => self::DB_PATH_NEW, + ]); + } + + public function tearDown(): void + { + $this->connectionOld->close(); + $this->connectionNew->close(); + + unlink(self::DB_PATH_OLD); + unlink(self::DB_PATH_NEW); + } + + public function testSuccessful(): void + { + $oldStore = new SingleTableStore( + $this->connectionOld, + [Profile::class => 'profile'], + 'eventstore' + ); + + (new DoctrineSchemaManager())->create($oldStore); + + $newStore = new SingleTableStore( + $this->connectionNew, + [Profile::class => 'profile'], + 'eventstore' + ); + + (new DoctrineSchemaManager())->create($newStore); + + $oldRepository = new Repository($oldStore, new DefaultEventBus(), Profile::class); + $newRepository = new Repository($newStore, new DefaultEventBus(), Profile::class); + + $profile = Profile::create('1'); + $profile->visit(); + $profile->privacy(); + $profile->visit(); + + $oldRepository->save($profile); + + self::assertEquals('1', $profile->aggregateRootId()); + self::assertEquals(3, $profile->playhead()); + self::assertEquals(true, $profile->isPrivate()); + self::assertEquals(2, $profile->count()); + + $pipeline = new Pipeline( + new StoreSource($oldStore), + new StoreTarget($newStore), + [ + new DeleteEventMiddleware([PrivacyAdded::class]), + new ReplaceEventMiddleware(OldVisited::class, static function (OldVisited $oldVisited) { + return NewVisited::raise($oldVisited->profileId()); + }), + new RecalculatePlayheadMiddleware(), + ] + ); + + self::assertEquals(4, $pipeline->count()); + $pipeline->run(); + + $newProfile = $newRepository->load('1'); + + self::assertEquals('1', $newProfile->aggregateRootId()); + self::assertEquals(2, $newProfile->playhead()); + self::assertEquals(false, $newProfile->isPrivate()); + self::assertEquals(-2, $newProfile->count()); + } +} diff --git a/tests/Integration/Pipeline/data/.gitignore b/tests/Integration/Pipeline/data/.gitignore new file mode 100644 index 00000000..f9fda0b8 --- /dev/null +++ b/tests/Integration/Pipeline/data/.gitignore @@ -0,0 +1,2 @@ +old.sqlite3 +new.sqlite3 diff --git a/tests/Unit/Fixture/ProfileVisited.php b/tests/Unit/Fixture/ProfileVisited.php index 5d54fbc3..4e676ef7 100644 --- a/tests/Unit/Fixture/ProfileVisited.php +++ b/tests/Unit/Fixture/ProfileVisited.php @@ -8,7 +8,7 @@ final class ProfileVisited extends AggregateChanged { - public static function raise(ProfileId $visitorId, ProfileId $visitedId): self + public static function raise(ProfileId $visitedId, ProfileId $visitorId): self { return self::occur( $visitedId->toString(), diff --git a/tests/Unit/Pipeline/Middleware/DeleteEventMiddlewareTest.php b/tests/Unit/Pipeline/Middleware/DeleteEventMiddlewareTest.php new file mode 100644 index 00000000..a7399182 --- /dev/null +++ b/tests/Unit/Pipeline/Middleware/DeleteEventMiddlewareTest.php @@ -0,0 +1,51 @@ +recordNow(0) + ); + + $result = $middleware($bucket); + + self::assertEquals([], $result); + } + + public function testSkipEvent(): void + { + $middleware = new DeleteEventMiddleware([ProfileCreated::class]); + + $bucket = new EventBucket( + Profile::class, + ProfileVisited::raise( + ProfileId::fromString('1'), + ProfileId::fromString('2') + )->recordNow(0) + ); + + $result = $middleware($bucket); + + self::assertEquals([$bucket], $result); + } +} diff --git a/tests/Unit/Pipeline/Middleware/FilterEventMiddlewareTest.php b/tests/Unit/Pipeline/Middleware/FilterEventMiddlewareTest.php new file mode 100644 index 00000000..23a1f8c1 --- /dev/null +++ b/tests/Unit/Pipeline/Middleware/FilterEventMiddlewareTest.php @@ -0,0 +1,51 @@ +recordNow(0) + ); + + $result = $middleware($bucket); + + self::assertEquals([$bucket], $result); + } + + public function testSkipEvent(): void + { + $middleware = new FilterEventMiddleware([ProfileCreated::class]); + + $bucket = new EventBucket( + Profile::class, + ProfileVisited::raise( + ProfileId::fromString('1'), + ProfileId::fromString('2') + )->recordNow(0) + ); + + $result = $middleware($bucket); + + self::assertEquals([], $result); + } +} diff --git a/tests/Unit/Pipeline/Middleware/RecalculatePlayheadMiddlewareTest.php b/tests/Unit/Pipeline/Middleware/RecalculatePlayheadMiddlewareTest.php new file mode 100644 index 00000000..166b70a2 --- /dev/null +++ b/tests/Unit/Pipeline/Middleware/RecalculatePlayheadMiddlewareTest.php @@ -0,0 +1,38 @@ +recordNow(5) + ); + + $result = $middleware($bucket); + + self::assertCount(1, $result); + self::assertEquals(Profile::class, $result[0]->aggregateClass()); + + $event = $result[0]->event(); + + self::assertEquals(0, $event->playhead()); + } +} diff --git a/tests/Unit/Pipeline/Middleware/ReplaceEventMiddlewareTest.php b/tests/Unit/Pipeline/Middleware/ReplaceEventMiddlewareTest.php new file mode 100644 index 00000000..71f6eefd --- /dev/null +++ b/tests/Unit/Pipeline/Middleware/ReplaceEventMiddlewareTest.php @@ -0,0 +1,48 @@ +profileId(), + $event->profileId() + ); + } + ); + + $bucket = new EventBucket( + Profile::class, + ProfileCreated::raise( + ProfileId::fromString('1'), + Email::fromString('d.a.badura@gmail.com') + )->recordNow(5) + ); + + $result = $middleware($bucket); + + self::assertCount(1, $result); + self::assertEquals(Profile::class, $result[0]->aggregateClass()); + + $event = $result[0]->event(); + + self::assertInstanceOf(ProfileVisited::class, $event); + self::assertEquals(5, $event->playhead()); + } +} diff --git a/tests/Unit/Pipeline/PipelineTest.php b/tests/Unit/Pipeline/PipelineTest.php new file mode 100644 index 00000000..4f39b1e1 --- /dev/null +++ b/tests/Unit/Pipeline/PipelineTest.php @@ -0,0 +1,144 @@ +recordNow(0) + ), + new EventBucket( + Profile::class, + ProfileVisited::raise( + ProfileId::fromString('1'), + ProfileId::fromString('2') + )->recordNow(1) + ), + new EventBucket( + Profile::class, + ProfileVisited::raise( + ProfileId::fromString('1'), + ProfileId::fromString('3') + )->recordNow(2) + ), + new EventBucket( + Profile::class, + ProfileCreated::raise( + ProfileId::fromString('2'), + Email::fromString('d.a.badura@gmail.com') + )->recordNow(0) + ), + new EventBucket( + Profile::class, + ProfileVisited::raise( + ProfileId::fromString('2'), + ProfileId::fromString('2') + )->recordNow(1) + ), + ]; + + $source = new InMemorySource($buckets); + $target = new InMemoryTarget(); + $pipeline = new Pipeline($source, $target); + + self::assertEquals(5, $pipeline->count()); + + $pipeline->run(); + + self::assertEquals($buckets, $target->buckets()); + } + + public function testPipelineWithMiddleware(): void + { + $buckets = [ + new EventBucket( + Profile::class, + ProfileCreated::raise( + ProfileId::fromString('1'), + Email::fromString('d.a.badura@gmail.com') + )->recordNow(0) + ), + new EventBucket( + Profile::class, + ProfileVisited::raise( + ProfileId::fromString('1'), + ProfileId::fromString('2') + )->recordNow(1) + ), + new EventBucket( + Profile::class, + ProfileVisited::raise( + ProfileId::fromString('1'), + ProfileId::fromString('3') + )->recordNow(2) + ), + new EventBucket( + Profile::class, + ProfileCreated::raise( + ProfileId::fromString('2'), + Email::fromString('d.a.badura@gmail.com') + )->recordNow(0) + ), + new EventBucket( + Profile::class, + ProfileVisited::raise( + ProfileId::fromString('2'), + ProfileId::fromString('2') + )->recordNow(1) + ), + ]; + + $source = new InMemorySource($buckets); + $target = new InMemoryTarget(); + $pipeline = new Pipeline( + $source, + $target, + [ + new DeleteEventMiddleware([ProfileCreated::class]), + new RecalculatePlayheadMiddleware(), + ] + ); + + self::assertEquals(5, $pipeline->count()); + + $pipeline->run(); + + $resultBuckets = $target->buckets(); + + self::assertCount(3, $resultBuckets); + + self::assertInstanceOf(ProfileVisited::class, $resultBuckets[0]->event()); + self::assertEquals('1', $resultBuckets[0]->event()->aggregateId()); + self::assertEquals(0, $resultBuckets[0]->event()->playhead()); + + self::assertInstanceOf(ProfileVisited::class, $resultBuckets[1]->event()); + self::assertEquals('1', $resultBuckets[1]->event()->aggregateId()); + self::assertEquals(1, $resultBuckets[1]->event()->playhead()); + + self::assertInstanceOf(ProfileVisited::class, $resultBuckets[2]->event()); + self::assertEquals('2', $resultBuckets[2]->event()->aggregateId()); + self::assertEquals(0, $resultBuckets[2]->event()->playhead()); + } +}