From 8771a4c86d109766d186e50b7c9b65779c5ceeae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Sat, 26 Jul 2014 01:50:51 +0200 Subject: [PATCH 1/3] Skeleton for subscription API --- src/SubscriptionApi.php | 50 +++++++++++++++++++++++++++++++++++ tests/SubscriptionApiTest.php | 23 ++++++++++++++++ 2 files changed, 73 insertions(+) create mode 100644 src/SubscriptionApi.php create mode 100644 tests/SubscriptionApiTest.php diff --git a/src/SubscriptionApi.php b/src/SubscriptionApi.php new file mode 100644 index 0000000..a4ebd80 --- /dev/null +++ b/src/SubscriptionApi.php @@ -0,0 +1,50 @@ +client = $client; + $this->responseApi = $responseApi; + } + + public function subscribe($channel) + { + return $this->respond('subscribe', func_get_args()); + } + + public function psubscribe($pattern) + { + return $this->respond('psubscribe', func_get_args()); + } + + public function unsubscribe($channel = null) + { + + } + + public function publish($channel, $message) + { + return $this->responseApi->publish($channel, $message); + } + + private function respond($name, $args) + { + return call_user_func_array(array($this->responseApi, $name), $args); + } +} diff --git a/tests/SubscriptionApiTest.php b/tests/SubscriptionApiTest.php new file mode 100644 index 0000000..23955e5 --- /dev/null +++ b/tests/SubscriptionApiTest.php @@ -0,0 +1,23 @@ +client = $this->getMock('Clue\React\Redis\Client'); + $this->subscriptionApi = new SubscriptionApi($this->client); + } + + public function testSubscribe() + { + $promise = $this->subscriptionApi->subscribe('a'); + $this->expectPromiseReject($promise); + + $this->subscriptionApi->on('message', $this->expectCallableOnce()); + } +} From 92bc9805aaecf980d2c6525a21b129bc507801b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Tue, 29 Jul 2014 01:29:18 +0200 Subject: [PATCH 2/3] Issue SUBSCRIBE commands against RequestApi --- src/SubscriptionApi.php | 16 +++++++++------- tests/SubscriptionApiTest.php | 18 +++++++++++++++--- 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/src/SubscriptionApi.php b/src/SubscriptionApi.php index a4ebd80..665c78d 100644 --- a/src/SubscriptionApi.php +++ b/src/SubscriptionApi.php @@ -3,6 +3,8 @@ namespace Clue\React\Redis; use Evenement\EventEmitter; +use Clue\Redis\Protocol\Model\ModelInterface; +use Clue\Redis\Protocol\Model\MultiBulkReply; /** * http://redis.io/topics/pubsub @@ -11,16 +13,16 @@ class SubscriptionApi extends EventEmitter { private $client; - private $responseApi; + private $requestApi; - public function __construct(Client $client, ResponseApi $responseApi = null) + public function __construct(Client $client, RequestApi $requestApi = null) { - if ($responseApi === null) { - $responseApi = new ResponseApi($client); + if ($requestApi === null) { + $requestApi = new RequestApi($client); } $this->client = $client; - $this->responseApi = $responseApi; + $this->requestApi = $requestApi; } public function subscribe($channel) @@ -40,11 +42,11 @@ public function unsubscribe($channel = null) public function publish($channel, $message) { - return $this->responseApi->publish($channel, $message); + return $this->requestApi->publish($channel, $message); } private function respond($name, $args) { - return call_user_func_array(array($this->responseApi, $name), $args); + return call_user_func_array(array($this->requestApi, $name), $args); } } diff --git a/tests/SubscriptionApiTest.php b/tests/SubscriptionApiTest.php index 23955e5..187cc6c 100644 --- a/tests/SubscriptionApiTest.php +++ b/tests/SubscriptionApiTest.php @@ -1,6 +1,10 @@ client = $this->getMock('Clue\React\Redis\Client'); + $this->client = $this->getMockBuilder('Clue\React\Redis\Client')->disableOriginalConstructor()->setMethods(array('sendRequest', 'close'))->getMock(); $this->subscriptionApi = new SubscriptionApi($this->client); } public function testSubscribe() { $promise = $this->subscriptionApi->subscribe('a'); - $this->expectPromiseReject($promise); - $this->subscriptionApi->on('message', $this->expectCallableOnce()); + $this->expectPromiseResolve($promise); + $this->pretendMessage(new MultiBulkReply(array(new BulkReply('subscribe'), new BulkReply('a'), new IntegerReply(1)))); + + //$this->subscriptionApi->on('message', $this->expectCallableOnce()); + //$this->pretendMessage(new MultiBulkReply(array(new BulkReply('message'), new BulkReply('a'), new BulkReply('data')))); + } + + private function pretendMessage(ModelInterface $model) + { + $this->client->emit('message', array($model, $this->client)); } } From 92a3441dc5e2cfe92e7473a7e76a5f52eece7d74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Tue, 29 Jul 2014 01:32:05 +0200 Subject: [PATCH 3/3] Forward message/subscribe/unsubscribe events --- src/SubscriptionApi.php | 17 +++++++++++++++++ tests/SubscriptionApiTest.php | 4 ++-- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/src/SubscriptionApi.php b/src/SubscriptionApi.php index 665c78d..d51f431 100644 --- a/src/SubscriptionApi.php +++ b/src/SubscriptionApi.php @@ -23,6 +23,8 @@ public function __construct(Client $client, RequestApi $requestApi = null) $this->client = $client; $this->requestApi = $requestApi; + + $this->client->on('message', array($this, 'handleMessage')); } public function subscribe($channel) @@ -49,4 +51,19 @@ private function respond($name, $args) { return call_user_func_array(array($this->requestApi, $name), $args); } + + public function handleMessage(ModelInterface $message) + { + if (!($message instanceof MultiBulkReply)) { + return; + } + + $parts = $message->getValueNative(); + if (count($parts) !== 3) { + return; + } + + $name = array_shift($parts); + $this->emit($name, $parts); + } } diff --git a/tests/SubscriptionApiTest.php b/tests/SubscriptionApiTest.php index 187cc6c..fdd0579 100644 --- a/tests/SubscriptionApiTest.php +++ b/tests/SubscriptionApiTest.php @@ -24,8 +24,8 @@ public function testSubscribe() $this->expectPromiseResolve($promise); $this->pretendMessage(new MultiBulkReply(array(new BulkReply('subscribe'), new BulkReply('a'), new IntegerReply(1)))); - //$this->subscriptionApi->on('message', $this->expectCallableOnce()); - //$this->pretendMessage(new MultiBulkReply(array(new BulkReply('message'), new BulkReply('a'), new BulkReply('data')))); + $this->subscriptionApi->on('message', $this->expectCallableOnce()); + $this->pretendMessage(new MultiBulkReply(array(new BulkReply('message'), new BulkReply('a'), new BulkReply('data')))); } private function pretendMessage(ModelInterface $model)