From 58faa991f5d28dc2ae4a742f37bde4c310cb28cb Mon Sep 17 00:00:00 2001 From: David Badura Date: Fri, 29 Jan 2021 14:38:43 +0100 Subject: [PATCH 01/12] projection pipeline --- src/Console/ProjectionRebuildCommand.php | 57 +++++++++++++++++++++ src/Projection/ProjectionPipeline.php | 39 ++++++++++++++ src/Store/SingleTableStore.php | 65 ++++++++++++------------ src/Store/StreamableStore.php | 18 +++++++ 4 files changed, 147 insertions(+), 32 deletions(-) create mode 100644 src/Console/ProjectionRebuildCommand.php create mode 100644 src/Projection/ProjectionPipeline.php create mode 100644 src/Store/StreamableStore.php diff --git a/src/Console/ProjectionRebuildCommand.php b/src/Console/ProjectionRebuildCommand.php new file mode 100644 index 00000000..55cb8704 --- /dev/null +++ b/src/Console/ProjectionRebuildCommand.php @@ -0,0 +1,57 @@ +store = $store; + $this->projectionRepository = $projectionRepository; + } + + protected function configure(): void + { + $this->setName('event-sourcing:projection:rebuild'); + } + + protected function execute(InputInterface $input, OutputInterface $output): int + { + $console = new SymfonyStyle($input, $output); + + $store = $this->store; + + if (!$store instanceof StreamableStore) { + $console->error('store is not supported'); + + return 1; + } + + $console->progressStart($store->count()); + + $projectionPipeline = new ProjectionPipeline($store, $this->projectionRepository); + $projectionPipeline->rebuild(static function () use ($console): void { + $console->progressAdvance(); + }); + + $console->progressFinish(); + + return 0; + } +} diff --git a/src/Projection/ProjectionPipeline.php b/src/Projection/ProjectionPipeline.php new file mode 100644 index 00000000..bc034dca --- /dev/null +++ b/src/Projection/ProjectionPipeline.php @@ -0,0 +1,39 @@ +store = $store; + $this->projectionRepository = $projectionRepository; + } + + /** + * @param callable(AggregateChanged $event):void|null $observer + */ + public function rebuild(?callable $observer = null): void + { + if ($observer === null) { + $observer = static function (AggregateChanged $event): void { + }; + } + + $this->projectionRepository->drop(); + + foreach ($this->store->all() as $event) { + $observer($event); + + $this->projectionRepository->handle($event); + } + } +} diff --git a/src/Store/SingleTableStore.php b/src/Store/SingleTableStore.php index e62d912d..6ee07199 100644 --- a/src/Store/SingleTableStore.php +++ b/src/Store/SingleTableStore.php @@ -14,7 +14,7 @@ use function array_key_exists; use function array_map; -final class SingleTableStore extends DoctrineStore +final class SingleTableStore extends DoctrineStore implements StreamableStore { /** @var array, string> */ private array $aggregates; @@ -68,27 +68,6 @@ static function (array $data) use ($platform) { ); } - /** - * @return Generator - */ - public function loadAll(): Generator - { - $sql = $this->connection->createQueryBuilder() - ->select('*') - ->from($this->tableName) - ->getSQL(); - - $result = $this->connection->executeQuery($sql, []); - $platform = $this->connection->getDatabasePlatform(); - - /** @var array $data */ - foreach ($result->iterateAssociative() as $data) { - yield AggregateChanged::deserialize( - self::normalizeResult($platform, $data) - ); - } - } - /** * @param class-string $aggregate */ @@ -114,16 +93,6 @@ public function has(string $aggregate, string $id): bool return $result > 0; } - public function count(): int - { - $sql = $this->connection->createQueryBuilder() - ->select('COUNT(*)') - ->from($this->tableName) - ->getSQL(); - - return (int)$this->connection->fetchOne($sql); - } - /** * @param class-string $aggregate * @param AggregateChanged[] $events @@ -155,6 +124,38 @@ static function (Connection $connection) use ($shortName, $id, $events, $tableNa ); } + /** + * @return Generator + */ + public function all(): Generator + { + $sql = $this->connection->createQueryBuilder() + ->select('*') + ->from($this->tableName) + ->orderBy('id') + ->getSQL(); + + $result = $this->connection->executeQuery($sql, []); + $platform = $this->connection->getDatabasePlatform(); + + /** @var array $data */ + foreach ($result->iterateAssociative() as $data) { + yield AggregateChanged::deserialize( + self::normalizeResult($platform, $data) + ); + } + } + + public function count(): int + { + $sql = $this->connection->createQueryBuilder() + ->select('COUNT(*)') + ->from($this->tableName) + ->getSQL(); + + return (int)$this->connection->fetchOne($sql); + } + public function schema(): Schema { $schema = new Schema([], [], $this->connection->getSchemaManager()->createSchemaConfig()); diff --git a/src/Store/StreamableStore.php b/src/Store/StreamableStore.php new file mode 100644 index 00000000..5e4f6a53 --- /dev/null +++ b/src/Store/StreamableStore.php @@ -0,0 +1,18 @@ + + */ + public function all(): Generator; + + public function count(): int; +} From 8fa00dce8bce3ec1cd6cddf0f6691cdb24f90e3d Mon Sep 17 00:00:00 2001 From: David Badura Date: Fri, 29 Jan 2021 15:08:32 +0100 Subject: [PATCH 02/12] add more commands --- src/Console/ProjectionCreateCommand.php | 34 +++++++++++++++++++++++++ src/Console/ProjectionDropCommand.php | 34 +++++++++++++++++++++++++ src/Projection/Projection.php | 2 ++ src/Projection/ProjectionRepository.php | 7 +++++ 4 files changed, 77 insertions(+) create mode 100644 src/Console/ProjectionCreateCommand.php create mode 100644 src/Console/ProjectionDropCommand.php diff --git a/src/Console/ProjectionCreateCommand.php b/src/Console/ProjectionCreateCommand.php new file mode 100644 index 00000000..641c8993 --- /dev/null +++ b/src/Console/ProjectionCreateCommand.php @@ -0,0 +1,34 @@ +projectionRepository = $projectionRepository; + } + + protected function configure(): void + { + $this->setName('event-sourcing:projection:create'); + } + + protected function execute(InputInterface $input, OutputInterface $output): int + { + $this->projectionRepository->create(); + + return 0; + } +} diff --git a/src/Console/ProjectionDropCommand.php b/src/Console/ProjectionDropCommand.php new file mode 100644 index 00000000..eac51d82 --- /dev/null +++ b/src/Console/ProjectionDropCommand.php @@ -0,0 +1,34 @@ +projectionRepository = $projectionRepository; + } + + protected function configure(): void + { + $this->setName('event-sourcing:projection:drop'); + } + + protected function execute(InputInterface $input, OutputInterface $output): int + { + $this->projectionRepository->create(); + + return 0; + } +} diff --git a/src/Projection/Projection.php b/src/Projection/Projection.php index 134fa0d9..2e623ce0 100644 --- a/src/Projection/Projection.php +++ b/src/Projection/Projection.php @@ -11,5 +11,7 @@ interface Projection /** @return iterable, string> */ public function handledEvents(): iterable; + public function create(): void; + public function drop(): void; } diff --git a/src/Projection/ProjectionRepository.php b/src/Projection/ProjectionRepository.php index 10cbebef..5330b19b 100644 --- a/src/Projection/ProjectionRepository.php +++ b/src/Projection/ProjectionRepository.php @@ -42,6 +42,13 @@ public function handle(AggregateChanged $event): void } } + public function create(): void + { + foreach ($this->projections as $projection) { + $projection->create(); + } + } + public function drop(): void { foreach ($this->projections as $projection) { From f486b662c21ab1803eefa540b19c58d8b28922d4 Mon Sep 17 00:00:00 2001 From: David Badura Date: Sat, 30 Jan 2021 10:29:06 +0100 Subject: [PATCH 03/12] refactor --- src/Console/ProjectionRebuildCommand.php | 14 +++- src/Pipeline/Middleware/DeleteMiddleware.php | 35 ++++++++ src/Pipeline/Middleware/Middleware.php | 15 ++++ src/Pipeline/Pipeline.php | 82 +++++++++++++++++++ src/Pipeline/Source/Source.php | 18 ++++ src/Pipeline/Source/StreamableStoreSource.php | 28 +++++++ .../Target/ProjectionRepositoryTarget.php | 23 ++++++ src/Pipeline/Target/Target.php | 12 +++ src/Projection/ProjectionPipeline.php | 39 --------- 9 files changed, 223 insertions(+), 43 deletions(-) create mode 100644 src/Pipeline/Middleware/DeleteMiddleware.php create mode 100644 src/Pipeline/Middleware/Middleware.php create mode 100644 src/Pipeline/Pipeline.php create mode 100644 src/Pipeline/Source/Source.php create mode 100644 src/Pipeline/Source/StreamableStoreSource.php create mode 100644 src/Pipeline/Target/ProjectionRepositoryTarget.php create mode 100644 src/Pipeline/Target/Target.php delete mode 100644 src/Projection/ProjectionPipeline.php diff --git a/src/Console/ProjectionRebuildCommand.php b/src/Console/ProjectionRebuildCommand.php index 55cb8704..2cd44457 100644 --- a/src/Console/ProjectionRebuildCommand.php +++ b/src/Console/ProjectionRebuildCommand.php @@ -4,7 +4,9 @@ namespace Patchlevel\EventSourcing\Console; -use Patchlevel\EventSourcing\Projection\ProjectionPipeline; +use Patchlevel\EventSourcing\Pipeline\Pipeline; +use Patchlevel\EventSourcing\Pipeline\Source\StreamableStoreSource; +use Patchlevel\EventSourcing\Pipeline\Target\ProjectionRepositoryTarget; use Patchlevel\EventSourcing\Projection\ProjectionRepository; use Patchlevel\EventSourcing\Store\Store; use Patchlevel\EventSourcing\Store\StreamableStore; @@ -43,10 +45,14 @@ protected function execute(InputInterface $input, OutputInterface $output): int return 1; } - $console->progressStart($store->count()); + $pipeline = new Pipeline( + new StreamableStoreSource($store), + new ProjectionRepositoryTarget($this->projectionRepository) + ); - $projectionPipeline = new ProjectionPipeline($store, $this->projectionRepository); - $projectionPipeline->rebuild(static function () use ($console): void { + $console->progressStart($pipeline->count()); + + $pipeline->run(static function () use ($console): void { $console->progressAdvance(); }); diff --git a/src/Pipeline/Middleware/DeleteMiddleware.php b/src/Pipeline/Middleware/DeleteMiddleware.php new file mode 100644 index 00000000..7a1776d8 --- /dev/null +++ b/src/Pipeline/Middleware/DeleteMiddleware.php @@ -0,0 +1,35 @@ +> */ + private array $classes; + + /** + * @param list> $classes + */ + public function __construct(array $classes) + { + $this->classes = $classes; + } + + /** + * @return list + */ + public function __invoke(AggregateChanged $aggregateChanged): array + { + foreach ($this->classes as $class) { + if ($aggregateChanged instanceof $class) { + return []; + } + } + + return [$aggregateChanged]; + } +} diff --git a/src/Pipeline/Middleware/Middleware.php b/src/Pipeline/Middleware/Middleware.php new file mode 100644 index 00000000..fcf8d861 --- /dev/null +++ b/src/Pipeline/Middleware/Middleware.php @@ -0,0 +1,15 @@ + + */ + public function __invoke(AggregateChanged $aggregateChanged): array; +} diff --git a/src/Pipeline/Pipeline.php b/src/Pipeline/Pipeline.php new file mode 100644 index 00000000..e9537568 --- /dev/null +++ b/src/Pipeline/Pipeline.php @@ -0,0 +1,82 @@ + */ + private array $middlewares; + + /** + * @param list $middlewares + */ + public function __construct(Source $source, Target $target, array $middlewares = []) + { + $this->source = $source; + $this->target = $target; + $this->middlewares = $middlewares; + } + + /** + * @param callable(AggregateChanged $event):void|null $observer + */ + public function run(?callable $observer = null): void + { + if ($observer === null) { + $observer = static function (AggregateChanged $event): void { + }; + } + + foreach ($this->source->load() as $event) { + foreach ($this->processMiddlewares($event) as $resultEvent) { + $this->target->save($resultEvent); + } + + $observer($event); + } + } + + public function count(): int + { + return $this->source->count(); + } + + /** + * @return list + */ + private function processMiddlewares(AggregateChanged $event): array + { + $events = [$event]; + + foreach ($this->middlewares as $middleware) { + $events = $this->processMiddleware($middleware, $events); + } + + return $events; + } + + /** + * @param list $events + * + * @return list + */ + private function processMiddleware(Middleware $middleware, array $events): array + { + $result = []; + + foreach ($events as $event) { + $result += $middleware($event); + } + + return $result; + } +} diff --git a/src/Pipeline/Source/Source.php b/src/Pipeline/Source/Source.php new file mode 100644 index 00000000..5ce0d7f0 --- /dev/null +++ b/src/Pipeline/Source/Source.php @@ -0,0 +1,18 @@ + + */ + public function load(): Generator; + + public function count(): int; +} diff --git a/src/Pipeline/Source/StreamableStoreSource.php b/src/Pipeline/Source/StreamableStoreSource.php new file mode 100644 index 00000000..38620d4e --- /dev/null +++ b/src/Pipeline/Source/StreamableStoreSource.php @@ -0,0 +1,28 @@ +store = $store; + } + + public function load(): Generator + { + return $this->store->all(); + } + + public function count(): int + { + return $this->store->count(); + } +} diff --git a/src/Pipeline/Target/ProjectionRepositoryTarget.php b/src/Pipeline/Target/ProjectionRepositoryTarget.php new file mode 100644 index 00000000..cc40d8bb --- /dev/null +++ b/src/Pipeline/Target/ProjectionRepositoryTarget.php @@ -0,0 +1,23 @@ +projectionRepository = $projectionRepository; + } + + public function save(AggregateChanged $event): void + { + $this->projectionRepository->handle($event); + } +} diff --git a/src/Pipeline/Target/Target.php b/src/Pipeline/Target/Target.php new file mode 100644 index 00000000..9be476e9 --- /dev/null +++ b/src/Pipeline/Target/Target.php @@ -0,0 +1,12 @@ +store = $store; - $this->projectionRepository = $projectionRepository; - } - - /** - * @param callable(AggregateChanged $event):void|null $observer - */ - public function rebuild(?callable $observer = null): void - { - if ($observer === null) { - $observer = static function (AggregateChanged $event): void { - }; - } - - $this->projectionRepository->drop(); - - foreach ($this->store->all() as $event) { - $observer($event); - - $this->projectionRepository->handle($event); - } - } -} From 09ec7df2ddbda8f44c0b84014e9d18a2a256b272 Mon Sep 17 00:00:00 2001 From: David Badura Date: Sat, 30 Jan 2021 10:29:32 +0100 Subject: [PATCH 04/12] extends store --- src/Store/StreamableStore.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Store/StreamableStore.php b/src/Store/StreamableStore.php index 5e4f6a53..068e7f9c 100644 --- a/src/Store/StreamableStore.php +++ b/src/Store/StreamableStore.php @@ -7,7 +7,7 @@ use Generator; use Patchlevel\EventSourcing\Aggregate\AggregateChanged; -interface StreamableStore +interface StreamableStore extends Store { /** * @return Generator From 302667a20a1372dcff42a2053d6078c4df1e69ae Mon Sep 17 00:00:00 2001 From: David Badura Date: Sat, 30 Jan 2021 12:27:09 +0100 Subject: [PATCH 05/12] add tests --- src/Console/ProjectionRebuildCommand.php | 10 +- ...emaCommand.php => SchemaCreateCommand.php} | 2 +- ...chemaCommand.php => SchemaDropCommand.php} | 2 +- ...emaCommand.php => SchemaUpdateCommand.php} | 2 +- ...ddleware.php => DeleteEventMiddleware.php} | 2 +- .../RecalculatePlayheadMiddleware.php | 54 +++++++++ src/Pipeline/Source/InMemorySource.php | 39 +++++++ src/Pipeline/Target/InMemoryTarget.php | 26 +++++ tests/Unit/Fixture/ProfileVisited.php | 2 +- tests/Unit/Pipeline/PipelineTest.php | 108 ++++++++++++++++++ 10 files changed, 241 insertions(+), 6 deletions(-) rename src/Console/{CreateSchemaCommand.php => SchemaCreateCommand.php} (95%) rename src/Console/{DropSchemaCommand.php => SchemaDropCommand.php} (95%) rename src/Console/{UpdateSchemaCommand.php => SchemaUpdateCommand.php} (95%) rename src/Pipeline/Middleware/{DeleteMiddleware.php => DeleteEventMiddleware.php} (93%) create mode 100644 src/Pipeline/Middleware/RecalculatePlayheadMiddleware.php create mode 100644 src/Pipeline/Source/InMemorySource.php create mode 100644 src/Pipeline/Target/InMemoryTarget.php create mode 100644 tests/Unit/Pipeline/PipelineTest.php diff --git a/src/Console/ProjectionRebuildCommand.php b/src/Console/ProjectionRebuildCommand.php index 2cd44457..45860452 100644 --- a/src/Console/ProjectionRebuildCommand.php +++ b/src/Console/ProjectionRebuildCommand.php @@ -12,6 +12,7 @@ use Patchlevel\EventSourcing\Store\StreamableStore; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Console\Style\SymfonyStyle; @@ -30,7 +31,9 @@ public function __construct(Store $store, ProjectionRepository $projectionReposi protected function configure(): void { - $this->setName('event-sourcing:projection:rebuild'); + $this + ->setName('event-sourcing:projection:rebuild') + ->addOption('recreate', 'r', InputOption::VALUE_OPTIONAL, 'drop and create projections', false); } protected function execute(InputInterface $input, OutputInterface $output): int @@ -45,6 +48,11 @@ protected function execute(InputInterface $input, OutputInterface $output): int return 1; } + if ($input->getOption('recreate')) { + $this->projectionRepository->drop(); + $this->projectionRepository->create(); + } + $pipeline = new Pipeline( new StreamableStoreSource($store), new ProjectionRepositoryTarget($this->projectionRepository) diff --git a/src/Console/CreateSchemaCommand.php b/src/Console/SchemaCreateCommand.php similarity index 95% rename from src/Console/CreateSchemaCommand.php rename to src/Console/SchemaCreateCommand.php index 90e82c16..22e26e84 100644 --- a/src/Console/CreateSchemaCommand.php +++ b/src/Console/SchemaCreateCommand.php @@ -10,7 +10,7 @@ use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; -class CreateSchemaCommand extends Command +class SchemaCreateCommand extends Command { private Store $store; private SchemaManager $schemaManager; diff --git a/src/Console/DropSchemaCommand.php b/src/Console/SchemaDropCommand.php similarity index 95% rename from src/Console/DropSchemaCommand.php rename to src/Console/SchemaDropCommand.php index 2f4a180b..ec7e6886 100644 --- a/src/Console/DropSchemaCommand.php +++ b/src/Console/SchemaDropCommand.php @@ -10,7 +10,7 @@ use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; -class DropSchemaCommand extends Command +class SchemaDropCommand extends Command { private Store $store; private SchemaManager $schemaManager; diff --git a/src/Console/UpdateSchemaCommand.php b/src/Console/SchemaUpdateCommand.php similarity index 95% rename from src/Console/UpdateSchemaCommand.php rename to src/Console/SchemaUpdateCommand.php index c5336e95..cc4a1014 100644 --- a/src/Console/UpdateSchemaCommand.php +++ b/src/Console/SchemaUpdateCommand.php @@ -10,7 +10,7 @@ use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; -class UpdateSchemaCommand extends Command +class SchemaUpdateCommand extends Command { private Store $store; private SchemaManager $schemaManager; diff --git a/src/Pipeline/Middleware/DeleteMiddleware.php b/src/Pipeline/Middleware/DeleteEventMiddleware.php similarity index 93% rename from src/Pipeline/Middleware/DeleteMiddleware.php rename to src/Pipeline/Middleware/DeleteEventMiddleware.php index 7a1776d8..139b6021 100644 --- a/src/Pipeline/Middleware/DeleteMiddleware.php +++ b/src/Pipeline/Middleware/DeleteEventMiddleware.php @@ -6,7 +6,7 @@ use Patchlevel\EventSourcing\Aggregate\AggregateChanged; -class DeleteMiddleware implements Middleware +class DeleteEventMiddleware implements Middleware { /** @var list> */ private array $classes; diff --git a/src/Pipeline/Middleware/RecalculatePlayheadMiddleware.php b/src/Pipeline/Middleware/RecalculatePlayheadMiddleware.php new file mode 100644 index 00000000..c3e2b570 --- /dev/null +++ b/src/Pipeline/Middleware/RecalculatePlayheadMiddleware.php @@ -0,0 +1,54 @@ + */ + private array $index = []; + + private ReflectionProperty $reflectionProperty; + + public function __construct() + { + $reflectionClass = new ReflectionClass(AggregateChanged::class); + + $this->reflectionProperty = $reflectionClass->getProperty('playhead'); + $this->reflectionProperty->setAccessible(true); + } + + /** + * @return list + */ + public function __invoke(AggregateChanged $aggregateChanged): array + { + $playhead = $this->nextPlayhead($aggregateChanged->aggregateId()); + + if ($aggregateChanged->playhead() === $playhead) { + return [$aggregateChanged]; + } + + $this->reflectionProperty->setValue($aggregateChanged, $playhead); + + return [$aggregateChanged]; + } + + private function nextPlayhead(string $aggregateId): int + { + if (!array_key_exists($aggregateId, $this->index)) { + $this->index[$aggregateId] = -1; + } + + $this->index[$aggregateId]++; + + return $this->index[$aggregateId]; + } +} diff --git a/src/Pipeline/Source/InMemorySource.php b/src/Pipeline/Source/InMemorySource.php new file mode 100644 index 00000000..1c988b44 --- /dev/null +++ b/src/Pipeline/Source/InMemorySource.php @@ -0,0 +1,39 @@ + */ + private array $events; + + /** + * @param list $events + */ + public function __construct(array $events) + { + $this->events = $events; + } + + /** + * @return Generator + */ + public function load(): Generator + { + foreach ($this->events as $event) { + yield $event; + } + } + + public function count(): int + { + return count($this->events); + } +} diff --git a/src/Pipeline/Target/InMemoryTarget.php b/src/Pipeline/Target/InMemoryTarget.php new file mode 100644 index 00000000..cd11ec5a --- /dev/null +++ b/src/Pipeline/Target/InMemoryTarget.php @@ -0,0 +1,26 @@ + */ + private array $events = []; + + public function save(AggregateChanged $event): void + { + $this->events[] = $event; + } + + /** + * @return list + */ + public function events(): array + { + return $this->events; + } +} diff --git a/tests/Unit/Fixture/ProfileVisited.php b/tests/Unit/Fixture/ProfileVisited.php index 5d54fbc3..4e676ef7 100644 --- a/tests/Unit/Fixture/ProfileVisited.php +++ b/tests/Unit/Fixture/ProfileVisited.php @@ -8,7 +8,7 @@ final class ProfileVisited extends AggregateChanged { - public static function raise(ProfileId $visitorId, ProfileId $visitedId): self + public static function raise(ProfileId $visitedId, ProfileId $visitorId): self { return self::occur( $visitedId->toString(), diff --git a/tests/Unit/Pipeline/PipelineTest.php b/tests/Unit/Pipeline/PipelineTest.php new file mode 100644 index 00000000..a9dbd9c1 --- /dev/null +++ b/tests/Unit/Pipeline/PipelineTest.php @@ -0,0 +1,108 @@ +recordNow(0), + ProfileVisited::raise( + ProfileId::fromString('1'), + ProfileId::fromString('2') + )->recordNow(1), + ProfileVisited::raise( + ProfileId::fromString('1'), + ProfileId::fromString('3') + )->recordNow(2), + ProfileCreated::raise( + ProfileId::fromString('2'), + Email::fromString('d.a.badura@gmail.com') + )->recordNow(0), + ProfileVisited::raise( + ProfileId::fromString('2'), + ProfileId::fromString('2') + )->recordNow(1), + ]; + + $source = new InMemorySource($events); + $target = new InMemoryTarget(); + $pipeline = new Pipeline($source, $target); + + self::assertEquals(5, $pipeline->count()); + + $pipeline->run(); + + self::assertEquals($events, $target->events()); + } + + public function testPipelineWithMiddleware(): void + { + $events = [ + ProfileCreated::raise( + ProfileId::fromString('1'), + Email::fromString('d.a.badura@gmail.com') + )->recordNow(0), + ProfileVisited::raise( + ProfileId::fromString('1'), + ProfileId::fromString('2') + )->recordNow(1), + ProfileVisited::raise( + ProfileId::fromString('1'), + ProfileId::fromString('3') + )->recordNow(2), + ProfileCreated::raise( + ProfileId::fromString('2'), + Email::fromString('d.a.badura@gmail.com') + )->recordNow(0), + ProfileVisited::raise( + ProfileId::fromString('2'), + ProfileId::fromString('2') + )->recordNow(1), + ]; + + $source = new InMemorySource($events); + $target = new InMemoryTarget(); + $pipeline = new Pipeline( + $source, + $target, + [ + new DeleteEventMiddleware([ProfileCreated::class]), + new RecalculatePlayheadMiddleware(), + ] + ); + + self::assertEquals(5, $pipeline->count()); + + $pipeline->run(); + + $resultEvents = $target->events(); + + self::assertCount(3, $resultEvents); + + self::assertInstanceOf(ProfileVisited::class, $resultEvents[0]); + self::assertEquals('1', $resultEvents[0]->aggregateId()); + self::assertEquals(0, $resultEvents[0]->playhead()); + + self::assertInstanceOf(ProfileVisited::class, $resultEvents[1]); + self::assertEquals('1', $resultEvents[1]->aggregateId()); + self::assertEquals(1, $resultEvents[1]->playhead()); + + self::assertInstanceOf(ProfileVisited::class, $resultEvents[2]); + self::assertEquals('2', $resultEvents[2]->aggregateId()); + self::assertEquals(0, $resultEvents[2]->playhead()); + } +} From 13767b8c54d6ccaa1555969018bfb28f1ccf20a7 Mon Sep 17 00:00:00 2001 From: David Badura Date: Sat, 30 Jan 2021 13:14:38 +0100 Subject: [PATCH 06/12] improve pipeline --- src/Console/ProjectionRebuildCommand.php | 8 +- src/Pipeline/EventBucket.php | 37 +++++ .../Middleware/DeleteEventMiddleware.php | 9 +- src/Pipeline/Middleware/Middleware.php | 6 +- .../RecalculatePlayheadMiddleware.php | 16 +- src/Pipeline/Pipeline.php | 33 ++-- src/Pipeline/Source/InMemorySource.php | 8 +- src/Pipeline/Source/Source.php | 4 +- ...eamableStoreSource.php => StoreSource.php} | 12 +- src/Pipeline/Target/InMemoryTarget.php | 16 +- .../Target/ProjectionRepositoryTarget.php | 6 +- src/Pipeline/Target/StoreTarget.php | 23 +++ src/Pipeline/Target/Target.php | 4 +- ...{StreamableStore.php => PipelineStore.php} | 8 +- src/Store/SingleTableStore.php | 36 ++++- tests/Unit/Pipeline/PipelineTest.php | 150 +++++++++++------- 16 files changed, 254 insertions(+), 122 deletions(-) create mode 100644 src/Pipeline/EventBucket.php rename src/Pipeline/Source/{StreamableStoreSource.php => StoreSource.php} (53%) create mode 100644 src/Pipeline/Target/StoreTarget.php rename src/Store/{StreamableStore.php => PipelineStore.php} (50%) diff --git a/src/Console/ProjectionRebuildCommand.php b/src/Console/ProjectionRebuildCommand.php index 45860452..d94116c1 100644 --- a/src/Console/ProjectionRebuildCommand.php +++ b/src/Console/ProjectionRebuildCommand.php @@ -5,11 +5,11 @@ namespace Patchlevel\EventSourcing\Console; use Patchlevel\EventSourcing\Pipeline\Pipeline; -use Patchlevel\EventSourcing\Pipeline\Source\StreamableStoreSource; +use Patchlevel\EventSourcing\Pipeline\Source\StoreSource; use Patchlevel\EventSourcing\Pipeline\Target\ProjectionRepositoryTarget; use Patchlevel\EventSourcing\Projection\ProjectionRepository; +use Patchlevel\EventSourcing\Store\PipelineStore; use Patchlevel\EventSourcing\Store\Store; -use Patchlevel\EventSourcing\Store\StreamableStore; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; @@ -42,7 +42,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int $store = $this->store; - if (!$store instanceof StreamableStore) { + if (!$store instanceof PipelineStore) { $console->error('store is not supported'); return 1; @@ -54,7 +54,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int } $pipeline = new Pipeline( - new StreamableStoreSource($store), + new StoreSource($store), new ProjectionRepositoryTarget($this->projectionRepository) ); diff --git a/src/Pipeline/EventBucket.php b/src/Pipeline/EventBucket.php new file mode 100644 index 00000000..f4892801 --- /dev/null +++ b/src/Pipeline/EventBucket.php @@ -0,0 +1,37 @@ + */ + private string $aggregateClass; + private AggregateChanged $event; + + /** + * @param class-string $aggregateClass + */ + public function __construct(string $aggregateClass, AggregateChanged $event) + { + $this->aggregateClass = $aggregateClass; + $this->event = $event; + } + + /** + * @return class-string + */ + public function aggregateClass(): string + { + return $this->aggregateClass; + } + + public function event(): AggregateChanged + { + return $this->event; + } +} diff --git a/src/Pipeline/Middleware/DeleteEventMiddleware.php b/src/Pipeline/Middleware/DeleteEventMiddleware.php index 139b6021..19f97d80 100644 --- a/src/Pipeline/Middleware/DeleteEventMiddleware.php +++ b/src/Pipeline/Middleware/DeleteEventMiddleware.php @@ -5,6 +5,7 @@ namespace Patchlevel\EventSourcing\Pipeline\Middleware; use Patchlevel\EventSourcing\Aggregate\AggregateChanged; +use Patchlevel\EventSourcing\Pipeline\EventBucket; class DeleteEventMiddleware implements Middleware { @@ -20,16 +21,16 @@ public function __construct(array $classes) } /** - * @return list + * @return list */ - public function __invoke(AggregateChanged $aggregateChanged): array + public function __invoke(EventBucket $bucket): array { foreach ($this->classes as $class) { - if ($aggregateChanged instanceof $class) { + if ($bucket->event() instanceof $class) { return []; } } - return [$aggregateChanged]; + return [$bucket]; } } diff --git a/src/Pipeline/Middleware/Middleware.php b/src/Pipeline/Middleware/Middleware.php index fcf8d861..20254925 100644 --- a/src/Pipeline/Middleware/Middleware.php +++ b/src/Pipeline/Middleware/Middleware.php @@ -4,12 +4,12 @@ namespace Patchlevel\EventSourcing\Pipeline\Middleware; -use Patchlevel\EventSourcing\Aggregate\AggregateChanged; +use Patchlevel\EventSourcing\Pipeline\EventBucket; interface Middleware { /** - * @return list + * @return list */ - public function __invoke(AggregateChanged $aggregateChanged): array; + public function __invoke(EventBucket $bucket): array; } diff --git a/src/Pipeline/Middleware/RecalculatePlayheadMiddleware.php b/src/Pipeline/Middleware/RecalculatePlayheadMiddleware.php index c3e2b570..8cb8ae69 100644 --- a/src/Pipeline/Middleware/RecalculatePlayheadMiddleware.php +++ b/src/Pipeline/Middleware/RecalculatePlayheadMiddleware.php @@ -5,6 +5,7 @@ namespace Patchlevel\EventSourcing\Pipeline\Middleware; use Patchlevel\EventSourcing\Aggregate\AggregateChanged; +use Patchlevel\EventSourcing\Pipeline\EventBucket; use ReflectionClass; use ReflectionProperty; @@ -26,19 +27,20 @@ public function __construct() } /** - * @return list + * @return list */ - public function __invoke(AggregateChanged $aggregateChanged): array + public function __invoke(EventBucket $bucket): array { - $playhead = $this->nextPlayhead($aggregateChanged->aggregateId()); + $event = $bucket->event(); + $playhead = $this->nextPlayhead($event->aggregateId()); - if ($aggregateChanged->playhead() === $playhead) { - return [$aggregateChanged]; + if ($event->playhead() === $playhead) { + return [$bucket]; } - $this->reflectionProperty->setValue($aggregateChanged, $playhead); + $this->reflectionProperty->setValue($event, $playhead); - return [$aggregateChanged]; + return [$bucket]; } private function nextPlayhead(string $aggregateId): int diff --git a/src/Pipeline/Pipeline.php b/src/Pipeline/Pipeline.php index e9537568..5a9775e8 100644 --- a/src/Pipeline/Pipeline.php +++ b/src/Pipeline/Pipeline.php @@ -4,7 +4,6 @@ namespace Patchlevel\EventSourcing\Pipeline; -use Patchlevel\EventSourcing\Aggregate\AggregateChanged; use Patchlevel\EventSourcing\Pipeline\Middleware\Middleware; use Patchlevel\EventSourcing\Pipeline\Source\Source; use Patchlevel\EventSourcing\Pipeline\Target\Target; @@ -27,21 +26,21 @@ public function __construct(Source $source, Target $target, array $middlewares = } /** - * @param callable(AggregateChanged $event):void|null $observer + * @param callable(EventBucket $event):void|null $observer */ public function run(?callable $observer = null): void { if ($observer === null) { - $observer = static function (AggregateChanged $event): void { + $observer = static function (EventBucket $event): void { }; } - foreach ($this->source->load() as $event) { - foreach ($this->processMiddlewares($event) as $resultEvent) { - $this->target->save($resultEvent); + foreach ($this->source->load() as $bucket) { + foreach ($this->processMiddlewares($bucket) as $resultBucket) { + $this->target->save($resultBucket); } - $observer($event); + $observer($bucket); } } @@ -51,30 +50,30 @@ public function count(): int } /** - * @return list + * @return list */ - private function processMiddlewares(AggregateChanged $event): array + private function processMiddlewares(EventBucket $bucket): array { - $events = [$event]; + $buckets = [$bucket]; foreach ($this->middlewares as $middleware) { - $events = $this->processMiddleware($middleware, $events); + $buckets = $this->processMiddleware($middleware, $buckets); } - return $events; + return $buckets; } /** - * @param list $events + * @param list $buckets * - * @return list + * @return list */ - private function processMiddleware(Middleware $middleware, array $events): array + private function processMiddleware(Middleware $middleware, array $buckets): array { $result = []; - foreach ($events as $event) { - $result += $middleware($event); + foreach ($buckets as $bucket) { + $result += $middleware($bucket); } return $result; diff --git a/src/Pipeline/Source/InMemorySource.php b/src/Pipeline/Source/InMemorySource.php index 1c988b44..eb4d8af6 100644 --- a/src/Pipeline/Source/InMemorySource.php +++ b/src/Pipeline/Source/InMemorySource.php @@ -5,17 +5,17 @@ namespace Patchlevel\EventSourcing\Pipeline\Source; use Generator; -use Patchlevel\EventSourcing\Aggregate\AggregateChanged; +use Patchlevel\EventSourcing\Pipeline\EventBucket; use function count; class InMemorySource implements Source { - /** @var list */ + /** @var list */ private array $events; /** - * @param list $events + * @param list $events */ public function __construct(array $events) { @@ -23,7 +23,7 @@ public function __construct(array $events) } /** - * @return Generator + * @return Generator */ public function load(): Generator { diff --git a/src/Pipeline/Source/Source.php b/src/Pipeline/Source/Source.php index 5ce0d7f0..8dffa1dd 100644 --- a/src/Pipeline/Source/Source.php +++ b/src/Pipeline/Source/Source.php @@ -5,12 +5,12 @@ namespace Patchlevel\EventSourcing\Pipeline\Source; use Generator; -use Patchlevel\EventSourcing\Aggregate\AggregateChanged; +use Patchlevel\EventSourcing\Pipeline\EventBucket; interface Source { /** - * @return Generator + * @return Generator */ public function load(): Generator; diff --git a/src/Pipeline/Source/StreamableStoreSource.php b/src/Pipeline/Source/StoreSource.php similarity index 53% rename from src/Pipeline/Source/StreamableStoreSource.php rename to src/Pipeline/Source/StoreSource.php index 38620d4e..1ab45ccb 100644 --- a/src/Pipeline/Source/StreamableStoreSource.php +++ b/src/Pipeline/Source/StoreSource.php @@ -5,17 +5,21 @@ namespace Patchlevel\EventSourcing\Pipeline\Source; use Generator; -use Patchlevel\EventSourcing\Store\StreamableStore; +use Patchlevel\EventSourcing\Pipeline\EventBucket; +use Patchlevel\EventSourcing\Store\PipelineStore; -class StreamableStoreSource implements Source +class StoreSource implements Source { - private StreamableStore $store; + private PipelineStore $store; - public function __construct(StreamableStore $store) + public function __construct(PipelineStore $store) { $this->store = $store; } + /** + * @return Generator + */ public function load(): Generator { return $this->store->all(); diff --git a/src/Pipeline/Target/InMemoryTarget.php b/src/Pipeline/Target/InMemoryTarget.php index cd11ec5a..f5345668 100644 --- a/src/Pipeline/Target/InMemoryTarget.php +++ b/src/Pipeline/Target/InMemoryTarget.php @@ -4,23 +4,23 @@ namespace Patchlevel\EventSourcing\Pipeline\Target; -use Patchlevel\EventSourcing\Aggregate\AggregateChanged; +use Patchlevel\EventSourcing\Pipeline\EventBucket; class InMemoryTarget implements Target { - /** @var list */ - private array $events = []; + /** @var list */ + private array $buckets = []; - public function save(AggregateChanged $event): void + public function save(EventBucket $bucket): void { - $this->events[] = $event; + $this->buckets[] = $bucket; } /** - * @return list + * @return list */ - public function events(): array + public function buckets(): array { - return $this->events; + return $this->buckets; } } diff --git a/src/Pipeline/Target/ProjectionRepositoryTarget.php b/src/Pipeline/Target/ProjectionRepositoryTarget.php index cc40d8bb..dc060e6d 100644 --- a/src/Pipeline/Target/ProjectionRepositoryTarget.php +++ b/src/Pipeline/Target/ProjectionRepositoryTarget.php @@ -4,7 +4,7 @@ namespace Patchlevel\EventSourcing\Pipeline\Target; -use Patchlevel\EventSourcing\Aggregate\AggregateChanged; +use Patchlevel\EventSourcing\Pipeline\EventBucket; use Patchlevel\EventSourcing\Projection\ProjectionRepository; class ProjectionRepositoryTarget implements Target @@ -16,8 +16,8 @@ public function __construct(ProjectionRepository $projectionRepository) $this->projectionRepository = $projectionRepository; } - public function save(AggregateChanged $event): void + public function save(EventBucket $bucket): void { - $this->projectionRepository->handle($event); + $this->projectionRepository->handle($bucket->event()); } } diff --git a/src/Pipeline/Target/StoreTarget.php b/src/Pipeline/Target/StoreTarget.php new file mode 100644 index 00000000..831f93a9 --- /dev/null +++ b/src/Pipeline/Target/StoreTarget.php @@ -0,0 +1,23 @@ +store = $store; + } + + public function save(EventBucket $bucket): void + { + $this->store->save($bucket); + } +} diff --git a/src/Pipeline/Target/Target.php b/src/Pipeline/Target/Target.php index 9be476e9..6861126d 100644 --- a/src/Pipeline/Target/Target.php +++ b/src/Pipeline/Target/Target.php @@ -4,9 +4,9 @@ namespace Patchlevel\EventSourcing\Pipeline\Target; -use Patchlevel\EventSourcing\Aggregate\AggregateChanged; +use Patchlevel\EventSourcing\Pipeline\EventBucket; interface Target { - public function save(AggregateChanged $event): void; + public function save(EventBucket $bucket): void; } diff --git a/src/Store/StreamableStore.php b/src/Store/PipelineStore.php similarity index 50% rename from src/Store/StreamableStore.php rename to src/Store/PipelineStore.php index 068e7f9c..00c10731 100644 --- a/src/Store/StreamableStore.php +++ b/src/Store/PipelineStore.php @@ -5,14 +5,16 @@ namespace Patchlevel\EventSourcing\Store; use Generator; -use Patchlevel\EventSourcing\Aggregate\AggregateChanged; +use Patchlevel\EventSourcing\Pipeline\EventBucket; -interface StreamableStore extends Store +interface PipelineStore extends Store { /** - * @return Generator + * @return Generator */ public function all(): Generator; public function count(): int; + + public function save(EventBucket $bucket): void; } diff --git a/src/Store/SingleTableStore.php b/src/Store/SingleTableStore.php index 6ee07199..b4d39a44 100644 --- a/src/Store/SingleTableStore.php +++ b/src/Store/SingleTableStore.php @@ -10,11 +10,13 @@ use Generator; use Patchlevel\EventSourcing\Aggregate\AggregateChanged; use Patchlevel\EventSourcing\Aggregate\AggregateRoot; +use Patchlevel\EventSourcing\Pipeline\EventBucket; +use function array_flip; use function array_key_exists; use function array_map; -final class SingleTableStore extends DoctrineStore implements StreamableStore +final class SingleTableStore extends DoctrineStore implements PipelineStore { /** @var array, string> */ private array $aggregates; @@ -125,7 +127,7 @@ static function (Connection $connection) use ($shortName, $id, $events, $tableNa } /** - * @return Generator + * @return Generator */ public function all(): Generator { @@ -138,10 +140,22 @@ public function all(): Generator $result = $this->connection->executeQuery($sql, []); $platform = $this->connection->getDatabasePlatform(); + /** @var array> $classMap */ + $classMap = array_flip($this->aggregates); + /** @var array $data */ foreach ($result->iterateAssociative() as $data) { - yield AggregateChanged::deserialize( - self::normalizeResult($platform, $data) + $name = (string)$data['aggregate']; + + if (!array_key_exists($name, $classMap)) { + throw new StoreException(); + } + + yield new EventBucket( + $classMap[$name], + AggregateChanged::deserialize( + self::normalizeResult($platform, $data) + ) ); } } @@ -156,6 +170,20 @@ public function count(): int return (int)$this->connection->fetchOne($sql); } + public function save(EventBucket $bucket): void + { + $data = $bucket->event()->serialize(); + $data['aggregate'] = $this->shortName($bucket->aggregateClass()); + + $this->connection->insert( + $this->tableName, + $data, + [ + 'recordedOn' => Types::DATETIMETZ_IMMUTABLE, + ] + ); + } + public function schema(): Schema { $schema = new Schema([], [], $this->connection->getSchemaManager()->createSchemaConfig()); diff --git a/tests/Unit/Pipeline/PipelineTest.php b/tests/Unit/Pipeline/PipelineTest.php index a9dbd9c1..4f39b1e1 100644 --- a/tests/Unit/Pipeline/PipelineTest.php +++ b/tests/Unit/Pipeline/PipelineTest.php @@ -2,43 +2,64 @@ declare(strict_types=1); -namespace Patchlevel\EventSourcing\Tests\Unit\Fixture; +namespace Patchlevel\EventSourcing\Tests\Unit\Pipeline; +use Patchlevel\EventSourcing\Pipeline\EventBucket; use Patchlevel\EventSourcing\Pipeline\Middleware\DeleteEventMiddleware; use Patchlevel\EventSourcing\Pipeline\Middleware\RecalculatePlayheadMiddleware; use Patchlevel\EventSourcing\Pipeline\Pipeline; use Patchlevel\EventSourcing\Pipeline\Source\InMemorySource; use Patchlevel\EventSourcing\Pipeline\Target\InMemoryTarget; +use Patchlevel\EventSourcing\Tests\Unit\Fixture\Email; +use Patchlevel\EventSourcing\Tests\Unit\Fixture\Profile; +use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileCreated; +use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileId; +use Patchlevel\EventSourcing\Tests\Unit\Fixture\ProfileVisited; use PHPUnit\Framework\TestCase; class PipelineTest extends TestCase { public function testPipeline(): void { - $events = [ - ProfileCreated::raise( - ProfileId::fromString('1'), - Email::fromString('d.a.badura@gmail.com') - )->recordNow(0), - ProfileVisited::raise( - ProfileId::fromString('1'), - ProfileId::fromString('2') - )->recordNow(1), - ProfileVisited::raise( - ProfileId::fromString('1'), - ProfileId::fromString('3') - )->recordNow(2), - ProfileCreated::raise( - ProfileId::fromString('2'), - Email::fromString('d.a.badura@gmail.com') - )->recordNow(0), - ProfileVisited::raise( - ProfileId::fromString('2'), - ProfileId::fromString('2') - )->recordNow(1), + $buckets = [ + new EventBucket( + Profile::class, + ProfileCreated::raise( + ProfileId::fromString('1'), + Email::fromString('d.a.badura@gmail.com') + )->recordNow(0) + ), + new EventBucket( + Profile::class, + ProfileVisited::raise( + ProfileId::fromString('1'), + ProfileId::fromString('2') + )->recordNow(1) + ), + new EventBucket( + Profile::class, + ProfileVisited::raise( + ProfileId::fromString('1'), + ProfileId::fromString('3') + )->recordNow(2) + ), + new EventBucket( + Profile::class, + ProfileCreated::raise( + ProfileId::fromString('2'), + Email::fromString('d.a.badura@gmail.com') + )->recordNow(0) + ), + new EventBucket( + Profile::class, + ProfileVisited::raise( + ProfileId::fromString('2'), + ProfileId::fromString('2') + )->recordNow(1) + ), ]; - $source = new InMemorySource($events); + $source = new InMemorySource($buckets); $target = new InMemoryTarget(); $pipeline = new Pipeline($source, $target); @@ -46,35 +67,50 @@ public function testPipeline(): void $pipeline->run(); - self::assertEquals($events, $target->events()); + self::assertEquals($buckets, $target->buckets()); } public function testPipelineWithMiddleware(): void { - $events = [ - ProfileCreated::raise( - ProfileId::fromString('1'), - Email::fromString('d.a.badura@gmail.com') - )->recordNow(0), - ProfileVisited::raise( - ProfileId::fromString('1'), - ProfileId::fromString('2') - )->recordNow(1), - ProfileVisited::raise( - ProfileId::fromString('1'), - ProfileId::fromString('3') - )->recordNow(2), - ProfileCreated::raise( - ProfileId::fromString('2'), - Email::fromString('d.a.badura@gmail.com') - )->recordNow(0), - ProfileVisited::raise( - ProfileId::fromString('2'), - ProfileId::fromString('2') - )->recordNow(1), + $buckets = [ + new EventBucket( + Profile::class, + ProfileCreated::raise( + ProfileId::fromString('1'), + Email::fromString('d.a.badura@gmail.com') + )->recordNow(0) + ), + new EventBucket( + Profile::class, + ProfileVisited::raise( + ProfileId::fromString('1'), + ProfileId::fromString('2') + )->recordNow(1) + ), + new EventBucket( + Profile::class, + ProfileVisited::raise( + ProfileId::fromString('1'), + ProfileId::fromString('3') + )->recordNow(2) + ), + new EventBucket( + Profile::class, + ProfileCreated::raise( + ProfileId::fromString('2'), + Email::fromString('d.a.badura@gmail.com') + )->recordNow(0) + ), + new EventBucket( + Profile::class, + ProfileVisited::raise( + ProfileId::fromString('2'), + ProfileId::fromString('2') + )->recordNow(1) + ), ]; - $source = new InMemorySource($events); + $source = new InMemorySource($buckets); $target = new InMemoryTarget(); $pipeline = new Pipeline( $source, @@ -89,20 +125,20 @@ public function testPipelineWithMiddleware(): void $pipeline->run(); - $resultEvents = $target->events(); + $resultBuckets = $target->buckets(); - self::assertCount(3, $resultEvents); + self::assertCount(3, $resultBuckets); - self::assertInstanceOf(ProfileVisited::class, $resultEvents[0]); - self::assertEquals('1', $resultEvents[0]->aggregateId()); - self::assertEquals(0, $resultEvents[0]->playhead()); + self::assertInstanceOf(ProfileVisited::class, $resultBuckets[0]->event()); + self::assertEquals('1', $resultBuckets[0]->event()->aggregateId()); + self::assertEquals(0, $resultBuckets[0]->event()->playhead()); - self::assertInstanceOf(ProfileVisited::class, $resultEvents[1]); - self::assertEquals('1', $resultEvents[1]->aggregateId()); - self::assertEquals(1, $resultEvents[1]->playhead()); + self::assertInstanceOf(ProfileVisited::class, $resultBuckets[1]->event()); + self::assertEquals('1', $resultBuckets[1]->event()->aggregateId()); + self::assertEquals(1, $resultBuckets[1]->event()->playhead()); - self::assertInstanceOf(ProfileVisited::class, $resultEvents[2]); - self::assertEquals('2', $resultEvents[2]->aggregateId()); - self::assertEquals(0, $resultEvents[2]->playhead()); + self::assertInstanceOf(ProfileVisited::class, $resultBuckets[2]->event()); + self::assertEquals('2', $resultBuckets[2]->event()->aggregateId()); + self::assertEquals(0, $resultBuckets[2]->event()->playhead()); } } From 2f7b3e63cffc1152d1c315d9962b72d9269ae6da Mon Sep 17 00:00:00 2001 From: David Badura Date: Sun, 31 Jan 2021 12:47:23 +0100 Subject: [PATCH 07/12] improvements & replace middleware --- .../Middleware/ReplaceEventMiddleware.php | 63 +++++++++++++++++++ src/Pipeline/Target/StoreTarget.php | 2 +- src/Store/PipelineStore.php | 2 +- src/Store/SingleTableStore.php | 2 +- 4 files changed, 66 insertions(+), 3 deletions(-) create mode 100644 src/Pipeline/Middleware/ReplaceEventMiddleware.php diff --git a/src/Pipeline/Middleware/ReplaceEventMiddleware.php b/src/Pipeline/Middleware/ReplaceEventMiddleware.php new file mode 100644 index 00000000..47cdcf66 --- /dev/null +++ b/src/Pipeline/Middleware/ReplaceEventMiddleware.php @@ -0,0 +1,63 @@ +callable = $callable; + + $reflectionClass = new ReflectionClass(AggregateChanged::class); + + $this->recoredOnProperty = $reflectionClass->getProperty('recordedOn'); + $this->recoredOnProperty->setAccessible(true); + + $this->playheadProperty = $reflectionClass->getProperty('playhead'); + $this->playheadProperty->setAccessible(true); + } + + /** + * @return list + */ + public function __invoke(EventBucket $bucket): array + { + $event = $bucket->event(); + $callable = $this->callable; + + $newEvent = $callable($event); + + $this->recoredOnProperty->setValue( + $newEvent, + $this->recoredOnProperty->getValue($event) + ); + + $this->playheadProperty->setValue( + $newEvent, + $this->playheadProperty->getValue($event) + ); + + return [ + new EventBucket( + $bucket->aggregateClass(), + $newEvent + ), + ]; + } +} diff --git a/src/Pipeline/Target/StoreTarget.php b/src/Pipeline/Target/StoreTarget.php index 831f93a9..c5fd6439 100644 --- a/src/Pipeline/Target/StoreTarget.php +++ b/src/Pipeline/Target/StoreTarget.php @@ -18,6 +18,6 @@ public function __construct(PipelineStore $store) public function save(EventBucket $bucket): void { - $this->store->save($bucket); + $this->store->saveEventBucket($bucket); } } diff --git a/src/Store/PipelineStore.php b/src/Store/PipelineStore.php index 00c10731..2f068beb 100644 --- a/src/Store/PipelineStore.php +++ b/src/Store/PipelineStore.php @@ -16,5 +16,5 @@ public function all(): Generator; public function count(): int; - public function save(EventBucket $bucket): void; + public function saveEventBucket(EventBucket $bucket): void; } diff --git a/src/Store/SingleTableStore.php b/src/Store/SingleTableStore.php index b4d39a44..0679f348 100644 --- a/src/Store/SingleTableStore.php +++ b/src/Store/SingleTableStore.php @@ -170,7 +170,7 @@ public function count(): int return (int)$this->connection->fetchOne($sql); } - public function save(EventBucket $bucket): void + public function saveEventBucket(EventBucket $bucket): void { $data = $bucket->event()->serialize(); $data['aggregate'] = $this->shortName($bucket->aggregateClass()); From a180e9100ad2e4189a10a6792e3e47e6b0bbca93 Mon Sep 17 00:00:00 2001 From: David Badura Date: Sun, 31 Jan 2021 12:57:37 +0100 Subject: [PATCH 08/12] add more stuff --- .../Middleware/FilterEventMiddleware.php | 36 +++++++++++++++++++ .../RecalculatePlayheadMiddleware.php | 22 ++++++++---- src/Pipeline/Target/ProjectionTarget.php | 24 +++++++++++++ 3 files changed, 75 insertions(+), 7 deletions(-) create mode 100644 src/Pipeline/Middleware/FilterEventMiddleware.php create mode 100644 src/Pipeline/Target/ProjectionTarget.php diff --git a/src/Pipeline/Middleware/FilterEventMiddleware.php b/src/Pipeline/Middleware/FilterEventMiddleware.php new file mode 100644 index 00000000..426bd851 --- /dev/null +++ b/src/Pipeline/Middleware/FilterEventMiddleware.php @@ -0,0 +1,36 @@ +> */ + private array $classes; + + /** + * @param list> $classes + */ + public function __construct(array $classes) + { + $this->classes = $classes; + } + + /** + * @return list + */ + public function __invoke(EventBucket $bucket): array + { + foreach ($this->classes as $class) { + if ($bucket->event() instanceof $class) { + return [$bucket]; + } + } + + return []; + } +} diff --git a/src/Pipeline/Middleware/RecalculatePlayheadMiddleware.php b/src/Pipeline/Middleware/RecalculatePlayheadMiddleware.php index 8cb8ae69..b913264e 100644 --- a/src/Pipeline/Middleware/RecalculatePlayheadMiddleware.php +++ b/src/Pipeline/Middleware/RecalculatePlayheadMiddleware.php @@ -5,6 +5,7 @@ namespace Patchlevel\EventSourcing\Pipeline\Middleware; use Patchlevel\EventSourcing\Aggregate\AggregateChanged; +use Patchlevel\EventSourcing\Aggregate\AggregateRoot; use Patchlevel\EventSourcing\Pipeline\EventBucket; use ReflectionClass; use ReflectionProperty; @@ -13,7 +14,7 @@ class RecalculatePlayheadMiddleware implements Middleware { - /** @var array */ + /** @var array, array> */ private array $index = []; private ReflectionProperty $reflectionProperty; @@ -32,7 +33,7 @@ public function __construct() public function __invoke(EventBucket $bucket): array { $event = $bucket->event(); - $playhead = $this->nextPlayhead($event->aggregateId()); + $playhead = $this->nextPlayhead($bucket->aggregateClass(), $event->aggregateId()); if ($event->playhead() === $playhead) { return [$bucket]; @@ -43,14 +44,21 @@ public function __invoke(EventBucket $bucket): array return [$bucket]; } - private function nextPlayhead(string $aggregateId): int + /** + * @param class-string $aggregateClass + */ + private function nextPlayhead(string $aggregateClass, string $aggregateId): int { - if (!array_key_exists($aggregateId, $this->index)) { - $this->index[$aggregateId] = -1; + if (!array_key_exists($aggregateClass, $this->index)) { + $this->index[$aggregateClass] = []; + } + + if (!array_key_exists($aggregateId, $this->index[$aggregateClass])) { + $this->index[$aggregateClass][$aggregateId] = -1; } - $this->index[$aggregateId]++; + $this->index[$aggregateClass][$aggregateId]++; - return $this->index[$aggregateId]; + return $this->index[$aggregateClass][$aggregateId]; } } diff --git a/src/Pipeline/Target/ProjectionTarget.php b/src/Pipeline/Target/ProjectionTarget.php new file mode 100644 index 00000000..b0df524c --- /dev/null +++ b/src/Pipeline/Target/ProjectionTarget.php @@ -0,0 +1,24 @@ +projectionRepository = new ProjectionRepository([$projection]); + } + + public function save(EventBucket $bucket): void + { + $this->projectionRepository->handle($bucket->event()); + } +} From dfb094cd0558c4fd3b40e340567810337dfb40a2 Mon Sep 17 00:00:00 2001 From: David Badura Date: Sun, 31 Jan 2021 13:31:25 +0100 Subject: [PATCH 09/12] add more tests --- .../Middleware/ReplaceEventMiddleware.php | 14 ++++- .../Middleware/DeleteEventMiddlewareTest.php | 51 +++++++++++++++++++ .../Middleware/FilterEventMiddlewareTest.php | 51 +++++++++++++++++++ .../RecalculatePlayheadMiddlewareTest.php | 38 ++++++++++++++ .../Middleware/ReplaceEventMiddlewareTest.php | 48 +++++++++++++++++ 5 files changed, 200 insertions(+), 2 deletions(-) create mode 100644 tests/Unit/Pipeline/Middleware/DeleteEventMiddlewareTest.php create mode 100644 tests/Unit/Pipeline/Middleware/FilterEventMiddlewareTest.php create mode 100644 tests/Unit/Pipeline/Middleware/RecalculatePlayheadMiddlewareTest.php create mode 100644 tests/Unit/Pipeline/Middleware/ReplaceEventMiddlewareTest.php diff --git a/src/Pipeline/Middleware/ReplaceEventMiddleware.php b/src/Pipeline/Middleware/ReplaceEventMiddleware.php index 47cdcf66..772d0b01 100644 --- a/src/Pipeline/Middleware/ReplaceEventMiddleware.php +++ b/src/Pipeline/Middleware/ReplaceEventMiddleware.php @@ -11,6 +11,9 @@ class ReplaceEventMiddleware implements Middleware { + /** @var class-string */ + private string $class; + /** @var callable(AggregateChanged $event):AggregateChanged */ private $callable; @@ -18,10 +21,12 @@ class ReplaceEventMiddleware implements Middleware private ReflectionProperty $playheadProperty; /** - * @param callable(AggregateChanged $event):AggregateChanged $callable + * @param class-string $class + * @param callable(AggregateChanged $event):AggregateChanged $callable */ - public function __construct(callable $callable) + public function __construct(string $class, callable $callable) { + $this->class = $class; $this->callable = $callable; $reflectionClass = new ReflectionClass(AggregateChanged::class); @@ -39,6 +44,11 @@ public function __construct(callable $callable) public function __invoke(EventBucket $bucket): array { $event = $bucket->event(); + + if (!$event instanceof $this->class) { + return [$bucket]; + } + $callable = $this->callable; $newEvent = $callable($event); diff --git a/tests/Unit/Pipeline/Middleware/DeleteEventMiddlewareTest.php b/tests/Unit/Pipeline/Middleware/DeleteEventMiddlewareTest.php new file mode 100644 index 00000000..a7399182 --- /dev/null +++ b/tests/Unit/Pipeline/Middleware/DeleteEventMiddlewareTest.php @@ -0,0 +1,51 @@ +recordNow(0) + ); + + $result = $middleware($bucket); + + self::assertEquals([], $result); + } + + public function testSkipEvent(): void + { + $middleware = new DeleteEventMiddleware([ProfileCreated::class]); + + $bucket = new EventBucket( + Profile::class, + ProfileVisited::raise( + ProfileId::fromString('1'), + ProfileId::fromString('2') + )->recordNow(0) + ); + + $result = $middleware($bucket); + + self::assertEquals([$bucket], $result); + } +} diff --git a/tests/Unit/Pipeline/Middleware/FilterEventMiddlewareTest.php b/tests/Unit/Pipeline/Middleware/FilterEventMiddlewareTest.php new file mode 100644 index 00000000..23a1f8c1 --- /dev/null +++ b/tests/Unit/Pipeline/Middleware/FilterEventMiddlewareTest.php @@ -0,0 +1,51 @@ +recordNow(0) + ); + + $result = $middleware($bucket); + + self::assertEquals([$bucket], $result); + } + + public function testSkipEvent(): void + { + $middleware = new FilterEventMiddleware([ProfileCreated::class]); + + $bucket = new EventBucket( + Profile::class, + ProfileVisited::raise( + ProfileId::fromString('1'), + ProfileId::fromString('2') + )->recordNow(0) + ); + + $result = $middleware($bucket); + + self::assertEquals([], $result); + } +} diff --git a/tests/Unit/Pipeline/Middleware/RecalculatePlayheadMiddlewareTest.php b/tests/Unit/Pipeline/Middleware/RecalculatePlayheadMiddlewareTest.php new file mode 100644 index 00000000..166b70a2 --- /dev/null +++ b/tests/Unit/Pipeline/Middleware/RecalculatePlayheadMiddlewareTest.php @@ -0,0 +1,38 @@ +recordNow(5) + ); + + $result = $middleware($bucket); + + self::assertCount(1, $result); + self::assertEquals(Profile::class, $result[0]->aggregateClass()); + + $event = $result[0]->event(); + + self::assertEquals(0, $event->playhead()); + } +} diff --git a/tests/Unit/Pipeline/Middleware/ReplaceEventMiddlewareTest.php b/tests/Unit/Pipeline/Middleware/ReplaceEventMiddlewareTest.php new file mode 100644 index 00000000..71f6eefd --- /dev/null +++ b/tests/Unit/Pipeline/Middleware/ReplaceEventMiddlewareTest.php @@ -0,0 +1,48 @@ +profileId(), + $event->profileId() + ); + } + ); + + $bucket = new EventBucket( + Profile::class, + ProfileCreated::raise( + ProfileId::fromString('1'), + Email::fromString('d.a.badura@gmail.com') + )->recordNow(5) + ); + + $result = $middleware($bucket); + + self::assertCount(1, $result); + self::assertEquals(Profile::class, $result[0]->aggregateClass()); + + $event = $result[0]->event(); + + self::assertInstanceOf(ProfileVisited::class, $event); + self::assertEquals(5, $event->playhead()); + } +} From 00e0c660a4b1428f8990f64d0d08663b1d927451 Mon Sep 17 00:00:00 2001 From: David Badura Date: Sun, 31 Jan 2021 13:59:48 +0100 Subject: [PATCH 10/12] add itegration test --- .../Pipeline/Aggregate/Profile.php | 73 +++++++++++ .../Pipeline/Events/NewVisited.php | 20 +++ .../Pipeline/Events/OldVisited.php | 20 +++ .../Pipeline/Events/PrivacyAdded.php | 20 +++ .../Pipeline/Events/ProfileCreated.php | 20 +++ .../Pipeline/PipelineChangeStoreTest.php | 122 ++++++++++++++++++ tests/Integration/Pipeline/data/.gitignore | 1 + 7 files changed, 276 insertions(+) create mode 100644 tests/Integration/Pipeline/Aggregate/Profile.php create mode 100644 tests/Integration/Pipeline/Events/NewVisited.php create mode 100644 tests/Integration/Pipeline/Events/OldVisited.php create mode 100644 tests/Integration/Pipeline/Events/PrivacyAdded.php create mode 100644 tests/Integration/Pipeline/Events/ProfileCreated.php create mode 100644 tests/Integration/Pipeline/PipelineChangeStoreTest.php create mode 100644 tests/Integration/Pipeline/data/.gitignore diff --git a/tests/Integration/Pipeline/Aggregate/Profile.php b/tests/Integration/Pipeline/Aggregate/Profile.php new file mode 100644 index 00000000..59d86c61 --- /dev/null +++ b/tests/Integration/Pipeline/Aggregate/Profile.php @@ -0,0 +1,73 @@ +id; + } + + public static function create(string $id): self + { + $self = new self(); + $self->apply(ProfileCreated::raise($id)); + + return $self; + } + + public function visit(): void + { + $this->apply(OldVisited::raise($this->id)); + } + + public function privacy(): void + { + $this->apply(PrivacyAdded::raise($this->id)); + } + + public function isPrivate(): bool + { + return $this->privacy; + } + + public function count(): int + { + return $this->visited; + } + + protected function applyProfileCreated(ProfileCreated $event): void + { + $this->id = $event->profileId(); + $this->privacy = false; + $this->visited = 0; + } + + protected function applyOldVisited(OldVisited $event): void + { + $this->visited++; + } + + protected function applyNewVisited(NewVisited $event): void + { + $this->visited--; + } + + protected function applyPrivacyAdded(PrivacyAdded $event): void + { + $this->privacy = true; + } +} diff --git a/tests/Integration/Pipeline/Events/NewVisited.php b/tests/Integration/Pipeline/Events/NewVisited.php new file mode 100644 index 00000000..57836d9f --- /dev/null +++ b/tests/Integration/Pipeline/Events/NewVisited.php @@ -0,0 +1,20 @@ + $id]); + } + + public function profileId(): string + { + return $this->aggregateId; + } +} diff --git a/tests/Integration/Pipeline/Events/OldVisited.php b/tests/Integration/Pipeline/Events/OldVisited.php new file mode 100644 index 00000000..ce002fa4 --- /dev/null +++ b/tests/Integration/Pipeline/Events/OldVisited.php @@ -0,0 +1,20 @@ + $id]); + } + + public function profileId(): string + { + return $this->aggregateId; + } +} diff --git a/tests/Integration/Pipeline/Events/PrivacyAdded.php b/tests/Integration/Pipeline/Events/PrivacyAdded.php new file mode 100644 index 00000000..b2cc9cdb --- /dev/null +++ b/tests/Integration/Pipeline/Events/PrivacyAdded.php @@ -0,0 +1,20 @@ + $id]); + } + + public function profileId(): string + { + return $this->aggregateId; + } +} diff --git a/tests/Integration/Pipeline/Events/ProfileCreated.php b/tests/Integration/Pipeline/Events/ProfileCreated.php new file mode 100644 index 00000000..8dc093d0 --- /dev/null +++ b/tests/Integration/Pipeline/Events/ProfileCreated.php @@ -0,0 +1,20 @@ + $id]); + } + + public function profileId(): string + { + return $this->aggregateId; + } +} diff --git a/tests/Integration/Pipeline/PipelineChangeStoreTest.php b/tests/Integration/Pipeline/PipelineChangeStoreTest.php new file mode 100644 index 00000000..220bc516 --- /dev/null +++ b/tests/Integration/Pipeline/PipelineChangeStoreTest.php @@ -0,0 +1,122 @@ +connectionOld = DriverManager::getConnection([ + 'driverClass' => Driver::class, + 'path' => self::DB_PATH_OLD, + ]); + + $this->connectionNew = DriverManager::getConnection([ + 'driverClass' => Driver::class, + 'path' => self::DB_PATH_NEW, + ]); + } + + public function tearDown(): void + { + $this->connectionOld->close(); + $this->connectionNew->close(); + + unlink(self::DB_PATH_OLD); + unlink(self::DB_PATH_NEW); + } + + public function testSuccessful(): void + { + $oldStore = new SingleTableStore( + $this->connectionOld, + [Profile::class => 'profile'], + 'eventstore' + ); + + (new DoctrineSchemaManager())->create($oldStore); + + $newStore = new SingleTableStore( + $this->connectionNew, + [Profile::class => 'profile'], + 'eventstore' + ); + + (new DoctrineSchemaManager())->create($newStore); + + $oldRepository = new Repository($oldStore, new DefaultEventBus(), Profile::class); + $newRepository = new Repository($newStore, new DefaultEventBus(), Profile::class); + + $profile = Profile::create('1'); + $profile->visit(); + $profile->privacy(); + $profile->visit(); + + $oldRepository->save($profile); + + self::assertEquals('1', $profile->aggregateRootId()); + self::assertEquals(3, $profile->playhead()); + self::assertEquals(true, $profile->isPrivate()); + self::assertEquals(2, $profile->count()); + + $pipeline = new Pipeline( + new StoreSource($oldStore), + new StoreTarget($newStore), + [ + new DeleteEventMiddleware([PrivacyAdded::class]), + new ReplaceEventMiddleware(OldVisited::class, static function (OldVisited $oldVisited) { + return NewVisited::raise($oldVisited->profileId()); + }), + new RecalculatePlayheadMiddleware(), + ] + ); + + self::assertEquals(4, $pipeline->count()); + $pipeline->run(); + + $newProfile = $newRepository->load('1'); + + self::assertEquals('1', $newProfile->aggregateRootId()); + self::assertEquals(2, $newProfile->playhead()); + self::assertEquals(false, $newProfile->isPrivate()); + self::assertEquals(-2, $newProfile->count()); + } +} diff --git a/tests/Integration/Pipeline/data/.gitignore b/tests/Integration/Pipeline/data/.gitignore new file mode 100644 index 00000000..49ef2557 --- /dev/null +++ b/tests/Integration/Pipeline/data/.gitignore @@ -0,0 +1 @@ +db.sqlite3 From 57546d02957cf4fa3115740f9df1e765fd3e7d06 Mon Sep 17 00:00:00 2001 From: David Badura Date: Sun, 31 Jan 2021 15:50:23 +0100 Subject: [PATCH 11/12] fix tests --- composer.lock | 148 ++++++++++++++++----------------- src/Store/SingleTableStore.php | 4 +- 2 files changed, 76 insertions(+), 76 deletions(-) diff --git a/composer.lock b/composer.lock index 4b5da2c3..a64b4c8f 100644 --- a/composer.lock +++ b/composer.lock @@ -479,16 +479,16 @@ }, { "name": "symfony/console", - "version": "v5.2.1", + "version": "v5.2.2", "source": { "type": "git", "url": "https://github.com/symfony/console.git", - "reference": "47c02526c532fb381374dab26df05e7313978976" + "reference": "d62ec79478b55036f65e2602e282822b8eaaff0a" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/console/zipball/47c02526c532fb381374dab26df05e7313978976", - "reference": "47c02526c532fb381374dab26df05e7313978976", + "url": "https://api.github.com/repos/symfony/console/zipball/d62ec79478b55036f65e2602e282822b8eaaff0a", + "reference": "d62ec79478b55036f65e2602e282822b8eaaff0a", "shasum": "" }, "require": { @@ -547,7 +547,7 @@ "homepage": "https://symfony.com/contributors" } ], - "description": "Symfony Console Component", + "description": "Eases the creation of beautiful and testable command line interfaces", "homepage": "https://symfony.com", "keywords": [ "cli", @@ -556,7 +556,7 @@ "terminal" ], "support": { - "source": "https://github.com/symfony/console/tree/v5.2.1" + "source": "https://github.com/symfony/console/tree/v5.2.2" }, "funding": [ { @@ -572,7 +572,7 @@ "type": "tidelift" } ], - "time": "2020-12-18T08:03:05+00:00" + "time": "2021-01-27T10:15:41+00:00" }, { "name": "symfony/polyfill-ctype", @@ -1141,16 +1141,16 @@ }, { "name": "symfony/string", - "version": "v5.2.1", + "version": "v5.2.2", "source": { "type": "git", "url": "https://github.com/symfony/string.git", - "reference": "5bd67751d2e3f7d6f770c9154b8fbcb2aa05f7ed" + "reference": "c95468897f408dd0aca2ff582074423dd0455122" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/string/zipball/5bd67751d2e3f7d6f770c9154b8fbcb2aa05f7ed", - "reference": "5bd67751d2e3f7d6f770c9154b8fbcb2aa05f7ed", + "url": "https://api.github.com/repos/symfony/string/zipball/c95468897f408dd0aca2ff582074423dd0455122", + "reference": "c95468897f408dd0aca2ff582074423dd0455122", "shasum": "" }, "require": { @@ -1193,7 +1193,7 @@ "homepage": "https://symfony.com/contributors" } ], - "description": "Symfony String component", + "description": "Provides an object-oriented API to strings and deals with bytes, UTF-8 code points and grapheme clusters in a unified way", "homepage": "https://symfony.com", "keywords": [ "grapheme", @@ -1204,7 +1204,7 @@ "utf8" ], "support": { - "source": "https://github.com/symfony/string/tree/v5.2.1" + "source": "https://github.com/symfony/string/tree/v5.2.2" }, "funding": [ { @@ -1220,7 +1220,7 @@ "type": "tidelift" } ], - "time": "2020-12-05T07:33:16+00:00" + "time": "2021-01-25T15:14:59+00:00" } ], "packages-dev": [ @@ -2925,16 +2925,16 @@ }, { "name": "phpstan/phpstan", - "version": "0.12.69", + "version": "0.12.70", "source": { "type": "git", "url": "https://github.com/phpstan/phpstan.git", - "reference": "8f436ea35241da33487fd0d38b4bc3e6dfe30ea8" + "reference": "07f0ef37f5f876e8cee44cc8ea0ec3fe80d499ee" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/phpstan/phpstan/zipball/8f436ea35241da33487fd0d38b4bc3e6dfe30ea8", - "reference": "8f436ea35241da33487fd0d38b4bc3e6dfe30ea8", + "url": "https://api.github.com/repos/phpstan/phpstan/zipball/07f0ef37f5f876e8cee44cc8ea0ec3fe80d499ee", + "reference": "07f0ef37f5f876e8cee44cc8ea0ec3fe80d499ee", "shasum": "" }, "require": { @@ -2965,7 +2965,7 @@ "description": "PHPStan - PHP Static Analysis Tool", "support": { "issues": "https://github.com/phpstan/phpstan/issues", - "source": "https://github.com/phpstan/phpstan/tree/0.12.69" + "source": "https://github.com/phpstan/phpstan/tree/0.12.70" }, "funding": [ { @@ -2981,7 +2981,7 @@ "type": "tidelift" } ], - "time": "2021-01-24T14:55:37+00:00" + "time": "2021-01-27T17:06:47+00:00" }, { "name": "phpunit/php-code-coverage", @@ -4665,16 +4665,16 @@ }, { "name": "symfony/amqp-messenger", - "version": "v5.2.1", + "version": "v5.2.2", "source": { "type": "git", "url": "https://github.com/symfony/amqp-messenger.git", - "reference": "a7f681b022f9cfb88febc12face05b12a47232c1" + "reference": "cf309a35ed08caa77886ee6a352b8491c7681424" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/amqp-messenger/zipball/a7f681b022f9cfb88febc12face05b12a47232c1", - "reference": "a7f681b022f9cfb88febc12face05b12a47232c1", + "url": "https://api.github.com/repos/symfony/amqp-messenger/zipball/cf309a35ed08caa77886ee6a352b8491c7681424", + "reference": "cf309a35ed08caa77886ee6a352b8491c7681424", "shasum": "" }, "require": { @@ -4714,7 +4714,7 @@ "description": "Symfony AMQP extension Messenger Bridge", "homepage": "https://symfony.com", "support": { - "source": "https://github.com/symfony/amqp-messenger/tree/v5.2.1" + "source": "https://github.com/symfony/amqp-messenger/tree/v5.2.2" }, "funding": [ { @@ -4730,7 +4730,7 @@ "type": "tidelift" } ], - "time": "2020-12-15T11:52:46+00:00" + "time": "2021-01-27T11:19:04+00:00" }, { "name": "symfony/deprecation-contracts", @@ -4801,16 +4801,16 @@ }, { "name": "symfony/doctrine-messenger", - "version": "v5.2.1", + "version": "v5.2.2", "source": { "type": "git", "url": "https://github.com/symfony/doctrine-messenger.git", - "reference": "8ad222fc5abce88f2b7c9cb91aa53b9f445a5889" + "reference": "e00cd690e49f08b41472e18b0d420c9874d2c707" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/doctrine-messenger/zipball/8ad222fc5abce88f2b7c9cb91aa53b9f445a5889", - "reference": "8ad222fc5abce88f2b7c9cb91aa53b9f445a5889", + "url": "https://api.github.com/repos/symfony/doctrine-messenger/zipball/e00cd690e49f08b41472e18b0d420c9874d2c707", + "reference": "e00cd690e49f08b41472e18b0d420c9874d2c707", "shasum": "" }, "require": { @@ -4854,7 +4854,7 @@ "description": "Symfony Doctrine Messenger Bridge", "homepage": "https://symfony.com", "support": { - "source": "https://github.com/symfony/doctrine-messenger/tree/v5.2.1" + "source": "https://github.com/symfony/doctrine-messenger/tree/v5.2.2" }, "funding": [ { @@ -4870,20 +4870,20 @@ "type": "tidelift" } ], - "time": "2020-12-10T19:16:15+00:00" + "time": "2021-01-27T11:19:04+00:00" }, { "name": "symfony/filesystem", - "version": "v5.2.1", + "version": "v5.2.2", "source": { "type": "git", "url": "https://github.com/symfony/filesystem.git", - "reference": "fa8f8cab6b65e2d99a118e082935344c5ba8c60d" + "reference": "262d033b57c73e8b59cd6e68a45c528318b15038" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/filesystem/zipball/fa8f8cab6b65e2d99a118e082935344c5ba8c60d", - "reference": "fa8f8cab6b65e2d99a118e082935344c5ba8c60d", + "url": "https://api.github.com/repos/symfony/filesystem/zipball/262d033b57c73e8b59cd6e68a45c528318b15038", + "reference": "262d033b57c73e8b59cd6e68a45c528318b15038", "shasum": "" }, "require": { @@ -4913,10 +4913,10 @@ "homepage": "https://symfony.com/contributors" } ], - "description": "Symfony Filesystem Component", + "description": "Provides basic utilities for the filesystem", "homepage": "https://symfony.com", "support": { - "source": "https://github.com/symfony/filesystem/tree/v5.2.1" + "source": "https://github.com/symfony/filesystem/tree/v5.2.2" }, "funding": [ { @@ -4932,20 +4932,20 @@ "type": "tidelift" } ], - "time": "2020-11-30T17:05:38+00:00" + "time": "2021-01-27T10:01:46+00:00" }, { "name": "symfony/finder", - "version": "v5.2.1", + "version": "v5.2.2", "source": { "type": "git", "url": "https://github.com/symfony/finder.git", - "reference": "0b9231a5922fd7287ba5b411893c0ecd2733e5ba" + "reference": "196f45723b5e618bf0e23b97e96d11652696ea9e" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/finder/zipball/0b9231a5922fd7287ba5b411893c0ecd2733e5ba", - "reference": "0b9231a5922fd7287ba5b411893c0ecd2733e5ba", + "url": "https://api.github.com/repos/symfony/finder/zipball/196f45723b5e618bf0e23b97e96d11652696ea9e", + "reference": "196f45723b5e618bf0e23b97e96d11652696ea9e", "shasum": "" }, "require": { @@ -4974,10 +4974,10 @@ "homepage": "https://symfony.com/contributors" } ], - "description": "Symfony Finder Component", + "description": "Finds files and directories via an intuitive fluent interface", "homepage": "https://symfony.com", "support": { - "source": "https://github.com/symfony/finder/tree/v5.2.1" + "source": "https://github.com/symfony/finder/tree/v5.2.2" }, "funding": [ { @@ -4993,20 +4993,20 @@ "type": "tidelift" } ], - "time": "2020-12-08T17:02:38+00:00" + "time": "2021-01-27T10:01:46+00:00" }, { "name": "symfony/messenger", - "version": "v5.2.1", + "version": "v5.2.2", "source": { "type": "git", "url": "https://github.com/symfony/messenger.git", - "reference": "3fc7766f5fa6c21096fa962a01684868b68f1f3c" + "reference": "ce658034cd7884428a8c6f50c2d4f8cf66348c40" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/messenger/zipball/3fc7766f5fa6c21096fa962a01684868b68f1f3c", - "reference": "3fc7766f5fa6c21096fa962a01684868b68f1f3c", + "url": "https://api.github.com/repos/symfony/messenger/zipball/ce658034cd7884428a8c6f50c2d4f8cf66348c40", + "reference": "ce658034cd7884428a8c6f50c2d4f8cf66348c40", "shasum": "" }, "require": { @@ -5062,10 +5062,10 @@ "homepage": "https://symfony.com/contributors" } ], - "description": "Symfony Messenger Component", + "description": "Helps applications send and receive messages to/from other applications or via message queues", "homepage": "https://symfony.com", "support": { - "source": "https://github.com/symfony/messenger/tree/v5.2.1" + "source": "https://github.com/symfony/messenger/tree/v5.2.2" }, "funding": [ { @@ -5081,20 +5081,20 @@ "type": "tidelift" } ], - "time": "2020-12-18T07:27:35+00:00" + "time": "2021-01-27T11:24:50+00:00" }, { "name": "symfony/process", - "version": "v5.2.1", + "version": "v5.2.2", "source": { "type": "git", "url": "https://github.com/symfony/process.git", - "reference": "bd8815b8b6705298beaa384f04fabd459c10bedd" + "reference": "313a38f09c77fbcdc1d223e57d368cea76a2fd2f" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/process/zipball/bd8815b8b6705298beaa384f04fabd459c10bedd", - "reference": "bd8815b8b6705298beaa384f04fabd459c10bedd", + "url": "https://api.github.com/repos/symfony/process/zipball/313a38f09c77fbcdc1d223e57d368cea76a2fd2f", + "reference": "313a38f09c77fbcdc1d223e57d368cea76a2fd2f", "shasum": "" }, "require": { @@ -5124,10 +5124,10 @@ "homepage": "https://symfony.com/contributors" } ], - "description": "Symfony Process Component", + "description": "Executes commands in sub-processes", "homepage": "https://symfony.com", "support": { - "source": "https://github.com/symfony/process/tree/v5.2.1" + "source": "https://github.com/symfony/process/tree/v5.2.2" }, "funding": [ { @@ -5143,20 +5143,20 @@ "type": "tidelift" } ], - "time": "2020-12-08T17:03:37+00:00" + "time": "2021-01-27T10:15:41+00:00" }, { "name": "symfony/redis-messenger", - "version": "v5.2.1", + "version": "v5.2.2", "source": { "type": "git", "url": "https://github.com/symfony/redis-messenger.git", - "reference": "c9ab1c7467657661554e7010a59f47f723f69c93" + "reference": "7e68914bf35cda948ee4d9081b8eaed9fd783fe5" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/redis-messenger/zipball/c9ab1c7467657661554e7010a59f47f723f69c93", - "reference": "c9ab1c7467657661554e7010a59f47f723f69c93", + "url": "https://api.github.com/repos/symfony/redis-messenger/zipball/7e68914bf35cda948ee4d9081b8eaed9fd783fe5", + "reference": "7e68914bf35cda948ee4d9081b8eaed9fd783fe5", "shasum": "" }, "require": { @@ -5194,7 +5194,7 @@ "description": "Symfony Redis extension Messenger Bridge", "homepage": "https://symfony.com", "support": { - "source": "https://github.com/symfony/redis-messenger/tree/v5.2.1" + "source": "https://github.com/symfony/redis-messenger/tree/v5.2.2" }, "funding": [ { @@ -5210,20 +5210,20 @@ "type": "tidelift" } ], - "time": "2020-12-15T11:52:46+00:00" + "time": "2021-01-27T11:24:50+00:00" }, { "name": "symfony/var-dumper", - "version": "v5.2.1", + "version": "v5.2.2", "source": { "type": "git", "url": "https://github.com/symfony/var-dumper.git", - "reference": "13e7e882eaa55863faa7c4ad7c60f12f1a8b5089" + "reference": "72ca213014a92223a5d18651ce79ef441c12b694" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/var-dumper/zipball/13e7e882eaa55863faa7c4ad7c60f12f1a8b5089", - "reference": "13e7e882eaa55863faa7c4ad7c60f12f1a8b5089", + "url": "https://api.github.com/repos/symfony/var-dumper/zipball/72ca213014a92223a5d18651ce79ef441c12b694", + "reference": "72ca213014a92223a5d18651ce79ef441c12b694", "shasum": "" }, "require": { @@ -5239,7 +5239,7 @@ "ext-iconv": "*", "symfony/console": "^4.4|^5.0", "symfony/process": "^4.4|^5.0", - "twig/twig": "^2.4|^3.0" + "twig/twig": "^2.13|^3.0.4" }, "suggest": { "ext-iconv": "To convert non-UTF-8 strings to UTF-8 (or symfony/polyfill-iconv in case ext-iconv cannot be used).", @@ -5275,14 +5275,14 @@ "homepage": "https://symfony.com/contributors" } ], - "description": "Symfony mechanism for exploring and dumping PHP variables", + "description": "Provides mechanisms for walking through any arbitrary PHP variable", "homepage": "https://symfony.com", "keywords": [ "debug", "dump" ], "support": { - "source": "https://github.com/symfony/var-dumper/tree/v5.2.1" + "source": "https://github.com/symfony/var-dumper/tree/v5.2.2" }, "funding": [ { @@ -5298,7 +5298,7 @@ "type": "tidelift" } ], - "time": "2020-12-16T17:02:19+00:00" + "time": "2021-01-27T10:15:41+00:00" }, { "name": "thecodingmachine/safe", diff --git a/src/Store/SingleTableStore.php b/src/Store/SingleTableStore.php index 0679f348..bc3fe677 100644 --- a/src/Store/SingleTableStore.php +++ b/src/Store/SingleTableStore.php @@ -137,14 +137,14 @@ public function all(): Generator ->orderBy('id') ->getSQL(); - $result = $this->connection->executeQuery($sql, []); + $result = $this->connection->iterateAssociative($sql); $platform = $this->connection->getDatabasePlatform(); /** @var array> $classMap */ $classMap = array_flip($this->aggregates); /** @var array $data */ - foreach ($result->iterateAssociative() as $data) { + foreach ($result as $data) { $name = (string)$data['aggregate']; if (!array_key_exists($name, $classMap)) { From d4309929c47c689695794ba12fecc281a2642a69 Mon Sep 17 00:00:00 2001 From: David Badura Date: Sun, 31 Jan 2021 15:54:55 +0100 Subject: [PATCH 12/12] fix gitignore --- tests/Integration/Pipeline/data/.gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/Integration/Pipeline/data/.gitignore b/tests/Integration/Pipeline/data/.gitignore index 49ef2557..f9fda0b8 100644 --- a/tests/Integration/Pipeline/data/.gitignore +++ b/tests/Integration/Pipeline/data/.gitignore @@ -1 +1,2 @@ -db.sqlite3 +old.sqlite3 +new.sqlite3