Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Bugs in Event Bus #171

Merged
merged 3 commits into from
Oct 23, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletions src/EventBus.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ class EventBus extends MessageBus
*/
protected $collectExceptions = false;

/**
* @var array
*/
protected $collectedExceptions = [];

public function __construct(ActionEventEmitter $actionEventEmitter = null)
{
parent::__construct($actionEventEmitter);
Expand Down Expand Up @@ -58,12 +63,29 @@ function (ActionEvent $actionEvent): void {
$actionEvent->setParam(self::EVENT_PARAM_MESSAGE_HANDLED, true);
}

if (count($caughtExceptions)) {
throw EventListenerException::collected(...$caughtExceptions);
foreach ($caughtExceptions as $ex) {
$this->collectedExceptions[] = $ex;
}
},
self::PRIORITY_INVOKE_HANDLER
);

$this->events->attachListener(
self::EVENT_FINALIZE,
function (ActionEvent $actionEvent): void {
$target = $actionEvent->getTarget();

if (empty($target->collectedExceptions)) {
return;
}

$exceptions = $target->collectedExceptions;
$target->collectedExceptions = [];

$actionEvent->setParam(MessageBus::EVENT_PARAM_EXCEPTION, EventListenerException::collected(...$exceptions));
},
1000
);
}

/**
Expand Down Expand Up @@ -99,4 +121,14 @@ public function disableCollectExceptions(): void
{
$this->collectExceptions = false;
}

public function isCollectingExceptions(): bool
{
return $this->collectExceptions;
}

public function addCollectedException(\Throwable $e): void
{
$this->collectedExceptions[] = $e;
}
}
19 changes: 14 additions & 5 deletions src/Plugin/InvokeStrategy/OnEventStrategy.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,24 @@ public function attachToMessageBus(MessageBus $messageBus): void
$this->listenerHandlers[] = $messageBus->attach(
MessageBus::EVENT_DISPATCH,
function (ActionEvent $actionEvent): void {
if ($actionEvent->getParam(MessageBus::EVENT_PARAM_MESSAGE_HANDLED, false)) {
return;
}

$target = $actionEvent->getTarget();
$message = $actionEvent->getParam(MessageBus::EVENT_PARAM_MESSAGE);
$handlers = $actionEvent->getParam(EventBus::EVENT_PARAM_EVENT_LISTENERS, []);

foreach ($handlers as $handler) {
$handler->onEvent($message);
if (is_callable($handler) || ! is_object($handler) || ! is_callable([$handler, 'onEvent'])) {
continue;
}

try {
$handler->onEvent($message);
} catch (\Throwable $e) {
if ($target->isCollectingExceptions()) {
$target->addCollectedException($e);
} else {
throw $e;
}
}
}

$actionEvent->setParam(MessageBus::EVENT_PARAM_MESSAGE_HANDLED, true);
Expand Down
2 changes: 1 addition & 1 deletion tests/EventBusTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public function it_collects_exceptions_if_mode_is_enabled(): void
MessageBus::EVENT_DISPATCH,
function (ActionEvent $e) use ($handler, $errorProducer): void {
if ($e->getParam(MessageBus::EVENT_PARAM_MESSAGE_NAME) === CustomMessage::class) {
$e->setParam(EventBus::EVENT_PARAM_EVENT_LISTENERS, [$handler, $errorProducer, $handler]);
$e->setParam(EventBus::EVENT_PARAM_EVENT_LISTENERS, [$handler, $errorProducer, $handler]);
}
},
MessageBus::PRIORITY_ROUTE
Expand Down
36 changes: 36 additions & 0 deletions tests/Mock/CustomMessageEventHandler2.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php
/**
* This file is part of the prooph/service-bus.
* (c) 2014-2017 prooph software GmbH <contact@prooph.de>
* (c) 2015-2017 Sascha-Oliver Prolic <saschaprolic@googlemail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace ProophTest\ServiceBus\Mock;

final class CustomMessageEventHandler2
{
private $lastMessage;

private $invokeCounter = 0;

public function on($message): void
{
$this->lastMessage = $message;
$this->invokeCounter++;
}

public function getLastMessage()
{
return $this->lastMessage;
}

public function getInvokeCounter(): int
{
return $this->invokeCounter;
}
}
21 changes: 21 additions & 0 deletions tests/Mock/CustomMessageEventHandlerThrowingExceptions.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php
/**
* This file is part of the prooph/service-bus.
* (c) 2014-2017 prooph software GmbH <contact@prooph.de>
* (c) 2015-2017 Sascha-Oliver Prolic <saschaprolic@googlemail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace ProophTest\ServiceBus\Mock;

final class CustomMessageEventHandlerThrowingExceptions
{
public function onEvent($message): void
{
throw new \Exception('bar');
}
}
52 changes: 52 additions & 0 deletions tests/Mock/CustomOnEventStrategy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
<?php
/**
* This file is part of the prooph/service-bus.
* (c) 2014-2017 prooph software GmbH <contact@prooph.de>
* (c) 2015-2017 Sascha-Oliver Prolic <saschaprolic@googlemail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace ProophTest\ServiceBus\Mock;

use Prooph\Common\Event\ActionEvent;
use Prooph\ServiceBus\EventBus;
use Prooph\ServiceBus\MessageBus;
use Prooph\ServiceBus\Plugin\AbstractPlugin;

final class CustomOnEventStrategy extends AbstractPlugin
{
public function attachToMessageBus(MessageBus $messageBus): void
{
$this->listenerHandlers[] = $messageBus->attach(
MessageBus::EVENT_DISPATCH,
function (ActionEvent $actionEvent): void {
$target = $actionEvent->getTarget();
$message = $actionEvent->getParam(MessageBus::EVENT_PARAM_MESSAGE);
$handlers = $actionEvent->getParam(EventBus::EVENT_PARAM_EVENT_LISTENERS, []);

foreach ($handlers as $handler) {
if (is_callable($handler) || ! is_object($handler) || ! is_callable([$handler, 'on'])) {
continue;
}

try {
$handler->on($message);
} catch (\Throwable $e) {
if ($target->isCollectingExceptions()) {
$target->addCollectedException($e);
} else {
throw $e;
}
}
}

$actionEvent->setParam(MessageBus::EVENT_PARAM_MESSAGE_HANDLED, true);
},
MessageBus::PRIORITY_INVOKE_HANDLER
);
}
}
147 changes: 146 additions & 1 deletion tests/Plugin/InvokeStrategy/OnEventStrategyTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,17 @@
use PHPUnit\Framework\TestCase;
use Prooph\Common\Event\DefaultListenerHandler;
use Prooph\ServiceBus\EventBus;
use Prooph\ServiceBus\Exception\EventListenerException;
use Prooph\ServiceBus\Exception\MessageDispatchException;
use Prooph\ServiceBus\Plugin\InvokeStrategy\OnEventStrategy;
use Prooph\ServiceBus\Plugin\ListenerExceptionCollectionMode;
use Prooph\ServiceBus\Plugin\Router\EventRouter;
use ProophTest\ServiceBus\Mock\CustomInvokableMessageHandler;
use ProophTest\ServiceBus\Mock\CustomMessage;
use ProophTest\ServiceBus\Mock\CustomMessageEventHandler;
use ProophTest\ServiceBus\Mock\CustomMessageEventHandler2;
use ProophTest\ServiceBus\Mock\CustomMessageEventHandlerThrowingExceptions;
use ProophTest\ServiceBus\Mock\CustomOnEventStrategy;
use Prophecy\Argument;

class OnEventStrategyTest extends TestCase
Expand Down Expand Up @@ -60,7 +66,7 @@ public function it_can_be_attached_to_event_bus(): void
->shouldBeCalled()
->willReturn(
new DefaultListenerHandler(
function () {
function (): void {
}
)
);
Expand Down Expand Up @@ -91,4 +97,143 @@ public function it_should_not_handle_already_processed_messages(): void
$this->assertSame($customEvent, $callableHandler->getLastMessage());
$this->assertSame(1, $callableHandler->getInvokeCounter());
}

/**
* @test
*/
public function it_should_still_work_with_callables(): void
{
$eventBus = new EventBus();

$onEventStrategy = new OnEventStrategy();
$onEventStrategy->attachToMessageBus($eventBus);

$handler = new CustomMessageEventHandler();

$result = false;

$router = new EventRouter();
$router->route(CustomMessage::class)
->to(function (CustomMessage $message) use (&$result): void {
$result = true;
})
->andTo($handler);

$router->attachToMessageBus($eventBus);

$eventBus->dispatch(new CustomMessage('some text'));

$this->assertTrue($result);
$this->assertSame(1, $handler->getInvokeCounter());
}

/**
* @test
*/
public function it_should_still_work_with_callables_and_collect_all_exceptions(): void
{
$eventBus = new EventBus();

$exceptionModePlugin = new ListenerExceptionCollectionMode();
$exceptionModePlugin->attachToMessageBus($eventBus);

$onEventStrategy = new OnEventStrategy();
$onEventStrategy->attachToMessageBus($eventBus);

$handler = new CustomMessageEventHandlerThrowingExceptions();

$router = new EventRouter();
$router->route(CustomMessage::class)
->to(function (CustomMessage $message): void {
throw new \Exception('foo');
})
->andTo($handler);

$router->attachToMessageBus($eventBus);

$ex = null;

try {
$eventBus->dispatch(new CustomMessage('some text'));
} catch (MessageDispatchException $ex) {
$ex = $ex->getPrevious();
}

$this->assertNotNull($ex);
$this->assertInstanceOf(EventListenerException::class, $ex);
$this->assertCount(2, $ex->listenerExceptions());
}

/**
* @test
*/
public function it_should_still_work_with_callables_and_collect_all_exceptions_part2(): void
{
$eventBus = new EventBus();

$exceptionModePlugin = new ListenerExceptionCollectionMode();
$exceptionModePlugin->attachToMessageBus($eventBus);

$onEventStrategy = new OnEventStrategy();
$onEventStrategy->attachToMessageBus($eventBus);

$handler = new CustomMessageEventHandlerThrowingExceptions();

$router = new EventRouter();
$router->route(CustomMessage::class)
->to(function (CustomMessage $message): void {
throw new \Exception('foo');
})
->andTo($handler)
->andTo($handler);

$router->attachToMessageBus($eventBus);

$ex = null;

try {
$eventBus->dispatch(new CustomMessage('some text'));
} catch (MessageDispatchException $ex) {
$ex = $ex->getPrevious();
}

$this->assertNotNull($ex);
$this->assertInstanceOf(EventListenerException::class, $ex);
$this->assertCount(3, $ex->listenerExceptions());
}

/**
* @test
*/
public function it_should_still_work_with_callables_and_other_strategies(): void
{
$eventBus = new EventBus();

$onEventStrategy = new OnEventStrategy();
$onEventStrategy->attachToMessageBus($eventBus);

$secondOnEventStrategy = new CustomOnEventStrategy();
$secondOnEventStrategy->attachToMessageBus($eventBus);

$handler = new CustomMessageEventHandler();
$handler2 = new CustomMessageEventHandler2();

$result = false;

$router = new EventRouter();
$router->route(CustomMessage::class)
->to(function (CustomMessage $message) use (&$result): void {
$result = true;
})
->andTo($handler)
->andTo($handler2);

$router->attachToMessageBus($eventBus);

$eventBus->dispatch(new CustomMessage('some text'));

$this->assertTrue($result);
$this->assertSame(1, $handler->getInvokeCounter());
$this->assertSame(1, $handler2->getInvokeCounter());
}
}