Skip to content

Commit

Permalink
Include timeout logic to avoid dependency on reactphp/promise-timer
Browse files Browse the repository at this point in the history
  • Loading branch information
clue committed Jun 6, 2023
1 parent 7da8a6b commit 3c4b656
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 75 deletions.
6 changes: 3 additions & 3 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@
"require": {
"php": ">=5.3.0",
"evenement/evenement": "^3.0 || ^2.0 || ^1.0",
"react/dns": "^1.8",
"react/dns": "^1.11",
"react/event-loop": "^1.2",
"react/promise": "^3 || ^2.6 || ^1.2.1",
"react/promise-timer": "^1.9",
"react/stream": "^1.2"
},
"require-dev": {
"phpunit/phpunit": "^9.5 || ^5.7 || ^4.8.35",
"react/async": "^4 || ^3 || ^2",
"react/promise-stream": "^1.4"
"react/promise-stream": "^1.4",
"react/promise-timer": "^1.9"
},
"autoload": {
"psr-4": {
Expand Down
67 changes: 43 additions & 24 deletions src/TimeoutConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@

use React\EventLoop\Loop;
use React\EventLoop\LoopInterface;
use React\Promise\Timer;
use React\Promise\Timer\TimeoutException;
use React\Promise\Promise;

final class TimeoutConnector implements ConnectorInterface
{
Expand All @@ -22,30 +21,50 @@ public function __construct(ConnectorInterface $connector, $timeout, LoopInterfa

public function connect($uri)
{
return Timer\timeout($this->connector->connect($uri), $this->timeout, $this->loop)->then(null, self::handler($uri));
}
$promise = $this->connector->connect($uri);

/**
* Creates a static rejection handler that reports a proper error message in case of a timeout.
*
* This uses a private static helper method to ensure this closure is not
* bound to this instance and the exception trace does not include a
* reference to this instance and its connector stack as a result.
*
* @param string $uri
* @return callable
*/
private static function handler($uri)
{
return function (\Exception $e) use ($uri) {
if ($e instanceof TimeoutException) {
throw new \RuntimeException(
'Connection to ' . $uri . ' timed out after ' . $e->getTimeout() . ' seconds (ETIMEDOUT)',
\defined('SOCKET_ETIMEDOUT') ? \SOCKET_ETIMEDOUT : 110
);
$loop = $this->loop;
$time = $this->timeout;
return new Promise(function ($resolve, $reject) use ($loop, $time, $promise, $uri) {
$timer = null;
$promise = $promise->then(function ($v) use (&$timer, $loop, $resolve) {
if ($timer) {
$loop->cancelTimer($timer);
}
$timer = false;
$resolve($v);
}, function ($v) use (&$timer, $loop, $reject) {
if ($timer) {
$loop->cancelTimer($timer);
}
$timer = false;
$reject($v);
});

// promise already resolved => no need to start timer
if ($timer === false) {
return;
}

throw $e;
};
// start timeout timer which will cancel the pending promise
$timer = $loop->addTimer($time, function () use ($time, &$promise, $reject, $uri) {
$reject(new \RuntimeException(
'Connection to ' . $uri . ' timed out after ' . $time . ' seconds (ETIMEDOUT)',
\defined('SOCKET_ETIMEDOUT') ? \SOCKET_ETIMEDOUT : 110
));

// Cancel pending connection to clean up any underlying resources and references.
// Avoid garbage references in call stack by passing pending promise by reference.
assert(\method_exists($promise, 'cancel'));
$promise->cancel();
$promise = null;
});
}, function () use (&$promise) {
// Cancelling this promise will cancel the pending connection, thus triggering the rejection logic above.
// Avoid garbage references in call stack by passing pending promise by reference.
assert(\method_exists($promise, 'cancel'));
$promise->cancel();
$promise = null;
});
}
}
181 changes: 133 additions & 48 deletions tests/TimeoutConnectorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
namespace React\Tests\Socket;

use React\EventLoop\Loop;
use React\Promise;
use React\Promise\Deferred;
use React\Promise\Promise;
use React\Socket\TimeoutConnector;

class TimeoutConnectorTest extends TestCase
Expand All @@ -22,90 +22,175 @@ public function testConstructWithoutLoopAssignsLoopAutomatically()
$this->assertInstanceOf('React\EventLoop\LoopInterface', $loop);
}

public function testRejectsWithTimeoutReasonOnTimeout()
public function testRejectsPromiseWithoutStartingTimerWhenWrappedConnectorReturnsRejectedPromise()
{
$promise = new Promise\Promise(function () { });
$loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock();
$loop->expects($this->never())->method('addTimer');
$loop->expects($this->never())->method('cancelTimer');

$connector = $this->getMockBuilder('React\Socket\ConnectorInterface')->getMock();
$connector->expects($this->once())->method('connect')->with('google.com:80')->will($this->returnValue($promise));
$connector->expects($this->once())->method('connect')->with('example.com:80')->willReturn(\React\Promise\reject(new \RuntimeException('Failed', 42)));

$timeout = new TimeoutConnector($connector, 0.01);
$timeout = new TimeoutConnector($connector, 5.0, $loop);

$promise = $timeout->connect('google.com:80');
Loop::run();
$promise = $timeout->connect('example.com:80');

$exception = null;
$promise->then(null, function ($reason) use (&$exception) {
$exception = $reason;
});

$this->setExpectedException(
'RuntimeException',
'Connection to google.com:80 timed out after 0.01 seconds (ETIMEDOUT)',
\defined('SOCKET_ETIMEDOUT') ? \SOCKET_ETIMEDOUT : 110
);
\React\Async\await($promise);
assert($exception instanceof \RuntimeException);
$this->assertEquals('Failed', $exception->getMessage());
$this->assertEquals(42, $exception->getCode());
}

public function testRejectsWithOriginalReasonWhenConnectorRejects()
public function testRejectsPromiseAfterCancellingTimerWhenWrappedConnectorReturnsPendingPromiseThatRejects()
{
$promise = Promise\reject(new \RuntimeException('Failed', 42));
$timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock();
$loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock();
$loop->expects($this->once())->method('addTimer')->with(5.0, $this->anything())->willReturn($timer);
$loop->expects($this->once())->method('cancelTimer')->with($timer);

$deferred = new Deferred();
$connector = $this->getMockBuilder('React\Socket\ConnectorInterface')->getMock();
$connector->expects($this->once())->method('connect')->with('google.com:80')->will($this->returnValue($promise));
$connector->expects($this->once())->method('connect')->with('example.com:80')->willReturn($deferred->promise());

$timeout = new TimeoutConnector($connector, 5.0);
$timeout = new TimeoutConnector($connector, 5.0, $loop);

$promise = $timeout->connect('example.com:80');

$deferred->reject(new \RuntimeException('Failed', 42));

$exception = null;
$promise->then(null, function ($reason) use (&$exception) {
$exception = $reason;
});

$this->setExpectedException(
'RuntimeException',
'Failed',
42
);
\React\Async\await($timeout->connect('google.com:80'));
assert($exception instanceof \RuntimeException);
$this->assertEquals('Failed', $exception->getMessage());
$this->assertEquals(42, $exception->getCode());
}

public function testResolvesWhenConnectorResolves()
public function testResolvesPromiseWithoutStartingTimerWhenWrappedConnectorReturnsResolvedPromise()
{
$promise = Promise\resolve(null);
$loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock();
$loop->expects($this->never())->method('addTimer');
$loop->expects($this->never())->method('cancelTimer');

$connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
$connector = $this->getMockBuilder('React\Socket\ConnectorInterface')->getMock();
$connector->expects($this->once())->method('connect')->with('google.com:80')->will($this->returnValue($promise));
$connector->expects($this->once())->method('connect')->with('example.com:80')->willReturn(\React\Promise\resolve($connection));

$timeout = new TimeoutConnector($connector, 5.0);
$timeout = new TimeoutConnector($connector, 5.0, $loop);

$timeout->connect('google.com:80')->then(
$this->expectCallableOnce(),
$this->expectCallableNever()
);
$promise = $timeout->connect('example.com:80');

Loop::run();
$resolved = null;
$promise->then(function ($value) use (&$resolved) {
$resolved = $value;
});

$this->assertSame($connection, $resolved);
}

public function testRejectsAndCancelsPendingPromiseOnTimeout()
public function testResolvesPromiseAfterCancellingTimerWhenWrappedConnectorReturnsPendingPromiseThatResolves()
{
$promise = new Promise\Promise(function () { }, $this->expectCallableOnce());
$timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock();
$loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock();
$loop->expects($this->once())->method('addTimer')->with(5.0, $this->anything())->willReturn($timer);
$loop->expects($this->once())->method('cancelTimer')->with($timer);

$deferred = new Deferred();
$connector = $this->getMockBuilder('React\Socket\ConnectorInterface')->getMock();
$connector->expects($this->once())->method('connect')->with('google.com:80')->will($this->returnValue($promise));
$connector->expects($this->once())->method('connect')->with('example.com:80')->willReturn($deferred->promise());

$timeout = new TimeoutConnector($connector, 0.01);
$timeout = new TimeoutConnector($connector, 5.0, $loop);

$timeout->connect('google.com:80')->then(
$this->expectCallableNever(),
$this->expectCallableOnce()
);
$promise = $timeout->connect('example.com:80');

Loop::run();
$connection = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock();
$deferred->resolve($connection);

$resolved = null;
$promise->then(function ($value) use (&$resolved) {
$resolved = $value;
});

$this->assertSame($connection, $resolved);
}

public function testCancelsPendingPromiseOnCancel()
public function testRejectsPromiseAndCancelsPendingConnectionWhenTimeoutTriggers()
{
$promise = new Promise\Promise(function () { }, function () { throw new \Exception(); });
$timerCallback = null;
$timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock();
$loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock();
$loop->expects($this->once())->method('addTimer')->with(0.01, $this->callback(function ($callback) use (&$timerCallback) {
$timerCallback = $callback;
return true;
}))->willReturn($timer);
$loop->expects($this->once())->method('cancelTimer')->with($timer);

$cancelled = 0;
$connector = $this->getMockBuilder('React\Socket\ConnectorInterface')->getMock();
$connector->expects($this->once())->method('connect')->with('example.com:80')->willReturn(new Promise(function () { }, function () use (&$cancelled) {
++$cancelled;
throw new \RuntimeException();
}));

$timeout = new TimeoutConnector($connector, 0.01, $loop);

$promise = $timeout->connect('example.com:80');

$this->assertEquals(0, $cancelled);

$this->assertNotNull($timerCallback);
$timerCallback();

$this->assertEquals(1, $cancelled);

$exception = null;
$promise->then(null, function ($reason) use (&$exception) {
$exception = $reason;
});

assert($exception instanceof \RuntimeException);
$this->assertEquals('Connection to example.com:80 timed out after 0.01 seconds (ETIMEDOUT)' , $exception->getMessage());
$this->assertEquals(\defined('SOCKET_ETIMEDOUT') ? \SOCKET_ETIMEDOUT : 110, $exception->getCode());
}

public function testCancellingPromiseWillCancelPendingConnectionAndRejectPromise()
{
$timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock();
$loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock();
$loop->expects($this->once())->method('addTimer')->with(0.01, $this->anything())->willReturn($timer);
$loop->expects($this->once())->method('cancelTimer')->with($timer);

$cancelled = 0;
$connector = $this->getMockBuilder('React\Socket\ConnectorInterface')->getMock();
$connector->expects($this->once())->method('connect')->with('google.com:80')->will($this->returnValue($promise));
$connector->expects($this->once())->method('connect')->with('example.com:80')->willReturn(new Promise(function () { }, function () use (&$cancelled) {
++$cancelled;
throw new \RuntimeException('Cancelled');
}));

$timeout = new TimeoutConnector($connector, 0.01);
$timeout = new TimeoutConnector($connector, 0.01, $loop);

$promise = $timeout->connect('example.com:80');

$this->assertEquals(0, $cancelled);

assert(method_exists($promise, 'cancel'));
$promise->cancel();

$out = $timeout->connect('google.com:80');
$out->cancel();
$this->assertEquals(1, $cancelled);

$exception = null;
$promise->then(null, function ($reason) use (&$exception) {
$exception = $reason;
});

$out->then($this->expectCallableNever(), $this->expectCallableOnce());
assert($exception instanceof \RuntimeException);
$this->assertEquals('Cancelled', $exception->getMessage());
}

public function testRejectionDuringConnectionShouldNotCreateAnyGarbageReferences()
Expand Down

0 comments on commit 3c4b656

Please sign in to comment.