Skip to content

Commit

Permalink
fix: repeat read select if interrupted by signal and timeout not reac…
Browse files Browse the repository at this point in the history
…hed (#1129)
  • Loading branch information
ramunasd authored Oct 11, 2023
1 parent 3f05dac commit 73d6fe1
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 48 deletions.
51 changes: 31 additions & 20 deletions PhpAmqpLib/Wire/AMQPIOReader.php
Original file line number Diff line number Diff line change
Expand Up @@ -77,35 +77,46 @@ protected function rawread(int $n): string
*
* AMQPTimeoutException can be raised if the timeout is set
*
* @throws \PhpAmqpLib\Exception\AMQPTimeoutException when timeout is set and no data received
* @throws \PhpAmqpLib\Exception\AMQPNoDataException when no data is ready to read from IO
* @throws AMQPTimeoutException when timeout is set and no data received
* @throws AMQPNoDataException when no data is ready to read from IO
*/
protected function wait()
protected function wait(): void
{
$timeout = $this->timeout;
if (null === $timeout) {
// timeout=null just poll state and return instantly
$sec = 0;
$usec = 0;
} elseif ($timeout > 0) {
list($sec, $usec) = MiscHelper::splitSecondsMicroseconds($this->getTimeout());
} else {
// wait indefinitely for data if timeout=0
$sec = null;
$usec = 0;
$result = $this->io->select(0);
if ($result === 0) {
throw new AMQPNoDataException('No data is ready to read');
}
return;
}

$result = $this->io->select($sec, $usec);

if ($result === 0) {
if ($timeout > 0) {
throw new AMQPTimeoutException(sprintf(
'The connection timed out after %s sec while awaiting incoming data',
$timeout
));
} else {
if (!($timeout > 0)) {
// wait indefinitely for data if timeout=0
$result = $this->io->select(null);
if ($result === 0) {
throw new AMQPNoDataException('No data is ready to read');
}
return;
}

$leftTime = $timeout;
$started = microtime(true);
do {
[$sec, $usec] = MiscHelper::splitSecondsMicroseconds($leftTime);
$result = $this->io->select($sec, $usec);
if ($result > 0) {
return;
}
// select might be interrupted by signal, calculate left time and repeat
$leftTime = $timeout - (microtime(true) - $started);
} while ($leftTime > 0);

throw new AMQPTimeoutException(sprintf(
'The connection timed out after %s sec while awaiting incoming data',
$timeout
));

}
}
12 changes: 7 additions & 5 deletions PhpAmqpLib/Wire/IO/AbstractIO.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Exception\AMQPHeartbeatMissedException;
use PhpAmqpLib\Exception\AMQPIOWaitException;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Wire\AMQPWriter;

abstract class AbstractIO
Expand Down Expand Up @@ -55,7 +56,7 @@ abstract class AbstractIO
* @param int $len
* @return string
* @throws \PhpAmqpLib\Exception\AMQPIOException
* @throws \PhpAmqpLib\Exception\AMQPRuntimeException
* @throws AMQPRuntimeException
* @throws \PhpAmqpLib\Exception\AMQPSocketException
* @throws \PhpAmqpLib\Exception\AMQPTimeoutException
* @throws \PhpAmqpLib\Exception\AMQPConnectionClosedException
Expand All @@ -80,8 +81,9 @@ abstract public function close();
* @param int|null $sec
* @param int $usec
* @return int
* @throws \PhpAmqpLib\Exception\AMQPIOException
* @throws \PhpAmqpLib\Exception\AMQPRuntimeException
* @throws AMQPIOWaitException
* @throws AMQPRuntimeException
* @throws AMQPConnectionClosedException
*/
public function select(?int $sec, int $usec = 0)
{
Expand Down Expand Up @@ -120,7 +122,7 @@ abstract protected function do_select(?int $sec, int $usec);
* Set ups the connection.
* @return void
* @throws \PhpAmqpLib\Exception\AMQPIOException
* @throws \PhpAmqpLib\Exception\AMQPRuntimeException
* @throws AMQPRuntimeException
*/
abstract public function connect();

Expand All @@ -137,7 +139,7 @@ public function afterTune(int $heartbeat): void
/**
* Heartbeat logic: check connection health here
* @return void
* @throws \PhpAmqpLib\Exception\AMQPRuntimeException
* @throws AMQPRuntimeException
*/
public function check_heartbeat()
{
Expand Down
14 changes: 6 additions & 8 deletions docker/php/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
FROM php:7.2-cli

RUN apt update && \
apt -qy install git unzip zlib1g-dev libzip-dev
RUN docker-php-ext-install sockets pcntl zip
ADD --chmod=0755 https://github.com/mlocati/docker-php-extension-installer/releases/download/2.1.56/install-php-extensions /usr/local/bin/
COPY --from=composer /usr/bin/composer /usr/bin/composer

WORKDIR /src
RUN php -r "copy('https://getcomposer.org/installer', 'composer-setup.php');" && \
php composer-setup.php && \
php -r "unlink('composer-setup.php');" && \
mv composer.phar /usr/local/bin/composer
RUN install-php-extensions sockets pcntl zip xdebug
RUN echo 'memory_limit = 1G' >> /usr/local/etc/php/conf.d/docker-php-memlimit.ini
RUN rm /usr/local/etc/php/conf.d/docker-php-ext-xdebug.ini

WORKDIR /src
27 changes: 13 additions & 14 deletions tests/Functional/Channel/ChannelTimeoutTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AbstractConnection;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Helper\MiscHelper;
use PhpAmqpLib\Wire\IO\AbstractIO;
use PhpAmqpLib\Wire\IO\StreamIO;
Expand All @@ -29,6 +30,8 @@ class ChannelTimeoutTest extends TestCaseCompat
/** @var AMQPChannel $channel */
private $channel;

private $selectResult = 1;

protected function setUpCompat()
{
$channel_rpc_timeout = 3.5;
Expand All @@ -40,6 +43,12 @@ protected function setUpCompat()
->setConstructorArgs(array(HOST, PORT, 3, 3, null, false, 0))
->setMethods(array('select'))
->getMock();
$this->io
->expects(self::atLeastOnce())
->method('select')
->willReturnCallback(function(){
return $this->selectResult;
});
$this->connection = $this->getMockBuilder(AbstractConnection::class)
->setConstructorArgs(array(USER, PASS, '/', false, 'AMQPLAIN', null, 'en_US', $this->io, 0, 0, $channel_rpc_timeout))
->setMethods(array())
Expand All @@ -53,23 +62,19 @@ protected function setUpCompat()
*
* @dataProvider provide_operations
* @param string $operation
* @param array $args
* @param mixed[] $args
*
* @covers \PhpAmqpLib\Channel\AMQPChannel::exchange_declare
* @covers \PhpAmqpLib\Channel\AMQPChannel::queue_declare
* @covers \PhpAmqpLib\Channel\AMQPChannel::confirm_select
*/
public function should_throw_exception_for_basic_operations_when_timeout_exceeded($operation, $args)
public function should_throw_exception_for_basic_operations_when_timeout_exceeded(string $operation, array $args)
{
$this->expectException(\PhpAmqpLib\Exception\AMQPTimeoutException::class);
$this->expectException(AMQPTimeoutException::class);
$this->expectExceptionMessage('The connection timed out after 3.5 sec while awaiting incoming data');

// simulate blocking on the I/O level
$this->io->expects($this->any())
->method('select')
->with($this->channel_rpc_timeout_seconds, $this->channel_rpc_timeout_microseconds)
->willReturn(0);

$this->selectResult = 0;
call_user_func_array(array($this->channel, $operation), $args);
}

Expand All @@ -84,13 +89,7 @@ public function provide_operations()

protected function tearDownCompat()
{
if ($this->channel) {
$this->channel->close();
}
$this->channel = null;
if ($this->connection) {
$this->connection->close();
}
$this->connection = null;
}
}
32 changes: 31 additions & 1 deletion tests/Functional/Channel/ChannelWaitTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPSocketConnection;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPNoDataException;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Message\AMQPMessage;
use PHPUnit\Framework\TestCase;

Expand Down Expand Up @@ -36,6 +38,34 @@ public function should_wait_until_signal_by_default($factory)
$this->assertNull($result);
}

/**
* @test
* @group signals
* @covers AMQPIOReader::wait()
*/
public function should_wait_until_timeout_after_signal(): void
{
$factory = $this->channelFactory(true, 30, 15);
$channel = $factory();

$exception = null;
$started = microtime(true);
$this->deferSignal(1);
$this->deferSignal(2);
try {
$result = $channel->wait(null, false, 3);
} catch (\Throwable $exception) {
}

$took = microtime(true) - $started;
self::assertGreaterThan(2, $took);
self::assertLessThan(4, $took);
self::assertInstanceOf(AMQPTimeoutException::class, $exception);

$this->closeChannel($channel);
$this->assertNull($result);
}

/**
* @test
* @small
Expand All @@ -44,7 +74,7 @@ public function should_wait_until_signal_by_default($factory)
*/
public function should_throw_timeout_exception($factory)
{
$this->expectException(\PhpAmqpLib\Exception\AMQPTimeoutException::class);
$this->expectException(AMQPTimeoutException::class);

$channel = $factory();
$channel->wait(null, false, 0.01);
Expand Down

0 comments on commit 73d6fe1

Please sign in to comment.