From 2c192e8d1265c244bb867bdc0906f6100f97bb8a Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Mon, 26 Nov 2018 11:52:30 +0200 Subject: [PATCH 1/2] [amqp][lib] Improve heartbeat handling. Introduce heartbeat on tick. --- pkg/amqp-lib/AmqpConnectionFactory.php | 1 + pkg/amqp-lib/AmqpContext.php | 2 +- pkg/amqp-lib/AmqpSubscriptionConsumer.php | 16 +++++++++++++++- pkg/amqp-lib/Tests/AmqpContextTest.php | 2 +- .../Tests/AmqpSubscriptionConsumerTest.php | 4 ++-- pkg/amqp-lib/composer.json | 2 +- 6 files changed, 21 insertions(+), 6 deletions(-) diff --git a/pkg/amqp-lib/AmqpConnectionFactory.php b/pkg/amqp-lib/AmqpConnectionFactory.php index 6f3323a9f..169671d12 100644 --- a/pkg/amqp-lib/AmqpConnectionFactory.php +++ b/pkg/amqp-lib/AmqpConnectionFactory.php @@ -47,6 +47,7 @@ public function __construct($config = 'amqp:') ->addDefaultOption('login_response', null) ->addDefaultOption('locale', 'en_US') ->addDefaultOption('keepalive', false) + ->addDefaultOption('heartbeat_on_tick', true) ->parse() ; diff --git a/pkg/amqp-lib/AmqpContext.php b/pkg/amqp-lib/AmqpContext.php index b88b29b2a..83167c3db 100644 --- a/pkg/amqp-lib/AmqpContext.php +++ b/pkg/amqp-lib/AmqpContext.php @@ -110,7 +110,7 @@ public function createConsumer(Destination $destination): Consumer */ public function createSubscriptionConsumer(): SubscriptionConsumer { - return new AmqpSubscriptionConsumer($this); + return new AmqpSubscriptionConsumer($this, (bool) $this->config['heartbeat_on_tick']); } /** diff --git a/pkg/amqp-lib/AmqpSubscriptionConsumer.php b/pkg/amqp-lib/AmqpSubscriptionConsumer.php index 541d37fd7..2937a23d9 100644 --- a/pkg/amqp-lib/AmqpSubscriptionConsumer.php +++ b/pkg/amqp-lib/AmqpSubscriptionConsumer.php @@ -27,9 +27,15 @@ class AmqpSubscriptionConsumer implements InteropAmqpSubscriptionConsumer */ private $subscribers; - public function __construct(AmqpContext $context) + /** + * @var bool + */ + private $heartbeatOnTick; + + public function __construct(AmqpContext $context, bool $heartbeatOnTick) { $this->context = $context; + $this->heartbeatOnTick = $heartbeatOnTick; } public function consume(int $timeout = 0): void @@ -41,6 +47,12 @@ public function consume(int $timeout = 0): void $signalHandler = new SignalSocketHelper(); $signalHandler->beforeSocket(); + $heartbeatOnTick = function (AmqpContext $context) { + $context->getLibChannel()->getConnection()->getIO()->check_heartbeat(); + }; + + $this->heartbeatOnTick && register_tick_function($heartbeatOnTick); + try { while (true) { $start = microtime(true); @@ -69,6 +81,8 @@ public function consume(int $timeout = 0): void throw $e; } finally { $signalHandler->afterSocket(); + + $this->heartbeatOnTick && unregister_tick_function($heartbeatOnTick); } } diff --git a/pkg/amqp-lib/Tests/AmqpContextTest.php b/pkg/amqp-lib/Tests/AmqpContextTest.php index b4e75241d..1fd8d8853 100644 --- a/pkg/amqp-lib/Tests/AmqpContextTest.php +++ b/pkg/amqp-lib/Tests/AmqpContextTest.php @@ -344,7 +344,7 @@ public function testShouldSetQos() public function testShouldReturnExpectedSubscriptionConsumerInstance() { - $context = new AmqpContext($this->createConnectionMock(), []); + $context = new AmqpContext($this->createConnectionMock(), ['heartbeat_on_tick' => true]); $this->assertInstanceOf(AmqpSubscriptionConsumer::class, $context->createSubscriptionConsumer()); } diff --git a/pkg/amqp-lib/Tests/AmqpSubscriptionConsumerTest.php b/pkg/amqp-lib/Tests/AmqpSubscriptionConsumerTest.php index 4e24cbdd5..028d3067e 100644 --- a/pkg/amqp-lib/Tests/AmqpSubscriptionConsumerTest.php +++ b/pkg/amqp-lib/Tests/AmqpSubscriptionConsumerTest.php @@ -16,9 +16,9 @@ public function testShouldImplementSubscriptionConsumerInterface() $this->assertTrue($rc->implementsInterface(SubscriptionConsumer::class)); } - public function testCouldBeConstructedWithAmqpContextAsFirstArgument() + public function testCouldBeConstructedWithAmqpContextAndHeartbeatOnTickAsArguments() { - new AmqpSubscriptionConsumer($this->createAmqpContextMock()); + new AmqpSubscriptionConsumer($this->createAmqpContextMock(), $heartbeatOnTick = true); } /** diff --git a/pkg/amqp-lib/composer.json b/pkg/amqp-lib/composer.json index 17b145a03..eb4848248 100644 --- a/pkg/amqp-lib/composer.json +++ b/pkg/amqp-lib/composer.json @@ -7,7 +7,7 @@ "license": "MIT", "require": { "php": "^7.1.3", - "php-amqplib/php-amqplib": "^2.7", + "php-amqplib/php-amqplib": "^2.8", "queue-interop/amqp-interop": "^0.8", "enqueue/amqp-tools": "0.9.x-dev" }, From 83fd8cc44890834652c2f30ce4f6a05c0f888057 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Mon, 26 Nov 2018 12:08:36 +0200 Subject: [PATCH 2/2] [amqp] pass context to tick callback. --- pkg/amqp-lib/AmqpSubscriptionConsumer.php | 2 +- ...riptionConsumerWithHeartbeatOnTickTest.php | 83 +++++++++++++++++++ 2 files changed, 84 insertions(+), 1 deletion(-) create mode 100644 pkg/amqp-lib/Tests/Functional/AmqpSubscriptionConsumerWithHeartbeatOnTickTest.php diff --git a/pkg/amqp-lib/AmqpSubscriptionConsumer.php b/pkg/amqp-lib/AmqpSubscriptionConsumer.php index 2937a23d9..694c134dc 100644 --- a/pkg/amqp-lib/AmqpSubscriptionConsumer.php +++ b/pkg/amqp-lib/AmqpSubscriptionConsumer.php @@ -51,7 +51,7 @@ public function consume(int $timeout = 0): void $context->getLibChannel()->getConnection()->getIO()->check_heartbeat(); }; - $this->heartbeatOnTick && register_tick_function($heartbeatOnTick); + $this->heartbeatOnTick && register_tick_function($heartbeatOnTick, $this->context); try { while (true) { diff --git a/pkg/amqp-lib/Tests/Functional/AmqpSubscriptionConsumerWithHeartbeatOnTickTest.php b/pkg/amqp-lib/Tests/Functional/AmqpSubscriptionConsumerWithHeartbeatOnTickTest.php new file mode 100644 index 000000000..aea380d17 --- /dev/null +++ b/pkg/amqp-lib/Tests/Functional/AmqpSubscriptionConsumerWithHeartbeatOnTickTest.php @@ -0,0 +1,83 @@ +context) { + $this->context->close(); + } + + parent::tearDown(); + } + + public function test() + { + $this->context = $context = $this->createContext(); + + $fooQueue = $this->createQueue($context, 'foo_subscription_consumer_consume_from_all_subscribed_queues_spec'); + + $expectedFooBody = 'fooBody'; + + $context->createProducer()->send($fooQueue, $context->createMessage($expectedFooBody)); + + $fooConsumer = $context->createConsumer($fooQueue); + + $actualBodies = []; + $actualQueues = []; + $callback = function (Message $message, Consumer $consumer) use (&$actualBodies, &$actualQueues) { + declare(ticks=1) { + $actualBodies[] = $message->getBody(); + $actualQueues[] = $consumer->getQueue()->getQueueName(); + + $consumer->acknowledge($message); + + return true; + } + }; + + $subscriptionConsumer = $context->createSubscriptionConsumer(); + $subscriptionConsumer->subscribe($fooConsumer, $callback); + + $subscriptionConsumer->consume(1000); + + $this->assertCount(1, $actualBodies); + } + + protected function createContext(): AmqpContext + { + $factory = new AmqpConnectionFactory(getenv('AMQP_DSN')); + + $context = $factory->createContext(); + $context->setQos(0, 5, false); + + return $context; + } + + protected function createQueue(AmqpContext $context, string $queueName): AmqpQueue + { + $queue = $context->createQueue($queueName); + $context->declareQueue($queue); + $context->purgeQueue($queue); + + return $queue; + } +}