diff --git a/src/Client.php b/src/Client.php index 7c447cd..3ca5a13 100644 --- a/src/Client.php +++ b/src/Client.php @@ -12,6 +12,14 @@ * @event data(ModelInterface $messageModel, Client $thisClient) * @event close() * + * @event message($channel, $message) + * @event subscribe($channel, $numberOfChannels) + * @event unsubscribe($channel, $numberOfChannels) + * + * @event pmessage($pattern, $channel, $message) + * @event psubscribe($channel, $numberOfChannels) + * @event punsubscribe($channel, $numberOfChannels) + * * @event monitor(ModelInterface $statusModel) */ interface Client extends EventEmitterInterface diff --git a/src/StreamingClient.php b/src/StreamingClient.php index 229f6ec..399126e 100644 --- a/src/StreamingClient.php +++ b/src/StreamingClient.php @@ -11,9 +11,11 @@ use Clue\Redis\Protocol\Factory as ProtocolFactory; use UnderflowException; use RuntimeException; +use InvalidArgumentException; use React\Promise\Deferred; use Clue\Redis\Protocol\Model\ErrorReply; use Clue\Redis\Protocol\Model\ModelInterface; +use Clue\Redis\Protocol\Model\MultiBulkReply; use Clue\Redis\Protocol\Model\StatusReply; class StreamingClient extends EventEmitter implements Client @@ -25,6 +27,8 @@ class StreamingClient extends EventEmitter implements Client private $ending = false; private $closed = false; + private $subscribed = 0; + private $psubscribed = 0; private $monitoring = false; public function __construct(Stream $stream, ParserInterface $parser = null, SerializerInterface $serializer = null) @@ -74,18 +78,43 @@ public function __call($name, $args) { $request = new Deferred(); + $name = strtolower($name); + + // special (p)(un)subscribe commands only accept a single parameter and have custom response logic applied + static $pubsubs = array('subscribe', 'unsubscribe', 'psubscribe', 'punsubscribe'); + if ($this->ending) { $request->reject(new RuntimeException('Connection closed')); + } elseif (count($args) !== 1 && in_array($name, $pubsubs)) { + $request->reject(new InvalidArgumentException('PubSub commands limited to single argument')); } else { $this->stream->write($this->serializer->getRequestMessage($name, $args)); $this->requests []= $request; } - if (strtolower($name) === 'monitor') { + if ($name === 'monitor') { $monitoring =& $this->monitoring; $request->then(function () use (&$monitoring) { $monitoring = true; }); + } elseif (in_array($name, $pubsubs)) { + $that = $this; + $subscribed =& $this->subscribed; + $psubscribed =& $this->psubscribed; + + $request->then(function ($array) use ($that, &$subscribed, &$psubscribed) { + $first = array_shift($array); + + // (p)(un)subscribe messages are to be forwarded + $that->emit($first, $array); + + // remember number of (p)subscribe topics + if ($first === 'subscribe' || $first === 'unsubscribe') { + $subscribed = $array[1]; + } else { + $psubscribed = $array[1]; + } + }); } return $request->promise(); @@ -100,6 +129,17 @@ public function handleMessage(ModelInterface $message) return; } + if (($this->subscribed !== 0 || $this->psubscribed !== 0) && $message instanceof MultiBulkReply) { + $array = $message->getValueNative(); + $first = array_shift($array); + + // pub/sub messages are to be forwarded and should not be processed as request responses + if (in_array($first, array('message', 'pmessage'))) { + $this->emit($first, $array); + return; + } + } + if (!$this->requests) { throw new UnderflowException('Unexpected reply received, no matching request found'); } diff --git a/tests/FunctionalTest.php b/tests/FunctionalTest.php index c2d7bbf..7e9f50a 100644 --- a/tests/FunctionalTest.php +++ b/tests/FunctionalTest.php @@ -34,6 +34,18 @@ public function testPing() return $client; } + public function testMgetIsNotInterpretedAsSubMessage() + { + $client = $this->createClient(); + + $client->mset('message', 'message', 'channel', 'channel', 'payload', 'payload'); + + $client->mget('message', 'channel', 'payload')->then($this->expectCallableOnce()); + $client->on('message', $this->expectCallableNever()); + + $this->waitFor($client); + } + /** * * @param StreamingClient $client diff --git a/tests/StreamingClientTest.php b/tests/StreamingClientTest.php index 6aa57e1..5ae1240 100644 --- a/tests/StreamingClientTest.php +++ b/tests/StreamingClientTest.php @@ -5,6 +5,8 @@ use Clue\Redis\Protocol\Model\IntegerReply; use Clue\Redis\Protocol\Model\BulkReply; use Clue\Redis\Protocol\Model\ErrorReply; +use Clue\Redis\Protocol\Model\MultiBulkReply; +use Clue\React\Redis\Client; use Clue\Redis\Protocol\Model\StatusReply; class ClientTest extends TestCase @@ -210,4 +212,47 @@ public function testReceivingUnexpectedMessageThrowsException() $this->setExpectedException('UnderflowException'); $this->client->handleMessage(new BulkReply('PONG')); } + + public function testPubsubSubscribe() + { + $promise = $this->client->subscribe('test'); + $this->expectPromiseResolve($promise); + + $this->client->on('subscribe', $this->expectCallableOnce()); + $this->client->handleMessage(new MultiBulkReply(array(new BulkReply('subscribe'), new BulkReply('test'), new IntegerReply(1)))); + + return $this->client; + } + + /** + * @depends testPubsubSubscribe + * @param Client $client + */ + public function testPubsubPatternSubscribe(Client $client) + { + $promise = $client->psubscribe('demo_*'); + $this->expectPromiseResolve($promise); + + $client->on('psubscribe', $this->expectCallableOnce()); + $client->handleMessage(new MultiBulkReply(array(new BulkReply('psubscribe'), new BulkReply('demo_*'), new IntegerReply(1)))); + + return $client; + } + + /** + * @depends testPubsubPatternSubscribe + * @param Client $client + */ + public function testPubsubMessage(Client $client) + { + $client->on('message', $this->expectCallableOnce()); + $client->handleMessage(new MultiBulkReply(array(new BulkReply('message'), new BulkReply('test'), new BulkReply('payload')))); + } + + public function testPubsubSubscribeSingleOnly() + { + $this->expectPromiseReject($this->client->subscribe('a', 'b')); + $this->expectPromiseReject($this->client->unsubscribe('a', 'b')); + $this->expectPromiseReject($this->client->unsubscribe()); + } }