Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pipeline & commands #35

Merged
merged 12 commits into from
Jan 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 74 additions & 74 deletions composer.lock

Large diffs are not rendered by default.

34 changes: 34 additions & 0 deletions src/Console/ProjectionCreateCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Console;

use Patchlevel\EventSourcing\Projection\ProjectionRepository;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class ProjectionCreateCommand extends Command
{
private ProjectionRepository $projectionRepository;

public function __construct(ProjectionRepository $projectionRepository)
{
parent::__construct();

$this->projectionRepository = $projectionRepository;
}

protected function configure(): void
{
$this->setName('event-sourcing:projection:create');
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$this->projectionRepository->create();

return 0;
}
}
34 changes: 34 additions & 0 deletions src/Console/ProjectionDropCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Console;

use Patchlevel\EventSourcing\Projection\ProjectionRepository;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class ProjectionDropCommand extends Command
{
private ProjectionRepository $projectionRepository;

public function __construct(ProjectionRepository $projectionRepository)
{
parent::__construct();

$this->projectionRepository = $projectionRepository;
}

protected function configure(): void
{
$this->setName('event-sourcing:projection:drop');
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$this->projectionRepository->create();

return 0;
}
}
71 changes: 71 additions & 0 deletions src/Console/ProjectionRebuildCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Console;

use Patchlevel\EventSourcing\Pipeline\Pipeline;
use Patchlevel\EventSourcing\Pipeline\Source\StoreSource;
use Patchlevel\EventSourcing\Pipeline\Target\ProjectionRepositoryTarget;
use Patchlevel\EventSourcing\Projection\ProjectionRepository;
use Patchlevel\EventSourcing\Store\PipelineStore;
use Patchlevel\EventSourcing\Store\Store;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;

class ProjectionRebuildCommand extends Command
{
private Store $store;
private ProjectionRepository $projectionRepository;

public function __construct(Store $store, ProjectionRepository $projectionRepository)
DavidBadura marked this conversation as resolved.
Show resolved Hide resolved
{
parent::__construct();

$this->store = $store;
$this->projectionRepository = $projectionRepository;
}

protected function configure(): void
{
$this
->setName('event-sourcing:projection:rebuild')
->addOption('recreate', 'r', InputOption::VALUE_OPTIONAL, 'drop and create projections', false);
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$console = new SymfonyStyle($input, $output);

$store = $this->store;

if (!$store instanceof PipelineStore) {
$console->error('store is not supported');

return 1;
}

if ($input->getOption('recreate')) {
$this->projectionRepository->drop();
$this->projectionRepository->create();
}

$pipeline = new Pipeline(
new StoreSource($store),
new ProjectionRepositoryTarget($this->projectionRepository)
);

$console->progressStart($pipeline->count());

$pipeline->run(static function () use ($console): void {
$console->progressAdvance();
});

$console->progressFinish();

return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class CreateSchemaCommand extends Command
class SchemaCreateCommand extends Command
{
private Store $store;
private SchemaManager $schemaManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class DropSchemaCommand extends Command
class SchemaDropCommand extends Command
{
private Store $store;
private SchemaManager $schemaManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class UpdateSchemaCommand extends Command
class SchemaUpdateCommand extends Command
{
private Store $store;
private SchemaManager $schemaManager;
Expand Down
37 changes: 37 additions & 0 deletions src/Pipeline/EventBucket.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Pipeline;

use Patchlevel\EventSourcing\Aggregate\AggregateChanged;
use Patchlevel\EventSourcing\Aggregate\AggregateRoot;

class EventBucket
{
/** @var class-string<AggregateRoot> */
private string $aggregateClass;
private AggregateChanged $event;

/**
* @param class-string<AggregateRoot> $aggregateClass
*/
public function __construct(string $aggregateClass, AggregateChanged $event)
{
$this->aggregateClass = $aggregateClass;
$this->event = $event;
}

/**
* @return class-string<AggregateRoot>
*/
public function aggregateClass(): string
{
return $this->aggregateClass;
}

public function event(): AggregateChanged
{
return $this->event;
}
}
36 changes: 36 additions & 0 deletions src/Pipeline/Middleware/DeleteEventMiddleware.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Pipeline\Middleware;

use Patchlevel\EventSourcing\Aggregate\AggregateChanged;
use Patchlevel\EventSourcing\Pipeline\EventBucket;

class DeleteEventMiddleware implements Middleware
{
/** @var list<class-string<AggregateChanged>> */
private array $classes;

/**
* @param list<class-string<AggregateChanged>> $classes
*/
public function __construct(array $classes)
{
$this->classes = $classes;
}

/**
* @return list<EventBucket>
*/
public function __invoke(EventBucket $bucket): array
{
foreach ($this->classes as $class) {
if ($bucket->event() instanceof $class) {
return [];
}
}

return [$bucket];
}
}
36 changes: 36 additions & 0 deletions src/Pipeline/Middleware/FilterEventMiddleware.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Pipeline\Middleware;

use Patchlevel\EventSourcing\Aggregate\AggregateChanged;
use Patchlevel\EventSourcing\Pipeline\EventBucket;

class FilterEventMiddleware implements Middleware
{
/** @var list<class-string<AggregateChanged>> */
private array $classes;

/**
* @param list<class-string<AggregateChanged>> $classes
*/
public function __construct(array $classes)
{
$this->classes = $classes;
}

/**
* @return list<EventBucket>
*/
public function __invoke(EventBucket $bucket): array
{
foreach ($this->classes as $class) {
if ($bucket->event() instanceof $class) {
return [$bucket];
}
}

return [];
}
}
15 changes: 15 additions & 0 deletions src/Pipeline/Middleware/Middleware.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Pipeline\Middleware;

use Patchlevel\EventSourcing\Pipeline\EventBucket;

interface Middleware
{
/**
* @return list<EventBucket>
*/
public function __invoke(EventBucket $bucket): array;
}
64 changes: 64 additions & 0 deletions src/Pipeline/Middleware/RecalculatePlayheadMiddleware.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Pipeline\Middleware;

use Patchlevel\EventSourcing\Aggregate\AggregateChanged;
use Patchlevel\EventSourcing\Aggregate\AggregateRoot;
use Patchlevel\EventSourcing\Pipeline\EventBucket;
use ReflectionClass;
use ReflectionProperty;

use function array_key_exists;

class RecalculatePlayheadMiddleware implements Middleware
{
/** @var array<class-string<AggregateRoot>, array<string, int>> */
private array $index = [];

private ReflectionProperty $reflectionProperty;

public function __construct()
{
$reflectionClass = new ReflectionClass(AggregateChanged::class);

$this->reflectionProperty = $reflectionClass->getProperty('playhead');
$this->reflectionProperty->setAccessible(true);
}

/**
* @return list<EventBucket>
*/
public function __invoke(EventBucket $bucket): array
{
$event = $bucket->event();
$playhead = $this->nextPlayhead($bucket->aggregateClass(), $event->aggregateId());

if ($event->playhead() === $playhead) {
return [$bucket];
}

$this->reflectionProperty->setValue($event, $playhead);

return [$bucket];
}

/**
* @param class-string<AggregateRoot> $aggregateClass
*/
private function nextPlayhead(string $aggregateClass, string $aggregateId): int
{
if (!array_key_exists($aggregateClass, $this->index)) {
$this->index[$aggregateClass] = [];
}

if (!array_key_exists($aggregateId, $this->index[$aggregateClass])) {
$this->index[$aggregateClass][$aggregateId] = -1;
}

$this->index[$aggregateClass][$aggregateId]++;

return $this->index[$aggregateClass][$aggregateId];
}
}
Loading