diff --git a/python/ray/serve/_private/controller.py b/python/ray/serve/_private/controller.py index 808ffec48d77..1caf1d856e40 100644 --- a/python/ray/serve/_private/controller.py +++ b/python/ray/serve/_private/controller.py @@ -82,6 +82,7 @@ HTTPOptionsSchema, LoggingConfig, ProxyDetails, + ReplicaRank, ServeActorDetails, ServeApplicationSchema, ServeDeploySchema, @@ -1205,12 +1206,14 @@ def record_request_routing_info(self, info: RequestRoutingInfo): """ self.deployment_state_manager.record_request_routing_info(info) - def _get_replica_ranks_mapping(self, deployment_id: DeploymentID) -> Dict[str, int]: + def _get_replica_ranks_mapping( + self, deployment_id: DeploymentID + ) -> Dict[str, ReplicaRank]: """Get the current rank mapping for all replicas in a deployment. Args: deployment_id: The deployment ID to get ranks for. Returns: - Dictionary mapping replica_id to rank. + Dictionary mapping replica_id to ReplicaRank object (with rank, node_rank, local_rank). """ return self.deployment_state_manager._get_replica_ranks_mapping(deployment_id) diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index c5d5aba98d8d..633e7beb7232 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -743,7 +743,10 @@ def check_ready(self) -> Tuple[ReplicaStartupStatus, Optional[str]]: replica_ready_check_func = ( self._actor_handle.initialize_and_get_metadata ) - self._rank = self._assign_rank_callback(self._replica_id.unique_id) + # this guarantees that node_id is set before rank is assigned + self._rank = self._assign_rank_callback( + self._replica_id.unique_id, self._node_id + ) self._ready_obj_ref = replica_ready_check_func.remote( deployment_config, self._rank ) @@ -1666,8 +1669,10 @@ class DeploymentRankManager: This class handles rank assignment, release, consistency checking, and reassignment. 
It maintains the rank system invariants and provides a clean interface for rank operations. - Maintains one level of rank tracking: + Maintains three levels of rank tracking: - Global rank: Replica-level rank across all nodes (0, 1, 2, ...) + - Local rank: Replica's rank within its node (0, 1, 2, ... per node) + - Node rank ID: Index assigned to each node (0, 1, 2, ...) """ def __init__(self, fail_on_rank_error: bool = True): @@ -1675,6 +1680,15 @@ def __init__(self, fail_on_rank_error: bool = True): self._replica_rank_manager = RankManager() self._fail_on_rank_error = fail_on_rank_error + # Node rank manager (assigns rank IDs to nodes) + self._node_rank_manager = RankManager() + + # Local rank managers (one per node, manages replica ranks within each node) + self._local_rank_managers: Dict[str, RankManager] = {} + + # Track which node each replica is on + self._replica_to_node: Dict[str, str] = {} + def _execute_with_error_handling(self, func, safe_default, *args, **kwargs): if self._fail_on_rank_error: # Let exceptions propagate @@ -1687,11 +1701,12 @@ def _execute_with_error_handling(self, func, safe_default, *args, **kwargs): logger.error(f"Error executing function {func.__name__}: {e}") return safe_default - def assign_rank(self, replica_id: str) -> ReplicaRank: + def assign_rank(self, replica_id: str, node_id: str) -> ReplicaRank: """Assign a rank to a new replica. 
Args: replica_id: The unique ID of the replica + node_id: The unique ID of the node Returns: ReplicaRank object with the assigned rank @@ -1706,13 +1721,25 @@ def _assign_rank_impl(): f"Rank for {replica_id} already assigned: {self._replica_rank_manager.get_rank(replica_id)}" ) + # Track the replica-to-node mapping + self._replica_to_node[replica_id] = node_id + # Assign global rank rank = self._replica_rank_manager.assign_rank(replica_id) - return ReplicaRank(rank=rank, node_rank=-1, local_rank=-1) + # Assign node rank if this node doesn't have one yet + if node_id not in self._local_rank_managers: + self._node_rank_manager.assign_rank(node_id) + self._local_rank_managers[node_id] = RankManager() + + node_rank = self._node_rank_manager.get_rank(node_id) + # Assign local rank within the node + local_rank = self._local_rank_managers[node_id].assign_rank(replica_id) + + return ReplicaRank(rank=rank, node_rank=node_rank, local_rank=local_rank) return self._execute_with_error_handling( - _assign_rank_impl, ReplicaRank(rank=0, node_rank=-1, local_rank=-1) + _assign_rank_impl, ReplicaRank(rank=0, node_rank=0, local_rank=0) ) def release_rank(self, replica_id: str) -> None: @@ -1729,20 +1756,36 @@ def _release_rank_impl(): if not self.has_replica_rank(replica_id): raise RuntimeError(f"Rank for {replica_id} not assigned") + # Get the node_id from the replica mapping + node_id = self._replica_to_node[replica_id] + # Release global rank self._replica_rank_manager.release_rank(replica_id) + # Release local rank + self._local_rank_managers[node_id].release_rank(replica_id) + + # Release node rank if this was the last replica on the node + if len(self._local_rank_managers[node_id].get_ranks_mapping()) == 0: + self._node_rank_manager.release_rank(node_id) + del self._local_rank_managers[node_id] + + # Remove replica from node mapping + del self._replica_to_node[replica_id] + return self._execute_with_error_handling(_release_rank_impl, None) def recover_rank( self, replica_id: 
str, + node_id: str, rank: ReplicaRank, ) -> None: """Recover rank for a replica (e.g., after controller restart). Args: replica_id: ID of the replica + node_id: ID of the node rank: The rank to recover Raises: @@ -1758,6 +1801,18 @@ def _recover_rank_impl(): # Recover global rank self._replica_rank_manager.recover_rank(replica_id, rank.rank) + # Recover node rank only if this node doesn't already have one + if not self._node_rank_manager.has_rank(node_id): + self._node_rank_manager.recover_rank(node_id, rank.node_rank) + + # Recover local rank + if node_id not in self._local_rank_managers: + self._local_rank_managers[node_id] = RankManager() + self._local_rank_managers[node_id].recover_rank(replica_id, rank.local_rank) + + # Track the replica-to-node mapping + self._replica_to_node[replica_id] = node_id + return self._execute_with_error_handling(_recover_rank_impl, None) def has_replica_rank(self, replica_id: str) -> bool: @@ -1772,7 +1827,16 @@ def has_replica_rank(self, replica_id: str) -> bool: Raises: RuntimeError: If the replica doesn't have ranks assigned """ - return self._replica_rank_manager.has_rank(replica_id) + if replica_id not in self._replica_to_node: + return False + + node_id = self._replica_to_node[replica_id] + return ( + self._replica_rank_manager.has_rank(replica_id) + and node_id in self._local_rank_managers + and self._node_rank_manager.has_rank(node_id) + and self._local_rank_managers[node_id].has_rank(replica_id) + ) def get_replica_rank(self, replica_id: str) -> ReplicaRank: """Get the rank for a replica. 
@@ -1792,10 +1856,15 @@ def _get_replica_rank_impl(): raise RuntimeError(f"Rank for {replica_id} not assigned") global_rank = self._replica_rank_manager.get_rank(replica_id) - return ReplicaRank(rank=global_rank, node_rank=-1, local_rank=-1) + node_id = self._replica_to_node[replica_id] + node_rank = self._node_rank_manager.get_rank(node_id) + local_rank = self._local_rank_managers[node_id].get_rank(replica_id) + return ReplicaRank( + rank=global_rank, node_rank=node_rank, local_rank=local_rank + ) return self._execute_with_error_handling( - _get_replica_rank_impl, ReplicaRank(rank=0, node_rank=-1, local_rank=-1) + _get_replica_rank_impl, ReplicaRank(rank=0, node_rank=0, local_rank=0) ) def check_rank_consistency_and_reassign_minimally( @@ -1806,6 +1875,8 @@ def check_rank_consistency_and_reassign_minimally( This method ensures: 1. Global ranks are contiguous [0, N-1] for N replicas + 2. Node ranks are contiguous [0, M-1] for M nodes + 3. Local ranks are contiguous [0, K-1] for K replicas on each node Args: active_replicas: List of currently active replicas @@ -1837,6 +1908,35 @@ def _check_rank_consistency_impl(): ) all_replica_ids_needing_reconfiguration.update(replica_ids_from_global) + # STEP 2: Group replicas by node and check local rank consistency per node + replicas_by_node: Dict[str, List[str]] = {} + for replica_id in active_replica_ids: + node_id = self._replica_to_node.get(replica_id) + assert ( + node_id is not None + ), f"Replica {replica_id} not assigned to any node" + if node_id not in replicas_by_node: + replicas_by_node[node_id] = [] + replicas_by_node[node_id].append(replica_id) + + for node_id, replica_ids_on_node in replicas_by_node.items(): + replica_ids_from_local = self._local_rank_managers[ + node_id + ].check_rank_consistency_and_reassign_minimally(replica_ids_on_node) + all_replica_ids_needing_reconfiguration.update(replica_ids_from_local) + + # STEP 3: Check node rank consistency + active_node_ids = list(replicas_by_node.keys()) + if 
active_node_ids: + node_ids_needing_reassignment = self._node_rank_manager.check_rank_consistency_and_reassign_minimally( + active_node_ids, + ) + # If any nodes were reassigned, all replicas on those nodes need reconfiguration + for node_id in node_ids_needing_reassignment: + all_replica_ids_needing_reconfiguration.update( + replicas_by_node[node_id] + ) + # Convert replica IDs back to replica objects # Filter out stale replicas that are not in the active set replicas_needing_reconfiguration = [ @@ -1851,9 +1951,20 @@ def _check_rank_consistency_impl(): def clear(self) -> None: self._replica_rank_manager.clear() + self._node_rank_manager.clear() + self._local_rank_managers.clear() + self._replica_to_node.clear() - def get_replica_ranks_mapping(self) -> Dict[str, int]: - return self._replica_rank_manager.get_ranks_mapping() + def get_replica_ranks_mapping(self) -> Dict[str, ReplicaRank]: + """Get the current mapping of replica IDs to ReplicaRank objects. + + Returns: + Dict mapping replica_id to ReplicaRank object + """ + result = {} + for replica_id in self._replica_rank_manager.get_ranks_mapping().keys(): + result[replica_id] = self.get_replica_rank(replica_id) + return result class DeploymentState: @@ -2679,7 +2790,9 @@ def _check_startup_replicas( # data structure with RUNNING state. # Recover rank from the replica actor during controller restart replica_id = replica.replica_id.unique_id - self._rank_manager.recover_rank(replica_id, replica.rank) + self._rank_manager.recover_rank( + replica_id, replica.actor_node_id, replica.rank + ) # This replica should be now be added to handle's replica # set. 
self._replicas.add(ReplicaState.RUNNING, replica) @@ -2913,10 +3026,13 @@ def check_and_update_replicas(self): # Release rank only after replica is successfully stopped # This ensures rank is available during draining/graceful shutdown replica_id = replica.replica_id.unique_id - self._rank_manager.release_rank(replica_id) - logger.debug( - f"Released rank from replica {replica_id} in deployment {self._id}" - ) + if self._rank_manager.has_replica_rank(replica_id): + # Only release rank if assigned. Replicas that failed allocation + # or never reached RUNNING state won't have ranks. + self._rank_manager.release_rank(replica_id) + logger.debug( + f"Released rank from replica {replica_id} in deployment {self._id}" + ) self._autoscaling_state_manager.on_replica_stopped(replica.replica_id) # After replica state updates, check rank consistency and perform minimal reassignment if needed @@ -2970,10 +3086,11 @@ def _reconfigure_replicas_with_new_ranks( f"Successfully reconfigured {updated_count} replicas with new ranks in deployment {self._id}" ) - def _get_replica_ranks_mapping(self) -> Dict[str, int]: - """Get the current mapping of replica IDs to ranks. + def _get_replica_ranks_mapping(self) -> Dict[str, ReplicaRank]: + """Get the current mapping of replica IDs to ReplicaRank objects. + Returns: - Dictionary mapping replica_id to rank. + Dictionary mapping replica_id to ReplicaRank object (with rank, node_rank, local_rank). """ return self._rank_manager.get_replica_ranks_mapping() @@ -3721,12 +3838,14 @@ def get_ingress_replicas_info(self) -> List[Tuple[str, str, int, int]]: ) return ingress_replicas_info - def _get_replica_ranks_mapping(self, deployment_id: DeploymentID) -> Dict[str, int]: + def _get_replica_ranks_mapping( + self, deployment_id: DeploymentID + ) -> Dict[str, ReplicaRank]: """Get the current rank mapping for all replicas in a deployment. Args: deployment_id: The deployment ID to get ranks for. Returns: - Dictionary mapping replica_id to rank. 
+ Dictionary mapping replica_id to ReplicaRank object (with rank, node_rank, local_rank). """ deployment_state = self._deployment_states.get(deployment_id) if deployment_state is None: diff --git a/python/ray/serve/tests/test_replica_ranks.py b/python/ray/serve/tests/test_replica_ranks.py index ce74f643936f..28e90a45c3b0 100644 --- a/python/ray/serve/tests/test_replica_ranks.py +++ b/python/ray/serve/tests/test_replica_ranks.py @@ -30,8 +30,15 @@ def get_controller() -> ServeController: return ray.get_actor(SERVE_CONTROLLER_NAME, namespace=SERVE_NAMESPACE) -def get_replica_ranks(deployment_name: str) -> Dict[str, int]: - """Get the current rank mapping for all replicas in a deployment.""" +def get_replica_ranks(deployment_name: str) -> Dict[str, ReplicaRank]: + """Get the current rank mapping for all replicas in a deployment. + + Args: + deployment_name: Name of the deployment to get ranks for + + Returns: + Dict mapping replica_id to ReplicaRank object + """ controller = get_controller() deployment_id = DeploymentID(name=deployment_name, app_name=SERVE_DEFAULT_APP_NAME) @@ -51,19 +58,64 @@ def get_running_replica_ids(deployment_name: str) -> List[str]: return [replica.replica_id.unique_id for replica in running_replicas] -def check_rank_contiguity(ranks: Dict[str, int]) -> bool: - """Check that ranks form a contiguous sequence from 0 to N-1.""" +def check_rank_contiguity(ranks: Dict[str, ReplicaRank]) -> bool: + """Check that all rank types form contiguous sequences from 0 to N-1. 
+ + Args: + ranks: Dict mapping replica_id to ReplicaRank object + + Returns: + True if all rank types (global, node, local) are contiguous + """ if not ranks: return True - rank_values = sorted(ranks.values()) - expected = list(range(len(rank_values))) - assert rank_values == expected, f"Expected {expected}, got {rank_values}" + # Check global ranks are contiguous + global_ranks = sorted([r.rank for r in ranks.values()]) + expected_global = list(range(len(global_ranks))) + if global_ranks != expected_global: + print( + f"Global ranks not contiguous. Expected {expected_global}, got {global_ranks}" + ) + return False + + # Group by node_rank and check local ranks are contiguous per node + replicas_by_node = {} + node_ranks_set = set() + for replica_id, rank_obj in ranks.items(): + node_rank = rank_obj.node_rank + node_ranks_set.add(node_rank) + if node_rank not in replicas_by_node: + replicas_by_node[node_rank] = [] + replicas_by_node[node_rank].append(rank_obj.local_rank) + + # Check node ranks are contiguous + node_ranks_sorted = sorted(node_ranks_set) + expected_node_ranks = list(range(len(node_ranks_sorted))) + if node_ranks_sorted != expected_node_ranks: + print( + f"Node ranks not contiguous. Expected {expected_node_ranks}, got {node_ranks_sorted}" + ) + return False + + # Check local ranks are contiguous per node + for node_rank, local_ranks in replicas_by_node.items(): + local_ranks_sorted = sorted(local_ranks) + expected_local = list(range(len(local_ranks_sorted))) + if local_ranks_sorted != expected_local: + print( + f"Local ranks not contiguous on node {node_rank}. Expected {expected_local}, got {local_ranks_sorted}" + ) + return False + return True def check_rank_assignment_complete(deployment_name: str, expected_count: int) -> bool: - """Check that all replicas have been assigned ranks and they are contiguous.""" + """Check that all replicas have been assigned ranks and they are contiguous. 
+ + This validates global ranks, node ranks, and local ranks for all running replicas. + """ try: replica_ids = get_running_replica_ids(deployment_name) ranks = get_replica_ranks(deployment_name) @@ -71,18 +123,23 @@ def check_rank_assignment_complete(deployment_name: str, expected_count: int) -> # Check all running replicas have ranks for replica_id in replica_ids: if replica_id not in ranks: - print(f"Replica {replica_id} not found in ranks: {ranks}") + print(f"Replica {replica_id} not found in ranks: {ranks.keys()}") return False # Check we have expected number of ranks if len(ranks) != expected_count: - print(f"Expected {expected_count} ranks, got {len(ranks)}: {ranks}") + print( + f"Expected {expected_count} ranks, got {len(ranks)}: {list(ranks.keys())}" + ) return False - # Check ranks are contiguous + # Check all rank types are contiguous (global, node, local) return check_rank_contiguity(ranks) except Exception as e: print(f"Error checking rank assignment: {e}") + import traceback + + traceback.print_exc() return False @@ -136,6 +193,135 @@ def __call__(self): assert 0 <= rank < num_replicas +def test_node_and_local_rank_assignment(serve_instance): + """Test node_rank and local_rank assignment in addition to global rank.""" + + @serve.deployment(num_replicas=4) + class NodeRankTracker: + def __call__(self): + context = serve.get_replica_context() + if context.rank: + return { + "rank": context.rank.rank, + "node_rank": context.rank.node_rank, + "local_rank": context.rank.local_rank, + "world_size": context.world_size, + } + return None + + handle = serve.run(NodeRankTracker.bind()) + + # Wait for all replicas to be running + wait_for_condition( + lambda: check_rank_assignment_complete("NodeRankTracker", 4), + ) + + # Collect responses from all replicas + responses = [] + max_attempts = 50 + for _ in range(max_attempts): + response = handle.remote().result() + if response and response not in responses: + responses.append(response) + if len(responses) == 4: + 
break + + assert len(responses) == 4, f"Expected 4 unique responses, got {len(responses)}" + + # Verify all responses have valid ranks + global_ranks = set() + node_ranks = set() + replicas_by_node = {} + + for response in responses: + assert response["world_size"] == 4 + + # Check global rank + global_rank = response["rank"] + assert 0 <= global_rank < 4 + assert global_rank not in global_ranks, "Duplicate global rank found" + global_ranks.add(global_rank) + + # Check node_rank and local_rank + node_rank = response["node_rank"] + local_rank = response["local_rank"] + assert node_rank >= 0 + assert local_rank >= 0 + node_ranks.add(node_rank) + + # Track replicas by node for local rank verification + if node_rank not in replicas_by_node: + replicas_by_node[node_rank] = [] + replicas_by_node[node_rank].append(local_rank) + + # Verify global ranks are contiguous 0..3 + assert global_ranks == {0, 1, 2, 3} + + # Verify node ranks are contiguous starting from 0 + assert min(node_ranks) == 0 + assert max(node_ranks) == len(node_ranks) - 1 + + # Verify local ranks within each node are contiguous starting from 0 + for node_rank, local_ranks_list in replicas_by_node.items(): + local_ranks_set = set(local_ranks_list) + expected_local_ranks = set(range(len(local_ranks_list))) + assert local_ranks_set == expected_local_ranks, ( + f"Node {node_rank} has non-contiguous local ranks: {local_ranks_set}, " + f"expected {expected_local_ranks}" + ) + + +def test_local_rank_contiguity_within_node(serve_instance): + """Test that local ranks are contiguous within each node.""" + + @serve.deployment(num_replicas=3) + class LocalRankTracker: + def __call__(self): + context = serve.get_replica_context() + if context.rank: + return { + "rank": context.rank.rank, + "node_rank": context.rank.node_rank, + "local_rank": context.rank.local_rank, + } + return None + + handle = serve.run(LocalRankTracker.bind()) + + # Wait for all replicas to be running + wait_for_condition( + lambda: 
check_rank_assignment_complete("LocalRankTracker", 3), + ) + + # Collect all responses + responses = [] + for _ in range(30): + response = handle.remote().result() + if response and response not in responses: + responses.append(response) + if len(responses) == 3: + break + + assert len(responses) == 3 + + # Group by node_rank and check local_rank contiguity + by_node = {} + for r in responses: + node_rank = r["node_rank"] + if node_rank not in by_node: + by_node[node_rank] = [] + by_node[node_rank].append(r["local_rank"]) + + # Within each node, local ranks should start at 0 and be contiguous + for node_rank, local_ranks in by_node.items(): + local_ranks_sorted = sorted(local_ranks) + expected = list(range(len(local_ranks))) + assert local_ranks_sorted == expected, ( + f"Node {node_rank} has non-contiguous local ranks: " + f"{local_ranks_sorted}, expected {expected}" + ) + + def test_rank_assignment_with_autoscaling(serve_instance): """Test rank assignment and reassignment during autoscaling.""" signal_actor = SignalActor.remote() @@ -158,6 +344,8 @@ async def __call__(self): context = serve.get_replica_context() return { "rank": context.rank.rank if context.rank else None, + "node_rank": context.rank.node_rank if context.rank else None, + "local_rank": context.rank.local_rank if context.rank else None, "world_size": context.world_size, } @@ -267,6 +455,8 @@ def __call__(self): context = serve.get_replica_context() return { "rank": context.rank.rank if context.rank else None, + "node_rank": context.rank.node_rank if context.rank else None, + "local_rank": context.rank.local_rank if context.rank else None, "world_size": context.world_size, } @@ -280,11 +470,16 @@ def __call__(self): # Verify single replica has rank 0 ranks = get_replica_ranks("SingleReplicaTracker") assert len(ranks) == 1 - assert 0 in ranks.values() + rank_obj = list(ranks.values())[0] + assert rank_obj.rank == 0 + assert rank_obj.node_rank == 0 + assert rank_obj.local_rank == 0 - # Verify API 
returns correct values + # Verify API returns correct values for all rank types response = handle.remote().result() assert response["rank"] == 0 + assert response["node_rank"] == 0 + assert response["local_rank"] == 0 assert response["world_size"] == 1 @@ -333,11 +528,13 @@ def __call__(self): assert check_rank_contiguity(ranks2) # Both should have rank 0 (in their own space) - assert 0 in ranks1.values() - assert 0 in ranks2.values() - assert 1 in ranks1.values() - assert 1 in ranks2.values() - assert 2 in ranks2.values() # Only deployment2 should have rank 2 + ranks1_global = {r.rank for r in ranks1.values()} + ranks2_global = {r.rank for r in ranks2.values()} + assert 0 in ranks1_global + assert 0 in ranks2_global + assert 1 in ranks1_global + assert 1 in ranks2_global + assert 2 in ranks2_global # Only deployment2 should have rank 2 handle1 = serve.get_deployment_handle("deployment1", SERVE_DEFAULT_APP_NAME) handle2 = serve.get_deployment_handle("deployment2", SERVE_DEFAULT_APP_NAME) @@ -396,6 +593,99 @@ def _check(): assert final_ranks[replica_id] == initial_ranks[replica_id] +def test_node_rank_stability_on_replica_death(serve_instance): + """Test that node_rank and local_rank are correctly maintained when replicas die.""" + + @serve.deployment(num_replicas=4) + class NodeRankStabilityTracker: + def __call__(self): + context = serve.get_replica_context() + if context.rank: + return { + "rank": context.rank.rank, + "node_rank": context.rank.node_rank, + "local_rank": context.rank.local_rank, + "replica_id": context.replica_id.unique_id, + } + return None + + handle = serve.run(NodeRankStabilityTracker.bind()) + + # Wait for all replicas to be running + wait_for_condition( + lambda: check_rank_assignment_complete("NodeRankStabilityTracker", 4), + ) + + # Collect initial rank information + initial_responses = [] + for _ in range(50): + response = handle.remote().result() + if response and response not in initial_responses: + initial_responses.append(response) + 
if len(initial_responses) == 4: + break + + assert len(initial_responses) == 4 + + # Kill a random replica + random_replica = random.choice(initial_responses) + killed_replica_id = random_replica["replica_id"] + + replica_handle = ray.get_actor( + f"SERVE_REPLICA::default#NodeRankStabilityTracker#{killed_replica_id}", + namespace=SERVE_NAMESPACE, + ) + ray.kill(replica_handle, no_restart=False) + + # Wait for the replica to be restarted + def _check_replica_restarted(): + replica_ids = get_running_replica_ids("NodeRankStabilityTracker") + return len(replica_ids) == 4 and killed_replica_id not in replica_ids + + wait_for_condition(_check_replica_restarted, timeout=20) + + # Wait for rank assignment to be complete + wait_for_condition( + lambda: check_rank_assignment_complete("NodeRankStabilityTracker", 4), + ) + + # Collect final rank information + final_responses = [] + for _ in range(50): + response = handle.remote().result() + if response and response not in final_responses: + final_responses.append(response) + if len(final_responses) == 4: + break + + assert len(final_responses) == 4 + + # Create mappings for comparison + initial_by_replica_id = {r["replica_id"]: r for r in initial_responses} + final_by_replica_id = {r["replica_id"]: r for r in final_responses} + + # Verify that surviving replicas kept their ranks + for replica_id in initial_by_replica_id: + if replica_id != killed_replica_id and replica_id in final_by_replica_id: + initial = initial_by_replica_id[replica_id] + final = final_by_replica_id[replica_id] + + # All rank values should be preserved + assert ( + initial["rank"] == final["rank"] + ), f"Global rank changed for replica {replica_id}" + assert ( + initial["node_rank"] == final["node_rank"] + ), f"Node rank changed for replica {replica_id}" + assert ( + initial["local_rank"] == final["local_rank"] + ), f"Local rank changed for replica {replica_id}" + + # Verify all global ranks are still contiguous + global_ranks = sorted([r["rank"] for r in 
final_responses]) + assert global_ranks == [0, 1, 2, 3] + + def test_user_reconfigure_rank(serve_instance): """Test that user can reconfigure the rank of a deployment.""" signal_actor = SignalActor.remote() @@ -452,5 +742,76 @@ def _check(): wait_for_condition(_check) +def test_user_reconfigure_with_all_rank_fields(serve_instance): + """Test that reconfigure receives all rank fields (rank, node_rank, local_rank).""" + signal_actor = SignalActor.remote() + + @serve.deployment(num_replicas=3, max_ongoing_requests=1) + class AllRanksTracker: + def __init__(self): + self.rank_info = None + + async def __call__(self): + await signal_actor.wait.remote() + return self.rank_info + + async def reconfigure(self, user_config: Any, rank: ReplicaRank): + # Store all rank information + self.rank_info = { + "rank": rank.rank, + "node_rank": rank.node_rank, + "local_rank": rank.local_rank, + } + + handle = serve.run(AllRanksTracker.bind()) + wait_for_condition( + lambda: check_rank_assignment_complete("AllRanksTracker", 3), + ) + + # Send requests to all replicas + futures = [handle.remote() for _ in range(3)] + + wait_for_condition( + lambda: ray.get(signal_actor.cur_num_waiters.remote()) == 3, + ) + + signal_actor.send.remote() + + # Collect results + results = [f.result() for f in futures] + + # Verify all replicas received their rank information + global_ranks = [] + node_ranks = [] + local_ranks = [] + + for result in results: + assert result is not None, "Replica did not receive rank information" + assert "rank" in result + assert "node_rank" in result + assert "local_rank" in result + + # Validate rank values are in expected range + assert result["rank"] in {0, 1, 2}, f"Invalid global rank: {result['rank']}" + + global_ranks.append(result["rank"]) + node_ranks.append(result["node_rank"]) + local_ranks.append(result["local_rank"]) + + # Verify global ranks are unique and complete + assert set(global_ranks) == {0, 1, 2} + + # Verify node ranks form contiguous sequence 
starting from 0 + node_ranks_sorted = sorted(set(node_ranks)) + expected_node_ranks = list(range(len(node_ranks_sorted))) + assert ( + node_ranks_sorted == expected_node_ranks + ), f"Node ranks not contiguous from 0: {node_ranks_sorted}" + + # Verify local ranks are valid (non-negative and reasonable) + for local_rank in local_ranks: + assert local_rank in range(3), f"Invalid local rank: {local_rank}" + + if __name__ == "__main__": sys.exit(pytest.main(["-v", "-s", __file__])) diff --git a/python/ray/serve/tests/unit/test_deployment_rank_manager.py b/python/ray/serve/tests/unit/test_deployment_rank_manager.py index 3ff3b04fa4f3..f40782bdf9a9 100644 --- a/python/ray/serve/tests/unit/test_deployment_rank_manager.py +++ b/python/ray/serve/tests/unit/test_deployment_rank_manager.py @@ -36,18 +36,18 @@ def test_init(self, rank_manager): """Test initialization creates empty state.""" assert rank_manager.get_replica_ranks_mapping() == {} - def test_assign_rank_first_replica(self, rank_manager): + def test_assign_rank_first_replica(self, rank_manager: DeploymentRankManager): """Test assigning rank to first replica.""" - rank = rank_manager.assign_rank("replica_1") + rank = rank_manager.assign_rank("replica_1", "node_1") assert rank.rank == 0 assert rank_manager.has_replica_rank("replica_1") assert rank_manager.get_replica_rank("replica_1").rank == 0 - def test_assign_rank_multiple_replicas(self, rank_manager): + def test_assign_rank_multiple_replicas(self, rank_manager: DeploymentRankManager): """Test assigning ranks to multiple replicas.""" - rank1 = rank_manager.assign_rank("replica_1") - rank2 = rank_manager.assign_rank("replica_2") - rank3 = rank_manager.assign_rank("replica_3") + rank1 = rank_manager.assign_rank("replica_1", "node_1") + rank2 = rank_manager.assign_rank("replica_2", "node_1") + rank3 = rank_manager.assign_rank("replica_3", "node_1") assert rank1.rank == 0 assert rank2.rank == 1 @@ -55,36 +55,43 @@ def test_assign_rank_multiple_replicas(self, 
rank_manager): mapping = rank_manager.get_replica_ranks_mapping() assert len(mapping) == 3 - assert mapping == {"replica_1": 0, "replica_2": 1, "replica_3": 2} - - def test_assign_rank_reuses_released_ranks(self, rank_manager): + assert mapping == { + "replica_1": ReplicaRank(rank=0, node_rank=0, local_rank=0), + "replica_2": ReplicaRank(rank=1, node_rank=0, local_rank=1), + "replica_3": ReplicaRank(rank=2, node_rank=0, local_rank=2), + } + + def test_assign_rank_reuses_released_ranks( + self, rank_manager: DeploymentRankManager + ): """Test that released ranks are reused before assigning new ones.""" # Assign ranks to 3 replicas - rank_manager.assign_rank("replica_1") - rank_manager.assign_rank("replica_2") - rank_manager.assign_rank("replica_3") + rank_manager.assign_rank("replica_1", "node_1") + rank_manager.assign_rank("replica_2", "node_1") + rank_manager.assign_rank("replica_3", "node_1") # Release middle rank rank_manager.release_rank("replica_2") assert not rank_manager.has_replica_rank("replica_2") # New replica should get the released rank (1) - rank = rank_manager.assign_rank("replica_4") + rank = rank_manager.assign_rank("replica_4", "node_1") assert rank.rank == 1 - assert rank_manager.get_replica_rank("replica_4").rank == 1 + assert rank_manager.get_replica_rank("replica_4") == ReplicaRank( + rank=1, node_rank=0, local_rank=1 + ) - def test_assign_rank_duplicate_fails(self): + def test_assign_rank_duplicate_fails(self, rank_manager: DeploymentRankManager): """Test assigning rank to replica that already has one fails.""" - rank_manager = DeploymentRankManager() - rank_manager.assign_rank("replica_1") + rank_manager.assign_rank("replica_1", "node_1") with pytest.raises(RuntimeError, match="already assigned"): - rank_manager.assign_rank("replica_1") + rank_manager.assign_rank("replica_1", "node_1") - def test_release_rank(self, rank_manager): + def test_release_rank(self, rank_manager: DeploymentRankManager): """Test releasing a rank makes it available 
for reuse.""" - rank_manager.assign_rank("replica_1") - rank_manager.assign_rank("replica_2") + rank_manager.assign_rank("replica_1", "node_1") + rank_manager.assign_rank("replica_2", "node_1") rank_manager.release_rank("replica_1") @@ -92,44 +99,51 @@ def test_release_rank(self, rank_manager): assert rank_manager.has_replica_rank("replica_2") assert rank_manager.get_replica_rank("replica_2").rank == 1 - def test_release_rank_nonexistent_replica(self): + def test_release_rank_nonexistent_replica( + self, rank_manager: DeploymentRankManager + ): """Test releasing rank for non-existent replica fails.""" - rank_manager = DeploymentRankManager() with pytest.raises(RuntimeError, match="not assigned"): rank_manager.release_rank("nonexistent") - def test_recover_rank_basic(self, rank_manager): + def test_recover_rank_basic(self, rank_manager: DeploymentRankManager): """Test basic rank recovery.""" rank_manager.recover_rank( - "replica_1", ReplicaRank(rank=5, node_rank=0, local_rank=0) + "replica_1", "node_1", ReplicaRank(rank=5, node_rank=0, local_rank=0) ) assert rank_manager.has_replica_rank("replica_1") assert rank_manager.get_replica_rank("replica_1").rank == 5 - def test_recover_rank_updates_next_rank(self, rank_manager): + def test_recover_rank_updates_next_rank(self, rank_manager: DeploymentRankManager): """Test that recovering a high rank updates next_rank appropriately.""" - rank_manager.assign_rank("replica_1") # Gets rank 0 + rank_manager.assign_rank("replica_1", "node_1") # Gets rank 0 rank_manager.recover_rank( - "replica_2", ReplicaRank(rank=10, node_rank=0, local_rank=0) + "replica_2", "node_1", ReplicaRank(rank=10, node_rank=0, local_rank=0) ) # New replica should get rank 11 (next available after 10) - rank = rank_manager.assign_rank("replica_3") + rank = rank_manager.assign_rank("replica_3", "node_1") assert rank.rank == 11 mapping = rank_manager.get_replica_ranks_mapping() - assert mapping == {"replica_1": 0, "replica_2": 10, "replica_3": 11} - - def 
test_recover_rank_removes_from_available(self, rank_manager): + assert mapping == { + "replica_1": ReplicaRank(rank=0, node_rank=0, local_rank=0), + "replica_2": ReplicaRank(rank=10, node_rank=0, local_rank=0), + "replica_3": ReplicaRank(rank=11, node_rank=0, local_rank=1), + } + + def test_recover_rank_removes_from_available( + self, rank_manager: DeploymentRankManager + ): """Test that recovering a rank removes it from available ranks.""" - rank_manager.assign_rank("replica_1") - rank_manager.assign_rank("replica_2") + rank_manager.assign_rank("replica_1", "node_1") + rank_manager.assign_rank("replica_2", "node_1") rank_manager.release_rank("replica_1") # Rank 0 becomes available # Recover rank 0 for a new replica rank_manager.recover_rank( - "replica_3", ReplicaRank(rank=0, node_rank=0, local_rank=0) + "replica_3", "node_1", ReplicaRank(rank=0, node_rank=0, local_rank=0) ) # Verify replica_3 has rank 0 @@ -137,52 +151,55 @@ def test_recover_rank_removes_from_available(self, rank_manager): assert rank_manager.get_replica_rank("replica_3").rank == 0 # Next assigned replica should get rank 2 (not 0, which is now taken) - rank = rank_manager.assign_rank("replica_4") + rank = rank_manager.assign_rank("replica_4", "node_1") assert rank.rank == 2 - def test_recover_rank_duplicate_fails(self): + def test_recover_rank_duplicate_fails(self, rank_manager: DeploymentRankManager): """Test recovering rank for replica that already has one fails.""" - rank_manager = DeploymentRankManager() - rank_manager.assign_rank("replica_1") + rank_manager.assign_rank("replica_1", "node_1") with pytest.raises(RuntimeError, match="already assigned"): rank_manager.recover_rank( - "replica_1", ReplicaRank(rank=5, node_rank=0, local_rank=0) + "replica_1", "node_1", ReplicaRank(rank=5, node_rank=0, local_rank=0) ) - def test_get_replica_rank_existing(self, rank_manager): + def test_get_replica_rank_existing(self, rank_manager: DeploymentRankManager): """Test getting rank for existing replica.""" 
- rank_manager.assign_rank("replica_1") + rank_manager.assign_rank("replica_1", "node_1") rank = rank_manager.get_replica_rank("replica_1") assert rank.rank == 0 - def test_get_replica_rank_nonexistent_fails(self): + def test_get_replica_rank_nonexistent_fails( + self, rank_manager: DeploymentRankManager + ): """Test getting rank for non-existent replica fails.""" - rank_manager = DeploymentRankManager() with pytest.raises(RuntimeError, match="not assigned"): rank_manager.get_replica_rank("nonexistent") - def test_get_replica_ranks_mapping(self, rank_manager): + def test_get_replica_ranks_mapping(self, rank_manager: DeploymentRankManager): """Test getting copy of replica ranks mapping.""" - rank_manager.assign_rank("replica_1") - rank_manager.assign_rank("replica_2") + rank_manager.assign_rank("replica_1", "node_1") + rank_manager.assign_rank("replica_2", "node_1") mapping = rank_manager.get_replica_ranks_mapping() - expected = {"replica_1": 0, "replica_2": 1} + expected = { + "replica_1": ReplicaRank(rank=0, node_rank=0, local_rank=0), + "replica_2": ReplicaRank(rank=1, node_rank=0, local_rank=1), + } assert mapping == expected # Verify it's a copy by modifying it - mapping["replica_3"] = 2 + mapping["replica_3"] = ReplicaRank(rank=2, node_rank=0, local_rank=2) # Get a fresh mapping to verify the original wasn't changed fresh_mapping = rank_manager.get_replica_ranks_mapping() assert "replica_3" not in fresh_mapping assert fresh_mapping == expected - def test_clear(self, rank_manager): + def test_clear(self, rank_manager: DeploymentRankManager): """Test clearing all rank data.""" - rank_manager.assign_rank("replica_1") - rank_manager.assign_rank("replica_2") + rank_manager.assign_rank("replica_1", "node_1") + rank_manager.assign_rank("replica_2", "node_1") rank_manager.release_rank("replica_1") rank_manager.clear() @@ -193,24 +210,28 @@ def test_clear(self, rank_manager): assert not rank_manager.has_replica_rank("replica_2") # Should be able to assign from 0 again 
- rank = rank_manager.assign_rank("replica_3") + rank = rank_manager.assign_rank("replica_3", "node_1") assert rank.rank == 0 - def test_check_rank_consistency_empty_replicas(self, rank_manager): + def test_check_rank_consistency_empty_replicas( + self, rank_manager: DeploymentRankManager + ): """Test consistency check with no active replicas.""" result = rank_manager.check_rank_consistency_and_reassign_minimally([]) assert result == [] - def test_check_rank_consistency_contiguous_ranks(self, rank_manager): + def test_check_rank_consistency_contiguous_ranks( + self, rank_manager: DeploymentRankManager + ): """Test consistency check with contiguous ranks (no reassignment needed).""" # Set up contiguous ranks replica1 = MockDeploymentReplica("replica_1") replica2 = MockDeploymentReplica("replica_2") replica3 = MockDeploymentReplica("replica_3") - rank_manager.assign_rank("replica_1") # rank 0 - rank_manager.assign_rank("replica_2") # rank 1 - rank_manager.assign_rank("replica_3") # rank 2 + rank_manager.assign_rank("replica_1", "node_1") # rank 0 + rank_manager.assign_rank("replica_2", "node_1") # rank 1 + rank_manager.assign_rank("replica_3", "node_1") # rank 2 result = rank_manager.check_rank_consistency_and_reassign_minimally( [replica1, replica2, replica3] @@ -218,7 +239,9 @@ def test_check_rank_consistency_contiguous_ranks(self, rank_manager): assert result == [] - def test_check_rank_consistency_non_contiguous_ranks(self, rank_manager): + def test_check_rank_consistency_non_contiguous_ranks( + self, rank_manager: DeploymentRankManager + ): """Test consistency check with non-contiguous ranks (reassignment needed).""" # Set up non-contiguous ranks (simulate a replica being removed) replica1 = MockDeploymentReplica("replica_1") @@ -227,13 +250,13 @@ def test_check_rank_consistency_non_contiguous_ranks(self, rank_manager): # Manually assign non-contiguous ranks using recover_rank rank_manager.recover_rank( - "replica_1", ReplicaRank(rank=0, node_rank=0, 
local_rank=0) + "replica_1", "node_1", ReplicaRank(rank=0, node_rank=0, local_rank=0) ) rank_manager.recover_rank( - "replica_2", ReplicaRank(rank=2, node_rank=0, local_rank=0) + "replica_2", "node_1", ReplicaRank(rank=2, node_rank=0, local_rank=1) ) # Gap at rank 1 rank_manager.recover_rank( - "replica_3", ReplicaRank(rank=3, node_rank=0, local_rank=0) + "replica_3", "node_1", ReplicaRank(rank=3, node_rank=0, local_rank=2) ) result = rank_manager.check_rank_consistency_and_reassign_minimally( @@ -245,11 +268,13 @@ def test_check_rank_consistency_non_contiguous_ranks(self, rank_manager): # After reassignment, ranks should be contiguous [0, 1, 2] mapping = rank_manager.get_replica_ranks_mapping() - final_ranks = sorted(mapping.values()) + final_ranks = sorted([r.rank for r in mapping.values()]) expected_ranks = [0, 1, 2] assert final_ranks == expected_ranks - def test_minimal_reassignment_keeps_existing_when_possible(self, rank_manager): + def test_minimal_reassignment_keeps_existing_when_possible( + self, rank_manager: DeploymentRankManager + ): """Test that minimal reassignment keeps existing ranks when possible.""" replica1 = MockDeploymentReplica("replica_1") replica2 = MockDeploymentReplica("replica_2") @@ -258,16 +283,16 @@ def test_minimal_reassignment_keeps_existing_when_possible(self, rank_manager): # Set up ranks: 0, 2, 5, 7 (non-contiguous) using recover_rank rank_manager.recover_rank( - "replica_1", ReplicaRank(rank=0, node_rank=0, local_rank=0) + "replica_1", "node_1", ReplicaRank(rank=0, node_rank=0, local_rank=0) ) # Should keep this rank_manager.recover_rank( - "replica_2", ReplicaRank(rank=2, node_rank=0, local_rank=0) + "replica_2", "node_1", ReplicaRank(rank=2, node_rank=0, local_rank=1) ) # Should keep this rank_manager.recover_rank( - "replica_3", ReplicaRank(rank=5, node_rank=0, local_rank=0) + "replica_3", "node_1", ReplicaRank(rank=5, node_rank=0, local_rank=2) ) # Should be reassigned to 1 rank_manager.recover_rank( - "replica_4", 
ReplicaRank(rank=7, node_rank=0, local_rank=0) + "replica_4", "node_1", ReplicaRank(rank=7, node_rank=0, local_rank=3) ) # Should be reassigned to 3 result = rank_manager.check_rank_consistency_and_reassign_minimally( @@ -281,46 +306,51 @@ def test_minimal_reassignment_keeps_existing_when_possible(self, rank_manager): # Verify final ranks are contiguous mapping = rank_manager.get_replica_ranks_mapping() - final_ranks = sorted(mapping.values()) + final_ranks = sorted([r.rank for r in mapping.values()]) assert final_ranks == [0, 1, 2, 3] # Verify that replica_1 and replica_2 kept their original ranks assert rank_manager.get_replica_rank("replica_1").rank == 0 assert rank_manager.get_replica_rank("replica_2").rank == 2 - def test_check_rank_consistency_unranked_replicas_fails(self): + def test_check_rank_consistency_unranked_replicas_fails( + self, rank_manager: DeploymentRankManager + ): """Test consistency check fails when active replicas have no ranks.""" - rank_manager = DeploymentRankManager() replica1 = MockDeploymentReplica("replica_1") with pytest.raises(RuntimeError, match="Rank system is in an invalid state"): rank_manager.check_rank_consistency_and_reassign_minimally([replica1]) - def test_check_rank_consistency_stale_ranks_fails(self): + def test_check_rank_consistency_stale_ranks_fails( + self, rank_manager: DeploymentRankManager + ): """Test consistency check fails when there are stale ranks.""" - rank_manager = DeploymentRankManager() replica1 = MockDeploymentReplica("replica_1") # Set up stale rank (replica not in active list) - rank_manager.assign_rank("replica_1") - rank_manager.assign_rank("stale_replica") + rank_manager.assign_rank("replica_1", "node_1") + rank_manager.assign_rank("stale_replica", "node_1") with pytest.raises(RuntimeError, match="Rank system is in an invalid state"): rank_manager.check_rank_consistency_and_reassign_minimally([replica1]) - def test_check_rank_consistency_duplicate_ranks_fails(self): + def 
test_check_rank_consistency_duplicate_ranks_fails( + self, rank_manager: DeploymentRankManager + ): """Test consistency check fails when there are duplicate ranks.""" - rank_manager = DeploymentRankManager() replica1 = MockDeploymentReplica("replica_1") replica2 = MockDeploymentReplica("replica_2") # Manually create duplicate ranks using recover_rank (this should never happen in normal operation) + # Note: We can only test this with duplicate global ranks, not duplicate local ranks + # since local_rank uniqueness is enforced by the underlying RankManager rank_manager.recover_rank( - "replica_1", ReplicaRank(rank=0, node_rank=0, local_rank=0) + "replica_1", "node_1", ReplicaRank(rank=0, node_rank=0, local_rank=0) ) rank_manager.recover_rank( - "replica_2", ReplicaRank(rank=0, node_rank=0, local_rank=0) - ) # Duplicate! + "replica_2", "node_1", ReplicaRank(rank=0, node_rank=0, local_rank=1) + ) # Duplicate global rank! with pytest.raises(RuntimeError, match="Rank system is in an invalid state"): rank_manager.check_rank_consistency_and_reassign_minimally( @@ -328,6 +358,733 @@ def test_check_rank_consistency_duplicate_ranks_fails(self): ) +class TestDeploymentRankManagerMultiNode: + def test_assign_rank_multiple_nodes(self): + """Test that replicas on different nodes get correct node_rank and local_rank.""" + rank_manager = DeploymentRankManager() + + # Assign replicas across different nodes + rank1 = rank_manager.assign_rank("replica_1", "node_1") + rank2 = rank_manager.assign_rank("replica_2", "node_2") + rank3 = rank_manager.assign_rank("replica_3", "node_1") + rank4 = rank_manager.assign_rank("replica_4", "node_2") + rank5 = rank_manager.assign_rank("replica_5", "node_3") + + # Verify global ranks are sequential + assert rank1.rank == 0 + assert rank2.rank == 1 + assert rank3.rank == 2 + assert rank4.rank == 3 + assert rank5.rank == 4 + + # Verify node ranks are assigned correctly + assert rank1.node_rank == 0 # node_1 is first + assert rank2.node_rank == 1 # 
node_2 is second + assert rank3.node_rank == 0 # node_1 already has rank 0 + assert rank4.node_rank == 1 # node_2 already has rank 1 + assert rank5.node_rank == 2 # node_3 is third + + # Verify local ranks restart per node + assert rank1.local_rank == 0 # First replica on node_1 + assert rank2.local_rank == 0 # First replica on node_2 + assert rank3.local_rank == 1 # Second replica on node_1 + assert rank4.local_rank == 1 # Second replica on node_2 + assert rank5.local_rank == 0 # First replica on node_3 + + def test_local_rank_independence_across_nodes(self): + """Test that local_rank sequences are independent per node.""" + rank_manager = DeploymentRankManager() + + # Add 3 replicas to node_1 + r1 = rank_manager.assign_rank("n1_r1", "node_1") + r2 = rank_manager.assign_rank("n1_r2", "node_1") + r3 = rank_manager.assign_rank("n1_r3", "node_1") + + # Add 2 replicas to node_2 + r4 = rank_manager.assign_rank("n2_r1", "node_2") + r5 = rank_manager.assign_rank("n2_r2", "node_2") + + # Verify local ranks are independent + assert r1.local_rank == 0 + assert r2.local_rank == 1 + assert r3.local_rank == 2 + assert r4.local_rank == 0 # Restarts at 0 for node_2 + assert r5.local_rank == 1 + + def test_release_rank_removes_node_when_last_replica(self): + """Test that releasing the last replica on a node releases the node rank.""" + rank_manager = DeploymentRankManager() + + # Assign replicas to two nodes + rank_manager.assign_rank("n1_r1", "node_1") + rank_manager.assign_rank("n2_r1", "node_2") + rank_manager.assign_rank("n2_r2", "node_2") + + # Release the only replica on node_1 + rank_manager.release_rank("n1_r1") + + # Now add a replica to node_3 - it should get node_rank 0 (reused from node_1) + new_rank = rank_manager.assign_rank("n3_r1", "node_3") + assert new_rank.node_rank == 0 # Reused node rank + + def test_release_rank_keeps_node_when_replicas_remain(self): + """Test that releasing a replica doesn't release node rank if other replicas remain.""" + rank_manager = 
DeploymentRankManager() + + # Assign 3 replicas to node_1 + rank_manager.assign_rank("r1", "node_1") + rank_manager.assign_rank("r2", "node_1") + rank_manager.assign_rank("r3", "node_1") + + # Assign replica to node_2 + node2_rank = rank_manager.assign_rank("r4", "node_2") + assert node2_rank.node_rank == 1 + + # Release one replica from node_1 + rank_manager.release_rank("r2") + + # Add another replica to node_1 - should still have node_rank 0 + new_rank = rank_manager.assign_rank("r5", "node_1") + assert new_rank.node_rank == 0 # Node rank preserved + + def test_node_rank_reuse_after_complete_release(self): + """Test that node ranks are reused after all replicas are removed.""" + rank_manager = DeploymentRankManager() + + # Create replicas on 3 nodes + rank_manager.assign_rank("n1_r1", "node_1") + rank_manager.assign_rank("n2_r1", "node_2") + rank_manager.assign_rank("n3_r1", "node_3") + + # Verify node ranks + assert rank_manager.get_replica_rank("n1_r1").node_rank == 0 + assert rank_manager.get_replica_rank("n2_r1").node_rank == 1 + assert rank_manager.get_replica_rank("n3_r1").node_rank == 2 + + # Remove all replicas from node_2 (middle node) + rank_manager.release_rank("n2_r1") + + # Add replica to a new node - should reuse node_rank 1 + new_rank = rank_manager.assign_rank("n4_r1", "node_4") + assert new_rank.node_rank == 1 # Reused from released node_2 + + def test_local_rank_reuse_within_node(self): + """Test that local ranks are reused within a node after release.""" + rank_manager = DeploymentRankManager() + + # Assign 3 replicas to node_1 + rank_manager.assign_rank("r1", "node_1") + rank_manager.assign_rank("r2", "node_1") + rank_manager.assign_rank("r3", "node_1") + + # Release middle replica (local_rank=1) + rank_manager.release_rank("r2") + + # Add new replica to node_1 - should reuse local_rank 1 + new_rank = rank_manager.assign_rank("r4", "node_1") + assert new_rank.local_rank == 1 # Reused local rank + + def test_recover_rank_multiple_nodes(self): 
+ """Test recovering ranks for replicas on different nodes.""" + rank_manager = DeploymentRankManager() + + # Recover replicas on different nodes + rank_manager.recover_rank( + "r1", "node_1", ReplicaRank(rank=0, node_rank=0, local_rank=0) + ) + rank_manager.recover_rank( + "r2", "node_2", ReplicaRank(rank=1, node_rank=1, local_rank=0) + ) + rank_manager.recover_rank( + "r3", "node_1", ReplicaRank(rank=2, node_rank=0, local_rank=1) + ) + + # Verify all ranks are recovered correctly + assert rank_manager.get_replica_rank("r1") == ReplicaRank( + rank=0, node_rank=0, local_rank=0 + ) + assert rank_manager.get_replica_rank("r2") == ReplicaRank( + rank=1, node_rank=1, local_rank=0 + ) + assert rank_manager.get_replica_rank("r3") == ReplicaRank( + rank=2, node_rank=0, local_rank=1 + ) + + def test_recover_rank_preserves_node_rank_when_node_exists(self): + """Test that recovering a replica on an existing node doesn't create duplicate node ranks.""" + rank_manager = DeploymentRankManager() + + # Recover first replica on node_1 + rank_manager.recover_rank( + "r1", "node_1", ReplicaRank(rank=0, node_rank=0, local_rank=0) + ) + + # Recover another replica on the same node + rank_manager.recover_rank( + "r2", "node_1", ReplicaRank(rank=1, node_rank=0, local_rank=1) + ) + + # Both should have the same node_rank + assert rank_manager.get_replica_rank("r1").node_rank == 0 + assert rank_manager.get_replica_rank("r2").node_rank == 0 + + def test_check_rank_consistency_across_multiple_nodes(self): + """Test consistency checking with replicas spread across nodes.""" + rank_manager = DeploymentRankManager() + + replica1 = MockDeploymentReplica("r1") + replica2 = MockDeploymentReplica("r2") + replica3 = MockDeploymentReplica("r3") + replica4 = MockDeploymentReplica("r4") + + # Set up non-contiguous global ranks across nodes + rank_manager.recover_rank( + "r1", "node_1", ReplicaRank(rank=0, node_rank=0, local_rank=0) + ) + rank_manager.recover_rank( + "r2", "node_2", ReplicaRank(rank=5, 
node_rank=1, local_rank=0) + ) + rank_manager.recover_rank( + "r3", "node_1", ReplicaRank(rank=8, node_rank=0, local_rank=1) + ) + rank_manager.recover_rank( + "r4", "node_2", ReplicaRank(rank=10, node_rank=1, local_rank=1) + ) + + result = rank_manager.check_rank_consistency_and_reassign_minimally( + [replica1, replica2, replica3, replica4] + ) + + # Should reassign some replicas to make global ranks contiguous + assert len(result) > 0 + + # Verify global ranks are now contiguous + mapping = rank_manager.get_replica_ranks_mapping() + global_ranks = sorted([r.rank for r in mapping.values()]) + assert global_ranks == [0, 1, 2, 3] + + def test_check_rank_consistency_local_ranks_per_node(self): + """Test that local rank consistency is checked independently per node.""" + rank_manager = DeploymentRankManager() + + replica1 = MockDeploymentReplica("r1") + replica2 = MockDeploymentReplica("r2") + replica3 = MockDeploymentReplica("r3") + replica4 = MockDeploymentReplica("r4") + + # Set up non-contiguous local ranks on node_1, contiguous on node_2 + rank_manager.recover_rank( + "r1", "node_1", ReplicaRank(rank=0, node_rank=0, local_rank=0) + ) + rank_manager.recover_rank( + "r2", "node_1", ReplicaRank(rank=1, node_rank=0, local_rank=5) + ) # Gap in local rank + rank_manager.recover_rank( + "r3", "node_2", ReplicaRank(rank=2, node_rank=1, local_rank=0) + ) + rank_manager.recover_rank( + "r4", "node_2", ReplicaRank(rank=3, node_rank=1, local_rank=1) + ) + + result = rank_manager.check_rank_consistency_and_reassign_minimally( + [replica1, replica2, replica3, replica4] + ) + + # Should reassign replica on node_1 with non-contiguous local rank + assert len(result) > 0 + + # Verify local ranks are contiguous per node + r1_rank = rank_manager.get_replica_rank("r1") + r2_rank = rank_manager.get_replica_rank("r2") + r3_rank = rank_manager.get_replica_rank("r3") + r4_rank = rank_manager.get_replica_rank("r4") + + # Node 1 local ranks should be [0, 1] + node1_local_ranks = 
sorted([r1_rank.local_rank, r2_rank.local_rank]) + assert node1_local_ranks == [0, 1] + + # Node 2 local ranks should be [0, 1] + node2_local_ranks = sorted([r3_rank.local_rank, r4_rank.local_rank]) + assert node2_local_ranks == [0, 1] + + def test_check_rank_consistency_node_ranks(self): + """Test that node rank consistency is maintained.""" + rank_manager = DeploymentRankManager() + + replica1 = MockDeploymentReplica("r1") + replica2 = MockDeploymentReplica("r2") + replica3 = MockDeploymentReplica("r3") + + # Set up non-contiguous node ranks (0, 2, 5) + rank_manager.recover_rank( + "r1", "node_1", ReplicaRank(rank=0, node_rank=0, local_rank=0) + ) + rank_manager.recover_rank( + "r2", "node_2", ReplicaRank(rank=1, node_rank=2, local_rank=0) + ) + rank_manager.recover_rank( + "r3", "node_3", ReplicaRank(rank=2, node_rank=5, local_rank=0) + ) + + result = rank_manager.check_rank_consistency_and_reassign_minimally( + [replica1, replica2, replica3] + ) + + # Should reassign replicas to make node ranks contiguous + assert len(result) > 0 + + # Verify node ranks are now contiguous [0, 1, 2] + mapping = rank_manager.get_replica_ranks_mapping() + node_ranks = sorted([r.node_rank for r in mapping.values()]) + assert node_ranks == [0, 1, 2] + + def test_clear_with_multiple_nodes(self): + """Test that clear() removes all node-related state.""" + rank_manager = DeploymentRankManager() + + # Assign replicas to multiple nodes + rank_manager.assign_rank("r1", "node_1") + rank_manager.assign_rank("r2", "node_2") + rank_manager.assign_rank("r3", "node_3") + + # Clear everything + rank_manager.clear() + + # Verify all state is cleared + assert rank_manager.get_replica_ranks_mapping() == {} + + # Verify we can assign fresh ranks starting from 0 + new_rank = rank_manager.assign_rank("r4", "node_4") + assert new_rank.rank == 0 + assert new_rank.node_rank == 0 + assert new_rank.local_rank == 0 + + def test_get_replica_ranks_mapping_multiple_nodes(self): + """Test that 
get_replica_ranks_mapping includes correct values for all nodes.""" + rank_manager = DeploymentRankManager() + + # Create replicas across multiple nodes + rank_manager.assign_rank("r1", "node_1") + rank_manager.assign_rank("r2", "node_2") + rank_manager.assign_rank("r3", "node_1") + rank_manager.assign_rank("r4", "node_3") + + mapping = rank_manager.get_replica_ranks_mapping() + + # Verify all replicas are in mapping with correct values + assert len(mapping) == 4 + assert mapping["r1"] == ReplicaRank(rank=0, node_rank=0, local_rank=0) + assert mapping["r2"] == ReplicaRank(rank=1, node_rank=1, local_rank=0) + assert mapping["r3"] == ReplicaRank(rank=2, node_rank=0, local_rank=1) + assert mapping["r4"] == ReplicaRank(rank=3, node_rank=2, local_rank=0) + + def test_complex_multi_node_lifecycle(self): + """Test a complex scenario with adds, releases, and consistency checks across nodes.""" + rank_manager = DeploymentRankManager() + + # Phase 1: Initial deployment across 3 nodes + rank_manager.assign_rank("n1_r1", "node_1") + rank_manager.assign_rank("n1_r2", "node_1") + rank_manager.assign_rank("n2_r1", "node_2") + rank_manager.assign_rank("n3_r1", "node_3") + rank_manager.assign_rank("n3_r2", "node_3") + + # Phase 2: Scale down - remove some replicas + rank_manager.release_rank("n1_r2") # Remove from node_1 + rank_manager.release_rank("n2_r1") # Remove all from node_2 + + # Phase 3: Scale up - add replicas to new and existing nodes + rank_manager.assign_rank("n1_r3", "node_1") # Add to existing node_1 + rank_manager.assign_rank("n4_r1", "node_4") # New node + + # Verify state is consistent + mapping = rank_manager.get_replica_ranks_mapping() + assert len(mapping) == 5 + + # Verify node ranks - node_2 was removed, so node_4 should reuse its rank + assert mapping["n4_r1"].node_rank == 1 # Reused from node_2 + + # Verify local ranks per node + assert mapping["n1_r1"].local_rank == 0 + assert mapping["n1_r3"].local_rank == 1 # Reused local rank + assert 
mapping["n3_r1"].local_rank == 0 + assert mapping["n3_r2"].local_rank == 1 + assert mapping["n4_r1"].local_rank == 0 + + def test_scaling_up_and_down_across_nodes(self): + """Test scaling operations across multiple nodes.""" + rank_manager = DeploymentRankManager() + + # Scale up: Add replicas to 4 nodes + for node_idx in range(4): + for replica_idx in range(2): + replica_id = f"n{node_idx}_r{replica_idx}" + rank_manager.assign_rank(replica_id, f"node_{node_idx}") + + # Verify 8 replicas total + assert len(rank_manager.get_replica_ranks_mapping()) == 8 + + # Scale down: Remove all replicas from nodes 1 and 3 + rank_manager.release_rank("n1_r0") + rank_manager.release_rank("n1_r1") + rank_manager.release_rank("n3_r0") + rank_manager.release_rank("n3_r1") + + # Verify 4 replicas remain + assert len(rank_manager.get_replica_ranks_mapping()) == 4 + + # Scale up again: Add to new node + new_rank = rank_manager.assign_rank("n5_r0", "node_5") + + # Should reuse a released node rank (1 or 3) + assert new_rank.node_rank in [1, 3] + + def test_minimal_reassignment_preserves_node_assignments(self): + """Test that minimal reassignment doesn't move replicas between nodes.""" + rank_manager = DeploymentRankManager() + + replica1 = MockDeploymentReplica("r1") + replica2 = MockDeploymentReplica("r2") + replica3 = MockDeploymentReplica("r3") + replica4 = MockDeploymentReplica("r4") + + # Set up non-contiguous ranks across nodes + rank_manager.recover_rank( + "r1", "node_1", ReplicaRank(rank=0, node_rank=0, local_rank=0) + ) + rank_manager.recover_rank( + "r2", "node_2", ReplicaRank(rank=5, node_rank=1, local_rank=0) + ) + rank_manager.recover_rank( + "r3", "node_1", ReplicaRank(rank=10, node_rank=0, local_rank=1) + ) + rank_manager.recover_rank( + "r4", "node_2", ReplicaRank(rank=15, node_rank=1, local_rank=1) + ) + + # Perform consistency check + rank_manager.check_rank_consistency_and_reassign_minimally( + [replica1, replica2, replica3, replica4] + ) + + # Verify replicas stayed 
on their original nodes + mapping = rank_manager.get_replica_ranks_mapping() + + # r1 and r3 should still be on node_1 (node_rank=0) + assert mapping["r1"].node_rank == mapping["r3"].node_rank == 0 + + # r2 and r4 should still be on node_2 (node_rank=1) + assert mapping["r2"].node_rank == mapping["r4"].node_rank == 1 + + # Verify local ranks are contiguous per node + assert sorted([mapping["r1"].local_rank, mapping["r3"].local_rank]) == [0, 1] + assert sorted([mapping["r2"].local_rank, mapping["r4"].local_rank]) == [0, 1] + + +class TestDeploymentRankManagerEdgeCases: + """Test edge cases and corner cases for correctness.""" + + def test_recover_rank_updates_next_rank_correctly(self): + """When recovering a rank >= _next_rank, verify next assigned rank is correct.""" + rank_manager = DeploymentRankManager() + + # Assign first replica normally - gets rank 0 + rank_manager.assign_rank("r1", "node_1") + + # Recover a replica with a very high rank + rank_manager.recover_rank( + "r2", "node_1", ReplicaRank(rank=100, node_rank=0, local_rank=1) + ) + + # Now assign a new replica - should get rank 101 (next after 100) + new_rank = rank_manager.assign_rank("r3", "node_1") + assert new_rank.rank == 101 + + def test_assign_after_recovery_with_very_high_rank(self): + """Test that assignment after recovering a very high rank works correctly.""" + rank_manager = DeploymentRankManager() + + # Recover with rank 1000 + rank_manager.recover_rank( + "r1", "node_1", ReplicaRank(rank=1000, node_rank=0, local_rank=0) + ) + + # Assign new replica - should get 1001 + new_rank = rank_manager.assign_rank("r2", "node_1") + assert new_rank.rank == 1001 + assert new_rank.local_rank == 1 # Second replica on node_1 + + def test_recover_multiple_high_ranks_out_of_order(self): + """Test recovering multiple high ranks in non-sequential order.""" + rank_manager = DeploymentRankManager() + + # Recover ranks in random order: 50, 10, 100 + rank_manager.recover_rank( + "r1", "node_1", 
ReplicaRank(rank=50, node_rank=0, local_rank=0) + ) + rank_manager.recover_rank( + "r2", "node_2", ReplicaRank(rank=10, node_rank=1, local_rank=0) + ) + rank_manager.recover_rank( + "r3", "node_1", ReplicaRank(rank=100, node_rank=0, local_rank=1) + ) + + # Next assignment should get 101 (max + 1) + new_rank = rank_manager.assign_rank("r4", "node_3") + assert new_rank.rank == 101 + + def test_recover_rank_removes_from_released_ranks(self): + """Test that recovering a rank that was released removes it from available set.""" + rank_manager = DeploymentRankManager() + + # Assign and then release rank 5 + rank_manager.recover_rank( + "r1", "node_1", ReplicaRank(rank=0, node_rank=0, local_rank=0) + ) + rank_manager.recover_rank( + "r2", "node_1", ReplicaRank(rank=5, node_rank=0, local_rank=1) + ) + rank_manager.release_rank("r2") # Rank 5 is now released + + # Recover a different replica with rank 5 + rank_manager.recover_rank( + "r3", "node_1", ReplicaRank(rank=5, node_rank=0, local_rank=1) + ) + + # Assign new replica - should get rank 6, not 5 (since 5 is taken) + new_rank = rank_manager.assign_rank("r4", "node_1") + assert new_rank.rank == 6 + + def test_reassignment_with_all_ranks_out_of_range(self): + """Test reassignment when all replicas have ranks outside target range.""" + rank_manager = DeploymentRankManager() + + replica1 = MockDeploymentReplica("r1") + replica2 = MockDeploymentReplica("r2") + replica3 = MockDeploymentReplica("r3") + + # All replicas have ranks outside [0, 1, 2] range + rank_manager.recover_rank( + "r1", "node_1", ReplicaRank(rank=10, node_rank=0, local_rank=0) + ) + rank_manager.recover_rank( + "r2", "node_1", ReplicaRank(rank=20, node_rank=0, local_rank=1) + ) + rank_manager.recover_rank( + "r3", "node_1", ReplicaRank(rank=30, node_rank=0, local_rank=2) + ) + + result = rank_manager.check_rank_consistency_and_reassign_minimally( + [replica1, replica2, replica3] + ) + + # All replicas should be reassigned + assert len(result) == 3 + + # 
Verify final ranks are exactly [0, 1, 2] + mapping = rank_manager.get_replica_ranks_mapping() + final_ranks = sorted([r.rank for r in mapping.values()]) + assert final_ranks == [0, 1, 2] + + def test_reassignment_preserves_target_ranks_exactly(self): + """Test that after reassignment, ranks are exactly [0, N-1].""" + rank_manager = DeploymentRankManager() + + replicas = [] + for i in range(5): + replicas.append(MockDeploymentReplica(f"r{i}")) + + # Create non-contiguous ranks: 0, 3, 7, 11, 15 + rank_manager.recover_rank( + "r0", "node_1", ReplicaRank(rank=0, node_rank=0, local_rank=0) + ) + rank_manager.recover_rank( + "r1", "node_1", ReplicaRank(rank=3, node_rank=0, local_rank=1) + ) + rank_manager.recover_rank( + "r2", "node_1", ReplicaRank(rank=7, node_rank=0, local_rank=2) + ) + rank_manager.recover_rank( + "r3", "node_1", ReplicaRank(rank=11, node_rank=0, local_rank=3) + ) + rank_manager.recover_rank( + "r4", "node_1", ReplicaRank(rank=15, node_rank=0, local_rank=4) + ) + + rank_manager.check_rank_consistency_and_reassign_minimally(replicas) + + # Verify ranks are exactly [0, 1, 2, 3, 4] + mapping = rank_manager.get_replica_ranks_mapping() + final_ranks = sorted([r.rank for r in mapping.values()]) + assert final_ranks == [0, 1, 2, 3, 4] + + # Verify no duplicates + assert len(final_ranks) == len(set(final_ranks)) + + def test_multiple_sequential_releases_reuse_in_order(self): + """Test releasing multiple ranks in sequence maintains correct state.""" + rank_manager = DeploymentRankManager() + + # Assign ranks 0-5 + for i in range(6): + rank_manager.assign_rank(f"r{i}", "node_1") + + # Release ranks 0, 2, 4 + rank_manager.release_rank("r0") + rank_manager.release_rank("r2") + rank_manager.release_rank("r4") + + # Assign new replicas - should reuse in ascending order (min first) + new_rank1 = rank_manager.assign_rank("r6", "node_1") + new_rank2 = rank_manager.assign_rank("r7", "node_1") + new_rank3 = rank_manager.assign_rank("r8", "node_1") + + # Should reuse 0, 
2, 4 in that order + assert new_rank1.rank == 0 + assert new_rank2.rank == 2 + assert new_rank3.rank == 4 + + def test_interleaved_assign_release_recover(self): + """Test complex sequence of operations maintains consistency.""" + rank_manager = DeploymentRankManager() + + # Complex sequence + rank_manager.assign_rank("r1", "node_1") # rank 0 + rank_manager.assign_rank("r2", "node_1") # rank 1 + rank_manager.release_rank("r1") # release 0 + rank_manager.recover_rank( + "r3", "node_2", ReplicaRank(rank=5, node_rank=1, local_rank=0) + ) + rank_manager.assign_rank("r4", "node_1") # should get 0 (reused) + rank_manager.release_rank("r2") # release 1 + rank_manager.assign_rank("r5", "node_2") # should get 1 (reused) + + # Verify final state + assert rank_manager.get_replica_rank("r4").rank == 0 + assert rank_manager.get_replica_rank("r5").rank == 1 + assert rank_manager.get_replica_rank("r3").rank == 5 + + def test_release_all_then_reassign_all(self): + """Test releasing all replicas then reassigning maintains correct state.""" + rank_manager = DeploymentRankManager() + + # Assign replicas across nodes + rank_manager.assign_rank("r1", "node_1") + rank_manager.assign_rank("r2", "node_2") + rank_manager.assign_rank("r3", "node_1") + + # Release all + rank_manager.release_rank("r1") + rank_manager.release_rank("r2") + rank_manager.release_rank("r3") + + # Verify mapping is empty + assert rank_manager.get_replica_ranks_mapping() == {} + + # Reassign new replicas - should reuse ranks 0, 1, 2 + new_rank1 = rank_manager.assign_rank("r4", "node_3") + new_rank2 = rank_manager.assign_rank("r5", "node_3") + new_rank3 = rank_manager.assign_rank("r6", "node_3") + + assert new_rank1.rank == 0 + assert new_rank2.rank == 1 + assert new_rank3.rank == 2 + + def test_recover_rank_with_same_replica_different_rank(self): + """Test that recovering the same replica_id twice raises an error.""" + rank_manager = DeploymentRankManager() + + # First recovery + rank_manager.recover_rank( + "r1", 
"node_1", ReplicaRank(rank=0, node_rank=0, local_rank=0) + ) + + # Try to recover same replica_id again - should raise error + with pytest.raises(RuntimeError, match="already assigned"): + rank_manager.recover_rank( + "r1", "node_1", ReplicaRank(rank=1, node_rank=0, local_rank=1) + ) + + def test_large_scale_rank_management(self): + """Test with many nodes and many replicas per node.""" + rank_manager = DeploymentRankManager() + + num_nodes = 50 + replicas_per_node = 10 + total_replicas = num_nodes * replicas_per_node + + # Assign replicas + replica_ids = [] + for node_idx in range(num_nodes): + for replica_idx in range(replicas_per_node): + replica_id = f"n{node_idx}_r{replica_idx}" + replica_ids.append(replica_id) + rank_manager.assign_rank(replica_id, f"node_{node_idx}") + + # Verify total count + mapping = rank_manager.get_replica_ranks_mapping() + assert len(mapping) == total_replicas + + # Verify global ranks are contiguous [0, total-1] + global_ranks = sorted([r.rank for r in mapping.values()]) + assert global_ranks == list(range(total_replicas)) + + # Verify node ranks are contiguous [0, num_nodes-1] + node_ranks = sorted({r.node_rank for r in mapping.values()}) + assert node_ranks == list(range(num_nodes)) + + # Verify local ranks per node + for node_idx in range(num_nodes): + node_replicas = [ + rid for rid in replica_ids if rid.startswith(f"n{node_idx}_") + ] + local_ranks = sorted([mapping[rid].local_rank for rid in node_replicas]) + assert local_ranks == list(range(replicas_per_node)) + + def test_consistency_check_with_released_ranks_in_system(self): + """Test consistency check works correctly when released_ranks exist.""" + rank_manager = DeploymentRankManager() + + replica1 = MockDeploymentReplica("r1") + replica2 = MockDeploymentReplica("r2") + replica3 = MockDeploymentReplica("r3") + + # Assign 5 replicas + rank_manager.assign_rank("r1", "node_1") # 0 + rank_manager.assign_rank("r2", "node_1") # 1 + rank_manager.assign_rank("r3", "node_1") # 2 + 
rank_manager.assign_rank("r4", "node_1") # 3 + rank_manager.assign_rank("r5", "node_1") # 4 + + # Release the two highest ranks (remaining ranks 0, 1, 2 stay contiguous) + rank_manager.release_rank("r4") # Release 3 + rank_manager.release_rank("r5") # Release 4 + + # Check consistency with only remaining 3 replicas + result = rank_manager.check_rank_consistency_and_reassign_minimally( + [replica1, replica2, replica3] + ) + + # Should not need reassignment (ranks are already 0, 1, 2) + assert result == [] + + # Verify ranks are correct + mapping = rank_manager.get_replica_ranks_mapping() + assert sorted([r.rank for r in mapping.values()]) == [0, 1, 2] + + def test_has_replica_rank_returns_false_for_unassigned(self): + """Test has_replica_rank returns False for various unassigned states.""" + rank_manager = DeploymentRankManager() + + # Completely unassigned replica + assert not rank_manager.has_replica_rank("nonexistent") + + # Assign then release + rank_manager.assign_rank("r1", "node_1") + rank_manager.release_rank("r1") + assert not rank_manager.has_replica_rank("r1") + + class TestDeploymentRankManagerErrorHandling: """Test cases for DeploymentRankManager error handling with fail_on_rank_error flag.
@@ -338,19 +1095,19 @@ class TestDeploymentRankManagerErrorHandling: def test_assign_rank_error_with_fail_on_rank_error_true(self): """Test that assign_rank raises exception when fail_on_rank_error=True.""" rank_manager = DeploymentRankManager(fail_on_rank_error=True) - rank_manager.assign_rank("replica_1") + rank_manager.assign_rank("replica_1", "node_1") # Should raise RuntimeError for duplicate assignment with pytest.raises(RuntimeError, match="already assigned"): - rank_manager.assign_rank("replica_1") + rank_manager.assign_rank("replica_1", "node_1") def test_assign_rank_error_with_fail_on_rank_error_false(self): """Test that assign_rank returns safe default when fail_on_rank_error=False.""" rank_manager = DeploymentRankManager(fail_on_rank_error=False) - rank_manager.assign_rank("replica_1") + rank_manager.assign_rank("replica_1", "node_1") # Should return safe default (ReplicaRank(rank=0)) instead of raising - result = rank_manager.assign_rank("replica_1") + result = rank_manager.assign_rank("replica_1", "node_1") assert result is not None assert isinstance(result, ReplicaRank) assert result.rank == 0 @@ -374,22 +1131,22 @@ def test_release_rank_error_with_fail_on_rank_error_false(self): def test_recover_rank_error_with_fail_on_rank_error_true(self): """Test that recover_rank raises exception when fail_on_rank_error=True.""" rank_manager = DeploymentRankManager(fail_on_rank_error=True) - rank_manager.assign_rank("replica_1") + rank_manager.assign_rank("replica_1", "node_1") # Should raise RuntimeError for duplicate recovery with pytest.raises(RuntimeError, match="already assigned"): rank_manager.recover_rank( - "replica_1", ReplicaRank(rank=5, node_rank=-1, local_rank=-1) + "replica_1", "node_1", ReplicaRank(rank=5, node_rank=0, local_rank=0) ) def test_recover_rank_error_with_fail_on_rank_error_false(self): """Test that recover_rank returns safe default when fail_on_rank_error=False.""" rank_manager = DeploymentRankManager(fail_on_rank_error=False) - 
rank_manager.assign_rank("replica_1") + rank_manager.assign_rank("replica_1", "node_1") # Should return None instead of raising result = rank_manager.recover_rank( - "replica_1", ReplicaRank(rank=5, node_rank=-1, local_rank=-1) + "replica_1", "node_1", ReplicaRank(rank=5, node_rank=0, local_rank=0) ) assert result is None @@ -435,8 +1192,8 @@ def test_check_rank_consistency_with_stale_ranks_error_handling(self): replica1 = MockDeploymentReplica("replica_1") # Set up stale rank (replica not in active list) - rank_manager.assign_rank("replica_1") - rank_manager.assign_rank("stale_replica") + rank_manager.assign_rank("replica_1", "node_1") + rank_manager.assign_rank("stale_replica", "node_1") # Should return empty list instead of raising result = rank_manager.check_rank_consistency_and_reassign_minimally([replica1]) @@ -450,10 +1207,10 @@ def test_check_rank_consistency_with_duplicate_ranks_error_handling(self): # Manually create duplicate ranks rank_manager.recover_rank( - "replica_1", ReplicaRank(rank=0, node_rank=-1, local_rank=-1) + "replica_1", "node_1", ReplicaRank(rank=0, node_rank=0, local_rank=0) ) rank_manager.recover_rank( - "replica_2", ReplicaRank(rank=0, node_rank=-1, local_rank=-1) + "replica_2", "node_1", ReplicaRank(rank=0, node_rank=0, local_rank=0) ) # Should return empty list instead of raising @@ -467,7 +1224,7 @@ def test_normal_operations_work_with_fail_on_rank_error_false(self): rank_manager = DeploymentRankManager(fail_on_rank_error=False) # Test normal assign - rank1 = rank_manager.assign_rank("replica_1") + rank1 = rank_manager.assign_rank("replica_1", "node_1") assert rank1.rank == 0 # Test normal get @@ -480,7 +1237,7 @@ def test_normal_operations_work_with_fail_on_rank_error_false(self): # Test normal recover rank_manager.recover_rank( - "replica_2", ReplicaRank(rank=5, node_rank=-1, local_rank=-1) + "replica_2", "node_1", ReplicaRank(rank=5, node_rank=0, local_rank=0) ) assert rank_manager.get_replica_rank("replica_2").rank == 5 @@ 
-488,7 +1245,7 @@ def test_normal_operations_work_with_fail_on_rank_error_false(self): # Test normal consistency check replica2 = MockDeploymentReplica("replica_2") replica3 = MockDeploymentReplica("replica_3") - rank_manager.assign_rank("replica_3") + rank_manager.assign_rank("replica_3", "node_1") result = rank_manager.check_rank_consistency_and_reassign_minimally( [replica2, replica3] @@ -510,7 +1267,7 @@ def test_multiple_errors_do_not_crash_with_fail_on_rank_error_false(self): assert result3 is None # And normal operations should still work after errors - rank = rank_manager.assign_rank("replica_1") + rank = rank_manager.assign_rank("replica_1", "node_1") assert rank.rank == 0 diff --git a/python/ray/serve/tests/unit/test_deployment_state.py b/python/ray/serve/tests/unit/test_deployment_state.py index fb81cd81482e..e0f0d6a6b857 100644 --- a/python/ray/serve/tests/unit/test_deployment_state.py +++ b/python/ray/serve/tests/unit/test_deployment_state.py @@ -234,7 +234,7 @@ def start( ): self.started = True self._assign_rank_callback = assign_rank_callback - self._rank = assign_rank_callback(self._replica_id.unique_id) + self._rank = assign_rank_callback(self._replica_id.unique_id, node_id=-1) replica_rank_context[self._replica_id.unique_id] = self._rank def _on_scheduled_stub(*args, **kwargs): @@ -5474,7 +5474,7 @@ def test_scaling_up_and_down_scenario(self, mock_deployment_state_manager): # Check initial ranks are 0, 1, 2 ranks_mapping = ds._get_replica_ranks_mapping() - ranks = sorted(ranks_mapping.values()) + ranks = sorted([r.rank for r in ranks_mapping.values()]) assert ranks == [0, 1, 2], f"Expected ranks [0, 1, 2], got {ranks}" # Scale down to 2 replicas - this should trigger rank reassignment @@ -5500,7 +5500,7 @@ def test_scaling_up_and_down_scenario(self, mock_deployment_state_manager): # After scaling down and reaching healthy status, ranks should be contiguous [0, 1] ranks_mapping = ds._get_replica_ranks_mapping() - ranks = 
sorted(ranks_mapping.values()) + ranks = sorted([r.rank for r in ranks_mapping.values()]) assert ranks == [0, 1], f"Expected ranks [0, 1] after scale down, got {ranks}" # Scale back up to 3 replicas - new replica should reuse available rank @@ -5526,7 +5526,7 @@ def test_scaling_up_and_down_scenario(self, mock_deployment_state_manager): # Final ranks should be contiguous [0, 1, 2] ranks_mapping = ds._get_replica_ranks_mapping() - ranks = sorted(ranks_mapping.values()) + ranks = sorted([r.rank for r in ranks_mapping.values()]) assert ranks == [0, 1, 2], f"Expected final ranks [0, 1, 2], got {ranks}" def test_controller_recovery_with_scattered_ranks( @@ -5572,7 +5572,7 @@ def test_controller_recovery_with_scattered_ranks( # At this point ranks should be scattered but all values [0, 1, 2] should be present ranks_mapping = new_ds._get_replica_ranks_mapping() - ranks = sorted(ranks_mapping.values()) + ranks = sorted([r.rank for r in ranks_mapping.values()]) assert ranks == [0, 1, 2], "Should have recovered scattered ranks" # Trigger rank consistency check with one more update - this should reorder if needed @@ -5580,7 +5580,7 @@ def test_controller_recovery_with_scattered_ranks( # After rank consistency check, ranks should still be [0, 1, 2] final_ranks_mapping = new_ds._get_replica_ranks_mapping() - final_ranks = sorted(final_ranks_mapping.values()) + final_ranks = sorted([r.rank for r in final_ranks_mapping.values()]) assert final_ranks == [ 0, 1, @@ -5617,16 +5617,16 @@ def test_complex_reassignment_scenario(self, mock_deployment_state_manager): global replica_rank_context replica_rank_context.clear() replica_rank_context[replica_ids[0].unique_id] = ReplicaRank( - rank=0, node_rank=-1, local_rank=-1 + rank=0, node_rank=0, local_rank=0 ) replica_rank_context[replica_ids[1].unique_id] = ReplicaRank( - rank=3, node_rank=-1, local_rank=-1 + rank=3, node_rank=0, local_rank=1 ) replica_rank_context[replica_ids[2].unique_id] = ReplicaRank( - rank=7, node_rank=-1, 
local_rank=-1 + rank=7, node_rank=0, local_rank=2 ) replica_rank_context[replica_ids[3].unique_id] = ReplicaRank( - rank=10, node_rank=-1, local_rank=-1 + rank=10, node_rank=0, local_rank=3 ) # Simulate controller crashed! Create a new deployment state manager @@ -5650,7 +5650,7 @@ def test_complex_reassignment_scenario(self, mock_deployment_state_manager): # After reassignment, ranks should be contiguous [0, 1, 2, 3] ranks_mapping = new_ds._get_replica_ranks_mapping() - ranks = sorted(ranks_mapping.values()) + ranks = sorted([r.rank for r in ranks_mapping.values()]) assert ranks == [ 0, 1, @@ -5680,7 +5680,7 @@ def test_rank_consistency_during_version_rollout( # Verify initial ranks are contiguous ranks_mapping = ds._get_replica_ranks_mapping() - initial_ranks = sorted(ranks_mapping.values()) + initial_ranks = sorted([r.rank for r in ranks_mapping.values()]) assert initial_ranks == [0, 1, 2] # Deploy version 2 - this should trigger rolling update @@ -5717,7 +5717,7 @@ def test_rank_consistency_during_version_rollout( # After rolling update, verify ranks are still contiguous final_ranks_mapping = ds._get_replica_ranks_mapping() - final_ranks = sorted(final_ranks_mapping.values()) + final_ranks = sorted([r.rank for r in final_ranks_mapping.values()]) assert final_ranks == [ 0, 1, @@ -5777,7 +5777,7 @@ def test_rank_assignment_with_replica_failures(self, mock_deployment_state_manag replica.replica_id.unique_id for replica in running_replicas ] running_replica_ranks = [ - ranks_mapping[replica_id] + ranks_mapping[replica_id].rank for replica_id in running_replica_ids if replica_id in ranks_mapping ] @@ -5787,7 +5787,7 @@ def test_rank_assignment_with_replica_failures(self, mock_deployment_state_manag 0, 1, 2, - }, f"Expected ranks [0, 1, 2], got {ranks_mapping.values()}" + }, f"Expected ranks [0, 1, 2], got {[r.rank for r in ranks_mapping.values()]}" class TestGetOutboundDeployments: