From 107ea8c33a22ab8a36e91c63d39b132a9c461c87 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Tue, 9 Jan 2018 16:28:28 +0200 Subject: [PATCH 1/5] [amqp] Fix socket and signal issue. --- pkg/amqp-bunny/AmqpContext.php | 17 ++++- pkg/amqp-lib/AmqpContext.php | 13 ++++ pkg/amqp-tools/SignalSocketHelper.php | 65 +++++++++++++++++++ .../Consumption/Extension/SignalExtension.php | 2 + pkg/enqueue/Symfony/AmqpTransportFactory.php | 6 +- 5 files changed, 100 insertions(+), 3 deletions(-) create mode 100644 pkg/amqp-tools/SignalSocketHelper.php diff --git a/pkg/amqp-bunny/AmqpContext.php b/pkg/amqp-bunny/AmqpContext.php index a519227ca..70698ca64 100644 --- a/pkg/amqp-bunny/AmqpContext.php +++ b/pkg/amqp-bunny/AmqpContext.php @@ -4,9 +4,11 @@ use Bunny\Channel; use Bunny\Client; +use Bunny\Exception\ClientException; use Bunny\Message; use Enqueue\AmqpTools\DelayStrategyAware; use Enqueue\AmqpTools\DelayStrategyAwareTrait; +use Enqueue\AmqpTools\SignalSocketHelper; use Interop\Amqp\AmqpBind as InteropAmqpBind; use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer; use Interop\Amqp\AmqpContext as InteropAmqpContext; @@ -388,7 +390,20 @@ public function consume($timeout = 0) throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming'); } - $this->getBunnyChannel()->getClient()->run(0 !== $timeout ? $timeout / 1000 : null); + $socketHelper = new SignalSocketHelper(); + $socketHelper->beforeSocket(); + + try { + $this->getBunnyChannel()->getClient()->run(0 !== $timeout ? $timeout / 1000 : null); + } catch (ClientException $e) { + if ('stream_select() failed.' == $e->getMessage() && $socketHelper->wasThereSignal()) { + return; + } + + throw $e; + } finally { + $socketHelper->afterSocket(); + } } /** diff --git a/pkg/amqp-lib/AmqpContext.php b/pkg/amqp-lib/AmqpContext.php index 1c28eb7e2..1fef11a25 100644 --- a/pkg/amqp-lib/AmqpContext.php +++ b/pkg/amqp-lib/AmqpContext.php @@ -4,6 +4,7 @@ use Enqueue\AmqpTools\DelayStrategyAware; use Enqueue\AmqpTools\DelayStrategyAwareTrait; +use Enqueue\AmqpTools\SignalSocketHelper; use Interop\Amqp\AmqpBind as InteropAmqpBind; use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer; use Interop\Amqp\AmqpContext as InteropAmqpContext; @@ -20,6 +21,7 @@ use Interop\Queue\PsrTopic; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AbstractConnection; +use PhpAmqpLib\Exception\AMQPIOWaitException; use PhpAmqpLib\Exception\AMQPTimeoutException; use PhpAmqpLib\Message\AMQPMessage as LibAMQPMessage; use PhpAmqpLib\Wire\AMQPTable; @@ -382,6 +384,9 @@ public function consume($timeout = 0) throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming'); } + $socketHelper = new SignalSocketHelper(); + $socketHelper->beforeSocket(); + try { while (true) { $start = microtime(true); @@ -402,6 +407,14 @@ public function consume($timeout = 0) } } catch (AMQPTimeoutException $e) { } catch (StopBasicConsumptionException $e) { + } catch (AMQPIOWaitException $e) { + if ($socketHelper->wasThereSignal()) { + return; + } + + throw $e; + } finally { + $socketHelper->afterSocket(); } } diff --git a/pkg/amqp-tools/SignalSocketHelper.php b/pkg/amqp-tools/SignalSocketHelper.php new file mode 100644 index 000000000..b85ace158 --- /dev/null +++ b/pkg/amqp-tools/SignalSocketHelper.php @@ -0,0 +1,65 @@ +handlers = []; + } + + public function beforeSocket() + { + if ($this->handlers) { + throw new \LogicException('The handlers property should be empty but it is not. The afterSocket method might not have been called.'); + } + if (null !== $this->wasThereSignal) { + throw new \LogicException('The wasSignal property should be null but it is not. The afterSocket method might not have been called.'); + } + + $this->wasThereSignal = false; + + foreach ([SIGTERM, SIGQUIT, SIGINT] as $signal) { + /** @var callable $handler */ + if ($handler = pcntl_signal_get_handler(SIGTERM)) { + pcntl_signal($signal, function ($signal) use ($handler) { + $this->wasThereSignal = true; + + $handler($signal); + }); + + $this->handlers[$signal] = $handler; + } + } + } + + public function afterSocket() + { + $this->wasThereSignal = null; + + foreach ($this->handlers as $signal => $handler) { + pcntl_signal($signal, $handler); + } + + $this->handlers = []; + } + + /** + * @return bool + */ + public function wasThereSignal() + { + return (bool) $this->wasThereSignal; + } +} diff --git a/pkg/enqueue/Consumption/Extension/SignalExtension.php b/pkg/enqueue/Consumption/Extension/SignalExtension.php index aeeadff28..de5b3fe89 100644 --- a/pkg/enqueue/Consumption/Extension/SignalExtension.php +++ b/pkg/enqueue/Consumption/Extension/SignalExtension.php @@ -31,6 +31,8 @@ public function onStart(Context $context) throw new LogicException('The pcntl extension is required in order to catch signals.'); } + pcntl_async_signals(true); + pcntl_signal(SIGTERM, [$this, 'handleSignal']); pcntl_signal(SIGQUIT, [$this, 'handleSignal']); pcntl_signal(SIGINT, [$this, 'handleSignal']); diff --git a/pkg/enqueue/Symfony/AmqpTransportFactory.php b/pkg/enqueue/Symfony/AmqpTransportFactory.php index 0a749f769..e69bf44de 100644 --- a/pkg/enqueue/Symfony/AmqpTransportFactory.php +++ b/pkg/enqueue/Symfony/AmqpTransportFactory.php @@ -58,9 +58,11 @@ public function addConfiguration(ArrayNodeDefinition $builder) throw new \InvalidArgumentException('There is no amqp driver available. Please consider installing one of the packages: enqueue/amqp-ext, enqueue/amqp-lib, enqueue/amqp-bunny.'); } - if (isset($v['driver']) && false == in_array($v['driver'], $drivers, true)) { - throw new \InvalidArgumentException(sprintf('Unexpected driver given "invalidDriver". Available are "%s"', implode('", "', $drivers))); + if ($v && false == in_array($v, $drivers, true)) { + throw new \InvalidArgumentException(sprintf('Unexpected driver given "%s". Available are "%s"', $v, implode('", "', $drivers))); } + + return $v; }) ->end() ->end() From 25205482f1cb21a1f4e9f048d7b88acc3485e0a6 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Tue, 9 Jan 2018 16:34:03 +0200 Subject: [PATCH 2/5] [amqp] set signal socket helper in context's constructor. Make it easy to write test --- pkg/amqp-bunny/AmqpContext.php | 13 +++++++++---- pkg/amqp-lib/AmqpContext.php | 13 +++++++++---- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/pkg/amqp-bunny/AmqpContext.php b/pkg/amqp-bunny/AmqpContext.php index 70698ca64..911eff8c6 100644 --- a/pkg/amqp-bunny/AmqpContext.php +++ b/pkg/amqp-bunny/AmqpContext.php @@ -55,6 +55,11 @@ class AmqpContext implements InteropAmqpContext, DelayStrategyAware */ private $subscribers; + /** + * @var SignalSocketHelper + */ + private $signalSocketHandler; + /** * Callable must return instance of \Bunny\Channel once called. * @@ -80,6 +85,7 @@ public function __construct($bunnyChannel, $config = []) $this->buffer = new Buffer(); $this->subscribers = []; + $this->signalSocketHandler = new SignalSocketHelper(); } /** @@ -390,19 +396,18 @@ public function consume($timeout = 0) throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming'); } - $socketHelper = new SignalSocketHelper(); - $socketHelper->beforeSocket(); + $this->signalSocketHandler->beforeSocket(); try { $this->getBunnyChannel()->getClient()->run(0 !== $timeout ? $timeout / 1000 : null); } catch (ClientException $e) { - if ('stream_select() failed.' == $e->getMessage() && $socketHelper->wasThereSignal()) { + if ('stream_select() failed.' == $e->getMessage() && $this->signalSocketHandler->wasThereSignal()) { return; } throw $e; } finally { - $socketHelper->afterSocket(); + $this->signalSocketHandler->afterSocket(); } } diff --git a/pkg/amqp-lib/AmqpContext.php b/pkg/amqp-lib/AmqpContext.php index 1fef11a25..3378ebc53 100644 --- a/pkg/amqp-lib/AmqpContext.php +++ b/pkg/amqp-lib/AmqpContext.php @@ -57,6 +57,11 @@ class AmqpContext implements InteropAmqpContext, DelayStrategyAware */ private $subscribers; + /** + * @var SignalSocketHelper + */ + private $signalSocketHandler; + /** * @param AbstractConnection $connection * @param array $config @@ -73,6 +78,7 @@ public function __construct(AbstractConnection $connection, $config = []) $this->connection = $connection; $this->buffer = new Buffer(); $this->subscribers = []; + $this->signalSocketHandler = new SignalSocketHelper(); } /** @@ -384,8 +390,7 @@ public function consume($timeout = 0) throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming'); } - $socketHelper = new SignalSocketHelper(); - $socketHelper->beforeSocket(); + $this->signalSocketHandler->beforeSocket(); try { while (true) { @@ -408,13 +413,13 @@ public function consume($timeout = 0) } catch (AMQPTimeoutException $e) { } catch (StopBasicConsumptionException $e) { } catch (AMQPIOWaitException $e) { - if ($socketHelper->wasThereSignal()) { + if ($this->signalSocketHandler->wasThereSignal()) { return; } throw $e; } finally { - $socketHelper->afterSocket(); + $this->signalSocketHandler->afterSocket(); } } From 4f480716a83343c7119ca38e339a1a8b9fce2694 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Tue, 9 Jan 2018 18:20:39 +0200 Subject: [PATCH 3/5] [amqp] Add test for SignalSocketHelper. --- pkg/amqp-tools/SignalSocketHelper.php | 28 ++++++++++++------- .../Consumption/Extension/SignalExtension.php | 17 ++++++++--- 2 files changed, 31 insertions(+), 14 deletions(-) diff --git a/pkg/amqp-tools/SignalSocketHelper.php b/pkg/amqp-tools/SignalSocketHelper.php index b85ace158..a29f21879 100644 --- a/pkg/amqp-tools/SignalSocketHelper.php +++ b/pkg/amqp-tools/SignalSocketHelper.php @@ -14,6 +14,11 @@ class SignalSocketHelper */ private $wasThereSignal; + /** + * @var int[] + */ + private $signals = [SIGTERM, SIGQUIT, SIGINT]; + public function __construct() { $this->handlers = []; @@ -25,22 +30,23 @@ public function beforeSocket() throw new \LogicException('The handlers property should be empty but it is not. The afterSocket method might not have been called.'); } if (null !== $this->wasThereSignal) { - throw new \LogicException('The wasSignal property should be null but it is not. The afterSocket method might not have been called.'); + throw new \LogicException('The wasThereSignal property should be null but it is not. The afterSocket method might not have been called.'); } $this->wasThereSignal = false; - foreach ([SIGTERM, SIGQUIT, SIGINT] as $signal) { + foreach ($this->signals as $signal) { /** @var callable $handler */ - if ($handler = pcntl_signal_get_handler(SIGTERM)) { - pcntl_signal($signal, function ($signal) use ($handler) { - $this->wasThereSignal = true; + $handler = pcntl_signal_get_handler($signal); - $handler($signal); - }); + pcntl_signal($signal, function ($signal) use ($handler) { + var_dump('fuckk!'); + $this->wasThereSignal = true; - $this->handlers[$signal] = $handler; - } + $handler && $handler($signal); + }); + + $handler && $this->handlers[$signal] = $handler; } } @@ -48,7 +54,9 @@ public function afterSocket() { $this->wasThereSignal = null; - foreach ($this->handlers as $signal => $handler) { + foreach ($this->signals as $signal) { + $handler = isset($this->handlers[$signal]) ? $this->handlers[$signal] : SIG_DFL; + pcntl_signal($signal, $handler); } diff --git a/pkg/enqueue/Consumption/Extension/SignalExtension.php b/pkg/enqueue/Consumption/Extension/SignalExtension.php index de5b3fe89..a8b53e8b8 100644 --- a/pkg/enqueue/Consumption/Extension/SignalExtension.php +++ b/pkg/enqueue/Consumption/Extension/SignalExtension.php @@ -31,7 +31,9 @@ public function onStart(Context $context) throw new LogicException('The pcntl extension is required in order to catch signals.'); } - pcntl_async_signals(true); + if (function_exists('pcntl_async_signals')) { + pcntl_async_signals(true); + } pcntl_signal(SIGTERM, [$this, 'handleSignal']); pcntl_signal(SIGQUIT, [$this, 'handleSignal']); @@ -47,7 +49,7 @@ public function onBeforeReceive(Context $context) { $this->logger = $context->getLogger(); - pcntl_signal_dispatch(); + $this->dispatchSignal(); $this->interruptExecutionIfNeeded($context); } @@ -65,7 +67,7 @@ public function onPreReceived(Context $context) */ public function onPostReceived(Context $context) { - pcntl_signal_dispatch(); + $this->dispatchSignal(); $this->interruptExecutionIfNeeded($context); } @@ -75,7 +77,7 @@ public function onPostReceived(Context $context) */ public function onIdle(Context $context) { - pcntl_signal_dispatch(); + $this->dispatchSignal(); $this->interruptExecutionIfNeeded($context); } @@ -119,4 +121,11 @@ public function handleSignal($signal) break; } } + + private function dispatchSignal() + { + if (false == function_exists('pcntl_async_signals')) { + pcntl_signal_dispatch(); + } + } } From 5cf044ebc3119f01fdbe84f3d84d2e4e8007fb6c Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Tue, 9 Jan 2018 18:40:53 +0200 Subject: [PATCH 4/5] [amqp][symfony] Fix driver is not properly set bug. --- .../Tests/SignalSocketHelperTest.php | 120 ++++++++++++++++++ .../Symfony/AmqpTransportFactoryTest.php | 33 +++++ 2 files changed, 153 insertions(+) create mode 100644 pkg/amqp-tools/Tests/SignalSocketHelperTest.php diff --git a/pkg/amqp-tools/Tests/SignalSocketHelperTest.php b/pkg/amqp-tools/Tests/SignalSocketHelperTest.php new file mode 100644 index 000000000..fef4f564d --- /dev/null +++ b/pkg/amqp-tools/Tests/SignalSocketHelperTest.php @@ -0,0 +1,120 @@ +backupSigTermHandler = pcntl_signal_get_handler(SIGTERM); + $this->backupSigIntHandler = pcntl_signal_get_handler(SIGINT); + + pcntl_signal(SIGTERM, SIG_DFL); + pcntl_signal(SIGINT, SIG_DFL); + + $this->signalHelper = new SignalSocketHelper(); + } + + public function tearDown() + { + parent::tearDown(); + + if ($this->signalHelper) { + $this->signalHelper->afterSocket(); + } + + if ($this->backupSigTermHandler) { + pcntl_signal(SIGTERM, $this->backupSigTermHandler); + } + + if ($this->backupSigIntHandler) { + pcntl_signal(SIGINT, $this->backupSigIntHandler); + } + } + + public function testShouldReturnFalseByDefault() + { + $this->assertFalse($this->signalHelper->wasThereSignal()); + } + + public function testShouldRegisterHandlerOnBeforeSocket() + { + $this->signalHelper->beforeSocket(); + + $this->assertAttributeSame(false, 'wasThereSignal', $this->signalHelper); + $this->assertAttributeSame([], 'handlers', $this->signalHelper); + } + + public function testShouldRegisterHandlerOnBeforeSocketAndBackupCurrentOne() + { + $handler = function () {}; + + pcntl_signal(SIGTERM, $handler); + + $this->signalHelper->beforeSocket(); + + $this->assertAttributeSame(false, 'wasThereSignal', $this->signalHelper); + + $handlers = $this->readAttribute($this->signalHelper, 'handlers'); + + $this->assertInternalType('array', $handlers); + $this->assertArrayHasKey(SIGTERM, $handlers); + $this->assertSame($handler, $handlers[SIGTERM]); + } + + public function testRestoreDefaultPropertiesOnAfterSocket() + { + $this->signalHelper->beforeSocket(); + $this->signalHelper->afterSocket(); + + $this->assertAttributeSame(null, 'wasThereSignal', $this->signalHelper); + $this->assertAttributeSame([], 'handlers', $this->signalHelper); + } + + public function testRestorePreviousHandlerOnAfterSocket() + { + $handler = function () {}; + + pcntl_signal(SIGTERM, $handler); + + $this->signalHelper->beforeSocket(); + $this->signalHelper->afterSocket(); + + $this->assertSame($handler, pcntl_signal_get_handler(SIGTERM)); + } + + public function testThrowsIfBeforeSocketCalledSecondTime() + { + $this->signalHelper->beforeSocket(); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The wasThereSignal property should be null but it is not. The afterSocket method might not have been called.'); + $this->signalHelper->beforeSocket(); + } + + public function testShouldReturnTrueOnWasThereSignal() + { + $this->signalHelper->beforeSocket(); + + posix_kill(getmypid(), SIGINT); + pcntl_signal_dispatch(); + + $this->assertTrue($this->signalHelper->wasThereSignal()); + + $this->signalHelper->afterSocket(); + } +} diff --git a/pkg/enqueue/Tests/Symfony/AmqpTransportFactoryTest.php b/pkg/enqueue/Tests/Symfony/AmqpTransportFactoryTest.php index d8bb5e1fb..b27f5b5ca 100644 --- a/pkg/enqueue/Tests/Symfony/AmqpTransportFactoryTest.php +++ b/pkg/enqueue/Tests/Symfony/AmqpTransportFactoryTest.php @@ -137,6 +137,39 @@ public function testShouldAllowAddSslOptions() ], $config); } + public function testThrowIfNotSupportedDriverSet() + { + $transport = new AmqpTransportFactory(); + $tb = new TreeBuilder(); + $rootNode = $tb->root('foo'); + + $transport->addConfiguration($rootNode); + $processor = new Processor(); + + $this->expectException(InvalidConfigurationException::class); + $this->expectExceptionMessage('Invalid configuration for path "foo.driver": Unexpected driver given "invalidDriver"'); + $processor->process($tb->buildTree(), [[ + 'driver' => 'invalidDriver', + ]]); + } + + public function testShouldAllowSetDriver() + { + $transport = new AmqpTransportFactory(); + $tb = new TreeBuilder(); + $rootNode = $tb->root('foo'); + + $transport->addConfiguration($rootNode); + $processor = new Processor(); + $config = $processor->process($tb->buildTree(), [[ + 'driver' => 'ext', + ]]); + + $this->assertEquals([ + 'driver' => 'ext', + ], $config); + } + public function testShouldAllowAddConfigurationAsString() { $transport = new AmqpTransportFactory(); From ff5498e5f03cf23b001424c3a9d32d23432a3695 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Tue, 9 Jan 2018 22:52:32 +0200 Subject: [PATCH 5/5] works only on PHP 7.1 and higher --- pkg/amqp-tools/SignalSocketHelper.php | 10 ++++++++++ pkg/amqp-tools/Tests/SignalSocketHelperTest.php | 4 ++++ 2 files changed, 14 insertions(+) diff --git a/pkg/amqp-tools/SignalSocketHelper.php b/pkg/amqp-tools/SignalSocketHelper.php index a29f21879..1ea6c9693 100644 --- a/pkg/amqp-tools/SignalSocketHelper.php +++ b/pkg/amqp-tools/SignalSocketHelper.php @@ -26,6 +26,11 @@ public function __construct() public function beforeSocket() { + // PHP 7.1 and higher + if (false == function_exists('pcntl_signal_get_handler')) { + return; + } + if ($this->handlers) { throw new \LogicException('The handlers property should be empty but it is not. The afterSocket method might not have been called.'); } @@ -52,6 +57,11 @@ public function beforeSocket() public function afterSocket() { + // PHP 7.1 and higher + if (false == function_exists('pcntl_signal_get_handler')) { + return; + } + $this->wasThereSignal = null; foreach ($this->signals as $signal) { diff --git a/pkg/amqp-tools/Tests/SignalSocketHelperTest.php b/pkg/amqp-tools/Tests/SignalSocketHelperTest.php index fef4f564d..be064a7b8 100644 --- a/pkg/amqp-tools/Tests/SignalSocketHelperTest.php +++ b/pkg/amqp-tools/Tests/SignalSocketHelperTest.php @@ -20,6 +20,10 @@ public function setUp() { parent::setUp(); + if (false == function_exists('pcntl_signal_get_handler')) { + $this->markTestSkipped('PHP 7.1 and higher'); + } + $this->backupSigTermHandler = pcntl_signal_get_handler(SIGTERM); $this->backupSigIntHandler = pcntl_signal_get_handler(SIGINT);