diff --git a/pkg/amqp-bunny/AmqpContext.php b/pkg/amqp-bunny/AmqpContext.php index a519227ca..911eff8c6 100644 --- a/pkg/amqp-bunny/AmqpContext.php +++ b/pkg/amqp-bunny/AmqpContext.php @@ -4,9 +4,11 @@ use Bunny\Channel; use Bunny\Client; +use Bunny\Exception\ClientException; use Bunny\Message; use Enqueue\AmqpTools\DelayStrategyAware; use Enqueue\AmqpTools\DelayStrategyAwareTrait; +use Enqueue\AmqpTools\SignalSocketHelper; use Interop\Amqp\AmqpBind as InteropAmqpBind; use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer; use Interop\Amqp\AmqpContext as InteropAmqpContext; @@ -53,6 +55,11 @@ class AmqpContext implements InteropAmqpContext, DelayStrategyAware */ private $subscribers; + /** + * @var SignalSocketHelper + */ + private $signalSocketHandler; + /** * Callable must return instance of \Bunny\Channel once called. * @@ -78,6 +85,7 @@ public function __construct($bunnyChannel, $config = []) $this->buffer = new Buffer(); $this->subscribers = []; + $this->signalSocketHandler = new SignalSocketHelper(); } /** @@ -388,7 +396,19 @@ public function consume($timeout = 0) throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming'); } - $this->getBunnyChannel()->getClient()->run(0 !== $timeout ? $timeout / 1000 : null); + $this->signalSocketHandler->beforeSocket(); + + try { + $this->getBunnyChannel()->getClient()->run(0 !== $timeout ? $timeout / 1000 : null); + } catch (ClientException $e) { + if ('stream_select() failed.' == $e->getMessage() && $this->signalSocketHandler->wasThereSignal()) { + return; + } + + throw $e; + } finally { + $this->signalSocketHandler->afterSocket(); + } } /** diff --git a/pkg/amqp-lib/AmqpContext.php b/pkg/amqp-lib/AmqpContext.php index 1c28eb7e2..3378ebc53 100644 --- a/pkg/amqp-lib/AmqpContext.php +++ b/pkg/amqp-lib/AmqpContext.php @@ -4,6 +4,7 @@ use Enqueue\AmqpTools\DelayStrategyAware; use Enqueue\AmqpTools\DelayStrategyAwareTrait; +use Enqueue\AmqpTools\SignalSocketHelper; use Interop\Amqp\AmqpBind as InteropAmqpBind; use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer; use Interop\Amqp\AmqpContext as InteropAmqpContext; @@ -20,6 +21,7 @@ use Interop\Queue\PsrTopic; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AbstractConnection; +use PhpAmqpLib\Exception\AMQPIOWaitException; use PhpAmqpLib\Exception\AMQPTimeoutException; use PhpAmqpLib\Message\AMQPMessage as LibAMQPMessage; use PhpAmqpLib\Wire\AMQPTable; @@ -55,6 +57,11 @@ class AmqpContext implements InteropAmqpContext, DelayStrategyAware */ private $subscribers; + /** + * @var SignalSocketHelper + */ + private $signalSocketHandler; + /** * @param AbstractConnection $connection * @param array $config @@ -71,6 +78,7 @@ public function __construct(AbstractConnection $connection, $config = []) $this->connection = $connection; $this->buffer = new Buffer(); $this->subscribers = []; + $this->signalSocketHandler = new SignalSocketHelper(); } /** @@ -382,6 +390,8 @@ public function consume($timeout = 0) throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming'); } + $this->signalSocketHandler->beforeSocket(); + try { while (true) { $start = microtime(true); @@ -402,6 +412,14 @@ public function consume($timeout = 0) } } catch (AMQPTimeoutException $e) { } catch (StopBasicConsumptionException $e) { + } catch (AMQPIOWaitException $e) { + if ($this->signalSocketHandler->wasThereSignal()) { + return; + } + + throw $e; + } finally { + $this->signalSocketHandler->afterSocket(); } } diff --git a/pkg/amqp-tools/SignalSocketHelper.php b/pkg/amqp-tools/SignalSocketHelper.php new file mode 100644 index 000000000..1ea6c9693 --- /dev/null +++ b/pkg/amqp-tools/SignalSocketHelper.php @@ -0,0 +1,83 @@ +handlers = []; + } + + public function beforeSocket() + { + // PHP 7.1 and higher + if (false == function_exists('pcntl_signal_get_handler')) { + return; + } + + if ($this->handlers) { + throw new \LogicException('The handlers property should be empty but it is not. The afterSocket method might not have been called.'); + } + if (null !== $this->wasThereSignal) { + throw new \LogicException('The wasThereSignal property should be null but it is not. The afterSocket method might not have been called.'); + } + + $this->wasThereSignal = false; + + foreach ($this->signals as $signal) { + /** @var callable $handler */ + $handler = pcntl_signal_get_handler($signal); + + pcntl_signal($signal, function ($signal) use ($handler) { + var_dump('fuckk!'); + $this->wasThereSignal = true; + + $handler && $handler($signal); + }); + + $handler && $this->handlers[$signal] = $handler; + } + } + + public function afterSocket() + { + // PHP 7.1 and higher + if (false == function_exists('pcntl_signal_get_handler')) { + return; + } + + $this->wasThereSignal = null; + + foreach ($this->signals as $signal) { + $handler = isset($this->handlers[$signal]) ? $this->handlers[$signal] : SIG_DFL; + + pcntl_signal($signal, $handler); + } + + $this->handlers = []; + } + + /** + * @return bool + */ + public function wasThereSignal() + { + return (bool) $this->wasThereSignal; + } +} diff --git a/pkg/amqp-tools/Tests/SignalSocketHelperTest.php b/pkg/amqp-tools/Tests/SignalSocketHelperTest.php new file mode 100644 index 000000000..be064a7b8 --- /dev/null +++ b/pkg/amqp-tools/Tests/SignalSocketHelperTest.php @@ -0,0 +1,124 @@ +markTestSkipped('PHP 7.1 and higher'); + } + + $this->backupSigTermHandler = pcntl_signal_get_handler(SIGTERM); + $this->backupSigIntHandler = pcntl_signal_get_handler(SIGINT); + + pcntl_signal(SIGTERM, SIG_DFL); + pcntl_signal(SIGINT, SIG_DFL); + + $this->signalHelper = new SignalSocketHelper(); + } + + public function tearDown() + { + parent::tearDown(); + + if ($this->signalHelper) { + $this->signalHelper->afterSocket(); + } + + if ($this->backupSigTermHandler) { + pcntl_signal(SIGTERM, $this->backupSigTermHandler); + } + + if ($this->backupSigIntHandler) { + pcntl_signal(SIGINT, $this->backupSigIntHandler); + } + } + + public function testShouldReturnFalseByDefault() + { + $this->assertFalse($this->signalHelper->wasThereSignal()); + } + + public function testShouldRegisterHandlerOnBeforeSocket() + { + $this->signalHelper->beforeSocket(); + + $this->assertAttributeSame(false, 'wasThereSignal', $this->signalHelper); + $this->assertAttributeSame([], 'handlers', $this->signalHelper); + } + + public function testShouldRegisterHandlerOnBeforeSocketAndBackupCurrentOne() + { + $handler = function () {}; + + pcntl_signal(SIGTERM, $handler); + + $this->signalHelper->beforeSocket(); + + $this->assertAttributeSame(false, 'wasThereSignal', $this->signalHelper); + + $handlers = $this->readAttribute($this->signalHelper, 'handlers'); + + $this->assertInternalType('array', $handlers); + $this->assertArrayHasKey(SIGTERM, $handlers); + $this->assertSame($handler, $handlers[SIGTERM]); + } + + public function testRestoreDefaultPropertiesOnAfterSocket() + { + $this->signalHelper->beforeSocket(); + $this->signalHelper->afterSocket(); + + $this->assertAttributeSame(null, 'wasThereSignal', $this->signalHelper); + $this->assertAttributeSame([], 'handlers', $this->signalHelper); + } + + public function testRestorePreviousHandlerOnAfterSocket() + { + $handler = function () {}; + + pcntl_signal(SIGTERM, $handler); + + $this->signalHelper->beforeSocket(); + $this->signalHelper->afterSocket(); + + $this->assertSame($handler, pcntl_signal_get_handler(SIGTERM)); + } + + public function testThrowsIfBeforeSocketCalledSecondTime() + { + $this->signalHelper->beforeSocket(); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The wasThereSignal property should be null but it is not. The afterSocket method might not have been called.'); + $this->signalHelper->beforeSocket(); + } + + public function testShouldReturnTrueOnWasThereSignal() + { + $this->signalHelper->beforeSocket(); + + posix_kill(getmypid(), SIGINT); + pcntl_signal_dispatch(); + + $this->assertTrue($this->signalHelper->wasThereSignal()); + + $this->signalHelper->afterSocket(); + } +} diff --git a/pkg/enqueue/Consumption/Extension/SignalExtension.php b/pkg/enqueue/Consumption/Extension/SignalExtension.php index aeeadff28..a8b53e8b8 100644 --- a/pkg/enqueue/Consumption/Extension/SignalExtension.php +++ b/pkg/enqueue/Consumption/Extension/SignalExtension.php @@ -31,6 +31,10 @@ public function onStart(Context $context) throw new LogicException('The pcntl extension is required in order to catch signals.'); } + if (function_exists('pcntl_async_signals')) { + pcntl_async_signals(true); + } + pcntl_signal(SIGTERM, [$this, 'handleSignal']); pcntl_signal(SIGQUIT, [$this, 'handleSignal']); pcntl_signal(SIGINT, [$this, 'handleSignal']); @@ -45,7 +49,7 @@ public function onBeforeReceive(Context $context) { $this->logger = $context->getLogger(); - pcntl_signal_dispatch(); + $this->dispatchSignal(); $this->interruptExecutionIfNeeded($context); } @@ -63,7 +67,7 @@ public function onPreReceived(Context $context) */ public function onPostReceived(Context $context) { - pcntl_signal_dispatch(); + $this->dispatchSignal(); $this->interruptExecutionIfNeeded($context); } @@ -73,7 +77,7 @@ public function onPostReceived(Context $context) */ public function onIdle(Context $context) { - pcntl_signal_dispatch(); + $this->dispatchSignal(); $this->interruptExecutionIfNeeded($context); } @@ -117,4 +121,11 @@ public function handleSignal($signal) break; } } + + private function dispatchSignal() + { + if (false == function_exists('pcntl_async_signals')) { + pcntl_signal_dispatch(); + } + } } diff --git a/pkg/enqueue/Symfony/AmqpTransportFactory.php b/pkg/enqueue/Symfony/AmqpTransportFactory.php index 0a749f769..e69bf44de 100644 --- a/pkg/enqueue/Symfony/AmqpTransportFactory.php +++ b/pkg/enqueue/Symfony/AmqpTransportFactory.php @@ -58,9 +58,11 @@ public function addConfiguration(ArrayNodeDefinition $builder) throw new \InvalidArgumentException('There is no amqp driver available. Please consider installing one of the packages: enqueue/amqp-ext, enqueue/amqp-lib, enqueue/amqp-bunny.'); } - if (isset($v['driver']) && false == in_array($v['driver'], $drivers, true)) { - throw new \InvalidArgumentException(sprintf('Unexpected driver given "invalidDriver". Available are "%s"', implode('", "', $drivers))); + if ($v && false == in_array($v, $drivers, true)) { + throw new \InvalidArgumentException(sprintf('Unexpected driver given "%s". Available are "%s"', $v, implode('", "', $drivers))); } + + return $v; }) ->end() ->end() diff --git a/pkg/enqueue/Tests/Symfony/AmqpTransportFactoryTest.php b/pkg/enqueue/Tests/Symfony/AmqpTransportFactoryTest.php index d8bb5e1fb..b27f5b5ca 100644 --- a/pkg/enqueue/Tests/Symfony/AmqpTransportFactoryTest.php +++ b/pkg/enqueue/Tests/Symfony/AmqpTransportFactoryTest.php @@ -137,6 +137,39 @@ public function testShouldAllowAddSslOptions() ], $config); } + public function testThrowIfNotSupportedDriverSet() + { + $transport = new AmqpTransportFactory(); + $tb = new TreeBuilder(); + $rootNode = $tb->root('foo'); + + $transport->addConfiguration($rootNode); + $processor = new Processor(); + + $this->expectException(InvalidConfigurationException::class); + $this->expectExceptionMessage('Invalid configuration for path "foo.driver": Unexpected driver given "invalidDriver"'); + $processor->process($tb->buildTree(), [[ + 'driver' => 'invalidDriver', + ]]); + } + + public function testShouldAllowSetDriver() + { + $transport = new AmqpTransportFactory(); + $tb = new TreeBuilder(); + $rootNode = $tb->root('foo'); + + $transport->addConfiguration($rootNode); + $processor = new Processor(); + $config = $processor->process($tb->buildTree(), [[ + 'driver' => 'ext', + ]]); + + $this->assertEquals([ + 'driver' => 'ext', + ], $config); + } + public function testShouldAllowAddConfigurationAsString() { $transport = new AmqpTransportFactory();