-
Notifications
You must be signed in to change notification settings - Fork 439
/
Copy pathProducer.php
130 lines (101 loc) · 3.86 KB
/
Producer.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
<?php
namespace Enqueue\Client;
use Enqueue\Client\Extension\PrepareBodyExtension;
use Enqueue\Rpc\Promise;
use Enqueue\Rpc\RpcFactory;
use Enqueue\Util\UUID;
/**
 * Client-side producer: wraps raw payloads into Message objects, runs the
 * configured extension hooks around every send, and hands the message to the
 * driver (router or processor, depending on the message scope).
 */
final class Producer implements ProducerInterface
{
    /**
     * Timeout value passed to RpcFactory::createPromise() when a command reply is requested.
     *
     * NOTE(review): presumably milliseconds — confirm against RpcFactory::createPromise().
     */
    private const REPLY_TIMEOUT = 60000;

    /**
     * @var DriverInterface
     */
    private $driver;

    /**
     * Always a ChainExtension built in the constructor, never null.
     *
     * @var ExtensionInterface
     */
    private $extension;

    /**
     * @var RpcFactory
     */
    private $rpcFactory;

    /**
     * @param DriverInterface         $driver     driver the messages are finally sent through
     * @param RpcFactory              $rpcFactory creates reply queues and promises for commands that need a reply
     * @param ExtensionInterface|null $extension  optional user extension; when given it is chained
     *                                            in front of the built-in PrepareBodyExtension
     */
    public function __construct(
        DriverInterface $driver,
        RpcFactory $rpcFactory,
        ?ExtensionInterface $extension = null
    ) {
        $this->driver = $driver;
        $this->rpcFactory = $rpcFactory;

        // The user extension (if any) goes first, PrepareBodyExtension is always last in the chain.
        $extensions = null === $extension ? [] : [$extension];
        $extensions[] = new PrepareBodyExtension();

        $this->extension = new ChainExtension($extensions);
    }

    /**
     * Sends an event to the given topic.
     *
     * The topic and message are read back from the PreSend context after the
     * onPreSendEvent hook, so whatever the extensions left there is what gets sent.
     * The message scope is not touched here; it is used as carried by the message.
     *
     * @param string        $topic
     * @param Message|mixed $message a Message instance, or any payload to be wrapped into one
     *
     * @throws \LogicException see doSend()
     */
    public function sendEvent(string $topic, $message): void
    {
        if (!($message instanceof Message)) {
            $message = new Message($message);
        }

        $preSend = new PreSend($topic, $message, $this, $this->driver);
        $this->extension->onPreSendEvent($preSend);

        $message = $preSend->getMessage();
        $message->setProperty(Config::TOPIC, $preSend->getTopic());

        $this->doSend($message);
    }

    /**
     * Sends a command and, when $needReply is true, returns a promise for its reply.
     *
     * The message scope is forced to Message::SCOPE_APP. When a reply is needed and the
     * message has no reply-to, one is created via the RpcFactory and the promise is told
     * to delete that reply queue; a missing correlation id is filled with a generated UUID.
     *
     * @param string        $command
     * @param Message|mixed $message   a Message instance, or any payload to be wrapped into one
     * @param bool          $needReply whether a reply promise should be created and returned
     *
     * @return Promise|null the reply promise when $needReply is true, null otherwise
     *
     * @throws \LogicException see doSend()
     */
    public function sendCommand(string $command, $message, bool $needReply = false): ?Promise
    {
        if (!($message instanceof Message)) {
            $message = new Message($message);
        }

        $preSend = new PreSend($command, $message, $this, $this->driver);
        $this->extension->onPreSendCommand($preSend);

        // Read both back from the context: the hook had the chance to replace them.
        $command = $preSend->getCommand();
        $message = $preSend->getMessage();

        $deleteReplyQueue = false;
        $replyTo = $message->getReplyTo();

        if ($needReply) {
            if (!$replyTo) {
                // The reply queue is created here, so the promise is responsible for deleting it.
                $replyTo = $this->rpcFactory->createReplyTo();
                $message->setReplyTo($replyTo);
                $deleteReplyQueue = true;
            }

            if (!$message->getCorrelationId()) {
                $message->setCorrelationId(UUID::generate());
            }
        }

        $message->setProperty(Config::COMMAND, $command);
        $message->setScope(Message::SCOPE_APP);

        $this->doSend($message);

        if (!$needReply) {
            return null;
        }

        $promise = $this->rpcFactory->createPromise(
            $replyTo,
            $message->getCorrelationId(),
            self::REPLY_TIMEOUT
        );
        $promise->setDeleteReplyQueue($deleteReplyQueue);

        return $promise;
    }

    /**
     * Validates the message, fills in a missing message id and timestamp, and
     * dispatches it through the driver between the onDriverPreSend and onPostSend hooks.
     *
     * @throws \LogicException when the body is not a string, when the Config::PROCESSOR
     *                         property is set, or when the message scope is neither
     *                         Message::SCOPE_MESSAGE_BUS nor Message::SCOPE_APP
     */
    private function doSend(Message $message): void
    {
        $body = $message->getBody();
        if (!is_string($body)) {
            throw new \LogicException(sprintf(
                'The message body must be string at this stage, got "%s". Make sure you passed string as message or there is an extension that converts custom input to string.',
                is_object($body) ? get_class($body) : gettype($body)
            ));
        }

        if ($message->getProperty(Config::PROCESSOR)) {
            throw new \LogicException(sprintf('The %s property must not be set.', Config::PROCESSOR));
        }

        if (!$message->getMessageId()) {
            $message->setMessageId(UUID::generate());
        }

        if (!$message->getTimestamp()) {
            $message->setTimestamp(time());
        }

        $this->extension->onDriverPreSend(new DriverPreSend($message, $this, $this->driver));

        // The scope is read after the onDriverPreSend hook, so a scope changed by an extension is honoured.
        $scope = $message->getScope();
        if (Message::SCOPE_MESSAGE_BUS == $scope) {
            $this->driver->sendToRouter($message);
        } elseif (Message::SCOPE_APP == $scope) {
            $this->driver->sendToProcessor($message);
        } else {
            throw new \LogicException(sprintf('The message scope "%s" is not supported.', $scope));
        }

        $this->extension->onPostSend(new PostSend($message, $this, $this->driver));
    }
}