Skip to content

Commit 13b5192

Browse files
authored
Merge pull request #317 from php-enqueue/amqp-signal-socket-issue-fix
[amqp] Fix socket and signal issue.
2 parents 63bc3b3 + c259157 commit 13b5192

File tree

7 files changed

+297
-6
lines changed

7 files changed

+297
-6
lines changed

pkg/amqp-bunny/AmqpContext.php

+21-1
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44

55
use Bunny\Channel;
66
use Bunny\Client;
7+
use Bunny\Exception\ClientException;
78
use Bunny\Message;
89
use Enqueue\AmqpTools\DelayStrategyAware;
910
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
11+
use Enqueue\AmqpTools\SignalSocketHelper;
1012
use Interop\Amqp\AmqpBind as InteropAmqpBind;
1113
use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer;
1214
use Interop\Amqp\AmqpContext as InteropAmqpContext;
@@ -53,6 +55,11 @@ class AmqpContext implements InteropAmqpContext, DelayStrategyAware
5355
*/
5456
private $subscribers;
5557

58+
/**
59+
* @var SignalSocketHelper
60+
*/
61+
private $signalSocketHandler;
62+
5663
/**
5764
* Callable must return instance of \Bunny\Channel once called.
5865
*
@@ -78,6 +85,7 @@ public function __construct($bunnyChannel, $config = [])
7885

7986
$this->buffer = new Buffer();
8087
$this->subscribers = [];
88+
$this->signalSocketHandler = new SignalSocketHelper();
8189
}
8290

8391
/**
@@ -388,7 +396,19 @@ public function consume($timeout = 0)
388396
throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming');
389397
}
390398

391-
$this->getBunnyChannel()->getClient()->run(0 !== $timeout ? $timeout / 1000 : null);
399+
$this->signalSocketHandler->beforeSocket();
400+
401+
try {
402+
$this->getBunnyChannel()->getClient()->run(0 !== $timeout ? $timeout / 1000 : null);
403+
} catch (ClientException $e) {
404+
if ('stream_select() failed.' == $e->getMessage() && $this->signalSocketHandler->wasThereSignal()) {
405+
return;
406+
}
407+
408+
throw $e;
409+
} finally {
410+
$this->signalSocketHandler->afterSocket();
411+
}
392412
}
393413

394414
/**

pkg/amqp-lib/AmqpContext.php

+18
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use Enqueue\AmqpTools\DelayStrategyAware;
66
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
7+
use Enqueue\AmqpTools\SignalSocketHelper;
78
use Interop\Amqp\AmqpBind as InteropAmqpBind;
89
use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer;
910
use Interop\Amqp\AmqpContext as InteropAmqpContext;
@@ -20,6 +21,7 @@
2021
use Interop\Queue\PsrTopic;
2122
use PhpAmqpLib\Channel\AMQPChannel;
2223
use PhpAmqpLib\Connection\AbstractConnection;
24+
use PhpAmqpLib\Exception\AMQPIOWaitException;
2325
use PhpAmqpLib\Exception\AMQPTimeoutException;
2426
use PhpAmqpLib\Message\AMQPMessage as LibAMQPMessage;
2527
use PhpAmqpLib\Wire\AMQPTable;
@@ -55,6 +57,11 @@ class AmqpContext implements InteropAmqpContext, DelayStrategyAware
5557
*/
5658
private $subscribers;
5759

60+
/**
61+
* @var SignalSocketHelper
62+
*/
63+
private $signalSocketHandler;
64+
5865
/**
5966
* @param AbstractConnection $connection
6067
* @param array $config
@@ -71,6 +78,7 @@ public function __construct(AbstractConnection $connection, $config = [])
7178
$this->connection = $connection;
7279
$this->buffer = new Buffer();
7380
$this->subscribers = [];
81+
$this->signalSocketHandler = new SignalSocketHelper();
7482
}
7583

7684
/**
@@ -382,6 +390,8 @@ public function consume($timeout = 0)
382390
throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming');
383391
}
384392

393+
$this->signalSocketHandler->beforeSocket();
394+
385395
try {
386396
while (true) {
387397
$start = microtime(true);
@@ -402,6 +412,14 @@ public function consume($timeout = 0)
402412
}
403413
} catch (AMQPTimeoutException $e) {
404414
} catch (StopBasicConsumptionException $e) {
415+
} catch (AMQPIOWaitException $e) {
416+
if ($this->signalSocketHandler->wasThereSignal()) {
417+
return;
418+
}
419+
420+
throw $e;
421+
} finally {
422+
$this->signalSocketHandler->afterSocket();
405423
}
406424
}
407425

pkg/amqp-tools/SignalSocketHelper.php

+83
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpTools;
4+
5+
class SignalSocketHelper
6+
{
7+
/**
8+
* @var callable[]
9+
*/
10+
private $handlers;
11+
12+
/**
13+
* @var bool
14+
*/
15+
private $wasThereSignal;
16+
17+
/**
18+
* @var int[]
19+
*/
20+
private $signals = [SIGTERM, SIGQUIT, SIGINT];
21+
22+
public function __construct()
23+
{
24+
$this->handlers = [];
25+
}
26+
27+
public function beforeSocket()
28+
{
29+
// PHP 7.1 and higher
30+
if (false == function_exists('pcntl_signal_get_handler')) {
31+
return;
32+
}
33+
34+
if ($this->handlers) {
35+
throw new \LogicException('The handlers property should be empty but it is not. The afterSocket method might not have been called.');
36+
}
37+
if (null !== $this->wasThereSignal) {
38+
throw new \LogicException('The wasThereSignal property should be null but it is not. The afterSocket method might not have been called.');
39+
}
40+
41+
$this->wasThereSignal = false;
42+
43+
foreach ($this->signals as $signal) {
44+
/** @var callable $handler */
45+
$handler = pcntl_signal_get_handler($signal);
46+
47+
pcntl_signal($signal, function ($signal) use ($handler) {
48+
var_dump('fuckk!');
49+
$this->wasThereSignal = true;
50+
51+
$handler && $handler($signal);
52+
});
53+
54+
$handler && $this->handlers[$signal] = $handler;
55+
}
56+
}
57+
58+
public function afterSocket()
59+
{
60+
// PHP 7.1 and higher
61+
if (false == function_exists('pcntl_signal_get_handler')) {
62+
return;
63+
}
64+
65+
$this->wasThereSignal = null;
66+
67+
foreach ($this->signals as $signal) {
68+
$handler = isset($this->handlers[$signal]) ? $this->handlers[$signal] : SIG_DFL;
69+
70+
pcntl_signal($signal, $handler);
71+
}
72+
73+
$this->handlers = [];
74+
}
75+
76+
/**
77+
* @return bool
78+
*/
79+
public function wasThereSignal()
80+
{
81+
return (bool) $this->wasThereSignal;
82+
}
83+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpTools\Tests;
4+
5+
use Enqueue\AmqpTools\SignalSocketHelper;
6+
use PHPUnit\Framework\TestCase;
7+
8+
class SignalSocketHelperTest extends TestCase
9+
{
10+
/**
11+
* @var SignalSocketHelper
12+
*/
13+
private $signalHelper;
14+
15+
private $backupSigTermHandler;
16+
17+
private $backupSigIntHandler;
18+
19+
public function setUp()
20+
{
21+
parent::setUp();
22+
23+
if (false == function_exists('pcntl_signal_get_handler')) {
24+
$this->markTestSkipped('PHP 7.1 and higher');
25+
}
26+
27+
$this->backupSigTermHandler = pcntl_signal_get_handler(SIGTERM);
28+
$this->backupSigIntHandler = pcntl_signal_get_handler(SIGINT);
29+
30+
pcntl_signal(SIGTERM, SIG_DFL);
31+
pcntl_signal(SIGINT, SIG_DFL);
32+
33+
$this->signalHelper = new SignalSocketHelper();
34+
}
35+
36+
public function tearDown()
37+
{
38+
parent::tearDown();
39+
40+
if ($this->signalHelper) {
41+
$this->signalHelper->afterSocket();
42+
}
43+
44+
if ($this->backupSigTermHandler) {
45+
pcntl_signal(SIGTERM, $this->backupSigTermHandler);
46+
}
47+
48+
if ($this->backupSigIntHandler) {
49+
pcntl_signal(SIGINT, $this->backupSigIntHandler);
50+
}
51+
}
52+
53+
public function testShouldReturnFalseByDefault()
54+
{
55+
$this->assertFalse($this->signalHelper->wasThereSignal());
56+
}
57+
58+
public function testShouldRegisterHandlerOnBeforeSocket()
59+
{
60+
$this->signalHelper->beforeSocket();
61+
62+
$this->assertAttributeSame(false, 'wasThereSignal', $this->signalHelper);
63+
$this->assertAttributeSame([], 'handlers', $this->signalHelper);
64+
}
65+
66+
public function testShouldRegisterHandlerOnBeforeSocketAndBackupCurrentOne()
67+
{
68+
$handler = function () {};
69+
70+
pcntl_signal(SIGTERM, $handler);
71+
72+
$this->signalHelper->beforeSocket();
73+
74+
$this->assertAttributeSame(false, 'wasThereSignal', $this->signalHelper);
75+
76+
$handlers = $this->readAttribute($this->signalHelper, 'handlers');
77+
78+
$this->assertInternalType('array', $handlers);
79+
$this->assertArrayHasKey(SIGTERM, $handlers);
80+
$this->assertSame($handler, $handlers[SIGTERM]);
81+
}
82+
83+
public function testRestoreDefaultPropertiesOnAfterSocket()
84+
{
85+
$this->signalHelper->beforeSocket();
86+
$this->signalHelper->afterSocket();
87+
88+
$this->assertAttributeSame(null, 'wasThereSignal', $this->signalHelper);
89+
$this->assertAttributeSame([], 'handlers', $this->signalHelper);
90+
}
91+
92+
public function testRestorePreviousHandlerOnAfterSocket()
93+
{
94+
$handler = function () {};
95+
96+
pcntl_signal(SIGTERM, $handler);
97+
98+
$this->signalHelper->beforeSocket();
99+
$this->signalHelper->afterSocket();
100+
101+
$this->assertSame($handler, pcntl_signal_get_handler(SIGTERM));
102+
}
103+
104+
public function testThrowsIfBeforeSocketCalledSecondTime()
105+
{
106+
$this->signalHelper->beforeSocket();
107+
108+
$this->expectException(\LogicException::class);
109+
$this->expectExceptionMessage('The wasThereSignal property should be null but it is not. The afterSocket method might not have been called.');
110+
$this->signalHelper->beforeSocket();
111+
}
112+
113+
public function testShouldReturnTrueOnWasThereSignal()
114+
{
115+
$this->signalHelper->beforeSocket();
116+
117+
posix_kill(getmypid(), SIGINT);
118+
pcntl_signal_dispatch();
119+
120+
$this->assertTrue($this->signalHelper->wasThereSignal());
121+
122+
$this->signalHelper->afterSocket();
123+
}
124+
}

pkg/enqueue/Consumption/Extension/SignalExtension.php

+14-3
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ public function onStart(Context $context)
3131
throw new LogicException('The pcntl extension is required in order to catch signals.');
3232
}
3333

34+
if (function_exists('pcntl_async_signals')) {
35+
pcntl_async_signals(true);
36+
}
37+
3438
pcntl_signal(SIGTERM, [$this, 'handleSignal']);
3539
pcntl_signal(SIGQUIT, [$this, 'handleSignal']);
3640
pcntl_signal(SIGINT, [$this, 'handleSignal']);
@@ -45,7 +49,7 @@ public function onBeforeReceive(Context $context)
4549
{
4650
$this->logger = $context->getLogger();
4751

48-
pcntl_signal_dispatch();
52+
$this->dispatchSignal();
4953

5054
$this->interruptExecutionIfNeeded($context);
5155
}
@@ -63,7 +67,7 @@ public function onPreReceived(Context $context)
6367
*/
6468
public function onPostReceived(Context $context)
6569
{
66-
pcntl_signal_dispatch();
70+
$this->dispatchSignal();
6771

6872
$this->interruptExecutionIfNeeded($context);
6973
}
@@ -73,7 +77,7 @@ public function onPostReceived(Context $context)
7377
*/
7478
public function onIdle(Context $context)
7579
{
76-
pcntl_signal_dispatch();
80+
$this->dispatchSignal();
7781

7882
$this->interruptExecutionIfNeeded($context);
7983
}
@@ -117,4 +121,11 @@ public function handleSignal($signal)
117121
break;
118122
}
119123
}
124+
125+
private function dispatchSignal()
126+
{
127+
if (false == function_exists('pcntl_async_signals')) {
128+
pcntl_signal_dispatch();
129+
}
130+
}
120131
}

pkg/enqueue/Symfony/AmqpTransportFactory.php

+4-2
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,11 @@ public function addConfiguration(ArrayNodeDefinition $builder)
5858
throw new \InvalidArgumentException('There is no amqp driver available. Please consider installing one of the packages: enqueue/amqp-ext, enqueue/amqp-lib, enqueue/amqp-bunny.');
5959
}
6060

61-
if (isset($v['driver']) && false == in_array($v['driver'], $drivers, true)) {
62-
throw new \InvalidArgumentException(sprintf('Unexpected driver given "invalidDriver". Available are "%s"', implode('", "', $drivers)));
61+
if ($v && false == in_array($v, $drivers, true)) {
62+
throw new \InvalidArgumentException(sprintf('Unexpected driver given "%s". Available are "%s"', $v, implode('", "', $drivers)));
6363
}
64+
65+
return $v;
6466
})
6567
->end()
6668
->end()

0 commit comments

Comments
 (0)