7 changes: 5 additions & 2 deletions python/ray/serve/_private/controller.py
@@ -82,6 +82,7 @@
HTTPOptionsSchema,
LoggingConfig,
ProxyDetails,
ReplicaRank,
ServeActorDetails,
ServeApplicationSchema,
ServeDeploySchema,
@@ -1205,12 +1206,14 @@ def record_request_routing_info(self, info: RequestRoutingInfo):
"""
self.deployment_state_manager.record_request_routing_info(info)

def _get_replica_ranks_mapping(self, deployment_id: DeploymentID) -> Dict[str, int]:
def _get_replica_ranks_mapping(
self, deployment_id: DeploymentID
) -> Dict[str, ReplicaRank]:
"""Get the current rank mapping for all replicas in a deployment.
Args:
deployment_id: The deployment ID to get ranks for.
Returns:
Dictionary mapping replica_id to rank.
Dictionary mapping replica_id to ReplicaRank object (with rank, node_rank, local_rank).
"""
return self.deployment_state_manager._get_replica_ranks_mapping(deployment_id)

159 changes: 139 additions & 20 deletions python/ray/serve/_private/deployment_state.py
@@ -743,7 +743,10 @@ def check_ready(self) -> Tuple[ReplicaStartupStatus, Optional[str]]:
replica_ready_check_func = (
self._actor_handle.initialize_and_get_metadata
)
self._rank = self._assign_rank_callback(self._replica_id.unique_id)
# This guarantees that node_id is set before the rank is assigned.
self._rank = self._assign_rank_callback(
self._replica_id.unique_id, self._node_id
)
self._ready_obj_ref = replica_ready_check_func.remote(
deployment_config, self._rank
)
@@ -1666,15 +1669,26 @@ class DeploymentRankManager:
This class handles rank assignment, release, consistency checking, and reassignment.
It maintains the rank system invariants and provides a clean interface for rank operations.

Maintains one level of rank tracking:
Maintains three levels of rank tracking:
- Global rank: Replica-level rank across all nodes (0, 1, 2, ...)
- Local rank: Replica's rank within its node (0, 1, 2, ... per node)
- Node rank ID: Index assigned to each node (0, 1, 2, ...)
"""

def __init__(self, fail_on_rank_error: bool = True):
# Global rank manager (existing replica-level rank)
self._replica_rank_manager = RankManager()
self._fail_on_rank_error = fail_on_rank_error

# Node rank manager (assigns rank IDs to nodes)
self._node_rank_manager = RankManager()

# Local rank managers (one per node, manages replica ranks within each node)
self._local_rank_managers: Dict[str, RankManager] = {}

# Track which node each replica is on
self._replica_to_node: Dict[str, str] = {}

def _execute_with_error_handling(self, func, safe_default, *args, **kwargs):
if self._fail_on_rank_error:
# Let exceptions propagate
@@ -1687,11 +1701,12 @@ def _execute_with_error_handling(self, func, safe_default, *args, **kwargs):
logger.error(f"Error executing function {func.__name__}: {e}")
return safe_default

def assign_rank(self, replica_id: str) -> ReplicaRank:
def assign_rank(self, replica_id: str, node_id: str) -> ReplicaRank:
"""Assign a rank to a new replica.

Args:
replica_id: The unique ID of the replica
node_id: The unique ID of the node

Returns:
ReplicaRank object with the assigned rank
@@ -1706,13 +1721,25 @@ def _assign_rank_impl():
f"Rank for {replica_id} already assigned: {self._replica_rank_manager.get_rank(replica_id)}"
)

# Track the replica-to-node mapping
self._replica_to_node[replica_id] = node_id

# Assign global rank
rank = self._replica_rank_manager.assign_rank(replica_id)

return ReplicaRank(rank=rank, node_rank=-1, local_rank=-1)
# Assign node rank if this node doesn't have one yet
if node_id not in self._local_rank_managers:
self._node_rank_manager.assign_rank(node_id)
self._local_rank_managers[node_id] = RankManager()

node_rank = self._node_rank_manager.get_rank(node_id)
# Assign local rank within the node
local_rank = self._local_rank_managers[node_id].assign_rank(replica_id)

return ReplicaRank(rank=rank, node_rank=node_rank, local_rank=local_rank)

return self._execute_with_error_handling(
_assign_rank_impl, ReplicaRank(rank=0, node_rank=-1, local_rank=-1)
_assign_rank_impl, ReplicaRank(rank=0, node_rank=0, local_rank=0)
)
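
For intuition, a hedged usage sketch of the three rank levels assigned above. It assumes each RankManager hands out the lowest free integer (which the contiguity invariants elsewhere in this diff suggest); the replica and node IDs are made up:

mgr = DeploymentRankManager()

# Two replicas land on node "n0", one on node "n1" (IDs are illustrative).
a = mgr.assign_rank("replica-a", "n0")  # ReplicaRank(rank=0, node_rank=0, local_rank=0)
b = mgr.assign_rank("replica-b", "n0")  # ReplicaRank(rank=1, node_rank=0, local_rank=1)
c = mgr.assign_rank("replica-c", "n1")  # ReplicaRank(rank=2, node_rank=1, local_rank=0)

# The global rank is unique across the deployment, local_rank restarts at 0 on
# each node, and node_rank identifies the node the replica landed on.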

def release_rank(self, replica_id: str) -> None:
@@ -1729,20 +1756,36 @@ def _release_rank_impl():
if not self.has_replica_rank(replica_id):
raise RuntimeError(f"Rank for {replica_id} not assigned")

# Get the node_id from the replica mapping
node_id = self._replica_to_node[replica_id]

# Release global rank
self._replica_rank_manager.release_rank(replica_id)

# Release local rank
self._local_rank_managers[node_id].release_rank(replica_id)

# Release node rank if this was the last replica on the node
if len(self._local_rank_managers[node_id].get_ranks_mapping()) == 0:
self._node_rank_manager.release_rank(node_id)
del self._local_rank_managers[node_id]

# Remove replica from node mapping
del self._replica_to_node[replica_id]

return self._execute_with_error_handling(_release_rank_impl, None)

def recover_rank(
self,
replica_id: str,
node_id: str,
rank: ReplicaRank,
) -> None:
"""Recover rank for a replica (e.g., after controller restart).

Args:
replica_id: ID of the replica
node_id: ID of the node
rank: The rank to recover

Raises:
@@ -1758,6 +1801,18 @@ def _recover_rank_impl():
# Recover global rank
self._replica_rank_manager.recover_rank(replica_id, rank.rank)

# Recover node rank only if this node doesn't already have one
if not self._node_rank_manager.has_rank(node_id):
self._node_rank_manager.recover_rank(node_id, rank.node_rank)

# Recover local rank
if node_id not in self._local_rank_managers:
self._local_rank_managers[node_id] = RankManager()
self._local_rank_managers[node_id].recover_rank(replica_id, rank.local_rank)

# Track the replica-to-node mapping
self._replica_to_node[replica_id] = node_id

Bug: Inconsistent state on partial failure in recover_rank

The order of operations in _recover_rank_impl is inconsistent with _assign_rank_impl. In _assign_rank_impl, _replica_to_node[replica_id] = node_id is set first (before any rank assignments), but in _recover_rank_impl, it's set last (after all rank recoveries). When _fail_on_rank_error=False and an error occurs after recovering the global rank but before setting _replica_to_node, the system ends up in an inconsistent state where _replica_rank_manager has the replica's global rank but _replica_to_node doesn't have the mapping. This causes has_replica_rank() to return False even though ranks are partially assigned, potentially leading to duplicate assignment errors on retry.
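
One possible fix, sketched with the recovery steps from this diff reordered so that _recover_rank_impl mirrors _assign_rank_impl (the rest of the method is assumed unchanged):

def _recover_rank_impl():
    # Record the replica-to-node mapping first, as _assign_rank_impl does, so a
    # failure part-way through recovery never leaves ranks assigned without the
    # mapping that has_replica_rank() checks before anything else.
    self._replica_to_node[replica_id] = node_id

    # Recover global rank
    self._replica_rank_manager.recover_rank(replica_id, rank.rank)

    # Recover node rank only if this node doesn't already have one
    if not self._node_rank_manager.has_rank(node_id):
        self._node_rank_manager.recover_rank(node_id, rank.node_rank)

    # Recover local rank
    if node_id not in self._local_rank_managers:
        self._local_rank_managers[node_id] = RankManager()
    self._local_rank_managers[node_id].recover_rank(replica_id, rank.local_rank)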

return self._execute_with_error_handling(_recover_rank_impl, None)

def has_replica_rank(self, replica_id: str) -> bool:
@@ -1772,7 +1827,16 @@ def has_replica_rank(self, replica_id: str) -> bool:
Raises:
RuntimeError: If the replica doesn't have ranks assigned
"""
return self._replica_rank_manager.has_rank(replica_id)
if replica_id not in self._replica_to_node:
return False

node_id = self._replica_to_node[replica_id]
return (
self._replica_rank_manager.has_rank(replica_id)
and node_id in self._local_rank_managers
and self._node_rank_manager.has_rank(node_id)
and self._local_rank_managers[node_id].has_rank(replica_id)
)

def get_replica_rank(self, replica_id: str) -> ReplicaRank:
"""Get the rank for a replica.
@@ -1792,10 +1856,15 @@ def _get_replica_rank_impl():
raise RuntimeError(f"Rank for {replica_id} not assigned")

global_rank = self._replica_rank_manager.get_rank(replica_id)
return ReplicaRank(rank=global_rank, node_rank=-1, local_rank=-1)
node_id = self._replica_to_node[replica_id]
node_rank = self._node_rank_manager.get_rank(node_id)
local_rank = self._local_rank_managers[node_id].get_rank(replica_id)
return ReplicaRank(
rank=global_rank, node_rank=node_rank, local_rank=local_rank
)

return self._execute_with_error_handling(
_get_replica_rank_impl, ReplicaRank(rank=0, node_rank=-1, local_rank=-1)
_get_replica_rank_impl, ReplicaRank(rank=0, node_rank=0, local_rank=0)

Contributor: For my information, why change the default from -1 -> 0?

Contributor Author: -1 was a placeholder, for a previous stacked diff.

)

def check_rank_consistency_and_reassign_minimally(
@@ -1806,6 +1875,8 @@ def check_rank_consistency_and_reassign_minimally(

This method ensures:
1. Global ranks are contiguous [0, N-1] for N replicas
2. Node ranks are contiguous [0, M-1] for M nodes
3. Local ranks are contiguous [0, K-1] for K replicas on each node

Args:
active_replicas: List of currently active replicas
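
As a self-contained illustration of invariant 1 above, the hypothetical helper below (not part of this diff) compacts global ranks back to [0, N-1] while moving as few holders as possible, which is the behavior RankManager.check_rank_consistency_and_reassign_minimally is implied to provide:

from typing import Dict

def minimal_reassign(current: Dict[str, int]) -> Dict[str, int]:
    # Keep every rank that already fits in [0, N-1]; hand the lowest free
    # ranks to the holders whose ranks fell outside that range.
    n = len(current)
    kept = {rank for rank in current.values() if rank < n}
    free = iter(sorted(set(range(n)) - kept))
    return {
        holder: (rank if rank < n else next(free))
        for holder, rank in current.items()
    }

# The replica holding global rank 1 stopped, leaving {0, 2, 3} non-contiguous.
# Only the holder of rank 3 moves (3 -> 1); ranks 0 and 2 stay put, so only one
# replica needs to be reconfigured.
assert minimal_reassign({"r-a": 0, "r-c": 2, "r-d": 3}) == {"r-a": 0, "r-c": 2, "r-d": 1}
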
@@ -1837,6 +1908,35 @@ def _check_rank_consistency_impl():
)
all_replica_ids_needing_reconfiguration.update(replica_ids_from_global)

# STEP 2: Group replicas by node and check local rank consistency per node
replicas_by_node: Dict[str, List[str]] = {}
for replica_id in active_replica_ids:
node_id = self._replica_to_node.get(replica_id)
assert (
node_id is not None
), f"Replica {replica_id} not assigned to any node"
if node_id not in replicas_by_node:
replicas_by_node[node_id] = []
replicas_by_node[node_id].append(replica_id)

for node_id, replica_ids_on_node in replicas_by_node.items():
replica_ids_from_local = self._local_rank_managers[
node_id
].check_rank_consistency_and_reassign_minimally(replica_ids_on_node)
all_replica_ids_needing_reconfiguration.update(replica_ids_from_local)

# STEP 3: Check node rank consistency
active_node_ids = list(replicas_by_node.keys())
if active_node_ids:
node_ids_needing_reassignment = self._node_rank_manager.check_rank_consistency_and_reassign_minimally(
active_node_ids,
)
# If any nodes were reassigned, all replicas on those nodes need reconfiguration
for node_id in node_ids_needing_reassignment:
all_replica_ids_needing_reconfiguration.update(
replicas_by_node[node_id]
)

# Convert replica IDs back to replica objects
# Filter out stale replicas that are not in the active set
replicas_needing_reconfiguration = [
@@ -1851,9 +1951,20 @@

def clear(self) -> None:
self._replica_rank_manager.clear()
self._node_rank_manager.clear()
self._local_rank_managers.clear()
self._replica_to_node.clear()

def get_replica_ranks_mapping(self) -> Dict[str, int]:
return self._replica_rank_manager.get_ranks_mapping()
def get_replica_ranks_mapping(self) -> Dict[str, ReplicaRank]:
"""Get the current mapping of replica IDs to ReplicaRank objects.

Returns:
Dict mapping replica_id to ReplicaRank object
"""
result = {}
for replica_id in self._replica_rank_manager.get_ranks_mapping().keys():
result[replica_id] = self.get_replica_rank(replica_id)
return result


class DeploymentState:
@@ -2679,7 +2790,9 @@ def _check_startup_replicas(
# data structure with RUNNING state.
# Recover rank from the replica actor during controller restart
replica_id = replica.replica_id.unique_id
self._rank_manager.recover_rank(replica_id, replica.rank)
self._rank_manager.recover_rank(
replica_id, replica.actor_node_id, replica.rank
)
# This replica should now be added to the handle's replica
# set.
self._replicas.add(ReplicaState.RUNNING, replica)
@@ -2913,10 +3026,13 @@ def check_and_update_replicas(self):
# Release rank only after replica is successfully stopped
# This ensures rank is available during draining/graceful shutdown
replica_id = replica.replica_id.unique_id
self._rank_manager.release_rank(replica_id)
logger.debug(
f"Released rank from replica {replica_id} in deployment {self._id}"
)
if self._rank_manager.has_replica_rank(replica_id):
# Only release rank if assigned. Replicas that failed allocation
# or never reached RUNNING state won't have ranks.
self._rank_manager.release_rank(replica_id)
logger.debug(
f"Released rank from replica {replica_id} in deployment {self._id}"
)
self._autoscaling_state_manager.on_replica_stopped(replica.replica_id)

# After replica state updates, check rank consistency and perform minimal reassignment if needed
@@ -2970,10 +3086,11 @@ def _reconfigure_replicas_with_new_ranks(
f"Successfully reconfigured {updated_count} replicas with new ranks in deployment {self._id}"
)

def _get_replica_ranks_mapping(self) -> Dict[str, int]:
"""Get the current mapping of replica IDs to ranks.
def _get_replica_ranks_mapping(self) -> Dict[str, ReplicaRank]:
"""Get the current mapping of replica IDs to ReplicaRank objects.

Returns:
Dictionary mapping replica_id to rank.
Dictionary mapping replica_id to ReplicaRank object (with rank, node_rank, local_rank).
"""
return self._rank_manager.get_replica_ranks_mapping()

@@ -3721,12 +3838,14 @@ def get_ingress_replicas_info(self) -> List[Tuple[str, str, int, int]]:
)
return ingress_replicas_info

def _get_replica_ranks_mapping(self, deployment_id: DeploymentID) -> Dict[str, int]:
def _get_replica_ranks_mapping(
self, deployment_id: DeploymentID
) -> Dict[str, ReplicaRank]:
"""Get the current rank mapping for all replicas in a deployment.
Args:
deployment_id: The deployment ID to get ranks for.
Returns:
Dictionary mapping replica_id to rank.
Dictionary mapping replica_id to ReplicaRank object (with rank, node_rank, local_rank).
"""
deployment_state = self._deployment_states.get(deployment_id)
if deployment_state is None: