rediscluster/pipeline.py: 171 changes (161 additions, 10 deletions)
@@ -6,13 +6,13 @@
# rediscluster imports
from .client import RedisCluster
from .exceptions import (
RedisClusterException, AskError, MovedError, TryAgainError,
RedisClusterException, AskError, MovedError, TryAgainError, ResponseError,
)
from .utils import clusterdown_wrapper, dict_merge

# 3rd party imports
from redis import Redis
from redis.exceptions import ConnectionError, RedisError, TimeoutError
from redis.exceptions import ConnectionError, RedisError, TimeoutError, ExecAbortError
from redis._compat import imap, unicode


@@ -90,7 +90,7 @@ def annotate_exception(self, exception, number, command):
number, cmd, unicode(exception.args[0]))
exception.args = (msg,) + exception.args[1:]

def execute(self, raise_on_error=True):
def execute(self, raise_on_error=True, use_multi=False):
"""
"""
stack = self.command_stack
@@ -99,7 +99,7 @@ def execute(self, raise_on_error=True):
return []

try:
return self.send_cluster_commands(stack, raise_on_error)
return self.send_cluster_commands(stack, raise_on_error, use_multi=use_multi)
finally:
self.reset()
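For context, a minimal usage sketch of the new flag (the node address and key names below are assumptions for illustration, not part of this change):

from rediscluster import RedisCluster

rc = RedisCluster(startup_nodes=[{"host": "127.0.0.1", "port": 7000}])  # hypothetical endpoint

pipe = rc.pipeline()
pipe.set("{user:1}:name", "alice")    # the {user:1} hash tag keeps both keys in one slot
pipe.set("{user:1}:visits", 1)
results = pipe.execute(use_multi=True)  # the same-slot run is sent as MULTI ... EXEC

Because both keys share a hash tag they map to the same slot, so the two SETs are wrapped in a single MULTI/EXEC on that node; commands for other slots are still pipelined normally.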

@@ -136,7 +136,7 @@ def reset(self):
# self.connection = None

@clusterdown_wrapper
def send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=True):
def send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=True, use_multi=False):
"""
Send a bunch of cluster commands to the redis cluster.

@@ -157,6 +157,7 @@ def send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=T
# command should route to.
slot = self._determine_slot(*c.args)
node = self.connection_pool.get_node_by_slot(slot)
c.slot = slot
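# remember the slot so NodeCommands can later group consecutive same-slot commands into a MULTI/EXEC run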

# little hack to make sure the node name is populated. probably could clean this up.
self.connection_pool.nodes.set_node_name(node)
@@ -165,7 +166,11 @@ def send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=T
# we can build a list of commands for each node.
node_name = node['name']
if node_name not in nodes:
nodes[node_name] = NodeCommands(self.parse_response, self.connection_pool.get_connection_by_node(node))
nodes[node_name] = NodeCommands(
self.parse_response,
self.connection_pool.get_connection_by_node(node),
response_callbacks=self.response_callbacks,
use_multi=use_multi)

nodes[node_name].append(c)

@@ -201,7 +206,13 @@ def send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=T
# collect all the commands we are allowed to retry.
# (MOVED, ASK, or connection errors or timeout errors)
attempt = sorted([c for c in attempt if isinstance(c.result, ERRORS_ALLOW_RETRY)], key=lambda x: x.position)
if attempt and allow_redirections:
if attempt and allow_redirections and use_multi:
# some slots moved, refresh the node-slot table asap.
self.refresh_table_asap = True
super(ClusterPipeline, self).execute_command('cluster', 'keyslot', 'a')
# ^^ run a harmless command to force a fetch of a new node-slot table.

if attempt and allow_redirections and not use_multi:
# RETRY MAGIC HAPPENS HERE!
# send these remaining commands one at a time using `execute_command`
# in the main client. This keeps our retry logic in one place mostly,
@@ -217,6 +228,11 @@ def send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=T
# If a lot of commands have failed, we'll be setting the
# flag to rebuild the slots table from scratch. So MOVED errors should
# correct themselves fairly quickly.
#
# not use_multi: if use_multi == True, the caller wants the commands for
# the same slot to run atomically. An automatic retry here would break that
# atomicity, so we skip it and let the caller handle retries instead.
self.connection_pool.nodes.increment_reinitialize_counter(len(attempt))
for c in attempt:
try:
@@ -371,18 +387,23 @@ def __init__(self, args, options=None, position=None):
self.result = None
self.node = None
self.asking = False
self.slot = None


class NodeCommands(object):
"""
"""

def __init__(self, parse_response, connection):
def __init__(self, parse_response, connection,
response_callbacks=None, use_multi=False):
"""
"""
self.parse_response = parse_response
self.connection = connection
self.commands = []
self.response_callbacks = response_callbacks or dict()
self.commands = []
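# iRsp maps each reply we will read back to the command it belongs to; filled in by addTransaction()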
self.iRsp = []
self.useMulti = use_multi  # if true, use MULTI/EXEC where possible.

def append(self, c):
"""
@@ -404,12 +425,69 @@ def write(self):
# build up all commands into a single request to increase network perf
# send all the commands and catch connection and timeout errors.
try:
connection.send_packed_command(connection.pack_commands([c.args for c in commands]))
if self.useMulti:
cmds = self.addTransaction(commands)
else:
cmds = [c.args for c in commands]
connection.send_packed_command(connection.pack_commands(cmds))
except (ConnectionError, TimeoutError) as e:
for c in commands:
c.result = e

def addTransaction(self, commands):
cmds = []
iRsp = []  # list of indices where the response should go

nCmds = len(commands)
if nCmds < 2:
cmds = [c.args for c in commands]
iRsp = [i for i, c in enumerate(commands)]
self.iRsp = iRsp
return cmds

iRun = []  # list of indices in the current run
inArun = False  # in a run of commands for the same slot
for i in range(nCmds - 1):
c1 = commands[i]
c2 = commands[i + 1]
# starting a new run?
if not inArun and c1.slot == c2.slot:
inArun = True
cmds.append(('MULTI',))
iRsp.append(None)  # should discard this rsp (will be "OK")
# add the actual cmd
inArun = self.addCmd(cmds, c1, i, iRsp, iRun, inArun, c1.slot == c2.slot)

# remember to add the last command
inArun = self.addCmd(cmds, c2, i + 1, iRsp, iRun, inArun, False)

self.iRsp = iRsp
return cmds

def addCmd(self, cmds, cmd, i, iRsp, iRun, inArun, sameSlot):
cmds.append(cmd.args)
if inArun:
index = str(i)  # in the good case we will discard this rsp.
iRsp.append(index)  # should discard this rsp (will be "QUEUED")
iRun.append(i)
else:
iRsp.append(i)  # this response is for commands[i]

# ending a run?
if inArun and not sameSlot:
inArun = False
cmds.append(('EXEC',))
iRsp.append(iRun[:])  # need [:] to create a shallow copy, because we clear iRun next.
iRun.clear()  # must use clear(), since the caller owns iRun.
return inArun
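To illustrate the grouping (the slot numbers are made up for this example): three buffered commands whose slots are [100, 100, 200] come out of addTransaction as

cmds = [('MULTI',), cmd0.args, cmd1.args, ('EXEC',), cmd2.args]
iRsp = [None, '0', '1', [0, 1], 2]

i.e. the MULTI 'OK' is discarded (None), the two 'QUEUED' placeholders are kept only in case they turn out to be errors ('0', '1'), the EXEC reply is unpacked onto commands 0 and 1 ([0, 1]), and the final reply goes straight to command 2.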

def read(self):
if self.useMulti:
self.readMultiExec()
else:
self.readPlain()

def readPlain(self):
"""
"""
connection = self.connection
@@ -434,3 +512,76 @@ def read(self):
return
except RedisError:
c.result = sys.exc_info()[1]

def readMultiExec(self):
"""
Read replies for a command stream that may contain MULTI/EXEC blocks,
using self.iRsp to route each reply back to the right command.
"""
# much of the error handling is copied from
# redis.client.Pipeline._execute_transaction
connection = self.connection
commands = self.commands
for index in self.iRsp:  # iRsp = list of indices into responses
try:
rsp = self.parse_response(connection, '_')
except (ConnectionError, TimeoutError) as e:
for c in self.commands:
c.result = e
return
except RedisError:
rsp = sys.exc_info()[1]

if index is None:
# Eat this response (the 'OK' from MULTI).
# We used to eat 'QUEUED' for commands inside MULTI/EXEC too, but that
# reply can also be a MOVED or another error, and in those cases we
# want to record the error on the command. That is handled in the
# next section.
# TODO: what if the rsp to MULTI is not 'OK'?
continue

if isinstance(index, str):
# For commands that are part of a MULTI/EXEC transaction we encode
# the index as a str (plain commands use an int).
# If the result is "QUEUED" (the good case), we can discard it.
# But if the result is an error, we need to store that error.
if rsp == b'QUEUED':
continue  # good case, discard the placeholder rsp.
index = int(index)
cmd = commands[index]
if cmd.result is None:
cmd.result = rsp  # error case, record the error.
continue

if isinstance(index, int):
# one rsp for one plain command
indexList = [index]
rspList = [rsp]
else:
# list of rsp from EXEC (as in MULTI/EXEC)
indexList = index
rspList = rsp

if isinstance(rsp, ExecAbortError):
# got an exec abort error, e.g. due to a moved slot.
# the errors were already recorded on the commands, no need
# to do it again here in the EXEC step.
continue

if len(rspList) != len(indexList):
self.connection.disconnect()
raise ResponseError("Wrong number of response items from "
"pipeline execution")

for i, r in zip(indexList, rspList):
cmd = commands[i]
if cmd.result is not None:
continue  # already have the result

# We have to run the response callbacks manually
if not isinstance(r, Exception):
args = cmd.args
options = cmd.options
command_name = args[0]
if command_name in self.response_callbacks:
r = self.response_callbacks[command_name](r, **options)

cmd.result = r
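Continuing the illustrative example from addTransaction above, the read side would see roughly:

# replies (illustrative): 'OK', 'QUEUED', 'QUEUED', [r0, r1], r2
# iRsp                  : None,  '0',     '1',     [0, 1],   2
# outcome               : cmd0.result = r0, cmd1.result = r1, cmd2.result = r2

readMultiExec discards the 'OK' and the 'QUEUED' placeholders, unpacks the EXEC reply onto commands 0 and 1, and assigns the last reply to command 2 after running any response callback. If the EXEC reply is an ExecAbortError instead, the per-command errors already recorded at the QUEUED stage are left in place.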