@@ -1996,158 +1996,174 @@ def _send_cluster_commands(
19961996 # build a list of node objects based on node names we need to
19971997 nodes = {}
19981998
1999- # as we move through each command that still needs to be processed,
2000- # we figure out the slot number that command maps to, then from
2001- # the slot determine the node.
2002- for c in attempt :
2003- while True :
2004- # refer to our internal node -> slot table that
2005- # tells us where a given command should route to.
2006- # (it might be possible we have a cached node that no longer
2007- # exists in the cluster, which is why we do this in a loop)
2008- passed_targets = c .options .pop ("target_nodes" , None )
2009- if passed_targets and not self ._is_nodes_flag (passed_targets ):
2010- target_nodes = self ._parse_target_nodes (passed_targets )
2011- else :
2012- target_nodes = self ._determine_nodes (
2013- * c .args , node_flag = passed_targets
2014- )
2015- if not target_nodes :
1999+ try :
2000+ # as we move through each command that still needs to be processed,
2001+ # we figure out the slot number that command maps to, then from
2002+ # the slot determine the node.
2003+ for c in attempt :
2004+ while True :
2005+ # refer to our internal node -> slot table that
2006+ # tells us where a given command should route to.
2007+ # (it might be possible we have a cached node that no longer
2008+ # exists in the cluster, which is why we do this in a loop)
2009+ passed_targets = c .options .pop ("target_nodes" , None )
2010+ if passed_targets and not self ._is_nodes_flag (passed_targets ):
2011+ target_nodes = self ._parse_target_nodes (passed_targets )
2012+ else :
2013+ target_nodes = self ._determine_nodes (
2014+ * c .args , node_flag = passed_targets
2015+ )
2016+ if not target_nodes :
2017+ raise RedisClusterException (
2018+ f"No targets were found to execute { c .args } command on"
2019+ )
2020+ if len (target_nodes ) > 1 :
20162021 raise RedisClusterException (
2017- f"No targets were found to execute { c .args } command on "
2022+ f"Too many targets for command { c .args } "
20182023 )
2019- if len (target_nodes ) > 1 :
2020- raise RedisClusterException (
2021- f"Too many targets for command { c .args } "
2022- )
20232024
2024- node = target_nodes [0 ]
2025- if node == self .get_default_node ():
2026- is_default_node = True
2025+ node = target_nodes [0 ]
2026+ if node == self .get_default_node ():
2027+ is_default_node = True
20272028
2028- # now that we know the name of the node
2029- # ( it's just a string in the form of host:port )
2030- # we can build a list of commands for each node.
2031- node_name = node .name
2032- if node_name not in nodes :
2033- redis_node = self .get_redis_connection (node )
2029+ # now that we know the name of the node
2030+ # ( it's just a string in the form of host:port )
2031+ # we can build a list of commands for each node.
2032+ node_name = node .name
2033+ if node_name not in nodes :
2034+ redis_node = self .get_redis_connection (node )
2035+ try :
2036+ connection = get_connection (redis_node , c .args )
2037+ except (ConnectionError , TimeoutError ) as e :
2038+ for n in nodes .values ():
2039+ n .connection_pool .release (n .connection )
2040+ n .connection = None
2041+ nodes = {}
2042+ if self .retry and isinstance (
2043+ e , self .retry ._supported_errors
2044+ ):
2045+ backoff = self .retry ._backoff .compute (attempts_count )
2046+ if backoff > 0 :
2047+ time .sleep (backoff )
2048+ self .nodes_manager .initialize ()
2049+ if is_default_node :
2050+ self .replace_default_node ()
2051+ raise
2052+ nodes [node_name ] = NodeCommands (
2053+ redis_node .parse_response ,
2054+ redis_node .connection_pool ,
2055+ connection ,
2056+ )
2057+ nodes [node_name ].append (c )
2058+ break
2059+
2060+ # send the commands in sequence.
2061+ # we write to all the open sockets for each node first,
2062+ # before reading anything
2063+ # this allows us to flush all the requests out across the
2064+ # network essentially in parallel
2065+ # so that we can read them all in parallel as they come back.
2066+ # we don't multiplex on the sockets as they come available ,
2067+ # but that shouldn't make too much difference.
2068+ node_commands = nodes .values ()
2069+ for n in node_commands :
2070+ n .write ()
2071+
2072+ for n in node_commands :
2073+ n .read ()
2074+
2075+ # release all of the redis connections we allocated earlier
2076+ # back into the connection pool.
2077+ # we used to do this step as part of a try/finally block,
2078+ # but it is really dangerous to
2079+ # release connections back into the pool if for some
2080+ # reason the socket has data still left in it
2081+ # from a previous operation. The write and
2082+ # read operations already have try/catch around them for
2083+ # all known types of errors including connection
2084+ # and socket level errors.
2085+ # So if we hit an exception, something really bad
2086+ # happened and putting any of
2087+ # these connections back into the pool is a very bad idea.
2088+ # the socket might have unread buffer still sitting in it,
2089+ # and then the next time we read from it we pass the
2090+ # buffered result back from a previous command and
2091+ # every single request after to that connection will always get
2092+ # a mismatched result.
2093+ for n in nodes .values ():
2094+ n .connection_pool .release (n .connection )
2095+ n .connection = None
2096+ nodes = {}
2097+
2098+ # if the response isn't an exception it is a
2099+ # valid response from the node
2100+ # we're all done with that command, YAY!
2101+ # if we have more commands to attempt, we've run into problems.
2102+ # collect all the commands we are allowed to retry.
2103+ # (MOVED, ASK, or connection errors or timeout errors)
2104+ attempt = sorted (
2105+ (
2106+ c
2107+ for c in attempt
2108+ if isinstance (c .result , ClusterPipeline .ERRORS_ALLOW_RETRY )
2109+ ),
2110+ key = lambda x : x .position ,
2111+ )
2112+ if attempt and allow_redirections :
2113+ # RETRY MAGIC HAPPENS HERE!
2114+ # send these remaining commands one at a time using `execute_command`
2115+ # in the main client. This keeps our retry logic
2116+ # in one place mostly,
2117+ # and allows us to be more confident in correctness of behavior.
2118+ # at this point any speed gains from pipelining have been lost
2119+ # anyway, so we might as well make the best
2120+ # attempt to get the correct behavior.
2121+ #
2122+ # The client command will handle retries for each
2123+ # individual command sequentially as we pass each
2124+ # one into `execute_command`. Any exceptions
2125+ # that bubble out should only appear once all
2126+ # retries have been exhausted.
2127+ #
2128+ # If a lot of commands have failed, we'll be setting the
2129+ # flag to rebuild the slots table from scratch.
2130+ # So MOVED errors should correct themselves fairly quickly.
2131+ self .reinitialize_counter += 1
2132+ if self ._should_reinitialized ():
2133+ self .nodes_manager .initialize ()
2134+ if is_default_node :
2135+ self .replace_default_node ()
2136+ for c in attempt :
20342137 try :
2035- connection = get_connection (redis_node , c .args )
2036- except (ConnectionError , TimeoutError ) as e :
2037- for n in nodes .values ():
2038- n .connection_pool .release (n .connection )
2039- if self .retry and isinstance (e , self .retry ._supported_errors ):
2040- backoff = self .retry ._backoff .compute (attempts_count )
2041- if backoff > 0 :
2042- time .sleep (backoff )
2043- self .nodes_manager .initialize ()
2044- if is_default_node :
2045- self .replace_default_node ()
2046- raise
2047- nodes [node_name ] = NodeCommands (
2048- redis_node .parse_response ,
2049- redis_node .connection_pool ,
2050- connection ,
2051- )
2052- nodes [node_name ].append (c )
2053- break
2054-
2055- # send the commands in sequence.
2056- # we write to all the open sockets for each node first,
2057- # before reading anything
2058- # this allows us to flush all the requests out across the
2059- # network essentially in parallel
2060- # so that we can read them all in parallel as they come back.
2061- # we don't multiplex on the sockets as they come available ,
2062- # but that shouldn't make too much difference.
2063- node_commands = nodes .values ()
2064- for n in node_commands :
2065- n .write ()
2066-
2067- for n in node_commands :
2068- n .read ()
2069-
2070- # release all of the redis connections we allocated earlier
2071- # back into the connection pool.
2072- # we used to do this step as part of a try/finally block,
2073- # but it is really dangerous to
2074- # release connections back into the pool if for some
2075- # reason the socket has data still left in it
2076- # from a previous operation. The write and
2077- # read operations already have try/catch around them for
2078- # all known types of errors including connection
2079- # and socket level errors.
2080- # So if we hit an exception, something really bad
2081- # happened and putting any of
2082- # these connections back into the pool is a very bad idea.
2083- # the socket might have unread buffer still sitting in it,
2084- # and then the next time we read from it we pass the
2085- # buffered result back from a previous command and
2086- # every single request after to that connection will always get
2087- # a mismatched result.
2088- for n in nodes .values ():
2089- n .connection_pool .release (n .connection )
2090-
2091- # if the response isn't an exception it is a
2092- # valid response from the node
2093- # we're all done with that command, YAY!
2094- # if we have more commands to attempt, we've run into problems.
2095- # collect all the commands we are allowed to retry.
2096- # (MOVED, ASK, or connection errors or timeout errors)
2097- attempt = sorted (
2098- (
2099- c
2100- for c in attempt
2101- if isinstance (c .result , ClusterPipeline .ERRORS_ALLOW_RETRY )
2102- ),
2103- key = lambda x : x .position ,
2104- )
2105- if attempt and allow_redirections :
2106- # RETRY MAGIC HAPPENS HERE!
2107- # send these remaining commands one at a time using `execute_command`
2108- # in the main client. This keeps our retry logic
2109- # in one place mostly,
2110- # and allows us to be more confident in correctness of behavior.
2111- # at this point any speed gains from pipelining have been lost
2112- # anyway, so we might as well make the best
2113- # attempt to get the correct behavior.
2114- #
2115- # The client command will handle retries for each
2116- # individual command sequentially as we pass each
2117- # one into `execute_command`. Any exceptions
2118- # that bubble out should only appear once all
2119- # retries have been exhausted.
2120- #
2121- # If a lot of commands have failed, we'll be setting the
2122- # flag to rebuild the slots table from scratch.
2123- # So MOVED errors should correct themselves fairly quickly.
2124- self .reinitialize_counter += 1
2125- if self ._should_reinitialized ():
2126- self .nodes_manager .initialize ()
2127- if is_default_node :
2128- self .replace_default_node ()
2129- for c in attempt :
2130- try :
2131- # send each command individually like we
2132- # do in the main client.
2133- c .result = super ().execute_command (* c .args , ** c .options )
2134- except RedisError as e :
2135- c .result = e
2136-
2137- # turn the response back into a simple flat array that corresponds
2138- # to the sequence of commands issued in the stack in pipeline.execute()
2139- response = []
2140- for c in sorted (stack , key = lambda x : x .position ):
2141- if c .args [0 ] in self .cluster_response_callbacks :
2142- c .result = self .cluster_response_callbacks [c .args [0 ]](
2143- c .result , ** c .options
2144- )
2145- response .append (c .result )
2146-
2147- if raise_on_error :
2148- self .raise_first_error (stack )
2138+ # send each command individually like we
2139+ # do in the main client.
2140+ c .result = super ().execute_command (* c .args , ** c .options )
2141+ except RedisError as e :
2142+ c .result = e
21492143
2150- return response
2144+ # turn the response back into a simple flat array that corresponds
2145+ # to the sequence of commands issued in the stack in pipeline.execute()
2146+ response = []
2147+ for c in sorted (stack , key = lambda x : x .position ):
2148+ if c .args [0 ] in self .cluster_response_callbacks :
2149+ c .result = self .cluster_response_callbacks [c .args [0 ]](
2150+ c .result , ** c .options
2151+ )
2152+ response .append (c .result )
2153+
2154+ if raise_on_error :
2155+ self .raise_first_error (stack )
2156+
2157+ return response
2158+ except BaseException :
2159+ # if nodes is not empty, a problem must have occurred
2160+ # since we cant guarantee the state of the connections,
2161+ # disconnect before returning it to the connection pool
2162+ for n in nodes .values ():
2163+ if n .connection :
2164+ n .connection .disconnect ()
2165+ n .connection_pool .release (n .connection )
2166+ raise
21512167
21522168 def _fail_on_redirect (self , allow_redirections ):
21532169 """ """
0 commit comments