Skip to content

Commit

Permalink
redis cluster commands
Browse files Browse the repository at this point in the history
  • Loading branch information
iandyh authored and iandyh committed Dec 1, 2015
1 parent da13784 commit f5fbb74
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 1 deletion.
50 changes: 49 additions & 1 deletion redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,35 @@ def parse_slowlog_get(response, **options):
} for item in response]


def parse_cluster_info(response, **options):
return dict([line.split(':') for line in response.splitlines() if line])


def _parse_node_line(line):
line_items = line.split(' ')
node_id, addr, flags, master_id, ping, pong, epoch, \
connected = line.split(' ')[:8]
slots = [sl.split('-') for sl in line_items[8:]]
node_dict = {
'node_id': node_id,
'flags': flags,
'master_id': master_id,
'last_ping_sent': ping,
'last_pong_rcvd': pong,
'epoch': epoch,
'slots': slots,
'connected': True if connected == 'connected' else False
}
return addr, node_dict


def parse_cluster_nodes(response, **options):
raw_lines = response
if isinstance(response, basestring):
raw_lines = response.splitlines()
return dict([_parse_node_line(line) for line in raw_lines])


class StrictRedis(object):
"""
Implementation of the Redis protocol.
Expand Down Expand Up @@ -361,7 +390,23 @@ class StrictRedis(object):
'SLOWLOG RESET': bool_ok,
'SSCAN': parse_scan,
'TIME': lambda x: (int(x[0]), int(x[1])),
'ZSCAN': parse_zscan
'ZSCAN': parse_zscan,
'CLUSTER ADDSLOTS': bool_ok,
'CLUSTER COUNT-FAILURE-REPORTS': lambda x: int(x),
'CLUSTER COUNTKEYSINSLOT': lambda x: int(x),
'CLUSTER DELSLOTS': bool_ok,
'CLUSTER FAILOVER': bool_ok,
'CLUSTER FORGET': bool_ok,
'CLUSTER INFO': parse_cluster_info,
'CLUSTER KEYSLOT': lambda x: int(x),
'CLUSTER MEET': bool_ok,
'CLUSTER NODES': parse_cluster_nodes,
'CLUSTER REPLICATE': bool_ok,
'CLUSTER RESET': bool_ok,
'CLUSTER SAVECONFIG': bool_ok,
'CLUSTER SET-CONFIG-EPOCH': bool_ok,
'CLUSTER SETSLOT': bool_ok,
'CLUSTER SLAVES': parse_cluster_nodes
}
)

Expand Down Expand Up @@ -1920,6 +1965,9 @@ def publish(self, channel, message):
"""
return self.execute_command('PUBLISH', channel, message)

def cluster(self, cluster_arg, *args):
return self.execute_command('CLUSTER %s' % cluster_arg.upper(), *args)

def eval(self, script, numkeys, *keys_and_args):
"""
Execute the Lua ``script``, specifying the ``numkeys`` the script
Expand Down
44 changes: 44 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import pytest
import redis
from mock import Mock

from distutils.version import StrictVersion

Expand Down Expand Up @@ -44,3 +45,46 @@ def r(request, **kwargs):
@pytest.fixture()
def sr(request, **kwargs):
return _get_client(redis.StrictRedis, request, **kwargs)


def _gen_cluster_mock_resp(r, response):
mock_connection_pool = Mock()
connection = Mock()
response = response
connection.read_response.return_value = response
mock_connection_pool.get_connection.return_value = connection
r.connection_pool = mock_connection_pool
return r


@pytest.fixture()
def mock_cluster_resp_ok(request, **kwargs):
r = _get_client(redis.Redis, request, **kwargs)
return _gen_cluster_mock_resp(r, 'OK')


@pytest.fixture()
def mock_cluster_resp_int(request, **kwargs):
r = _get_client(redis.Redis, request, **kwargs)
return _gen_cluster_mock_resp(r, '2')


@pytest.fixture()
def mock_cluster_resp_info(request, **kwargs):
r = _get_client(redis.Redis, request, **kwargs)
response = 'cluster_state:ok\r\ncluster_slots_assigned:16384\r\ncluster_slots_ok:16384\r\ncluster_slots_pfail:0\r\ncluster_slots_fail:0\r\ncluster_known_nodes:7\r\ncluster_size:3\r\ncluster_current_epoch:7\r\ncluster_my_epoch:2\r\ncluster_stats_messages_sent:170262\r\ncluster_stats_messages_received:105653\r\n'
return _gen_cluster_mock_resp(r, response)


@pytest.fixture()
def mock_cluster_resp_nodes(request, **kwargs):
r = _get_client(redis.Redis, request, **kwargs)
response = 'c8253bae761cb1ecb2b61857d85dfe455a0fec8b 172.17.0.7:7006 slave aa90da731f673a99617dfe930306549a09f83a6b 0 1447836263059 5 connected\n9bd595fe4821a0e8d6b99d70faa660638a7612b3 172.17.0.7:7008 master - 0 1447836264065 0 connected\naa90da731f673a99617dfe930306549a09f83a6b 172.17.0.7:7003 myself,master - 0 0 2 connected 5461-10922\n1df047e5a594f945d82fc140be97a1452bcbf93e 172.17.0.7:7007 slave 19efe5a631f3296fdf21a5441680f893e8cc96ec 0 1447836262556 3 connected\n4ad9a12e63e8f0207025eeba2354bcf4c85e5b22 172.17.0.7:7005 master - 0 1447836262555 7 connected 0-5460\n19efe5a631f3296fdf21a5441680f893e8cc96ec 172.17.0.7:7004 master - 0 1447836263562 3 connected 10923-16383\nfbb23ed8cfa23f17eaf27ff7d0c410492a1093d6 172.17.0.7:7002 master,fail - 1447829446956 1447829444948 1 disconnected\n'
return _gen_cluster_mock_resp(r, response)


@pytest.fixture()
def mock_cluster_resp_slaves(request, **kwargs):
r = _get_client(redis.Redis, request, **kwargs)
response = "['1df047e5a594f945d82fc140be97a1452bcbf93e 172.17.0.7:7007 slave 19efe5a631f3296fdf21a5441680f893e8cc96ec 0 1447836789290 3 connected']"
return _gen_cluster_mock_resp(r, response)
50 changes: 50 additions & 0 deletions tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -1338,6 +1338,56 @@ def test_sort_all_options(self, r):
assert r.lrange('sorted', 0, 10) == \
[b('vodka'), b('milk'), b('gin'), b('apple juice')]

def test_cluster_addslots(self, mock_cluster_resp_ok):
assert mock_cluster_resp_ok.cluster('ADDSLOTS', 1) == True

def test_cluster_count_failure_reports(self, mock_cluster_resp_int):
assert isinstance(mock_cluster_resp_int.cluster(
'COUNT-FAILURE-REPORTS', 'node'), int)

def test_cluster_countkeysinslot(self, mock_cluster_resp_int):
assert isinstance(mock_cluster_resp_int.cluster(
'COUNTKEYSINSLOT', 2), int)

def test_cluster_delslots(self, mock_cluster_resp_ok):
assert mock_cluster_resp_ok.cluster('DELSLOTS', 1) == True

def test_cluster_failover(self, mock_cluster_resp_ok):
assert mock_cluster_resp_ok.cluster('FAILOVER', 1) == True

def test_cluster_forget(self, mock_cluster_resp_ok):
assert mock_cluster_resp_ok.cluster('FORGET', 1) == True

def test_cluster_info(self, mock_cluster_resp_info):
assert isinstance(mock_cluster_resp_info.cluster('info'), dict)

def test_cluster_keyslot(self, mock_cluster_resp_int):
assert isinstance(mock_cluster_resp_int.cluster(
'keyslot', 'asdf'), int)

def test_cluster_meet(self, mock_cluster_resp_ok):
assert mock_cluster_resp_ok.cluster('meet', 'ip', 'port', 1) == True

def test_cluster_nodes(self, mock_cluster_resp_nodes):
assert isinstance(mock_cluster_resp_nodes.cluster('nodes'), dict)

def test_cluster_replicate(self, mock_cluster_resp_ok):
assert mock_cluster_resp_ok.cluster('replicate', 'nodeid') == True

def test_cluster_reset(self, mock_cluster_resp_ok):
assert mock_cluster_resp_ok.cluster('reset', 'hard') == True

def test_cluster_saveconfig(self, mock_cluster_resp_ok):
assert mock_cluster_resp_ok.cluster('saveconfig') == True

def test_cluster_setslot(self, mock_cluster_resp_ok):
assert mock_cluster_resp_ok.cluster('setslot', 1,
'IMPORTING', 'nodeid') == True

def test_cluster_slaves(self, mock_cluster_resp_slaves):
assert isinstance(mock_cluster_resp_slaves.cluster(
'slaves', 'nodeid'), dict)


class TestStrictCommands(object):

Expand Down

0 comments on commit f5fbb74

Please sign in to comment.