Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replace pipeline with subscription engine #540

Merged
merged 2 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions deptrac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,6 @@ deptrac:
value: MetadataMessage
- type: layer
value: MetadataSubscriber
- name: Pipeline
collectors:
- type: directory
value: src/Pipeline/.*
- name: Repository
collectors:
- type: directory
Expand Down Expand Up @@ -117,8 +113,10 @@ deptrac:
- Attribute
- Message
Message:
- Aggregate
- MetadataMessage
- Serializer
- Store
Metadata:
MetadataAggregate:
- Aggregate
Expand All @@ -137,12 +135,6 @@ deptrac:
- Attribute
- Metadata
- Subscription
Pipeline:
- Aggregate
- EventBus
- Message
- Store
- Subscription
Subscription:
- Attribute
- Clock
Expand Down
56 changes: 28 additions & 28 deletions docs/pages/pipeline.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ whether the migration worked.
In this example the event `PrivacyAdded` is removed and the event `OldVisited` is replaced by `NewVisited`:

```php
use Patchlevel\EventSourcing\Pipeline\Middleware\ExcludeEventMiddleware;
use Patchlevel\EventSourcing\Pipeline\Middleware\RecalculatePlayheadMiddleware;
use Patchlevel\EventSourcing\Pipeline\Middleware\ReplaceEventMiddleware;
use Patchlevel\EventSourcing\Message\Translator\ExcludeEventTranslator;
use Patchlevel\EventSourcing\Message\Translator\RecalculatePlayheadTranslator;
use Patchlevel\EventSourcing\Message\Translator\ReplaceEventTranslator;
use Patchlevel\EventSourcing\Pipeline\Pipeline;
use Patchlevel\EventSourcing\Pipeline\Source\StoreSource;
use Patchlevel\EventSourcing\Pipeline\Target\StoreTarget;
Expand All @@ -21,11 +21,11 @@ $pipeline = new Pipeline(
new StoreSource($oldStore),
new StoreTarget($newStore),
[
new ExcludeEventMiddleware([PrivacyAdded::class]),
new ReplaceEventMiddleware(OldVisited::class, static function (OldVisited $oldVisited) {
new ExcludeEventTranslator([PrivacyAdded::class]),
new ReplaceEventTranslator(OldVisited::class, static function (OldVisited $oldVisited) {
return new NewVisited($oldVisited->profileId());
}),
new RecalculatePlayheadMiddleware(),
new RecalculatePlayheadTranslator(),
],
);
```
Expand Down Expand Up @@ -197,9 +197,9 @@ Middelwares can be used to manipulate, delete or expand messages or events durin
With this middleware you can exclude certain events.

```php
use Patchlevel\EventSourcing\Pipeline\Middleware\ExcludeEventMiddleware;
use Patchlevel\EventSourcing\Message\Translator\ExcludeEventTranslator;

$middleware = new ExcludeEventMiddleware([EmailChanged::class]);
$middleware = new ExcludeEventTranslator([EmailChanged::class]);
```
!!! warning

Expand All @@ -210,9 +210,9 @@ $middleware = new ExcludeEventMiddleware([EmailChanged::class]);
With this middleware you can only allow certain events.

```php
use Patchlevel\EventSourcing\Pipeline\Middleware\IncludeEventMiddleware;
use Patchlevel\EventSourcing\Message\Translator\IncludeEventTranslator;

$middleware = new IncludeEventMiddleware([ProfileCreated::class]);
$middleware = new IncludeEventTranslator([ProfileCreated::class]);
```
!!! warning

Expand All @@ -226,9 +226,9 @@ This middleware expects a callback that returns either true to allow events or f

```php
use Patchlevel\EventSourcing\Aggregate\AggregateChanged;
use Patchlevel\EventSourcing\Pipeline\Middleware\FilterEventMiddleware;
use Patchlevel\EventSourcing\Message\Translator\FilterEventTranslator;

$middleware = new FilterEventMiddleware(static function (AggregateChanged $event) {
$middleware = new FilterEventTranslator(static function (AggregateChanged $event) {
if (!$event instanceof ProfileCreated) {
return true;
}
Expand All @@ -245,9 +245,9 @@ $middleware = new FilterEventMiddleware(static function (AggregateChanged $event
With this middleware you can exclude archived events.

```php
use Patchlevel\EventSourcing\Pipeline\Middleware\ExcludeArchivedEventMiddleware;
use Patchlevel\EventSourcing\Message\Translator\ExcludeArchivedEventTranslator;

$middleware = new ExcludeArchivedEventMiddleware();
$middleware = new ExcludeArchivedEventTranslator();
```
!!! warning

Expand All @@ -258,9 +258,9 @@ $middleware = new ExcludeArchivedEventMiddleware();
With this middleware you can only allow archived events.

```php
use Patchlevel\EventSourcing\Pipeline\Middleware\OnlyArchivedEventMiddleware;
use Patchlevel\EventSourcing\Message\Translator\OnlyArchivedEventTranslator;

$middleware = new OnlyArchivedEventMiddleware();
$middleware = new OnlyArchivedEventTranslator();
```
!!! warning

Expand All @@ -273,9 +273,9 @@ The first parameter you have to define is the event class that you want to repla
And as a second parameter a callback, that the old event awaits and a new event returns.

```php
use Patchlevel\EventSourcing\Pipeline\Middleware\ReplaceEventMiddleware;
use Patchlevel\EventSourcing\Message\Translator\ReplaceEventTranslator;

$middleware = new ReplaceEventMiddleware(OldVisited::class, static function (OldVisited $oldVisited) {
$middleware = new ReplaceEventTranslator(OldVisited::class, static function (OldVisited $oldVisited) {
return new NewVisited($oldVisited->profileId());
});
```
Expand All @@ -302,9 +302,9 @@ The playhead must always be in ascending order so that the data is valid.
Some middleware can break this order and the middleware `RecalculatePlayheadMiddleware` can fix this problem.

```php
use Patchlevel\EventSourcing\Pipeline\Middleware\RecalculatePlayheadMiddleware;
use Patchlevel\EventSourcing\Message\Translator\RecalculatePlayheadTranslator;

$middleware = new RecalculatePlayheadMiddleware();
$middleware = new RecalculatePlayheadTranslator();
```
!!! note

Expand All @@ -315,13 +315,13 @@ $middleware = new RecalculatePlayheadMiddleware();
If you want to group your middleware, you can use one or more `ChainMiddleware`.

```php
use Patchlevel\EventSourcing\Pipeline\Middleware\ChainMiddleware;
use Patchlevel\EventSourcing\Pipeline\Middleware\ExcludeEventMiddleware;
use Patchlevel\EventSourcing\Pipeline\Middleware\RecalculatePlayheadMiddleware;
use Patchlevel\EventSourcing\Message\Translator\ChainTranslator;
use Patchlevel\EventSourcing\Message\Translator\ExcludeEventTranslator;
use Patchlevel\EventSourcing\Message\Translator\RecalculatePlayheadTranslator;

$middleware = new ChainMiddleware([
new ExcludeEventMiddleware([EmailChanged::class]),
new RecalculatePlayheadMiddleware(),
$middleware = new ChainTranslator([
new ExcludeEventTranslator([EmailChanged::class]),
new RecalculatePlayheadTranslator(),
]);
```
### Custom middleware
Expand All @@ -341,9 +341,9 @@ which should replace the `ProfileCreated` event.

```php
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Pipeline\Middleware\Middleware;
use Patchlevel\EventSourcing\Message\Translator\Translator;

final class SplitProfileCreatedMiddleware implements Middleware
final class SplitProfileCreatedMiddleware implements Translator
{
public function __invoke(Message $message): array
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Pipeline\Middleware;
namespace Patchlevel\EventSourcing\Message\Translator;

use Patchlevel\EventSourcing\Message\Message;

use function array_values;

final class ChainMiddleware implements Middleware
final class ChainTranslator implements Translator
{
/** @param iterable<Middleware> $middlewares */
/** @param iterable<Translator> $translators */
public function __construct(
private readonly iterable $middlewares,
private readonly iterable $translators,
) {
}

Expand All @@ -21,8 +21,8 @@ public function __invoke(Message $message): array
{
$messages = [$message];

foreach ($this->middlewares as $middleware) {
$messages = $this->processMiddleware($middleware, $messages);
foreach ($this->translators as $middleware) {
$messages = $this->process($middleware, $messages);
}

return $messages;
Expand All @@ -33,12 +33,12 @@ public function __invoke(Message $message): array
*
* @return list<Message>
*/
private function processMiddleware(Middleware $middleware, array $messages): array
private function process(Translator $translator, array $messages): array
{
$result = [];

foreach ($messages as $message) {
$result += $middleware($message);
$result += $translator($message);
}

return array_values($result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Pipeline\Middleware;
namespace Patchlevel\EventSourcing\Message\Translator;

use Patchlevel\EventSourcing\Message\HeaderNotFound;
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Store\ArchivedHeader;

final class ExcludeArchivedEventMiddleware implements Middleware
final class ExcludeArchivedEventTranslator implements Translator
{
/** @return list<Message> */
public function __invoke(Message $message): array
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Pipeline\Middleware;
namespace Patchlevel\EventSourcing\Message\Translator;

use Patchlevel\EventSourcing\Message\Message;

final class ExcludeEventMiddleware implements Middleware
final class ExcludeEventTranslator implements Translator
{
/** @param list<class-string> $classes */
public function __construct(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Pipeline\Middleware;
namespace Patchlevel\EventSourcing\Message\Translator;

use Patchlevel\EventSourcing\Message\Message;

final class FilterEventMiddleware implements Middleware
final class FilterEventTranslator implements Translator
{
/** @var callable(object $event):bool */
private $callable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Pipeline\Middleware;
namespace Patchlevel\EventSourcing\Message\Translator;

use Patchlevel\EventSourcing\Message\Message;

final class IncludeEventMiddleware implements Middleware
final class IncludeEventTranslator implements Translator
{
/** @param list<class-string> $classes */
public function __construct(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Pipeline\Middleware;
namespace Patchlevel\EventSourcing\Message\Translator;

use Patchlevel\EventSourcing\Message\HeaderNotFound;
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Store\ArchivedHeader;

final class OnlyArchivedEventMiddleware implements Middleware
final class OnlyArchivedEventTranslator implements Translator
{
/** @return list<Message> */
public function __invoke(Message $message): array
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Pipeline\Middleware;
namespace Patchlevel\EventSourcing\Message\Translator;

use Patchlevel\EventSourcing\Aggregate\AggregateHeader;
use Patchlevel\EventSourcing\Message\Message;

use function array_key_exists;

final class RecalculatePlayheadMiddleware implements Middleware
final class RecalculatePlayheadTranslator implements Translator
{
/** @var array<string, array<string, positive-int>> */
private array $index = [];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Pipeline\Middleware;
namespace Patchlevel\EventSourcing\Message\Translator;

use Patchlevel\EventSourcing\Message\Message;

/** @template T of object */
final class ReplaceEventMiddleware implements Middleware
final class ReplaceEventTranslator implements Translator
{
/** @var callable(T $event):object */
private $callable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Pipeline\Middleware;
namespace Patchlevel\EventSourcing\Message\Translator;

use Patchlevel\EventSourcing\Message\Message;

interface Middleware
interface Translator
{
/** @return list<Message> */
public function __invoke(Message $message): array;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Pipeline\Middleware;
namespace Patchlevel\EventSourcing\Message\Translator;

use DateTimeImmutable;
use Patchlevel\EventSourcing\Aggregate\AggregateHeader;
use Patchlevel\EventSourcing\Message\Message;

final class UntilEventMiddleware implements Middleware
final class UntilEventTranslator implements Translator
{
public function __construct(
private readonly DateTimeImmutable $until,
Expand Down
Loading
Loading