-
Notifications
You must be signed in to change notification settings - Fork 56
/
MessageBus.php
134 lines (113 loc) · 4.13 KB
/
MessageBus.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
<?php
/**
* This file is part of the prooph/service-bus.
* (c) 2014-2017 prooph software GmbH <contact@prooph.de>
* (c) 2015-2017 Sascha-Oliver Prolic <saschaprolic@googlemail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
declare(strict_types=1);
namespace Prooph\ServiceBus;
use Prooph\Common\Event\ActionEvent;
use Prooph\Common\Event\ActionEventEmitter;
use Prooph\Common\Event\ListenerHandler;
use Prooph\Common\Event\ProophActionEventEmitter;
use Prooph\Common\Messaging\HasMessageName;
use Prooph\ServiceBus\Exception\MessageDispatchException;
/**
 * Base class for a message bus implementation.
 *
 * The bus owns an action event emitter and registers three default listeners
 * on it (see the constructor). Concrete buses implement dispatch() and may
 * call triggerFinalize() to run the "finalize" event for an action event.
 */
abstract class MessageBus
{
    /** Name of the action event the default dispatch listeners are attached to. */
    public const EVENT_DISPATCH = 'dispatch';

    /** Name of the action event set by triggerFinalize(). */
    public const EVENT_FINALIZE = 'finalize';

    /** Action event parameter holding the message being dispatched. */
    public const EVENT_PARAM_MESSAGE = 'message';

    /** Action event parameter holding the detected message name. */
    public const EVENT_PARAM_MESSAGE_NAME = 'message-name';

    /** Action event parameter key; not read or written by this base class. */
    public const EVENT_PARAM_MESSAGE_HANDLER = 'message-handler';

    /** Action event parameter checked on finalize; a truthy value is rethrown wrapped. */
    public const EVENT_PARAM_EXCEPTION = 'exception';

    /** Action event parameter initialised to false when dispatching starts. */
    public const EVENT_PARAM_MESSAGE_HANDLED = 'message-handled';

    // Listener priorities. Only PRIORITY_INITIALIZE and
    // PRIORITY_DETECT_MESSAGE_NAME are used in this class; the others are
    // presumably meant for subclasses and plugins.
    // NOTE(review): the numbering suggests higher values run first - confirm
    // against the ActionEventEmitter implementation in use.
    public const PRIORITY_INITIALIZE = 400000;
    public const PRIORITY_DETECT_MESSAGE_NAME = 300000;
    public const PRIORITY_ROUTE = 200000;
    public const PRIORITY_LOCATE_HANDLER = 100000;
    public const PRIORITY_PROMISE_REJECT = 1000;
    public const PRIORITY_INVOKE_HANDLER = 0;

    /**
     * @var ActionEventEmitter
     */
    protected $events;

    /**
     * Stores the emitter and attaches the default listeners to it.
     *
     * @param ActionEventEmitter|null $actionEventEmitter Emitter to use. When null, a
     *        ProophActionEventEmitter is created with the dispatch and finalize event
     *        names (presumably the list of events it accepts - confirm).
     */
    public function __construct(?ActionEventEmitter $actionEventEmitter = null)
    {
        if (null === $actionEventEmitter) {
            $actionEventEmitter = new ProophActionEventEmitter([
                self::EVENT_DISPATCH,
                self::EVENT_FINALIZE,
            ]);
        }

        // Initialize: mark the message as not yet handled and, when the message
        // can report its own name, record that name on the action event.
        $actionEventEmitter->attachListener(
            self::EVENT_DISPATCH,
            function (ActionEvent $actionEvent): void {
                $actionEvent->setParam(self::EVENT_PARAM_MESSAGE_HANDLED, false);

                $message = $actionEvent->getParam(self::EVENT_PARAM_MESSAGE);

                if ($message instanceof HasMessageName) {
                    $actionEvent->setParam(self::EVENT_PARAM_MESSAGE_NAME, $message->messageName());
                }
            },
            self::PRIORITY_INITIALIZE
        );

        // Detect message name: only fills the name in when no listener has set
        // one so far, deriving it from the message value via getMessageName().
        $actionEventEmitter->attachListener(
            self::EVENT_DISPATCH,
            function (ActionEvent $actionEvent): void {
                if ($actionEvent->getParam(self::EVENT_PARAM_MESSAGE_NAME) === null) {
                    $actionEvent->setParam(
                        self::EVENT_PARAM_MESSAGE_NAME,
                        $this->getMessageName($actionEvent->getParam(self::EVENT_PARAM_MESSAGE))
                    );
                }
            },
            self::PRIORITY_DETECT_MESSAGE_NAME
        );

        // Finalize: if an exception was stored on the action event, rethrow it
        // wrapped in a MessageDispatchException. No priority argument is passed,
        // so the emitter's default applies.
        $actionEventEmitter->attachListener(
            self::EVENT_FINALIZE,
            function (ActionEvent $actionEvent): void {
                if ($exception = $actionEvent->getParam(self::EVENT_PARAM_EXCEPTION)) {
                    throw MessageDispatchException::failed($exception);
                }
            }
        );

        $this->events = $actionEventEmitter;
    }

    /**
     * Dispatches a message on this bus.
     *
     * @param mixed $message
     *
     * @return \React\Promise\Promise|void depends on the bus type
     */
    abstract public function dispatch($message);

    /**
     * Renames the given action event to the finalize event and dispatches it
     * through the emitter, which runs the finalize listeners.
     */
    protected function triggerFinalize(ActionEvent $actionEvent): void
    {
        $actionEvent->setName(self::EVENT_FINALIZE);

        $this->events->dispatch($actionEvent);
    }

    /**
     * Derives a message name from an arbitrary message value.
     *
     * @param mixed $message
     *
     * @return string The class name for objects, the string itself for strings,
     *                otherwise the gettype() name of the value.
     */
    protected function getMessageName($message): string
    {
        if (is_object($message)) {
            return get_class($message);
        }

        if (is_string($message)) {
            return $message;
        }

        return gettype($message);
    }

    /**
     * Attaches a listener to the underlying event emitter.
     *
     * @return ListenerHandler Handle returned by the emitter; pass it to detach()
     *                         to remove the listener again.
     */
    public function attach(string $eventName, callable $listener, int $priority = 0): ListenerHandler
    {
        return $this->events->attachListener($eventName, $listener, $priority);
    }

    /**
     * Detaches a previously attached listener from the underlying event emitter.
     */
    public function detach(ListenerHandler $handler): void
    {
        $this->events->detachListener($handler);
    }
}