@@ -576,7 +576,8 @@ def __init__(
576576 self .retry = retry
577577 kwargs .update ({"retry" : self .retry })
578578 else :
579- kwargs .update ({"retry" : Retry (default_backoff (), 0 )})
579+ self .retry = Retry (default_backoff (), 0 )
580+ kwargs ["retry" ] = self .retry
580581
581582 self .encoder = Encoder (
582583 kwargs .get ("encoding" , "utf-8" ),
@@ -759,6 +760,7 @@ def pipeline(self, transaction=None, shard_hint=None):
759760 read_from_replicas = self .read_from_replicas ,
760761 reinitialize_steps = self .reinitialize_steps ,
761762 lock = self ._lock ,
763+ retry = self .retry ,
762764 )
763765
764766 def lock (
@@ -842,41 +844,49 @@ def set_response_callback(self, command, callback):
842844 def _determine_nodes (self , * args , ** kwargs ) -> List ["ClusterNode" ]:
843845 # Determine which nodes should be executed the command on.
844846 # Returns a list of target nodes.
845- command = args [0 ].upper ()
846- if len (args ) >= 2 and f"{ args [0 ]} { args [1 ]} " .upper () in self .command_flags :
847- command = f"{ args [0 ]} { args [1 ]} " .upper ()
848-
849- nodes_flag = kwargs .pop ("nodes_flag" , None )
850- if nodes_flag is not None :
851- # nodes flag passed by the user
852- command_flag = nodes_flag
853- else :
854- # get the nodes group for this command if it was predefined
855- command_flag = self .command_flags .get (command )
856- if command_flag == self .__class__ .RANDOM :
857- # return a random node
858- return [self .get_random_node ()]
859- elif command_flag == self .__class__ .PRIMARIES :
860- # return all primaries
861- return self .get_primaries ()
862- elif command_flag == self .__class__ .REPLICAS :
863- # return all replicas
864- return self .get_replicas ()
865- elif command_flag == self .__class__ .ALL_NODES :
866- # return all nodes
867- return self .get_nodes ()
868- elif command_flag == self .__class__ .DEFAULT_NODE :
869- # return the cluster's default node
870- return [self .nodes_manager .default_node ]
871- elif command in self .__class__ .SEARCH_COMMANDS [0 ]:
872- return [self .nodes_manager .default_node ]
873- else :
874- # get the node that holds the key's slot
875- slot = self .determine_slot (* args )
876- node = self .nodes_manager .get_node_from_slot (
877- slot , self .read_from_replicas and command in READ_COMMANDS
878- )
879- return [node ]
847+ try :
848+ command = args [0 ].upper ()
849+ if len (args ) >= 2 and f"{ args [0 ]} { args [1 ]} " .upper () in self .command_flags :
850+ command = f"{ args [0 ]} { args [1 ]} " .upper ()
851+
852+ nodes_flag = kwargs .pop ("nodes_flag" , None )
853+ if nodes_flag is not None :
854+ # nodes flag passed by the user
855+ command_flag = nodes_flag
856+ else :
857+ # get the nodes group for this command if it was predefined
858+ command_flag = self .command_flags .get (command )
859+ if command_flag == self .__class__ .RANDOM :
860+ # return a random node
861+ return [self .get_random_node ()]
862+ elif command_flag == self .__class__ .PRIMARIES :
863+ # return all primaries
864+ return self .get_primaries ()
865+ elif command_flag == self .__class__ .REPLICAS :
866+ # return all replicas
867+ return self .get_replicas ()
868+ elif command_flag == self .__class__ .ALL_NODES :
869+ # return all nodes
870+ return self .get_nodes ()
871+ elif command_flag == self .__class__ .DEFAULT_NODE :
872+ # return the cluster's default node
873+ return [self .nodes_manager .default_node ]
874+ elif command in self .__class__ .SEARCH_COMMANDS [0 ]:
875+ return [self .nodes_manager .default_node ]
876+ else :
877+ # get the node that holds the key's slot
878+ slot = self .determine_slot (* args )
879+ node = self .nodes_manager .get_node_from_slot (
880+ slot , self .read_from_replicas and command in READ_COMMANDS
881+ )
882+ return [node ]
883+ except SlotNotCoveredError as e :
884+ self .reinitialize_counter += 1
885+ if self ._should_reinitialized ():
886+ self .nodes_manager .initialize ()
887+ # Reset the counter
888+ self .reinitialize_counter = 0
889+ raise e
880890
881891 def _should_reinitialized (self ):
882892 # To reinitialize the cluster on every MOVED error,
@@ -1068,6 +1078,12 @@ def execute_command(self, *args, **kwargs):
10681078 # The nodes and slots cache were reinitialized.
10691079 # Try again with the new cluster setup.
10701080 retry_attempts -= 1
1081+ if self .retry and isinstance (e , self .retry ._supported_errors ):
1082+ backoff = self .retry ._backoff .compute (
1083+ self .cluster_error_retry_attempts - retry_attempts
1084+ )
1085+ if backoff > 0 :
1086+ time .sleep (backoff )
10711087 continue
10721088 else :
10731089 # raise the exception
@@ -1127,8 +1143,6 @@ def _execute_command(self, target_node, *args, **kwargs):
11271143 # Remove the failed node from the startup nodes before we try
11281144 # to reinitialize the cluster
11291145 self .nodes_manager .startup_nodes .pop (target_node .name , None )
1130- # Reset the cluster node's connection
1131- target_node .redis_connection = None
11321146 self .nodes_manager .initialize ()
11331147 raise e
11341148 except MovedError as e :
@@ -1148,6 +1162,13 @@ def _execute_command(self, target_node, *args, **kwargs):
11481162 else :
11491163 self .nodes_manager .update_moved_exception (e )
11501164 moved = True
1165+ except SlotNotCoveredError as e :
1166+ self .reinitialize_counter += 1
1167+ if self ._should_reinitialized ():
1168+ self .nodes_manager .initialize ()
1169+ # Reset the counter
1170+ self .reinitialize_counter = 0
1171+ raise e
11511172 except TryAgainError :
11521173 if ttl < self .RedisClusterRequestTTL / 2 :
11531174 time .sleep (0.05 )
@@ -1379,7 +1400,10 @@ def get_node_from_slot(self, slot, read_from_replicas=False, server_type=None):
13791400 # randomly choose one of the replicas
13801401 node_idx = random .randint (1 , len (self .slots_cache [slot ]) - 1 )
13811402
1382- return self .slots_cache [slot ][node_idx ]
1403+ try :
1404+ return self .slots_cache [slot ][node_idx ]
1405+ except IndexError :
1406+ return self .slots_cache [slot ][0 ]
13831407
13841408 def get_nodes_by_server_type (self , server_type ):
13851409 """
@@ -1744,6 +1768,7 @@ def __init__(
17441768 cluster_error_retry_attempts : int = 3 ,
17451769 reinitialize_steps : int = 5 ,
17461770 lock = None ,
1771+ retry : Optional ["Retry" ] = None ,
17471772 ** kwargs ,
17481773 ):
17491774 """ """
@@ -1769,6 +1794,7 @@ def __init__(
17691794 if lock is None :
17701795 lock = threading .Lock ()
17711796 self ._lock = lock
1797+ self .retry = retry
17721798
17731799 def __repr__ (self ):
17741800 """ """
@@ -1901,8 +1927,9 @@ def send_cluster_commands(
19011927 stack ,
19021928 raise_on_error = raise_on_error ,
19031929 allow_redirections = allow_redirections ,
1930+ attempts_count = self .cluster_error_retry_attempts - retry_attempts ,
19041931 )
1905- except (ClusterDownError , ConnectionError ) as e :
1932+ except (ClusterDownError , ConnectionError , TimeoutError ) as e :
19061933 if retry_attempts > 0 :
19071934 # Try again with the new cluster setup. All other errors
19081935 # should be raised.
@@ -1912,7 +1939,7 @@ def send_cluster_commands(
19121939 raise e
19131940
19141941 def _send_cluster_commands (
1915- self , stack , raise_on_error = True , allow_redirections = True
1942+ self , stack , raise_on_error = True , allow_redirections = True , attempts_count = 0
19161943 ):
19171944 """
19181945 Send a bunch of cluster commands to the redis cluster.
@@ -1967,9 +1994,11 @@ def _send_cluster_commands(
19671994 redis_node = self .get_redis_connection (node )
19681995 try :
19691996 connection = get_connection (redis_node , c .args )
1970- except ConnectionError :
1971- # Connection retries are being handled in the node's
1972- # Retry object. Reinitialize the node -> slot table.
1997+ except (ConnectionError , TimeoutError ) as e :
1998+ if self .retry and isinstance (e , self .retry ._supported_errors ):
1999+ backoff = self .retry ._backoff .compute (attempts_count )
2000+ if backoff > 0 :
2001+ time .sleep (backoff )
19732002 self .nodes_manager .initialize ()
19742003 if is_default_node :
19752004 self .replace_default_node ()
0 commit comments