Skip to content

Commit

Permalink
add trigger subscription engine event bus
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Apr 22, 2024
1 parent 7642156 commit d7eccdb
Show file tree
Hide file tree
Showing 13 changed files with 488 additions and 243 deletions.
1 change: 1 addition & 0 deletions deptrac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ deptrac:
- Aggregate
- Attribute
- Clock
- EventBus
- Message
- MetadataSubscriber
- Schema
Expand Down
39 changes: 39 additions & 0 deletions docs/pages/event_bus.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,31 @@ final class WelcomeSubscriber
}
}
```
## Run Subscription Engine Event Bus

If you want that the subscriptions run after the events are stored in the store,
you can use the `RunSubscriptionEngineEventBus`.
This means that a worker to run the subscriptions are not needed.

```php
use Patchlevel\EventSourcing\Subscription\Engine\RunSubscriptionEngineEventBus;

/** @var SubscriptionEngine $subscriptionEngine */
$eventBus = new RunSubscriptionEngineEventBus($subscriptionEngine);
```
!!! warning

You need to be careful with this event bus if you use the DoctrineStore.
The subsription engine do not support rollbacks.

!!! note

More about the subscription engine can be found [here](subscription.md).

!!! tip

You can perfectly use it in development or testing.

## Psr-14 Event Bus

You can also use a [psr-14](https://www.php-fig.org/psr/psr-14/) compatible event bus.
Expand All @@ -136,6 +161,20 @@ $eventBus = new Psr14EventBus($psr14EventDispatcher);

You can't use the `Subscribe` attribute with the psr-14 event bus.

## Chain Event Bus

You can also use a chain event bus to dispatch events to multiple event buses.

```php
use Patchlevel\EventSourcing\EventBus\ChainEventBus;
use Patchlevel\EventSourcing\EventBus\EventBus;

/**
* @var EventBus $eventBus1
* @var EventBus $eventBus2
*/
$eventBus = new ChainEventBus($eventBus1, $eventBus2);
```
## Learn more

* [How to use messages](message.md)
Expand Down
10 changes: 2 additions & 8 deletions docs/pages/getting_started.md
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager;
use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer;
use Patchlevel\EventSourcing\Store\DoctrineDbalStore;
use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\RunSubscriptionEngineEventBus;
use Patchlevel\EventSourcing\Subscription\Store\DoctrineSubscriptionStore;
use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository;

Expand Down Expand Up @@ -311,6 +312,7 @@ $engine = new DefaultSubscriptionEngine(
$repositoryManager = new DefaultRepositoryManager(
$aggregateRegistry,
$eventStore,
new RunSubscriptionEngineEventBus($engine),
);

$hotelRepository = $repositoryManager->get(Hotel::class);
Expand Down Expand Up @@ -361,7 +363,6 @@ We are now ready to use the Event Sourcing System. We can load, change and save
```php
use Patchlevel\EventSourcing\Aggregate\Uuid;
use Patchlevel\EventSourcing\Repository\Repository;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;

$hotel1 = Hotel::create(Uuid::generate(), 'HOTEL');
$hotel1->checkIn('David');
Expand All @@ -375,15 +376,8 @@ $hotel2 = $hotelRepository->load(Uuid::fromString('d0d0d0d0-d0d0-d0d0-d0d0-d0d0d
$hotel2->checkIn('David');
$hotelRepository->save($hotel2);

/** @var SubscriptionEngine $engine */
$engine->run();

$hotels = $hotelProjection->getHotels();
```
!!! warning

You need to run the subscription engine to update the projections and execute the processors.

!!! note

You can also use other forms of IDs such as uuid version 6 or a custom format.
Expand Down
29 changes: 27 additions & 2 deletions docs/pages/subscription.md
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,7 @@ $catchupSubscriptionEngine = new CatchUpSubscriptionEngine($subscriptionEngine);

You can use the `CatchUpSubscriptionEngine` in your tests to process the events immediately.

### Throw by error Subscription Engine
### Throw on error Subscription Engine

This is another decorator for the subscription engine. It throws an exception if a subscription is in error state.
This is useful for testing or development to get directly feedback if something is wrong.
Expand All @@ -695,13 +695,38 @@ use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\ThrowOnErrorSubscriptionEngine;

/** @var SubscriptionEngine $subscriptionStore */
$throwByErrorSubscriptionEngine = new ThrowOnErrorSubscriptionEngine($subscriptionEngine);
$throwOnErrorSubscriptionEngine = new ThrowOnErrorSubscriptionEngine($subscriptionEngine);
```
!!! warning

This is only for testing or development. Don't use it in production.
The subscription engine has an build in retry strategy to retry subscriptions that have failed.

### Run Subscription Engine after save

You can trigger the subscription engine after the events are stored in the store.
This means that a worker to run the subscriptions are not needed.

```php
use Patchlevel\EventSourcing\Subscription\Engine\RunSubscriptionEngineEventBus;

/** @var SubscriptionEngine $subscriptionEngine */
$eventBus = new RunSubscriptionEngineEventBus($subscriptionEngine);
```
!!! warning

You need to be careful with this event bus if you use the DoctrineStore.
The subsription engine do not support rollbacks.

!!! note

More about repository can be found [here](./repository.md).
And more about event bus can be found [here](./event_bus.md).

!!! tip

You can perfectly use it in development or testing.

## Usage

The Subscription Engine has a few methods needed to use it effectively.
Expand Down
28 changes: 28 additions & 0 deletions src/EventBus/ChainEventBus.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\EventBus;

use Patchlevel\EventSourcing\Message\Message;

use function array_values;

final class ChainEventBus implements EventBus
{
/** @var list<EventBus> */
private readonly array $eventBus;

public function __construct(
EventBus ...$eventBus,
) {
$this->eventBus = array_values($eventBus);
}

public function dispatch(Message ...$messages): void
{
foreach ($this->eventBus as $eventBus) {
$eventBus->dispatch(...$messages);
}
}
}
15 changes: 15 additions & 0 deletions src/Subscription/Engine/AlreadyProcessing.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Subscription\Engine;

use RuntimeException;

final class AlreadyProcessing extends RuntimeException
{
public function __construct()
{
parent::__construct('Subscription engine is already processing');
}
}
Loading

0 comments on commit d7eccdb

Please sign in to comment.