Skip to content

Commit a99805a

Browse files
authored
Merge pull request #1033 from fostermadeco/master
Implement DeliveryDelay, Priority and TimeToLive in PheanstalkProducer
2 parents 749fb74 + 3b205e0 commit a99805a

File tree

2 files changed

+184
-21
lines changed

2 files changed

+184
-21
lines changed

pkg/pheanstalk/PheanstalkProducer.php

+35-21
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
use Interop\Queue\Destination;
88
use Interop\Queue\Exception\InvalidDestinationException;
99
use Interop\Queue\Exception\InvalidMessageException;
10-
use Interop\Queue\Exception\PriorityNotSupportedException;
1110
use Interop\Queue\Message;
1211
use Interop\Queue\Producer;
1312
use Pheanstalk\Pheanstalk;
@@ -19,6 +18,21 @@ class PheanstalkProducer implements Producer
1918
*/
2019
private $pheanstalk;
2120

21+
/**
22+
* @var int
23+
*/
24+
private $deliveryDelay;
25+
26+
/**
27+
* @var int
28+
*/
29+
private $priority;
30+
31+
/**
32+
* @var int
33+
*/
34+
private $timeToLive;
35+
2236
public function __construct(Pheanstalk $pheanstalk)
2337
{
2438
$this->pheanstalk = $pheanstalk;
@@ -35,11 +49,17 @@ public function send(Destination $destination, Message $message): void
3549

3650
$rawMessage = json_encode($message);
3751
if (JSON_ERROR_NONE !== json_last_error()) {
38-
throw new \InvalidArgumentException(sprintf(
39-
'Could not encode value into json. Error %s and message %s',
40-
json_last_error(),
41-
json_last_error_msg()
42-
));
52+
throw new \InvalidArgumentException(sprintf('Could not encode value into json. Error %s and message %s', json_last_error(), json_last_error_msg()));
53+
}
54+
55+
if (null !== $this->priority && null === $message->getHeader('priority')) {
56+
$message->setPriority($this->priority);
57+
}
58+
if (null !== $this->deliveryDelay && null === $message->getHeader('delay')) {
59+
$message->setDelay($this->deliveryDelay / 1000);
60+
}
61+
if (null !== $this->timeToLive && null === $message->getHeader('ttr')) {
62+
$message->setTimeToRun($this->timeToLive / 1000);
4363
}
4464

4565
$this->pheanstalk->useTube($destination->getName())->put(
@@ -55,49 +75,43 @@ public function send(Destination $destination, Message $message): void
5575
*/
5676
public function setDeliveryDelay(int $deliveryDelay = null): Producer
5777
{
58-
if (null === $deliveryDelay) {
59-
return $this;
60-
}
78+
$this->deliveryDelay = $deliveryDelay;
6179

62-
throw new \LogicException('Not implemented');
80+
return $this;
6381
}
6482

6583
public function getDeliveryDelay(): ?int
6684
{
67-
return null;
85+
return $this->deliveryDelay;
6886
}
6987

7088
/**
7189
* @return PheanstalkProducer
7290
*/
7391
public function setPriority(int $priority = null): Producer
7492
{
75-
if (null === $priority) {
76-
return $this;
77-
}
93+
$this->priority = $priority;
7894

79-
throw PriorityNotSupportedException::providerDoestNotSupportIt();
95+
return $this;
8096
}
8197

8298
public function getPriority(): ?int
8399
{
84-
return null;
100+
return $this->priority;
85101
}
86102

87103
/**
88104
* @return PheanstalkProducer
89105
*/
90106
public function setTimeToLive(int $timeToLive = null): Producer
91107
{
92-
if (null === $timeToLive) {
93-
return $this;
94-
}
108+
$this->timeToLive = $timeToLive;
95109

96-
throw new \LogicException('Not implemented');
110+
return $this;
97111
}
98112

99113
public function getTimeToLive(): ?int
100114
{
101-
return null;
115+
return $this->timeToLive;
102116
}
103117
}

pkg/pheanstalk/Tests/PheanstalkProducerTest.php

+149
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,155 @@ public function testShouldJsonEncodeMessageAndPutToExpectedTube()
6666
);
6767
}
6868

69+
public function testMessagePriorityPrecedesPriority()
70+
{
71+
$message = new PheanstalkMessage('theBody');
72+
$message->setPriority(100);
73+
74+
$pheanstalk = $this->createPheanstalkMock();
75+
$pheanstalk
76+
->expects($this->once())
77+
->method('useTube')
78+
->with('theQueueName')
79+
->willReturnSelf()
80+
;
81+
$pheanstalk
82+
->expects($this->once())
83+
->method('put')
84+
->with('{"body":"theBody","properties":[],"headers":{"priority":100}}', 100, Pheanstalk::DEFAULT_DELAY, Pheanstalk::DEFAULT_TTR)
85+
;
86+
87+
$producer = new PheanstalkProducer($pheanstalk);
88+
$producer->setPriority(50);
89+
90+
$producer->send(
91+
new PheanstalkDestination('theQueueName'),
92+
$message
93+
);
94+
}
95+
96+
public function testAccessDeliveryDelayAsMilliseconds()
97+
{
98+
$producer = new PheanstalkProducer($this->createPheanstalkMock());
99+
$producer->setDeliveryDelay(5000);
100+
101+
$this->assertEquals(5000, $producer->getDeliveryDelay());
102+
}
103+
104+
public function testDeliveryDelayResolvesToSeconds()
105+
{
106+
$message = new PheanstalkMessage('theBody');
107+
108+
$pheanstalk = $this->createPheanstalkMock();
109+
$pheanstalk
110+
->expects($this->once())
111+
->method('useTube')
112+
->with('theQueueName')
113+
->willReturnSelf()
114+
;
115+
$pheanstalk
116+
->expects($this->once())
117+
->method('put')
118+
->with('{"body":"theBody","properties":[],"headers":[]}', Pheanstalk::DEFAULT_PRIORITY, 5, Pheanstalk::DEFAULT_TTR)
119+
;
120+
121+
$producer = new PheanstalkProducer($pheanstalk);
122+
$producer->setDeliveryDelay(5000);
123+
124+
$producer->send(
125+
new PheanstalkDestination('theQueueName'),
126+
$message
127+
);
128+
}
129+
130+
public function testMessageDelayPrecedesDeliveryDelay()
131+
{
132+
$message = new PheanstalkMessage('theBody');
133+
$message->setDelay(25);
134+
135+
$pheanstalk = $this->createPheanstalkMock();
136+
$pheanstalk
137+
->expects($this->once())
138+
->method('useTube')
139+
->with('theQueueName')
140+
->willReturnSelf()
141+
;
142+
$pheanstalk
143+
->expects($this->once())
144+
->method('put')
145+
->with('{"body":"theBody","properties":[],"headers":{"delay":25}}', Pheanstalk::DEFAULT_PRIORITY, 25, Pheanstalk::DEFAULT_TTR)
146+
;
147+
148+
$producer = new PheanstalkProducer($pheanstalk);
149+
$producer->setDeliveryDelay(1000);
150+
151+
$producer->send(
152+
new PheanstalkDestination('theQueueName'),
153+
$message
154+
);
155+
}
156+
157+
public function testAccessTimeToLiveAsMilliseconds()
158+
{
159+
$producer = new PheanstalkProducer($this->createPheanstalkMock());
160+
$producer->setTimeToLive(5000);
161+
162+
$this->assertEquals(5000, $producer->getTimeToLive());
163+
}
164+
165+
public function testTimeToLiveResolvesToSeconds()
166+
{
167+
$message = new PheanstalkMessage('theBody');
168+
169+
$pheanstalk = $this->createPheanstalkMock();
170+
$pheanstalk
171+
->expects($this->once())
172+
->method('useTube')
173+
->with('theQueueName')
174+
->willReturnSelf()
175+
;
176+
$pheanstalk
177+
->expects($this->once())
178+
->method('put')
179+
->with('{"body":"theBody","properties":[],"headers":[]}', Pheanstalk::DEFAULT_PRIORITY, Pheanstalk::DEFAULT_DELAY, 5)
180+
;
181+
182+
$producer = new PheanstalkProducer($pheanstalk);
183+
$producer->setTimeToLive(5000);
184+
185+
$producer->send(
186+
new PheanstalkDestination('theQueueName'),
187+
$message
188+
);
189+
}
190+
191+
public function testMessageTimeToRunPrecedesTimeToLive()
192+
{
193+
$message = new PheanstalkMessage('theBody');
194+
$message->setTimeToRun(25);
195+
196+
$pheanstalk = $this->createPheanstalkMock();
197+
$pheanstalk
198+
->expects($this->once())
199+
->method('useTube')
200+
->with('theQueueName')
201+
->willReturnSelf()
202+
;
203+
$pheanstalk
204+
->expects($this->once())
205+
->method('put')
206+
->with('{"body":"theBody","properties":[],"headers":{"ttr":25}}', Pheanstalk::DEFAULT_PRIORITY, Pheanstalk::DEFAULT_DELAY, 25)
207+
;
208+
209+
$producer = new PheanstalkProducer($pheanstalk);
210+
$producer->setTimeToLive(1000);
211+
212+
$producer->send(
213+
new PheanstalkDestination('theQueueName'),
214+
$message
215+
);
216+
}
217+
69218
/**
70219
* @return MockObject|Pheanstalk
71220
*/

0 commit comments

Comments
 (0)