Skip to content

Commit 4036252

Browse files
ryanaolearyjjyaoMengjinYan
authored
[Autoscaler] Add bundle_label_selector to request_resources sdk (#54843)
Signed-off-by: Ryan O'Leary <ryanaoleary@google.com> Signed-off-by: Ryan O'Leary <113500783+ryanaoleary@users.noreply.github.com> Co-authored-by: Jiajun Yao <jeromeyjj@gmail.com> Co-authored-by: Mengjin Yan <mengjinyan3@gmail.com>
1 parent 692c7c7 commit 4036252

File tree

12 files changed

+210
-48
lines changed

12 files changed

+210
-48
lines changed

python/ray/autoscaler/_private/commands.py

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -184,9 +184,16 @@ def debug_status(
184184

185185

186186
def request_resources(
187-
num_cpus: Optional[int] = None, bundles: Optional[List[dict]] = None
187+
num_cpus: Optional[int] = None,
188+
bundles: Optional[List[dict]] = None,
189+
bundle_label_selectors: Optional[List[dict]] = None,
188190
) -> None:
189-
"""Remotely request some CPU or GPU resources from the autoscaler.
191+
"""Remotely request some CPU or GPU resources from the autoscaler. Optionally
192+
specify label selectors for nodes with the requested resources.
193+
194+
If `bundle_label_selectors` is provided, `bundles` must also be provided.
195+
Both must be lists of the same length, and `bundle_label_selectors` expects a list
196+
of string dictionaries.
190197
191198
This function is to be called e.g. on a node before submitting a bunch of
192199
ray.remote calls to ensure that resources rapidly become available.
@@ -198,14 +205,24 @@ def request_resources(
198205
bundles (List[ResourceDict]): Scale the cluster to ensure this set of
199206
resource shapes can fit. This request is persistent until another
200207
call to request_resources() is made.
208+
bundle_label_selectors (List[Dict[str,str]]): Optional label selectors
209+
that new nodes must satisfy. (e.g. [{"accelerator-type": "A100"}])
210+
The elements in the bundle_label_selectors should be one-to-one mapping
211+
to the elements in bundles.
201212
"""
202213
if not ray.is_initialized():
203214
raise RuntimeError("Ray is not initialized yet")
204215
to_request = []
205-
if num_cpus:
206-
to_request += [{"CPU": 1}] * num_cpus
216+
for _ in range(num_cpus or 0):
217+
to_request.append({"resources": {"CPU": 1}, "label_selector": {}})
218+
assert not bundle_label_selectors or (
219+
bundles and len(bundles) == len(bundle_label_selectors)
220+
), "If bundle_label_selectors is provided, bundles must also be provided and have the same length."
207221
if bundles:
208-
to_request += bundles
222+
for i, bundle in enumerate(bundles):
223+
selector = bundle_label_selectors[i] if bundle_label_selectors else {}
224+
to_request.append({"resources": bundle, "label_selector": selector})
225+
209226
_internal_kv_put(
210227
AUTOSCALER_RESOURCE_REQUEST_CHANNEL, json.dumps(to_request), overwrite=True
211228
)

python/ray/autoscaler/sdk/sdk.py

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from contextlib import contextmanager
77
from typing import Any, Callable, Dict, Iterator, List, Optional, Union
88

9+
from ray._private.label_utils import validate_label_selector
910
from ray.autoscaler._private import commands
1011
from ray.autoscaler._private.cli_logger import cli_logger
1112
from ray.autoscaler._private.event_system import (
@@ -21,7 +22,7 @@ def create_or_update_cluster(
2122
*,
2223
no_restart: bool = False,
2324
restart_only: bool = False,
24-
no_config_cache: bool = False
25+
no_config_cache: bool = False,
2526
) -> Dict[str, Any]:
2627
"""Create or updates an autoscaling Ray cluster from a config json.
2728
@@ -87,7 +88,7 @@ def run_on_cluster(
8788
stop: bool = False,
8889
no_config_cache: bool = False,
8990
port_forward: Optional[commands.Port_forward] = None,
90-
with_output: bool = False
91+
with_output: bool = False,
9192
) -> Optional[str]:
9293
"""Runs a command on the specified cluster.
9394
@@ -133,7 +134,7 @@ def rsync(
133134
ip_address: Optional[str] = None,
134135
use_internal_ip: bool = False,
135136
no_config_cache: bool = False,
136-
should_bootstrap: bool = True
137+
should_bootstrap: bool = True,
137138
):
138139
"""Rsyncs files to or from the cluster.
139140
@@ -206,7 +207,9 @@ def get_worker_node_ips(cluster_config: Union[dict, str]) -> List[str]:
206207

207208
@DeveloperAPI
208209
def request_resources(
209-
num_cpus: Optional[int] = None, bundles: Optional[List[dict]] = None
210+
num_cpus: Optional[int] = None,
211+
bundles: Optional[List[dict]] = None,
212+
bundle_label_selectors: Optional[List[dict]] = None,
210213
) -> None:
211214
"""Command the autoscaler to scale to accommodate the specified requests.
212215
@@ -230,6 +233,11 @@ def request_resources(
230233
bundles (List[ResourceDict]): Scale the cluster to ensure this set of
231234
resource shapes can fit. This request is persistent until another
232235
call to request_resources() is made to override.
236+
bundle_label_selectors: A list of label selectors, applied per-bundle to the same
237+
index in the `bundles` list. For bundles without a label requirement, the
238+
corresponding item in the list is an empty dictionary. For each bundle.
239+
Label selectors consist of zero or more key-value pairs where the key is
240+
a label and the value is a operator (in, !in, etc.) and label value.
233241
234242
Examples:
235243
>>> from ray.autoscaler.sdk import request_resources
@@ -241,6 +249,13 @@ def request_resources(
241249
>>> # Same as requesting num_cpus=3.
242250
>>> request_resources( # doctest: +SKIP
243251
... bundles=[{"CPU": 1}, {"CPU": 1}, {"CPU": 1}])
252+
>>> # Requests 2 num_cpus=1 bundles, the first with
253+
>>> # label_selector={"accelerator-type": "in(A100)"} and second with
254+
>>> # label_selector={"market-type": "spot"}.
255+
>>> request_resources( # doctest: +SKIP
256+
... bundles=[{"CPU": 1}, {"CPU": 1}]),
257+
... bundle_label_selectors=[{"accelerator-type": "in(A100)"},
258+
... {"market-type": "spot"}])
244259
"""
245260
if num_cpus is not None and not isinstance(num_cpus, int):
246261
raise TypeError("num_cpus should be of type int.")
@@ -257,8 +272,34 @@ def request_resources(
257272
raise TypeError("each bundle should be a Dict.")
258273
else:
259274
raise TypeError("bundles should be of type List")
260-
261-
return commands.request_resources(num_cpus, bundles)
275+
if bundle_label_selectors is not None:
276+
if bundles is None:
277+
raise ValueError(
278+
"`bundles` must be provided when `bundle_label_selectors` is specified."
279+
)
280+
if len(bundle_label_selectors) != len(bundles):
281+
raise ValueError(
282+
"`bundle_label_selector` must be a list with length equal to the number of bundles."
283+
)
284+
for label_selector in bundle_label_selectors:
285+
if (
286+
not isinstance(label_selector, dict)
287+
or not all(isinstance(k, str) for k in label_selector.keys())
288+
or not all(isinstance(v, str) for v in label_selector.values())
289+
):
290+
raise ValueError(
291+
"Bundle label selector must be a list of string dictionary"
292+
" label selectors. For example: "
293+
'`[{ray.io/market_type": "spot"}, {"ray.io/accelerator-type": "A100"}]`.'
294+
)
295+
error_message = validate_label_selector(label_selector)
296+
if error_message:
297+
raise ValueError(
298+
f"Invalid label selector provided in bundle_label_selectors list."
299+
f" Detailed error: '{error_message}'"
300+
)
301+
302+
return commands.request_resources(num_cpus, bundles, bundle_label_selectors)
262303

263304

264305
@DeveloperAPI

python/ray/autoscaler/v2/sdk.py

Lines changed: 40 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import time
2-
from collections import defaultdict
3-
from typing import List
2+
from collections import Counter
3+
from typing import List, NamedTuple
44

55
from ray._raylet import GcsClient
66
from ray.autoscaler.v2.schema import ClusterStatus, Stats
@@ -14,8 +14,15 @@
1414
DEFAULT_RPC_TIMEOUT_S = 10
1515

1616

17+
class ResourceRequest(NamedTuple):
18+
resources: dict
19+
label_selector: dict
20+
21+
1722
def request_cluster_resources(
18-
gcs_address: str, to_request: List[dict], timeout: int = DEFAULT_RPC_TIMEOUT_S
23+
gcs_address: str,
24+
to_request: List[dict],
25+
timeout: int = DEFAULT_RPC_TIMEOUT_S,
1926
):
2027
"""Request resources from the autoscaler.
2128
@@ -28,28 +35,46 @@ def request_cluster_resources(
2835
2936
Args:
3037
gcs_address: The GCS address to query.
31-
to_request: A list of resource bundles to request the cluster to have.
32-
Each bundle is a dict of resource name to resource quantity, e.g:
33-
[{"CPU": 1}, {"GPU": 1}].
38+
to_request: A list of resource requests to request the cluster to have.
39+
Each resource request is a tuple of resources and a label_selector
40+
to apply per-bundle. e.g.: [{"resources": {"CPU": 1, "GPU": 1}, "label_selector": {"accelerator-type": "A100"}}]
3441
timeout: Timeout in seconds for the request to be timeout
3542
3643
"""
3744
assert len(gcs_address) > 0, "GCS address is not specified."
3845

39-
# Aggregate bundle by shape.
40-
resource_requests_by_count = defaultdict(int)
41-
for request in to_request:
42-
bundle = frozenset(request.items())
43-
resource_requests_by_count[bundle] += 1
46+
# Convert bundle dicts to ResourceRequest tuples.
47+
normalized: List[ResourceRequest] = []
48+
for r in to_request:
49+
assert isinstance(
50+
r, dict
51+
), f"Internal Error: Expected a dict, but got {type(r)}"
52+
resources = r.get("resources", {})
53+
selector = r.get("label_selector", {})
54+
normalized.append(ResourceRequest(resources, selector))
55+
56+
to_request = normalized
57+
58+
# Aggregate bundle by shape
59+
def keyfunc(r):
60+
return (
61+
frozenset(r.resources.items()),
62+
frozenset(r.label_selector.items()),
63+
)
64+
65+
grouped_requests = Counter(keyfunc(r) for r in to_request)
66+
67+
bundles: List[dict] = []
68+
label_selectors: List[dict] = []
69+
counts: List[int] = []
4470

45-
bundles = []
46-
counts = []
47-
for bundle, count in resource_requests_by_count.items():
71+
for (bundle, selector), count in grouped_requests.items():
4872
bundles.append(dict(bundle))
73+
label_selectors.append(dict(selector))
4974
counts.append(count)
5075

5176
GcsClient(gcs_address).request_cluster_resource_constraint(
52-
bundles, counts, timeout_s=timeout
77+
bundles, label_selectors, counts, timeout_s=timeout
5378
)
5479

5580

python/ray/autoscaler/v2/tests/test_autoscaler.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,9 @@ def test_basic_scaling(make_autoscaler):
121121

122122
# Resource requests
123123
print("=================== Test scaling up constraint 1/2====================")
124-
request_cluster_resources(gcs_address, [{"CPU": 1}, {"GPU": 1}])
124+
request_cluster_resources(
125+
gcs_address, [{"resources": {"CPU": 1}}, {"resources": {"GPU": 1}}]
126+
)
125127

126128
def verify():
127129
autoscaler.update_autoscaling_state()

python/ray/autoscaler/v2/tests/test_sdk.py

Lines changed: 70 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717
NodeInfo,
1818
ResourceRequestByCount,
1919
)
20-
from ray.autoscaler.v2.sdk import get_cluster_status, request_cluster_resources
20+
from ray.autoscaler.v2.sdk import (
21+
get_cluster_status,
22+
request_cluster_resources,
23+
)
2124
from ray.autoscaler.v2.tests.util import (
2225
get_available_resources,
2326
get_cluster_resource_state,
@@ -26,6 +29,7 @@
2629
)
2730
from ray.core.generated import autoscaler_pb2, autoscaler_pb2_grpc
2831
from ray.core.generated.autoscaler_pb2 import ClusterResourceState, NodeStatus
32+
from ray.core.generated.common_pb2 import LabelSelectorOperator
2933
from ray.util.state.api import list_nodes
3034

3135

@@ -247,7 +251,7 @@ def test_request_cluster_resources_basic(shutdown_only):
247251
gcs_address = ctx.address_info["gcs_address"]
248252

249253
# Request one
250-
request_cluster_resources(gcs_address, [{"CPU": 1}])
254+
request_cluster_resources(gcs_address, [{"resources": {"CPU": 1}}])
251255

252256
def verify():
253257
state = get_cluster_resource_state(stub)
@@ -257,7 +261,9 @@ def verify():
257261
wait_for_condition(verify)
258262

259263
# Request another overrides the previous request
260-
request_cluster_resources(gcs_address, [{"CPU": 2, "GPU": 1}, {"CPU": 1}])
264+
request_cluster_resources(
265+
gcs_address, [{"resources": {"CPU": 2, "GPU": 1}}, {"resources": {"CPU": 1}}]
266+
)
261267

262268
def verify():
263269
state = get_cluster_resource_state(stub)
@@ -267,7 +273,7 @@ def verify():
267273
return True
268274

269275
# Request multiple is aggregated by shape.
270-
request_cluster_resources(gcs_address, [{"CPU": 1}] * 100)
276+
request_cluster_resources(gcs_address, [{"resources": {"CPU": 1}}] * 100)
271277

272278
def verify():
273279
state = get_cluster_resource_state(stub)
@@ -277,6 +283,65 @@ def verify():
277283
wait_for_condition(verify)
278284

279285

286+
def test_request_cluster_resources_with_label_selectors(shutdown_only):
287+
ctx = ray.init(num_cpus=1)
288+
stub = _autoscaler_state_service_stub()
289+
gcs_address = ctx.address_info["gcs_address"]
290+
291+
# Define two bundles, each with its own label_selector, to request.
292+
bundles = [
293+
{"CPU": 1},
294+
{"GPU": 1, "CPU": 2},
295+
]
296+
bundle_label_selectors = [
297+
{"region": "us-west1"},
298+
{"accelerator-type": "!in(A100)"},
299+
]
300+
to_request = [
301+
{"resources": b, "label_selector": s}
302+
for b, s in zip(bundles, bundle_label_selectors)
303+
]
304+
305+
# Send the request for these resource bundles
306+
request_cluster_resources(gcs_address, to_request)
307+
308+
def verify():
309+
state = get_cluster_resource_state(stub)
310+
# Validate shape and resource request count
311+
assert_cluster_resource_constraints(state, bundles, [1, 1])
312+
313+
# Check that requests carry expected label selectors
314+
requests = state.cluster_resource_constraints[0].resource_requests
315+
316+
# First resource request
317+
label_selectors_0 = requests[0].request.label_selectors
318+
selector_0 = label_selectors_0[0]
319+
constraints_0 = {
320+
c.label_key: list(c.label_values) for c in selector_0.label_constraints
321+
}
322+
assert constraints_0 == {"region": ["us-west1"]}
323+
assert (
324+
selector_0.label_constraints[0].operator
325+
== LabelSelectorOperator.LABEL_OPERATOR_IN
326+
)
327+
328+
# Second resource request
329+
label_selectors_1 = requests[1].request.label_selectors
330+
selector_1 = label_selectors_1[0]
331+
constraints_1 = {
332+
c.label_key: list(c.label_values) for c in selector_1.label_constraints
333+
}
334+
assert constraints_1 == {"accelerator-type": ["A100"]}
335+
assert (
336+
selector_1.label_constraints[0].operator
337+
== LabelSelectorOperator.LABEL_OPERATOR_NOT_IN
338+
)
339+
340+
return True
341+
342+
wait_for_condition(verify)
343+
344+
280345
def test_node_info_basic(shutdown_only, monkeypatch):
281346
with monkeypatch.context() as m:
282347
m.setenv("RAY_CLOUD_INSTANCE_ID", "instance-id")
@@ -646,7 +711,7 @@ def verify_task_demands():
646711

647712
# Request resources through SDK
648713
request_cluster_resources(
649-
gcs_address=cluster.address, to_request=[{"GPU": 1, "CPU": 2}]
714+
gcs_address=cluster.address, to_request=[{"resources": {"GPU": 1, "CPU": 2}}]
650715
)
651716

652717
def verify_cluster_constraint_demand():

python/ray/includes/common.pxd

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -561,7 +561,8 @@ cdef extern from "ray/gcs_rpc_client/accessor.h" nogil:
561561
CRayStatus RequestClusterResourceConstraint(
562562
int64_t timeout_ms,
563563
const c_vector[unordered_map[c_string, double]] &bundles,
564-
const c_vector[int64_t] &count_array
564+
const c_vector[unordered_map[c_string, c_string]] &label_selectors,
565+
const c_vector[int64_t] &count_array,
565566
)
566567

567568
CRayStatus GetClusterResourceState(

0 commit comments

Comments
 (0)