Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pipeline & commands #35

Merged
merged 12 commits into from
Jan 31, 2021
Merged
34 changes: 34 additions & 0 deletions src/Console/ProjectionCreateCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Console;

use Patchlevel\EventSourcing\Projection\ProjectionRepository;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class ProjectionCreateCommand extends Command
{
private ProjectionRepository $projectionRepository;

public function __construct(ProjectionRepository $projectionRepository)
{
parent::__construct();

$this->projectionRepository = $projectionRepository;
}

protected function configure(): void
{
$this->setName('event-sourcing:projection:create');
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$this->projectionRepository->create();

return 0;
}
}
34 changes: 34 additions & 0 deletions src/Console/ProjectionDropCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Console;

use Patchlevel\EventSourcing\Projection\ProjectionRepository;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class ProjectionDropCommand extends Command
{
private ProjectionRepository $projectionRepository;

public function __construct(ProjectionRepository $projectionRepository)
{
parent::__construct();

$this->projectionRepository = $projectionRepository;
}

protected function configure(): void
{
$this->setName('event-sourcing:projection:drop');
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$this->projectionRepository->create();

return 0;
}
}
71 changes: 71 additions & 0 deletions src/Console/ProjectionRebuildCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Console;

use Patchlevel\EventSourcing\Pipeline\Pipeline;
use Patchlevel\EventSourcing\Pipeline\Source\StoreSource;
use Patchlevel\EventSourcing\Pipeline\Target\ProjectionRepositoryTarget;
use Patchlevel\EventSourcing\Projection\ProjectionRepository;
use Patchlevel\EventSourcing\Store\PipelineStore;
use Patchlevel\EventSourcing\Store\Store;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;

class ProjectionRebuildCommand extends Command
{
private Store $store;
private ProjectionRepository $projectionRepository;

public function __construct(Store $store, ProjectionRepository $projectionRepository)
DavidBadura marked this conversation as resolved.
Show resolved Hide resolved
{
parent::__construct();

$this->store = $store;
$this->projectionRepository = $projectionRepository;
}

protected function configure(): void
{
$this
->setName('event-sourcing:projection:rebuild')
->addOption('recreate', 'r', InputOption::VALUE_OPTIONAL, 'drop and create projections', false);
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$console = new SymfonyStyle($input, $output);

$store = $this->store;

if (!$store instanceof PipelineStore) {
$console->error('store is not supported');

return 1;
}

if ($input->getOption('recreate')) {
$this->projectionRepository->drop();
$this->projectionRepository->create();
}

$pipeline = new Pipeline(
new StoreSource($store),
new ProjectionRepositoryTarget($this->projectionRepository)
);

$console->progressStart($pipeline->count());

$pipeline->run(static function () use ($console): void {
$console->progressAdvance();
});

$console->progressFinish();

return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class CreateSchemaCommand extends Command
class SchemaCreateCommand extends Command
{
private Store $store;
private SchemaManager $schemaManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class DropSchemaCommand extends Command
class SchemaDropCommand extends Command
{
private Store $store;
private SchemaManager $schemaManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class UpdateSchemaCommand extends Command
class SchemaUpdateCommand extends Command
{
private Store $store;
private SchemaManager $schemaManager;
Expand Down
37 changes: 37 additions & 0 deletions src/Pipeline/EventBucket.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Pipeline;

use Patchlevel\EventSourcing\Aggregate\AggregateChanged;
use Patchlevel\EventSourcing\Aggregate\AggregateRoot;

class EventBucket
{
/** @var class-string<AggregateRoot> */
private string $aggregateClass;
private AggregateChanged $event;

/**
* @param class-string<AggregateRoot> $aggregateClass
*/
public function __construct(string $aggregateClass, AggregateChanged $event)
{
$this->aggregateClass = $aggregateClass;
$this->event = $event;
}

/**
* @return class-string<AggregateRoot>
*/
public function aggregateClass(): string
{
return $this->aggregateClass;
}

public function event(): AggregateChanged
{
return $this->event;
}
}
36 changes: 36 additions & 0 deletions src/Pipeline/Middleware/DeleteEventMiddleware.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Pipeline\Middleware;

use Patchlevel\EventSourcing\Aggregate\AggregateChanged;
use Patchlevel\EventSourcing\Pipeline\EventBucket;

class DeleteEventMiddleware implements Middleware
{
/** @var list<class-string<AggregateChanged>> */
private array $classes;

/**
* @param list<class-string<AggregateChanged>> $classes
*/
public function __construct(array $classes)
{
$this->classes = $classes;
}

/**
* @return list<EventBucket>
*/
public function __invoke(EventBucket $bucket): array
{
foreach ($this->classes as $class) {
if ($bucket->event() instanceof $class) {
return [];
}
}

return [$bucket];
}
}
15 changes: 15 additions & 0 deletions src/Pipeline/Middleware/Middleware.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Pipeline\Middleware;

use Patchlevel\EventSourcing\Pipeline\EventBucket;

interface Middleware
{
/**
* @return list<EventBucket>
*/
public function __invoke(EventBucket $bucket): array;
}
56 changes: 56 additions & 0 deletions src/Pipeline/Middleware/RecalculatePlayheadMiddleware.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Pipeline\Middleware;

use Patchlevel\EventSourcing\Aggregate\AggregateChanged;
use Patchlevel\EventSourcing\Pipeline\EventBucket;
use ReflectionClass;
use ReflectionProperty;

use function array_key_exists;

class RecalculatePlayheadMiddleware implements Middleware
{
/** @var array<string, int> */
private array $index = [];

private ReflectionProperty $reflectionProperty;

public function __construct()
{
$reflectionClass = new ReflectionClass(AggregateChanged::class);

$this->reflectionProperty = $reflectionClass->getProperty('playhead');
$this->reflectionProperty->setAccessible(true);
}

/**
* @return list<EventBucket>
*/
public function __invoke(EventBucket $bucket): array
{
$event = $bucket->event();
$playhead = $this->nextPlayhead($event->aggregateId());

if ($event->playhead() === $playhead) {
return [$bucket];
}

$this->reflectionProperty->setValue($event, $playhead);

return [$bucket];
}

private function nextPlayhead(string $aggregateId): int
{
if (!array_key_exists($aggregateId, $this->index)) {
$this->index[$aggregateId] = -1;
}

$this->index[$aggregateId]++;

return $this->index[$aggregateId];
}
}
81 changes: 81 additions & 0 deletions src/Pipeline/Pipeline.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Pipeline;

use Patchlevel\EventSourcing\Pipeline\Middleware\Middleware;
use Patchlevel\EventSourcing\Pipeline\Source\Source;
use Patchlevel\EventSourcing\Pipeline\Target\Target;

class Pipeline
{
private Source $source;
private Target $target;
/** @var list<Middleware> */
private array $middlewares;

/**
* @param list<Middleware> $middlewares
*/
public function __construct(Source $source, Target $target, array $middlewares = [])
{
$this->source = $source;
$this->target = $target;
$this->middlewares = $middlewares;
}

/**
* @param callable(EventBucket $event):void|null $observer
*/
public function run(?callable $observer = null): void
{
if ($observer === null) {
$observer = static function (EventBucket $event): void {
};
}

foreach ($this->source->load() as $bucket) {
foreach ($this->processMiddlewares($bucket) as $resultBucket) {
$this->target->save($resultBucket);
}

$observer($bucket);
}
}

public function count(): int
{
return $this->source->count();
}

/**
* @return list<EventBucket>
*/
private function processMiddlewares(EventBucket $bucket): array
{
$buckets = [$bucket];

foreach ($this->middlewares as $middleware) {
$buckets = $this->processMiddleware($middleware, $buckets);
}

return $buckets;
}

/**
* @param list<EventBucket> $buckets
*
* @return list<EventBucket>
*/
private function processMiddleware(Middleware $middleware, array $buckets): array
{
$result = [];

foreach ($buckets as $bucket) {
$result += $middleware($bucket);
}

return $result;
}
}
Loading