@@ -47,10 +47,10 @@ public function __construct(
47
47
48
48
public function sendToRouter (Message $ message ): void
49
49
{
50
- if ($ message ->getProperty (Config::PARAMETER_COMMAND_NAME )) {
50
+ if ($ message ->getProperty (Config::COMMAND )) {
51
51
throw new \LogicException ('Command must not be send to router but go directly to its processor. ' );
52
52
}
53
- if (false == $ message ->getProperty (Config::PARAMETER_TOPIC_NAME )) {
53
+ if (false == $ message ->getProperty (Config::TOPIC )) {
54
54
throw new \LogicException ('Topic name parameter is required but is not set ' );
55
55
}
56
56
@@ -63,21 +63,21 @@ public function sendToRouter(Message $message): void
63
63
64
64
public function sendToProcessor (Message $ message ): void
65
65
{
66
- $ topic = $ message ->getProperty (Config::PARAMETER_TOPIC_NAME );
67
- $ command = $ message ->getProperty (Config::PARAMETER_COMMAND_NAME );
66
+ $ topic = $ message ->getProperty (Config::TOPIC );
67
+ $ command = $ message ->getProperty (Config::COMMAND );
68
68
69
69
/** @var InteropQueue $queue */
70
70
$ queue = null ;
71
- if ($ topic && $ processor = $ message ->getProperty (Config::PARAMETER_PROCESSOR_NAME )) {
71
+ if ($ topic && $ processor = $ message ->getProperty (Config::PROCESSOR )) {
72
72
$ route = $ this ->routeCollection ->topicAndProcessor ($ topic , $ processor );
73
73
if (false == $ route ) {
74
74
throw new \LogicException (sprintf ('There is no route for topic "%s" and processor "%s" ' , $ topic , $ processor ));
75
75
}
76
76
77
- $ message ->setProperty (Config::PARAMETER_PROCESSOR_NAME , $ route ->getProcessor ());
77
+ $ message ->setProperty (Config::PROCESSOR , $ route ->getProcessor ());
78
78
$ queue = $ this ->createRouteQueue ($ route );
79
- } elseif ($ topic && false == $ message ->getProperty (Config::PARAMETER_PROCESSOR_NAME )) {
80
- $ message ->setProperty (Config::PARAMETER_PROCESSOR_NAME , $ this ->config ->getRouterProcessorName ());
79
+ } elseif ($ topic && false == $ message ->getProperty (Config::PROCESSOR )) {
80
+ $ message ->setProperty (Config::PROCESSOR , $ this ->config ->getRouterProcessorName ());
81
81
82
82
$ queue = $ this ->createQueue ($ this ->config ->getRouterQueueName ());
83
83
} elseif ($ command ) {
@@ -86,7 +86,7 @@ public function sendToProcessor(Message $message): void
86
86
throw new \LogicException (sprintf ('There is no route for command "%s". ' , $ command ));
87
87
}
88
88
89
- $ message ->setProperty (Config::PARAMETER_PROCESSOR_NAME , $ route ->getProcessor ());
89
+ $ message ->setProperty (Config::PROCESSOR , $ route ->getProcessor ());
90
90
$ queue = $ this ->createRouteQueue ($ route );
91
91
} else {
92
92
throw new \LogicException ('Either topic or command parameter must be set. ' );
@@ -96,15 +96,15 @@ public function sendToProcessor(Message $message): void
96
96
97
97
$ producer = $ this ->context ->createProducer ();
98
98
99
- if (null !== $ delay = $ transportMessage ->getProperty (' X-Enqueue-Delay ' )) {
99
+ if (null !== $ delay = $ transportMessage ->getProperty (Config:: DELAY )) {
100
100
$ producer ->setDeliveryDelay ($ delay * 1000 );
101
101
}
102
102
103
- if (null !== $ expire = $ transportMessage ->getProperty (' X-Enqueue-Expire ' )) {
103
+ if (null !== $ expire = $ transportMessage ->getProperty (Config:: EXPIRE )) {
104
104
$ producer ->setTimeToLive ($ expire * 1000 );
105
105
}
106
106
107
- if (null !== $ priority = $ transportMessage ->getProperty (' X-Enqueue-Priority ' )) {
107
+ if (null !== $ priority = $ transportMessage ->getProperty (Config:: PRIORITY )) {
108
108
$ priorityMap = $ this ->getPriorityMap ();
109
109
110
110
$ producer ->setPriority ($ priorityMap [$ priority ]);
@@ -149,19 +149,19 @@ public function createTransportMessage(Message $clientMessage): InteropMessage
149
149
$ transportMessage ->setCorrelationId ($ clientMessage ->getCorrelationId ());
150
150
151
151
if ($ contentType = $ clientMessage ->getContentType ()) {
152
- $ transportMessage ->setProperty (' X-Enqueue-Content-Type ' , $ contentType );
152
+ $ transportMessage ->setProperty (Config:: CONTENT_TYPE , $ contentType );
153
153
}
154
154
155
155
if ($ priority = $ clientMessage ->getPriority ()) {
156
- $ transportMessage ->setProperty (' X-Enqueue-Priority ' , $ priority );
156
+ $ transportMessage ->setProperty (Config:: PRIORITY , $ priority );
157
157
}
158
158
159
159
if ($ expire = $ clientMessage ->getExpire ()) {
160
- $ transportMessage ->setProperty (' X-Enqueue-Expire ' , $ expire );
160
+ $ transportMessage ->setProperty (Config:: EXPIRE , $ expire );
161
161
}
162
162
163
163
if ($ delay = $ clientMessage ->getDelay ()) {
164
- $ transportMessage ->setProperty (' X-Enqueue-Delay ' , $ delay );
164
+ $ transportMessage ->setProperty (Config:: DELAY , $ delay );
165
165
}
166
166
167
167
return $ transportMessage ;
@@ -179,19 +179,19 @@ public function createClientMessage(InteropMessage $transportMessage): Message
179
179
$ clientMessage ->setReplyTo ($ transportMessage ->getReplyTo ());
180
180
$ clientMessage ->setCorrelationId ($ transportMessage ->getCorrelationId ());
181
181
182
- if ($ contentType = $ transportMessage ->getProperty (' X-Enqueue-Content-Type ' )) {
182
+ if ($ contentType = $ transportMessage ->getProperty (Config:: CONTENT_TYPE )) {
183
183
$ clientMessage ->setContentType ($ contentType );
184
184
}
185
185
186
- if ($ priority = $ transportMessage ->getProperty (' X-Enqueue-Priority ' )) {
186
+ if ($ priority = $ transportMessage ->getProperty (Config:: PRIORITY )) {
187
187
$ clientMessage ->setPriority ($ priority );
188
188
}
189
189
190
- if ($ delay = $ transportMessage ->getProperty (' X-Enqueue-Delay ' )) {
190
+ if ($ delay = $ transportMessage ->getProperty (Config:: DELAY )) {
191
191
$ clientMessage ->setDelay ((int ) $ delay );
192
192
}
193
193
194
- if ($ expire = $ transportMessage ->getProperty (' X-Enqueue-Expire ' )) {
194
+ if ($ expire = $ transportMessage ->getProperty (Config:: EXPIRE )) {
195
195
$ clientMessage ->setExpire ((int ) $ expire );
196
196
}
197
197
0 commit comments