Skip to content

Commit

Permalink
Include timeout logic to avoid dependency on reactphp/promise-timer
Browse files Browse the repository at this point in the history
  • Loading branch information
clue committed Jan 14, 2024
1 parent 615ae23 commit 668f253
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 17 deletions.
1 change: 0 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
"evenement/evenement": "^3.0 || ^2.0 || ^1.0",
"react/event-loop": "^1.2",
"react/promise": "^3",
"react/promise-timer": "^1.10",
"react/socket": "^1.15"
},
"require-dev": {
Expand Down
52 changes: 43 additions & 9 deletions src/Io/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@
use React\EventLoop\Loop;
use React\EventLoop\LoopInterface;
use React\Promise\Deferred;
use React\Promise\Promise;
use React\Promise\PromiseInterface;
use React\Promise\Timer\TimeoutException;
use React\Socket\ConnectionInterface;
use React\Socket\Connector;
use React\Socket\ConnectorInterface;
use function React\Promise\reject;
use function React\Promise\Timer\timeout;

/**
* @internal
Expand Down Expand Up @@ -175,14 +174,49 @@ function (\Exception $e) use ($redis, $uri) {
return $deferred->promise();
}

return timeout($deferred->promise(), $timeout, $this->loop)->then(null, function (\Throwable $e) use ($uri) {
if ($e instanceof TimeoutException) {
throw new \RuntimeException(
'Connection to ' . $uri . ' timed out after ' . $e->getTimeout() . ' seconds (ETIMEDOUT)',
defined('SOCKET_ETIMEDOUT') ? SOCKET_ETIMEDOUT : 110
);
$promise = $deferred->promise();
/** @var Promise<StreamingClient> */
return new Promise(function (callable $resolve, callable $reject) use ($timeout, $promise, $uri): void {

Check failure on line 179 in src/Io/Factory.php

View workflow job for this annotation

GitHub Actions / PHPStan (PHP 7.1)

Method Clue\React\Redis\Io\Factory::createClient() should return React\Promise\PromiseInterface<Clue\React\Redis\Io\StreamingClient> but returns React\Promise\Promise<mixed>.
/** @var ?\React\EventLoop\TimerInterface */
$timer = null;
$promise = $promise->then(function (StreamingClient $v) use (&$timer, $resolve): void {
if ($timer) {
$this->loop->cancelTimer($timer);
}
$timer = false;
$resolve($v);
}, function (\Throwable $e) use (&$timer, $reject): void {
if ($timer) {
$this->loop->cancelTimer($timer);
}
$timer = false;
$reject($e);
});

// promise already settled => no need to start timer
if ($timer === false) {
return;
}
throw $e;

// start timeout timer which will cancel the pending promise
$timer = $this->loop->addTimer($timeout, function () use ($timeout, &$promise, $reject, $uri): void {
$reject(new \RuntimeException(
'Connection to ' . $uri . ' timed out after ' . $timeout . ' seconds (ETIMEDOUT)',
\defined('SOCKET_ETIMEDOUT') ? \SOCKET_ETIMEDOUT : 110
));

// Cancel pending connection to clean up any underlying resources and references.
// Avoid garbage references in call stack by passing pending promise by reference.
\assert($promise instanceof PromiseInterface);
$promise->cancel();
$promise = null;
});
}, function () use (&$promise): void {
// Cancelling this promise will cancel the pending connection, thus triggering the rejection logic above.
// Avoid garbage references in call stack by passing pending promise by reference.
\assert($promise instanceof PromiseInterface);
$promise->cancel();
$promise = null;
});
}
}
18 changes: 11 additions & 7 deletions tests/FunctionalTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@

use Clue\React\Redis\RedisClient;
use React\EventLoop\Loop;
use React\Promise\Deferred;
use React\Promise\Promise;
use React\Promise\PromiseInterface;
use function React\Async\await;
use function React\Promise\Timer\timeout;

class FunctionalTest extends TestCase
{
Expand Down Expand Up @@ -144,19 +143,24 @@ public function testPubSub(): void
$channel = 'channel:test:' . mt_rand();

// consumer receives a single message
/** @var Deferred<void> */
$deferred = new Deferred();
$consumer->on('message', $this->expectCallableOnce());
$consumer->on('message', [$deferred, 'resolve']);
$once = $this->expectCallableOnceWith(1);
$consumer->subscribe($channel)->then(function() use ($producer, $channel, $once){
// producer sends a single message
$producer->publish($channel, 'hello world')->then($once);
})->then($this->expectCallableOnce());

// expect "message" event to take no longer than 0.1s

await(timeout($deferred->promise(), 0.1));
await(new Promise(function (callable $resolve, callable $reject) use ($consumer): void {
$timeout = Loop::addTimer(0.1, function () use ($consumer, $reject): void {
$consumer->close();
$reject(new \RuntimeException('Timed out'));
});
$consumer->on('message', function () use ($timeout, $resolve): void {
Loop::cancelTimer($timeout);
$resolve(null);
});
}));

/** @var PromiseInterface<array{0:"unsubscribe",1:string,2:0}> */
$promise = $consumer->unsubscribe($channel);
Expand Down
32 changes: 32 additions & 0 deletions tests/Io/FactoryStreamingClientTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Clue\Tests\React\Redis\TestCase;
use PHPUnit\Framework\MockObject\MockObject;
use React\EventLoop\LoopInterface;
use React\EventLoop\TimerInterface;
use React\Promise\Deferred;
use React\Socket\ConnectionInterface;
use React\Socket\ConnectorInterface;
Expand Down Expand Up @@ -633,4 +634,35 @@ public function testCreateClientWithoutTimeoutParameterWillStartTimerWithDefault
$this->factory->createClient('redis://127.0.0.1:2');
ini_set('default_socket_timeout', $old);
}

public function testCreateClientWillCancelTimerWhenConnectionResolves(): void
{
$timer = $this->createMock(TimerInterface::class);
$this->loop->expects($this->once())->method('addTimer')->willReturn($timer);
$this->loop->expects($this->once())->method('cancelTimer')->with($timer);

$deferred = new Deferred();
$this->connector->expects($this->once())->method('connect')->with('127.0.0.1:6379')->willReturn($deferred->promise());

$promise = $this->factory->createClient('127.0.0.1');
$promise->then($this->expectCallableOnce());

$deferred->resolve($this->createMock(ConnectionInterface::class));
}

public function testCreateClientWillCancelTimerWhenConnectionRejects(): void
{
$timer = $this->createMock(TimerInterface::class);
$this->loop->expects($this->once())->method('addTimer')->willReturn($timer);
$this->loop->expects($this->once())->method('cancelTimer')->with($timer);

$deferred = new Deferred();
$this->connector->expects($this->once())->method('connect')->with('127.0.0.1:6379')->willReturn($deferred->promise());

$promise = $this->factory->createClient('127.0.0.1');

$promise->then(null, $this->expectCallableOnceWith($this->isInstanceOf('RuntimeException')));

$deferred->reject(new \RuntimeException());
}
}

0 comments on commit 668f253

Please sign in to comment.