Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,21 @@ local redis server and send some requests:

$factory = new Factory($connector);
$factory->createClient()->then(function (Client $client) use ($loop) {
$client->SET('greeting', 'Hello world');
$client->APPEND('greeting', '!');
$api = new RequestApi($client);

$client->GET('greeting')->then(function ($greeting) {
$api->set('greeting', 'Hello world');
$api->append('greeting', '!');

$api->get('greeting')->then(function ($greeting) {
echo $greeting . PHP_EOL;
});

$client->INCR('invocation')->then(function ($n) {
$api->incr('invocation')->then(function ($n) {
echo 'count: ' . $n . PHP_EOL;
});

// end connection once all pending requests have been resolved
$client->end();
$api->end();
});

$loop->run();
Expand Down
4 changes: 2 additions & 2 deletions example/cli.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
$line = fgets(STDIN);
if ($line === false || $line === '') {
echo '# CTRL-D -> Ending connection...' . PHP_EOL;
$client->end();
$client->close();
} else {
$line = rtrim($line);

Expand All @@ -39,7 +39,7 @@
} else {
$params = explode(' ', $line);
$method = array_shift($params);
call_user_func_array(array($client, $method), $params);
$client->send($method, $params);
}
}
});
Expand Down
9 changes: 6 additions & 3 deletions example/incr.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use Clue\React\Redis\Client;
use Clue\React\Redis\Factory;
use Clue\React\Redis\RequestApi;

require __DIR__ . '/../vendor/autoload.php';

Expand All @@ -12,13 +13,15 @@
$factory = new Factory($connector);

$factory->createClient()->then(function (Client $client) {
$client->incr('test');
$api = new RequestApi($client);

$client->get('test')->then(function ($result) {
$api->incr('test');

$api->get('test')->then(function ($result) {
var_dump($result);
});

$client->end();
$api->end();
});

$loop->run();
75 changes: 16 additions & 59 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ class Client extends EventEmitter
private $stream;
private $parser;
private $serializer;
private $requests = array();
private $ending = false;

public function __construct(Stream $stream, ParserInterface $parser = null, SerializerInterface $serializer = null)
{
Expand All @@ -48,7 +46,7 @@ public function __construct(Stream $stream, ParserInterface $parser = null, Seri

foreach ($models as $data) {
try {
$that->handleMessage($data);
$that->emit('message', array($data, $that));
}
catch (UnderflowException $error) {
$that->emit('error', array($error));
Expand All @@ -67,73 +65,32 @@ public function __construct(Stream $stream, ParserInterface $parser = null, Seri
$this->serializer = $serializer;
}

public function __call($name, $args)
{
$request = new Deferred();

if ($this->ending) {
$request->reject(new RuntimeException('Connection closed'));
} else {
$this->stream->write($this->serializer->getRequestMessage($name, $args));
$this->requests []= $request;
}

return $request->promise();
}

public function handleMessage(ModelInterface $message)
{
$this->emit('message', array($message, $this));

if (!$this->requests) {
throw new UnderflowException('Unexpected reply received, no matching request found');
}

$request = array_shift($this->requests);
/* @var $request Deferred */

if ($message instanceof ErrorReply) {
$request->reject($message);
} else {
$request->resolve($message->getValueNative());
}

if ($this->ending && !$this->isBusy()) {
$this->close();
}
}

public function isBusy()
/**
* Sends command with given $name and additial $args
*
* @param name $name
* @param array $args
*/
public function sendRequest($name, array $args = array())
{
return !!$this->requests;
$this->stream->write($this->serializer->getRequestMessage($name, $args));
}

/**
* end connection once all pending requests have been replied to
* Sends given message model (request message)
*
* @uses self::close() once all replies have been received
* @see self::close() for closing the connection immediately
* @param ModelInterface $message
*/
public function end()
public function sendMessage(ModelInterface $message)
{
$this->ending = true;

if (!$this->isBusy()) {
$this->close();
}
$this->stream->write($message->getMessageSerialized($this->serializer));
}

/**
* Immediately terminate the connection and discard incoming and outgoing buffers
*/
public function close()
{
$this->ending = true;

$this->stream->close();

// reject all remaining requests in the queue
while($this->requests) {
$request = array_shift($this->requests);
/* @var $request Request */
$request->reject(new RuntimeException('Connection closing'));
}
}
}
6 changes: 4 additions & 2 deletions src/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public function createClient($target = null)

if ($auth !== null) {
$promise = $promise->then(function (Client $client) use ($auth) {
return $client->auth($auth)->then(
$api = new RequestApi($client);
return $api->auth($auth)->then(
function () use ($client) {
return $client;
},
Expand All @@ -57,7 +58,8 @@ function ($error) use ($client) {

if ($db !== null) {
$promise = $promise->then(function (Client $client) use ($db) {
return $client->select($db)->then(
$api = new RequestApi($client);
return $api->select($db)->then(
function () use ($client) {
return $client;
},
Expand Down
98 changes: 98 additions & 0 deletions src/RequestApi.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
<?php

namespace Clue\React\Redis;

use Clue\Redis\Protocol\Model\ModelInterface;
use UnderflowException;
use RuntimeException;
use React\Promise\Deferred;
use Clue\Redis\Protocol\Model\ErrorReply;

class RequestApi
{
private $client;
private $requests = array();
private $ending = false;

public function __construct(Client $client)
{
$this->client = $client;
}

public function __call($name, $args)
{
$request = new Deferred();

if ($this->ending) {
$request->reject(new RuntimeException('Connection closed'));
} else {
if (!$this->isBusy()) {
$this->client->on('message', array($this, 'handleMessage'));
$this->client->on('close', array($this, 'handleClose'));
}

$this->client->sendRequest($name, $args);
$this->requests []= $request;
}

return $request->promise();
}

public function handleMessage(ModelInterface $message)
{
$request = array_shift($this->requests);
/* @var $request Deferred */

if ($message instanceof ErrorReply) {
$request->reject($message);
} else {
$request->resolve($message->getValueNative());
}

if (!$this->isBusy()) {
$this->client->removeListener('message', array($this, 'handleMessage'));
$this->client->removeListener('close', array($this, 'handleClose'));

if ($this->ending) {
$this->client->close();
}
}
}

public function handleClose()
{
$this->ending = true;

$this->client->removeListener('message', array($this, 'handleMessage'));
$this->client->removeListener('close', array($this, 'handleClose'));

// reject all remaining requests in the queue
while($this->requests) {
$request = array_shift($this->requests);
/* @var $request Request */
$request->reject(new RuntimeException('Connection closing'));
}

$this->requests = array();
}

public function isBusy()
{
return !!$this->requests;
}

/**
* end connection once all pending requests have been replied to
*
* @uses self::close() once all replies have been received
* @see self::close() for closing the connection immediately
*/
public function end()
{
$this->ending = true;

if (!$this->isBusy()) {
$this->client->close();
}
}
}
79 changes: 79 additions & 0 deletions tests/ClientTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
<?php

use Clue\React\Redis\Client;
use Clue\Redis\Protocol\Parser\ParserException;
use Clue\Redis\Protocol\Model\IntegerReply;

class ClientTest extends TestCase
{
private $stream;
private $parser;
private $serializer;
private $client;

public function setUp()
{
$this->stream = $this->getMockBuilder('React\Stream\Stream')->disableOriginalConstructor()->setMethods(array('write', 'close', 'resume', 'pause'))->getMock();
$this->parser = $this->getMock('Clue\Redis\Protocol\Parser\ParserInterface');
$this->serializer = $this->getMock('Clue\Redis\Protocol\Serializer\SerializerInterface');

$this->client = new Client($this->stream, $this->parser, $this->serializer);
}

public function testSending()
{
$this->serializer->expects($this->once())->method('getRequestMessage')->with($this->equalTo('ping'))->will($this->returnValue('message'));
$this->stream->expects($this->once())->method('write')->with($this->equalTo('message'));

$this->client->sendRequest('ping', array());
}

public function testClosingClientEmitsEvent()
{
//$this->client->on('close', $this->expectCallableOnce());

$this->client->close();
}

public function testClosingStreamClosesClient()
{
$this->client->on('close', $this->expectCallableOnce());

$this->stream->emit('close');
}

public function testReceiveParseErrorEmitsErrorEvent()
{
$this->client->on('error', $this->expectCallableOnce());
//$this->client->on('close', $this->expectCallableOnce());

$this->parser->expects($this->once())->method('pushIncoming')->with($this->equalTo('message'))->will($this->throwException(new ParserException()));
$this->stream->emit('data', array('message'));
}

public function testReceiveMessageEmitsEvent()
{
$this->client->on('message', $this->expectCallableOnce());

$this->parser->expects($this->once())->method('pushIncoming')->with($this->equalTo('message'))->will($this->returnValue(array(new IntegerReply(2))));
$this->stream->emit('data', array('message'));
}

public function testReceiveThrowMessageEmitsErrorEvent()
{
$this->client->on('message', $this->expectCallableOnce());
$this->client->on('message', function() {
throw new UnderflowException();
});

$this->client->on('error', $this->expectCallableOnce());

$this->parser->expects($this->once())->method('pushIncoming')->with($this->equalTo('message'))->will($this->returnValue(array(new IntegerReply(2))));
$this->stream->emit('data', array('message'));
}

public function testDefaultCtor()
{
$client = new Client($this->stream);
}
}
Loading