Skip to content

Commit ed59b01

Browse files
committed
[amqp] Move rabbitmq specific logic to its own driver (from amqp driver).
fixes #20
1 parent facff51 commit ed59b01

7 files changed

+59
-139
lines changed

pkg/amqp-ext/Client/AmqpDriver.php

+1-35
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,8 @@
99
use Enqueue\Client\Config;
1010
use Enqueue\Client\DriverInterface;
1111
use Enqueue\Client\Message;
12-
use Enqueue\Client\MessagePriority;
1312
use Enqueue\Client\Meta\QueueMetaRegistry;
14-
use Enqueue\Psr\DeliveryMode;
13+
use Enqueue\AmqpExt\DeliveryMode;
1514
use Enqueue\Psr\Message as TransportMessage;
1615
use Psr\Log\LoggerInterface;
1716
use Psr\Log\NullLogger;
@@ -33,11 +32,6 @@ class AmqpDriver implements DriverInterface
3332
*/
3433
private $queueMetaRegistry;
3534

36-
/**
37-
* @var array
38-
*/
39-
private $priorityMap;
40-
4135
/**
4236
* @param AmqpContext $context
4337
* @param Config $config
@@ -48,14 +42,6 @@ public function __construct(AmqpContext $context, Config $config, QueueMetaRegis
4842
$this->context = $context;
4943
$this->config = $config;
5044
$this->queueMetaRegistry = $queueMetaRegistry;
51-
52-
$this->priorityMap = [
53-
MessagePriority::VERY_LOW => 0,
54-
MessagePriority::LOW => 1,
55-
MessagePriority::NORMAL => 2,
56-
MessagePriority::HIGH => 3,
57-
MessagePriority::VERY_HIGH => 4,
58-
];
5945
}
6046

6147
/**
@@ -131,7 +117,6 @@ public function createQueue($queueName)
131117
{
132118
$queue = $this->context->createQueue($this->config->createTransportQueueName($queueName));
133119
$queue->addFlag(AMQP_DURABLE);
134-
$queue->setArguments(['x-max-priority' => 4]);
135120

136121
return $queue;
137122
}
@@ -152,17 +137,6 @@ public function createTransportMessage(Message $message)
152137
$headers['expiration'] = (string) ($message->getExpire() * 1000);
153138
}
154139

155-
if ($priority = $message->getPriority()) {
156-
if (false == array_key_exists($priority, $this->priorityMap)) {
157-
throw new \InvalidArgumentException(sprintf(
158-
'Given priority could not be converted to client\'s one. Got: %s',
159-
$priority
160-
));
161-
}
162-
163-
$headers['priority'] = $this->priorityMap[$priority];
164-
}
165-
166140
$headers['delivery_mode'] = DeliveryMode::PERSISTENT;
167141

168142
$transportMessage = $this->context->createMessage();
@@ -198,14 +172,6 @@ public function createClientMessage(TransportMessage $message)
198172
$clientMessage->setExpire((int) ((int) $expiration) / 1000);
199173
}
200174

201-
if ($priority = $message->getHeader('priority')) {
202-
if (false === $clientPriority = array_search($priority, $this->priorityMap, true)) {
203-
throw new \LogicException(sprintf('Cant convert transport priority to client: "%s"', $priority));
204-
}
205-
206-
$clientMessage->setPriority($clientPriority);
207-
}
208-
209175
$clientMessage->setMessageId($message->getMessageId());
210176
$clientMessage->setTimestamp($message->getTimestamp());
211177

pkg/amqp-ext/Client/RabbitMqDriver.php

+47
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
use Enqueue\AmqpExt\AmqpTopic;
99
use Enqueue\Client\Config;
1010
use Enqueue\Client\Message;
11+
use Enqueue\Client\MessagePriority;
1112
use Enqueue\Client\Meta\QueueMetaRegistry;
1213
use Enqueue\Consumption\Exception\LogicException;
1314
use Enqueue\Psr\Message as TransportMessage;
@@ -31,6 +32,11 @@ class RabbitMqDriver extends AmqpDriver
3132
*/
3233
private $queueMetaRegistry;
3334

35+
/**
36+
* @var array
37+
*/
38+
private $priorityMap;
39+
3440
/**
3541
* @param AmqpContext $context
3642
* @param Config $config
@@ -43,6 +49,14 @@ public function __construct(AmqpContext $context, Config $config, QueueMetaRegis
4349
$this->config = $config;
4450
$this->context = $context;
4551
$this->queueMetaRegistry = $queueMetaRegistry;
52+
53+
$this->priorityMap = [
54+
MessagePriority::VERY_LOW => 0,
55+
MessagePriority::LOW => 1,
56+
MessagePriority::NORMAL => 2,
57+
MessagePriority::HIGH => 3,
58+
MessagePriority::VERY_HIGH => 4,
59+
];
4660
}
4761

4862
/**
@@ -68,6 +82,19 @@ public function sendToProcessor(Message $message)
6882
$this->context->createProducer()->send($destination, $transportMessage);
6983
}
7084

85+
/**
86+
* {@inheritdoc}
87+
*
88+
* @return AmqpQueue
89+
*/
90+
public function createQueue($queueName)
91+
{
92+
$queue = parent::createQueue($queueName);
93+
$queue->setArguments(['x-max-priority' => 4]);
94+
95+
return $queue;
96+
}
97+
7198
/**
7299
* {@inheritdoc}
73100
*
@@ -77,6 +104,17 @@ public function createTransportMessage(Message $message)
77104
{
78105
$transportMessage = parent::createTransportMessage($message);
79106

107+
if ($priority = $message->getPriority()) {
108+
if (false == array_key_exists($priority, $this->priorityMap)) {
109+
throw new \InvalidArgumentException(sprintf(
110+
'Given priority could not be converted to client\'s one. Got: %s',
111+
$priority
112+
));
113+
}
114+
115+
$transportMessage->setHeader('priority', $this->priorityMap[$priority]);
116+
}
117+
80118
if ($message->getDelay()) {
81119
if (false == $this->config->getTransportOption('delay_plugin_installed', false)) {
82120
throw new LogicException('The message delaying is not supported. In order to use delay feature install RabbitMQ delay plugin.');
@@ -85,6 +123,7 @@ public function createTransportMessage(Message $message)
85123
$transportMessage->setProperty('x-delay', (string) ($message->getDelay() * 1000));
86124
}
87125

126+
88127
return $transportMessage;
89128
}
90129

@@ -97,6 +136,14 @@ public function createClientMessage(TransportMessage $message)
97136
{
98137
$clientMessage = parent::createClientMessage($message);
99138

139+
if ($priority = $message->getHeader('priority')) {
140+
if (false === $clientPriority = array_search($priority, $this->priorityMap, true)) {
141+
throw new \LogicException(sprintf('Cant convert transport priority to client: "%s"', $priority));
142+
}
143+
144+
$clientMessage->setPriority($clientPriority);
145+
}
146+
100147
if ($delay = $message->getProperty('x-delay')) {
101148
if (false == is_numeric($delay)) {
102149
throw new \LogicException(sprintf('x-delay header is not numeric. "%s"', $delay));

pkg/psr-queue/DeliveryMode.php renamed to pkg/amqp-ext/DeliveryMode.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<?php
22

3-
namespace Enqueue\Psr;
3+
namespace Enqueue\AmqpExt;
44

55
final class DeliveryMode
66
{

pkg/amqp-ext/Tests/Client/AmqpDriverTest.php

+1-41
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
use Enqueue\Client\Config;
1111
use Enqueue\Client\DriverInterface;
1212
use Enqueue\Client\Message;
13-
use Enqueue\Client\MessagePriority;
1413
use Enqueue\Client\Meta\QueueMetaRegistry;
1514
use Enqueue\Psr\Producer;
1615
use Enqueue\Test\ClassExtensionTrait;
@@ -60,7 +59,7 @@ public function testShouldCreateAndReturnQueueInstance()
6059

6160
$this->assertSame($expectedQueue, $queue);
6261
$this->assertSame('queue-name', $queue->getQueueName());
63-
$this->assertSame(['x-max-priority' => 4], $queue->getArguments());
62+
$this->assertSame([], $queue->getArguments());
6463
$this->assertSame(2, $queue->getFlags());
6564
$this->assertNull($queue->getConsumerTag());
6665
$this->assertSame([], $queue->getBindArguments());
@@ -74,7 +73,6 @@ public function testShouldConvertTransportMessageToClientMessage()
7473
$transportMessage->setProperties(['key' => 'val']);
7574
$transportMessage->setHeader('content_type', 'ContentType');
7675
$transportMessage->setHeader('expiration', '12345000');
77-
$transportMessage->setHeader('priority', 3);
7876
$transportMessage->setMessageId('MessageId');
7977
$transportMessage->setTimestamp(1000);
8078

@@ -92,7 +90,6 @@ public function testShouldConvertTransportMessageToClientMessage()
9290
'hkey' => 'hval',
9391
'content_type' => 'ContentType',
9492
'expiration' => '12345000',
95-
'priority' => 3,
9693
'message_id' => 'MessageId',
9794
'timestamp' => 1000,
9895
], $clientMessage->getHeaders());
@@ -103,7 +100,6 @@ public function testShouldConvertTransportMessageToClientMessage()
103100
$this->assertSame(12345, $clientMessage->getExpire());
104101
$this->assertSame('ContentType', $clientMessage->getContentType());
105102
$this->assertSame(1000, $clientMessage->getTimestamp());
106-
$this->assertSame(MessagePriority::HIGH, $clientMessage->getPriority());
107103
}
108104

109105
public function testShouldThrowExceptionIfExpirationIsNotNumeric()
@@ -123,40 +119,6 @@ public function testShouldThrowExceptionIfExpirationIsNotNumeric()
123119
$driver->createClientMessage($transportMessage);
124120
}
125121

126-
public function testShouldThrowExceptionIfCantConvertTransportPriorityToClientPriority()
127-
{
128-
$transportMessage = new AmqpMessage();
129-
$transportMessage->setHeader('priority', 'unknown');
130-
131-
$driver = new AmqpDriver(
132-
$this->createPsrContextMock(),
133-
new Config('', '', '', '', '', ''),
134-
$this->createQueueMetaRegistryMock()
135-
);
136-
137-
$this->expectException(\LogicException::class);
138-
$this->expectExceptionMessage('Cant convert transport priority to client: "unknown"');
139-
140-
$driver->createClientMessage($transportMessage);
141-
}
142-
143-
public function testShouldThrowExceptionIfCantConvertClientPriorityToTransportPriority()
144-
{
145-
$clientMessage = new Message();
146-
$clientMessage->setPriority('unknown');
147-
148-
$driver = new AmqpDriver(
149-
$this->createPsrContextMock(),
150-
new Config('', '', '', '', '', ''),
151-
$this->createQueueMetaRegistryMock()
152-
);
153-
154-
$this->expectException(\LogicException::class);
155-
$this->expectExceptionMessage('Given priority could not be converted to client\'s one. Got: unknown');
156-
157-
$driver->createTransportMessage($clientMessage);
158-
}
159-
160122
public function testShouldConvertClientMessageToTransportMessage()
161123
{
162124
$clientMessage = new Message();
@@ -165,7 +127,6 @@ public function testShouldConvertClientMessageToTransportMessage()
165127
$clientMessage->setProperties(['key' => 'val']);
166128
$clientMessage->setContentType('ContentType');
167129
$clientMessage->setExpire(123);
168-
$clientMessage->setPriority(MessagePriority::VERY_HIGH);
169130
$clientMessage->setMessageId('MessageId');
170131
$clientMessage->setTimestamp(1000);
171132

@@ -190,7 +151,6 @@ public function testShouldConvertClientMessageToTransportMessage()
190151
'hkey' => 'hval',
191152
'content_type' => 'ContentType',
192153
'expiration' => '123000',
193-
'priority' => 4,
194154
'delivery_mode' => 2,
195155
'message_id' => 'MessageId',
196156
'timestamp' => 1000,

pkg/amqp-ext/Tests/Client/RabbitMqDriverTest.php

+9-2
Original file line numberDiff line numberDiff line change
@@ -171,8 +171,15 @@ public function testShouldThrowExceptionIfCantConvertClientPriorityToTransportPr
171171
$clientMessage = new Message();
172172
$clientMessage->setPriority('unknown');
173173

174+
$context = $this->createPsrContextMock();
175+
$context
176+
->expects($this->once())
177+
->method('createMessage')
178+
->willReturn(new AmqpMessage())
179+
;
180+
174181
$driver = new RabbitMqDriver(
175-
$this->createPsrContextMock(),
182+
$context,
176183
new Config('', '', '', '', '', ''),
177184
$this->createQueueMetaRegistryMock()
178185
);
@@ -217,10 +224,10 @@ public function testShouldConvertClientMessageToTransportMessage()
217224
'hkey' => 'hval',
218225
'content_type' => 'ContentType',
219226
'expiration' => '123000',
220-
'priority' => 4,
221227
'delivery_mode' => 2,
222228
'message_id' => 'MessageId',
223229
'timestamp' => 1000,
230+
'priority' => 4,
224231
], $transportMessage->getHeaders());
225232
$this->assertSame([
226233
'key' => 'val',

pkg/psr-queue/InvalidDeliveryModeException.php

-23
This file was deleted.

pkg/psr-queue/Tests/InvalidDeliveryModeExceptionTest.php

-37
This file was deleted.

0 commit comments

Comments
 (0)