Skip to content

Commit 01d549d

Browse files
committed
Adding new feature to pipeline.execute() so it can optionally add
multi-exec around consecutive redis commands that are for the same slot. Redis cluster does not support multi-exec for different slots, even if the slots are on the same shard. This feature provides limited support (*1) of atomic transaction using multi-exec on redis cluster. Step1: add optional arg use_multi=False to pipeline.execute(), so the default behavior is what it used to be. ie. this feature is off by default. *1: limited support: only for consecutive commands in pipeline for the same slot. eg. for commands incr a{1} #1 decr b{1} #2 incr a{2} #3 decr b{2} #4 incr a{3} Grokzen#5 Since #1,#2 are for the same slot, we will add multi-exec around them (multi before #1, and exec after #2). #3,#4 are in their own one slot, but different from that of #1,#2, so another set of multi-exec. Grokzen#5 is in its own slot, no multi-exec added. Thus the commands sent to the cluster are: multi incr a{1} #1 decr b{1} #2 exec multi incr a{2} #3 decr b{2} #4 exec incr a{3} Grokzen#5 The result of the added multi-exec are stripped from the response, so the client will see no change in response format.
1 parent 38807c5 commit 01d549d

File tree

1 file changed

+31
-7
lines changed

1 file changed

+31
-7
lines changed

rediscluster/pipeline.py

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ def annotate_exception(self, exception, number, command):
9090
number, cmd, unicode(exception.args[0]))
9191
exception.args = (msg,) + exception.args[1:]
9292

93-
def execute(self, raise_on_error=True):
93+
def execute(self, raise_on_error=True, use_multi = False):
9494
"""
9595
"""
9696
stack = self.command_stack
@@ -99,7 +99,7 @@ def execute(self, raise_on_error=True):
9999
return []
100100

101101
try:
102-
return self.send_cluster_commands(stack, raise_on_error)
102+
return self.send_cluster_commands(stack, raise_on_error, use_multi = use_multi )
103103
finally:
104104
self.reset()
105105

@@ -136,7 +136,7 @@ def reset(self):
136136
# self.connection = None
137137

138138
@clusterdown_wrapper
139-
def send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=True):
139+
def send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=True, use_multi = False ):
140140
"""
141141
Send a bunch of cluster commands to the redis cluster.
142142
@@ -165,7 +165,10 @@ def send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=T
165165
# we can build a list of commands for each node.
166166
node_name = node['name']
167167
if node_name not in nodes:
168-
nodes[node_name] = NodeCommands(self.parse_response, self.connection_pool.get_connection_by_node(node))
168+
nodes[node_name] = NodeCommands(
169+
self.parse_response,
170+
self.connection_pool.get_connection_by_node(node),
171+
use_multi = use_multi )
169172

170173
nodes[node_name].append(c)
171174

@@ -377,12 +380,14 @@ class NodeCommands(object):
377380
"""
378381
"""
379382

380-
def __init__(self, parse_response, connection):
383+
def __init__(self, parse_response, connection,
384+
use_multi = False ):
381385
"""
382386
"""
383387
self.parse_response = parse_response
384388
self.connection = connection
385-
self.commands = []
389+
self.commands = []
390+
self.useMulti = use_multi # if true, use multi-exec where possible.
386391

387392
def append(self, c):
388393
"""
@@ -404,12 +409,26 @@ def write(self):
404409
# build up all commands into a single request to increase network perf
405410
# send all the commands and catch connection and timeout errors.
406411
try:
407-
connection.send_packed_command(connection.pack_commands([c.args for c in commands]))
412+
if self.useMulti :
413+
cmds = self.addTransaction( commands )
414+
else :
415+
cmds = [ c.args for c in commands ]
416+
connection.send_packed_command( connection.pack_commands( cmds ) )
408417
except (ConnectionError, TimeoutError) as e:
409418
for c in commands:
410419
c.result = e
411420

421+
def addTransaction( self, commands ):
422+
cmds = []
423+
return cmds
424+
412425
def read(self):
426+
if self.useMulti :
427+
self.readMultiExec()
428+
else :
429+
self.readPlain()
430+
431+
def readPlain(self):
413432
"""
414433
"""
415434
connection = self.connection
@@ -434,3 +453,8 @@ def read(self):
434453
return
435454
except RedisError:
436455
c.result = sys.exc_info()[1]
456+
457+
def readMultiExec(self):
458+
"""
459+
"""
460+
pass

0 commit comments

Comments
 (0)