diff --git a/README.md b/README.md index 15928c6..50eca85 100644 --- a/README.md +++ b/README.md @@ -13,19 +13,21 @@ local redis server and send some requests: $factory = new Factory($connector); $factory->createClient()->then(function (Client $client) use ($loop) { - $client->SET('greeting', 'Hello world'); - $client->APPEND('greeting', '!'); + $api = new RequestApi($client); - $client->GET('greeting')->then(function ($greeting) { + $api->set('greeting', 'Hello world'); + $api->append('greeting', '!'); + + $api->get('greeting')->then(function ($greeting) { echo $greeting . PHP_EOL; }); - $client->INCR('invocation')->then(function ($n) { + $api->incr('invocation')->then(function ($n) { echo 'count: ' . $n . PHP_EOL; }); // end connection once all pending requests have been resolved - $client->end(); + $api->end(); }); $loop->run(); diff --git a/example/cli.php b/example/cli.php index 80c24c7..6b01f15 100644 --- a/example/cli.php +++ b/example/cli.php @@ -30,7 +30,7 @@ $line = fgets(STDIN); if ($line === false || $line === '') { echo '# CTRL-D -> Ending connection...' . PHP_EOL; - $client->end(); + $client->close(); } else { $line = rtrim($line); @@ -39,7 +39,7 @@ } else { $params = explode(' ', $line); $method = array_shift($params); - call_user_func_array(array($client, $method), $params); + $client->send($method, $params); } } }); diff --git a/example/incr.php b/example/incr.php index 2ef0d3c..b9bea4e 100644 --- a/example/incr.php +++ b/example/incr.php @@ -2,6 +2,7 @@ use Clue\React\Redis\Client; use Clue\React\Redis\Factory; +use Clue\React\Redis\RequestApi; require __DIR__ . '/../vendor/autoload.php'; @@ -12,13 +13,15 @@ $factory = new Factory($connector); $factory->createClient()->then(function (Client $client) { - $client->incr('test'); + $api = new RequestApi($client); - $client->get('test')->then(function ($result) { + $api->incr('test'); + + $api->get('test')->then(function ($result) { var_dump($result); }); - $client->end(); + $api->end(); }); $loop->run(); diff --git a/src/Client.php b/src/Client.php index 8193927..fd2a020 100644 --- a/src/Client.php +++ b/src/Client.php @@ -20,8 +20,6 @@ class Client extends EventEmitter private $stream; private $parser; private $serializer; - private $requests = array(); - private $ending = false; public function __construct(Stream $stream, ParserInterface $parser = null, SerializerInterface $serializer = null) { @@ -48,7 +46,7 @@ public function __construct(Stream $stream, ParserInterface $parser = null, Seri foreach ($models as $data) { try { - $that->handleMessage($data); + $that->emit('message', array($data, $that)); } catch (UnderflowException $error) { $that->emit('error', array($error)); @@ -67,73 +65,32 @@ public function __construct(Stream $stream, ParserInterface $parser = null, Seri $this->serializer = $serializer; } - public function __call($name, $args) - { - $request = new Deferred(); - - if ($this->ending) { - $request->reject(new RuntimeException('Connection closed')); - } else { - $this->stream->write($this->serializer->getRequestMessage($name, $args)); - $this->requests []= $request; - } - - return $request->promise(); - } - - public function handleMessage(ModelInterface $message) - { - $this->emit('message', array($message, $this)); - - if (!$this->requests) { - throw new UnderflowException('Unexpected reply received, no matching request found'); - } - - $request = array_shift($this->requests); - /* @var $request Deferred */ - - if ($message instanceof ErrorReply) { - $request->reject($message); - } else { - $request->resolve($message->getValueNative()); - } - - if ($this->ending && !$this->isBusy()) { - $this->close(); - } - } - - public function isBusy() + /** + * Sends command with given $name and additial $args + * + * @param name $name + * @param array $args + */ + public function sendRequest($name, array $args = array()) { - return !!$this->requests; + $this->stream->write($this->serializer->getRequestMessage($name, $args)); } /** - * end connection once all pending requests have been replied to + * Sends given message model (request message) * - * @uses self::close() once all replies have been received - * @see self::close() for closing the connection immediately + * @param ModelInterface $message */ - public function end() + public function sendMessage(ModelInterface $message) { - $this->ending = true; - - if (!$this->isBusy()) { - $this->close(); - } + $this->stream->write($message->getMessageSerialized($this->serializer)); } + /** + * Immediately terminate the connection and discard incoming and outgoing buffers + */ public function close() { - $this->ending = true; - $this->stream->close(); - - // reject all remaining requests in the queue - while($this->requests) { - $request = array_shift($this->requests); - /* @var $request Request */ - $request->reject(new RuntimeException('Connection closing')); - } } } diff --git a/src/Factory.php b/src/Factory.php index f4e5054..bad5e00 100644 --- a/src/Factory.php +++ b/src/Factory.php @@ -43,7 +43,8 @@ public function createClient($target = null) if ($auth !== null) { $promise = $promise->then(function (Client $client) use ($auth) { - return $client->auth($auth)->then( + $api = new RequestApi($client); + return $api->auth($auth)->then( function () use ($client) { return $client; }, @@ -57,7 +58,8 @@ function ($error) use ($client) { if ($db !== null) { $promise = $promise->then(function (Client $client) use ($db) { - return $client->select($db)->then( + $api = new RequestApi($client); + return $api->select($db)->then( function () use ($client) { return $client; }, diff --git a/src/RequestApi.php b/src/RequestApi.php new file mode 100644 index 0000000..ba476bd --- /dev/null +++ b/src/RequestApi.php @@ -0,0 +1,98 @@ +client = $client; + } + + public function __call($name, $args) + { + $request = new Deferred(); + + if ($this->ending) { + $request->reject(new RuntimeException('Connection closed')); + } else { + if (!$this->isBusy()) { + $this->client->on('message', array($this, 'handleMessage')); + $this->client->on('close', array($this, 'handleClose')); + } + + $this->client->sendRequest($name, $args); + $this->requests []= $request; + } + + return $request->promise(); + } + + public function handleMessage(ModelInterface $message) + { + $request = array_shift($this->requests); + /* @var $request Deferred */ + + if ($message instanceof ErrorReply) { + $request->reject($message); + } else { + $request->resolve($message->getValueNative()); + } + + if (!$this->isBusy()) { + $this->client->removeListener('message', array($this, 'handleMessage')); + $this->client->removeListener('close', array($this, 'handleClose')); + + if ($this->ending) { + $this->client->close(); + } + } + } + + public function handleClose() + { + $this->ending = true; + + $this->client->removeListener('message', array($this, 'handleMessage')); + $this->client->removeListener('close', array($this, 'handleClose')); + + // reject all remaining requests in the queue + while($this->requests) { + $request = array_shift($this->requests); + /* @var $request Request */ + $request->reject(new RuntimeException('Connection closing')); + } + + $this->requests = array(); + } + + public function isBusy() + { + return !!$this->requests; + } + + /** + * end connection once all pending requests have been replied to + * + * @uses self::close() once all replies have been received + * @see self::close() for closing the connection immediately + */ + public function end() + { + $this->ending = true; + + if (!$this->isBusy()) { + $this->client->close(); + } + } +} diff --git a/tests/ClientTest.php b/tests/ClientTest.php new file mode 100644 index 0000000..d1a96c7 --- /dev/null +++ b/tests/ClientTest.php @@ -0,0 +1,79 @@ +stream = $this->getMockBuilder('React\Stream\Stream')->disableOriginalConstructor()->setMethods(array('write', 'close', 'resume', 'pause'))->getMock(); + $this->parser = $this->getMock('Clue\Redis\Protocol\Parser\ParserInterface'); + $this->serializer = $this->getMock('Clue\Redis\Protocol\Serializer\SerializerInterface'); + + $this->client = new Client($this->stream, $this->parser, $this->serializer); + } + + public function testSending() + { + $this->serializer->expects($this->once())->method('getRequestMessage')->with($this->equalTo('ping'))->will($this->returnValue('message')); + $this->stream->expects($this->once())->method('write')->with($this->equalTo('message')); + + $this->client->sendRequest('ping', array()); + } + + public function testClosingClientEmitsEvent() + { + //$this->client->on('close', $this->expectCallableOnce()); + + $this->client->close(); + } + + public function testClosingStreamClosesClient() + { + $this->client->on('close', $this->expectCallableOnce()); + + $this->stream->emit('close'); + } + + public function testReceiveParseErrorEmitsErrorEvent() + { + $this->client->on('error', $this->expectCallableOnce()); + //$this->client->on('close', $this->expectCallableOnce()); + + $this->parser->expects($this->once())->method('pushIncoming')->with($this->equalTo('message'))->will($this->throwException(new ParserException())); + $this->stream->emit('data', array('message')); + } + + public function testReceiveMessageEmitsEvent() + { + $this->client->on('message', $this->expectCallableOnce()); + + $this->parser->expects($this->once())->method('pushIncoming')->with($this->equalTo('message'))->will($this->returnValue(array(new IntegerReply(2)))); + $this->stream->emit('data', array('message')); + } + + public function testReceiveThrowMessageEmitsErrorEvent() + { + $this->client->on('message', $this->expectCallableOnce()); + $this->client->on('message', function() { + throw new UnderflowException(); + }); + + $this->client->on('error', $this->expectCallableOnce()); + + $this->parser->expects($this->once())->method('pushIncoming')->with($this->equalTo('message'))->will($this->returnValue(array(new IntegerReply(2)))); + $this->stream->emit('data', array('message')); + } + + public function testDefaultCtor() + { + $client = new Client($this->stream); + } +} diff --git a/tests/FunctionalTest.php b/tests/FunctionalTest.php index aa71767..95bb22c 100644 --- a/tests/FunctionalTest.php +++ b/tests/FunctionalTest.php @@ -7,6 +7,7 @@ use Clue\React\Redis\Factory; use Clue\React\Redis\Client; +use Clue\React\Redis\RequestApi; class FunctionalTest extends TestCase { @@ -25,7 +26,7 @@ public static function setUpBeforeClass() public function testPing() { - $client = $this->createClient(); + $client = $this->createClientApi(); $promise = $client->ping(); $this->assertInstanceOf('React\Promise\PromiseInterface', $promise); @@ -40,10 +41,10 @@ public function testPing() /** * - * @param Client $client + * @param RequestApi $client * @depends testPing */ - public function testPipeline(Client $client) + public function testPipeline(RequestApi $client) { $this->assertFalse($client->isBusy()); @@ -61,10 +62,10 @@ public function testPipeline(Client $client) /** * - * @param Client $client + * @param RequestApi $client * @depends testPipeline */ - public function testInvalidCommand(Client $client) + public function testInvalidCommand(RequestApi $client) { $client->doesnotexist(1, 2, 3)->then($this->expectCallableNever()); @@ -75,10 +76,10 @@ public function testInvalidCommand(Client $client) /** * - * @param Client $client + * @param RequestApi $client * @depends testInvalidCommand */ - public function testMultiExecEmpty(Client $client) + public function testMultiExecEmpty(RequestApi $client) { $client->multi()->then($this->expectCallableOnce('OK')); $client->exec()->then($this->expectCallableOnce(array())); @@ -90,10 +91,10 @@ public function testMultiExecEmpty(Client $client) /** * - * @param Client $client + * @param RequestApi $client * @depends testMultiExecEmpty */ - public function testMultiExecQueuedExecHasValues(Client $client) + public function testMultiExecQueuedExecHasValues(RequestApi $client) { $client->multi()->then($this->expectCallableOnce('OK')); $client->set('b', 10)->then($this->expectCallableOnce('QUEUED')); @@ -109,8 +110,8 @@ public function testMultiExecQueuedExecHasValues(Client $client) public function testPubSub() { - $consumer = $this->createClient(); - $producer = $this->createClient(); + $consumer = $this->createClientApi(); + $producer = $this->createClientApi(); $that = $this; @@ -126,36 +127,36 @@ public function testPubSub() public function testClose() { $client = $this->createClient(); + $api = new RequestApi($client); - $client->get('willBeCanceledAnyway')->then(null, $this->expectCallableOnce()); + $api->get('willBeCanceledAnyway')->then(null, $this->expectCallableOnce()); $client->close(); - $client->get('willBeRejectedRightAway')->then(null, $this->expectCallableOnce()); + $api->get('willBeRejectedRightAway')->then(null, $this->expectCallableOnce()); } public function testInvalidProtocol() { $client = $this->createClientResponse("communication does not conform to protocol\r\n"); + $api = new RequestApi($client); $client->on('error', $this->expectCallableOnce()); $client->on('close', $this->expectCallableOnce()); - $client->get('willBeRejectedDueToClosing')->then(null, $this->expectCallableOnce()); + $api->get('willBeRejectedDueToClosing')->then(null, $this->expectCallableOnce()); - $this->waitFor($client); + $this->waitFor($api); } - public function testInvalidServerRepliesWithDuplicateMessages() + public function testAdditionalServerRepliesAreBeingIgnored() { $client = $this->createClientResponse("+OK\r\n-ERR invalid\r\n"); + $api = new RequestApi($client); - $client->on('error', $this->expectCallableOnce()); - $client->on('close', $this->expectCallableOnce()); - - $client->set('a', 0)->then($this->expectCallableOnce('OK')); + $api->set('a', 0)->then($this->expectCallableOnce('OK')); - $this->waitFor($client); + $this->waitFor($api); } /** @@ -183,6 +184,11 @@ protected function createClient() return $client; } + protected function createClientApi() + { + return new RequestApi($this->createClient()); + } + protected function createClientResponse($response) { $fp = fopen('php://temp', 'r+'); @@ -201,7 +207,7 @@ protected function createServer($response) } - protected function waitFor(Client $client) + protected function waitFor(RequestApi $client) { $this->assertTrue($client->isBusy()); diff --git a/tests/ResponseApiTest.php b/tests/ResponseApiTest.php new file mode 100644 index 0000000..cf400a4 --- /dev/null +++ b/tests/ResponseApiTest.php @@ -0,0 +1,89 @@ +stream = $this->getMock('React\Stream\Stream'); + //$this->client = new Client($this->stream); + $this->client = $this->getMockBuilder('Clue\React\Redis\Client')->disableOriginalConstructor()->setMethods(array('sendRequest', 'close'))->getMock(); + $this->RequestApi = new RequestApi($this->client); + } + + public function testPingPong() + { + $this->client->expects($this->once())->method('sendRequest')->with($this->equalTo('ping')); + + $promise = $this->RequestApi->ping(); + + $this->client->emit('message', array(new BulkReply('PONG'))); + + $this->expectPromiseResolve($promise); + $promise->then($this->expectCallableOnce('PONG')); + } + + public function testErrorReply() + { + $promise = $this->RequestApi->invalid(); + + $err = new ErrorReply('ERR invalid command'); + $this->client->emit('message', array($err)); + + $this->expectPromiseReject($promise); + $promise->then(null, $this->expectCallableOnce($err)); + } + + public function testClosingClientRejectsAllRemainingRequests() + { + $promise = $this->RequestApi->ping(); + $this->assertTrue($this->RequestApi->isBusy()); + + $this->client->emit('close'); + + $this->expectPromiseReject($promise); + $this->assertFalse($this->RequestApi->isBusy()); + } + + public function testClosedClientRejectsAllNewRequests() + { + $promise = $this->RequestApi->ping(); + + $this->client->emit('close'); + + $this->expectPromiseReject($promise); + $this->assertFalse($this->RequestApi->isBusy()); + } + + public function testEndingNonBusyClosesClient() + { + $this->client->expects($this->once())->method('close'); + $this->RequestApi->end(); + } + + public function testEndingBusyClosesClientWhenNotBusyAnymore() + { + // count how often the "close" method has been called + $closed = 0; + $this->client->method('close')->will($this->returnCallback(function() use (&$closed) { + ++$closed; + })); + + $promise = $this->RequestApi->ping(); + $this->assertEquals(0, $closed); + + $this->RequestApi->end(); + $this->assertEquals(0, $closed); + + $this->client->emit('message', array(new BulkReply('PONG'))); + $this->assertEquals(1, $closed); + } +}