Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add type definitions for all APIs and tests #128

Merged
merged 1 commit into from
May 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ $redis = $factory->createLazyClient('localhost:6379');
$redis->set('greeting', 'Hello world');
$redis->append('greeting', '!');

$redis->get('greeting')->then(function ($greeting) {
$redis->get('greeting')->then(function (string $greeting) {
// Hello world!
echo $greeting . PHP_EOL;
});

$redis->incr('invocation')->then(function ($n) {
$redis->incr('invocation')->then(function (int $n) {
echo 'This is invocation #' . $n . PHP_EOL;
});

Expand Down Expand Up @@ -184,7 +184,7 @@ subscribe to a channel and then receive incoming PubSub `message` events:
$channel = 'user';
$redis->subscribe($channel);

$redis->on('message', function ($channel, $payload) {
$redis->on('message', function (string $channel, string $payload) {
// pubsub message received on given $channel
var_dump($channel, json_decode($payload));
});
Expand All @@ -208,7 +208,7 @@ all incoming PubSub messages with the `pmessage` event:
$pattern = 'user.*';
$redis->psubscribe($pattern);

$redis->on('pmessage', function ($pattern, $channel, $payload) {
$redis->on('pmessage', function (string $pattern, string $channel, string $payload) {
// pubsub message received matching given $pattern
var_dump($channel, json_decode($payload));
});
Expand Down Expand Up @@ -248,16 +248,16 @@ Additionally, can listen for the following PubSub events to get notifications
about subscribed/unsubscribed channels and patterns:

```php
$redis->on('subscribe', function ($channel, $total) {
$redis->on('subscribe', function (string $channel, int $total) {
// subscribed to given $channel
});
$redis->on('psubscribe', function ($pattern, $total) {
$redis->on('psubscribe', function (string $pattern, int $total) {
// subscribed to matching given $pattern
});
$redis->on('unsubscribe', function ($channel, $total) {
$redis->on('unsubscribe', function (string $channel, int $total) {
// unsubscribed from given $channel
});
$redis->on('punsubscribe', function ($pattern, $total) {
$redis->on('punsubscribe', function (string $pattern, int $total) {
// unsubscribed from matching given $pattern
});
```
Expand Down
2 changes: 1 addition & 1 deletion examples/incr.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

$redis->incr('test');

$redis->get('test')->then(function ($result) {
$redis->get('test')->then(function (string $result) {
var_dump($result);
}, function (Exception $e) {
echo 'Error: ' . $e->getMessage() . PHP_EOL;
Expand Down
2 changes: 1 addition & 1 deletion examples/publish.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
$channel = $argv[1] ?? 'channel';
$message = $argv[2] ?? 'message';

$redis->publish($channel, $message)->then(function ($received) {
$redis->publish($channel, $message)->then(function (int $received) {
echo 'Successfully published. Received by ' . $received . PHP_EOL;
}, function (Exception $e) {
echo 'Unable to publish: ' . $e->getMessage() . PHP_EOL;
Expand Down
6 changes: 3 additions & 3 deletions examples/subscribe.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@
echo 'Unable to subscribe: ' . $e->getMessage() . PHP_EOL;
});

$redis->on('message', function ($channel, $message) {
$redis->on('message', function (string $channel, string $message) {
echo 'Message on ' . $channel . ': ' . $message . PHP_EOL;
});

// automatically re-subscribe to channel on connection issues
$redis->on('unsubscribe', function ($channel) use ($redis) {
$redis->on('unsubscribe', function (string $channel) use ($redis) {
echo 'Unsubscribed from ' . $channel . PHP_EOL;

Loop::addPeriodicTimer(2.0, function ($timer) use ($redis, $channel){
Loop::addPeriodicTimer(2.0, function (React\EventLoop\TimerInterface $timer) use ($redis, $channel){
$redis->subscribe($channel)->then(function () use ($timer) {
echo 'Now subscribed again' . PHP_EOL;
Loop::cancelTimer($timer);
Expand Down
6 changes: 3 additions & 3 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ interface Client extends EventEmitterInterface
* @param string[] $args
* @return PromiseInterface Promise<mixed,Exception>
*/
public function __call($name, $args);
public function __call(string $name, array $args): PromiseInterface;

/**
* end connection once all pending requests have been replied to
Expand All @@ -40,7 +40,7 @@ public function __call($name, $args);
* @uses self::close() once all replies have been received
* @see self::close() for closing the connection immediately
*/
public function end();
public function end(): void;

/**
* close connection immediately
Expand All @@ -50,5 +50,5 @@ public function end();
* @return void
* @see self::end() for closing the connection once the client is idle
*/
public function close();
public function close(): void;
}
7 changes: 4 additions & 3 deletions src/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use React\EventLoop\Loop;
use React\EventLoop\LoopInterface;
use React\Promise\Deferred;
use React\Promise\PromiseInterface;
use React\Promise\Timer\TimeoutException;
use React\Socket\ConnectionInterface;
use React\Socket\Connector;
Expand Down Expand Up @@ -40,10 +41,10 @@ public function __construct(LoopInterface $loop = null, ConnectorInterface $conn
* Create Redis client connected to address of given redis instance
*
* @param string $uri Redis server URI to connect to
* @return \React\Promise\PromiseInterface<Client,\Exception> Promise that will
* @return PromiseInterface<Client,\Exception> Promise that will
* be fulfilled with `Client` on success or rejects with `\Exception` on error.
*/
public function createClient($uri)
public function createClient(string $uri): PromiseInterface
{
// support `redis+unix://` scheme for Unix domain socket (UDS) paths
if (preg_match('/^(redis\+unix:\/\/(?:[^:]*:[^@]*@)?)(.+?)?$/', $uri, $match)) {
Expand Down Expand Up @@ -184,7 +185,7 @@ function (\Exception $e) use ($redis, $uri) {
* @param string $target
* @return Client
*/
public function createLazyClient($target)
public function createLazyClient($target): Client
{
return new LazyClient($target, $this, $this->loop);
}
Expand Down
59 changes: 34 additions & 25 deletions src/LazyClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,48 @@
namespace Clue\React\Redis;

use Evenement\EventEmitter;
use React\Stream\Util;
use React\EventLoop\LoopInterface;
use React\EventLoop\TimerInterface;
use React\Promise\PromiseInterface;
use React\Stream\Util;
use function React\Promise\reject;

/**
* @internal
*/
class LazyClient extends EventEmitter implements Client
{
/** @var string */
private $target;

/** @var Factory */
private $factory;

/** @var bool */
private $closed = false;
private $promise;

/** @var ?PromiseInterface */
private $promise = null;

/** @var LoopInterface */
private $loop;

/** @var float */
private $idlePeriod = 60.0;
private $idleTimer;

/** @var ?TimerInterface */
private $idleTimer = null;

/** @var int */
private $pending = 0;

/** @var array<string,bool> */
private $subscribed = [];

/** @var array<string,bool> */
private $psubscribed = [];

/**
* @param $target
*/
public function __construct($target, Factory $factory, LoopInterface $loop)
public function __construct(string $target, Factory $factory, LoopInterface $loop)
{
$args = [];
\parse_str((string) \parse_url($target, \PHP_URL_QUERY), $args);
Expand All @@ -42,7 +57,7 @@ public function __construct($target, Factory $factory, LoopInterface $loop)
$this->loop = $loop;
}

private function client()
private function client(): PromiseInterface
{
if ($this->promise !== null) {
return $this->promise;
Expand Down Expand Up @@ -71,16 +86,16 @@ private function client()
});

// keep track of all channels and patterns this connection is subscribed to
$redis->on('subscribe', function ($channel) {
$redis->on('subscribe', function (string $channel) {
$this->subscribed[$channel] = true;
});
$redis->on('psubscribe', function ($pattern) {
$redis->on('psubscribe', function (string $pattern) {
$this->psubscribed[$pattern] = true;
});
$redis->on('unsubscribe', function ($channel) {
$redis->on('unsubscribe', function (string $channel) {
unset($this->subscribed[$channel]);
});
$redis->on('punsubscribe', function ($pattern) {
$redis->on('punsubscribe', function (string $pattern) {
unset($this->psubscribed[$pattern]);
});

Expand All @@ -106,7 +121,7 @@ private function client()
});
}

public function __call($name, $args)
public function __call(string $name, array $args): PromiseInterface
{
if ($this->closed) {
return reject(new \RuntimeException(
Expand All @@ -122,15 +137,15 @@ function ($result) {
$this->idle();
return $result;
},
function ($error) {
function (\Exception $error) {
$this->idle();
throw $error;
}
);
});
}

public function end()
public function end(): void
{
if ($this->promise === null) {
$this->close();
Expand All @@ -140,15 +155,15 @@ public function end()
return;
}

return $this->client()->then(function (Client $redis) {
$this->client()->then(function (Client $redis) {
$redis->on('close', function () {
$this->close();
});
$redis->end();
});
}

public function close()
public function close(): void
{
if ($this->closed) {
return;
Expand Down Expand Up @@ -176,10 +191,7 @@ public function close()
$this->removeAllListeners();
}

/**
* @internal
*/
public function awake()
private function awake(): void
{
++$this->pending;

Expand All @@ -189,10 +201,7 @@ public function awake()
}
}

/**
* @internal
*/
public function idle()
private function idle(): void
{
--$this->pending;

Expand Down
27 changes: 21 additions & 6 deletions src/StreamingClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,36 @@
use Clue\Redis\Protocol\Serializer\SerializerInterface;
use Evenement\EventEmitter;
use React\Promise\Deferred;
use React\Promise\PromiseInterface;
use React\Stream\DuplexStreamInterface;

/**
* @internal
*/
class StreamingClient extends EventEmitter implements Client
{
/** @var DuplexStreamInterface */
private $stream;

/** @var ParserInterface */
private $parser;

/** @var SerializerInterface */
private $serializer;

/** @var Deferred[] */
private $requests = [];

/** @var bool */
private $ending = false;

/** @var bool */
private $closed = false;

/** @var int */
private $subscribed = 0;

/** @var int */
private $psubscribed = 0;

public function __construct(DuplexStreamInterface $stream, ParserInterface $parser = null, SerializerInterface $serializer = null)
Expand All @@ -40,7 +55,7 @@ public function __construct(DuplexStreamInterface $stream, ParserInterface $pars
}
}

$stream->on('data', function($chunk) use ($parser) {
$stream->on('data', function (string $chunk) use ($parser) {
try {
$models = $parser->pushIncoming($chunk);
} catch (ParserException $error) {
Expand Down Expand Up @@ -71,7 +86,7 @@ public function __construct(DuplexStreamInterface $stream, ParserInterface $pars
$this->serializer = $serializer;
}

public function __call($name, $args)
public function __call(string $name, array $args): PromiseInterface
{
$request = new Deferred();
$promise = $request->promise();
Expand Down Expand Up @@ -102,7 +117,7 @@ public function __call($name, $args)
}

if (in_array($name, $pubsubs)) {
$promise->then(function ($array) {
$promise->then(function (array $array) {
$first = array_shift($array);

// (p)(un)subscribe messages are to be forwarded
Expand All @@ -120,7 +135,7 @@ public function __call($name, $args)
return $promise;
}

public function handleMessage(ModelInterface $message)
public function handleMessage(ModelInterface $message): void
{
if (($this->subscribed !== 0 || $this->psubscribed !== 0) && $message instanceof MultiBulkReply) {
$array = $message->getValueNative();
Expand Down Expand Up @@ -154,7 +169,7 @@ public function handleMessage(ModelInterface $message)
}
}

public function end()
public function end(): void
{
$this->ending = true;

Expand All @@ -163,7 +178,7 @@ public function end()
}
}

public function close()
public function close(): void
{
if ($this->closed) {
return;
Expand Down
Loading