@@ -1996,158 +1996,174 @@ def _send_cluster_commands(
         # build a list of node objects based on node names we need to
         nodes = {}

-        # as we move through each command that still needs to be processed,
-        # we figure out the slot number that command maps to, then from
-        # the slot determine the node.
-        for c in attempt:
-            while True:
-                # refer to our internal node -> slot table that
-                # tells us where a given command should route to.
-                # (it might be possible we have a cached node that no longer
-                # exists in the cluster, which is why we do this in a loop)
-                passed_targets = c.options.pop("target_nodes", None)
-                if passed_targets and not self._is_nodes_flag(passed_targets):
-                    target_nodes = self._parse_target_nodes(passed_targets)
-                else:
-                    target_nodes = self._determine_nodes(
-                        *c.args, node_flag=passed_targets
-                    )
-                    if not target_nodes:
+        try:
+            # as we move through each command that still needs to be processed,
+            # we figure out the slot number that command maps to, then from
+            # the slot determine the node.
+            for c in attempt:
+                while True:
+                    # refer to our internal node -> slot table that
+                    # tells us where a given command should route to.
+                    # (it might be possible we have a cached node that no longer
+                    # exists in the cluster, which is why we do this in a loop)
+                    passed_targets = c.options.pop("target_nodes", None)
+                    if passed_targets and not self._is_nodes_flag(passed_targets):
+                        target_nodes = self._parse_target_nodes(passed_targets)
+                    else:
+                        target_nodes = self._determine_nodes(
+                            *c.args, node_flag=passed_targets
+                        )
+                        if not target_nodes:
+                            raise RedisClusterException(
+                                f"No targets were found to execute {c.args} command on"
+                            )
+                    if len(target_nodes) > 1:
                         raise RedisClusterException(
-                            f"No targets were found to execute {c.args} command on"
+                            f"Too many targets for command {c.args}"
                         )
-                if len(target_nodes) > 1:
-                    raise RedisClusterException(
-                        f"Too many targets for command {c.args}"
-                    )

-                node = target_nodes[0]
-                if node == self.get_default_node():
-                    is_default_node = True
+                    node = target_nodes[0]
+                    if node == self.get_default_node():
+                        is_default_node = True

-                # now that we know the name of the node
-                # ( it's just a string in the form of host:port )
-                # we can build a list of commands for each node.
-                node_name = node.name
-                if node_name not in nodes:
-                    redis_node = self.get_redis_connection(node)
+                    # now that we know the name of the node
+                    # ( it's just a string in the form of host:port )
+                    # we can build a list of commands for each node.
+                    node_name = node.name
+                    if node_name not in nodes:
+                        redis_node = self.get_redis_connection(node)
+                        try:
+                            connection = get_connection(redis_node, c.args)
+                        except (ConnectionError, TimeoutError) as e:
+                            for n in nodes.values():
+                                n.connection_pool.release(n.connection)
+                                n.connection = None
+                            nodes = {}
+                            if self.retry and isinstance(
+                                e, self.retry._supported_errors
+                            ):
+                                backoff = self.retry._backoff.compute(attempts_count)
+                                if backoff > 0:
+                                    time.sleep(backoff)
+                            self.nodes_manager.initialize()
+                            if is_default_node:
+                                self.replace_default_node()
+                            raise
+                        nodes[node_name] = NodeCommands(
+                            redis_node.parse_response,
+                            redis_node.connection_pool,
+                            connection,
+                        )
+                    nodes[node_name].append(c)
+                    break
+
+            # send the commands in sequence.
+            # we write to all the open sockets for each node first,
+            # before reading anything
+            # this allows us to flush all the requests out across the
+            # network essentially in parallel
+            # so that we can read them all in parallel as they come back.
+            # we don't multiplex on the sockets as they come available,
+            # but that shouldn't make too much difference.
+            node_commands = nodes.values()
+            for n in node_commands:
+                n.write()
+
+            for n in node_commands:
+                n.read()
+
+            # release all of the redis connections we allocated earlier
+            # back into the connection pool.
+            # we used to do this step as part of a try/finally block,
+            # but it is really dangerous to
+            # release connections back into the pool if for some
+            # reason the socket has data still left in it
+            # from a previous operation. The write and
+            # read operations already have try/catch around them for
+            # all known types of errors including connection
+            # and socket level errors.
+            # So if we hit an exception, something really bad
+            # happened and putting any of
+            # these connections back into the pool is a very bad idea.
+            # the socket might have unread buffer still sitting in it,
+            # and then the next time we read from it we would pass the
+            # buffered result back from a previous command, and
+            # every single request after that on the connection would get
+            # a mismatched result.
+            for n in nodes.values():
+                n.connection_pool.release(n.connection)
+                n.connection = None
+            nodes = {}
+
+            # if the response isn't an exception, it is a
+            # valid response from the node and
+            # we're all done with that command, YAY!
+            # if we have more commands to attempt, we've run into problems.
+            # collect all the commands we are allowed to retry.
+            # (MOVED, ASK, or connection errors or timeout errors)
+            attempt = sorted(
+                (
+                    c
+                    for c in attempt
+                    if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY)
+                ),
+                key=lambda x: x.position,
+            )
+            if attempt and allow_redirections:
+                # RETRY MAGIC HAPPENS HERE!
+                # send these remaining commands one at a time using `execute_command`
+                # in the main client. This keeps our retry logic
+                # mostly in one place,
+                # and allows us to be more confident in correctness of behavior.
+                # at this point any speed gains from pipelining have been lost
+                # anyway, so we might as well make the best
+                # attempt to get the correct behavior.
+                #
+                # The client command will handle retries for each
+                # individual command sequentially as we pass each
+                # one into `execute_command`. Any exceptions
+                # that bubble out should only appear once all
+                # retries have been exhausted.
+                #
+                # If a lot of commands have failed, we'll be setting the
+                # flag to rebuild the slots table from scratch.
+                # So MOVED errors should correct themselves fairly quickly.
+                self.reinitialize_counter += 1
+                if self._should_reinitialized():
+                    self.nodes_manager.initialize()
+                    if is_default_node:
+                        self.replace_default_node()
+                for c in attempt:
                     try:
-                        connection = get_connection(redis_node, c.args)
-                    except (ConnectionError, TimeoutError) as e:
-                        for n in nodes.values():
-                            n.connection_pool.release(n.connection)
-                        if self.retry and isinstance(e, self.retry._supported_errors):
-                            backoff = self.retry._backoff.compute(attempts_count)
-                            if backoff > 0:
-                                time.sleep(backoff)
-                        self.nodes_manager.initialize()
-                        if is_default_node:
-                            self.replace_default_node()
-                        raise
-                    nodes[node_name] = NodeCommands(
-                        redis_node.parse_response,
-                        redis_node.connection_pool,
-                        connection,
-                    )
-                nodes[node_name].append(c)
-                break
-
-        # send the commands in sequence.
-        # we write to all the open sockets for each node first,
-        # before reading anything
-        # this allows us to flush all the requests out across the
-        # network essentially in parallel
-        # so that we can read them all in parallel as they come back.
-        # we don't multiplex on the sockets as they come available,
-        # but that shouldn't make too much difference.
-        node_commands = nodes.values()
-        for n in node_commands:
-            n.write()
-
-        for n in node_commands:
-            n.read()
-
-        # release all of the redis connections we allocated earlier
-        # back into the connection pool.
-        # we used to do this step as part of a try/finally block,
-        # but it is really dangerous to
-        # release connections back into the pool if for some
-        # reason the socket has data still left in it
-        # from a previous operation. The write and
-        # read operations already have try/catch around them for
-        # all known types of errors including connection
-        # and socket level errors.
-        # So if we hit an exception, something really bad
-        # happened and putting any of
-        # these connections back into the pool is a very bad idea.
-        # the socket might have unread buffer still sitting in it,
-        # and then the next time we read from it we would pass the
-        # buffered result back from a previous command, and
-        # every single request after that on the connection would get
-        # a mismatched result.
-        for n in nodes.values():
-            n.connection_pool.release(n.connection)
-
-        # if the response isn't an exception, it is a
-        # valid response from the node and
-        # we're all done with that command, YAY!
-        # if we have more commands to attempt, we've run into problems.
-        # collect all the commands we are allowed to retry.
-        # (MOVED, ASK, or connection errors or timeout errors)
-        attempt = sorted(
-            (
-                c
-                for c in attempt
-                if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY)
-            ),
-            key=lambda x: x.position,
-        )
-        if attempt and allow_redirections:
-            # RETRY MAGIC HAPPENS HERE!
-            # send these remaining commands one at a time using `execute_command`
-            # in the main client. This keeps our retry logic
-            # mostly in one place,
-            # and allows us to be more confident in correctness of behavior.
-            # at this point any speed gains from pipelining have been lost
-            # anyway, so we might as well make the best
-            # attempt to get the correct behavior.
-            #
-            # The client command will handle retries for each
-            # individual command sequentially as we pass each
-            # one into `execute_command`. Any exceptions
-            # that bubble out should only appear once all
-            # retries have been exhausted.
-            #
-            # If a lot of commands have failed, we'll be setting the
-            # flag to rebuild the slots table from scratch.
-            # So MOVED errors should correct themselves fairly quickly.
-            self.reinitialize_counter += 1
-            if self._should_reinitialized():
-                self.nodes_manager.initialize()
-                if is_default_node:
-                    self.replace_default_node()
-            for c in attempt:
-                try:
-                    # send each command individually like we
-                    # do in the main client.
-                    c.result = super().execute_command(*c.args, **c.options)
-                except RedisError as e:
-                    c.result = e
-
-        # turn the response back into a simple flat array that corresponds
-        # to the sequence of commands issued in the stack in pipeline.execute()
-        response = []
-        for c in sorted(stack, key=lambda x: x.position):
-            if c.args[0] in self.cluster_response_callbacks:
-                c.result = self.cluster_response_callbacks[c.args[0]](
-                    c.result, **c.options
-                )
-            response.append(c.result)
-
-        if raise_on_error:
-            self.raise_first_error(stack)
+                        # send each command individually like we
+                        # do in the main client.
+                        c.result = super().execute_command(*c.args, **c.options)
+                    except RedisError as e:
+                        c.result = e

-        return response
+            # turn the response back into a simple flat array that corresponds
+            # to the sequence of commands issued in the stack in pipeline.execute()
+            response = []
+            for c in sorted(stack, key=lambda x: x.position):
+                if c.args[0] in self.cluster_response_callbacks:
+                    c.result = self.cluster_response_callbacks[c.args[0]](
+                        c.result, **c.options
+                    )
+                response.append(c.result)
+
+            if raise_on_error:
+                self.raise_first_error(stack)
+
+            return response
+        except BaseException:
+            # if nodes is not empty, a problem must have occurred
+            # since we can't guarantee the state of the connections,
+            # disconnect before returning them to the connection pool
+            for n in nodes.values():
+                if n.connection:
+                    n.connection.disconnect()
+                    n.connection_pool.release(n.connection)
+            raise

     def _fail_on_redirect(self, allow_redirections):
         """ """
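
Notes on the techniques in this hunk (editor's sketches, not part of the patch):

When `get_connection` fails, the patch now releases every connection gathered so far and sleeps for `self.retry._backoff.compute(attempts_count)` before re-raising, so the caller can retry after a delay. A minimal sketch of what such a compute step typically does, assuming an exponential-growth-with-cap policy in the spirit of redis-py's ExponentialBackoff (the function name and constants below are illustrative, not the library's):

    # Hypothetical stand-in for retry._backoff.compute(attempts_count);
    # not the actual redis-py implementation.
    def compute_backoff(failures: int, base: float = 0.008, cap: float = 0.512) -> float:
        # the delay doubles with each consecutive failure, capped at `cap`
        return min(cap, base * (2 ** failures))

    for attempts_count in range(6):
        print(attempts_count, compute_backoff(attempts_count))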
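The write()/read() split is where the pipeline earns its speedup: every node's socket is flushed before any reply is read, so the requests cross the network roughly in parallel. A toy sketch of the same two-phase shape; ToyNodeCommands is a hypothetical stand-in for the real NodeCommands:

    class ToyNodeCommands:
        """Hypothetical stand-in: batches commands destined for one node."""

        def __init__(self, name):
            self.name = name
            self.commands = []
            self.results = []

        def append(self, cmd):
            self.commands.append(cmd)

        def write(self):
            # one buffered send per node instead of a round trip per command
            print(f"{self.name}: flushing {len(self.commands)} commands")

        def read(self):
            # replies are consumed only after every node has been written to
            self.results = [f"OK:{c}" for c in self.commands]

    nodes = {n: ToyNodeCommands(n) for n in ("127.0.0.1:7000", "127.0.0.1:7001")}
    nodes["127.0.0.1:7000"].append("GET foo")
    nodes["127.0.0.1:7001"].append("GET bar")

    for n in nodes.values():  # phase 1: write to all sockets
        n.write()
    for n in nodes.values():  # phase 2: read all replies
        n.read()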
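The long comment about unread buffers and the new `except BaseException` handler guard against the same failure mode: a reply left sitting on a released socket becomes the answer to the next, unrelated command, and every response after that is off by one. A self-contained toy (not redis-py code) that reproduces the effect:

    from collections import deque

    class ToyConnection:
        """Hypothetical connection whose replies queue up in order."""

        def __init__(self):
            self.buffer = deque()

        def send(self, cmd):
            self.buffer.append(f"reply-to:{cmd}")  # reply is now in flight

        def read(self):
            return self.buffer.popleft()

    conn = ToyConnection()
    conn.send("GET a")
    # ...an exception interrupts us before conn.read()...
    conn.send("GET b")   # connection reused without a disconnect
    print(conn.read())   # "reply-to:GET a" -- a mismatched result

Disconnecting before release, as the new handler does, throws the poisoned socket away instead of recycling it into the pool.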
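Finally, the retry pass: commands whose result is a retryable error are re-sorted by position and replayed one at a time through the single-command path. A hedged sketch of that filter-and-replay step, with Cmd and execute_single as illustrative stand-ins for the pipeline's command objects and `execute_command` (the real code filters on ClusterPipeline.ERRORS_ALLOW_RETRY and catches RedisError):

    class Cmd:
        def __init__(self, position, args):
            self.position = position
            self.args = args
            self.result = None

    RETRYABLE = (ConnectionError, TimeoutError)

    def execute_single(*args):
        return f"OK:{args}"

    stack = [Cmd(0, ("GET", "a")), Cmd(1, ("GET", "b"))]
    stack[0].result = "value-a"                          # first pass succeeded
    stack[1].result = ConnectionError("node went away")  # first pass failed

    attempt = sorted(
        (c for c in stack if isinstance(c.result, RETRYABLE)),
        key=lambda x: x.position,
    )
    for c in attempt:
        try:
            c.result = execute_single(*c.args)  # one command at a time
        except Exception as e:
            c.result = e

    print([c.result for c in stack])  # ['value-a', "OK:('GET', 'b')"]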