diff --git a/baseline.xml b/baseline.xml
index 4955142b..423f2f78 100644
--- a/baseline.xml
+++ b/baseline.xml
@@ -10,6 +10,30 @@
$sleep
+
+
+ MetadataAwareProjectionHandler
+ MetadataAwareProjectionHandler::class
+ Projection
+ ProjectionHandler
+ ProjectionHandler
+ ProjectionHandler
+ ProjectionHandler
+ non-empty-array<class-string<Projection>>
+ non-empty-array<class-string<Projection>>|null
+
+
+
+
+ ProjectionHandler
+
+
+
+
+ $messageLimit
+ $sleep
+
+
SchemaManager|SchemaDirector
@@ -43,6 +67,81 @@
array<class-string, string>
+
+
+ array
+ class-string<Projection>
+ private array $projectionMetadata = [];
+
+
+
+
+ class-string<Projection>
+
+
+
+
+ class-string<Projection>
+
+
+
+
+ class-string<Projection>
+
+
+
+
+ class-string<Projection>
+
+
+
+
+ PipelineStore
+ PipelineStore
+
+
+
+
+ ProjectionHandler
+ ProjectionHandler
+
+
+
+
+ MetadataAwareProjectionHandler
+ Projection
+
+
+
+
+ iterable
+ iterable<Projection>
+ iterable<Projection>
+
+
+ MetadataAwareProjectionHandler
+
+
+
+
+ Projection
+ Projection
+ Projection
+
+
+
+
+ ProjectionHandler
+ ProjectionHandler
+
+
+
+
+ Projection
+ Projection
+ Projection
+
+
$messages[0]->playhead() - 1
@@ -92,10 +191,23 @@
+
+ MultiTableStore
+
array{id: string, aggregate_id: string, playhead: string, event: string, payload: string, recorded_on: string, custom_headers: string}|null
+
+
+ SingleTableStore
+
+
+
+
+ ProfileProjector
+
+
$bus
@@ -112,6 +224,9 @@
+
+ new ProjectionListener($projectionRepository)
+
$bus
$profile
@@ -119,6 +234,74 @@
$store
+
+
+ new MetadataAwareProjectionHandler([$bankAccountProjection])
+ new MetadataAwareProjectionHandler([$bankAccountProjection])
+ new ProjectionListener($projectionRepository)
+ new ProjectionListener($projectionRepository)
+
+
+
+
+ BankAccountProjection
+
+
+
+
+ ProfileProjection
+
+
+
+
+ new ProjectionListener($projectionRepository)
+
+
+
+
+ ProfileProjection
+
+
+
+
+ ProfileProjection
+
+
+
+
+ $this->prophesize(ProjectionHandler::class)
+ new MetadataAwareProjectionHandler([$projectionA, $projectionB])
+
+
+
+
+ $this->prophesize(ProjectionHandler::class)
+ new MetadataAwareProjectionHandler([$projectionA, $projectionB])
+
+
+
+
+ $this->prophesize(PipelineStore::class)
+ $this->prophesize(PipelineStore::class)
+ $this->prophesize(PipelineStore::class)
+ $this->prophesize(PipelineStore::class)
+ $this->prophesize(ProjectionHandler::class)
+ $this->prophesize(ProjectionHandler::class)
+ $this->prophesize(ProjectionHandler::class)
+ new MetadataAwareProjectionHandler([$projectionA, $projectionB])
+ new MetadataAwareProjectionHandler([$projectionA, $projectionB])
+
+
+
+
+ Dummy2Projection
+
+
+
+
+ DummyProjection
+
+
$value
@@ -129,6 +312,94 @@
$event
+
+
+ class implements Projection {
+ class implements Projection {
+ class implements Projection {
+ class implements Projection {
+ class implements Projection {
+
+
+
+
+ $this->prophesize(PipelineStore::class)
+ $this->prophesize(PipelineStore::class)
+ $this->prophesize(PipelineStore::class)
+ $this->prophesize(PipelineStore::class)
+
+
+
+
+ $this->prophesize(ProjectionHandler::class)
+
+
+
+
+ class implements Projection {
+
+
+
+
+ $this->prophesize(PipelineStore::class)
+
+
+
+
+ class implements Projector {
+ class implements Projector {
+ class implements Projector {
+ class implements Projector {
+ class implements Projector {
+ class implements Projector {
+ class implements Projector {
+ class implements Projector {
+ class implements Projector {
+
+
+
+
+ class implements Projector {
+ class implements Projector {
+
+
+
+
+ new MetadataAwareProjectionHandler([$projection])
+ new MetadataAwareProjectionHandler([$projection])
+ new MetadataAwareProjectionHandler([$projection])
+ new MetadataAwareProjectionHandler([$projection])
+ new MetadataAwareProjectionHandler([])
+
+
+ class implements Projection {
+ class implements Projection {
+ class implements Projection {
+ class implements Projection {
+
+
+
+
+ class implements Projection {
+ class implements Projection {
+ class implements Projection {
+ class implements Projection {
+ class implements Projection {
+ class implements Projection {
+
+
+
+
+ $this->prophesize(ProjectionHandler::class)
+ new ProjectionListener($projectionRepository->reveal())
+
+
+
+
+ class implements Projector {
+ class implements Projector {
+
+
new Comparator()
diff --git a/docs/pages/projection.md b/docs/pages/projection.md
index 192b18c2..3234d8b8 100644
--- a/docs/pages/projection.md
+++ b/docs/pages/projection.md
@@ -8,8 +8,9 @@ and everything can always be reproduced from the events.
The target of a projection can be anything.
Either a file, a relational database, a no-sql database like mongodb or an elasticsearch.
-## Define Projection
+## Define Projector
+To create a projection you need a projector.
In this example we always create a new data set in a relational database when a profile is created:
```php
@@ -18,9 +19,10 @@ use Patchlevel\EventSourcing\Attribute\Create;
use Patchlevel\EventSourcing\Attribute\Drop;
use Patchlevel\EventSourcing\Attribute\Handle;
use Patchlevel\EventSourcing\EventBus\Message;
-use Patchlevel\EventSourcing\Projection\Projection;
+use Patchlevel\EventSourcing\Projection\Projector;
+use Patchlevel\EventSourcing\Projection\ProjectorId;
-final class ProfileProjection implements Projection
+final class ProfileProjection implements Projector
{
private Connection $connection;
@@ -28,6 +30,14 @@ final class ProfileProjection implements Projection
{
$this->connection = $connection;
}
+
+ public function projectorId(): ProjectorId
+ {
+ return new ProjectorId(
+ name: 'profile',
+ version: 1
+ );
+ }
#[Create]
public function create(): void
@@ -59,7 +69,7 @@ final class ProfileProjection implements Projection
!!! danger
- You should not execute any actions with projections,
+ You should not execute any actions with projectors,
otherwise these will be executed again if you rebuild the projection!
!!! tip
@@ -67,18 +77,18 @@ final class ProfileProjection implements Projection
If you are using psalm then you can install the event sourcing [plugin](https://github.com/patchlevel/event-sourcing-psalm-plugin)
to make the event method return the correct type.
-Projections have a `create` and a `drop` method that is executed when the projection is created or deleted.
+Projectors have a `create` and a `drop` method that is executed when the projection is created or deleted.
In some cases it may be that no schema has to be created for the projection, as the target does it automatically.
-In order for the projection to know which method is responsible for which event,
+In order for the projector to know which method is responsible for which event,
the methods must be given the `Handle` attribute with the respective event class name.
As soon as the event has been dispatched, the appropriate methods are then executed.
-Several projections can also listen to the same event.
+Several projectors can also listen to the same event.
-## Register projections
+## Register projector
-So that the projections are known and also executed, you have to add them to the `ProjectionHandler`.
+So that the projectors are known and also executed, you have to add them to the `ProjectionHandler`.
Then add this to the event bus using the `ProjectionListener`.
```php
diff --git a/phpstan-baseline.neon b/phpstan-baseline.neon
index c60b17ec..56222259 100644
--- a/phpstan-baseline.neon
+++ b/phpstan-baseline.neon
@@ -10,6 +10,11 @@ parameters:
count: 1
path: src/Console/Worker/DefaultWorker.php
+ -
+ message: "#^Method Patchlevel\\\\EventSourcing\\\\Projection\\\\DefaultProjectorRepository\\:\\:projectors\\(\\) should return array\\ but returns array\\\\.$#"
+ count: 1
+ path: src/Projection/DefaultProjectorRepository.php
+
-
message: "#^Parameter \\#2 \\$data of method Patchlevel\\\\EventSourcing\\\\Serializer\\\\Hydrator\\\\AggregateRootHydrator\\:\\:hydrate\\(\\) expects array\\, mixed given\\.$#"
count: 1
diff --git a/src/Console/Command/ProjectionistBootCommand.php b/src/Console/Command/ProjectionistBootCommand.php
new file mode 100644
index 00000000..e6ba3314
--- /dev/null
+++ b/src/Console/Command/ProjectionistBootCommand.php
@@ -0,0 +1,24 @@
+projectorCriteria();
+ $this->projectionist->boot($criteria);
+
+ return 0;
+ }
+}
diff --git a/src/Console/Command/ProjectionistCommand.php b/src/Console/Command/ProjectionistCommand.php
new file mode 100644
index 00000000..5a095a97
--- /dev/null
+++ b/src/Console/Command/ProjectionistCommand.php
@@ -0,0 +1,23 @@
+projectorCriteria();
+ $this->projectionist->remove($criteria);
+
+ return 0;
+ }
+}
diff --git a/src/Console/Command/ProjectionistRunCommand.php b/src/Console/Command/ProjectionistRunCommand.php
new file mode 100644
index 00000000..023e134d
--- /dev/null
+++ b/src/Console/Command/ProjectionistRunCommand.php
@@ -0,0 +1,121 @@
+addOption(
+ 'run-limit',
+ null,
+ InputOption::VALUE_REQUIRED,
+ 'The maximum number of runs this command should execute'
+ )
+ ->addOption(
+ 'message-limit',
+ null,
+ InputOption::VALUE_REQUIRED,
+ 'How many messages should be consumed in one run',
+ 100
+ )
+ ->addOption(
+ 'memory-limit',
+ null,
+ InputOption::VALUE_REQUIRED,
+ 'How much memory consumption should the worker be terminated'
+ )
+ ->addOption(
+ 'time-limit',
+ null,
+ InputOption::VALUE_REQUIRED,
+ 'What is the maximum time the worker can run in seconds'
+ )
+ ->addOption(
+ 'sleep',
+ null,
+ InputOption::VALUE_REQUIRED,
+ 'How much time should elapse before the next job is executed in microseconds',
+ 1000
+ );
+ }
+
+ protected function execute(InputInterface $input, OutputInterface $output): int
+ {
+ $runLimit = InputHelper::nullableInt($input->getOption('run-limit'));
+ $messageLimit = InputHelper::nullableInt($input->getOption('message-limit'));
+ $memoryLimit = InputHelper::nullableString($input->getOption('memory-limit'));
+ $timeLimit = InputHelper::nullableInt($input->getOption('time-limit'));
+ $sleep = InputHelper::int($input->getOption('sleep'));
+ $criteria = $this->projectorCriteria();
+
+ $logger = new ConsoleLogger($output);
+
+ $eventDispatcher = new EventDispatcher();
+ $eventDispatcher->addSubscriber(new StopWorkerOnSigtermSignalListener($logger));
+
+ if ($runLimit) {
+ if ($runLimit <= 0) {
+ throw new InvalidArgumentGiven($runLimit, 'null|positive-int');
+ }
+
+ $eventDispatcher->addSubscriber(new StopWorkerOnIterationLimitListener($runLimit, $logger));
+ }
+
+ if ($memoryLimit) {
+ $eventDispatcher->addSubscriber(
+ new StopWorkerOnMemoryLimitListener(Bytes::parseFromString($memoryLimit), $logger)
+ );
+ }
+
+ if ($timeLimit) {
+ if ($timeLimit <= 0) {
+ throw new InvalidArgumentGiven($timeLimit, 'null|positive-int');
+ }
+
+ $eventDispatcher->addSubscriber(new StopWorkerOnTimeLimitListener($timeLimit, $logger));
+ }
+
+ if ($messageLimit !== null && $messageLimit <= 0) {
+ throw new InvalidArgumentGiven($messageLimit, 'null|positive-int');
+ }
+
+ $worker = new DefaultWorker(
+ function () use ($criteria, $messageLimit): void {
+ $this->projectionist->run($criteria, $messageLimit);
+ },
+ $eventDispatcher,
+ $logger
+ );
+
+ if ($sleep < 0) {
+ throw new InvalidArgumentGiven($sleep, '0|positive-int');
+ }
+
+ $worker->run($sleep);
+
+ return 0;
+ }
+}
diff --git a/src/Console/Command/ProjectionistStatusCommand.php b/src/Console/Command/ProjectionistStatusCommand.php
new file mode 100644
index 00000000..0935f698
--- /dev/null
+++ b/src/Console/Command/ProjectionistStatusCommand.php
@@ -0,0 +1,46 @@
+projectionist->projectorStates();
+
+ $io->table(
+ [
+ 'name',
+ 'version',
+ 'position',
+ 'status',
+ ],
+ array_map(
+ static fn (ProjectorState $state) => [
+ $state->id()->name(),
+ $state->id()->version(),
+ $state->position(),
+ $state->status()->value,
+ ],
+ [...$states]
+ )
+ );
+
+ return 0;
+ }
+}
diff --git a/src/Console/Command/ProjectionistTeardownCommand.php b/src/Console/Command/ProjectionistTeardownCommand.php
new file mode 100644
index 00000000..7d29f3e6
--- /dev/null
+++ b/src/Console/Command/ProjectionistTeardownCommand.php
@@ -0,0 +1,24 @@
+projectorCriteria();
+ $this->projectionist->teardown($criteria);
+
+ return 0;
+ }
+}
diff --git a/src/Console/Worker/DefaultWorker.php b/src/Console/Worker/DefaultWorker.php
index 7b82418f..cbcaaed6 100644
--- a/src/Console/Worker/DefaultWorker.php
+++ b/src/Console/Worker/DefaultWorker.php
@@ -56,7 +56,7 @@ public function run(int $sleepTimer = 1000): void
}
$this->logger?->debug('Worker sleep for {sleepTimer}ms', ['sleepTimer' => $sleepFor]);
- usleep($sleepFor);
+ usleep($sleepFor * 1000);
}
$this->logger?->debug('Worker stopped');
diff --git a/src/Projection/DefaultProjectionist.php b/src/Projection/DefaultProjectionist.php
new file mode 100644
index 00000000..6adadd37
--- /dev/null
+++ b/src/Projection/DefaultProjectionist.php
@@ -0,0 +1,214 @@
+projectorStates()
+ ->filterByProjectorStatus(ProjectorStatus::New)
+ ->filterByCriteria($criteria);
+
+ foreach ($projectorStates as $projectorState) {
+ $projector = $this->projectorRepository->findByProjectorId($projectorState->id());
+
+ if (!$projector) {
+ throw new ProjectorNotFound($projectorState->id());
+ }
+
+ $projectorState->booting();
+ $this->projectorStore->saveProjectorState($projectorState);
+
+ $createMethod = $this->resolver->resolveCreateMethod($projector);
+
+ if (!$createMethod) {
+ continue;
+ }
+
+ try {
+ $createMethod();
+ $this->logger?->info(sprintf('%s created', $projectorState->id()->toString()));
+ } catch (Throwable $e) {
+ $this->logger?->error(sprintf('%s create error', $projectorState->id()->toString()));
+ $this->logger?->error($e->getMessage());
+ $projectorState->error();
+ $this->projectorStore->saveProjectorState($projectorState);
+ }
+ }
+
+ $stream = $this->streamableMessageStore->stream();
+
+ foreach ($stream as $message) {
+ foreach ($projectorStates->filterByProjectorStatus(ProjectorStatus::Booting) as $projectorState) {
+ $this->handleMessage($message, $projectorState);
+ }
+ }
+
+ foreach ($projectorStates->filterByProjectorStatus(ProjectorStatus::Booting) as $projectorState) {
+ $projectorState->active();
+ $this->projectorStore->saveProjectorState($projectorState);
+ }
+ }
+
+ public function run(ProjectorCriteria $criteria = new ProjectorCriteria(), ?int $limit = null): void
+ {
+ $projectorStates = $this->projectorStates()
+ ->filterByProjectorStatus(ProjectorStatus::Active)
+ ->filterByCriteria($criteria);
+
+ $currentPosition = $projectorStates->minProjectorPosition();
+ $stream = $this->streamableMessageStore->stream($currentPosition);
+
+ foreach ($projectorStates as $projectorState) {
+ $projector = $this->projectorRepository->findByProjectorId($projectorState->id());
+
+ if ($projector) {
+ continue;
+ }
+
+ $projectorState->outdated();
+ $this->projectorStore->saveProjectorState($projectorState);
+ }
+
+ $messageCounter = 0;
+
+ foreach ($stream as $message) {
+ foreach ($projectorStates->filterByProjectorStatus(ProjectorStatus::Active) as $projectorState) {
+ if ($projectorState->position() > $currentPosition) {
+ continue;
+ }
+
+ $this->handleMessage($message, $projectorState);
+ }
+
+ $currentPosition++;
+
+ $this->logger?->info(sprintf('position: %s', $currentPosition));
+
+ $messageCounter++;
+ if ($messageCounter >= $limit) {
+ return;
+ }
+ }
+ }
+
+ public function teardown(ProjectorCriteria $criteria = new ProjectorCriteria()): void
+ {
+ $projectorStates = $this->projectorStates()->filterByProjectorStatus(ProjectorStatus::Outdated);
+
+ foreach ($projectorStates as $projectorState) {
+ $projector = $this->projectorRepository->findByProjectorId($projectorState->id());
+
+ if (!$projector) {
+ $this->logger?->warning(
+ sprintf('projector with the id "%s" not found', $projectorState->id()->toString())
+ );
+ continue;
+ }
+
+ $dropMethod = $this->resolver->resolveDropMethod($projector);
+
+ if ($dropMethod) {
+ try {
+ $dropMethod();
+ $this->logger?->info(sprintf('%s dropped', $projectorState->id()->toString()));
+ } catch (Throwable $e) {
+ $this->logger?->error(sprintf('%s drop error', $projectorState->id()->toString()));
+ $this->logger?->error($e->getMessage());
+ continue;
+ }
+ }
+
+ $this->projectorStore->removeProjectorState($projectorState->id());
+ }
+ }
+
+ public function remove(ProjectorCriteria $criteria = new ProjectorCriteria()): void
+ {
+ $projectorStates = $this->projectorStates();
+
+ foreach ($projectorStates as $projectorState) {
+ $projector = $this->projectorRepository->findByProjectorId($projectorState->id());
+
+ if ($projector) {
+ $dropMethod = $this->resolver->resolveDropMethod($projector);
+
+ if ($dropMethod) {
+ try {
+ $dropMethod();
+ $this->logger?->info(sprintf('%s dropped', $projectorState->id()->toString()));
+ } catch (Throwable $e) {
+ $this->logger?->warning(sprintf('%s drop error, skipped', $projectorState->id()->toString()));
+ $this->logger?->error($e->getMessage());
+ }
+ }
+ }
+
+ $this->projectorStore->removeProjectorState($projectorState->id());
+ }
+ }
+
+ private function handleMessage(Message $message, ProjectorState $projectorState): void
+ {
+ $projector = $this->projectorRepository->findByProjectorId($projectorState->id());
+
+ if (!$projector) {
+ throw new ProjectorNotFound($projectorState->id());
+ }
+
+ $handleMethod = $this->resolver->resolveHandleMethod($projector, $message);
+
+ if ($handleMethod) {
+ try {
+ $handleMethod($message);
+ } catch (Throwable $e) {
+ $this->logger?->error(sprintf('%s message error', $projectorState->id()->toString()));
+ $this->logger?->error($e->getMessage());
+ $projectorState->error();
+ $this->projectorStore->saveProjectorState($projectorState);
+
+ return;
+ }
+ }
+
+ $projectorState->incrementPosition();
+ $this->projectorStore->saveProjectorState($projectorState);
+ }
+
+ public function projectorStates(): ProjectorStateCollection
+ {
+ $projectorsStates = $this->projectorStore->getStateFromAllProjectors();
+
+ foreach ($this->projectorRepository->projectors() as $projector) {
+ if ($projectorsStates->has($projector->projectorId())) {
+ continue;
+ }
+
+ $projectorsStates = $projectorsStates->add(new ProjectorState($projector->projectorId()));
+ }
+
+ return $projectorsStates;
+ }
+}
diff --git a/src/Projection/DefaultProjectorRepository.php b/src/Projection/DefaultProjectorRepository.php
new file mode 100644
index 00000000..7858e516
--- /dev/null
+++ b/src/Projection/DefaultProjectorRepository.php
@@ -0,0 +1,40 @@
+|null */
+ private ?array $projectorIdHashmap = null;
+
+ /**
+ * @param iterable $projectors
+ */
+ public function __construct(
+ private readonly iterable $projectors = [],
+ ) {
+ }
+
+ public function findByProjectorId(ProjectorId $projectorId): ?Projector
+ {
+ if ($this->projectorIdHashmap === null) {
+ $this->projectorIdHashmap = [];
+
+ foreach ($this->projectors as $projector) {
+ $this->projectorIdHashmap[$projector->projectorId()->toString()] = $projector;
+ }
+ }
+
+ return $this->projectorIdHashmap[$projectorId->toString()] ?? null;
+ }
+
+ /**
+ * @return list
+ */
+ public function projectors(): array
+ {
+ return [...$this->projectors];
+ }
+}
diff --git a/src/Projection/MetadataAwareProjectionHandler.php b/src/Projection/MetadataAwareProjectionHandler.php
index 8d01fb02..132c7e34 100644
--- a/src/Projection/MetadataAwareProjectionHandler.php
+++ b/src/Projection/MetadataAwareProjectionHandler.php
@@ -8,14 +8,17 @@
use Patchlevel\EventSourcing\Metadata\Projection\AttributeProjectionMetadataFactory;
use Patchlevel\EventSourcing\Metadata\Projection\ProjectionMetadataFactory;
-use function array_key_exists;
-
+/**
+ * @deprecated use MetadataProjectorResolver
+ */
final class MetadataAwareProjectionHandler implements ProjectionHandler
{
/** @var iterable */
private iterable $projections;
- private ProjectionMetadataFactory $metadataFactor;
+ private ProjectionMetadataFactory $metadataFactory;
+
+ private ProjectorResolver $resolver;
/**
* @param iterable $projections
@@ -23,51 +26,46 @@ final class MetadataAwareProjectionHandler implements ProjectionHandler
public function __construct(iterable $projections, ?ProjectionMetadataFactory $metadataFactory = null)
{
$this->projections = $projections;
- $this->metadataFactor = $metadataFactory ?? new AttributeProjectionMetadataFactory();
+ $this->metadataFactory = $metadataFactory ?? new AttributeProjectionMetadataFactory();
+ $this->resolver = new MetadataProjectorResolver($this->metadataFactory);
}
public function handle(Message $message): void
{
- $event = $message->event();
-
foreach ($this->projections as $projection) {
- $metadata = $this->metadataFactor->metadata($projection::class);
+ $handleMethod = $this->resolver->resolveHandleMethod($projection, $message);
- if (!array_key_exists($event::class, $metadata->handleMethods)) {
+ if (!$handleMethod) {
continue;
}
- $handleMethod = $metadata->handleMethods[$event::class];
-
- $projection->$handleMethod($message);
+ $handleMethod($message);
}
}
public function create(): void
{
foreach ($this->projections as $projection) {
- $metadata = $this->metadataFactor->metadata($projection::class);
- $method = $metadata->createMethod;
+ $createMethod = $this->resolver->resolveCreateMethod($projection);
- if (!$method) {
+ if (!$createMethod) {
continue;
}
- $projection->$method();
+ $createMethod();
}
}
public function drop(): void
{
foreach ($this->projections as $projection) {
- $metadata = $this->metadataFactor->metadata($projection::class);
- $method = $metadata->dropMethod;
+ $dropMethod = $this->resolver->resolveDropMethod($projection);
- if (!$method) {
+ if (!$dropMethod) {
continue;
}
- $projection->$method();
+ $dropMethod();
}
}
@@ -81,6 +79,6 @@ public function projections(): iterable
public function metadataFactory(): ProjectionMetadataFactory
{
- return $this->metadataFactor;
+ return $this->metadataFactory;
}
}
diff --git a/src/Projection/MetadataProjectorResolver.php b/src/Projection/MetadataProjectorResolver.php
new file mode 100644
index 00000000..ba971e16
--- /dev/null
+++ b/src/Projection/MetadataProjectorResolver.php
@@ -0,0 +1,58 @@
+metadataFactory->metadata($projector::class);
+ $method = $metadata->createMethod;
+
+ if (!$method) {
+ return null;
+ }
+
+ return $projector->$method(...);
+ }
+
+ public function resolveDropMethod(Projection $projector): ?Closure
+ {
+ $metadata = $this->metadataFactory->metadata($projector::class);
+ $method = $metadata->dropMethod;
+
+ if (!$method) {
+ return null;
+ }
+
+ return $projector->$method(...);
+ }
+
+ public function resolveHandleMethod(Projection $projector, Message $message): ?Closure
+ {
+ $event = $message->event();
+ $metadata = $this->metadataFactory->metadata($projector::class);
+
+ if (!array_key_exists($event::class, $metadata->handleMethods)) {
+ return null;
+ }
+
+ $handleMethod = $metadata->handleMethods[$event::class];
+
+ return $projector->$handleMethod(...);
+ }
+}
diff --git a/src/Projection/Projection.php b/src/Projection/Projection.php
index 6f8a9890..bdbfbde5 100644
--- a/src/Projection/Projection.php
+++ b/src/Projection/Projection.php
@@ -4,6 +4,9 @@
namespace Patchlevel\EventSourcing\Projection;
+/**
+ * @deprecated use Projector interface
+ */
interface Projection
{
}
diff --git a/src/Projection/ProjectionHandler.php b/src/Projection/ProjectionHandler.php
index 5f58f94d..18a9100d 100644
--- a/src/Projection/ProjectionHandler.php
+++ b/src/Projection/ProjectionHandler.php
@@ -6,6 +6,9 @@
use Patchlevel\EventSourcing\EventBus\Message;
+/**
+ * @deprecated use ProjectorResolver
+ */
interface ProjectionHandler
{
public function handle(Message $message): void;
diff --git a/src/Projection/ProjectionListener.php b/src/Projection/ProjectionListener.php
index c3c49be0..c928b12f 100644
--- a/src/Projection/ProjectionListener.php
+++ b/src/Projection/ProjectionListener.php
@@ -7,6 +7,9 @@
use Patchlevel\EventSourcing\EventBus\Listener;
use Patchlevel\EventSourcing\EventBus\Message;
+/**
+ * @deprecated use SyncProjectorListener
+ */
final class ProjectionListener implements Listener
{
private ProjectionHandler $projectionHandler;
diff --git a/src/Projection/Projectionist.php b/src/Projection/Projectionist.php
new file mode 100644
index 00000000..4aa72242
--- /dev/null
+++ b/src/Projection/Projectionist.php
@@ -0,0 +1,23 @@
+|null $ids
+ */
+ public function __construct(
+ public readonly ?array $ids = null,
+ ) {
+ }
+}
diff --git a/src/Projection/ProjectorId.php b/src/Projection/ProjectorId.php
new file mode 100644
index 00000000..b687a365
--- /dev/null
+++ b/src/Projection/ProjectorId.php
@@ -0,0 +1,39 @@
+name, $this->version);
+ }
+
+ public function name(): string
+ {
+ return $this->name;
+ }
+
+ public function version(): int
+ {
+ return $this->version;
+ }
+
+ public function equals(self $other): bool
+ {
+ return $this->name === $other->name && $this->version === $other->version;
+ }
+}
diff --git a/src/Projection/ProjectorNotFound.php b/src/Projection/ProjectorNotFound.php
new file mode 100644
index 00000000..9f53f3a3
--- /dev/null
+++ b/src/Projection/ProjectorNotFound.php
@@ -0,0 +1,17 @@
+toString()));
+ }
+}
diff --git a/src/Projection/ProjectorRepository.php b/src/Projection/ProjectorRepository.php
new file mode 100644
index 00000000..b1329ba4
--- /dev/null
+++ b/src/Projection/ProjectorRepository.php
@@ -0,0 +1,15 @@
+
+ */
+ public function projectors(): array;
+}
diff --git a/src/Projection/ProjectorResolver.php b/src/Projection/ProjectorResolver.php
new file mode 100644
index 00000000..1811f234
--- /dev/null
+++ b/src/Projection/ProjectorResolver.php
@@ -0,0 +1,17 @@
+connection->createQueryBuilder()
+ ->select('*')
+ ->from($this->projectorTable)
+ ->where('projector = :projector AND version = :version')
+ ->getSQL();
+
+ /** @var array{projector: string, version: int, position: int, status: string}|false $result */
+ $result = $this->connection->fetchAssociative($sql, [
+ 'projector' => $projectorId->name(),
+ 'version' => $projectorId->version(),
+ ]);
+
+ if ($result === false) {
+ throw new ProjectorStateNotFound($projectorId);
+ }
+
+ return new ProjectorState(
+ new ProjectorId($result['projector'], $result['version']),
+ ProjectorStatus::from($result['status']),
+ $result['position']
+ );
+ }
+
+ public function getStateFromAllProjectors(): ProjectorStateCollection
+ {
+ $sql = $this->connection->createQueryBuilder()
+ ->select('*')
+ ->from($this->projectorTable)
+ ->getSQL();
+
+ /** @var list $result */
+ $result = $this->connection->fetchAllAssociative($sql);
+
+ return new ProjectorStateCollection(
+ array_map(
+ static function (array $data) {
+ return new ProjectorState(
+ new ProjectorId($data['projector'], $data['version']),
+ ProjectorStatus::from($data['status']),
+ $data['position']
+ );
+ },
+ $result
+ )
+ );
+ }
+
+ public function saveProjectorState(ProjectorState ...$projectorStates): void
+ {
+ $this->connection->transactional(
+ function (Connection $connection) use ($projectorStates): void {
+ foreach ($projectorStates as $projectorState) {
+ try {
+ $this->getProjectorState($projectorState->id());
+ $connection->update(
+ $this->projectorTable,
+ [
+ 'position' => $projectorState->position(),
+ 'status' => $projectorState->status()->value,
+ ],
+ [
+ 'projector' => $projectorState->id()->name(),
+ 'version' => $projectorState->id()->version(),
+ ]
+ );
+ } catch (ProjectorStateNotFound) {
+ $connection->insert(
+ $this->projectorTable,
+ [
+ 'projector' => $projectorState->id()->name(),
+ 'version' => $projectorState->id()->version(),
+ 'position' => $projectorState->position(),
+ 'status' => $projectorState->status()->value,
+ ]
+ );
+ }
+ }
+ }
+ );
+ }
+
+ public function removeProjectorState(ProjectorId $projectorId): void
+ {
+ $this->connection->delete($this->projectorTable, [
+ 'projector' => $projectorId->name(),
+ 'version' => $projectorId->version(),
+ ]);
+ }
+
+ public function configureSchema(Schema $schema, Connection $connection): void
+ {
+ $table = $schema->createTable($this->projectorTable);
+
+ $table->addColumn('projector', Types::STRING)
+ ->setNotnull(true);
+ $table->addColumn('version', Types::INTEGER)
+ ->setNotnull(true);
+ $table->addColumn('position', Types::INTEGER)
+ ->setNotnull(true);
+ $table->addColumn('status', Types::STRING)
+ ->setNotnull(true);
+
+ $table->setPrimaryKey(['projector', 'version']);
+ }
+}
diff --git a/src/Projection/ProjectorStore/DuplicateProjectorId.php b/src/Projection/ProjectorStore/DuplicateProjectorId.php
new file mode 100644
index 00000000..5f29f05f
--- /dev/null
+++ b/src/Projection/ProjectorStore/DuplicateProjectorId.php
@@ -0,0 +1,18 @@
+toString()));
+ }
+}
diff --git a/src/Projection/ProjectorStore/InMemoryStore.php b/src/Projection/ProjectorStore/InMemoryStore.php
new file mode 100644
index 00000000..3095a7d0
--- /dev/null
+++ b/src/Projection/ProjectorStore/InMemoryStore.php
@@ -0,0 +1,42 @@
+ */
+ private array $store = [];
+
+ public function getProjectorState(ProjectorId $projectorId): ProjectorState
+ {
+ if (array_key_exists($projectorId->toString(), $this->store)) {
+ return $this->store[$projectorId->toString()];
+ }
+
+ throw new ProjectorStateNotFound($projectorId);
+ }
+
+ public function getStateFromAllProjectors(): ProjectorStateCollection
+ {
+ return new ProjectorStateCollection(array_values($this->store));
+ }
+
+ public function saveProjectorState(ProjectorState ...$projectorStates): void
+ {
+ foreach ($projectorStates as $state) {
+ $this->store[$state->id()->toString()] = $state;
+ }
+ }
+
+ public function removeProjectorState(ProjectorId $projectorId): void
+ {
+ unset($this->store[$projectorId->toString()]);
+ }
+}
diff --git a/src/Projection/ProjectorStore/ProjectorState.php b/src/Projection/ProjectorStore/ProjectorState.php
new file mode 100644
index 00000000..d33a6f26
--- /dev/null
+++ b/src/Projection/ProjectorStore/ProjectorState.php
@@ -0,0 +1,83 @@
+id;
+ }
+
+ public function status(): ProjectorStatus
+ {
+ return $this->status;
+ }
+
+ public function position(): int
+ {
+ return $this->position;
+ }
+
+ public function incrementPosition(): void
+ {
+ $this->position++;
+ }
+
+ public function isNew(): bool
+ {
+ return $this->status === ProjectorStatus::New;
+ }
+
+ public function booting(): void
+ {
+ $this->status = ProjectorStatus::Booting;
+ }
+
+ public function isBooting(): bool
+ {
+ return $this->status === ProjectorStatus::Booting;
+ }
+
+ public function active(): void
+ {
+ $this->status = ProjectorStatus::Active;
+ }
+
+ public function isActive(): bool
+ {
+ return $this->status === ProjectorStatus::Active;
+ }
+
+ public function outdated(): void
+ {
+ $this->status = ProjectorStatus::Outdated;
+ }
+
+ public function isOutdated(): bool
+ {
+ return $this->status === ProjectorStatus::Outdated;
+ }
+
+ public function error(): void
+ {
+ $this->status = ProjectorStatus::Error;
+ }
+
+ public function isError(): bool
+ {
+ return $this->status === ProjectorStatus::Error;
+ }
+}
diff --git a/src/Projection/ProjectorStore/ProjectorStateCollection.php b/src/Projection/ProjectorStore/ProjectorStateCollection.php
new file mode 100644
index 00000000..b66d243f
--- /dev/null
+++ b/src/Projection/ProjectorStore/ProjectorStateCollection.php
@@ -0,0 +1,133 @@
+
+ * @psalm-immutable
+ */
+final class ProjectorStateCollection implements Countable, IteratorAggregate
+{
+ /** @var array */
+ private readonly array $projectorStates;
+
+ /**
+ * @param list $projectorStates
+ */
+ public function __construct(array $projectorStates = [])
+ {
+ $result = [];
+
+ foreach ($projectorStates as $projectorState) {
+ if (array_key_exists($projectorState->id()->toString(), $result)) {
+ throw new DuplicateProjectorId($projectorState->id());
+ }
+
+ $result[$projectorState->id()->toString()] = $projectorState;
+ }
+
+ $this->projectorStates = $result;
+ }
+
+ public function get(ProjectorId $projectorId): ProjectorState
+ {
+ if (!$this->has($projectorId)) {
+ throw new ProjectorStateNotFound($projectorId);
+ }
+
+ return $this->projectorStates[$projectorId->toString()];
+ }
+
+ public function has(ProjectorId $projectorId): bool
+ {
+ return array_key_exists($projectorId->toString(), $this->projectorStates);
+ }
+
+ public function add(ProjectorState $state): self
+ {
+ if ($this->has($state->id())) {
+ throw new DuplicateProjectorId($state->id());
+ }
+
+ return new self(
+ [
+ ...array_values($this->projectorStates),
+ $state,
+ ]
+ );
+ }
+
+ public function minProjectorPosition(): int
+ {
+ $min = null;
+
+ foreach ($this->projectorStates as $projectorState) {
+ if ($min !== null && $projectorState->position() >= $min) {
+ continue;
+ }
+
+ $min = $projectorState->position();
+ }
+
+ return $min ?: 0;
+ }
+
+ public function filterByProjectorStatus(ProjectorStatus $status): self
+ {
+ $projectors = array_filter(
+ $this->projectorStates,
+ static fn (ProjectorState $projectorState) => $projectorState->status() === $status
+ );
+
+ return new self(array_values($projectors));
+ }
+
+ public function filterByCriteria(ProjectorCriteria $criteria): self
+ {
+ $projectors = array_filter(
+ $this->projectorStates,
+ static function (ProjectorState $projectorState) use ($criteria): bool {
+ if ($criteria->ids !== null) {
+ foreach ($criteria->ids as $id) {
+ if ($projectorState->id()->equals($id)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ return true;
+ }
+ );
+
+ return new self(array_values($projectors));
+ }
+
+ public function count(): int
+ {
+ return count($this->projectorStates);
+ }
+
+ /**
+ * @return ArrayIterator
+ */
+ public function getIterator(): ArrayIterator
+ {
+ return new ArrayIterator(array_values($this->projectorStates));
+ }
+}
diff --git a/src/Projection/ProjectorStore/ProjectorStateNotFound.php b/src/Projection/ProjectorStore/ProjectorStateNotFound.php
new file mode 100644
index 00000000..4c3c31ae
--- /dev/null
+++ b/src/Projection/ProjectorStore/ProjectorStateNotFound.php
@@ -0,0 +1,18 @@
+toString()));
+ }
+}
diff --git a/src/Projection/ProjectorStore/ProjectorStore.php b/src/Projection/ProjectorStore/ProjectorStore.php
new file mode 100644
index 00000000..953ef099
--- /dev/null
+++ b/src/Projection/ProjectorStore/ProjectorStore.php
@@ -0,0 +1,18 @@
+projectorRepository->projectors() as $projector) {
+ $handleMethod = $this->projectorResolver->resolveHandleMethod($projector, $message);
+
+ if (!$handleMethod) {
+ continue;
+ }
+
+ $handleMethod($message);
+ }
+ }
+}
diff --git a/src/Store/DoctrineStore.php b/src/Store/DoctrineStore.php
index 443c25b0..e6726e68 100644
--- a/src/Store/DoctrineStore.php
+++ b/src/Store/DoctrineStore.php
@@ -212,7 +212,7 @@ protected static function normalizeCustomHeaders(string $customHeaders, Abstract
protected function addOutboxSchema(Schema $schema): void
{
- $table = $schema->createTable('outbox');
+ $table = $schema->createTable(self::OUTBOX_TABLE);
$table->addColumn('aggregate', Types::STRING)
->setNotnull(true);
diff --git a/src/Store/MultiTableStore.php b/src/Store/MultiTableStore.php
index 71a9fca0..f44a9641 100644
--- a/src/Store/MultiTableStore.php
+++ b/src/Store/MultiTableStore.php
@@ -21,7 +21,7 @@
use function is_string;
use function sprintf;
-final class MultiTableStore extends DoctrineStore implements PipelineStore, SchemaConfigurator
+final class MultiTableStore extends DoctrineStore implements StreamableStore, SchemaConfigurator
{
private string $metadataTableName;
diff --git a/src/Store/PipelineStore.php b/src/Store/PipelineStore.php
index 345bbe1a..c2f9c95e 100644
--- a/src/Store/PipelineStore.php
+++ b/src/Store/PipelineStore.php
@@ -7,6 +7,7 @@
use Generator;
use Patchlevel\EventSourcing\EventBus\Message;
+/** @deprecated use StreamableStore */
interface PipelineStore extends Store
{
/**
diff --git a/src/Store/SingleTableStore.php b/src/Store/SingleTableStore.php
index af30da89..ffb2d0a3 100644
--- a/src/Store/SingleTableStore.php
+++ b/src/Store/SingleTableStore.php
@@ -20,7 +20,7 @@
use function is_string;
use function sprintf;
-final class SingleTableStore extends DoctrineStore implements PipelineStore, SchemaConfigurator
+final class SingleTableStore extends DoctrineStore implements StreamableStore, SchemaConfigurator
{
private string $storeTableName;
diff --git a/src/Store/StreamableStore.php b/src/Store/StreamableStore.php
new file mode 100644
index 00000000..fa096cb5
--- /dev/null
+++ b/src/Store/StreamableStore.php
@@ -0,0 +1,18 @@
+
+ */
+ public function stream(int $fromIndex = 0): Generator;
+
+ public function count(int $fromIndex = 0): int;
+}
diff --git a/tests/Benchmark/BasicImplementation/Projection/ProfileProjection.php b/tests/Benchmark/BasicImplementation/Projection/ProfileProjector.php
similarity index 74%
rename from tests/Benchmark/BasicImplementation/Projection/ProfileProjection.php
rename to tests/Benchmark/BasicImplementation/Projection/ProfileProjector.php
index 615bcd50..34ab6b75 100644
--- a/tests/Benchmark/BasicImplementation/Projection/ProfileProjection.php
+++ b/tests/Benchmark/BasicImplementation/Projection/ProfileProjector.php
@@ -9,10 +9,13 @@
use Patchlevel\EventSourcing\Attribute\Drop;
use Patchlevel\EventSourcing\Attribute\Handle;
use Patchlevel\EventSourcing\EventBus\Message;
-use Patchlevel\EventSourcing\Projection\Projection;
-use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Events\ProfileCreated;
+use Patchlevel\EventSourcing\Projection\Projector;
+use Patchlevel\EventSourcing\Projection\ProjectorId;
+use Patchlevel\EventSourcing\Tests\Benchmark\BasicImplementation\Events\ProfileCreated;
-final class ProfileProjection implements Projection
+use function assert;
+
+final class ProfileProjector implements Projector
{
private Connection $connection;
@@ -21,6 +24,11 @@ public function __construct(Connection $connection)
$this->connection = $connection;
}
+ public function projectorId(): ProjectorId
+ {
+ return new ProjectorId('profile', 1);
+ }
+
#[Create]
public function create(): void
{
@@ -38,6 +46,8 @@ public function handleProfileCreated(Message $message): void
{
$profileCreated = $message->event();
+ assert($profileCreated instanceof ProfileCreated);
+
$this->connection->executeStatement(
'INSERT INTO projection_profile (`id`, `name`) VALUES(:id, :name);',
[
diff --git a/tests/Benchmark/WriteEventsBench.php b/tests/Benchmark/WriteEventsBench.php
index c1aa1388..536e648b 100644
--- a/tests/Benchmark/WriteEventsBench.php
+++ b/tests/Benchmark/WriteEventsBench.php
@@ -20,7 +20,7 @@
use Patchlevel\EventSourcing\Tests\Benchmark\BasicImplementation\Aggregate\Profile;
use Patchlevel\EventSourcing\Tests\Benchmark\BasicImplementation\Processor\SendEmailProcessor;
use Patchlevel\EventSourcing\Tests\Benchmark\BasicImplementation\ProfileId;
-use Patchlevel\EventSourcing\Tests\Benchmark\BasicImplementation\Projection\ProfileProjection;
+use Patchlevel\EventSourcing\Tests\Benchmark\BasicImplementation\Projection\ProfileProjector;
use PhpBench\Attributes as Bench;
use function file_exists;
@@ -47,7 +47,7 @@ public function setUp(): void
'path' => self::DB_PATH,
]);
- $profileProjection = new ProfileProjection($connection);
+ $profileProjection = new ProfileProjector($connection);
$projectionRepository = new MetadataAwareProjectionHandler(
[$profileProjection]
);
diff --git a/tests/Integration/BasicImplementation/BasicIntegrationTest.php b/tests/Integration/BasicImplementation/BasicIntegrationTest.php
index 7f656e9f..751224ff 100644
--- a/tests/Integration/BasicImplementation/BasicIntegrationTest.php
+++ b/tests/Integration/BasicImplementation/BasicIntegrationTest.php
@@ -12,8 +12,8 @@
use Patchlevel\EventSourcing\EventBus\SymfonyEventBus;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AttributeAggregateRootRegistryFactory;
-use Patchlevel\EventSourcing\Projection\MetadataAwareProjectionHandler;
-use Patchlevel\EventSourcing\Projection\ProjectionListener;
+use Patchlevel\EventSourcing\Projection\DefaultProjectorRepository;
+use Patchlevel\EventSourcing\Projection\SyncProjectorListener;
use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager;
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer;
@@ -49,12 +49,12 @@ public function tearDown(): void
public function testSuccessful(): void
{
$profileProjection = new ProfileProjection($this->connection);
- $projectionRepository = new MetadataAwareProjectionHandler(
+ $projectorRepository = new DefaultProjectorRepository(
[$profileProjection]
);
$eventStream = new DefaultEventBus();
- $eventStream->addListener(new ProjectionListener($projectionRepository));
+ $eventStream->addListener(new SyncProjectorListener($projectorRepository));
$eventStream->addListener(new SendEmailProcessor());
$store = new SingleTableStore(
@@ -111,12 +111,12 @@ public function testSuccessful(): void
public function testWithSymfonySuccessful(): void
{
$profileProjection = new ProfileProjection($this->connection);
- $projectionRepository = new MetadataAwareProjectionHandler(
+ $projectorRepository = new DefaultProjectorRepository(
[$profileProjection]
);
$eventStream = SymfonyEventBus::create([
- new ProjectionListener($projectionRepository),
+ new SyncProjectorListener($projectorRepository),
new SendEmailProcessor(),
]);
@@ -175,12 +175,12 @@ public function testWithSymfonySuccessful(): void
public function testMultiTableSuccessful(): void
{
$profileProjection = new ProfileProjection($this->connection);
- $projectionRepository = new MetadataAwareProjectionHandler(
+ $projectorRepository = new DefaultProjectorRepository(
[$profileProjection]
);
$eventStream = new DefaultEventBus();
- $eventStream->addListener(new ProjectionListener($projectionRepository));
+ $eventStream->addListener(new SyncProjectorListener($projectorRepository));
$eventStream->addListener(new SendEmailProcessor());
$store = new MultiTableStore(
@@ -236,12 +236,12 @@ public function testMultiTableSuccessful(): void
public function testSnapshot(): void
{
$profileProjection = new ProfileProjection($this->connection);
- $projectionRepository = new MetadataAwareProjectionHandler(
+ $projectorRepository = new DefaultProjectorRepository(
[$profileProjection]
);
$eventStream = new DefaultEventBus();
- $eventStream->addListener(new ProjectionListener($projectionRepository));
+ $eventStream->addListener(new SyncProjectorListener($projectorRepository));
$eventStream->addListener(new SendEmailProcessor());
$store = new SingleTableStore(
diff --git a/tests/Integration/BasicImplementation/Projection/ProfileProjection.php b/tests/Integration/BasicImplementation/Projection/ProfileProjection.php
index b7025317..73c39b80 100644
--- a/tests/Integration/BasicImplementation/Projection/ProfileProjection.php
+++ b/tests/Integration/BasicImplementation/Projection/ProfileProjection.php
@@ -10,10 +10,13 @@
use Patchlevel\EventSourcing\Attribute\Drop;
use Patchlevel\EventSourcing\Attribute\Handle;
use Patchlevel\EventSourcing\EventBus\Message;
-use Patchlevel\EventSourcing\Projection\Projection;
+use Patchlevel\EventSourcing\Projection\Projector;
+use Patchlevel\EventSourcing\Projection\ProjectorId;
use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Events\ProfileCreated;
-final class ProfileProjection implements Projection
+use function assert;
+
+final class ProfileProjection implements Projector
{
private Connection $connection;
@@ -22,6 +25,11 @@ public function __construct(Connection $connection)
$this->connection = $connection;
}
+ public function projectorId(): ProjectorId
+ {
+ return new ProjectorId('profile', 1);
+ }
+
#[Create]
public function create(): void
{
@@ -44,6 +52,8 @@ public function handleProfileCreated(Message $message): void
{
$profileCreated = $message->event();
+ assert($profileCreated instanceof ProfileCreated);
+
$this->connection->executeStatement(
'INSERT INTO projection_profile (id, name) VALUES(:id, :name);',
[
diff --git a/tests/Integration/Projectionist/Aggregate/Profile.php b/tests/Integration/Projectionist/Aggregate/Profile.php
new file mode 100644
index 00000000..07a0dd93
--- /dev/null
+++ b/tests/Integration/Projectionist/Aggregate/Profile.php
@@ -0,0 +1,46 @@
+id->toString();
+ }
+
+ public static function create(ProfileId $id, string $name): self
+ {
+ $self = new self();
+ $self->recordThat(new ProfileCreated($id, $name));
+
+ return $self;
+ }
+
+ #[Apply(ProfileCreated::class)]
+ protected function applyProfileCreated(ProfileCreated $event): void
+ {
+ $this->id = $event->profileId;
+ $this->name = $event->name;
+ }
+
+ public function name(): string
+ {
+ return $this->name;
+ }
+}
diff --git a/tests/Integration/Projectionist/Events/ProfileCreated.php b/tests/Integration/Projectionist/Events/ProfileCreated.php
new file mode 100644
index 00000000..beb9df8a
--- /dev/null
+++ b/tests/Integration/Projectionist/Events/ProfileCreated.php
@@ -0,0 +1,21 @@
+toString();
+ }
+
+ public function denormalize(mixed $value): ?ProfileId
+ {
+ if ($value === null) {
+ return null;
+ }
+
+ if (!is_string($value)) {
+ throw new InvalidArgumentException();
+ }
+
+ return ProfileId::fromString($value);
+ }
+}
diff --git a/tests/Integration/Projectionist/ProfileId.php b/tests/Integration/Projectionist/ProfileId.php
new file mode 100644
index 00000000..65c33fd2
--- /dev/null
+++ b/tests/Integration/Projectionist/ProfileId.php
@@ -0,0 +1,25 @@
+id = $id;
+ }
+
+ public static function fromString(string $id): self
+ {
+ return new self($id);
+ }
+
+ public function toString(): string
+ {
+ return $this->id;
+ }
+}
diff --git a/tests/Integration/Projectionist/Projection/ProfileProjection.php b/tests/Integration/Projectionist/Projection/ProfileProjection.php
new file mode 100644
index 00000000..1558bb66
--- /dev/null
+++ b/tests/Integration/Projectionist/Projection/ProfileProjection.php
@@ -0,0 +1,75 @@
+connection = $connection;
+ }
+
+ #[Create]
+ public function create(): void
+ {
+ $table = new Table($this->tableName());
+ $table->addColumn('id', 'string');
+ $table->addColumn('name', 'string');
+ $table->setPrimaryKey(['id']);
+
+ $this->connection->createSchemaManager()->createTable($table);
+ }
+
+ #[Drop]
+ public function drop(): void
+ {
+ $this->connection->createSchemaManager()->dropTable($this->tableName());
+ }
+
+ #[Handle(ProfileCreated::class)]
+ public function handleProfileCreated(Message $message): void
+ {
+ $profileCreated = $message->event();
+
+ assert($profileCreated instanceof ProfileCreated);
+
+ $this->connection->executeStatement(
+ 'INSERT INTO ' . $this->tableName() . ' (id, name) VALUES(:id, :name);',
+ [
+ 'id' => $profileCreated->profileId->toString(),
+ 'name' => $profileCreated->name,
+ ]
+ );
+ }
+
+ private function tableName(): string
+ {
+ return sprintf(
+ 'projection_%s_%s',
+ $this->projectorId()->name(),
+ $this->projectorId()->version()
+ );
+ }
+
+ public function projectorId(): ProjectorId
+ {
+ return new ProjectorId('profile', 1);
+ }
+}
diff --git a/tests/Integration/Projectionist/ProjectionistTest.php b/tests/Integration/Projectionist/ProjectionistTest.php
new file mode 100644
index 00000000..9fc8333e
--- /dev/null
+++ b/tests/Integration/Projectionist/ProjectionistTest.php
@@ -0,0 +1,93 @@
+connection = DbalManager::createConnection();
+ }
+
+ public function tearDown(): void
+ {
+ $this->connection->close();
+ }
+
+ public function testSuccessful(): void
+ {
+ $store = new SingleTableStore(
+ $this->connection,
+ DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']),
+ (new AttributeAggregateRootRegistryFactory())->create([__DIR__ . '/Aggregate']),
+ 'eventstore'
+ );
+
+ $projectorStore = new DoctrineStore($this->connection);
+
+ $manager = new DefaultRepositoryManager(
+ new AggregateRootRegistry(['profile' => Profile::class]),
+ $store,
+ new DefaultEventBus(),
+ );
+
+ $repository = $manager->get(Profile::class);
+
+ $schemaDirector = new DoctrineSchemaDirector(
+ $this->connection,
+ new ChainSchemaConfigurator([
+ $store,
+ $projectorStore,
+ ])
+ );
+
+ $schemaDirector->create();
+
+ $profile = Profile::create(ProfileId::fromString('1'), 'John');
+ $repository->save($profile);
+
+ $projectionist = new DefaultProjectionist(
+ $store,
+ $projectorStore,
+ new DefaultProjectorRepository(
+ [new ProfileProjection($this->connection)]
+ ),
+ );
+
+ $projectionist->boot();
+ $projectionist->run();
+
+ $result = $this->connection->fetchAssociative('SELECT * FROM projection_profile_1 WHERE id = ?', ['1']);
+
+ self::assertIsArray($result);
+ self::assertArrayHasKey('id', $result);
+ self::assertSame('1', $result['id']);
+ self::assertSame('John', $result['name']);
+
+ $projectionist->remove();
+ }
+}
diff --git a/tests/Unit/Projection/DefaultProjectionistTest.php b/tests/Unit/Projection/DefaultProjectionistTest.php
new file mode 100644
index 00000000..aa696c89
--- /dev/null
+++ b/tests/Unit/Projection/DefaultProjectionistTest.php
@@ -0,0 +1,574 @@
+prophesize(StreamableStore::class);
+ $streamableStore->stream()->willReturn($generatorFactory())->shouldBeCalledOnce();
+
+ $projectorStore = $this->prophesize(ProjectorStore::class);
+ $projectorStore->getStateFromAllProjectors()->willReturn($projectorStateCollection)->shouldBeCalledOnce();
+
+ $projectorRepository = $this->prophesize(ProjectorRepository::class);
+ $projectorRepository->projectors()->willReturn([])->shouldBeCalledOnce();
+
+ $projectorResolver = $this->prophesize(ProjectorResolver::class);
+
+ $projectionist = new DefaultProjectionist(
+ $streamableStore->reveal(),
+ $projectorStore->reveal(),
+ $projectorRepository->reveal(),
+ $projectorResolver->reveal(),
+ );
+
+ $projectionist->boot();
+ }
+
+ public function testBootWithoutCreateMethod(): void
+ {
+ $projector = new class implements Projector {
+ public function projectorId(): ProjectorId
+ {
+ return new ProjectorId('test', 1);
+ }
+ };
+
+ $projectorStore = new DummyStore([
+ new ProjectorState($projector->projectorId()),
+ ]);
+
+ $message = new Message(new ProfileVisited(ProfileId::fromString('test')));
+
+ $generatorFactory = static function () use ($message): Generator {
+ yield $message;
+ };
+
+ $streamableStore = $this->prophesize(StreamableStore::class);
+ $streamableStore->stream()->willReturn($generatorFactory())->shouldBeCalledOnce();
+
+ $projectorRepository = $this->prophesize(ProjectorRepository::class);
+ $projectorRepository->projectors()->willReturn([$projector])->shouldBeCalledOnce();
+ $projectorRepository->findByProjectorId($projector->projectorId())->willReturn($projector)->shouldBeCalledTimes(
+ 2
+ );
+
+ $projectorResolver = $this->prophesize(ProjectorResolver::class);
+
+ $projectionist = new DefaultProjectionist(
+ $streamableStore->reveal(),
+ $projectorStore,
+ $projectorRepository->reveal(),
+ $projectorResolver->reveal(),
+ );
+
+ $projectionist->boot();
+
+ self::assertEquals([
+ new ProjectorState($projector->projectorId(), ProjectorStatus::Booting),
+ new ProjectorState($projector->projectorId(), ProjectorStatus::Booting, 1),
+ new ProjectorState($projector->projectorId(), ProjectorStatus::Active, 1),
+ ], $projectorStore->savedStates);
+ }
+
+ public function testBootWithMethods(): void
+ {
+ $projector = new class implements Projector {
+ public ?Message $message = null;
+ public bool $created = false;
+
+ public function projectorId(): ProjectorId
+ {
+ return new ProjectorId('test', 1);
+ }
+
+ public function create(): void
+ {
+ $this->created = true;
+ }
+
+ public function handle(Message $message): void
+ {
+ $this->message = $message;
+ }
+ };
+
+ $projectorStore = new DummyStore();
+
+ $message = new Message(new ProfileVisited(ProfileId::fromString('test')));
+
+ $generatorFactory = static function () use ($message): Generator {
+ yield $message;
+ };
+
+ $streamableStore = $this->prophesize(StreamableStore::class);
+ $streamableStore->stream()->willReturn($generatorFactory())->shouldBeCalledOnce();
+
+ $projectorRepository = $this->prophesize(ProjectorRepository::class);
+ $projectorRepository->projectors()->willReturn([$projector])->shouldBeCalledOnce();
+ $projectorRepository->findByProjectorId($projector->projectorId())->willReturn($projector)->shouldBeCalledTimes(
+ 2
+ );
+
+ $projectorResolver = $this->prophesize(ProjectorResolver::class);
+ $projectorResolver->resolveCreateMethod($projector)->willReturn($projector->create(...));
+ $projectorResolver->resolveHandleMethod($projector, $message)->willReturn($projector->handle(...));
+
+ $projectionist = new DefaultProjectionist(
+ $streamableStore->reveal(),
+ $projectorStore,
+ $projectorRepository->reveal(),
+ $projectorResolver->reveal(),
+ );
+
+ $projectionist->boot();
+
+ self::assertEquals([
+ new ProjectorState($projector->projectorId(), ProjectorStatus::Booting),
+ new ProjectorState($projector->projectorId(), ProjectorStatus::Booting, 1),
+ new ProjectorState($projector->projectorId(), ProjectorStatus::Active, 1),
+ ], $projectorStore->savedStates);
+
+ self::assertTrue($projector->created);
+ self::assertSame($message, $projector->message);
+ }
+
+ public function testBootWithCreateError(): void
+ {
+ $projector = new class implements Projector {
+ public ?Message $message = null;
+ public bool $created = false;
+
+ public function projectorId(): ProjectorId
+ {
+ return new ProjectorId('test', 1);
+ }
+
+ public function create(): void
+ {
+ throw new RuntimeException('ERROR');
+ }
+ };
+
+ $projectorStore = new DummyStore([
+ new ProjectorState($projector->projectorId()),
+ ]);
+
+ $message = new Message(new ProfileVisited(ProfileId::fromString('test')));
+
+ $generatorFactory = static function () use ($message): Generator {
+ yield $message;
+ };
+
+ $streamableStore = $this->prophesize(StreamableStore::class);
+ $streamableStore->stream()->willReturn($generatorFactory())->shouldBeCalledOnce();
+
+ $projectorRepository = $this->prophesize(ProjectorRepository::class);
+ $projectorRepository->projectors()->willReturn([$projector])->shouldBeCalledOnce();
+ $projectorRepository->findByProjectorId($projector->projectorId())->willReturn($projector)->shouldBeCalledTimes(
+ 1
+ );
+
+ $projectorResolver = $this->prophesize(ProjectorResolver::class);
+ $projectorResolver->resolveCreateMethod($projector)->willReturn($projector->create(...));
+
+ $projectionist = new DefaultProjectionist(
+ $streamableStore->reveal(),
+ $projectorStore,
+ $projectorRepository->reveal(),
+ $projectorResolver->reveal(),
+ );
+
+ $projectionist->boot();
+
+ self::assertEquals([
+ new ProjectorState($projector->projectorId(), ProjectorStatus::Booting),
+ new ProjectorState($projector->projectorId(), ProjectorStatus::Error),
+ ], $projectorStore->savedStates);
+ }
+
+ public function testRunning(): void
+ {
+ $projector = new class implements Projector {
+ public ?Message $message = null;
+
+ public function projectorId(): ProjectorId
+ {
+ return new ProjectorId('test', 1);
+ }
+
+ public function handle(Message $message): void
+ {
+ $this->message = $message;
+ }
+ };
+
+ $projectorStore = new DummyStore([new ProjectorState($projector->projectorId(), ProjectorStatus::Active)]);
+
+ $message = new Message(new ProfileVisited(ProfileId::fromString('test')));
+
+ $generatorFactory = static function () use ($message): Generator {
+ yield $message;
+ };
+
+ $streamableStore = $this->prophesize(StreamableStore::class);
+ $streamableStore->stream(0)->willReturn($generatorFactory())->shouldBeCalledOnce();
+
+ $projectorRepository = $this->prophesize(ProjectorRepository::class);
+ $projectorRepository->projectors()->willReturn([$projector])->shouldBeCalledOnce();
+ $projectorRepository->findByProjectorId($projector->projectorId())->willReturn($projector)->shouldBeCalledTimes(
+ 2
+ );
+
+ $projectorResolver = $this->prophesize(ProjectorResolver::class);
+ $projectorResolver->resolveHandleMethod($projector, $message)->willReturn($projector->handle(...));
+
+ $projectionist = new DefaultProjectionist(
+ $streamableStore->reveal(),
+ $projectorStore,
+ $projectorRepository->reveal(),
+ $projectorResolver->reveal(),
+ );
+
+ $projectionist->run();
+
+ self::assertEquals([
+ new ProjectorState($projector->projectorId(), ProjectorStatus::Active, 1),
+ ], $projectorStore->savedStates);
+
+ self::assertSame($message, $projector->message);
+ }
+
+ public function testRunningWithError(): void
+ {
+ $projector = new class implements Projector {
+ public function projectorId(): ProjectorId
+ {
+ return new ProjectorId('test', 1);
+ }
+
+ public function handle(Message $message): void
+ {
+ throw new RuntimeException('ERROR');
+ }
+ };
+
+ $projectorStore = new DummyStore([new ProjectorState($projector->projectorId(), ProjectorStatus::Active)]);
+
+ $message = new Message(new ProfileVisited(ProfileId::fromString('test')));
+
+ $generatorFactory = static function () use ($message): Generator {
+ yield $message;
+ };
+
+ $streamableStore = $this->prophesize(StreamableStore::class);
+ $streamableStore->stream(0)->willReturn($generatorFactory())->shouldBeCalledOnce();
+
+ $projectorRepository = $this->prophesize(ProjectorRepository::class);
+ $projectorRepository->projectors()->willReturn([$projector])->shouldBeCalledOnce();
+ $projectorRepository->findByProjectorId($projector->projectorId())->willReturn($projector)->shouldBeCalledTimes(
+ 2
+ );
+
+ $projectorResolver = $this->prophesize(ProjectorResolver::class);
+ $projectorResolver->resolveHandleMethod($projector, $message)->willReturn($projector->handle(...));
+
+ $projectionist = new DefaultProjectionist(
+ $streamableStore->reveal(),
+ $projectorStore,
+ $projectorRepository->reveal(),
+ $projectorResolver->reveal(),
+ );
+
+ $projectionist->run();
+
+ self::assertEquals([
+ new ProjectorState($projector->projectorId(), ProjectorStatus::Error, 0),
+ ], $projectorStore->savedStates);
+ }
+
+ public function testRunningMarkOutdated(): void
+ {
+ $projectorId = new ProjectorId('test', 1);
+
+ $projectorStore = new DummyStore([new ProjectorState($projectorId, ProjectorStatus::Active)]);
+
+ $generatorFactory = static function (): Generator {
+ yield from [];
+ };
+
+ $streamableStore = $this->prophesize(StreamableStore::class);
+ $streamableStore->stream(0)->willReturn($generatorFactory())->shouldBeCalledOnce();
+
+ $projectorRepository = $this->prophesize(ProjectorRepository::class);
+ $projectorRepository->projectors()->willReturn([])->shouldBeCalledOnce();
+ $projectorRepository->findByProjectorId($projectorId)->willReturn(null)->shouldBeCalledTimes(1);
+
+ $projectorResolver = $this->prophesize(ProjectorResolver::class);
+
+ $projectionist = new DefaultProjectionist(
+ $streamableStore->reveal(),
+ $projectorStore,
+ $projectorRepository->reveal(),
+ $projectorResolver->reveal(),
+ );
+
+ $projectionist->run();
+
+ self::assertEquals([
+ new ProjectorState($projectorId, ProjectorStatus::Outdated, 0),
+ ], $projectorStore->savedStates);
+ }
+
+ public function testTeardownWithProjector(): void
+ {
+ $projector = new class implements Projector {
+ public ?Message $message = null;
+ public bool $dropped = false;
+
+ public function projectorId(): ProjectorId
+ {
+ return new ProjectorId('test', 1);
+ }
+
+ public function drop(): void
+ {
+ $this->dropped = true;
+ }
+ };
+
+ $projectorStore = new DummyStore([new ProjectorState($projector->projectorId(), ProjectorStatus::Outdated)]);
+
+ $streamableStore = $this->prophesize(StreamableStore::class);
+
+ $projectorRepository = $this->prophesize(ProjectorRepository::class);
+ $projectorRepository->projectors()->willReturn([$projector])->shouldBeCalledOnce();
+ $projectorRepository->findByProjectorId($projector->projectorId())->willReturn($projector)->shouldBeCalledTimes(1);
+
+ $projectorResolver = $this->prophesize(ProjectorResolver::class);
+ $projectorResolver->resolveDropMethod($projector)->willReturn($projector->drop(...));
+
+ $projectionist = new DefaultProjectionist(
+ $streamableStore->reveal(),
+ $projectorStore,
+ $projectorRepository->reveal(),
+ $projectorResolver->reveal(),
+ );
+
+ $projectionist->teardown();
+
+ self::assertEquals([], $projectorStore->savedStates);
+ self::assertEquals([$projector->projectorId()], $projectorStore->removedIds);
+ self::assertTrue($projector->dropped);
+ }
+
+ public function testTeardownWithProjectorAndError(): void
+ {
+ $projector = new class implements Projector {
+ public ?Message $message = null;
+ public bool $dropped = false;
+
+ public function projectorId(): ProjectorId
+ {
+ return new ProjectorId('test', 1);
+ }
+
+ public function drop(): void
+ {
+ throw new RuntimeException('ERROR');
+ }
+ };
+
+ $projectorStore = new DummyStore([new ProjectorState($projector->projectorId(), ProjectorStatus::Outdated)]);
+
+ $streamableStore = $this->prophesize(StreamableStore::class);
+
+ $projectorRepository = $this->prophesize(ProjectorRepository::class);
+ $projectorRepository->projectors()->willReturn([$projector])->shouldBeCalledOnce();
+ $projectorRepository->findByProjectorId($projector->projectorId())->willReturn($projector)->shouldBeCalledTimes(1);
+
+ $projectorResolver = $this->prophesize(ProjectorResolver::class);
+ $projectorResolver->resolveDropMethod($projector)->willReturn($projector->drop(...));
+
+ $projectionist = new DefaultProjectionist(
+ $streamableStore->reveal(),
+ $projectorStore,
+ $projectorRepository->reveal(),
+ $projectorResolver->reveal(),
+ );
+
+ $projectionist->teardown();
+
+ self::assertEquals([], $projectorStore->savedStates);
+ self::assertEquals([], $projectorStore->removedIds);
+ }
+
+ public function testTeardownWithoutProjector(): void
+ {
+ $projectorId = new ProjectorId('test', 1);
+
+ $projectorStore = new DummyStore([new ProjectorState($projectorId, ProjectorStatus::Outdated)]);
+
+ $streamableStore = $this->prophesize(StreamableStore::class);
+
+ $projectorRepository = $this->prophesize(ProjectorRepository::class);
+ $projectorRepository->projectors()->willReturn([])->shouldBeCalledOnce();
+ $projectorRepository->findByProjectorId($projectorId)->willReturn(null)->shouldBeCalledTimes(1);
+
+ $projectorResolver = $this->prophesize(ProjectorResolver::class);
+
+ $projectionist = new DefaultProjectionist(
+ $streamableStore->reveal(),
+ $projectorStore,
+ $projectorRepository->reveal(),
+ $projectorResolver->reveal(),
+ );
+
+ $projectionist->teardown();
+
+ self::assertEquals([], $projectorStore->savedStates);
+ self::assertEquals([], $projectorStore->removedIds);
+ }
+
+ public function testRemoveWithProjector(): void
+ {
+ $projector = new class implements Projector {
+ public ?Message $message = null;
+ public bool $dropped = false;
+
+ public function projectorId(): ProjectorId
+ {
+ return new ProjectorId('test', 1);
+ }
+
+ public function drop(): void
+ {
+ $this->dropped = true;
+ }
+ };
+
+ $projectorStore = new DummyStore([new ProjectorState($projector->projectorId(), ProjectorStatus::Outdated)]);
+
+ $streamableStore = $this->prophesize(StreamableStore::class);
+
+ $projectorRepository = $this->prophesize(ProjectorRepository::class);
+ $projectorRepository->projectors()->willReturn([$projector])->shouldBeCalledOnce();
+ $projectorRepository->findByProjectorId($projector->projectorId())->willReturn($projector)->shouldBeCalledTimes(1);
+
+ $projectorResolver = $this->prophesize(ProjectorResolver::class);
+ $projectorResolver->resolveDropMethod($projector)->willReturn($projector->drop(...));
+
+ $projectionist = new DefaultProjectionist(
+ $streamableStore->reveal(),
+ $projectorStore,
+ $projectorRepository->reveal(),
+ $projectorResolver->reveal(),
+ );
+
+ $projectionist->remove();
+
+ self::assertEquals([], $projectorStore->savedStates);
+ self::assertEquals([$projector->projectorId()], $projectorStore->removedIds);
+ self::assertTrue($projector->dropped);
+ }
+
+ public function testRemoveWithProjectorAndError(): void
+ {
+ $projector = new class implements Projector {
+ public ?Message $message = null;
+ public bool $dropped = false;
+
+ public function projectorId(): ProjectorId
+ {
+ return new ProjectorId('test', 1);
+ }
+
+ public function drop(): void
+ {
+ throw new RuntimeException('ERROR');
+ }
+ };
+
+ $projectorStore = new DummyStore([new ProjectorState($projector->projectorId(), ProjectorStatus::Outdated)]);
+
+ $streamableStore = $this->prophesize(StreamableStore::class);
+
+ $projectorRepository = $this->prophesize(ProjectorRepository::class);
+ $projectorRepository->projectors()->willReturn([$projector])->shouldBeCalledOnce();
+ $projectorRepository->findByProjectorId($projector->projectorId())->willReturn($projector)->shouldBeCalledTimes(1);
+
+ $projectorResolver = $this->prophesize(ProjectorResolver::class);
+ $projectorResolver->resolveDropMethod($projector)->willReturn($projector->drop(...));
+
+ $projectionist = new DefaultProjectionist(
+ $streamableStore->reveal(),
+ $projectorStore,
+ $projectorRepository->reveal(),
+ $projectorResolver->reveal(),
+ );
+
+ $projectionist->remove();
+
+ self::assertEquals([], $projectorStore->savedStates);
+ self::assertEquals([$projector->projectorId()], $projectorStore->removedIds);
+ }
+
+ public function testRemoveWithoutProjector(): void
+ {
+ $projectorId = new ProjectorId('test', 1);
+
+ $projectorStore = new DummyStore([new ProjectorState($projectorId, ProjectorStatus::Outdated)]);
+
+ $streamableStore = $this->prophesize(StreamableStore::class);
+
+ $projectorRepository = $this->prophesize(ProjectorRepository::class);
+ $projectorRepository->projectors()->willReturn([])->shouldBeCalledOnce();
+ $projectorRepository->findByProjectorId($projectorId)->willReturn(null)->shouldBeCalledTimes(1);
+
+ $projectorResolver = $this->prophesize(ProjectorResolver::class);
+
+ $projectionist = new DefaultProjectionist(
+ $streamableStore->reveal(),
+ $projectorStore,
+ $projectorRepository->reveal(),
+ $projectorResolver->reveal(),
+ );
+
+ $projectionist->remove();
+
+ self::assertEquals([], $projectorStore->savedStates);
+ self::assertEquals([$projectorId], $projectorStore->removedIds);
+ }
+}
diff --git a/tests/Unit/Projection/DefaultProjectorRepositoryTest.php b/tests/Unit/Projection/DefaultProjectorRepositoryTest.php
new file mode 100644
index 00000000..a5c4b95c
--- /dev/null
+++ b/tests/Unit/Projection/DefaultProjectorRepositoryTest.php
@@ -0,0 +1,55 @@
+projectors());
+ }
+
+ public function testGetAllProjectors(): void
+ {
+ $projector = new class implements Projector {
+ public function projectorId(): ProjectorId
+ {
+ return new ProjectorId('test', 1);
+ }
+ };
+
+ $repository = new DefaultProjectorRepository([$projector]);
+
+ self::assertEquals([$projector], $repository->projectors());
+ }
+
+ public function testFindProjector(): void
+ {
+ $projector = new class implements Projector {
+ public function projectorId(): ProjectorId
+ {
+ return new ProjectorId('test', 1);
+ }
+ };
+
+ $repository = new DefaultProjectorRepository([$projector]);
+
+ self::assertSame($projector, $repository->findByProjectorId(new ProjectorId('test', 1)));
+ }
+
+ public function testProjectorNotFound(): void
+ {
+ $repository = new DefaultProjectorRepository();
+
+ self::assertNull($repository->findByProjectorId(new ProjectorId('test', 1)));
+ }
+}
diff --git a/tests/Unit/Projection/DummyStore.php b/tests/Unit/Projection/DummyStore.php
new file mode 100644
index 00000000..31b64d46
--- /dev/null
+++ b/tests/Unit/Projection/DummyStore.php
@@ -0,0 +1,62 @@
+ */
+ private array $store = [];
+ /** @var list */
+ public array $savedStates = [];
+ /** @var list */
+ public array $removedIds = [];
+
+ /**
+ * @param list $store
+ */
+ public function __construct(array $store = [])
+ {
+ foreach ($store as $state) {
+ $this->store[$state->id()->toString()] = $state;
+ }
+ }
+
+ public function getProjectorState(ProjectorId $projectorId): ProjectorState
+ {
+ if (array_key_exists($projectorId->toString(), $this->store)) {
+ return $this->store[$projectorId->toString()];
+ }
+
+ throw new ProjectorStateNotFound($projectorId);
+ }
+
+ public function getStateFromAllProjectors(): ProjectorStateCollection
+ {
+ return new ProjectorStateCollection(array_values($this->store));
+ }
+
+ public function saveProjectorState(ProjectorState ...$projectorStates): void
+ {
+ foreach ($projectorStates as $state) {
+ $this->store[$state->id()->toString()] = $state;
+ $this->savedStates[] = clone $state;
+ }
+ }
+
+ public function removeProjectorState(ProjectorId $projectorId): void
+ {
+ $this->removedIds[] = $projectorId;
+ unset($this->store[$projectorId->toString()]);
+ }
+}
diff --git a/tests/Unit/Projection/MetadataProjectorResolverTest.php b/tests/Unit/Projection/MetadataProjectorResolverTest.php
new file mode 100644
index 00000000..15fd4133
--- /dev/null
+++ b/tests/Unit/Projection/MetadataProjectorResolverTest.php
@@ -0,0 +1,133 @@
+resolveHandleMethod($projection, $message);
+
+ self::assertIsCallable($result);
+
+ $result($message);
+
+ self::assertSame($message, $projection::$handledMessage);
+ }
+
+ public function testNotResolveHandleMethod(): void
+ {
+ $projection = new class implements Projection {
+ };
+
+ $message = new Message(
+ new ProfileVisited(
+ ProfileId::fromString('1')
+ )
+ );
+
+ $resolver = new MetadataProjectorResolver();
+ $result = $resolver->resolveHandleMethod($projection, $message);
+
+ self::assertNull($result);
+ }
+
+ public function testResolveCreateMethod(): void
+ {
+ $projection = new class implements Projection {
+ public static bool $called = false;
+
+ #[Create]
+ public function method(): void
+ {
+ self::$called = true;
+ }
+ };
+
+ $resolver = new MetadataProjectorResolver();
+ $result = $resolver->resolveCreateMethod($projection);
+
+ self::assertIsCallable($result);
+
+ $result();
+
+ self::assertTrue($projection::$called);
+ }
+
+ public function testNotResolveCreateMethod(): void
+ {
+ $projection = new class implements Projection {
+ };
+
+ $resolver = new MetadataProjectorResolver();
+ $result = $resolver->resolveCreateMethod($projection);
+
+ self::assertNull($result);
+ }
+
+ public function testResolveDropMethod(): void
+ {
+ $projection = new class implements Projection {
+ public static bool $called = false;
+
+ #[Drop]
+ public function method(): void
+ {
+ self::$called = true;
+ }
+ };
+
+ $resolver = new MetadataProjectorResolver();
+ $result = $resolver->resolveDropMethod($projection);
+
+ self::assertIsCallable($result);
+
+ $result();
+
+ self::assertTrue($projection::$called);
+ }
+
+ public function testNotResolveDropMethod(): void
+ {
+ $projection = new class implements Projection {
+ };
+
+ $resolver = new MetadataProjectorResolver();
+ $result = $resolver->resolveDropMethod($projection);
+
+ self::assertNull($result);
+ }
+}
diff --git a/tests/Unit/Projection/ProjectorCriteriaTest.php b/tests/Unit/Projection/ProjectorCriteriaTest.php
new file mode 100644
index 00000000..48e7e05d
--- /dev/null
+++ b/tests/Unit/Projection/ProjectorCriteriaTest.php
@@ -0,0 +1,24 @@
+ids);
+ }
+}
diff --git a/tests/Unit/Projection/ProjectorIdTest.php b/tests/Unit/Projection/ProjectorIdTest.php
new file mode 100644
index 00000000..3609b933
--- /dev/null
+++ b/tests/Unit/Projection/ProjectorIdTest.php
@@ -0,0 +1,51 @@
+name());
+ self::assertSame(1, $projectorId->version());
+ self::assertSame('test-1', $projectorId->toString());
+ }
+
+ public function testEquals(): void
+ {
+ $a = new ProjectorId(
+ 'test',
+ 1
+ );
+
+ $b = new ProjectorId(
+ 'test',
+ 1
+ );
+
+ $c = new ProjectorId(
+ 'foo',
+ 1
+ );
+
+ $d = new ProjectorId(
+ 'test',
+ 2
+ );
+
+ self::assertTrue($a->equals($b));
+ self::assertFalse($a->equals($c));
+ self::assertFalse($a->equals($d));
+ }
+}
diff --git a/tests/Unit/Projection/ProjectorStore/InMemoryStoreTest.php b/tests/Unit/Projection/ProjectorStore/InMemoryStoreTest.php
new file mode 100644
index 00000000..f6320e59
--- /dev/null
+++ b/tests/Unit/Projection/ProjectorStore/InMemoryStoreTest.php
@@ -0,0 +1,65 @@
+saveProjectorState($state);
+
+ self::assertEquals($state, $store->getProjectorState($id));
+
+ $collection = $store->getStateFromAllProjectors();
+
+ self::assertTrue($collection->has($id));
+ }
+
+ public function testNotFound(): void
+ {
+ $this->expectException(ProjectorStateNotFound::class);
+
+ $store = new InMemoryStore();
+ $store->getProjectorState(new ProjectorId('test', 1));
+ }
+
+ public function testRemove(): void
+ {
+ $store = new InMemoryStore();
+
+ $id = new ProjectorId('test', 1);
+
+ $state = new ProjectorState(
+ $id
+ );
+
+ $store->saveProjectorState($state);
+
+ $collection = $store->getStateFromAllProjectors();
+
+ self::assertTrue($collection->has($id));
+
+ $store->removeProjectorState($id);
+
+ $collection = $store->getStateFromAllProjectors();
+
+ self::assertFalse($collection->has($id));
+ }
+}
diff --git a/tests/Unit/Projection/ProjectorStore/ProjectorStateCollectionTest.php b/tests/Unit/Projection/ProjectorStore/ProjectorStateCollectionTest.php
new file mode 100644
index 00000000..bac736b8
--- /dev/null
+++ b/tests/Unit/Projection/ProjectorStore/ProjectorStateCollectionTest.php
@@ -0,0 +1,192 @@
+has($id));
+ self::assertSame($state, $collection->get($id));
+ self::assertSame(1, $collection->count());
+ }
+
+ public function testCreateWithDuplicatedId(): void
+ {
+ $this->expectException(DuplicateProjectorId::class);
+
+ $id = new ProjectorId('test', 1);
+
+ new ProjectorStateCollection([
+ new ProjectorState(
+ $id
+ ),
+ new ProjectorState(
+ $id
+ ),
+ ]);
+ }
+
+ public function testNotFound(): void
+ {
+ $this->expectException(ProjectorStateNotFound::class);
+
+ $collection = new ProjectorStateCollection();
+ /** @psalm-suppress UnusedMethodCall */
+ $collection->get(new ProjectorId('test', 1));
+ }
+
+ public function testAdd(): void
+ {
+ $id = new ProjectorId('test', 1);
+
+ $state = new ProjectorState(
+ $id
+ );
+
+ $collection = new ProjectorStateCollection();
+ $newCollection = $collection->add($state);
+
+ self::assertNotSame($collection, $newCollection);
+ self::assertTrue($newCollection->has($id));
+ self::assertSame($state, $newCollection->get($id));
+ }
+
+ public function testAddWithDuplicatedId(): void
+ {
+ $this->expectException(DuplicateProjectorId::class);
+
+ $id = new ProjectorId('test', 1);
+
+ /** @psalm-suppress UnusedMethodCall */
+ (new ProjectorStateCollection())
+ ->add(new ProjectorState($id))
+ ->add(new ProjectorState($id));
+ }
+
+ public function testMinProjectorPosition(): void
+ {
+ $collection = new ProjectorStateCollection([
+ new ProjectorState(
+ new ProjectorId('foo', 1),
+ ProjectorStatus::Active,
+ 10
+ ),
+ new ProjectorState(
+ new ProjectorId('bar', 1),
+ ProjectorStatus::Active,
+ 5
+ ),
+ new ProjectorState(
+ new ProjectorId('baz', 1),
+ ProjectorStatus::Active,
+ 15
+ ),
+ ]);
+
+ self::assertSame(5, $collection->minProjectorPosition());
+ }
+
+ public function testMinProjectorPositionWithEmptyCollection(): void
+ {
+ $collection = new ProjectorStateCollection();
+
+ self::assertSame(0, $collection->minProjectorPosition());
+ }
+
+ public function testFilterByProjectStatus(): void
+ {
+ $fooId = new ProjectorId('foo', 1);
+ $barId = new ProjectorId('bar', 1);
+
+ $collection = new ProjectorStateCollection([
+ new ProjectorState(
+ $fooId,
+ ProjectorStatus::Booting,
+ ),
+ new ProjectorState(
+ $barId,
+ ProjectorStatus::Active,
+ ),
+ ]);
+
+ $newCollection = $collection->filterByProjectorStatus(ProjectorStatus::Active);
+
+ self::assertNotSame($collection, $newCollection);
+ self::assertFalse($newCollection->has($fooId));
+ self::assertTrue($newCollection->has($barId));
+ self::assertSame(1, $newCollection->count());
+ }
+
+ public function testFilterByCriteriaEmpty(): void
+ {
+ $fooId = new ProjectorId('foo', 1);
+ $barId = new ProjectorId('bar', 1);
+
+ $collection = new ProjectorStateCollection([
+ new ProjectorState(
+ $fooId,
+ ProjectorStatus::Booting,
+ ),
+ new ProjectorState(
+ $barId,
+ ProjectorStatus::Active,
+ ),
+ ]);
+
+ $criteria = new ProjectorCriteria();
+
+ $newCollection = $collection->filterByCriteria($criteria);
+
+ self::assertNotSame($collection, $newCollection);
+ self::assertTrue($newCollection->has($fooId));
+ self::assertTrue($newCollection->has($barId));
+ self::assertSame(2, $newCollection->count());
+ }
+
+ public function testFilterByCriteriaWithIds(): void
+ {
+ $fooId = new ProjectorId('foo', 1);
+ $barId = new ProjectorId('bar', 1);
+
+ $collection = new ProjectorStateCollection([
+ new ProjectorState(
+ $fooId,
+ ProjectorStatus::Booting,
+ ),
+ new ProjectorState(
+ $barId,
+ ProjectorStatus::Active,
+ ),
+ ]);
+
+ $criteria = new ProjectorCriteria([$fooId]);
+
+ $newCollection = $collection->filterByCriteria($criteria);
+
+ self::assertNotSame($collection, $newCollection);
+ self::assertTrue($newCollection->has($fooId));
+ self::assertFalse($newCollection->has($barId));
+ self::assertSame(1, $newCollection->count());
+ }
+}
diff --git a/tests/Unit/Projection/ProjectorStore/ProjectorStateTest.php b/tests/Unit/Projection/ProjectorStore/ProjectorStateTest.php
new file mode 100644
index 00000000..2ac5b9f1
--- /dev/null
+++ b/tests/Unit/Projection/ProjectorStore/ProjectorStateTest.php
@@ -0,0 +1,107 @@
+id());
+ self::assertEquals(ProjectorStatus::New, $state->status());
+ self::assertEquals(0, $state->position());
+ self::assertTrue($state->isNew());
+ self::assertFalse($state->isBooting());
+ self::assertFalse($state->isActive());
+ self::assertFalse($state->isError());
+ self::assertFalse($state->isOutdated());
+ }
+
+ public function testBooting(): void
+ {
+ $state = new ProjectorState(
+ new ProjectorId('test', 1)
+ );
+
+ $state->booting();
+
+ self::assertEquals(ProjectorStatus::Booting, $state->status());
+ self::assertFalse($state->isNew());
+ self::assertTrue($state->isBooting());
+ self::assertFalse($state->isActive());
+ self::assertFalse($state->isError());
+ self::assertFalse($state->isOutdated());
+ }
+
+ public function testActive(): void
+ {
+ $state = new ProjectorState(
+ new ProjectorId('test', 1)
+ );
+
+ $state->active();
+
+ self::assertEquals(ProjectorStatus::Active, $state->status());
+ self::assertFalse($state->isNew());
+ self::assertFalse($state->isBooting());
+ self::assertTrue($state->isActive());
+ self::assertFalse($state->isError());
+ self::assertFalse($state->isOutdated());
+ }
+
+ public function testError(): void
+ {
+ $state = new ProjectorState(
+ new ProjectorId('test', 1)
+ );
+
+ $state->error();
+
+ self::assertEquals(ProjectorStatus::Error, $state->status());
+ self::assertFalse($state->isNew());
+ self::assertFalse($state->isBooting());
+ self::assertFalse($state->isActive());
+ self::assertTrue($state->isError());
+ self::assertFalse($state->isOutdated());
+ }
+
+ public function testOutdated(): void
+ {
+ $state = new ProjectorState(
+ new ProjectorId('test', 1)
+ );
+
+ $state->outdated();
+
+ self::assertEquals(ProjectorStatus::Outdated, $state->status());
+ self::assertFalse($state->isNew());
+ self::assertFalse($state->isBooting());
+ self::assertFalse($state->isActive());
+ self::assertFalse($state->isError());
+ self::assertTrue($state->isOutdated());
+ }
+
+ public function testIncrementPosition(): void
+ {
+ $state = new ProjectorState(
+ new ProjectorId('test', 1)
+ );
+
+ $state->incrementPosition();
+
+ self::assertEquals(1, $state->position());
+ }
+}
diff --git a/tests/Unit/Projection/SyncProjectorListenerTest.php b/tests/Unit/Projection/SyncProjectorListenerTest.php
new file mode 100644
index 00000000..6931a5b5
--- /dev/null
+++ b/tests/Unit/Projection/SyncProjectorListenerTest.php
@@ -0,0 +1,101 @@
+message = $message;
+ }
+ };
+
+ $message = new Message(
+ new ProfileCreated(
+ ProfileId::fromString('1'),
+ Email::fromString('foo@bar.com')
+ )
+ );
+
+ $projectorRepository = $this->prophesize(ProjectorRepository::class);
+ $projectorRepository->projectors()->willReturn([$projector])->shouldBeCalledOnce();
+
+ $resolver = $this->prophesize(ProjectorResolver::class);
+ $resolver->resolveHandleMethod($projector, $message)->willReturn($projector->handleProfileCreated(...))->shouldBeCalledOnce();
+
+ $projectionListener = new SyncProjectorListener(
+ $projectorRepository->reveal(),
+ $resolver->reveal()
+ );
+
+ $projectionListener($message);
+
+ self::assertSame($message, $projector->message);
+ }
+
+ public function testNoMethod(): void
+ {
+ $projector = new class implements Projector {
+ public ?Message $message = null;
+
+ public function projectorId(): ProjectorId
+ {
+ return new ProjectorId('test', 1);
+ }
+
+ public function handleProfileCreated(Message $message): void
+ {
+ $this->message = $message;
+ }
+ };
+
+ $message = new Message(
+ new ProfileCreated(
+ ProfileId::fromString('1'),
+ Email::fromString('foo@bar.com')
+ )
+ );
+
+ $projectorRepository = $this->prophesize(ProjectorRepository::class);
+ $projectorRepository->projectors()->willReturn([$projector])->shouldBeCalledOnce();
+
+ $resolver = $this->prophesize(ProjectorResolver::class);
+ $resolver->resolveHandleMethod($projector, $message)->willReturn(null)->shouldBeCalledOnce();
+
+ $projectionListener = new SyncProjectorListener(
+ $projectorRepository->reveal(),
+ $resolver->reveal()
+ );
+
+ $projectionListener($message);
+
+ self::assertNull($projector->message);
+ }
+}