Skip to content

Commit 0efb09b

Browse files
committed
on the write side:
- scan thru all the commands for one node. Add multi-exec around consecutive commands for the same slot. - add NodeCommands.iRsp[] to record the indices of the expected responses. eg. for [1], iRsp will be [ None, "0", "1", [ 0, 1 ], None, "2", "3", [ 2, 3 ], 4 ] multi #1 #2 exec multi #3 #4 exec Grokzen#5 - the rsp to MULTI is "OK", so we want to discard that rsp. If the index is None, we will discard the rsp. - the rsp for queued commands is "QUEUED" (good case) or error. If good case, we want to discard the placeholder rsp "QUEUED". An index with type str indicates we should discard good case rsp. In case of error, we use that index (str->int) to record error rsp. - the rsp for exec is a list of rsp for queued commands (good case) or an error. If good case, we have the indices of the commands corresponding to the rsp. If error, we recorded the errors already, so we can discard this error (error of exec says "see previous errors" anyway). - A plain int index is for commands outside multi-exec, eg. Grokzen#5. [1] multi incr a{1} #1 decr b{1} #2 exec multi incr a{2} #3 decr b{2} #4 exec incr a{3} Grokzen#5
1 parent c77c063 commit 0efb09b

File tree

1 file changed

+44
-0
lines changed

1 file changed

+44
-0
lines changed

rediscluster/pipeline.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,7 @@ def __init__(self, parse_response, connection,
389389
self.parse_response = parse_response
390390
self.connection = connection
391391
self.commands = []
392+
self.iRsp = []
392393
self.useMulti = use_multi # if true, use multi-exec where possible.
393394

394395
def append(self, c):
@@ -422,8 +423,51 @@ def write(self):
422423

423424
def addTransaction( self, commands ):
424425
cmds = []
426+
iRsp = [] # list of indices where the response should go
427+
428+
nCmds = len( commands )
429+
if nCmds < 2:
430+
cmds = [ c.args for c in commands ]
431+
iRsp = [ i for i, c in enumerate( commands )]
432+
self.iRsp = iRsp
433+
return cmds
434+
435+
iRun = [] # list of indices in the run
436+
inArun = False # in a run of commands for same slot
437+
for i in range( nCmds - 1 ) :
438+
c1 = commands[ i ]
439+
c2 = commands[ i + 1 ]
440+
# starting a new run?
441+
if not inArun and c1.slot == c2.slot :
442+
inArun = True
443+
cmds.append( ( 'MULTI', ) )
444+
iRsp.append( None ) # should discard this rsp (will be "OK")
445+
# add the actual cmd
446+
inArun = self.addCmd( cmds, c1, i, iRsp, iRun, inArun, c1.slot == c2.slot )
447+
448+
# remember to add the last commands
449+
inArun = self.addCmd( cmds, c2, i + 1, iRsp, iRun, inArun, False )
450+
451+
self.iRsp = iRsp
425452
return cmds
426453

454+
def addCmd( self, cmds, cmd, i, iRsp, iRun, inArun, sameSlot ):
455+
cmds.append( cmd.args )
456+
if inArun:
457+
index = str( i ) # in good case we should discard this rsp.
458+
iRsp.append( index ) # should discard this rsp (will be "QUEUED")
459+
iRun.append( i )
460+
else:
461+
iRsp.append( i ) # this response is for cmds[ i ]
462+
463+
# ending a run?
464+
if inArun and not sameSlot :
465+
inArun = False
466+
cmds.append( ( 'EXEC', ) )
467+
iRsp.append( iRun[ : ] ) # need [:] to create a shallow copy, because we clear iRun next.
468+
iRun.clear() # must use clear(), since the caller owns iRun.
469+
return inArun
470+
427471
def read(self):
428472
if self.useMulti :
429473
self.readMultiExec()

0 commit comments

Comments
 (0)