Skip to content

Commit 83fd8cc

Browse files
committed
[amqp] pass context to tick callback.
1 parent 2c192e8 commit 83fd8cc

File tree

2 files changed

+84
-1
lines changed

2 files changed

+84
-1
lines changed

pkg/amqp-lib/AmqpSubscriptionConsumer.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public function consume(int $timeout = 0): void
5151
$context->getLibChannel()->getConnection()->getIO()->check_heartbeat();
5252
};
5353

54-
$this->heartbeatOnTick && register_tick_function($heartbeatOnTick);
54+
$this->heartbeatOnTick && register_tick_function($heartbeatOnTick, $this->context);
5555

5656
try {
5757
while (true) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpLib\Tests\Functional;
4+
5+
use Enqueue\AmqpLib\AmqpConnectionFactory;
6+
use Enqueue\AmqpLib\AmqpContext;
7+
use Interop\Amqp\AmqpQueue;
8+
use Interop\Queue\Consumer;
9+
use Interop\Queue\Context;
10+
use Interop\Queue\Message;
11+
use PHPUnit\Framework\TestCase;
12+
13+
/**
14+
* @group functional
15+
*/
16+
class AmqpSubscriptionConsumerWithHeartbeatOnTickTest extends TestCase
17+
{
18+
/**
19+
* @var Context
20+
*/
21+
private $context;
22+
23+
protected function tearDown()
24+
{
25+
if ($this->context) {
26+
$this->context->close();
27+
}
28+
29+
parent::tearDown();
30+
}
31+
32+
public function test()
33+
{
34+
$this->context = $context = $this->createContext();
35+
36+
$fooQueue = $this->createQueue($context, 'foo_subscription_consumer_consume_from_all_subscribed_queues_spec');
37+
38+
$expectedFooBody = 'fooBody';
39+
40+
$context->createProducer()->send($fooQueue, $context->createMessage($expectedFooBody));
41+
42+
$fooConsumer = $context->createConsumer($fooQueue);
43+
44+
$actualBodies = [];
45+
$actualQueues = [];
46+
$callback = function (Message $message, Consumer $consumer) use (&$actualBodies, &$actualQueues) {
47+
declare(ticks=1) {
48+
$actualBodies[] = $message->getBody();
49+
$actualQueues[] = $consumer->getQueue()->getQueueName();
50+
51+
$consumer->acknowledge($message);
52+
53+
return true;
54+
}
55+
};
56+
57+
$subscriptionConsumer = $context->createSubscriptionConsumer();
58+
$subscriptionConsumer->subscribe($fooConsumer, $callback);
59+
60+
$subscriptionConsumer->consume(1000);
61+
62+
$this->assertCount(1, $actualBodies);
63+
}
64+
65+
protected function createContext(): AmqpContext
66+
{
67+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
68+
69+
$context = $factory->createContext();
70+
$context->setQos(0, 5, false);
71+
72+
return $context;
73+
}
74+
75+
protected function createQueue(AmqpContext $context, string $queueName): AmqpQueue
76+
{
77+
$queue = $context->createQueue($queueName);
78+
$context->declareQueue($queue);
79+
$context->purgeQueue($queue);
80+
81+
return $queue;
82+
}
83+
}

0 commit comments

Comments
 (0)