diff --git a/README.md b/README.md index 5136784..2fbc3fa 100644 --- a/README.md +++ b/README.md @@ -100,7 +100,9 @@ $factory = new Factory($loop, $connector); #### createClient() -The `createClient($redisUri)` method can be used to create a new [`Client`](#client). +The `createClient($redisUri): PromiseInterface` method can be used to +create a new [`Client`](#client). + It helps with establishing a plain TCP/IP or secure TLS connection to Redis and optionally authenticating (AUTH) and selecting the right database (SELECT). @@ -115,6 +117,24 @@ $factory->createClient('redis://localhost:6379')->then( ); ``` +The method returns a [Promise](https://github.com/reactphp/promise) that +will resolve with a [`Client`](#client) +instance on success or will reject with an `Exception` if the URL is +invalid or the connection or authentication fails. + +The returned Promise is implemented in such a way that it can be +cancelled when it is still pending. Cancelling a pending promise will +reject its value with an Exception and will cancel the underlying TCP/IP +connection attempt and/or Redis authentication. + +```php +$promise = $factory->createConnection($redisUri); + +$loop->addTimer(3.0, function () use ($promise) { + $promise->cancel(); +}); +``` + The `$redisUri` can be given in the [standard](https://www.iana.org/assignments/uri-schemes/prov/redis) form `[redis[s]://][:auth@]host[:port][/db]`. diff --git a/composer.json b/composer.json index 9213724..311c863 100644 --- a/composer.json +++ b/composer.json @@ -16,7 +16,7 @@ "evenement/evenement": "^3.0 || ^2.0 || ^1.0", "react/event-loop": "^1.0 || ^0.5 || ^0.4 || ^0.3", "react/promise": "^2.0 || ^1.1", - "react/socket": "^1.0 || ^0.8.3" + "react/socket": "^1.1" }, "autoload": { "psr-4": { "Clue\\React\\Redis\\": "src/" } diff --git a/src/Factory.php b/src/Factory.php index ac80a61..09c8d0e 100644 --- a/src/Factory.php +++ b/src/Factory.php @@ -2,10 +2,10 @@ namespace Clue\React\Redis; -use Clue\React\Redis\StreamingClient; use Clue\Redis\Protocol\Factory as ProtocolFactory; use React\EventLoop\LoopInterface; use React\Promise; +use React\Promise\Deferred; use React\Socket\ConnectionInterface; use React\Socket\Connector; use React\Socket\ConnectorInterface; @@ -50,9 +50,20 @@ public function createClient($target) return Promise\reject($e); } - $protocol = $this->protocol; + $connecting = $this->connector->connect($parts['authority']); + $deferred = new Deferred(function ($_, $reject) use ($connecting) { + // connection cancelled, start with rejecting attempt, then clean up + $reject(new \RuntimeException('Connection to database server cancelled')); + + // either close successful connection or cancel pending connection attempt + $connecting->then(function (ConnectionInterface $connection) { + $connection->close(); + }); + $connecting->cancel(); + }); - $promise = $this->connector->connect($parts['authority'])->then(function (ConnectionInterface $stream) use ($protocol) { + $protocol = $this->protocol; + $promise = $connecting->then(function (ConnectionInterface $stream) use ($protocol) { return new StreamingClient($stream, $protocol->createResponseParser(), $protocol->createSerializer()); }); @@ -84,7 +95,9 @@ function ($error) use ($client) { }); } - return $promise; + $promise->then(array($deferred, 'resolve'), array($deferred, 'reject')); + + return $deferred->promise(); } /** diff --git a/tests/FactoryTest.php b/tests/FactoryTest.php index 9286b58..feae26c 100644 --- a/tests/FactoryTest.php +++ b/tests/FactoryTest.php @@ -4,6 +4,7 @@ use Clue\React\Redis\Factory; use React\Promise; +use React\Promise\Deferred; class FactoryTest extends TestCase { @@ -153,4 +154,36 @@ public function testWillRejectIfTargetIsInvalid() $this->expectPromiseReject($promise); } + + public function testCancelWillRejectPromise() + { + $promise = new \React\Promise\Promise(function () { }); + $this->connector->expects($this->once())->method('connect')->with('127.0.0.1:2')->willReturn($promise); + + $promise = $this->factory->createClient('redis://127.0.0.1:2'); + $promise->cancel(); + + $promise->then(null, $this->expectCallableOnceWith($this->isInstanceOf('RuntimeException'))); + } + + public function testCancelWillCancelConnectorWhenConnectionIsPending() + { + $deferred = new Deferred($this->expectCallableOnce()); + $this->connector->expects($this->once())->method('connect')->with('127.0.0.1:2')->willReturn($deferred->promise()); + + $promise = $this->factory->createClient('redis://127.0.0.1:2'); + $promise->cancel(); + } + + public function testCancelWillCloseConnectionWhenConnectionWaitsForSelect() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->once())->method('write'); + $stream->expects($this->once())->method('close'); + + $this->connector->expects($this->once())->method('connect')->willReturn(Promise\resolve($stream)); + + $promise = $this->factory->createClient('redis://127.0.0.1:2/123'); + $promise->cancel(); + } }