Skip to content

Commit

Permalink
Merge pull request #514 from patchlevel/trace
Browse files Browse the repository at this point in the history
projector accessor & experimental trace feature
  • Loading branch information
DavidBadura authored Mar 8, 2024
2 parents 486ab70 + bb6fcc9 commit 955e18c
Show file tree
Hide file tree
Showing 27 changed files with 1,052 additions and 143 deletions.
11 changes: 9 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,15 @@ psalm-baseline: vendor
vendor/bin/psalm --update-baseline --set-baseline=baseline.xml

.PHONY: phpunit
phpunit: vendor ## run phpunit tests
XDEBUG_MODE=coverage vendor/bin/phpunit
phpunit: vendor phpunit-unit phpunit-integration ## run phpunit tests

.PHONY: phpunit-integration
phpunit-integration: vendor ## run phpunit integration tests
vendor/bin/phpunit --testsuite=integration

.PHONY: phpunit-unit
phpunit-unit: vendor ## run phpunit unit tests
XDEBUG_MODE=coverage vendor/bin/phpunit --testsuite=unit

.PHONY: infection
infection: vendor ## run infection
Expand Down
6 changes: 5 additions & 1 deletion baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,16 @@
<code><![CDATA[$projectionError->errorContext]]></code>
</PossiblyNullPropertyFetch>
</file>
<file src="src/Projection/Projectionist/DefaultProjectionist.php">
<file src="src/Projection/Projector/MetadataProjectorAccessor.php">
<MixedMethodCall>
<code><![CDATA[$method]]></code>
<code><![CDATA[$method]]></code>
<code><![CDATA[$method]]></code>
</MixedMethodCall>
<MixedReturnTypeCoercion>
<code><![CDATA[$this->projector->$method(...)]]></code>
<code><![CDATA[Closure(Message):void]]></code>
</MixedReturnTypeCoercion>
</file>
<file src="src/Repository/DefaultRepository.php">
<PropertyTypeCoercion>
Expand Down
3 changes: 2 additions & 1 deletion docs/pages/getting_started.md
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ use Doctrine\DBAL\DriverManager;
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
use Patchlevel\EventSourcing\Projection\Projection\Store\DoctrineStore;
use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist;
use Patchlevel\EventSourcing\Projection\Projector\MetadataProjectorAccessorRepository;
use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager;
use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer;
use Patchlevel\EventSourcing\Store\DoctrineDbalStore;
Expand All @@ -306,7 +307,7 @@ $eventStore = new DoctrineDbalStore(

$hotelProjector = new HotelProjector($projectionConnection);

$projectorRepository = new ProjectorRepository([
$projectorRepository = new MetadataProjectorAccessorRepository([
$hotelProjector,
]);

Expand Down
13 changes: 12 additions & 1 deletion docs/pages/projection.md
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,17 @@ $retryStrategy = new ClockBasedRetryStrategy(

You can reactivate the projection manually or remove it and rebuild it from scratch.

### Projector Accessor

The projector accessor is responsible for providing the projectors to the projectionist.
We provide a metadata projector accessor repository by default.

```php
use Patchlevel\EventSourcing\Projection\Projector\MetadataProjectorAccessorRepository;

$projectorAccessorRepository = new MetadataProjectorAccessorRepository([$projector1, $projector2, $projector3]);
```

### Projectionist

Now we can create the projectionist and plug together the necessary services.
Expand All @@ -481,7 +492,7 @@ use Patchlevel\EventSourcing\Projection\Projectionist\DefaultProjectionist;
$projectionist = new DefaultProjectionist(
$eventStore,
$projectionStore,
[$projector1, $projector2, $projector3],
$projectorAccessorRepository,
$retryStrategy,
);
```
Expand Down
103 changes: 19 additions & 84 deletions src/Projection/Projectionist/DefaultProjectionist.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,33 @@
namespace Patchlevel\EventSourcing\Projection\Projectionist;

use Closure;
use Patchlevel\EventSourcing\Attribute\Subscribe;
use Patchlevel\EventSourcing\EventBus\Message;
use Patchlevel\EventSourcing\Metadata\Projector\AttributeProjectorMetadataFactory;
use Patchlevel\EventSourcing\Metadata\Projector\ProjectorMetadata;
use Patchlevel\EventSourcing\Metadata\Projector\ProjectorMetadataFactory;
use Patchlevel\EventSourcing\Projection\Projection\Projection;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionCriteria;
use Patchlevel\EventSourcing\Projection\Projection\ProjectionStatus;
use Patchlevel\EventSourcing\Projection\Projection\RunMode;
use Patchlevel\EventSourcing\Projection\Projection\Store\LockableProjectionStore;
use Patchlevel\EventSourcing\Projection\Projection\Store\ProjectionStore;
use Patchlevel\EventSourcing\Projection\Projector\ProjectorAccessor;
use Patchlevel\EventSourcing\Projection\Projector\ProjectorAccessorRepository;
use Patchlevel\EventSourcing\Projection\RetryStrategy\ClockBasedRetryStrategy;
use Patchlevel\EventSourcing\Projection\RetryStrategy\RetryStrategy;
use Patchlevel\EventSourcing\Store\Criteria;
use Patchlevel\EventSourcing\Store\Store;
use Psr\Log\LoggerInterface;
use Throwable;

use function array_map;
use function array_merge;
use function count;
use function in_array;
use function sprintf;

final class DefaultProjectionist implements Projectionist
{
/** @var array<string, object>|null */
private array|null $projectorIndex = null;

/** @param iterable<object> $projectors */
public function __construct(
private readonly Store $messageStore,
private readonly ProjectionStore $projectionStore,
private readonly iterable $projectors,
private readonly ProjectorAccessorRepository $projectorRepository,
private readonly RetryStrategy $retryStrategy = new ClockBasedRetryStrategy(),
private readonly ProjectorMetadataFactory $metadataFactory = new AttributeProjectorMetadataFactory(),
private readonly LoggerInterface|null $logger = null,
) {
}
Expand Down Expand Up @@ -323,7 +314,7 @@ function (array $projections): void {
continue;
}

$teardownMethod = $this->resolveTeardownMethod($projector);
$teardownMethod = $projector->teardownMethod();

if (!$teardownMethod) {
$this->projectionStore->remove($projection);
Expand Down Expand Up @@ -402,7 +393,7 @@ function (array $projections): void {
continue;
}

$teardownMethod = $this->resolveTeardownMethod($projector);
$teardownMethod = $projector->teardownMethod();

if (!$teardownMethod) {
$this->projectionStore->remove($projection);
Expand Down Expand Up @@ -561,7 +552,7 @@ private function handleMessage(int $index, Message $message, Projection $project
throw ProjectorNotFound::forProjectionId($projection->id());
}

$subscribeMethods = $this->resolveSubscribeMethods($projector, $message);
$subscribeMethods = $projector->subscribeMethods($message->event()::class);

if ($subscribeMethods === []) {
$projection->changePosition($index);
Expand Down Expand Up @@ -611,19 +602,9 @@ private function handleMessage(int $index, Message $message, Projection $project
);
}

private function projector(string $projectionId): object|null
private function projector(string $projectionId): ProjectorAccessor|null
{
if ($this->projectorIndex === null) {
$this->projectorIndex = [];

foreach ($this->projectors as $projector) {
$projectorId = $this->projectorMetadata($projector)->id;

$this->projectorIndex[$projectorId] = $projector;
}
}

return $this->projectorIndex[$projectionId] ?? null;
return $this->projectorRepository->get($projectionId);
}

private function handleOutdatedProjections(ProjectionistCriteria $criteria): void
Expand Down Expand Up @@ -764,7 +745,7 @@ function (array $projections): void {
throw ProjectorNotFound::forProjectionId($projection->id());
}

$setupMethod = $this->resolveSetupMethod($projector);
$setupMethod = $projector->setupMethod();

if (!$setupMethod) {
$projection->booting();
Expand Down Expand Up @@ -810,78 +791,32 @@ private function discoverNewProjections(): void
$this->findForUpdate(
new ProjectionCriteria(),
function (array $projections): void {
foreach ($this->projectors as $projector) {
$metadata = $this->projectorMetadata($projector);

foreach ($this->projectorRepository->all() as $projector) {
foreach ($projections as $projection) {
if ($projection->id() === $metadata->id) {
if ($projection->id() === $projector->id()) {
continue 2;
}
}

$this->projectionStore->add(new Projection(
$metadata->id,
$metadata->group,
$metadata->runMode,
));
$this->projectionStore->add(
new Projection(
$projector->id(),
$projector->group(),
$projector->runMode(),
),
);

$this->logger?->info(
sprintf(
'Projectionist: New Projector "%s" was found and added to the projection store.',
$metadata->id,
$projector->id(),
),
);
}
},
);
}

private function resolveSetupMethod(object $projector): Closure|null
{
$metadata = $this->metadataFactory->metadata($projector::class);
$method = $metadata->setupMethod;

if ($method === null) {
return null;
}

return $projector->$method(...);
}

private function resolveTeardownMethod(object $projector): Closure|null
{
$metadata = $this->metadataFactory->metadata($projector::class);
$method = $metadata->teardownMethod;

if ($method === null) {
return null;
}

return $projector->$method(...);
}

/** @return iterable<Closure> */
private function resolveSubscribeMethods(object $projector, Message $message): iterable
{
$event = $message->event();
$metadata = $this->metadataFactory->metadata($projector::class);

$methods = array_merge(
$metadata->subscribeMethods[$event::class] ?? [],
$metadata->subscribeMethods[Subscribe::ALL] ?? [],
);

return array_map(
static fn (string $method) => $projector->$method(...),
$methods,
);
}

private function projectorMetadata(object $projector): ProjectorMetadata
{
return $this->metadataFactory->metadata($projector::class);
}

private function latestIndex(): int
{
$stream = $this->messageStore->load(null, 1, null, true);
Expand Down
89 changes: 89 additions & 0 deletions src/Projection/Projector/MetadataProjectorAccessor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Projection\Projector;

use Closure;
use Patchlevel\EventSourcing\Attribute\Subscribe;
use Patchlevel\EventSourcing\EventBus\Message;
use Patchlevel\EventSourcing\Metadata\Projector\ProjectorMetadata;
use Patchlevel\EventSourcing\Projection\Projection\RunMode;

use function array_key_exists;
use function array_map;
use function array_merge;

final class MetadataProjectorAccessor implements ProjectorAccessor
{
/** @var array<class-string, list<Closure(Message):void>> */
private array $subscribeCache = [];

public function __construct(
private readonly object $projector,
private readonly ProjectorMetadata $metadata,
) {
}

public function id(): string
{
return $this->metadata->id;
}

public function group(): string
{
return $this->metadata->group;
}

public function runMode(): RunMode
{
return $this->metadata->runMode;
}

public function setupMethod(): Closure|null
{
$method = $this->metadata->setupMethod;

if ($method === null) {
return null;
}

return $this->projector->$method(...);
}

public function teardownMethod(): Closure|null
{
$method = $this->metadata->teardownMethod;

if ($method === null) {
return null;
}

return $this->projector->$method(...);
}

/**
* @param class-string $eventClass
*
* @return list<Closure(Message):void>
*/
public function subscribeMethods(string $eventClass): array
{
if (array_key_exists($eventClass, $this->subscribeCache)) {
return $this->subscribeCache[$eventClass];
}

$methods = array_merge(
$this->metadata->subscribeMethods[$eventClass] ?? [],
$this->metadata->subscribeMethods[Subscribe::ALL] ?? [],
);

$this->subscribeCache[$eventClass] = array_map(
/** @return Closure(Message):void */
fn (string $method) => $this->projector->$method(...),
$methods,
);

return $this->subscribeCache[$eventClass];
}
}
Loading

0 comments on commit 955e18c

Please sign in to comment.