diff --git a/README.md b/README.md index ddd01bf..a6c17ba 100644 --- a/README.md +++ b/README.md @@ -81,12 +81,29 @@ $factory = new Factory($loop); ``` If you need custom DNS, proxy or TLS settings, you can explicitly pass a -custom instance of the [`ConnectorInterface`](https://github.com/reactphp/socket-client#connectorinterface): +custom instance of the [`ConnectorInterface`](https://github.com/reactphp/socket#connectorinterface): ```php +$connector = new \React\Socket\Connector($loop, array( + 'dns' => '127.0.0.1', + 'tcp' => array( + 'bindto' => '192.168.10.1:0' + ), + 'tls' => array( + 'verify_peer' => false, + 'verify_peer_name' => false + ) +)); + $factory = new Factory($loop, $connector); ``` +> Legacy notice: As of `v1.2.0`, the optional connector should implement the new + `React\Socket\ConnectorInterface`. For BC reasons it also accepts the + legacy `React\SocketClient\ConnectorInterface`. + This legacy API will be removed in a future `v2.0.0` version, so it's highly + recommended to upgrade to the above API. + #### createClient() The `createClient($redisUri = null)` method can be used to create a new [`Client`](#client). diff --git a/composer.json b/composer.json index 97edb4f..d77e979 100644 --- a/composer.json +++ b/composer.json @@ -12,9 +12,10 @@ ], "require": { "php": ">=5.3", + "react/event-loop": "0.3.*|0.4.*", "react/promise": "^2.0 || ^1.1", + "react/socket": "^0.7", "react/socket-client": "^0.7", - "react/event-loop": "0.3.*|0.4.*", "clue/redis-protocol": "0.3.*", "evenement/evenement": "~1.0|~2.0" }, diff --git a/src/ConnectionUpcaster.php b/src/ConnectionUpcaster.php new file mode 100644 index 0000000..c2a0f8f --- /dev/null +++ b/src/ConnectionUpcaster.php @@ -0,0 +1,78 @@ +stream = $stream; + + Util::forwardEvents($stream, $this, array('data', 'end', 'close', 'error', 'drain')); + $this->stream->on('close', array($this, 'close')); + } + + public function isReadable() + { + return $this->stream->isReadable(); + } + + public function isWritable() + { + return $this->isWritable(); + } + + public function pause() + { + $this->stream->pause(); + } + + public function resume() + { + $this->stream->resume(); + } + + public function pipe(WritableStreamInterface $dest, array $options = array()) + { + $this->stream->pipe($dest, $options); + } + + public function write($data) + { + return $this->stream->write($data); + } + + public function end($data = null) + { + return $this->stream->end($data); + } + + public function close() + { + $this->stream->close(); + $this->removeAllListeners(); + } + + public function getRemoteAddress() + { + return null; + } + + public function getLocalAddress() + { + return null; + } +} diff --git a/src/ConnectorUpcaster.php b/src/ConnectorUpcaster.php new file mode 100644 index 0000000..14b03a3 --- /dev/null +++ b/src/ConnectorUpcaster.php @@ -0,0 +1,29 @@ +legacy = $connector; + } + + public function connect($uri) + { + return $this->legacy->connect($uri)->then(function (DuplexStreamInterface $stream) { + return new ConnectionUpcaster($stream); + }); + } +} diff --git a/src/Factory.php b/src/Factory.php index c4b4a94..93b3cd2 100644 --- a/src/Factory.php +++ b/src/Factory.php @@ -2,24 +2,35 @@ namespace Clue\React\Redis; -use React\SocketClient\ConnectorInterface; -use React\Stream\Stream; use Clue\React\Redis\StreamingClient; use Clue\Redis\Protocol\Factory as ProtocolFactory; -use React\SocketClient\Connector; -use InvalidArgumentException; use React\EventLoop\LoopInterface; use React\Promise; +use React\Socket\ConnectionInterface; +use React\Socket\Connector; +use React\Socket\ConnectorInterface; +use InvalidArgumentException; class Factory { private $connector; private $protocol; - public function __construct(LoopInterface $loop, ConnectorInterface $connector = null, ProtocolFactory $protocol = null) + /** + * @param LoopInterface $loop + * @param ConnectorInterface|\React\SocketClient\ConnectorInterface|null $connector + * [optional] Connector to use. Should be `null` in order to use default + * Connector. Passing a `\React\SocketClient\ConnectorInterface` is + * deprecated and only supported for BC reasons and will be removed in + * future versions. + * @param ProtocolFactory|null $protocol + */ + public function __construct(LoopInterface $loop, $connector = null, ProtocolFactory $protocol = null) { if ($connector === null) { $connector = new Connector($loop); + } elseif (!$connector instanceof ConnectorInterface) { + $connector = new ConnectorUpcaster($connector); } if ($protocol === null) { @@ -46,7 +57,7 @@ public function createClient($target = null) $protocol = $this->protocol; - $promise = $this->connector->connect($parts['host'] . ':' . $parts['port'])->then(function (Stream $stream) use ($protocol) { + $promise = $this->connector->connect($parts['host'] . ':' . $parts['port'])->then(function (ConnectionInterface $stream) use ($protocol) { return new StreamingClient($stream, $protocol->createResponseParser(), $protocol->createSerializer()); }); diff --git a/src/StreamingClient.php b/src/StreamingClient.php index 875e541..69b870d 100644 --- a/src/StreamingClient.php +++ b/src/StreamingClient.php @@ -3,10 +3,8 @@ namespace Clue\React\Redis; use Evenement\EventEmitter; -use React\Stream\Stream; use Clue\Redis\Protocol\Parser\ParserInterface; use Clue\Redis\Protocol\Parser\ParserException; -use Clue\Redis\Protocol\Model\ErrorReplyException; use Clue\Redis\Protocol\Serializer\SerializerInterface; use Clue\Redis\Protocol\Factory as ProtocolFactory; use UnderflowException; @@ -17,6 +15,7 @@ use Clue\Redis\Protocol\Model\ModelInterface; use Clue\Redis\Protocol\Model\MultiBulkReply; use Clue\Redis\Protocol\Model\StatusReply; +use React\Stream\DuplexStreamInterface; /** * @internal @@ -34,7 +33,7 @@ class StreamingClient extends EventEmitter implements Client private $psubscribed = 0; private $monitoring = false; - public function __construct(Stream $stream, ParserInterface $parser = null, SerializerInterface $serializer = null) + public function __construct(DuplexStreamInterface $stream, ParserInterface $parser = null, SerializerInterface $serializer = null) { if ($parser === null || $serializer === null) { $factory = new ProtocolFactory(); diff --git a/tests/FactoryTest.php b/tests/FactoryTest.php index 4c059fd..acb0d3f 100644 --- a/tests/FactoryTest.php +++ b/tests/FactoryTest.php @@ -12,7 +12,7 @@ class FactoryTest extends TestCase public function setUp() { $this->loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); - $this->connector = $this->getMockBuilder('React\SocketClient\ConnectorInterface')->getMock(); + $this->connector = $this->getMockBuilder('React\Socket\ConnectorInterface')->getMock(); $this->factory = new Factory($this->loop, $this->connector); } @@ -39,9 +39,18 @@ public function testWillConnectToLocalIpWhenTargetIsLocalhost() $this->factory->createClient('tcp://localhost:1337'); } + public function testWillUpcastLegacyConnectorAndConnect() + { + $this->connector = $this->getMockBuilder('React\SocketClient\ConnectorInterface')->getMock(); + $this->factory = new Factory($this->loop, $this->connector); + + $this->connector->expects($this->once())->method('connect')->with('example.com:1337')->willReturn(Promise\reject(new \RuntimeException())); + $this->factory->createClient('tcp://example.com:1337'); + } + public function testWillResolveIfConnectorResolves() { - $stream = $this->getMockBuilder('React\Stream\Stream')->disableOriginalConstructor()->getMock(); + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); $stream->expects($this->never())->method('write'); $this->connector->expects($this->once())->method('connect')->willReturn(Promise\resolve($stream)); @@ -52,7 +61,7 @@ public function testWillResolveIfConnectorResolves() public function testWillWriteSelectCommandIfTargetContainsPath() { - $stream = $this->getMockBuilder('React\Stream\Stream')->disableOriginalConstructor()->getMock(); + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); $stream->expects($this->once())->method('write')->with("*2\r\n$6\r\nselect\r\n$4\r\ndemo\r\n"); $this->connector->expects($this->once())->method('connect')->willReturn(Promise\resolve($stream)); @@ -61,7 +70,7 @@ public function testWillWriteSelectCommandIfTargetContainsPath() public function testWillWriteAuthCommandIfTargetContainsUserInfo() { - $stream = $this->getMockBuilder('React\Stream\Stream')->disableOriginalConstructor()->getMock(); + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); $stream->expects($this->once())->method('write')->with("*2\r\n$4\r\nauth\r\n$11\r\nhello:world\r\n"); $this->connector->expects($this->once())->method('connect')->willReturn(Promise\resolve($stream)); diff --git a/tests/FunctionalTest.php b/tests/FunctionalTest.php index b7beb6f..080a4ab 100644 --- a/tests/FunctionalTest.php +++ b/tests/FunctionalTest.php @@ -5,6 +5,7 @@ use Clue\React\Redis\StreamingClient; use React\Promise\Deferred; use Clue\React\Block; +use React\SocketClient\Connector; class FunctionalTest extends TestCase { @@ -39,6 +40,20 @@ public function testPing() return $client; } + public function testPingClientWithLegacyConnector() + { + $this->client->close(); + $this->factory = new Factory($this->loop, new Connector($this->loop)); + $this->client = $this->createClient(getenv('REDIS_URI')); + + $promise = $this->client->ping(); + $this->assertInstanceOf('React\Promise\PromiseInterface', $promise); + $promise->then($this->expectCallableOnce('PONG')); + + $this->client->end(); + $this->waitFor($this->client); + } + public function testMgetIsNotInterpretedAsSubMessage() { $client = $this->client;