
[Core] Enable Scaling Down for Multi-Host TPU Replicas #43470

Merged
merged 57 commits on Jul 3, 2024

Changes from 54 commits

Commits (57)
50129eb
Multi-host replica deletion logic initial commit
ryanaoleary Feb 27, 2024
f532cf1
Multi-host replica deletion logic
ryanaoleary Feb 27, 2024
8a1d4b9
Fix broken tests
ryanaoleary May 8, 2024
f1704b7
Fix KubeRay spelling
ryanaoleary May 8, 2024
544a80b
Make replica_index optional and added KeyError check
ryanaoleary May 10, 2024
2d3d990
Remove self from tags initialization
ryanaoleary May 21, 2024
f2c227b
Update python/ray/autoscaler/batching_node_provider.py
ryanaoleary May 31, 2024
7d76c40
Update python/ray/autoscaler/_private/kuberay/node_provider.py
ryanaoleary May 31, 2024
62f25b2
Update python/ray/autoscaler/_private/kuberay/node_provider.py
ryanaoleary May 31, 2024
de6f7f2
Move replicas_to_delete and change to set
ryanaoleary May 31, 2024
e6a7372
Add assert for self.replicas_to_nodes
ryanaoleary May 31, 2024
1f1bccc
Move replicas_to_node initialization to constructor
ryanaoleary May 31, 2024
2d969c8
Fixed comments
ryanaoleary Jun 7, 2024
89c6c81
Merge branch 'master' into autoscaling-changes
ryanaoleary Jun 7, 2024
1286434
Fixed comments
ryanaoleary Jun 7, 2024
60930a4
Add back in changes lost in bad rebase
ryanaoleary Jun 7, 2024
95c3f16
Change to iterate through worker_ids
ryanaoleary Jun 7, 2024
73e9517
Fix replicas_to_nodes instantiation
ryanaoleary Jun 7, 2024
3062e27
Merge branch 'master' into autoscaling-changes
ryanaoleary Jun 8, 2024
143e03a
Fix errors causing autoscaler test to fail
ryanaoleary Jun 8, 2024
1394935
Update python/ray/autoscaler/_private/kuberay/node_provider.py
ryanaoleary Jun 11, 2024
dd22594
Fixed comments
ryanaoleary Jun 11, 2024
473b7cb
Update python/ray/autoscaler/_private/autoscaler.py
ryanaoleary Jun 12, 2024
fa083aa
Fix replicas_to_nodes creation
ryanaoleary Jun 12, 2024
81df1df
Add log statement and make naming consistent
ryanaoleary Jun 12, 2024
c219c22
Change all instances of replica ID to index
ryanaoleary Jun 12, 2024
112a448
Remove NumOfHosts check from node provider
ryanaoleary Jun 13, 2024
64c5d4b
Update python/ray/autoscaler/batching_node_provider.py
ryanaoleary Jun 13, 2024
eab706f
Update python/ray/autoscaler/batching_node_provider.py
ryanaoleary Jun 13, 2024
96272bb
Clean up replicas to nodes and fixed var names
ryanaoleary Jun 13, 2024
c70bdbb
Merge branch 'master' into autoscaling-changes
ryanaoleary Jun 13, 2024
fa4e285
Change from remove to pop
ryanaoleary Jun 13, 2024
9635250
Add None check to replicas_to_delete
ryanaoleary Jun 14, 2024
0504175
Add more testing for scale down behavior
ryanaoleary Jun 14, 2024
fd27aca
Add more error checking
ryanaoleary Jun 14, 2024
47e46ac
Add node_tags None check
ryanaoleary Jun 20, 2024
1ae9ec9
Merge branch 'master' into autoscaling-changes
ryanaoleary Jun 20, 2024
a06581c
Change get_node_tags and fix terminate node logic
ryanaoleary Jun 20, 2024
08b8eb9
Merge branch 'master' into autoscaling-changes
ryanaoleary Jun 24, 2024
c5048d6
Merge branch 'master' into autoscaling-changes
ryanaoleary Jun 25, 2024
33d7f0a
Move multi-host scale down logic entirely to batching_node_provider
ryanaoleary Jun 26, 2024
45c5b23
Add logger info statement for multi-host scaling
ryanaoleary Jun 26, 2024
184bc2b
Add back in replica index tag
ryanaoleary Jun 27, 2024
ef7e025
BatchingNodeProvider autoscaling test
ryanaoleary Jun 27, 2024
72cfedf
Merge branch 'master' into autoscaling-changes
ryanaoleary Jun 27, 2024
776f8df
Remove newline
ryanaoleary Jun 27, 2024
5659499
Merge branch 'ray-project:master' into autoscaling-changes
ryanaoleary Jun 27, 2024
3f168c5
Merge branch 'master' into autoscaling-changes
ryanaoleary Jun 27, 2024
76f092d
Mock BatchNodeProvider directly
ryanaoleary Jun 27, 2024
f1037d1
Update podlist2 with actual TPU pod yaml
ryanaoleary Jun 28, 2024
302b69a
Fix test comments
ryanaoleary Jun 28, 2024
14e8f79
Fix batching node provider log statement
ryanaoleary Jun 28, 2024
aa28221
Remove unused index increment
ryanaoleary Jul 1, 2024
875767d
Merge branch 'master' into autoscaling-changes
ryanaoleary Jul 1, 2024
2aa5a31
Add type annotation and fix nits
ryanaoleary Jul 2, 2024
b67be14
Merge branch 'master' into autoscaling-changes
ryanaoleary Jul 2, 2024
de1fa8a
Merge branch 'master' into autoscaling-changes
ryanaoleary Jul 2, 2024
18 changes: 17 additions & 1 deletion python/ray/autoscaler/_private/kuberay/node_provider.py
@@ -45,6 +45,9 @@

RAY_HEAD_POD_NAME = os.getenv("RAY_HEAD_POD_NAME")

# Key for GKE label that identifies which multi-host replica a pod belongs to
REPLICA_INDEX_KEY = "replicaIndex"

# Design:

# Each modification the autoscaler wants to make is posted to the API server goal state
@@ -79,7 +82,10 @@ def node_data_from_pod(pod: Dict[str, Any]) -> NodeData:
kind, type = kind_and_type(pod)
status = status_tag(pod)
ip = pod_ip(pod)
return NodeData(kind=kind, type=type, status=status, ip=ip)
replica_index = _replica_index_label(pod)
return NodeData(
kind=kind, type=type, replica_index=replica_index, status=status, ip=ip
)


def kind_and_type(pod: Dict[str, Any]) -> Tuple[NodeKind, NodeType]:
@@ -96,6 +102,16 @@ def kind_and_type(pod: Dict[str, Any]) -> Tuple[NodeKind, NodeType]:
return kind, type


def _replica_index_label(pod: Dict[str, Any]) -> Optional[str]:
"""Returns the replicaIndex label for a Pod in a multi-host TPU worker group.
The replicaIndex label is set by the GKE TPU Ray webhook and is of
the form {$WORKER_GROUP_NAME-$REPLICA_INDEX} where $REPLICA_INDEX
is an integer from 0 to Replicas-1.
"""
labels = pod["metadata"]["labels"]
return labels.get(REPLICA_INDEX_KEY, None)


def pod_ip(pod: Dict[str, Any]) -> NodeIP:
return pod["status"].get("podIP", "IP not yet assigned")

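For orientation, here is a minimal, self-contained sketch of how the new label-extraction helper behaves on a stripped-down pod dict (the pod structures and names below are hypothetical examples, not fixtures from this PR):

from typing import Any, Dict, Optional

# Mirrors _replica_index_label above; assumes only the standard pod metadata
# structure, nothing Ray-specific.
REPLICA_INDEX_KEY = "replicaIndex"  # label set by the GKE TPU webhook

def replica_index_label(pod: Dict[str, Any]) -> Optional[str]:
    """Return the pod's replicaIndex label, or None for single-host pods."""
    return pod["metadata"]["labels"].get(REPLICA_INDEX_KEY, None)

# Hypothetical pod in replica 0 of a worker group named "tpu-group": the webhook
# writes the label as "<worker-group-name>-<replica-index>".
tpu_pod = {"metadata": {"labels": {"replicaIndex": "tpu-group-0"}}}
cpu_pod = {"metadata": {"labels": {}}}

assert replica_index_label(tpu_pod) == "tpu-group-0"
assert replica_index_label(cpu_pod) is None

A pod without the label (any non-TPU or single-host worker) simply yields None, which node_data_from_pod passes through to NodeData.replica_index.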
32 changes: 31 additions & 1 deletion python/ray/autoscaler/batching_node_provider.py
@@ -14,6 +14,7 @@
NODE_KIND_HEAD,
TAG_RAY_NODE_KIND,
TAG_RAY_NODE_STATUS,
TAG_RAY_REPLICA_INDEX,
TAG_RAY_USER_NODE_TYPE,
)

@@ -43,6 +44,8 @@ class NodeData:
Attributes:
kind: Whether the node is the head or a worker.
type: The user-defined type of the node.
replica_index: An identifier for nodes in a replica of a TPU worker group.
This value is set as a Pod label by a GKE webhook when TPUs are requested.
ip: Cluster-internal ip of the node. ip can be None if the ip
has not yet been assigned.
status: The status of the node. You must adhere to the following semantics
@@ -58,6 +61,7 @@
type: NodeType
ip: Optional[NodeIP]
status: NodeStatus
replica_index: Optional[str] = None


class BatchingNodeProvider(NodeProvider):
@@ -116,6 +120,9 @@ def __init__(

self.scale_request = ScaleRequest()

# Initialize map of replica indices to nodes in that replica
self.replicas_to_nodes = defaultdict(list)

def get_node_data(self) -> Dict[NodeID, NodeData]:
"""Queries cluster manager for node info. Returns a mapping from node id to
NodeData.
@@ -160,6 +167,12 @@ def non_terminated_nodes(self, tag_filters: Dict[str, str]) -> List[str]:
workers_to_delete=set(), # No workers to delete yet
)
all_nodes = list(self.node_data_dict.keys())
self.replicas_to_nodes.clear()
for node_id in all_nodes:
replica_index = self.node_data_dict[node_id].replica_index
# Only add node to map if it belongs to a multi-host podslice
if replica_index is not None:
self.replicas_to_nodes[replica_index].append(node_id)
# Support filtering by TAG_RAY_NODE_KIND, TAG_RAY_NODE_STATUS, and
# TAG_RAY_USER_NODE_TYPE.
# The autoscaler only uses tag_filters={},
@@ -187,11 +200,14 @@ def _cur_num_workers(self, node_data_dict: Dict[str, Any]):

def node_tags(self, node_id: str) -> Dict[str, str]:
node_data = self.node_data_dict[node_id]
return {
tags = {
TAG_RAY_NODE_KIND: node_data.kind,
TAG_RAY_NODE_STATUS: node_data.status,
TAG_RAY_USER_NODE_TYPE: node_data.type,
}
if node_data.replica_index is not None:
tags[TAG_RAY_REPLICA_INDEX] = node_data.replica_index
return tags

def internal_ip(self, node_id: str) -> str:
return self.node_data_dict[node_id].ip
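For a quick sense of how the new field flows into node_tags, here is a hedged usage sketch (it assumes a Ray build that already contains this PR; the node type, IP, and replica names are illustrative only):

from ray.autoscaler.batching_node_provider import NodeData
from ray.autoscaler.tags import TAG_RAY_REPLICA_INDEX

# Hypothetical worker in replica "tpu-group-0" of a multi-host TPU worker group;
# replica_index mirrors the pod's GKE-set "replicaIndex" label.
tpu_worker = NodeData(
    kind="worker",
    type="tpu-group",
    ip="10.0.0.3",
    status="up-to-date",
    replica_index="tpu-group-0",
)

# A worker outside any multi-host podslice keeps the default of None, so
# BatchingNodeProvider.node_tags() omits TAG_RAY_REPLICA_INDEX ("ray-replica-index")
# from its tag dict for that node.
cpu_worker = NodeData(kind="worker", type="cpu-group", ip="10.0.0.4", status="up-to-date")

assert tpu_worker.replica_index == "tpu-group-0"
assert cpu_worker.replica_index is None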
@@ -230,6 +246,20 @@ def terminate_node(self, node_id: str) -> Optional[Dict[str, Any]]:
f"{node_type}. Skipping termination request."
)

# Terminate node
self.scale_request.desired_num_workers[node_type] -= 1
self.scale_request.workers_to_delete.add(node_id)

# Scale down all nodes in replica if node_id is part of a multi-host podslice
tags = self.node_tags(node_id)
if TAG_RAY_REPLICA_INDEX in tags:
node_replica_index = tags[TAG_RAY_REPLICA_INDEX]
for worker_id in self.replicas_to_nodes[node_replica_index]:
# Check if worker has already been scheduled to delete
if worker_id not in self.scale_request.workers_to_delete:
self.scale_request.workers_to_delete.add(worker_id)
logger.info(
f"Autoscaler terminating node {worker_id} "
f"in multi-host replica {node_replica_index}."
)
self.scale_change_needed = True
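To summarize the scale-down expansion this hunk implements, here is a condensed, self-contained sketch of the bookkeeping (plain dictionaries stand in for the provider's state; node and replica names are made up):

from collections import defaultdict
from typing import Dict, List, Optional, Set

# Stand-in for node_data_dict: node id -> replica index (None for single-host nodes).
node_replica: Dict[str, Optional[str]] = {
    "tpu-pod-a": "tpu-group-0",
    "tpu-pod-b": "tpu-group-0",
    "cpu-pod-a": None,
}

# Built the same way non_terminated_nodes rebuilds self.replicas_to_nodes.
replicas_to_nodes: Dict[str, List[str]] = defaultdict(list)
for node_id, replica_index in node_replica.items():
    if replica_index is not None:
        replicas_to_nodes[replica_index].append(node_id)

workers_to_delete: Set[str] = set()

def terminate(node_id: str) -> None:
    """Delete node_id; if it belongs to a multi-host replica, delete the whole replica."""
    workers_to_delete.add(node_id)
    replica_index = node_replica[node_id]
    if replica_index is not None:
        # A multi-host TPU podslice must scale atomically, so every worker in
        # the replica is scheduled for deletion along with the requested node.
        workers_to_delete.update(replicas_to_nodes[replica_index])

terminate("tpu-pod-a")
assert workers_to_delete == {"tpu-pod-a", "tpu-pod-b"}

terminate("cpu-pod-a")  # single-host node: only itself is deleted
assert workers_to_delete == {"tpu-pod-a", "tpu-pod-b", "cpu-pod-a"}

The real provider additionally decrements desired_num_workers for the node's type and skips workers already scheduled for deletion before logging each termination.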
2 changes: 2 additions & 0 deletions python/ray/autoscaler/tags.py
@@ -13,6 +13,8 @@
# Tag for user defined node types (e.g., m4xl_spot). This is used for multi
# node type clusters.
TAG_RAY_USER_NODE_TYPE = "ray-user-node-type"
# Tag for the index of the replica a node belongs to. Used for multi-host worker groups.
TAG_RAY_REPLICA_INDEX = "ray-replica-index"
# Tag for autofilled node types for legacy cluster yamls without multi
# node type defined in the cluster configs.
NODE_TYPE_LEGACY_HEAD = "ray-legacy-head-node-type"