Skip to content

Commit e03f49b

Browse files
authored
Merge pull request #217 from php-enqueue/amqp-add-basic-consume-support
[BC break] Amqp add basic consume support
2 parents ffa585e + dd9038b commit e03f49b

28 files changed

+783
-24
lines changed

composer.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@
2727
"enqueue/test": "*@dev",
2828
"enqueue/async-event-dispatcher": "*@dev",
2929
"queue-interop/queue-interop": "^0.6@dev",
30-
"queue-interop/amqp-interop": "^0.6@dev",
31-
"queue-interop/queue-spec": "^0.5@dev",
30+
"queue-interop/amqp-interop": "^0.7@dev",
31+
"queue-interop/queue-spec": "^0.5.1@dev",
3232

3333
"phpunit/phpunit": "^5",
3434
"doctrine/doctrine-bundle": "~1.2",

docker/Dockerfile

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ RUN echo "extension=rdkafka.so" > /etc/php/7.1/cli/conf.d/10-rdkafka.ini
1717
RUN echo "extension=rdkafka.so" > /etc/php/7.1/fpm/conf.d/10-rdkafka.ini
1818

1919
COPY ./php/cli.ini /etc/php/7.1/cli/conf.d/1-dev_cli.ini
20+
COPY ./php/amqp.so /usr/lib/php/20160303/amqp.so
2021
COPY ./bin/dev_entrypoiny.sh /usr/local/bin/entrypoint.sh
2122
RUN chmod u+x /usr/local/bin/entrypoint.sh
2223

docker/php/amqp.so

609 KB
Binary file not shown.

phpstan.neon

+2-1
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,5 @@ parameters:
77
- pkg/redis/PhpRedis.php
88
- pkg/redis/RedisConnectionFactory.php
99
- pkg/gearman
10-
- pkg/amqp-ext/AmqpConsumer.php
10+
- pkg/amqp-ext/AmqpConsumer.php
11+
- pkg/amqp-ext/AmqpContext.php

pkg/amqp-bunny/AmqpContext.php

+104
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@
33
namespace Enqueue\AmqpBunny;
44

55
use Bunny\Channel;
6+
use Bunny\Client;
7+
use Bunny\Message;
68
use Enqueue\AmqpTools\DelayStrategyAware;
79
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
810
use Interop\Amqp\AmqpBind as InteropAmqpBind;
11+
use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer;
912
use Interop\Amqp\AmqpContext as InteropAmqpContext;
1013
use Interop\Amqp\AmqpMessage as InteropAmqpMessage;
1114
use Interop\Amqp\AmqpQueue as InteropAmqpQueue;
@@ -43,6 +46,13 @@ class AmqpContext implements InteropAmqpContext, DelayStrategyAware
4346
*/
4447
private $buffer;
4548

49+
/**
50+
* an item contains an array: [AmqpConsumerInterop $consumer, callable $callback];.
51+
*
52+
* @var array
53+
*/
54+
private $subscribers;
55+
4656
/**
4757
* Callable must return instance of \Bunny\Channel once called.
4858
*
@@ -309,6 +319,77 @@ public function setQos($prefetchSize, $prefetchCount, $global)
309319
$this->getBunnyChannel()->qos($prefetchSize, $prefetchCount, $global);
310320
}
311321

322+
/**
323+
* {@inheritdoc}
324+
*/
325+
public function subscribe(InteropAmqpConsumer $consumer, callable $callback)
326+
{
327+
if ($consumer->getConsumerTag() && array_key_exists($consumer->getConsumerTag(), $this->subscribers)) {
328+
return;
329+
}
330+
331+
$bunnyCallback = function (Message $message, Channel $channel, Client $bunny) {
332+
$receivedMessage = $this->convertMessage($message);
333+
$receivedMessage->setConsumerTag($message->consumerTag);
334+
335+
/**
336+
* @var AmqpConsumer
337+
* @var callable $callback
338+
*/
339+
list($consumer, $callback) = $this->subscribers[$message->consumerTag];
340+
341+
if (false === call_user_func($callback, $receivedMessage, $consumer)) {
342+
$bunny->stop();
343+
}
344+
};
345+
346+
$frame = $this->getBunnyChannel()->consume(
347+
$bunnyCallback,
348+
$consumer->getQueue()->getQueueName(),
349+
$consumer->getConsumerTag(),
350+
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOLOCAL),
351+
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOACK),
352+
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_EXCLUSIVE),
353+
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOWAIT)
354+
);
355+
356+
if (empty($frame->consumerTag)) {
357+
throw new Exception('Got empty consumer tag');
358+
}
359+
360+
$consumer->setConsumerTag($frame->consumerTag);
361+
362+
$this->subscribers[$frame->consumerTag] = [$consumer, $callback];
363+
}
364+
365+
/**
366+
* {@inheritdoc}
367+
*/
368+
public function unsubscribe(InteropAmqpConsumer $consumer)
369+
{
370+
if (false == $consumer->getConsumerTag()) {
371+
return;
372+
}
373+
374+
$consumerTag = $consumer->getConsumerTag();
375+
376+
$this->getBunnyChannel()->cancel($consumerTag);
377+
$consumer->setConsumerTag(null);
378+
unset($this->subscribers[$consumerTag]);
379+
}
380+
381+
/**
382+
* {@inheritdoc}
383+
*/
384+
public function consume($timeout = 0)
385+
{
386+
if (empty($this->subscribers)) {
387+
throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming');
388+
}
389+
390+
$this->getBunnyChannel()->getClient()->run($timeout / 1000);
391+
}
392+
312393
/**
313394
* @return Channel
314395
*/
@@ -328,4 +409,27 @@ public function getBunnyChannel()
328409

329410
return $this->bunnyChannel;
330411
}
412+
413+
/**
414+
* @param Message $bunnyMessage
415+
*
416+
* @return InteropAmqpMessage
417+
*/
418+
private function convertMessage(Message $bunnyMessage)
419+
{
420+
$headers = $bunnyMessage->headers;
421+
422+
$properties = [];
423+
if (isset($headers['application_headers'])) {
424+
$properties = $headers['application_headers'];
425+
}
426+
unset($headers['application_headers']);
427+
428+
$message = new AmqpMessage($bunnyMessage->content, $properties, $headers);
429+
$message->setDeliveryTag($bunnyMessage->deliveryTag);
430+
$message->setRedelivered($bunnyMessage->redelivered);
431+
$message->setRoutingKey($bunnyMessage->routingKey);
432+
433+
return $message;
434+
}
331435
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpBunny\Tests\Spec;
4+
5+
use Enqueue\AmqpBunny\AmqpConnectionFactory;
6+
use Interop\Queue\Spec\Amqp\BasicConsumeBreakOnFalseSpec;
7+
8+
/**
9+
* @group functional
10+
*/
11+
class AmqpBasicConsumeBreakOnFalseTest extends BasicConsumeBreakOnFalseSpec
12+
{
13+
/**
14+
* {@inheritdoc}
15+
*/
16+
protected function createContext()
17+
{
18+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
19+
20+
return $factory->createContext();
21+
}
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpBunny\Tests\Spec;
4+
5+
use Enqueue\AmqpBunny\AmqpConnectionFactory;
6+
use Interop\Queue\Spec\Amqp\BasicConsumeFromAllSubscribedQueuesSpec;
7+
8+
/**
9+
* @group functional
10+
*/
11+
class AmqpBasicConsumeFromAllSubscribedQueuesTest extends BasicConsumeFromAllSubscribedQueuesSpec
12+
{
13+
/**
14+
* {@inheritdoc}
15+
*/
16+
protected function createContext()
17+
{
18+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
19+
20+
return $factory->createContext();
21+
}
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpBunny\Tests\Spec;
4+
5+
use Enqueue\AmqpBunny\AmqpConnectionFactory;
6+
use Interop\Queue\Spec\Amqp\BasicConsumeShouldAddConsumerTagOnSubscribeSpec;
7+
8+
/**
9+
* @group functional
10+
*/
11+
class AmqpBasicConsumeShouldAddConsumerTagOnSubscribeTest extends BasicConsumeShouldAddConsumerTagOnSubscribeSpec
12+
{
13+
/**
14+
* {@inheritdoc}
15+
*/
16+
protected function createContext()
17+
{
18+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
19+
20+
return $factory->createContext();
21+
}
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpBunny\Tests\Spec;
4+
5+
use Enqueue\AmqpBunny\AmqpConnectionFactory;
6+
use Interop\Queue\Spec\Amqp\BasicConsumeShouldRemoveConsumerTagOnUnsubscribeSpec;
7+
8+
/**
9+
* @group functional
10+
*/
11+
class AmqpBasicConsumeShouldRemoveConsumerTagOnUnsubscribeTest extends BasicConsumeShouldRemoveConsumerTagOnUnsubscribeSpec
12+
{
13+
/**
14+
* {@inheritdoc}
15+
*/
16+
protected function createContext()
17+
{
18+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
19+
20+
return $factory->createContext();
21+
}
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpBunny\Tests\Spec;
4+
5+
use Enqueue\AmqpBunny\AmqpConnectionFactory;
6+
use Interop\Queue\Spec\Amqp\BasicConsumeUntilUnsubscribedSpec;
7+
8+
/**
9+
* @group functional
10+
*/
11+
class AmqpBasicConsumeUntilUnsubscribedTest extends BasicConsumeUntilUnsubscribedSpec
12+
{
13+
/**
14+
* {@inheritdoc}
15+
*/
16+
protected function createContext()
17+
{
18+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
19+
20+
return $factory->createContext();
21+
}
22+
}

pkg/amqp-bunny/composer.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
"require": {
88
"php": ">=5.6",
99

10-
"queue-interop/amqp-interop": "^0.6@dev",
10+
"queue-interop/amqp-interop": "^0.7@dev",
1111
"bunny/bunny": "^0.2.4",
1212
"enqueue/amqp-tools": "^0.8@dev"
1313
},
@@ -16,7 +16,7 @@
1616
"enqueue/test": "^0.8@dev",
1717
"enqueue/enqueue": "^0.8@dev",
1818
"enqueue/null": "^0.8@dev",
19-
"queue-interop/queue-spec": "^0.5@dev",
19+
"queue-interop/queue-spec": "^0.5.1@dev",
2020
"symfony/dependency-injection": "^2.8|^3",
2121
"symfony/config": "^2.8|^3"
2222
},

0 commit comments

Comments
 (0)