-
Notifications
You must be signed in to change notification settings - Fork 439
/
Copy pathExclusiveCommandExtension.php
81 lines (65 loc) · 2.32 KB
/
ExclusiveCommandExtension.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
<?php
namespace Enqueue\Client\ConsumptionExtension;
use Enqueue\Client\Config;
use Enqueue\Client\DriverInterface;
use Enqueue\Client\Route;
use Enqueue\Consumption\Context;
use Enqueue\Consumption\EmptyExtensionTrait as ConsumptionEmptyExtensionTrait;
use Enqueue\Consumption\ExtensionInterface as ConsumptionExtensionInterface;
final class ExclusiveCommandExtension implements ConsumptionExtensionInterface
{
use ConsumptionEmptyExtensionTrait;
/**
* @var DriverInterface
*/
private $driver;
/**
* @var Route[]
*/
private $queueToRouteMap;
public function __construct(DriverInterface $driver)
{
$this->driver = $driver;
}
public function onPreReceived(Context $context)
{
$message = $context->getInteropMessage();
$queue = $context->getInteropQueue();
if ($message->getProperty(Config::TOPIC)) {
return;
}
if ($message->getProperty(Config::COMMAND)) {
return;
}
if ($message->getProperty(Config::PROCESSOR)) {
return;
}
if (null === $this->queueToRouteMap) {
$this->queueToRouteMap = $this->buildMap();
}
if (array_key_exists($queue->getQueueName(), $this->queueToRouteMap)) {
$context->getLogger()->debug('[ExclusiveCommandExtension] This is a exclusive command queue and client\'s properties are not set. Setting them');
$route = $this->queueToRouteMap[$queue->getQueueName()];
$message->setProperty(Config::PROCESSOR, $route->getProcessor());
$message->setProperty(Config::COMMAND, $route->getSource());
}
}
private function buildMap(): array
{
$map = [];
foreach ($this->driver->getRouteCollection()->all() as $route) {
if (false == $route->isCommand()) {
continue;
}
if (false == $route->isProcessorExclusive()) {
continue;
}
$queueName = $this->driver->createQueue($route->getQueue())->getQueueName();
if (array_key_exists($queueName, $map)) {
throw new \LogicException('The queue name has been already bound by another exclusive command processor');
}
$map[$queueName] = $route;
}
return $map;
}
}