Skip to content

Commit 57d4ed4

Browse files
authored
Merge pull request #492 from php-enqueue/amqp-subscription-consumers
Move subscription related logic to SubscriptionConsumer class.
2 parents 3012dbb + 4b654ae commit 57d4ed4

12 files changed

+601
-240
lines changed

pkg/amqp-bunny/AmqpContext.php

+9-73
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,9 @@
33
namespace Enqueue\AmqpBunny;
44

55
use Bunny\Channel;
6-
use Bunny\Client;
7-
use Bunny\Exception\ClientException;
86
use Bunny\Message;
97
use Enqueue\AmqpTools\DelayStrategyAware;
108
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
11-
use Enqueue\AmqpTools\SignalSocketHelper;
12-
use Enqueue\AmqpTools\SubscriptionConsumer;
139
use Interop\Amqp\AmqpBind as InteropAmqpBind;
1410
use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer;
1511
use Interop\Amqp\AmqpContext as InteropAmqpContext;
@@ -51,11 +47,9 @@ class AmqpContext implements InteropAmqpContext, DelayStrategyAware, PsrSubscrip
5147
private $buffer;
5248

5349
/**
54-
* an item contains an array: [AmqpConsumerInterop $consumer, callable $callback];.
55-
*
56-
* @var array
50+
* @var AmqpSubscriptionConsumer
5751
*/
58-
private $subscribers;
52+
private $bcSubscriptionConsumer;
5953

6054
/**
6155
* Callable must return instance of \Bunny\Channel once called.
@@ -81,7 +75,7 @@ public function __construct($bunnyChannel, $config = [])
8175
}
8276

8377
$this->buffer = new Buffer();
84-
$this->subscribers = [];
78+
$this->bcSubscriptionConsumer = $this->createSubscriptionConsumer();
8579
}
8680

8781
/**
@@ -140,10 +134,12 @@ public function createConsumer(PsrDestination $destination)
140134

141135
/**
142136
* {@inheritdoc}
137+
*
138+
* @return AmqpSubscriptionConsumer
143139
*/
144140
public function createSubscriptionConsumer()
145141
{
146-
return new SubscriptionConsumer($this);
142+
return new AmqpSubscriptionConsumer($this);
147143
}
148144

149145
/**
@@ -339,42 +335,7 @@ public function setQos($prefetchSize, $prefetchCount, $global)
339335
*/
340336
public function subscribe(InteropAmqpConsumer $consumer, callable $callback)
341337
{
342-
if ($consumer->getConsumerTag() && array_key_exists($consumer->getConsumerTag(), $this->subscribers)) {
343-
return;
344-
}
345-
346-
$bunnyCallback = function (Message $message, Channel $channel, Client $bunny) {
347-
$receivedMessage = $this->convertMessage($message);
348-
$receivedMessage->setConsumerTag($message->consumerTag);
349-
350-
/**
351-
* @var AmqpConsumer
352-
* @var callable $callback
353-
*/
354-
list($consumer, $callback) = $this->subscribers[$message->consumerTag];
355-
356-
if (false === call_user_func($callback, $receivedMessage, $consumer)) {
357-
$bunny->stop();
358-
}
359-
};
360-
361-
$frame = $this->getBunnyChannel()->consume(
362-
$bunnyCallback,
363-
$consumer->getQueue()->getQueueName(),
364-
$consumer->getConsumerTag(),
365-
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOLOCAL),
366-
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOACK),
367-
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_EXCLUSIVE),
368-
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOWAIT)
369-
);
370-
371-
if (empty($frame->consumerTag)) {
372-
throw new Exception('Got empty consumer tag');
373-
}
374-
375-
$consumer->setConsumerTag($frame->consumerTag);
376-
377-
$this->subscribers[$frame->consumerTag] = [$consumer, $callback];
338+
$this->bcSubscriptionConsumer->subscribe($consumer, $callback);
378339
}
379340

380341
/**
@@ -384,15 +345,7 @@ public function subscribe(InteropAmqpConsumer $consumer, callable $callback)
384345
*/
385346
public function unsubscribe(InteropAmqpConsumer $consumer)
386347
{
387-
if (false == $consumer->getConsumerTag()) {
388-
return;
389-
}
390-
391-
$consumerTag = $consumer->getConsumerTag();
392-
393-
$this->getBunnyChannel()->cancel($consumerTag);
394-
$consumer->setConsumerTag(null);
395-
unset($this->subscribers[$consumerTag]);
348+
$this->bcSubscriptionConsumer->unsubscribe($consumer);
396349
}
397350

398351
/**
@@ -402,24 +355,7 @@ public function unsubscribe(InteropAmqpConsumer $consumer)
402355
*/
403356
public function consume($timeout = 0)
404357
{
405-
if (empty($this->subscribers)) {
406-
throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming');
407-
}
408-
409-
$signalHandler = new SignalSocketHelper();
410-
$signalHandler->beforeSocket();
411-
412-
try {
413-
$this->getBunnyChannel()->getClient()->run(0 !== $timeout ? $timeout / 1000 : null);
414-
} catch (ClientException $e) {
415-
if ('stream_select() failed.' == $e->getMessage() && $signalHandler->wasThereSignal()) {
416-
return;
417-
}
418-
419-
throw $e;
420-
} finally {
421-
$signalHandler->afterSocket();
422-
}
358+
$this->bcSubscriptionConsumer->consume($timeout);
423359
}
424360

425361
/**
+141
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpBunny;
4+
5+
use Bunny\Channel;
6+
use Bunny\Client;
7+
use Bunny\Exception\ClientException;
8+
use Bunny\Message;
9+
use Enqueue\AmqpTools\SignalSocketHelper;
10+
use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer;
11+
use Interop\Queue\Exception;
12+
use Interop\Queue\PsrConsumer;
13+
use Interop\Queue\PsrSubscriptionConsumer;
14+
15+
class AmqpSubscriptionConsumer implements PsrSubscriptionConsumer
16+
{
17+
/**
18+
* @var AmqpContext
19+
*/
20+
private $context;
21+
22+
/**
23+
* an item contains an array: [AmqpConsumerInterop $consumer, callable $callback];.
24+
*
25+
* @var array
26+
*/
27+
private $subscribers;
28+
29+
public function __construct(AmqpContext $context)
30+
{
31+
$this->context = $context;
32+
33+
$this->subscribers = [];
34+
}
35+
36+
/**
37+
* {@inheritdoc}
38+
*/
39+
public function consume($timeout = 0)
40+
{
41+
if (empty($this->subscribers)) {
42+
throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming');
43+
}
44+
45+
$signalHandler = new SignalSocketHelper();
46+
$signalHandler->beforeSocket();
47+
48+
try {
49+
$this->context->getBunnyChannel()->getClient()->run(0 !== $timeout ? $timeout / 1000 : null);
50+
} catch (ClientException $e) {
51+
if ('stream_select() failed.' == $e->getMessage() && $signalHandler->wasThereSignal()) {
52+
return;
53+
}
54+
55+
throw $e;
56+
} finally {
57+
$signalHandler->afterSocket();
58+
}
59+
}
60+
61+
/**
62+
* @param AmqpConsumer $consumer
63+
*
64+
* {@inheritdoc}
65+
*/
66+
public function subscribe(PsrConsumer $consumer, callable $callback)
67+
{
68+
if (false == $consumer instanceof AmqpConsumer) {
69+
throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', AmqpConsumer::class, get_class($consumer)));
70+
}
71+
72+
if ($consumer->getConsumerTag() && array_key_exists($consumer->getConsumerTag(), $this->subscribers)) {
73+
return;
74+
}
75+
76+
$bunnyCallback = function (Message $message, Channel $channel, Client $bunny) {
77+
$receivedMessage = $this->context->convertMessage($message);
78+
$receivedMessage->setConsumerTag($message->consumerTag);
79+
80+
/**
81+
* @var AmqpConsumer
82+
* @var callable $callback
83+
*/
84+
list($consumer, $callback) = $this->subscribers[$message->consumerTag];
85+
86+
if (false === call_user_func($callback, $receivedMessage, $consumer)) {
87+
$bunny->stop();
88+
}
89+
};
90+
91+
$frame = $this->context->getBunnyChannel()->consume(
92+
$bunnyCallback,
93+
$consumer->getQueue()->getQueueName(),
94+
$consumer->getConsumerTag(),
95+
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOLOCAL),
96+
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOACK),
97+
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_EXCLUSIVE),
98+
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOWAIT)
99+
);
100+
101+
if (empty($frame->consumerTag)) {
102+
throw new Exception('Got empty consumer tag');
103+
}
104+
105+
$consumer->setConsumerTag($frame->consumerTag);
106+
107+
$this->subscribers[$frame->consumerTag] = [$consumer, $callback];
108+
}
109+
110+
/**
111+
* @param AmqpConsumer $consumer
112+
*
113+
* {@inheritdoc}
114+
*/
115+
public function unsubscribe(PsrConsumer $consumer)
116+
{
117+
if (false == $consumer instanceof AmqpConsumer) {
118+
throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', AmqpConsumer::class, get_class($consumer)));
119+
}
120+
121+
if (false == $consumer->getConsumerTag()) {
122+
return;
123+
}
124+
125+
$consumerTag = $consumer->getConsumerTag();
126+
127+
$this->context->getBunnyChannel()->cancel($consumerTag);
128+
$consumer->setConsumerTag(null);
129+
unset($this->subscribers[$consumerTag]);
130+
}
131+
132+
/**
133+
* {@inheritdoc}
134+
*/
135+
public function unsubscribeAll()
136+
{
137+
foreach ($this->subscribers as list($consumer)) {
138+
$this->unsubscribe($consumer);
139+
}
140+
}
141+
}

pkg/amqp-bunny/Tests/AmqpContextTest.php

+16
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@
55
use Bunny\Channel;
66
use Bunny\Protocol\MethodQueueDeclareOkFrame;
77
use Enqueue\AmqpBunny\AmqpContext;
8+
use Enqueue\AmqpBunny\AmqpSubscriptionConsumer;
89
use Interop\Amqp\Impl\AmqpBind;
910
use Interop\Amqp\Impl\AmqpQueue;
1011
use Interop\Amqp\Impl\AmqpTopic;
12+
use Interop\Queue\PsrSubscriptionConsumerAwareContext;
1113
use PHPUnit\Framework\TestCase;
1214

1315
class AmqpContextTest extends TestCase
@@ -235,6 +237,20 @@ public function testShouldSetQos()
235237
$context->setQos(123, 456, true);
236238
}
237239

240+
public function testShouldImplementPsrSubscriptionConsumerAwareInterface()
241+
{
242+
$rc = new \ReflectionClass(AmqpContext::class);
243+
244+
$this->assertTrue($rc->implementsInterface(PsrSubscriptionConsumerAwareContext::class));
245+
}
246+
247+
public function testShouldReturnExpectedSubscriptionConsumerInstance()
248+
{
249+
$context = new AmqpContext($this->createChannelMock());
250+
251+
$this->assertInstanceOf(AmqpSubscriptionConsumer::class, $context->createSubscriptionConsumer());
252+
}
253+
238254
/**
239255
* @return \PHPUnit_Framework_MockObject_MockObject|Channel
240256
*/
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpBunny\Tests;
4+
5+
use Enqueue\AmqpBunny\AmqpContext;
6+
use Enqueue\AmqpBunny\AmqpSubscriptionConsumer;
7+
use Interop\Queue\PsrSubscriptionConsumer;
8+
use PHPUnit\Framework\TestCase;
9+
10+
class AmqpSubscriptionConsumerTest extends TestCase
11+
{
12+
public function testShouldImplementPsrSubscriptionConsumerInterface()
13+
{
14+
$rc = new \ReflectionClass(AmqpSubscriptionConsumer::class);
15+
16+
$this->assertTrue($rc->implementsInterface(PsrSubscriptionConsumer::class));
17+
}
18+
19+
public function testCouldBeConstructedWithAmqpContextAsFirstArgument()
20+
{
21+
new AmqpSubscriptionConsumer($this->createAmqpContextMock());
22+
}
23+
24+
/**
25+
* @return AmqpContext|\PHPUnit_Framework_MockObject_MockObject
26+
*/
27+
private function createAmqpContextMock()
28+
{
29+
return $this->createMock(AmqpContext::class);
30+
}
31+
}

0 commit comments

Comments
 (0)