From fc118922d7795166d455f3c4a34e3a7a04f62618 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Mon, 25 Aug 2014 01:13:09 +0200 Subject: [PATCH] Revert "Split off ResponseApi from Client" --- README.md | 12 ++--- example/cli.php | 4 +- example/incr.php | 9 ++-- src/Client.php | 75 +++++++++++++++++++++++------- src/Factory.php | 6 +-- src/RequestApi.php | 98 --------------------------------------- tests/ClientTest.php | 79 ------------------------------- tests/FunctionalTest.php | 50 +++++++++----------- tests/ResponseApiTest.php | 89 ----------------------------------- 9 files changed, 93 insertions(+), 329 deletions(-) delete mode 100644 src/RequestApi.php delete mode 100644 tests/ClientTest.php delete mode 100644 tests/ResponseApiTest.php diff --git a/README.md b/README.md index 50eca85..15928c6 100644 --- a/README.md +++ b/README.md @@ -13,21 +13,19 @@ local redis server and send some requests: $factory = new Factory($connector); $factory->createClient()->then(function (Client $client) use ($loop) { - $api = new RequestApi($client); + $client->SET('greeting', 'Hello world'); + $client->APPEND('greeting', '!'); - $api->set('greeting', 'Hello world'); - $api->append('greeting', '!'); - - $api->get('greeting')->then(function ($greeting) { + $client->GET('greeting')->then(function ($greeting) { echo $greeting . PHP_EOL; }); - $api->incr('invocation')->then(function ($n) { + $client->INCR('invocation')->then(function ($n) { echo 'count: ' . $n . PHP_EOL; }); // end connection once all pending requests have been resolved - $api->end(); + $client->end(); }); $loop->run(); diff --git a/example/cli.php b/example/cli.php index 6b01f15..80c24c7 100644 --- a/example/cli.php +++ b/example/cli.php @@ -30,7 +30,7 @@ $line = fgets(STDIN); if ($line === false || $line === '') { echo '# CTRL-D -> Ending connection...' . PHP_EOL; - $client->close(); + $client->end(); } else { $line = rtrim($line); @@ -39,7 +39,7 @@ } else { $params = explode(' ', $line); $method = array_shift($params); - $client->send($method, $params); + call_user_func_array(array($client, $method), $params); } } }); diff --git a/example/incr.php b/example/incr.php index b9bea4e..2ef0d3c 100644 --- a/example/incr.php +++ b/example/incr.php @@ -2,7 +2,6 @@ use Clue\React\Redis\Client; use Clue\React\Redis\Factory; -use Clue\React\Redis\RequestApi; require __DIR__ . '/../vendor/autoload.php'; @@ -13,15 +12,13 @@ $factory = new Factory($connector); $factory->createClient()->then(function (Client $client) { - $api = new RequestApi($client); + $client->incr('test'); - $api->incr('test'); - - $api->get('test')->then(function ($result) { + $client->get('test')->then(function ($result) { var_dump($result); }); - $api->end(); + $client->end(); }); $loop->run(); diff --git a/src/Client.php b/src/Client.php index fd2a020..8193927 100644 --- a/src/Client.php +++ b/src/Client.php @@ -20,6 +20,8 @@ class Client extends EventEmitter private $stream; private $parser; private $serializer; + private $requests = array(); + private $ending = false; public function __construct(Stream $stream, ParserInterface $parser = null, SerializerInterface $serializer = null) { @@ -46,7 +48,7 @@ public function __construct(Stream $stream, ParserInterface $parser = null, Seri foreach ($models as $data) { try { - $that->emit('message', array($data, $that)); + $that->handleMessage($data); } catch (UnderflowException $error) { $that->emit('error', array($error)); @@ -65,32 +67,73 @@ public function __construct(Stream $stream, ParserInterface $parser = null, Seri $this->serializer = $serializer; } - /** - * Sends command with given $name and additial $args - * - * @param name $name - * @param array $args - */ - public function sendRequest($name, array $args = array()) + public function __call($name, $args) + { + $request = new Deferred(); + + if ($this->ending) { + $request->reject(new RuntimeException('Connection closed')); + } else { + $this->stream->write($this->serializer->getRequestMessage($name, $args)); + $this->requests []= $request; + } + + return $request->promise(); + } + + public function handleMessage(ModelInterface $message) + { + $this->emit('message', array($message, $this)); + + if (!$this->requests) { + throw new UnderflowException('Unexpected reply received, no matching request found'); + } + + $request = array_shift($this->requests); + /* @var $request Deferred */ + + if ($message instanceof ErrorReply) { + $request->reject($message); + } else { + $request->resolve($message->getValueNative()); + } + + if ($this->ending && !$this->isBusy()) { + $this->close(); + } + } + + public function isBusy() { - $this->stream->write($this->serializer->getRequestMessage($name, $args)); + return !!$this->requests; } /** - * Sends given message model (request message) + * end connection once all pending requests have been replied to * - * @param ModelInterface $message + * @uses self::close() once all replies have been received + * @see self::close() for closing the connection immediately */ - public function sendMessage(ModelInterface $message) + public function end() { - $this->stream->write($message->getMessageSerialized($this->serializer)); + $this->ending = true; + + if (!$this->isBusy()) { + $this->close(); + } } - /** - * Immediately terminate the connection and discard incoming and outgoing buffers - */ public function close() { + $this->ending = true; + $this->stream->close(); + + // reject all remaining requests in the queue + while($this->requests) { + $request = array_shift($this->requests); + /* @var $request Request */ + $request->reject(new RuntimeException('Connection closing')); + } } } diff --git a/src/Factory.php b/src/Factory.php index bad5e00..f4e5054 100644 --- a/src/Factory.php +++ b/src/Factory.php @@ -43,8 +43,7 @@ public function createClient($target = null) if ($auth !== null) { $promise = $promise->then(function (Client $client) use ($auth) { - $api = new RequestApi($client); - return $api->auth($auth)->then( + return $client->auth($auth)->then( function () use ($client) { return $client; }, @@ -58,8 +57,7 @@ function ($error) use ($client) { if ($db !== null) { $promise = $promise->then(function (Client $client) use ($db) { - $api = new RequestApi($client); - return $api->select($db)->then( + return $client->select($db)->then( function () use ($client) { return $client; }, diff --git a/src/RequestApi.php b/src/RequestApi.php deleted file mode 100644 index ba476bd..0000000 --- a/src/RequestApi.php +++ /dev/null @@ -1,98 +0,0 @@ -client = $client; - } - - public function __call($name, $args) - { - $request = new Deferred(); - - if ($this->ending) { - $request->reject(new RuntimeException('Connection closed')); - } else { - if (!$this->isBusy()) { - $this->client->on('message', array($this, 'handleMessage')); - $this->client->on('close', array($this, 'handleClose')); - } - - $this->client->sendRequest($name, $args); - $this->requests []= $request; - } - - return $request->promise(); - } - - public function handleMessage(ModelInterface $message) - { - $request = array_shift($this->requests); - /* @var $request Deferred */ - - if ($message instanceof ErrorReply) { - $request->reject($message); - } else { - $request->resolve($message->getValueNative()); - } - - if (!$this->isBusy()) { - $this->client->removeListener('message', array($this, 'handleMessage')); - $this->client->removeListener('close', array($this, 'handleClose')); - - if ($this->ending) { - $this->client->close(); - } - } - } - - public function handleClose() - { - $this->ending = true; - - $this->client->removeListener('message', array($this, 'handleMessage')); - $this->client->removeListener('close', array($this, 'handleClose')); - - // reject all remaining requests in the queue - while($this->requests) { - $request = array_shift($this->requests); - /* @var $request Request */ - $request->reject(new RuntimeException('Connection closing')); - } - - $this->requests = array(); - } - - public function isBusy() - { - return !!$this->requests; - } - - /** - * end connection once all pending requests have been replied to - * - * @uses self::close() once all replies have been received - * @see self::close() for closing the connection immediately - */ - public function end() - { - $this->ending = true; - - if (!$this->isBusy()) { - $this->client->close(); - } - } -} diff --git a/tests/ClientTest.php b/tests/ClientTest.php deleted file mode 100644 index d1a96c7..0000000 --- a/tests/ClientTest.php +++ /dev/null @@ -1,79 +0,0 @@ -stream = $this->getMockBuilder('React\Stream\Stream')->disableOriginalConstructor()->setMethods(array('write', 'close', 'resume', 'pause'))->getMock(); - $this->parser = $this->getMock('Clue\Redis\Protocol\Parser\ParserInterface'); - $this->serializer = $this->getMock('Clue\Redis\Protocol\Serializer\SerializerInterface'); - - $this->client = new Client($this->stream, $this->parser, $this->serializer); - } - - public function testSending() - { - $this->serializer->expects($this->once())->method('getRequestMessage')->with($this->equalTo('ping'))->will($this->returnValue('message')); - $this->stream->expects($this->once())->method('write')->with($this->equalTo('message')); - - $this->client->sendRequest('ping', array()); - } - - public function testClosingClientEmitsEvent() - { - //$this->client->on('close', $this->expectCallableOnce()); - - $this->client->close(); - } - - public function testClosingStreamClosesClient() - { - $this->client->on('close', $this->expectCallableOnce()); - - $this->stream->emit('close'); - } - - public function testReceiveParseErrorEmitsErrorEvent() - { - $this->client->on('error', $this->expectCallableOnce()); - //$this->client->on('close', $this->expectCallableOnce()); - - $this->parser->expects($this->once())->method('pushIncoming')->with($this->equalTo('message'))->will($this->throwException(new ParserException())); - $this->stream->emit('data', array('message')); - } - - public function testReceiveMessageEmitsEvent() - { - $this->client->on('message', $this->expectCallableOnce()); - - $this->parser->expects($this->once())->method('pushIncoming')->with($this->equalTo('message'))->will($this->returnValue(array(new IntegerReply(2)))); - $this->stream->emit('data', array('message')); - } - - public function testReceiveThrowMessageEmitsErrorEvent() - { - $this->client->on('message', $this->expectCallableOnce()); - $this->client->on('message', function() { - throw new UnderflowException(); - }); - - $this->client->on('error', $this->expectCallableOnce()); - - $this->parser->expects($this->once())->method('pushIncoming')->with($this->equalTo('message'))->will($this->returnValue(array(new IntegerReply(2)))); - $this->stream->emit('data', array('message')); - } - - public function testDefaultCtor() - { - $client = new Client($this->stream); - } -} diff --git a/tests/FunctionalTest.php b/tests/FunctionalTest.php index 95bb22c..aa71767 100644 --- a/tests/FunctionalTest.php +++ b/tests/FunctionalTest.php @@ -7,7 +7,6 @@ use Clue\React\Redis\Factory; use Clue\React\Redis\Client; -use Clue\React\Redis\RequestApi; class FunctionalTest extends TestCase { @@ -26,7 +25,7 @@ public static function setUpBeforeClass() public function testPing() { - $client = $this->createClientApi(); + $client = $this->createClient(); $promise = $client->ping(); $this->assertInstanceOf('React\Promise\PromiseInterface', $promise); @@ -41,10 +40,10 @@ public function testPing() /** * - * @param RequestApi $client + * @param Client $client * @depends testPing */ - public function testPipeline(RequestApi $client) + public function testPipeline(Client $client) { $this->assertFalse($client->isBusy()); @@ -62,10 +61,10 @@ public function testPipeline(RequestApi $client) /** * - * @param RequestApi $client + * @param Client $client * @depends testPipeline */ - public function testInvalidCommand(RequestApi $client) + public function testInvalidCommand(Client $client) { $client->doesnotexist(1, 2, 3)->then($this->expectCallableNever()); @@ -76,10 +75,10 @@ public function testInvalidCommand(RequestApi $client) /** * - * @param RequestApi $client + * @param Client $client * @depends testInvalidCommand */ - public function testMultiExecEmpty(RequestApi $client) + public function testMultiExecEmpty(Client $client) { $client->multi()->then($this->expectCallableOnce('OK')); $client->exec()->then($this->expectCallableOnce(array())); @@ -91,10 +90,10 @@ public function testMultiExecEmpty(RequestApi $client) /** * - * @param RequestApi $client + * @param Client $client * @depends testMultiExecEmpty */ - public function testMultiExecQueuedExecHasValues(RequestApi $client) + public function testMultiExecQueuedExecHasValues(Client $client) { $client->multi()->then($this->expectCallableOnce('OK')); $client->set('b', 10)->then($this->expectCallableOnce('QUEUED')); @@ -110,8 +109,8 @@ public function testMultiExecQueuedExecHasValues(RequestApi $client) public function testPubSub() { - $consumer = $this->createClientApi(); - $producer = $this->createClientApi(); + $consumer = $this->createClient(); + $producer = $this->createClient(); $that = $this; @@ -127,36 +126,36 @@ public function testPubSub() public function testClose() { $client = $this->createClient(); - $api = new RequestApi($client); - $api->get('willBeCanceledAnyway')->then(null, $this->expectCallableOnce()); + $client->get('willBeCanceledAnyway')->then(null, $this->expectCallableOnce()); $client->close(); - $api->get('willBeRejectedRightAway')->then(null, $this->expectCallableOnce()); + $client->get('willBeRejectedRightAway')->then(null, $this->expectCallableOnce()); } public function testInvalidProtocol() { $client = $this->createClientResponse("communication does not conform to protocol\r\n"); - $api = new RequestApi($client); $client->on('error', $this->expectCallableOnce()); $client->on('close', $this->expectCallableOnce()); - $api->get('willBeRejectedDueToClosing')->then(null, $this->expectCallableOnce()); + $client->get('willBeRejectedDueToClosing')->then(null, $this->expectCallableOnce()); - $this->waitFor($api); + $this->waitFor($client); } - public function testAdditionalServerRepliesAreBeingIgnored() + public function testInvalidServerRepliesWithDuplicateMessages() { $client = $this->createClientResponse("+OK\r\n-ERR invalid\r\n"); - $api = new RequestApi($client); - $api->set('a', 0)->then($this->expectCallableOnce('OK')); + $client->on('error', $this->expectCallableOnce()); + $client->on('close', $this->expectCallableOnce()); + + $client->set('a', 0)->then($this->expectCallableOnce('OK')); - $this->waitFor($api); + $this->waitFor($client); } /** @@ -184,11 +183,6 @@ protected function createClient() return $client; } - protected function createClientApi() - { - return new RequestApi($this->createClient()); - } - protected function createClientResponse($response) { $fp = fopen('php://temp', 'r+'); @@ -207,7 +201,7 @@ protected function createServer($response) } - protected function waitFor(RequestApi $client) + protected function waitFor(Client $client) { $this->assertTrue($client->isBusy()); diff --git a/tests/ResponseApiTest.php b/tests/ResponseApiTest.php deleted file mode 100644 index cf400a4..0000000 --- a/tests/ResponseApiTest.php +++ /dev/null @@ -1,89 +0,0 @@ -stream = $this->getMock('React\Stream\Stream'); - //$this->client = new Client($this->stream); - $this->client = $this->getMockBuilder('Clue\React\Redis\Client')->disableOriginalConstructor()->setMethods(array('sendRequest', 'close'))->getMock(); - $this->RequestApi = new RequestApi($this->client); - } - - public function testPingPong() - { - $this->client->expects($this->once())->method('sendRequest')->with($this->equalTo('ping')); - - $promise = $this->RequestApi->ping(); - - $this->client->emit('message', array(new BulkReply('PONG'))); - - $this->expectPromiseResolve($promise); - $promise->then($this->expectCallableOnce('PONG')); - } - - public function testErrorReply() - { - $promise = $this->RequestApi->invalid(); - - $err = new ErrorReply('ERR invalid command'); - $this->client->emit('message', array($err)); - - $this->expectPromiseReject($promise); - $promise->then(null, $this->expectCallableOnce($err)); - } - - public function testClosingClientRejectsAllRemainingRequests() - { - $promise = $this->RequestApi->ping(); - $this->assertTrue($this->RequestApi->isBusy()); - - $this->client->emit('close'); - - $this->expectPromiseReject($promise); - $this->assertFalse($this->RequestApi->isBusy()); - } - - public function testClosedClientRejectsAllNewRequests() - { - $promise = $this->RequestApi->ping(); - - $this->client->emit('close'); - - $this->expectPromiseReject($promise); - $this->assertFalse($this->RequestApi->isBusy()); - } - - public function testEndingNonBusyClosesClient() - { - $this->client->expects($this->once())->method('close'); - $this->RequestApi->end(); - } - - public function testEndingBusyClosesClientWhenNotBusyAnymore() - { - // count how often the "close" method has been called - $closed = 0; - $this->client->method('close')->will($this->returnCallback(function() use (&$closed) { - ++$closed; - })); - - $promise = $this->RequestApi->ping(); - $this->assertEquals(0, $closed); - - $this->RequestApi->end(); - $this->assertEquals(0, $closed); - - $this->client->emit('message', array(new BulkReply('PONG'))); - $this->assertEquals(1, $closed); - } -}