Skip to content

Commit

Permalink
Fixed comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan O'Leary <ryanaoleary@google.com>
  • Loading branch information
ryanaoleary committed Jun 7, 2024
1 parent 21c99a3 commit 627fcb2
Showing 1 changed file with 31 additions and 42 deletions.
73 changes: 31 additions & 42 deletions python/ray/autoscaler/_private/autoscaler.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,6 @@ def __init__(self, provider: NodeProvider):
self.worker_ids: List[NodeID] = []
# The head node (node kind "head")
self.head_id: Optional[NodeID] = None
# Map of replica IDs to worker nodes in each replica.
# A replica ID refers to the index of a multi-host PodSlice created by KubeRay.
self.replicas_to_nodes = defaultdict(list)

for node in self.all_node_ids:
tags = provider.node_tags(node)
Expand All @@ -139,9 +136,6 @@ def __init__(self, provider: NodeProvider):
self.worker_ids.append(node)
elif node_kind == NODE_KIND_HEAD:
self.head_id = node
if TAG_RAY_REPLICA_INDEX in tags:
node_replica_index = tags[TAG_RAY_REPLICA_INDEX]
self.replicas_to_nodes[node_replica_index].append(node)

# Note: For typical use-cases, self.all_node_ids == self.worker_ids +
# [self.head_id]. The difference being in the case of unmanaged nodes.
Expand Down Expand Up @@ -280,7 +274,11 @@ def read_fn():
self.nodes_to_terminate: List[NodeID] = []

# Tracks replicas scheduled for termination
self.replicas_to_delete = set()
self.replicas_to_delete: Set[str] = set()

# Map of replica IDs to worker nodes in each replica.
# A replica ID refers to the index of a multi-host PodSlice created by KubeRay.
# Use the builtin `list` as the default factory: calling a subscripted
# generic like `List[NodeID]` is deprecated and only works by accident.
self.replicas_to_nodes: Dict[str, List[NodeID]] = defaultdict(list)

# Disable NodeUpdater threads if true.
# Should be set to true in situations where another component, such as
Expand Down Expand Up @@ -418,6 +416,14 @@ def _update(self):
)
return

# Populate mapping of replica IDs to nodes in that replica.
for node in self.non_terminated_nodes:
tags = self.provider.node_tags(node)
if TAG_RAY_REPLICA_INDEX in tags:
node_replica_index = tags[TAG_RAY_REPLICA_INDEX]
if node not in self.replicas_to_nodes[node_replica_index]:
self.replicas_to_nodes[node_replica_index].append(node)

# This will accumulate the nodes we need to terminate.
self.nodes_to_terminate = []

Expand Down Expand Up @@ -627,7 +633,7 @@ def schedule_node_termination(
if TAG_RAY_REPLICA_INDEX in tags:
replica_id = tags[TAG_RAY_REPLICA_INDEX]
if replica_id not in self.replicas_to_delete:
self.replicas_to_delete.append(replica_id)
self.replicas_to_delete.add(replica_id)

def terminate_scheduled_nodes(self):
"""Terminate scheduled nodes and clean associated autoscaler state."""
Expand Down Expand Up @@ -989,19 +995,23 @@ def _keep_worker_of_node_type(
self, node_id: NodeID, node_type_counts: Dict[NodeType, int]
) -> Tuple[KeepOrTerminate, Optional[str]]:
"""Determines if a worker should be kept based on the min_workers
and max_workers constraint of the worker's node_type.
and max_workers constraint of the worker's node_type. Additionally,
workers belonging to a multi-host replica being deleted are scheduled
to delete as well.
Returns KeepOrTerminate.keep when both of the following hold:
(a) The worker's node_type is present among the keys of the current
config's available_node_types dict.
(b) Deleting the node would violate the min_workers constraint for that
worker's node_type.
Returns KeepOrTerminate.terminate when both the following hold:
Returns KeepOrTerminate.terminate when the following hold:
(a) The worker's node_type is not present among the keys of the current
config's available_node_types dict.
(b) Keeping the node would violate the max_workers constraint for that
worker's node_type.
(c) The worker has TAG_RAY_REPLICA_INDEX tag set, and its replica index is
found in replicas_to_delete.
Return KeepOrTerminate.decide_later otherwise.
Expand All @@ -1019,6 +1029,17 @@ def _keep_worker_of_node_type(
assert self.provider

tags = self.provider.node_tags(node_id)

if TAG_RAY_REPLICA_INDEX in tags:
replica_id = tags[TAG_RAY_REPLICA_INDEX]
# All nodes in this replica should be deleted, regardless of
# available_node_types.
if replica_id in self.replicas_to_delete:
return (
KeepOrTerminate.terminate,
f"Node belongs to a replica being deleted: {replica_id}",
)

if TAG_RAY_USER_NODE_TYPE in tags:
node_type = tags[TAG_RAY_USER_NODE_TYPE]

Expand All @@ -1044,38 +1065,6 @@ def _keep_worker_of_node_type(

return KeepOrTerminate.decide_later, None

def _keep_node_of_replica(
    self, node_id: NodeID
) -> Tuple[KeepOrTerminate, Optional[str]]:
    """Decide whether a node may be kept, based on its replica's deletion state.

    Returns KeepOrTerminate.terminate when:
    (a) The node carries TAG_RAY_REPLICA_INDEX and that replica index is
        present in self.replicas_to_delete.
    Returns KeepOrTerminate.keep otherwise.

    Returns:
        KeepOrTerminate: keep if the node should be kept, terminate if the
            node should be terminated.
        Optional[str]: reason for termination. Not None on
            KeepOrTerminate.terminate, None otherwise.
    """
    # For type checking, assert that this object has been instantiated.
    assert self.provider

    node_tags = self.provider.node_tags(node_id)
    # A node is only eligible for replica-based termination if it is
    # tagged with a replica index at all.
    if TAG_RAY_REPLICA_INDEX in node_tags:
        replica_id = node_tags[TAG_RAY_REPLICA_INDEX]
        if replica_id in self.replicas_to_delete:
            # Every node in a replica scheduled for deletion goes with it.
            reason = f"Node belongs to a replica being deleted: {replica_id}"
            return KeepOrTerminate.terminate, reason

    return KeepOrTerminate.keep, None

def _node_resources(self, node_id):
node_type = self.provider.node_tags(node_id).get(TAG_RAY_USER_NODE_TYPE)
if self.available_node_types:
Expand Down

0 comments on commit 627fcb2

Please sign in to comment.