Skip to content

Commit 82b1d77

Browse files
authored
Merge pull request #658 from php-enqueue/amqp-lib-improve-hearbeat
[amqp][lib] Improve heartbeat handling. Introduce heartbeat on tick. Fixes "Invalid frame type 65" and "Broken pipe or closed connection"
2 parents a76a9b8 + 83fd8cc commit 82b1d77

7 files changed

+104
-6
lines changed

pkg/amqp-lib/AmqpConnectionFactory.php

+1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public function __construct($config = 'amqp:')
4747
->addDefaultOption('login_response', null)
4848
->addDefaultOption('locale', 'en_US')
4949
->addDefaultOption('keepalive', false)
50+
->addDefaultOption('heartbeat_on_tick', true)
5051
->parse()
5152
;
5253

pkg/amqp-lib/AmqpContext.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public function createConsumer(Destination $destination): Consumer
110110
*/
111111
public function createSubscriptionConsumer(): SubscriptionConsumer
112112
{
113-
return new AmqpSubscriptionConsumer($this);
113+
return new AmqpSubscriptionConsumer($this, (bool) $this->config['heartbeat_on_tick']);
114114
}
115115

116116
/**

pkg/amqp-lib/AmqpSubscriptionConsumer.php

+15-1
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,15 @@ class AmqpSubscriptionConsumer implements InteropAmqpSubscriptionConsumer
2727
*/
2828
private $subscribers;
2929

30-
public function __construct(AmqpContext $context)
30+
/**
31+
* @var bool
32+
*/
33+
private $heartbeatOnTick;
34+
35+
public function __construct(AmqpContext $context, bool $heartbeatOnTick)
3136
{
3237
$this->context = $context;
38+
$this->heartbeatOnTick = $heartbeatOnTick;
3339
}
3440

3541
public function consume(int $timeout = 0): void
@@ -41,6 +47,12 @@ public function consume(int $timeout = 0): void
4147
$signalHandler = new SignalSocketHelper();
4248
$signalHandler->beforeSocket();
4349

50+
$heartbeatOnTick = function (AmqpContext $context) {
51+
$context->getLibChannel()->getConnection()->getIO()->check_heartbeat();
52+
};
53+
54+
$this->heartbeatOnTick && register_tick_function($heartbeatOnTick, $this->context);
55+
4456
try {
4557
while (true) {
4658
$start = microtime(true);
@@ -69,6 +81,8 @@ public function consume(int $timeout = 0): void
6981
throw $e;
7082
} finally {
7183
$signalHandler->afterSocket();
84+
85+
$this->heartbeatOnTick && unregister_tick_function($heartbeatOnTick);
7286
}
7387
}
7488

pkg/amqp-lib/Tests/AmqpContextTest.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,7 @@ public function testShouldSetQos()
344344

345345
public function testShouldReturnExpectedSubscriptionConsumerInstance()
346346
{
347-
$context = new AmqpContext($this->createConnectionMock(), []);
347+
$context = new AmqpContext($this->createConnectionMock(), ['heartbeat_on_tick' => true]);
348348

349349
$this->assertInstanceOf(AmqpSubscriptionConsumer::class, $context->createSubscriptionConsumer());
350350
}

pkg/amqp-lib/Tests/AmqpSubscriptionConsumerTest.php

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ public function testShouldImplementSubscriptionConsumerInterface()
1616
$this->assertTrue($rc->implementsInterface(SubscriptionConsumer::class));
1717
}
1818

19-
public function testCouldBeConstructedWithAmqpContextAsFirstArgument()
19+
public function testCouldBeConstructedWithAmqpContextAndHeartbeatOnTickAsArguments()
2020
{
21-
new AmqpSubscriptionConsumer($this->createAmqpContextMock());
21+
new AmqpSubscriptionConsumer($this->createAmqpContextMock(), $heartbeatOnTick = true);
2222
}
2323

2424
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpLib\Tests\Functional;
4+
5+
use Enqueue\AmqpLib\AmqpConnectionFactory;
6+
use Enqueue\AmqpLib\AmqpContext;
7+
use Interop\Amqp\AmqpQueue;
8+
use Interop\Queue\Consumer;
9+
use Interop\Queue\Context;
10+
use Interop\Queue\Message;
11+
use PHPUnit\Framework\TestCase;
12+
13+
/**
14+
* @group functional
15+
*/
16+
class AmqpSubscriptionConsumerWithHeartbeatOnTickTest extends TestCase
17+
{
18+
/**
19+
* @var Context
20+
*/
21+
private $context;
22+
23+
protected function tearDown()
24+
{
25+
if ($this->context) {
26+
$this->context->close();
27+
}
28+
29+
parent::tearDown();
30+
}
31+
32+
public function test()
33+
{
34+
$this->context = $context = $this->createContext();
35+
36+
$fooQueue = $this->createQueue($context, 'foo_subscription_consumer_consume_from_all_subscribed_queues_spec');
37+
38+
$expectedFooBody = 'fooBody';
39+
40+
$context->createProducer()->send($fooQueue, $context->createMessage($expectedFooBody));
41+
42+
$fooConsumer = $context->createConsumer($fooQueue);
43+
44+
$actualBodies = [];
45+
$actualQueues = [];
46+
$callback = function (Message $message, Consumer $consumer) use (&$actualBodies, &$actualQueues) {
47+
declare(ticks=1) {
48+
$actualBodies[] = $message->getBody();
49+
$actualQueues[] = $consumer->getQueue()->getQueueName();
50+
51+
$consumer->acknowledge($message);
52+
53+
return true;
54+
}
55+
};
56+
57+
$subscriptionConsumer = $context->createSubscriptionConsumer();
58+
$subscriptionConsumer->subscribe($fooConsumer, $callback);
59+
60+
$subscriptionConsumer->consume(1000);
61+
62+
$this->assertCount(1, $actualBodies);
63+
}
64+
65+
protected function createContext(): AmqpContext
66+
{
67+
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));
68+
69+
$context = $factory->createContext();
70+
$context->setQos(0, 5, false);
71+
72+
return $context;
73+
}
74+
75+
protected function createQueue(AmqpContext $context, string $queueName): AmqpQueue
76+
{
77+
$queue = $context->createQueue($queueName);
78+
$context->declareQueue($queue);
79+
$context->purgeQueue($queue);
80+
81+
return $queue;
82+
}
83+
}

pkg/amqp-lib/composer.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
"license": "MIT",
88
"require": {
99
"php": "^7.1.3",
10-
"php-amqplib/php-amqplib": "^2.7",
10+
"php-amqplib/php-amqplib": "^2.8",
1111
"queue-interop/amqp-interop": "^0.8",
1212
"enqueue/amqp-tools": "0.9.x-dev"
1313
},

0 commit comments

Comments
 (0)