A lightweight yet all-inclusive event sourcing library with a focus on developer experience.
- Everything you need for event sourcing is included in the package
- Based on Doctrine DBAL and its ecosystem
- Developer experience oriented and fully typed
- Snapshot system to quickly rebuild aggregates
- Pipeline to build new projections or to migrate events
- Schema management and Doctrine migration support
- Dev tools such as a realtime event watcher
- Built-in CLI commands with Symfony
composer require patchlevel/event-sourcing
In our little getting started example, we manage hotels. We keep the example small, so we can only create hotels and let guests check in and check out.
First we define the events that happen in our system.
A hotel can be created with a name:
use Patchlevel\EventSourcing\Aggregate\AggregateChanged;
final class HotelCreated extends AggregateChanged
{
public static function raise(string $id, string $hotelName): static
{
return new static($id, ['hotelId' => $id, 'hotelName' => $hotelName]);
}
public function hotelId(): string
{
return $this->aggregateId;
}
public function hotelName(): string
{
return $this->payload['hotelName'];
}
}
A guest can check in by name:
use Patchlevel\EventSourcing\Aggregate\AggregateChanged;
final class GuestIsCheckedIn extends AggregateChanged
{
public static function raise(string $id, string $guestName): static
{
return new static($id, ['guestName' => $guestName]);
}
public function guestName(): string
{
return $this->payload['guestName'];
}
}
And also check out again:
use Patchlevel\EventSourcing\Aggregate\AggregateChanged;
final class GuestIsCheckedOut extends AggregateChanged
{
public static function raise(string $id, string $guestName): static
{
return new static($id, ['guestName' => $guestName]);
}
public function guestName(): string
{
return $this->payload['guestName'];
}
}
Next we need to define the aggregate, that is, the hotel and how it should behave.
We have also defined the create, checkIn and checkOut methods accordingly.
The events are recorded in these methods, and the state of the hotel changes when they are applied.
use Patchlevel\EventSourcing\Aggregate\AggregateChanged;
use Patchlevel\EventSourcing\Aggregate\AggregateRoot;
final class Hotel extends AggregateRoot
{
private string $id;
private string $name;
/**
* @var list<string>
*/
private array $guests;
public function name(): string
{
return $this->name;
}
/**
* @return list<string>
*/
public function guests(): array
{
return $this->guests;
}
public static function create(string $id, string $hotelName): static
{
$self = new static();
$self->record(HotelCreated::raise($id, $hotelName));
return $self;
}
public function checkIn(string $guestName): void
{
if (in_array($guestName, $this->guests, true)) {
throw new GuestHasAlreadyCheckedIn($guestName);
}
$this->record(GuestIsCheckedIn::raise($this->id, $guestName));
}
public function checkOut(string $guestName): void
{
if (!in_array($guestName, $this->guests, true)) {
throw new IsNotAGuest($guestName);
}
$this->record(GuestIsCheckedOut::raise($this->id, $guestName));
}
protected function apply(AggregateChanged $event): void
{
if ($event instanceof HotelCreated) {
$this->id = $event->hotelId();
$this->name = $event->hotelName();
$this->guests = [];
return;
}
if ($event instanceof GuestIsCheckedIn) {
$this->guests[] = $event->guestName();
return;
}
if ($event instanceof GuestIsCheckedOut) {
$this->guests = array_values(
array_filter(
$this->guests,
fn ($name) => $name !== $event->guestName()
)
);
return;
}
}
public function aggregateRootId(): string
{
return $this->id;
}
}
You can find out more about aggregates and events here.
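The Hotel aggregate above throws GuestHasAlreadyCheckedIn and IsNotAGuest, which the example does not define. A minimal sketch of what they could look like, assuming plain domain exceptions that take the guest name (the base class and the messages are our own choice and not part of the library):

```php
<?php

declare(strict_types=1);

// Assumption: simple domain exceptions; the library does not ship these classes.
final class GuestHasAlreadyCheckedIn extends RuntimeException
{
    public function __construct(string $guestName)
    {
        parent::__construct(sprintf('Guest "%s" has already checked in.', $guestName));
    }
}

final class IsNotAGuest extends RuntimeException
{
    public function __construct(string $guestName)
    {
        parent::__construct(sprintf('"%s" is not a guest of this hotel.', $guestName));
    }
}
```

Extending RuntimeException keeps the sketch short; a shared base exception for the hotel domain would work just as well.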
To list all hotels on our website and show how many guests are currently staying in each of them, we need a projection.
use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\Projection\Projection;
final class HotelProjection implements Projection
{
private Connection $db;
public function __construct(Connection $db)
{
$this->db = $db;
}
public static function getHandledMessages(): iterable
{
yield HotelCreated::class => 'applyHotelCreated';
yield GuestIsCheckedIn::class => 'applyGuestIsCheckedIn';
yield GuestIsCheckedOut::class => 'applyGuestIsCheckedOut';
}
public function applyHotelCreated(HotelCreated $event): void
{
$this->db->insert(
'hotel',
[
'id' => $event->hotelId(),
'name' => $event->hotelName(),
'guests' => 0
]
);
}
public function applyGuestIsCheckedIn(GuestIsCheckedIn $event): void
{
$this->db->executeStatement(
'UPDATE hotel SET guests = guests + 1 WHERE id = ?;',
[$event->aggregateId()]
);
}
public function applyGuestIsCheckedOut(GuestIsCheckedOut $event): void
{
$this->db->executeStatement(
'UPDATE hotel SET guests = guests - 1 WHERE id = ?;',
[$event->aggregateId()]
);
}
public function create(): void
{
// MySQL requires an explicit length for VARCHAR columns.
$this->db->executeStatement('CREATE TABLE IF NOT EXISTS hotel (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), guests INTEGER);');
}
public function drop(): void
{
$this->db->executeStatement('DROP TABLE IF EXISTS hotel;');
}
}
You can find out more about projections here.
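Reading the projection is an ordinary database query. A minimal sketch, assuming Doctrine DBAL 2.11 or newer (for fetchAllAssociative) and the hotel table created by the projection above; the function name findHotels is hypothetical:

```php
<?php

declare(strict_types=1);

use Doctrine\DBAL\Connection;

/**
 * Hypothetical read-side helper: returns all hotels with their current guest count.
 *
 * @return list<array<string, mixed>>
 */
function findHotels(Connection $db): array
{
    return $db->fetchAllAssociative('SELECT id, name, guests FROM hotel ORDER BY name');
}
```

Because the projection is just a table, it can be dropped and rebuilt from the events at any time without touching the event store.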
In our example we also want to send an email to the head office as soon as a guest checks in.
use Patchlevel\EventSourcing\Aggregate\AggregateChanged;
use Patchlevel\EventSourcing\EventBus\Listener;
final class SendCheckInEmailListener implements Listener
{
private Mailer $mailer;
public function __construct(Mailer $mailer)
{
$this->mailer = $mailer;
}
public function __invoke(AggregateChanged $event): void
{
if (!$event instanceof GuestIsCheckedIn) {
return;
}
$this->mailer->send(
'hq@patchlevel.de',
'Guest is checked in',
sprintf('A new guest named "%s" is checked in', $event->guestName())
);
}
}
You can find out more about processors here.
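The Mailer type used by the listener is not part of the library; it stands for whatever mail service you already have. A minimal sketch, assuming a hypothetical Mailer interface whose send method takes recipient, subject and body (matching the call in the listener), plus an in-memory implementation that can be handy in tests:

```php
<?php

declare(strict_types=1);

// Hypothetical interface: its shape is inferred from the send() call in the listener.
interface Mailer
{
    public function send(string $to, string $subject, string $body): void;
}

// Collects mails in memory instead of sending them.
final class InMemoryMailer implements Mailer
{
    /** @var list<array{to: string, subject: string, body: string}> */
    public array $sent = [];

    public function send(string $to, string $subject, string $body): void
    {
        $this->sent[] = ['to' => $to, 'subject' => $subject, 'body' => $body];
    }
}
```

In production you would replace InMemoryMailer with an adapter around your real mail service.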
After we have defined everything, we still have to plug the whole thing together:
use Doctrine\DBAL\DriverManager;
use Patchlevel\EventSourcing\EventBus\DefaultEventBus;
use Patchlevel\EventSourcing\Projection\DefaultProjectionRepository;
use Patchlevel\EventSourcing\Projection\ProjectionListener;
use Patchlevel\EventSourcing\Repository\DefaultRepository;
use Patchlevel\EventSourcing\Store\SingleTableStore;
$connection = DriverManager::getConnection([
'url' => 'mysql://user:secret@localhost/app'
]);
$mailer = /* your own mailer */;
$hotelProjection = new HotelProjection($connection);
$projectionRepository = new DefaultProjectionRepository(
[$hotelProjection]
);
$eventBus = new DefaultEventBus();
$eventBus->addListener(new ProjectionListener($projectionRepository));
$eventBus->addListener(new SendCheckInEmailListener($mailer));
$store = new SingleTableStore(
$connection,
[Hotel::class => 'hotel'],
'eventstore'
);
$hotelRepository = new DefaultRepository($store, $eventBus, Hotel::class);
You can find out more about stores here.
So that we can actually write the data to a database, we need the associated schema and databases.
use Patchlevel\EventSourcing\Schema\DoctrineSchemaManager;
(new DoctrineSchemaManager())->create($store);
$hotelProjection->create();
You can use the predefined CLI commands for this.
We are now ready to use the event sourcing system. We can load, change and save aggregates.
$hotel = Hotel::create('1', 'HOTEL');
$hotel->checkIn('David');
$hotel->checkIn('Daniel');
$hotel->checkOut('David');
$hotelRepository->save($hotel);
// Assumes a hotel with the ID '2' has already been saved.
$hotel2 = $hotelRepository->load('2');
$hotel2->checkIn('David');
$hotelRepository->save($hotel2);
An aggregateId can be a UUID; you can find out more about this here.
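Loading an aggregate conceptually means replaying its stored events in order to rebuild the current state. The following library-independent sketch illustrates the idea with plain arrays; the event shape and the replayGuests function are hypothetical and do not reflect how the library stores or applies events internally:

```php
<?php

declare(strict_types=1);

/**
 * Rebuilds the guest list by applying events in the order they were recorded.
 *
 * @param list<array{type: string, guestName?: string}> $events
 * @return list<string>
 */
function replayGuests(array $events): array
{
    $guests = [];

    foreach ($events as $event) {
        switch ($event['type']) {
            case 'HotelCreated':
                $guests = [];
                break;
            case 'GuestIsCheckedIn':
                $guests[] = $event['guestName'];
                break;
            case 'GuestIsCheckedOut':
                $guests = array_values(array_filter(
                    $guests,
                    static fn (string $name): bool => $name !== $event['guestName']
                ));
                break;
        }
    }

    return $guests;
}

// Same sequence as the usage example: David and Daniel check in, David checks out.
$guests = replayGuests([
    ['type' => 'HotelCreated'],
    ['type' => 'GuestIsCheckedIn', 'guestName' => 'David'],
    ['type' => 'GuestIsCheckedIn', 'guestName' => 'Daniel'],
    ['type' => 'GuestIsCheckedOut', 'guestName' => 'David'],
]);

var_dump($guests === ['Daniel']); // bool(true)
```

This mirrors what the apply method of the Hotel aggregate does for each event type, only without the library around it.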
Consult the documentation or FAQ for more information. If you still have questions, feel free to create an issue for it :)