@@ -591,7 +591,8 @@ def __init__(
591591 self .retry = retry
592592 kwargs .update ({"retry" : self .retry })
593593 else :
594- kwargs .update ({"retry" : Retry (default_backoff (), 0 )})
594+ self .retry = Retry (default_backoff (), 0 )
595+ kwargs ["retry" ] = self .retry
595596
596597 self .encoder = Encoder (
597598 kwargs .get ("encoding" , "utf-8" ),
@@ -775,6 +776,7 @@ def pipeline(self, transaction=None, shard_hint=None):
775776 read_from_replicas = self .read_from_replicas ,
776777 reinitialize_steps = self .reinitialize_steps ,
777778 lock = self ._lock ,
779+ retry = self .retry ,
778780 )
779781
780782 def lock (
@@ -858,41 +860,49 @@ def set_response_callback(self, command, callback):
def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]:
    """
    Determine which nodes a command should be executed on.

    The command name is ``args[0]`` upper-cased, or the upper-cased
    two-word form ``"<args[0]> <args[1]>"`` when that form is a key of
    ``self.command_flags``.

    The routing flag is taken from the ``nodes_flag`` keyword argument
    when it is not None, otherwise it is looked up in
    ``self.command_flags`` by command name.

    :param args: the command followed by its arguments.
    :param kwargs: may carry ``nodes_flag`` to override the predefined
        routing flag of the command.
    :return: a list of target nodes.
    :raises SlotNotCoveredError: re-raised after the reinitialization
        bookkeeping below has run.
    """
    try:
        command = args[0].upper()
        if len(args) >= 2:
            # Build the two-word form once and reuse it for both the
            # membership test and the assignment.
            two_word_command = f"{args[0]} {args[1]}".upper()
            if two_word_command in self.command_flags:
                command = two_word_command

        nodes_flag = kwargs.pop("nodes_flag", None)
        if nodes_flag is not None:
            # nodes flag passed by the user
            command_flag = nodes_flag
        else:
            # get the nodes group for this command if it was predefined
            command_flag = self.command_flags.get(command)

        if command_flag == self.__class__.RANDOM:
            # return a random node
            return [self.get_random_node()]
        elif command_flag == self.__class__.PRIMARIES:
            # return all primaries
            return self.get_primaries()
        elif command_flag == self.__class__.REPLICAS:
            # return all replicas
            return self.get_replicas()
        elif command_flag == self.__class__.ALL_NODES:
            # return all nodes
            return self.get_nodes()
        elif command_flag == self.__class__.DEFAULT_NODE:
            # return the cluster's default node
            return [self.nodes_manager.default_node]
        elif command in self.__class__.SEARCH_COMMANDS[0]:
            return [self.nodes_manager.default_node]
        else:
            # get the node that holds the key's slot
            slot = self.determine_slot(*args)
            node = self.nodes_manager.get_node_from_slot(
                slot, self.read_from_replicas and command in READ_COMMANDS
            )
            return [node]
    except SlotNotCoveredError:
        # Count the failure; once _should_reinitialized() says so, rebuild
        # the nodes manager state and reset the counter.
        self.reinitialize_counter += 1
        if self._should_reinitialized():
            self.nodes_manager.initialize()
            # Reset the counter
            self.reinitialize_counter = 0
        # Bare raise re-raises the exception currently being handled.
        raise
896906
897907 def _should_reinitialized (self ):
898908 # To reinitialize the cluster on every MOVED error,
@@ -1084,6 +1094,12 @@ def execute_command(self, *args, **kwargs):
10841094 # The nodes and slots cache were reinitialized.
10851095 # Try again with the new cluster setup.
10861096 retry_attempts -= 1
1097+ if self .retry and isinstance (e , self .retry ._supported_errors ):
1098+ backoff = self .retry ._backoff .compute (
1099+ self .cluster_error_retry_attempts - retry_attempts
1100+ )
1101+ if backoff > 0 :
1102+ time .sleep (backoff )
10871103 continue
10881104 else :
10891105 # raise the exception
@@ -1143,8 +1159,6 @@ def _execute_command(self, target_node, *args, **kwargs):
11431159 # Remove the failed node from the startup nodes before we try
11441160 # to reinitialize the cluster
11451161 self .nodes_manager .startup_nodes .pop (target_node .name , None )
1146- # Reset the cluster node's connection
1147- target_node .redis_connection = None
11481162 self .nodes_manager .initialize ()
11491163 raise e
11501164 except MovedError as e :
@@ -1164,6 +1178,13 @@ def _execute_command(self, target_node, *args, **kwargs):
11641178 else :
11651179 self .nodes_manager .update_moved_exception (e )
11661180 moved = True
1181+ except SlotNotCoveredError as e :
1182+ self .reinitialize_counter += 1
1183+ if self ._should_reinitialized ():
1184+ self .nodes_manager .initialize ()
1185+ # Reset the counter
1186+ self .reinitialize_counter = 0
1187+ raise e
11671188 except TryAgainError :
11681189 if ttl < self .RedisClusterRequestTTL / 2 :
11691190 time .sleep (0.05 )
@@ -1397,7 +1418,10 @@ def get_node_from_slot(self, slot, read_from_replicas=False, server_type=None):
13971418 # randomly choose one of the replicas
13981419 node_idx = random .randint (1 , len (self .slots_cache [slot ]) - 1 )
13991420
1400- return self .slots_cache [slot ][node_idx ]
1421+ try :
1422+ return self .slots_cache [slot ][node_idx ]
1423+ except IndexError :
1424+ return self .slots_cache [slot ][0 ]
14011425
14021426 def get_nodes_by_server_type (self , server_type ):
14031427 """
@@ -1774,6 +1798,7 @@ def __init__(
17741798 cluster_error_retry_attempts : int = 3 ,
17751799 reinitialize_steps : int = 5 ,
17761800 lock = None ,
1801+ retry : Optional ["Retry" ] = None ,
17771802 ** kwargs ,
17781803 ):
17791804 """ """
@@ -1799,6 +1824,7 @@ def __init__(
17991824 if lock is None :
18001825 lock = threading .Lock ()
18011826 self ._lock = lock
1827+ self .retry = retry
18021828
18031829 def __repr__ (self ):
18041830 """ """
@@ -1931,8 +1957,9 @@ def send_cluster_commands(
19311957 stack ,
19321958 raise_on_error = raise_on_error ,
19331959 allow_redirections = allow_redirections ,
1960+ attempts_count = self .cluster_error_retry_attempts - retry_attempts ,
19341961 )
1935- except (ClusterDownError , ConnectionError ) as e :
1962+ except (ClusterDownError , ConnectionError , TimeoutError ) as e :
19361963 if retry_attempts > 0 :
19371964 # Try again with the new cluster setup. All other errors
19381965 # should be raised.
@@ -1942,7 +1969,7 @@ def send_cluster_commands(
19421969 raise e
19431970
19441971 def _send_cluster_commands (
1945- self , stack , raise_on_error = True , allow_redirections = True
1972+ self , stack , raise_on_error = True , allow_redirections = True , attempts_count = 0
19461973 ):
19471974 """
19481975 Send a bunch of cluster commands to the redis cluster.
@@ -1997,9 +2024,11 @@ def _send_cluster_commands(
19972024 redis_node = self .get_redis_connection (node )
19982025 try :
19992026 connection = get_connection (redis_node , c .args )
2000- except ConnectionError :
2001- # Connection retries are being handled in the node's
2002- # Retry object. Reinitialize the node -> slot table.
2027+ except (ConnectionError , TimeoutError ) as e :
2028+ if self .retry and isinstance (e , self .retry ._supported_errors ):
2029+ backoff = self .retry ._backoff .compute (attempts_count )
2030+ if backoff > 0 :
2031+ time .sleep (backoff )
20032032 self .nodes_manager .initialize ()
20042033 if is_default_node :
20052034 self .replace_default_node ()
0 commit comments