Skip to content

Commit

Permalink
Merge pull request #402 from patchlevel/projectionist-dev-runner
Browse files Browse the repository at this point in the history
add dev runner for projectionist
  • Loading branch information
DavidBadura authored Aug 4, 2023
2 parents a4b3ba5 + 822af79 commit 101ce3d
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 20 deletions.
106 changes: 106 additions & 0 deletions src/Console/Command/ProjectionistDevRunCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Console\Command;

use Patchlevel\EventSourcing\Console\InputHelper;
use Patchlevel\EventSourcing\Console\InvalidArgumentGiven;
use Patchlevel\EventSourcing\Projection\Projectionist\Listener\ThrowErrorListener;
use Patchlevel\EventSourcing\Projection\Projectionist\Projectionist;
use Patchlevel\Worker\DefaultWorker;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Logger\ConsoleLogger;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;

#[AsCommand(
'event-sourcing:projectionist:dev-run',
'Run the active projectors',
)]
final class ProjectionistDevRunCommand extends ProjectionistCommand
{
public function __construct(
Projectionist $projectionist,
private readonly EventDispatcherInterface $eventDispatcher,
) {
parent::__construct($projectionist);
}

protected function configure(): void
{
parent::configure();

$this
->addOption(
'run-limit',
null,
InputOption::VALUE_REQUIRED,
'The maximum number of runs this command should execute',
)
->addOption(
'message-limit',
null,
InputOption::VALUE_REQUIRED,
'How many messages should be consumed in one run',
100,
)
->addOption(
'memory-limit',
null,
InputOption::VALUE_REQUIRED,
'How much memory consumption should the worker be terminated',
)
->addOption(
'time-limit',
null,
InputOption::VALUE_REQUIRED,
'What is the maximum time the worker can run in seconds',
)
->addOption(
'sleep',
null,
InputOption::VALUE_REQUIRED,
'How much time should elapse before the next job is executed in microseconds',
1000,
);
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$runLimit = InputHelper::nullablePositivInt($input->getOption('run-limit'));
$messageLimit = InputHelper::nullablePositivInt($input->getOption('message-limit'));
$memoryLimit = InputHelper::nullableString($input->getOption('memory-limit'));
$timeLimit = InputHelper::nullablePositivInt($input->getOption('time-limit'));
$sleep = InputHelper::int($input->getOption('sleep'));
$criteria = $this->projectionCriteria($input);

$logger = new ConsoleLogger($output);

$this->eventDispatcher->addSubscriber(new ThrowErrorListener());

$worker = DefaultWorker::create(
function () use ($criteria, $messageLimit): void {
$this->projectionist->run($criteria, $messageLimit);
},
[
'runLimit' => $runLimit,
'memoryLimit' => $memoryLimit,
'timeLimit' => $timeLimit,
],
$logger,
);

if ($sleep < 0) {
throw new InvalidArgumentGiven($sleep, '0|positive-int');
}

$this->projectionist->remove($criteria);
$this->projectionist->boot($criteria);
$worker->run($sleep);

return 0;
}
}
15 changes: 15 additions & 0 deletions src/Projection/Projectionist/DefaultProjectionist.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@
use Patchlevel\EventSourcing\Projection\Projection\ProjectionId;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionStatus;
use Patchlevel\EventSourcing\Projection\Projection\Store\ProjectionStore;
use Patchlevel\EventSourcing\Projection\Projectionist\Event\ProjectorErrorEvent;
use Patchlevel\EventSourcing\Projection\Projector\MetadataProjectorResolver;
use Patchlevel\EventSourcing\Projection\Projector\Projector;
use Patchlevel\EventSourcing\Projection\Projector\ProjectorRepository;
use Patchlevel\EventSourcing\Projection\Projector\ProjectorResolver;
use Patchlevel\EventSourcing\Store\CriteriaBuilder;
use Patchlevel\EventSourcing\Store\Store;
use Psr\EventDispatcher\EventDispatcherInterface;
use Psr\Log\LoggerInterface;
use Throwable;

Expand All @@ -32,6 +34,7 @@ public function __construct(
private readonly ProjectionStore $projectionStore,
private readonly ProjectorRepository $projectorRepository,
private readonly ProjectorResolver $projectorResolver = new MetadataProjectorResolver(),
private readonly EventDispatcherInterface|null $eventDispatcher = null,
private readonly LoggerInterface|null $logger = null,
) {
}
Expand Down Expand Up @@ -87,6 +90,12 @@ public function boot(ProjectionCriteria $criteria = new ProjectionCriteria(), in

$projection->error($e->getMessage());
$this->projectionStore->save($projection);

$this->eventDispatcher?->dispatch(new ProjectorErrorEvent(
$projector::class,
$projection->id(),
$e,
));
}
}

Expand Down Expand Up @@ -377,6 +386,12 @@ private function handleMessage(Message $message, Projection $projection): void
$projection->error($e->getMessage());
$this->projectionStore->save($projection);

$this->eventDispatcher?->dispatch(new ProjectorErrorEvent(
$projector::class,
$projection->id(),
$e,
));

return;
}
}
Expand Down
20 changes: 20 additions & 0 deletions src/Projection/Projectionist/Event/ProjectorErrorEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Projection\Projectionist\Event;

use Patchlevel\EventSourcing\Projection\Projection\ProjectionId;
use Patchlevel\EventSourcing\Projection\Projector\Projector;
use Throwable;

final class ProjectorErrorEvent
{
public function __construct(
/** @var class-string<Projector> */
public readonly string $projector,
public readonly ProjectionId $projection,
public readonly Throwable $error,
) {
}
}
27 changes: 27 additions & 0 deletions src/Projection/Projectionist/Listener/ThrowErrorListener.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Projection\Projectionist\Listener;

use Patchlevel\EventSourcing\Projection\Projectionist\Event\ProjectorErrorEvent;
use Patchlevel\EventSourcing\Projection\Projectionist\ProjectionistError;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;

final class ThrowErrorListener implements EventSubscriberInterface
{
public function onProjectorError(ProjectorErrorEvent $event): void
{
throw new ProjectionistError(
$event->projector,
$event->projection,
$event->error,
);
}

/** @return array<class-string, string> */
public static function getSubscribedEvents(): array
{
return [ProjectorErrorEvent::class => 'onProjectorError'];
}
}
31 changes: 31 additions & 0 deletions src/Projection/Projectionist/ProjectionistError.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Projection\Projectionist;

use Patchlevel\EventSourcing\Projection\Projection\ProjectionId;
use RuntimeException;
use Throwable;

use function sprintf;

final class ProjectionistError extends RuntimeException
{
public function __construct(
public readonly string $projector,
public readonly ProjectionId $projectionId,
Throwable $error,
) {
parent::__construct(
sprintf(
'error in projector "%s" for "%s": %s',
$projector,
$projectionId->toString(),
$error->getMessage(),
),
0,
$error,
);
}
}
20 changes: 0 additions & 20 deletions tests/Unit/Projection/Projectionist/DefaultProjectionistTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
use Patchlevel\EventSourcing\Tests\Unit\Projection\DummyStore;
use PHPUnit\Framework\TestCase;
use Prophecy\PhpUnit\ProphecyTrait;
use Psr\Log\NullLogger;
use RuntimeException;

/** @covers \Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist */
Expand Down Expand Up @@ -52,7 +51,6 @@ public function testNothingToBoot(): void
$projectionStore->reveal(),
$projectorRepository->reveal(),
$projectorResolver->reveal(),
new NullLogger(),
);

$projectionist->boot();
Expand Down Expand Up @@ -86,7 +84,6 @@ public function targetProjection(): ProjectionId
$projectionStore,
$projectorRepository->reveal(),
$projectorResolver->reveal(),
new NullLogger(),
);

$projectionist->boot();
Expand Down Expand Up @@ -139,7 +136,6 @@ public function handle(Message $message): void
$projectionStore,
$projectorRepository->reveal(),
$projectorResolver->reveal(),
new NullLogger(),
);

$projectionist->boot();
Expand Down Expand Up @@ -195,7 +191,6 @@ public function handle(Message $message): void
$projectionStore,
$projectorRepository->reveal(),
$projectorResolver->reveal(),
new NullLogger(),
);

$projectionist->boot(new ProjectionCriteria(), 1);
Expand Down Expand Up @@ -244,7 +239,6 @@ public function create(): void
$projectionStore,
$projectorRepository->reveal(),
$projectorResolver->reveal(),
new NullLogger(),
);

$projectionist->boot();
Expand Down Expand Up @@ -289,7 +283,6 @@ public function handle(Message $message): void
$projectionStore,
$projectorRepository->reveal(),
$projectorResolver->reveal(),
new NullLogger(),
);

$projectionist->run();
Expand Down Expand Up @@ -339,7 +332,6 @@ public function handle(Message $message): void
$projectionStore,
$projectorRepository->reveal(),
$projectorResolver->reveal(),
new NullLogger(),
);

$projectionist->run(new ProjectionCriteria(), 1);
Expand Down Expand Up @@ -402,7 +394,6 @@ public function handle(Message $message): void
$projectionStore,
$projectorRepository->reveal(),
$projectorResolver->reveal(),
new NullLogger(),
);

$projectionist->run();
Expand Down Expand Up @@ -447,7 +438,6 @@ public function handle(Message $message): void
$projectionStore,
$projectorRepository->reveal(),
$projectorResolver->reveal(),
new NullLogger(),
);

$projectionist->run();
Expand Down Expand Up @@ -476,7 +466,6 @@ public function testRunningMarkOutdated(): void
$projectionStore,
$projectorRepository->reveal(),
$projectorResolver->reveal(),
new NullLogger(),
);

$projectionist->run();
Expand Down Expand Up @@ -517,7 +506,6 @@ public function handle(Message $message): void
$projectionStore,
$projectorRepository->reveal(),
$projectorResolver->reveal(),
new NullLogger(),
);

$projectionist->run();
Expand Down Expand Up @@ -557,7 +545,6 @@ public function drop(): void
$projectionStore,
$projectorRepository->reveal(),
$projectorResolver->reveal(),
new NullLogger(),
);

$projectionist->teardown();
Expand Down Expand Up @@ -599,7 +586,6 @@ public function drop(): void
$projectionStore,
$projectorRepository->reveal(),
$projectorResolver->reveal(),
new NullLogger(),
);

$projectionist->teardown();
Expand All @@ -626,7 +612,6 @@ public function testTeardownWithoutProjector(): void
$projectionStore,
$projectorRepository->reveal(),
$projectorResolver->reveal(),
new NullLogger(),
);

$projectionist->teardown();
Expand Down Expand Up @@ -666,7 +651,6 @@ public function drop(): void
$projectionStore,
$projectorRepository->reveal(),
$projectorResolver->reveal(),
new NullLogger(),
);

$projectionist->remove();
Expand Down Expand Up @@ -700,7 +684,6 @@ public function targetProjection(): ProjectionId
$projectionStore,
$projectorRepository->reveal(),
$projectorResolver->reveal(),
new NullLogger(),
);

$projectionist->remove();
Expand Down Expand Up @@ -740,7 +723,6 @@ public function drop(): void
$projectionStore,
$projectorRepository->reveal(),
$projectorResolver->reveal(),
new NullLogger(),
);

$projectionist->remove();
Expand All @@ -767,7 +749,6 @@ public function testRemoveWithoutProjector(): void
$projectionStore,
$projectorRepository->reveal(),
$projectorResolver->reveal(),
new NullLogger(),
);

$projectionist->remove();
Expand Down Expand Up @@ -799,7 +780,6 @@ public function targetProjection(): ProjectionId
$projectionStore,
$projectorRepository->reveal(),
$projectorResolver->reveal(),
new NullLogger(),
);

$projectionist->reactivate();
Expand Down

0 comments on commit 101ce3d

Please sign in to comment.