Skip to content

Commit

Permalink
Merge pull request #207 from patchlevel/change-event-bus-api
Browse files Browse the repository at this point in the history
change event bus api
  • Loading branch information
DavidBadura authored Mar 28, 2022
2 parents f37ab56 + a7a4805 commit 78eb500
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 33 deletions.
6 changes: 3 additions & 3 deletions src/EventBus/DefaultEventBus.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

final class DefaultEventBus implements EventBus
{
/** @var list<Message> */
/** @var array<Message> */
private array $queue;
/** @var list<Listener> */
private array $listeners;
Expand All @@ -25,9 +25,9 @@ public function __construct(array $listeners = [])
$this->processing = false;
}

public function dispatch(Message $message): void
public function dispatch(Message ...$messages): void
{
$this->queue[] = $message;
$this->queue += $messages;

if ($this->processing) {
return;
Expand Down
2 changes: 1 addition & 1 deletion src/EventBus/EventBus.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@

interface EventBus
{
public function dispatch(Message $message): void;
public function dispatch(Message ...$messages): void;
}
10 changes: 6 additions & 4 deletions src/EventBus/SymfonyEventBus.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ public function __construct(MessageBusInterface $bus)
$this->bus = $bus;
}

public function dispatch(Message $message): void
public function dispatch(Message ...$messages): void
{
$envelope = (new Envelope($message))
->with(new DispatchAfterCurrentBusStamp());
foreach ($messages as $message) {
$envelope = (new Envelope($message))
->with(new DispatchAfterCurrentBusStamp());

$this->bus->dispatch($envelope);
$this->bus->dispatch($envelope);
}
}

/**
Expand Down
21 changes: 8 additions & 13 deletions src/Repository/DefaultRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
final class DefaultRepository implements Repository
{
private Store $store;
private EventBus $eventStream;
private EventBus $eventBus;

/** @var class-string<AggregateRoot> */
private string $aggregateClass;
Expand All @@ -28,15 +28,15 @@ final class DefaultRepository implements Repository
*/
public function __construct(
Store $store,
EventBus $eventStream,
EventBus $eventBus,
string $aggregateClass
) {
if (!is_subclass_of($aggregateClass, AggregateRoot::class)) {
throw InvalidAggregateClass::notAggregateRoot($aggregateClass);
}

$this->store = $store;
$this->eventStream = $eventStream;
$this->eventBus = $eventBus;
$this->aggregateClass = $aggregateClass;
}

Expand Down Expand Up @@ -66,22 +66,17 @@ public function has(string $id): bool

public function save(AggregateRoot $aggregate): void
{
$class = $aggregate::class;

if (!$aggregate instanceof $this->aggregateClass) {
throw new WrongAggregate($class, $this->aggregateClass);
throw new WrongAggregate($aggregate::class, $this->aggregateClass);
}

$messageStream = $aggregate->releaseMessages();
$messages = $aggregate->releaseMessages();

if (count($messageStream) === 0) {
if (count($messages) === 0) {
return;
}

$this->store->save(...$messageStream);

foreach ($messageStream as $message) {
$this->eventStream->dispatch($message);
}
$this->store->save(...$messages);
$this->eventBus->dispatch(...$messages);
}
}
11 changes: 4 additions & 7 deletions src/Repository/SnapshotRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
final class SnapshotRepository implements Repository
{
private Store $store;
private EventBus $eventStream;
private EventBus $eventBus;

/** @var class-string<SnapshotableAggregateRoot> */
private string $aggregateClass;
Expand All @@ -33,7 +33,7 @@ final class SnapshotRepository implements Repository
*/
public function __construct(
Store $store,
EventBus $eventStream,
EventBus $eventBus,
string $aggregateClass,
SnapshotStore $snapshotStore
) {
Expand All @@ -42,7 +42,7 @@ public function __construct(
}

$this->store = $store;
$this->eventStream = $eventStream;
$this->eventBus = $eventBus;
$this->aggregateClass = $aggregateClass;
$this->snapshotStore = $snapshotStore;
}
Expand Down Expand Up @@ -98,10 +98,7 @@ public function save(AggregateRoot $aggregate): void
}

$this->store->save(...$messages);

foreach ($messages as $message) {
$this->eventStream->dispatch($message);
}
$this->eventBus->dispatch(...$messages);

$snapshot = $aggregate->toSnapshot();
$this->snapshotStore->save($snapshot);
Expand Down
46 changes: 43 additions & 3 deletions tests/Unit/EventBus/DefaultEventBusTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public function __invoke(Message $message): void
1,
new ProfileCreated(
ProfileId::fromString('1'),
Email::fromString('d.badura@gmx.de')
Email::fromString('info@patchlevel.de')
)
);

Expand All @@ -46,6 +46,46 @@ public function __invoke(Message $message): void
self::assertSame($message, $listener->message);
}

public function testDispatchMultipleMessages(): void
{
$listener = new class implements Listener {
/** @var list<Message> */
public array $message = [];

public function __invoke(Message $message): void
{
$this->message[] = $message;
}
};

$message1 = new Message(
Profile::class,
'1',
1,
new ProfileCreated(
ProfileId::fromString('1'),
Email::fromString('info@patchlevel.de')
)
);

$message2 = new Message(
Profile::class,
'1',
2,
new ProfileCreated(
ProfileId::fromString('1'),
Email::fromString('info@patchlevel.de')
)
);

$eventBus = new DefaultEventBus([$listener]);
$eventBus->dispatch($message1, $message2);

self::assertCount(2, $listener->message);
self::assertSame($message1, $listener->message[0]);
self::assertSame($message2, $listener->message[1]);
}

public function testDynamicListener(): void
{
$listener = new class implements Listener {
Expand All @@ -63,7 +103,7 @@ public function __invoke(Message $message): void
1,
new ProfileCreated(
ProfileId::fromString('1'),
Email::fromString('d.badura@gmx.de')
Email::fromString('info@patchlevel.de')
)
);

Expand All @@ -82,7 +122,7 @@ public function testSynchroneEvents(): void
1,
new ProfileCreated(
ProfileId::fromString('1'),
Email::fromString('d.badura@gmx.de')
Email::fromString('info@patchlevel.de')
)
);

Expand Down
51 changes: 49 additions & 2 deletions tests/Unit/EventBus/SymfonyEventBusTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public function testDispatchEvent(): void
1,
new ProfileCreated(
ProfileId::fromString('1'),
Email::fromString('d.badura@gmx.de')
Email::fromString('info@patchlevel.de')
)
);

Expand All @@ -49,6 +49,53 @@ public function testDispatchEvent(): void
$eventBus->dispatch($message);
}

public function testDispatchMultipleMessages(): void
{
$message1 = new Message(
Profile::class,
'1',
1,
new ProfileCreated(
ProfileId::fromString('1'),
Email::fromString('info@patchlevel.de')
)
);

$message2 = new Message(
Profile::class,
'1',
1,
new ProfileCreated(
ProfileId::fromString('1'),
Email::fromString('info@patchlevel.de')
)
);

$envelope1 = new Envelope($message1);

$symfony = $this->prophesize(MessageBusInterface::class);
$symfony->dispatch(Argument::that(static function ($envelope1) use ($message1) {
if (!$envelope1 instanceof Envelope) {
return false;
}

return $envelope1->getMessage() === $message1;
}))->willReturn($envelope1)->shouldBeCalled();

$envelope2 = new Envelope($message2);

$symfony->dispatch(Argument::that(static function ($envelope2) use ($message2) {
if (!$envelope2 instanceof Envelope) {
return false;
}

return $envelope2->getMessage() === $message2;
}))->willReturn($envelope2)->shouldBeCalled();

$eventBus = new SymfonyEventBus($symfony->reveal());
$eventBus->dispatch($message1, $message2);
}

public function testDefaultEventBus(): void
{
$listener = new class implements Listener {
Expand All @@ -66,7 +113,7 @@ public function __invoke(Message $message): void
1,
new ProfileCreated(
ProfileId::fromString('1'),
Email::fromString('d.badura@gmx.de')
Email::fromString('info@patchlevel.de')
)
);

Expand Down

0 comments on commit 78eb500

Please sign in to comment.