Skip to content

Commit 7d4f7ba

Browse files
committed
Merge master
1 parent 70b1ea3 commit 7d4f7ba

25 files changed

+152
-148
lines changed

pkg/enqueue/Client/Config.php

+7-6
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@
44

55
class Config
66
{
7-
const TOPIC_PARAMETER = 'enqueue.topic';
8-
const COMMAND_PARAMETER = 'enqueue.command';
9-
const PROCESSOR_PARAMETER = 'enqueue.processor';
10-
const EXPIRE_PARAMETER = 'enqueue.expire';
11-
const PRIORITY_PARAMETER = 'enqueue.priority';
12-
const DELAY_PARAMETER = 'enqueue.delay';
7+
const TOPIC = 'enqueue.topic';
8+
const COMMAND = 'enqueue.command';
9+
const PROCESSOR = 'enqueue.processor';
10+
const EXPIRE = 'enqueue.expire';
11+
const PRIORITY = 'enqueue.priority';
12+
const DELAY = 'enqueue.delay';
13+
const CONTENT_TYPE = 'enqueue.content_type';
1314

1415
/**
1516
* @var string

pkg/enqueue/Client/ConsumptionExtension/ExclusiveCommandExtension.php

+5-5
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,13 @@ public function onPreReceived(Context $context)
3333
$message = $context->getInteropMessage();
3434
$queue = $context->getInteropQueue();
3535

36-
if ($message->getProperty(Config::TOPIC_PARAMETER)) {
36+
if ($message->getProperty(Config::TOPIC)) {
3737
return;
3838
}
39-
if ($message->getProperty(Config::COMMAND_PARAMETER)) {
39+
if ($message->getProperty(Config::COMMAND)) {
4040
return;
4141
}
42-
if ($message->getProperty(Config::PROCESSOR_PARAMETER)) {
42+
if ($message->getProperty(Config::PROCESSOR)) {
4343
return;
4444
}
4545

@@ -51,8 +51,8 @@ public function onPreReceived(Context $context)
5151
$context->getLogger()->debug('[ExclusiveCommandExtension] This is a exclusive command queue and client\'s properties are not set. Setting them');
5252

5353
$route = $this->queueToRouteMap[$queue->getQueueName()];
54-
$message->setProperty(Config::PROCESSOR_PARAMETER, $route->getProcessor());
55-
$message->setProperty(Config::COMMAND_PARAMETER, $route->getSource());
54+
$message->setProperty(Config::PROCESSOR, $route->getProcessor());
55+
$message->setProperty(Config::COMMAND, $route->getSource());
5656
}
5757
}
5858

pkg/enqueue/Client/ConsumptionExtension/SetRouterPropertiesExtension.php

+2-2
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public function __construct(DriverInterface $driver)
2828
public function onPreReceived(Context $context)
2929
{
3030
$message = $context->getInteropMessage();
31-
if ($message->getProperty(Config::PROCESSOR_PARAMETER)) {
31+
if ($message->getProperty(Config::PROCESSOR)) {
3232
return;
3333
}
3434

@@ -39,7 +39,7 @@ public function onPreReceived(Context $context)
3939
}
4040

4141
// RouterProcessor is our default message processor when that header is not set
42-
$message->setProperty(Config::PROCESSOR_PARAMETER, $config->getRouterProcessorName());
42+
$message->setProperty(Config::PROCESSOR, $config->getRouterProcessorName());
4343

4444
$context->getLogger()->debug(
4545
'[SetRouterPropertiesExtension] '.

pkg/enqueue/Client/DelegateProcessor.php

+2-2
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ public function __construct(ProcessorRegistryInterface $registry)
2626
*/
2727
public function process(InteropMessage $message, Context $context)
2828
{
29-
$processorName = $message->getProperty(Config::PROCESSOR_PARAMETER);
29+
$processorName = $message->getProperty(Config::PROCESSOR);
3030
if (false == $processorName) {
3131
throw new \LogicException(sprintf(
3232
'Got message without required parameter: "%s"',
33-
Config::PROCESSOR_PARAMETER
33+
Config::PROCESSOR
3434
));
3535
}
3636

pkg/enqueue/Client/Driver/GenericDriver.php

+20-20
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,10 @@ public function __construct(
4747

4848
public function sendToRouter(Message $message): void
4949
{
50-
if ($message->getProperty(Config::COMMAND_PARAMETER)) {
50+
if ($message->getProperty(Config::COMMAND)) {
5151
throw new \LogicException('Command must not be send to router but go directly to its processor.');
5252
}
53-
if (false == $message->getProperty(Config::TOPIC_PARAMETER)) {
53+
if (false == $message->getProperty(Config::TOPIC)) {
5454
throw new \LogicException('Topic name parameter is required but is not set');
5555
}
5656

@@ -63,21 +63,21 @@ public function sendToRouter(Message $message): void
6363

6464
public function sendToProcessor(Message $message): void
6565
{
66-
$topic = $message->getProperty(Config::TOPIC_PARAMETER);
67-
$command = $message->getProperty(Config::COMMAND_PARAMETER);
66+
$topic = $message->getProperty(Config::TOPIC);
67+
$command = $message->getProperty(Config::COMMAND);
6868

6969
/** @var InteropQueue $queue */
7070
$queue = null;
71-
if ($topic && $processor = $message->getProperty(Config::PROCESSOR_PARAMETER)) {
71+
if ($topic && $processor = $message->getProperty(Config::PROCESSOR)) {
7272
$route = $this->routeCollection->topicAndProcessor($topic, $processor);
7373
if (false == $route) {
7474
throw new \LogicException(sprintf('There is no route for topic "%s" and processor "%s"', $topic, $processor));
7575
}
7676

77-
$message->setProperty(Config::PROCESSOR_PARAMETER, $route->getProcessor());
77+
$message->setProperty(Config::PROCESSOR, $route->getProcessor());
7878
$queue = $this->createRouteQueue($route);
79-
} elseif ($topic && false == $message->getProperty(Config::PROCESSOR_PARAMETER)) {
80-
$message->setProperty(Config::PROCESSOR_PARAMETER, $this->config->getRouterProcessorName());
79+
} elseif ($topic && false == $message->getProperty(Config::PROCESSOR)) {
80+
$message->setProperty(Config::PROCESSOR, $this->config->getRouterProcessorName());
8181

8282
$queue = $this->createQueue($this->config->getRouterQueueName());
8383
} elseif ($command) {
@@ -86,7 +86,7 @@ public function sendToProcessor(Message $message): void
8686
throw new \LogicException(sprintf('There is no route for command "%s".', $command));
8787
}
8888

89-
$message->setProperty(Config::PROCESSOR_PARAMETER, $route->getProcessor());
89+
$message->setProperty(Config::PROCESSOR, $route->getProcessor());
9090
$queue = $this->createRouteQueue($route);
9191
} else {
9292
throw new \LogicException('Either topic or command parameter must be set.');
@@ -96,15 +96,15 @@ public function sendToProcessor(Message $message): void
9696

9797
$producer = $this->context->createProducer();
9898

99-
if (null !== $delay = $transportMessage->getProperty(Config::DELAY_PARAMETER)) {
99+
if (null !== $delay = $transportMessage->getProperty(Config::DELAY)) {
100100
$producer->setDeliveryDelay($delay * 1000);
101101
}
102102

103-
if (null !== $expire = $transportMessage->getProperty('X-Enqueue-Expire')) {
103+
if (null !== $expire = $transportMessage->getProperty(Config::EXPIRE)) {
104104
$producer->setTimeToLive($expire * 1000);
105105
}
106106

107-
if (null !== $priority = $transportMessage->getProperty('X-Enqueue-Priority')) {
107+
if (null !== $priority = $transportMessage->getProperty(Config::PRIORITY)) {
108108
$priorityMap = $this->getPriorityMap();
109109

110110
$producer->setPriority($priorityMap[$priority]);
@@ -149,19 +149,19 @@ public function createTransportMessage(Message $clientMessage): InteropMessage
149149
$transportMessage->setCorrelationId($clientMessage->getCorrelationId());
150150

151151
if ($contentType = $clientMessage->getContentType()) {
152-
$transportMessage->setProperty('X-Enqueue-Content-Type', $contentType);
152+
$transportMessage->setProperty(Config::CONTENT_TYPE, $contentType);
153153
}
154154

155155
if ($priority = $clientMessage->getPriority()) {
156-
$transportMessage->setProperty('X-Enqueue-Priority', $priority);
156+
$transportMessage->setProperty(Config::PRIORITY, $priority);
157157
}
158158

159159
if ($expire = $clientMessage->getExpire()) {
160-
$transportMessage->setProperty('X-Enqueue-Expire', $expire);
160+
$transportMessage->setProperty(Config::EXPIRE, $expire);
161161
}
162162

163163
if ($delay = $clientMessage->getDelay()) {
164-
$transportMessage->setProperty(Config::DELAY_PARAMETER, $delay);
164+
$transportMessage->setProperty(Config::DELAY, $delay);
165165
}
166166

167167
return $transportMessage;
@@ -179,19 +179,19 @@ public function createClientMessage(InteropMessage $transportMessage): Message
179179
$clientMessage->setReplyTo($transportMessage->getReplyTo());
180180
$clientMessage->setCorrelationId($transportMessage->getCorrelationId());
181181

182-
if ($contentType = $transportMessage->getProperty('X-Enqueue-Content-Type')) {
182+
if ($contentType = $transportMessage->getProperty(Config::CONTENT_TYPE)) {
183183
$clientMessage->setContentType($contentType);
184184
}
185185

186-
if ($priority = $transportMessage->getProperty('X-Enqueue-Priority')) {
186+
if ($priority = $transportMessage->getProperty(Config::PRIORITY)) {
187187
$clientMessage->setPriority($priority);
188188
}
189189

190-
if ($delay = $transportMessage->getProperty(Config::DELAY_PARAMETER)) {
190+
if ($delay = $transportMessage->getProperty(Config::DELAY)) {
191191
$clientMessage->setDelay((int) $delay);
192192
}
193193

194-
if ($expire = $transportMessage->getProperty('X-Enqueue-Expire')) {
194+
if ($expire = $transportMessage->getProperty(Config::EXPIRE)) {
195195
$clientMessage->setExpire((int) $expire);
196196
}
197197

pkg/enqueue/Client/Driver/RabbitMqStompDriver.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ protected function doSendToRouter(InteropProducer $producer, Destination $topic,
169169
*/
170170
protected function doSendToProcessor(InteropProducer $producer, InteropQueue $destination, InteropMessage $transportMessage): void
171171
{
172-
if ($delay = $transportMessage->getProperty(Config::DELAY_PARAMETER)) {
172+
if ($delay = $transportMessage->getProperty(Config::DELAY)) {
173173
$producer->setDeliveryDelay(null);
174174
$destination = $this->createDelayedTopic($destination);
175175
}

pkg/enqueue/Client/DriverPreSend.php

+4-4
Original file line numberDiff line numberDiff line change
@@ -34,21 +34,21 @@ public function getDriver(): DriverInterface
3434

3535
public function isEvent(): bool
3636
{
37-
return (bool) $this->message->getProperty(Config::TOPIC_PARAMETER);
37+
return (bool) $this->message->getProperty(Config::TOPIC);
3838
}
3939

4040
public function isCommand(): bool
4141
{
42-
return (bool) $this->message->getProperty(Config::COMMAND_PARAMETER);
42+
return (bool) $this->message->getProperty(Config::COMMAND);
4343
}
4444

4545
public function getCommand(): string
4646
{
47-
return $this->message->getProperty(Config::COMMAND_PARAMETER);
47+
return $this->message->getProperty(Config::COMMAND);
4848
}
4949

5050
public function getTopic(): string
5151
{
52-
return $this->message->getProperty(Config::TOPIC_PARAMETER);
52+
return $this->message->getProperty(Config::TOPIC);
5353
}
5454
}

pkg/enqueue/Client/PostSend.php

+4-4
Original file line numberDiff line numberDiff line change
@@ -34,21 +34,21 @@ public function getDriver(): DriverInterface
3434

3535
public function isEvent(): bool
3636
{
37-
return (bool) $this->message->getProperty(Config::TOPIC_PARAMETER);
37+
return (bool) $this->message->getProperty(Config::TOPIC);
3838
}
3939

4040
public function isCommand(): bool
4141
{
42-
return (bool) $this->message->getProperty(Config::COMMAND_PARAMETER);
42+
return (bool) $this->message->getProperty(Config::COMMAND);
4343
}
4444

4545
public function getCommand(): string
4646
{
47-
return $this->message->getProperty(Config::COMMAND_PARAMETER);
47+
return $this->message->getProperty(Config::COMMAND);
4848
}
4949

5050
public function getTopic(): string
5151
{
52-
return $this->message->getProperty(Config::TOPIC_PARAMETER);
52+
return $this->message->getProperty(Config::TOPIC);
5353
}
5454
}

pkg/enqueue/Client/Producer.php

+4-4
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public function sendEvent(string $topic, $message): void
4848
$this->extension->onPreSendEvent($preSend);
4949

5050
$message = $preSend->getMessage();
51-
$message->setProperty(Config::TOPIC_PARAMETER, $preSend->getTopic());
51+
$message->setProperty(Config::TOPIC, $preSend->getTopic());
5252

5353
$this->doSend($message);
5454
}
@@ -79,7 +79,7 @@ public function sendCommand(string $command, $message, bool $needReply = false):
7979
}
8080
}
8181

82-
$message->setProperty(Config::COMMAND_PARAMETER, $command);
82+
$message->setProperty(Config::COMMAND, $command);
8383
$message->setScope(Message::SCOPE_APP);
8484

8585
$this->doSend($message);
@@ -103,8 +103,8 @@ private function doSend(Message $message): void
103103
));
104104
}
105105

106-
if ($message->getProperty(Config::PROCESSOR_PARAMETER)) {
107-
throw new \LogicException(sprintf('The %s property must not be set.', Config::PROCESSOR_PARAMETER));
106+
if ($message->getProperty(Config::PROCESSOR)) {
107+
throw new \LogicException(sprintf('The %s property must not be set.', Config::PROCESSOR));
108108
}
109109

110110
if (!$message->getMessageId()) {

pkg/enqueue/Client/RouterProcessor.php

+5-5
Original file line numberDiff line numberDiff line change
@@ -21,22 +21,22 @@ public function __construct(DriverInterface $driver)
2121

2222
public function process(InteropMessage $message, Context $context): Result
2323
{
24-
if ($message->getProperty(Config::COMMAND_PARAMETER)) {
24+
if ($message->getProperty(Config::COMMAND)) {
2525
return Result::reject(sprintf(
2626
'Unexpected command "%s" got. Command must not go to the router.',
27-
$message->getProperty(Config::COMMAND_PARAMETER)
27+
$message->getProperty(Config::COMMAND)
2828
));
2929
}
3030

31-
$topic = $message->getProperty(Config::TOPIC_PARAMETER);
31+
$topic = $message->getProperty(Config::TOPIC);
3232
if (false == $topic) {
33-
return Result::reject(sprintf('Topic property "%s" is required but not set or empty.', Config::TOPIC_PARAMETER));
33+
return Result::reject(sprintf('Topic property "%s" is required but not set or empty.', Config::TOPIC));
3434
}
3535

3636
$count = 0;
3737
foreach ($this->driver->getRouteCollection()->topic($topic) as $route) {
3838
$clientMessage = $this->driver->createClientMessage($message);
39-
$clientMessage->setProperty(Config::PROCESSOR_PARAMETER, $route->getProcessor());
39+
$clientMessage->setProperty(Config::PROCESSOR, $route->getProcessor());
4040

4141
$this->driver->sendToProcessor($clientMessage);
4242

pkg/enqueue/Tests/Client/ConsumptionExtension/ExclusiveCommandExtensionTest.php

+3-3
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public function testCouldBeConstructedWithDriverAsFirstArgument()
3838
public function testShouldDoNothingIfMessageHasTopicPropertySetOnPreReceive()
3939
{
4040
$message = new NullMessage();
41-
$message->setProperty(Config::TOPIC_PARAMETER, 'aTopic');
41+
$message->setProperty(Config::TOPIC, 'aTopic');
4242

4343
$context = new Context(new NullContext());
4444
$context->setInteropMessage($message);
@@ -63,7 +63,7 @@ public function testShouldDoNothingIfMessageHasTopicPropertySetOnPreReceive()
6363
public function testShouldDoNothingIfMessageHasCommandPropertySetOnPreReceive()
6464
{
6565
$message = new NullMessage();
66-
$message->setProperty(Config::COMMAND_PARAMETER, 'aCommand');
66+
$message->setProperty(Config::COMMAND, 'aCommand');
6767

6868
$context = new Context(new NullContext());
6969
$context->setInteropMessage($message);
@@ -88,7 +88,7 @@ public function testShouldDoNothingIfMessageHasCommandPropertySetOnPreReceive()
8888
public function testShouldDoNothingIfMessageHasProcessorPropertySetOnPreReceive()
8989
{
9090
$message = new NullMessage();
91-
$message->setProperty(Config::PROCESSOR_PARAMETER, 'aProcessor');
91+
$message->setProperty(Config::PROCESSOR, 'aProcessor');
9292

9393
$context = new Context(new NullContext());
9494
$context->setInteropMessage($message);

pkg/enqueue/Tests/Client/ConsumptionExtension/SetRouterPropertiesExtensionTest.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public function testShouldNotSetAnyPropertyIfProcessorNamePropertyAlreadySet()
100100
;
101101

102102
$message = new NullMessage();
103-
$message->setProperty(Config::PROCESSOR_PARAMETER, 'non-router-processor');
103+
$message->setProperty(Config::PROCESSOR, 'non-router-processor');
104104

105105
$context = new Context($this->createContextMock());
106106
$context->setInteropMessage($message);

pkg/enqueue/Tests/Client/DelegateProcessorTest.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public function testShouldProcessMessage()
3333
$session = $this->createContextMock();
3434
$message = new NullMessage();
3535
$message->setProperties([
36-
Config::PROCESSOR_PARAMETER => 'processor-name',
36+
Config::PROCESSOR => 'processor-name',
3737
]);
3838

3939
$processor = $this->createProcessorMock();

pkg/enqueue/Tests/Client/Driver/AmqpDriverTest.php

+5-5
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ public function testShouldResetPriorityAndExpirationAndNeverCallProducerDelivery
170170
);
171171

172172
$message = new Message();
173-
$message->setProperty(Config::TOPIC_PARAMETER, 'topic');
173+
$message->setProperty(Config::TOPIC, 'topic');
174174
$message->setExpire(123);
175175
$message->setPriority(MessagePriority::HIGH);
176176

@@ -349,10 +349,10 @@ protected function assertTransportMessage(InteropMessage $transportMessage): voi
349349
], $transportMessage->getHeaders());
350350
$this->assertEquals([
351351
'pkey' => 'pval',
352-
'X-Enqueue-Content-Type' => 'ContentType',
353-
'X-Enqueue-Priority' => MessagePriority::HIGH,
354-
'X-Enqueue-Expire' => 123,
355-
'enqueue.delay' => 345,
352+
Config::CONTENT_TYPE => 'ContentType',
353+
Config::PRIORITY => MessagePriority::HIGH,
354+
Config::EXPIRE => 123,
355+
Config::DELAY => 345,
356356
], $transportMessage->getProperties());
357357
$this->assertSame('theMessageId', $transportMessage->getMessageId());
358358
$this->assertSame(1000, $transportMessage->getTimestamp());

0 commit comments

Comments
 (0)