Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions src/SubscriptionApi.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
<?php

namespace Clue\React\Redis;

use Evenement\EventEmitter;
use Clue\Redis\Protocol\Model\ModelInterface;
use Clue\Redis\Protocol\Model\MultiBulkReply;

/**
* http://redis.io/topics/pubsub
* http://redis.io/commands#pubsub
*/
class SubscriptionApi extends EventEmitter
{
private $client;
private $requestApi;

public function __construct(Client $client, RequestApi $requestApi = null)
{
if ($requestApi === null) {
$requestApi = new RequestApi($client);
}

$this->client = $client;
$this->requestApi = $requestApi;

$this->client->on('message', array($this, 'handleMessage'));
}

public function subscribe($channel)
{
return $this->respond('subscribe', func_get_args());
}

public function psubscribe($pattern)
{
return $this->respond('psubscribe', func_get_args());
}

public function unsubscribe($channel = null)
{

}

public function publish($channel, $message)
{
return $this->requestApi->publish($channel, $message);
}

private function respond($name, $args)
{
return call_user_func_array(array($this->requestApi, $name), $args);
}

public function handleMessage(ModelInterface $message)
{
if (!($message instanceof MultiBulkReply)) {
return;
}

$parts = $message->getValueNative();
if (count($parts) !== 3) {
return;
}

$name = array_shift($parts);
$this->emit($name, $parts);
}
}
35 changes: 35 additions & 0 deletions tests/SubscriptionApiTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php

use Clue\React\Redis\SubscriptionApi;
use Clue\Redis\Protocol\Model\MultiBulkReply;
use Clue\Redis\Protocol\Model\ModelInterface;
use Clue\Redis\Protocol\Model\BulkReply;
use Clue\Redis\Protocol\Model\IntegerReply;

class SubscriptionApiTest extends TestCase
{
private $client;
private $subscriptionApi;

public function setUp()
{
$this->client = $this->getMockBuilder('Clue\React\Redis\Client')->disableOriginalConstructor()->setMethods(array('sendRequest', 'close'))->getMock();
$this->subscriptionApi = new SubscriptionApi($this->client);
}

public function testSubscribe()
{
$promise = $this->subscriptionApi->subscribe('a');

$this->expectPromiseResolve($promise);
$this->pretendMessage(new MultiBulkReply(array(new BulkReply('subscribe'), new BulkReply('a'), new IntegerReply(1))));

$this->subscriptionApi->on('message', $this->expectCallableOnce());
$this->pretendMessage(new MultiBulkReply(array(new BulkReply('message'), new BulkReply('a'), new BulkReply('data'))));
}

private function pretendMessage(ModelInterface $model)
{
$this->client->emit('message', array($model, $this->client));
}
}