@@ -584,7 +584,8 @@ def __init__(
584584 self .retry = retry
585585 kwargs .update ({"retry" : self .retry })
586586 else :
587- kwargs .update ({"retry" : Retry (default_backoff (), 0 )})
587+ self .retry = Retry (default_backoff (), 0 )
588+ kwargs ["retry" ] = self .retry
588589
589590 self .encoder = Encoder (
590591 kwargs .get ("encoding" , "utf-8" ),
@@ -767,6 +768,7 @@ def pipeline(self, transaction=None, shard_hint=None):
767768 read_from_replicas = self .read_from_replicas ,
768769 reinitialize_steps = self .reinitialize_steps ,
769770 lock = self ._lock ,
771+ retry = self .retry ,
770772 )
771773
772774 def lock (
@@ -850,41 +852,49 @@ def set_response_callback(self, command, callback):
850852 def _determine_nodes (self , * args , ** kwargs ) -> List ["ClusterNode" ]:
851853 # Determine which nodes should be executed the command on.
852854 # Returns a list of target nodes.
853- command = args [0 ].upper ()
854- if len (args ) >= 2 and f"{ args [0 ]} { args [1 ]} " .upper () in self .command_flags :
855- command = f"{ args [0 ]} { args [1 ]} " .upper ()
856-
857- nodes_flag = kwargs .pop ("nodes_flag" , None )
858- if nodes_flag is not None :
859- # nodes flag passed by the user
860- command_flag = nodes_flag
861- else :
862- # get the nodes group for this command if it was predefined
863- command_flag = self .command_flags .get (command )
864- if command_flag == self .__class__ .RANDOM :
865- # return a random node
866- return [self .get_random_node ()]
867- elif command_flag == self .__class__ .PRIMARIES :
868- # return all primaries
869- return self .get_primaries ()
870- elif command_flag == self .__class__ .REPLICAS :
871- # return all replicas
872- return self .get_replicas ()
873- elif command_flag == self .__class__ .ALL_NODES :
874- # return all nodes
875- return self .get_nodes ()
876- elif command_flag == self .__class__ .DEFAULT_NODE :
877- # return the cluster's default node
878- return [self .nodes_manager .default_node ]
879- elif command in self .__class__ .SEARCH_COMMANDS [0 ]:
880- return [self .nodes_manager .default_node ]
881- else :
882- # get the node that holds the key's slot
883- slot = self .determine_slot (* args )
884- node = self .nodes_manager .get_node_from_slot (
885- slot , self .read_from_replicas and command in READ_COMMANDS
886- )
887- return [node ]
855+ try :
856+ command = args [0 ].upper ()
857+ if len (args ) >= 2 and f"{ args [0 ]} { args [1 ]} " .upper () in self .command_flags :
858+ command = f"{ args [0 ]} { args [1 ]} " .upper ()
859+
860+ nodes_flag = kwargs .pop ("nodes_flag" , None )
861+ if nodes_flag is not None :
862+ # nodes flag passed by the user
863+ command_flag = nodes_flag
864+ else :
865+ # get the nodes group for this command if it was predefined
866+ command_flag = self .command_flags .get (command )
867+ if command_flag == self .__class__ .RANDOM :
868+ # return a random node
869+ return [self .get_random_node ()]
870+ elif command_flag == self .__class__ .PRIMARIES :
871+ # return all primaries
872+ return self .get_primaries ()
873+ elif command_flag == self .__class__ .REPLICAS :
874+ # return all replicas
875+ return self .get_replicas ()
876+ elif command_flag == self .__class__ .ALL_NODES :
877+ # return all nodes
878+ return self .get_nodes ()
879+ elif command_flag == self .__class__ .DEFAULT_NODE :
880+ # return the cluster's default node
881+ return [self .nodes_manager .default_node ]
882+ elif command in self .__class__ .SEARCH_COMMANDS [0 ]:
883+ return [self .nodes_manager .default_node ]
884+ else :
885+ # get the node that holds the key's slot
886+ slot = self .determine_slot (* args )
887+ node = self .nodes_manager .get_node_from_slot (
888+ slot , self .read_from_replicas and command in READ_COMMANDS
889+ )
890+ return [node ]
891+ except SlotNotCoveredError as e :
892+ self .reinitialize_counter += 1
893+ if self ._should_reinitialized ():
894+ self .nodes_manager .initialize ()
895+ # Reset the counter
896+ self .reinitialize_counter = 0
897+ raise e
888898
889899 def _should_reinitialized (self ):
890900 # To reinitialize the cluster on every MOVED error,
@@ -1076,6 +1086,12 @@ def execute_command(self, *args, **kwargs):
10761086 # The nodes and slots cache were reinitialized.
10771087 # Try again with the new cluster setup.
10781088 retry_attempts -= 1
1089+ if self .retry and isinstance (e , self .retry ._supported_errors ):
1090+ backoff = self .retry ._backoff .compute (
1091+ self .cluster_error_retry_attempts - retry_attempts
1092+ )
1093+ if backoff > 0 :
1094+ time .sleep (backoff )
10791095 continue
10801096 else :
10811097 # raise the exception
@@ -1135,8 +1151,6 @@ def _execute_command(self, target_node, *args, **kwargs):
11351151 # Remove the failed node from the startup nodes before we try
11361152 # to reinitialize the cluster
11371153 self .nodes_manager .startup_nodes .pop (target_node .name , None )
1138- # Reset the cluster node's connection
1139- target_node .redis_connection = None
11401154 self .nodes_manager .initialize ()
11411155 raise e
11421156 except MovedError as e :
@@ -1156,6 +1170,13 @@ def _execute_command(self, target_node, *args, **kwargs):
11561170 else :
11571171 self .nodes_manager .update_moved_exception (e )
11581172 moved = True
1173+ except SlotNotCoveredError as e :
1174+ self .reinitialize_counter += 1
1175+ if self ._should_reinitialized ():
1176+ self .nodes_manager .initialize ()
1177+ # Reset the counter
1178+ self .reinitialize_counter = 0
1179+ raise e
11591180 except TryAgainError :
11601181 if ttl < self .RedisClusterRequestTTL / 2 :
11611182 time .sleep (0.05 )
@@ -1387,7 +1408,10 @@ def get_node_from_slot(self, slot, read_from_replicas=False, server_type=None):
13871408 # randomly choose one of the replicas
13881409 node_idx = random .randint (1 , len (self .slots_cache [slot ]) - 1 )
13891410
1390- return self .slots_cache [slot ][node_idx ]
1411+ try :
1412+ return self .slots_cache [slot ][node_idx ]
1413+ except IndexError :
1414+ return self .slots_cache [slot ][0 ]
13911415
13921416 def get_nodes_by_server_type (self , server_type ):
13931417 """
@@ -1859,6 +1883,7 @@ def __init__(
18591883 cluster_error_retry_attempts : int = 3 ,
18601884 reinitialize_steps : int = 5 ,
18611885 lock = None ,
1886+ retry : Optional ["Retry" ] = None ,
18621887 ** kwargs ,
18631888 ):
18641889 """ """
@@ -1884,6 +1909,7 @@ def __init__(
18841909 if lock is None :
18851910 lock = threading .Lock ()
18861911 self ._lock = lock
1912+ self .retry = retry
18871913
18881914 def __repr__ (self ):
18891915 """ """
@@ -2016,8 +2042,9 @@ def send_cluster_commands(
20162042 stack ,
20172043 raise_on_error = raise_on_error ,
20182044 allow_redirections = allow_redirections ,
2045+ attempts_count = self .cluster_error_retry_attempts - retry_attempts ,
20192046 )
2020- except (ClusterDownError , ConnectionError ) as e :
2047+ except (ClusterDownError , ConnectionError , TimeoutError ) as e :
20212048 if retry_attempts > 0 :
20222049 # Try again with the new cluster setup. All other errors
20232050 # should be raised.
@@ -2027,7 +2054,7 @@ def send_cluster_commands(
20272054 raise e
20282055
20292056 def _send_cluster_commands (
2030- self , stack , raise_on_error = True , allow_redirections = True
2057+ self , stack , raise_on_error = True , allow_redirections = True , attempts_count = 0
20312058 ):
20322059 """
20332060 Send a bunch of cluster commands to the redis cluster.
@@ -2082,9 +2109,11 @@ def _send_cluster_commands(
20822109 redis_node = self .get_redis_connection (node )
20832110 try :
20842111 connection = get_connection (redis_node , c .args )
2085- except ConnectionError :
2086- # Connection retries are being handled in the node's
2087- # Retry object. Reinitialize the node -> slot table.
2112+ except (ConnectionError , TimeoutError ) as e :
2113+ if self .retry and isinstance (e , self .retry ._supported_errors ):
2114+ backoff = self .retry ._backoff .compute (attempts_count )
2115+ if backoff > 0 :
2116+ time .sleep (backoff )
20882117 self .nodes_manager .initialize ()
20892118 if is_default_node :
20902119 self .replace_default_node ()
0 commit comments