Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,19 @@ local redis server and send some requests:

$factory = new Factory($connector);
$factory->createClient()->then(function (Client $client) use ($loop) {
$api = new RequestApi($client);
$client->SET('greeting', 'Hello world');
$client->APPEND('greeting', '!');

$api->set('greeting', 'Hello world');
$api->append('greeting', '!');

$api->get('greeting')->then(function ($greeting) {
$client->GET('greeting')->then(function ($greeting) {
echo $greeting . PHP_EOL;
});

$api->incr('invocation')->then(function ($n) {
$client->INCR('invocation')->then(function ($n) {
echo 'count: ' . $n . PHP_EOL;
});

// end connection once all pending requests have been resolved
$api->end();
$client->end();
});

$loop->run();
Expand Down
4 changes: 2 additions & 2 deletions example/cli.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
$line = fgets(STDIN);
if ($line === false || $line === '') {
echo '# CTRL-D -> Ending connection...' . PHP_EOL;
$client->close();
$client->end();
} else {
$line = rtrim($line);

Expand All @@ -39,7 +39,7 @@
} else {
$params = explode(' ', $line);
$method = array_shift($params);
$client->send($method, $params);
call_user_func_array(array($client, $method), $params);
}
}
});
Expand Down
9 changes: 3 additions & 6 deletions example/incr.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

use Clue\React\Redis\Client;
use Clue\React\Redis\Factory;
use Clue\React\Redis\RequestApi;

require __DIR__ . '/../vendor/autoload.php';

Expand All @@ -13,15 +12,13 @@
$factory = new Factory($connector);

$factory->createClient()->then(function (Client $client) {
$api = new RequestApi($client);
$client->incr('test');

$api->incr('test');

$api->get('test')->then(function ($result) {
$client->get('test')->then(function ($result) {
var_dump($result);
});

$api->end();
$client->end();
});

$loop->run();
75 changes: 59 additions & 16 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ class Client extends EventEmitter
private $stream;
private $parser;
private $serializer;
private $requests = array();
private $ending = false;

public function __construct(Stream $stream, ParserInterface $parser = null, SerializerInterface $serializer = null)
{
Expand All @@ -46,7 +48,7 @@ public function __construct(Stream $stream, ParserInterface $parser = null, Seri

foreach ($models as $data) {
try {
$that->emit('message', array($data, $that));
$that->handleMessage($data);
}
catch (UnderflowException $error) {
$that->emit('error', array($error));
Expand All @@ -65,32 +67,73 @@ public function __construct(Stream $stream, ParserInterface $parser = null, Seri
$this->serializer = $serializer;
}

/**
* Sends command with given $name and additial $args
*
* @param name $name
* @param array $args
*/
public function sendRequest($name, array $args = array())
public function __call($name, $args)
{
$request = new Deferred();

if ($this->ending) {
$request->reject(new RuntimeException('Connection closed'));
} else {
$this->stream->write($this->serializer->getRequestMessage($name, $args));
$this->requests []= $request;
}

return $request->promise();
}

public function handleMessage(ModelInterface $message)
{
$this->emit('message', array($message, $this));

if (!$this->requests) {
throw new UnderflowException('Unexpected reply received, no matching request found');
}

$request = array_shift($this->requests);
/* @var $request Deferred */

if ($message instanceof ErrorReply) {
$request->reject($message);
} else {
$request->resolve($message->getValueNative());
}

if ($this->ending && !$this->isBusy()) {
$this->close();
}
}

public function isBusy()
{
$this->stream->write($this->serializer->getRequestMessage($name, $args));
return !!$this->requests;
}

/**
* Sends given message model (request message)
* end connection once all pending requests have been replied to
*
* @param ModelInterface $message
* @uses self::close() once all replies have been received
* @see self::close() for closing the connection immediately
*/
public function sendMessage(ModelInterface $message)
public function end()
{
$this->stream->write($message->getMessageSerialized($this->serializer));
$this->ending = true;

if (!$this->isBusy()) {
$this->close();
}
}

/**
* Immediately terminate the connection and discard incoming and outgoing buffers
*/
public function close()
{
$this->ending = true;

$this->stream->close();

// reject all remaining requests in the queue
while($this->requests) {
$request = array_shift($this->requests);
/* @var $request Request */
$request->reject(new RuntimeException('Connection closing'));
}
}
}
6 changes: 2 additions & 4 deletions src/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ public function createClient($target = null)

if ($auth !== null) {
$promise = $promise->then(function (Client $client) use ($auth) {
$api = new RequestApi($client);
return $api->auth($auth)->then(
return $client->auth($auth)->then(
function () use ($client) {
return $client;
},
Expand All @@ -58,8 +57,7 @@ function ($error) use ($client) {

if ($db !== null) {
$promise = $promise->then(function (Client $client) use ($db) {
$api = new RequestApi($client);
return $api->select($db)->then(
return $client->select($db)->then(
function () use ($client) {
return $client;
},
Expand Down
98 changes: 0 additions & 98 deletions src/RequestApi.php

This file was deleted.

79 changes: 0 additions & 79 deletions tests/ClientTest.php

This file was deleted.

Loading