Skip to content

Commit

Permalink
add trigger subscription engine event bus
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Apr 22, 2024
1 parent 7642156 commit ff33b07
Show file tree
Hide file tree
Showing 10 changed files with 421 additions and 240 deletions.
9 changes: 2 additions & 7 deletions docs/pages/getting_started.md
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager;
use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer;
use Patchlevel\EventSourcing\Store\DoctrineDbalStore;
use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\RunSubscriptionEngineEventBus;
use Patchlevel\EventSourcing\Subscription\Store\DoctrineSubscriptionStore;
use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository;

Expand Down Expand Up @@ -311,6 +312,7 @@ $engine = new DefaultSubscriptionEngine(
$repositoryManager = new DefaultRepositoryManager(
$aggregateRegistry,
$eventStore,
new RunSubscriptionEngineEventBus($engine),
);

$hotelRepository = $repositoryManager->get(Hotel::class);
Expand Down Expand Up @@ -375,15 +377,8 @@ $hotel2 = $hotelRepository->load(Uuid::fromString('d0d0d0d0-d0d0-d0d0-d0d0-d0d0d
$hotel2->checkIn('David');
$hotelRepository->save($hotel2);

/** @var SubscriptionEngine $engine */
$engine->run();

$hotels = $hotelProjection->getHotels();
```
!!! warning

You need to run the subscription engine to update the projections and execute the processors.

!!! note

You can also use other forms of IDs such as uuid version 6 or a custom format.
Expand Down
28 changes: 28 additions & 0 deletions src/EventBus/ChainEventBus.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\EventBus;

use Patchlevel\EventSourcing\Message\Message;

use function array_values;

final class ChainEventBus implements EventBus
{
/** @var list<EventBus> */
private readonly array $eventBus;

public function __construct(
EventBus ...$eventBus,
) {
$this->eventBus = array_values($eventBus);
}

public function dispatch(Message ...$messages): void
{
foreach ($this->eventBus as $eventBus) {
$eventBus->dispatch(...$messages);
}
}
}
15 changes: 15 additions & 0 deletions src/Subscription/Engine/AlreadyProcessing.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Subscription\Engine;

use RuntimeException;

final class AlreadyProcessing extends RuntimeException
{
public function __construct()
{
parent::__construct('Subscription engine is already processing');
}
}
Loading

0 comments on commit ff33b07

Please sign in to comment.