Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions rediscluster/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@
# 3rd party imports
from redis import StrictRedis
from redis.client import list_or_args, parse_info
from redis.connection import Token
from redis._compat import iteritems, basestring, b, izip, nativestr, long
from redis._compat import iteritems, basestring, izip, nativestr, long
from redis.exceptions import RedisError, ResponseError, TimeoutError, DataError, ConnectionError, BusyLoadingError


Expand Down Expand Up @@ -278,9 +277,9 @@ def pipeline(self, transaction=None, shard_hint=None):
"""
if shard_hint:
raise RedisClusterException("shard_hint is deprecated in cluster mode")

if transaction:
raise RedisClusterException("transaction is deprecated in cluster mode")
#Hendrik: Make sure that you use only pipelining and transactions on the same shard!
#if transaction:
# raise RedisClusterException("transaction is deprecated in cluster mode")

return StrictClusterPipeline(
connection_pool=self.connection_pool,
Expand Down Expand Up @@ -535,7 +534,7 @@ def cluster_failover(self, node_id, option):
Sends to specefied node
"""
assert option.upper() in ('FORCE', 'TAKEOVER') # TODO: change this option handling
return self.execute_command('CLUSTER FAILOVER', Token(option))
return self.execute_command('CLUSTER FAILOVER', option)

def cluster_info(self):
"""
Expand Down Expand Up @@ -586,7 +585,7 @@ def cluster_reset(self, node_id, soft=True):

Sends to specefied node
"""
return self.execute_command('CLUSTER RESET', Token('SOFT' if soft else 'HARD'), node_id=node_id)
return self.execute_command('CLUSTER RESET', 'SOFT' if soft else 'HARD', node_id=node_id)

def cluster_reset_all_nodes(self, soft=True):
"""
Expand All @@ -600,7 +599,7 @@ def cluster_reset_all_nodes(self, soft=True):
return [
self.execute_command(
'CLUSTER RESET',
Token('SOFT' if soft else 'HARD'),
'SOFT' if soft else 'HARD',
node_id=node['id'],
)
for node in self.cluster_nodes()
Expand Down Expand Up @@ -636,9 +635,9 @@ def cluster_setslot(self, node_id, slot_id, state, bind_to_node_id=None):
Sends to specefied node
"""
if state.upper() in ('IMPORTING', 'MIGRATING', 'NODE') and node_id is not None:
return self.execute_command('CLUSTER SETSLOT', slot_id, Token(state), node_id)
return self.execute_command('CLUSTER SETSLOT', slot_id, state, node_id)
elif state.upper() == 'STABLE':
return self.execute_command('CLUSTER SETSLOT', slot_id, Token('STABLE'))
return self.execute_command('CLUSTER SETSLOT', slot_id, 'STABLE')
else:
raise RedisError('Invalid slot state: {0}'.format(state))

Expand Down Expand Up @@ -694,9 +693,9 @@ def scan_iter(self, match=None, count=None):

pieces = ['SCAN', cursors[node]]
if match is not None:
pieces.extend([Token('MATCH'), match])
pieces.extend(['MATCH', match])
if count is not None:
pieces.extend([Token('COUNT'), count])
pieces.extend(['COUNT', count])

conn.send_command(*pieces)

Expand Down Expand Up @@ -1271,8 +1270,9 @@ def pipeline(self, transaction=True, shard_hint=None):
if shard_hint:
raise RedisClusterException("shard_hint is deprecated in cluster mode")

if transaction:
raise RedisClusterException("transaction is deprecated in cluster mode")
#Hendrik: Make sure that you use only pipelining and transactions on the same shard!
#if transaction:
# raise RedisClusterException("transaction is deprecated in cluster mode")

return StrictClusterPipeline(
connection_pool=self.connection_pool,
Expand Down
29 changes: 8 additions & 21 deletions rediscluster/nodemanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@

# 3rd party imports
from redis import StrictRedis
from redis._compat import b, unicode, bytes, long, basestring
from redis._compat import unicode, long, basestring
from redis.connection import Encoder
from redis import ConnectionError, TimeoutError, ResponseError


Expand Down Expand Up @@ -37,34 +38,20 @@ def __init__(self, startup_nodes=None, reinitialize_steps=None, skip_full_covera
self.reinitialize_steps = reinitialize_steps or 25
self._skip_full_coverage_check = skip_full_coverage_check
self.nodemanager_follow_cluster = nodemanager_follow_cluster

self.encoder = Encoder(
connection_kwargs.get('encoding', 'utf-8'),
connection_kwargs.get('encoding_errors', 'strict'),
connection_kwargs.get('decode_responses', False)
)
if not self.startup_nodes:
raise RedisClusterException("No startup nodes provided")

def encode(self, value):
"""
Return a bytestring representation of the value.
This method is copied from Redis' connection.py:Connection.encode
"""
if isinstance(value, bytes):
return value
elif isinstance(value, (int, long)):
value = b(str(value))
elif isinstance(value, float):
value = b(repr(value))
elif not isinstance(value, basestring):
value = unicode(value)
if isinstance(value, unicode):
# The encoding should be configurable as in connection.py:Connection.encode
value = value.encode('utf-8')
return value

def keyslot(self, key):
"""
Calculate keyslot for a given key.
Tuned for compatibility with python 2.7.x
"""
k = self.encode(key)
k = self.encoder.encode(key)

start = k.find(b"{")

Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
redis==2.10.6
redis==3.3.6
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
url='http://github.com/grokzen/redis-py-cluster',
license='MIT',
install_requires=[
'redis==2.10.6'
'redis==3.3.6'
],
keywords=[
'redis',
Expand Down