diff --git a/python/ray/_private/client_mode_hook.py b/python/ray/_private/client_mode_hook.py index 77eb4633347a..223f939cb911 100644 --- a/python/ray/_private/client_mode_hook.py +++ b/python/ray/_private/client_mode_hook.py @@ -69,6 +69,30 @@ def client_mode_should_convert(): return client_mode_enabled and _client_hook_enabled +def client_mode_wrap(func): + """Wraps a function called during client mode for execution as a remote + task. + + Can be used to implement public features of ray client which do not + belong in the main ray API (`ray.*`), yet require server-side execution. + An example is the creation of placement groups: + `ray.util.placement_group.placement_group()`. When called on the client + side, this function is wrapped in a task to facilitate interaction with + the GCS. + """ + from ray.util.client import ray + + @wraps(func) + def wrapper(*args, **kwargs): + if client_mode_should_convert(): + f = ray.remote(num_cpus=0)(func) + ref = f.remote(*args, **kwargs) + return ray.get(ref) + return func(*args, **kwargs) + + return wrapper + + def client_mode_convert_function(func_cls, in_args, in_kwargs, **kwargs): """Runs a preregistered ray RemoteFunction through the ray client. @@ -80,7 +104,10 @@ def client_mode_convert_function(func_cls, in_args, in_kwargs, **kwargs): from ray.util.client import ray key = getattr(func_cls, RAY_CLIENT_MODE_ATTR, None) - if key is None: + + # Second part of "or" is needed in case func_cls is reused between Ray + # client sessions in one Python interpreter session. + if (key is None) or (not ray._converted_key_exists(key)): key = ray._convert_function(func_cls) setattr(func_cls, RAY_CLIENT_MODE_ATTR, key) client_func = ray._get_converted(key) @@ -98,7 +125,9 @@ def client_mode_convert_actor(actor_cls, in_args, in_kwargs, **kwargs): from ray.util.client import ray key = getattr(actor_cls, RAY_CLIENT_MODE_ATTR, None) - if key is None: + # Second part of "or" is needed in case actor_cls is reused between Ray + # client sessions in one Python interpreter session. 
+ if (key is None) or (not ray._converted_key_exists(key)): key = ray._convert_actor(actor_cls) setattr(actor_cls, RAY_CLIENT_MODE_ATTR, key) client_actor = ray._get_converted(key) diff --git a/python/ray/tests/conftest.py b/python/ray/tests/conftest.py index 8306878024c4..4925a6ca213d 100644 --- a/python/ray/tests/conftest.py +++ b/python/ray/tests/conftest.py @@ -148,6 +148,13 @@ def ray_start_cluster(request): yield res +@pytest.fixture +def ray_start_cluster_init(request): + param = getattr(request, "param", {}) + with _ray_start_cluster(do_init=True, **param) as res: + yield res + + @pytest.fixture def ray_start_cluster_head(request): param = getattr(request, "param", {}) diff --git a/python/ray/tests/test_client.py b/python/ray/tests/test_client.py index 5f0c8cbe1712..6e6bea5ab1b7 100644 --- a/python/ray/tests/test_client.py +++ b/python/ray/tests/test_client.py @@ -7,9 +7,12 @@ import _thread import ray.util.client.server.server as ray_client_server +from ray.tests.client_test_utils import create_remote_signal_actor from ray.util.client.common import ClientObjectRef +from ray.util.client.ray_client_helpers import connect_to_client_or_not from ray.util.client.ray_client_helpers import ray_start_client_server -from ray._private.client_mode_hook import _explicitly_enable_client_mode +from ray._private.client_mode_hook import client_mode_should_convert +from ray._private.client_mode_hook import enable_client_mode @pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") @@ -179,6 +182,8 @@ def test_wait(ray_start_regular_shared): @pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") def test_remote_functions(ray_start_regular_shared): with ray_start_client_server() as ray: + SignalActor = create_remote_signal_actor(ray) + signaler = SignalActor.remote() @ray.remote def plus2(x): @@ -220,6 +225,18 @@ def fact(x): all_vals = ray.get(res[0]) assert all_vals == [236, 2_432_902_008_176_640_000, 120, 3628800] + # Timeout 0 on ray.wait leads to immediate return + # (not indefinite wait for first return as with timeout None): + unready_ref = signaler.wait.remote() + res = ray.wait([unready_ref], timeout=0) + # Not ready. + assert res[0] == [] and len(res[1]) == 1 + ray.get(signaler.send.remote()) + ready_ref = signaler.wait.remote() + # Ready. + res = ray.wait([ready_ref], timeout=10) + assert len(res[0]) == 1 and res[1] == [] + @pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") def test_function_calling_function(ray_start_regular_shared): @@ -523,16 +540,16 @@ def test_client_gpu_ids(call_ray_stop_only): import ray ray.init(num_cpus=2) - _explicitly_enable_client_mode() - # No client connection. - with pytest.raises(Exception) as e: - ray.get_gpu_ids() - assert str(e.value) == "Ray Client is not connected."\ - " Please connect by calling `ray.connect`." + with enable_client_mode(): + # No client connection. + with pytest.raises(Exception) as e: + ray.get_gpu_ids() + assert str(e.value) == "Ray Client is not connected."\ + " Please connect by calling `ray.connect`." - with ray_start_client_server(): - # Now have a client connection. - assert ray.get_gpu_ids() == [] + with ray_start_client_server(): + # Now have a client connection. 
+ assert ray.get_gpu_ids() == [] def test_client_serialize_addon(call_ray_stop_only): @@ -548,5 +565,19 @@ class User(pydantic.BaseModel): assert ray.get(ray.put(User(name="ray"))).name == "ray" +@pytest.mark.parametrize("connect_to_client", [False, True]) +def test_client_context_manager(ray_start_regular_shared, connect_to_client): + import ray + with connect_to_client_or_not(connect_to_client): + if connect_to_client: + # Client mode is on. + assert client_mode_should_convert() is True + # We're connected to Ray client. + assert ray.util.client.ray.is_connected() is True + else: + assert client_mode_should_convert() is False + assert ray.util.client.ray.is_connected() is False + + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/tests/test_client_init.py b/python/ray/tests/test_client_init.py index b76733b6b453..4e978f6ea60b 100644 --- a/python/ray/tests/test_client_init.py +++ b/python/ray/tests/test_client_init.py @@ -76,19 +76,26 @@ def test_validate_port(): def test_basic_preregister(init_and_serve): + """Tests conversion of Ray actors and remote functions to client actors + and client remote functions. + + Checks that the conversion works when disconnecting and reconnecting client + sessions. + """ from ray.util.client import ray - ray.connect("localhost:50051") - val = ray.get(hello_world.remote()) - print(val) - assert val >= 20 - assert val <= 200 - c = C.remote(3) - x = c.double.remote() - y = c.double.remote() - ray.wait([x, y]) - val = ray.get(c.get.remote()) - assert val == 12 - ray.disconnect() + for _ in range(2): + ray.connect("localhost:50051") + val = ray.get(hello_world.remote()) + print(val) + assert val >= 20 + assert val <= 200 + c = C.remote(3) + x = c.double.remote() + y = c.double.remote() + ray.wait([x, y]) + val = ray.get(c.get.remote()) + assert val == 12 + ray.disconnect() def test_num_clients(init_and_serve_lazy): diff --git a/python/ray/tests/test_placement_group.py b/python/ray/tests/test_placement_group.py index d2448bc6e956..98a4815bc78c 100644 --- a/python/ray/tests/test_placement_group.py +++ b/python/ray/tests/test_placement_group.py @@ -20,6 +20,7 @@ from ray.util.placement_group import (PlacementGroup, placement_group, remove_placement_group, get_current_placement_group) +from ray.util.client.ray_client_helpers import connect_to_client_or_not @ray.remote @@ -28,7 +29,8 @@ def method(self, x): return x + 2 -def test_placement_group_pack(ray_start_cluster): +@pytest.mark.parametrize("connect_to_client", [False, True]) +def test_placement_group_pack(ray_start_cluster, connect_to_client): @ray.remote(num_cpus=2) class Actor(object): def __init__(self): @@ -43,44 +45,46 @@ def value(self): cluster.add_node(num_cpus=4) ray.init(address=cluster.address) - placement_group = ray.util.placement_group( - name="name", - strategy="PACK", - bundles=[ - { - "CPU": 2, - "GPU": 0 # Test 0 resource spec doesn't break tests. - }, - { - "CPU": 2 - } - ]) - ray.get(placement_group.ready()) - actor_1 = Actor.options( - placement_group=placement_group, - placement_group_bundle_index=0).remote() - actor_2 = Actor.options( - placement_group=placement_group, - placement_group_bundle_index=1).remote() + with connect_to_client_or_not(connect_to_client): + placement_group = ray.util.placement_group( + name="name", + strategy="PACK", + bundles=[ + { + "CPU": 2, + "GPU": 0 # Test 0 resource spec doesn't break tests. 
+ }, + { + "CPU": 2 + } + ]) + ray.get(placement_group.ready()) + actor_1 = Actor.options( + placement_group=placement_group, + placement_group_bundle_index=0).remote() + actor_2 = Actor.options( + placement_group=placement_group, + placement_group_bundle_index=1).remote() - ray.get(actor_1.value.remote()) - ray.get(actor_2.value.remote()) + ray.get(actor_1.value.remote()) + ray.get(actor_2.value.remote()) - # Get all actors. - actor_infos = ray.actors() + # Get all actors. + actor_infos = ray.actors() - # Make sure all actors in counter_list are collocated in one node. - actor_info_1 = actor_infos.get(actor_1._actor_id.hex()) - actor_info_2 = actor_infos.get(actor_2._actor_id.hex()) + # Make sure all actors in counter_list are collocated in one node. + actor_info_1 = actor_infos.get(actor_1._actor_id.hex()) + actor_info_2 = actor_infos.get(actor_2._actor_id.hex()) - assert actor_info_1 and actor_info_2 + assert actor_info_1 and actor_info_2 - node_of_actor_1 = actor_info_1["Address"]["NodeID"] - node_of_actor_2 = actor_info_2["Address"]["NodeID"] - assert node_of_actor_1 == node_of_actor_2 + node_of_actor_1 = actor_info_1["Address"]["NodeID"] + node_of_actor_2 = actor_info_2["Address"]["NodeID"] + assert node_of_actor_1 == node_of_actor_2 -def test_placement_group_strict_pack(ray_start_cluster): +@pytest.mark.parametrize("connect_to_client", [False, True]) +def test_placement_group_strict_pack(ray_start_cluster, connect_to_client): @ray.remote(num_cpus=2) class Actor(object): def __init__(self): @@ -95,45 +99,47 @@ def value(self): cluster.add_node(num_cpus=4) ray.init(address=cluster.address) - placement_group = ray.util.placement_group( - name="name", - strategy="STRICT_PACK", - bundles=[ - { - "memory": 50 * 1024 * - 1024, # Test memory resource spec doesn't break tests. - "CPU": 2 - }, - { - "CPU": 2 - } - ]) - ray.get(placement_group.ready()) - actor_1 = Actor.options( - placement_group=placement_group, - placement_group_bundle_index=0).remote() - actor_2 = Actor.options( - placement_group=placement_group, - placement_group_bundle_index=1).remote() + with connect_to_client_or_not(connect_to_client): + placement_group = ray.util.placement_group( + name="name", + strategy="STRICT_PACK", + bundles=[ + { + "memory": 50 * 1024 * + 1024, # Test memory resource spec doesn't break tests. + "CPU": 2 + }, + { + "CPU": 2 + } + ]) + ray.get(placement_group.ready()) + actor_1 = Actor.options( + placement_group=placement_group, + placement_group_bundle_index=0).remote() + actor_2 = Actor.options( + placement_group=placement_group, + placement_group_bundle_index=1).remote() - ray.get(actor_1.value.remote()) - ray.get(actor_2.value.remote()) + ray.get(actor_1.value.remote()) + ray.get(actor_2.value.remote()) - # Get all actors. - actor_infos = ray.actors() + # Get all actors. + actor_infos = ray.actors() - # Make sure all actors in counter_list are collocated in one node. - actor_info_1 = actor_infos.get(actor_1._actor_id.hex()) - actor_info_2 = actor_infos.get(actor_2._actor_id.hex()) + # Make sure all actors in counter_list are collocated in one node. 
+ actor_info_1 = actor_infos.get(actor_1._actor_id.hex()) + actor_info_2 = actor_infos.get(actor_2._actor_id.hex()) - assert actor_info_1 and actor_info_2 + assert actor_info_1 and actor_info_2 - node_of_actor_1 = actor_info_1["Address"]["NodeID"] - node_of_actor_2 = actor_info_2["Address"]["NodeID"] - assert node_of_actor_1 == node_of_actor_2 + node_of_actor_1 = actor_info_1["Address"]["NodeID"] + node_of_actor_2 = actor_info_2["Address"]["NodeID"] + assert node_of_actor_1 == node_of_actor_2 -def test_placement_group_spread(ray_start_cluster): +@pytest.mark.parametrize("connect_to_client", [False, True]) +def test_placement_group_spread(ray_start_cluster, connect_to_client): @ray.remote(num_cpus=2) class Actor(object): def __init__(self): @@ -148,38 +154,40 @@ def value(self): cluster.add_node(num_cpus=4) ray.init(address=cluster.address) - placement_group = ray.util.placement_group( - name="name", strategy="SPREAD", bundles=[{ - "CPU": 2 - }, { - "CPU": 2 - }]) - ray.get(placement_group.ready()) - actor_1 = Actor.options( - placement_group=placement_group, - placement_group_bundle_index=0).remote() - actor_2 = Actor.options( - placement_group=placement_group, - placement_group_bundle_index=1).remote() + with connect_to_client_or_not(connect_to_client): + placement_group = ray.util.placement_group( + name="name", strategy="SPREAD", bundles=[{ + "CPU": 2 + }, { + "CPU": 2 + }]) + ray.get(placement_group.ready()) + actor_1 = Actor.options( + placement_group=placement_group, + placement_group_bundle_index=0).remote() + actor_2 = Actor.options( + placement_group=placement_group, + placement_group_bundle_index=1).remote() - ray.get(actor_1.value.remote()) - ray.get(actor_2.value.remote()) + ray.get(actor_1.value.remote()) + ray.get(actor_2.value.remote()) - # Get all actors. - actor_infos = ray.actors() + # Get all actors. + actor_infos = ray.actors() - # Make sure all actors in counter_list are located in separate nodes. - actor_info_1 = actor_infos.get(actor_1._actor_id.hex()) - actor_info_2 = actor_infos.get(actor_2._actor_id.hex()) + # Make sure all actors in counter_list are located in separate nodes. 
+ actor_info_1 = actor_infos.get(actor_1._actor_id.hex()) + actor_info_2 = actor_infos.get(actor_2._actor_id.hex()) - assert actor_info_1 and actor_info_2 + assert actor_info_1 and actor_info_2 - node_of_actor_1 = actor_info_1["Address"]["NodeID"] - node_of_actor_2 = actor_info_2["Address"]["NodeID"] - assert node_of_actor_1 != node_of_actor_2 + node_of_actor_1 = actor_info_1["Address"]["NodeID"] + node_of_actor_2 = actor_info_2["Address"]["NodeID"] + assert node_of_actor_1 != node_of_actor_2 -def test_placement_group_strict_spread(ray_start_cluster): +@pytest.mark.parametrize("connect_to_client", [False, True]) +def test_placement_group_strict_spread(ray_start_cluster, connect_to_client): @ray.remote(num_cpus=2) class Actor(object): def __init__(self): @@ -194,50 +202,53 @@ def value(self): cluster.add_node(num_cpus=4) ray.init(address=cluster.address) - placement_group = ray.util.placement_group( - name="name", - strategy="STRICT_SPREAD", - bundles=[{ - "CPU": 2 - }, { - "CPU": 2 - }, { - "CPU": 2 - }]) - ray.get(placement_group.ready()) - actor_1 = Actor.options( - placement_group=placement_group, - placement_group_bundle_index=0).remote() - actor_2 = Actor.options( - placement_group=placement_group, - placement_group_bundle_index=1).remote() - actor_3 = Actor.options( - placement_group=placement_group, - placement_group_bundle_index=2).remote() + with connect_to_client_or_not(connect_to_client): + placement_group = ray.util.placement_group( + name="name", + strategy="STRICT_SPREAD", + bundles=[{ + "CPU": 2 + }, { + "CPU": 2 + }, { + "CPU": 2 + }]) + ray.get(placement_group.ready()) + actor_1 = Actor.options( + placement_group=placement_group, + placement_group_bundle_index=0).remote() + actor_2 = Actor.options( + placement_group=placement_group, + placement_group_bundle_index=1).remote() + actor_3 = Actor.options( + placement_group=placement_group, + placement_group_bundle_index=2).remote() - ray.get(actor_1.value.remote()) - ray.get(actor_2.value.remote()) - ray.get(actor_3.value.remote()) + ray.get(actor_1.value.remote()) + ray.get(actor_2.value.remote()) + ray.get(actor_3.value.remote()) - # Get all actors. - actor_infos = ray.actors() + # Get all actors. + actor_infos = ray.actors() - # Make sure all actors in counter_list are located in separate nodes. - actor_info_1 = actor_infos.get(actor_1._actor_id.hex()) - actor_info_2 = actor_infos.get(actor_2._actor_id.hex()) - actor_info_3 = actor_infos.get(actor_3._actor_id.hex()) + # Make sure all actors in counter_list are located in separate nodes. 
+ actor_info_1 = actor_infos.get(actor_1._actor_id.hex()) + actor_info_2 = actor_infos.get(actor_2._actor_id.hex()) + actor_info_3 = actor_infos.get(actor_3._actor_id.hex()) - assert actor_info_1 and actor_info_2 and actor_info_3 + assert actor_info_1 and actor_info_2 and actor_info_3 - node_of_actor_1 = actor_info_1["Address"]["NodeID"] - node_of_actor_2 = actor_info_2["Address"]["NodeID"] - node_of_actor_3 = actor_info_3["Address"]["NodeID"] - assert node_of_actor_1 != node_of_actor_2 - assert node_of_actor_1 != node_of_actor_3 - assert node_of_actor_2 != node_of_actor_3 + node_of_actor_1 = actor_info_1["Address"]["NodeID"] + node_of_actor_2 = actor_info_2["Address"]["NodeID"] + node_of_actor_3 = actor_info_3["Address"]["NodeID"] + assert node_of_actor_1 != node_of_actor_2 + assert node_of_actor_1 != node_of_actor_3 + assert node_of_actor_2 != node_of_actor_3 -def test_placement_group_actor_resource_ids(ray_start_cluster): +@pytest.mark.parametrize("connect_to_client", [False, True]) +def test_placement_group_actor_resource_ids(ray_start_cluster, + connect_to_client): @ray.remote(num_cpus=1) class F: def f(self): @@ -249,14 +260,17 @@ def f(self): cluster.add_node(num_cpus=4) ray.init(address=cluster.address) - g1 = ray.util.placement_group([{"CPU": 2}]) - a1 = F.options(placement_group=g1).remote() - resources = ray.get(a1.f.remote()) - assert len(resources) == 1, resources - assert "CPU_group_" in list(resources.keys())[0], resources + with connect_to_client_or_not(connect_to_client): + g1 = ray.util.placement_group([{"CPU": 2}]) + a1 = F.options(placement_group=g1).remote() + resources = ray.get(a1.f.remote()) + assert len(resources) == 1, resources + assert "CPU_group_" in list(resources.keys())[0], resources -def test_placement_group_task_resource_ids(ray_start_cluster): +@pytest.mark.parametrize("connect_to_client", [False, True]) +def test_placement_group_task_resource_ids(ray_start_cluster, + connect_to_client): @ray.remote(num_cpus=1) def f(): return ray.get_resource_ids() @@ -267,24 +281,28 @@ def f(): cluster.add_node(num_cpus=4) ray.init(address=cluster.address) - g1 = ray.util.placement_group([{"CPU": 2}]) - o1 = f.options(placement_group=g1).remote() - resources = ray.get(o1) - assert len(resources) == 1, resources - assert "CPU_group_" in list(resources.keys())[0], resources - assert "CPU_group_0_" not in list(resources.keys())[0], resources - - # Now retry with a bundle index constraint. - o1 = f.options(placement_group=g1, placement_group_bundle_index=0).remote() - resources = ray.get(o1) - assert len(resources) == 2, resources - keys = list(resources.keys()) - assert "CPU_group_" in keys[0], resources - assert "CPU_group_" in keys[1], resources - assert "CPU_group_0_" in keys[0] or "CPU_group_0_" in keys[1], resources - - -def test_placement_group_hang(ray_start_cluster): + with connect_to_client_or_not(connect_to_client): + g1 = ray.util.placement_group([{"CPU": 2}]) + o1 = f.options(placement_group=g1).remote() + resources = ray.get(o1) + assert len(resources) == 1, resources + assert "CPU_group_" in list(resources.keys())[0], resources + assert "CPU_group_0_" not in list(resources.keys())[0], resources + + # Now retry with a bundle index constraint. 
+ o1 = f.options( + placement_group=g1, placement_group_bundle_index=0).remote() + resources = ray.get(o1) + assert len(resources) == 2, resources + keys = list(resources.keys()) + assert "CPU_group_" in keys[0], resources + assert "CPU_group_" in keys[1], resources + assert ("CPU_group_0_" in keys[0] + or "CPU_group_0_" in keys[1]), resources + + +@pytest.mark.parametrize("connect_to_client", [False, True]) +def test_placement_group_hang(ray_start_cluster, connect_to_client): @ray.remote(num_cpus=1) def f(): return ray.get_resource_ids() @@ -295,109 +313,118 @@ def f(): cluster.add_node(num_cpus=4) ray.init(address=cluster.address) - # Warm workers up, so that this triggers the hang rice. - ray.get(f.remote()) + with connect_to_client_or_not(connect_to_client): + # Warm workers up, so that this triggers the hang rice. + ray.get(f.remote()) - g1 = ray.util.placement_group([{"CPU": 2}]) - # This will start out infeasible. The placement group will then be created - # and it transitions to feasible. - o1 = f.options(placement_group=g1).remote() + g1 = ray.util.placement_group([{"CPU": 2}]) + # This will start out infeasible. The placement group will then be + # created and it transitions to feasible. + o1 = f.options(placement_group=g1).remote() - resources = ray.get(o1) - assert len(resources) == 1, resources - assert "CPU_group_" in list(resources.keys())[0], resources + resources = ray.get(o1) + assert len(resources) == 1, resources + assert "CPU_group_" in list(resources.keys())[0], resources -def test_remove_placement_group(ray_start_cluster): +@pytest.mark.parametrize("connect_to_client", [False, True]) +def test_remove_placement_group(ray_start_cluster, connect_to_client): cluster = ray_start_cluster cluster.add_node(num_cpus=4) ray.init(address=cluster.address) - # First try to remove a placement group that doesn't - # exist. This should not do anything. - random_group_id = PlacementGroupID.from_random() - random_placement_group = PlacementGroup(random_group_id) - for _ in range(3): - ray.util.remove_placement_group(random_placement_group) - - # Creating a placement group as soon as it is - # created should work. - placement_group = ray.util.placement_group([{"CPU": 2}, {"CPU": 2}]) - assert placement_group.wait(10) - ray.util.remove_placement_group(placement_group) - def is_placement_group_removed(): - table = ray.util.placement_group_table(placement_group) - if "state" not in table: - return False - return table["state"] == "REMOVED" + with connect_to_client_or_not(connect_to_client): + # First try to remove a placement group that doesn't + # exist. This should not do anything. + random_group_id = PlacementGroupID.from_random() + random_placement_group = PlacementGroup(random_group_id) + for _ in range(3): + ray.util.remove_placement_group(random_placement_group) - wait_for_condition(is_placement_group_removed) + # Creating a placement group as soon as it is + # created should work. + placement_group = ray.util.placement_group([{"CPU": 2}, {"CPU": 2}]) + assert placement_group.wait(10) - # # Now let's create a placement group. - placement_group = ray.util.placement_group([{"CPU": 2}, {"CPU": 2}]) - assert placement_group.wait(10) - - # Create an actor that occupies resources. - @ray.remote(num_cpus=2) - class A: - def f(self): - return 3 + ray.util.remove_placement_group(placement_group) - # Currently, there's no way to prevent - # tasks to be retried for removed placement group. - # Set max_retrie=0 for testing. - # TODO(sang): Handle this edge case. 
- @ray.remote(num_cpus=2, max_retries=0) - def long_running_task(): - print(os.getpid()) - import time - time.sleep(50) - - # Schedule a long running task and actor. - task_ref = long_running_task.options( - placement_group=placement_group).remote() - a = A.options(placement_group=placement_group).remote() - assert ray.get(a.f.remote()) == 3 + def is_placement_group_removed(): + table = ray.util.placement_group_table(placement_group) + if "state" not in table: + return False + return table["state"] == "REMOVED" + + wait_for_condition(is_placement_group_removed) + + # # Now let's create a placement group. + placement_group = ray.util.placement_group([{"CPU": 2}, {"CPU": 2}]) + assert placement_group.wait(10) + + # Create an actor that occupies resources. + @ray.remote(num_cpus=2) + class A: + def f(self): + return 3 + + # Currently, there's no way to prevent + # tasks to be retried for removed placement group. + # Set max_retrie=0 for testing. + # TODO(sang): Handle this edge case. + @ray.remote(num_cpus=2, max_retries=0) + def long_running_task(): + print(os.getpid()) + import time + time.sleep(50) + + # Schedule a long running task and actor. + task_ref = long_running_task.options( + placement_group=placement_group).remote() + a = A.options(placement_group=placement_group).remote() + assert ray.get(a.f.remote()) == 3 - ray.util.remove_placement_group(placement_group) - # Subsequent remove request shouldn't do anything. - for _ in range(3): ray.util.remove_placement_group(placement_group) + # Subsequent remove request shouldn't do anything. + for _ in range(3): + ray.util.remove_placement_group(placement_group) + + # Make sure placement group resources are + # released and we can schedule this task. + @ray.remote(num_cpus=4) + def f(): + return 3 - # Make sure placement group resources are - # released and we can schedule this task. - @ray.remote(num_cpus=4) - def f(): - return 3 - - assert ray.get(f.remote()) == 3 - # Since the placement group is removed, - # the actor should've been killed. - # That means this request should fail. - with pytest.raises(ray.exceptions.RayActorError, match="actor died"): - ray.get(a.f.remote(), timeout=3.0) - with pytest.raises(ray.exceptions.WorkerCrashedError): - ray.get(task_ref) + assert ray.get(f.remote()) == 3 + # Since the placement group is removed, + # the actor should've been killed. + # That means this request should fail. + with pytest.raises(ray.exceptions.RayActorError, match="actor died"): + ray.get(a.f.remote(), timeout=3.0) + with pytest.raises(ray.exceptions.WorkerCrashedError): + ray.get(task_ref) -def test_remove_pending_placement_group(ray_start_cluster): +@pytest.mark.parametrize("connect_to_client", [False, True]) +def test_remove_pending_placement_group(ray_start_cluster, connect_to_client): cluster = ray_start_cluster cluster.add_node(num_cpus=4) ray.init(address=cluster.address) - # Create a placement group that cannot be scheduled now. - placement_group = ray.util.placement_group([{"GPU": 2}, {"CPU": 2}]) - ray.util.remove_placement_group(placement_group) - # TODO(sang): Add state check here. - @ray.remote(num_cpus=4) - def f(): - return 3 + with connect_to_client_or_not(connect_to_client): + # Create a placement group that cannot be scheduled now. + placement_group = ray.util.placement_group([{"GPU": 2}, {"CPU": 2}]) + ray.util.remove_placement_group(placement_group) - # Make sure this task is still schedulable. - assert ray.get(f.remote()) == 3 + # TODO(sang): Add state check here. 
+ @ray.remote(num_cpus=4) + def f(): + return 3 + # Make sure this task is still schedulable. + assert ray.get(f.remote()) == 3 -def test_placement_group_table(ray_start_cluster): + +@pytest.mark.parametrize("connect_to_client", [False, True]) +def test_placement_group_table(ray_start_cluster, connect_to_client): @ray.remote(num_cpus=2) class Actor(object): def __init__(self): @@ -412,56 +439,60 @@ def value(self): cluster.add_node(num_cpus=4) ray.init(address=cluster.address) - # Originally placement group creation should be pending because - # there are no resources. - name = "name" - strategy = "PACK" - bundles = [{"CPU": 2, "GPU": 1}, {"CPU": 2}] - placement_group = ray.util.placement_group( - name=name, strategy=strategy, bundles=bundles) - result = ray.util.placement_group_table(placement_group) - assert result["name"] == name - assert result["strategy"] == strategy - for i in range(len(bundles)): - assert bundles[i] == result["bundles"][i] - assert result["state"] == "PENDING" - - # Now the placement group should be scheduled. - cluster.add_node(num_cpus=5, num_gpus=1) - - cluster.wait_for_nodes() - actor_1 = Actor.options( - placement_group=placement_group, - placement_group_bundle_index=0).remote() - ray.get(actor_1.value.remote()) + with connect_to_client_or_not(connect_to_client): + # Originally placement group creation should be pending because + # there are no resources. + name = "name" + strategy = "PACK" + bundles = [{"CPU": 2, "GPU": 1}, {"CPU": 2}] + placement_group = ray.util.placement_group( + name=name, strategy=strategy, bundles=bundles) + result = ray.util.placement_group_table(placement_group) + assert result["name"] == name + assert result["strategy"] == strategy + for i in range(len(bundles)): + assert bundles[i] == result["bundles"][i] + assert result["state"] == "PENDING" + + # Now the placement group should be scheduled. + cluster.add_node(num_cpus=5, num_gpus=1) + + cluster.wait_for_nodes() + actor_1 = Actor.options( + placement_group=placement_group, + placement_group_bundle_index=0).remote() + ray.get(actor_1.value.remote()) - result = ray.util.placement_group_table(placement_group) - assert result["state"] == "CREATED" + result = ray.util.placement_group_table(placement_group) + assert result["state"] == "CREATED" - # Add tow more placement group for placement group table test. - second_strategy = "SPREAD" - ray.util.placement_group( - name="second_placement_group", - strategy=second_strategy, - bundles=bundles) - ray.util.placement_group( - name="third_placement_group", - strategy=second_strategy, - bundles=bundles) + # Add tow more placement group for placement group table test. 
+ second_strategy = "SPREAD" + ray.util.placement_group( + name="second_placement_group", + strategy=second_strategy, + bundles=bundles) + ray.util.placement_group( + name="third_placement_group", + strategy=second_strategy, + bundles=bundles) - placement_group_table = ray.util.placement_group_table() - assert len(placement_group_table) == 3 + placement_group_table = ray.util.placement_group_table() + assert len(placement_group_table) == 3 - true_name_set = {"name", "second_placement_group", "third_placement_group"} - get_name_set = set() + true_name_set = { + "name", "second_placement_group", "third_placement_group" + } + get_name_set = set() - for _, placement_group_data in placement_group_table.items(): - get_name_set.add(placement_group_data["name"]) + for _, placement_group_data in placement_group_table.items(): + get_name_set.add(placement_group_data["name"]) - assert true_name_set == get_name_set + assert true_name_set == get_name_set -def test_cuda_visible_devices(ray_start_cluster): +@pytest.mark.parametrize("connect_to_client", [False, True]) +def test_cuda_visible_devices(ray_start_cluster, connect_to_client): @ray.remote(num_gpus=1) def f(): return os.environ["CUDA_VISIBLE_DEVICES"] @@ -472,14 +503,17 @@ def f(): cluster.add_node(num_gpus=1) ray.init(address=cluster.address) - g1 = ray.util.placement_group([{"CPU": 1, "GPU": 1}]) - o1 = f.options(placement_group=g1).remote() + with connect_to_client_or_not(connect_to_client): + g1 = ray.util.placement_group([{"CPU": 1, "GPU": 1}]) + o1 = f.options(placement_group=g1).remote() - devices = ray.get(o1) - assert devices == "0", devices + devices = ray.get(o1) + assert devices == "0", devices -def test_placement_group_reschedule_when_node_dead(ray_start_cluster): +@pytest.mark.parametrize("connect_to_client", [False, True]) +def test_placement_group_reschedule_when_node_dead(ray_start_cluster, + connect_to_client): @ray.remote(num_cpus=1) class Actor(object): def __init__(self): @@ -500,54 +534,56 @@ def value(self): assert len(nodes) == 3 assert nodes[0]["alive"] and nodes[1]["alive"] and nodes[2]["alive"] - placement_group = ray.util.placement_group( - name="name", - strategy="SPREAD", - bundles=[{ - "CPU": 2 - }, { - "CPU": 2 - }, { - "CPU": 2 - }]) - actor_1 = Actor.options( - placement_group=placement_group, - placement_group_bundle_index=0, - lifetime="detached").remote() - actor_2 = Actor.options( - placement_group=placement_group, - placement_group_bundle_index=1, - lifetime="detached").remote() - actor_3 = Actor.options( - placement_group=placement_group, - placement_group_bundle_index=2, - lifetime="detached").remote() - ray.get(actor_1.value.remote()) - ray.get(actor_2.value.remote()) - ray.get(actor_3.value.remote()) + with connect_to_client_or_not(connect_to_client): + placement_group = ray.util.placement_group( + name="name", + strategy="SPREAD", + bundles=[{ + "CPU": 2 + }, { + "CPU": 2 + }, { + "CPU": 2 + }]) + actor_1 = Actor.options( + placement_group=placement_group, + placement_group_bundle_index=0, + lifetime="detached").remote() + actor_2 = Actor.options( + placement_group=placement_group, + placement_group_bundle_index=1, + lifetime="detached").remote() + actor_3 = Actor.options( + placement_group=placement_group, + placement_group_bundle_index=2, + lifetime="detached").remote() + ray.get(actor_1.value.remote()) + ray.get(actor_2.value.remote()) + ray.get(actor_3.value.remote()) - cluster.remove_node(get_other_nodes(cluster, exclude_head=True)[-1]) - cluster.wait_for_nodes() + 
cluster.remove_node(get_other_nodes(cluster, exclude_head=True)[-1]) + cluster.wait_for_nodes() - actor_4 = Actor.options( - placement_group=placement_group, - placement_group_bundle_index=0, - lifetime="detached").remote() - actor_5 = Actor.options( - placement_group=placement_group, - placement_group_bundle_index=1, - lifetime="detached").remote() - actor_6 = Actor.options( - placement_group=placement_group, - placement_group_bundle_index=2, - lifetime="detached").remote() - ray.get(actor_4.value.remote()) - ray.get(actor_5.value.remote()) - ray.get(actor_6.value.remote()) - ray.shutdown() + actor_4 = Actor.options( + placement_group=placement_group, + placement_group_bundle_index=0, + lifetime="detached").remote() + actor_5 = Actor.options( + placement_group=placement_group, + placement_group_bundle_index=1, + lifetime="detached").remote() + actor_6 = Actor.options( + placement_group=placement_group, + placement_group_bundle_index=2, + lifetime="detached").remote() + ray.get(actor_4.value.remote()) + ray.get(actor_5.value.remote()) + ray.get(actor_6.value.remote()) + ray.shutdown() -def test_check_bundle_index(ray_start_cluster): +@pytest.mark.parametrize("connect_to_client", [False, True]) +def test_check_bundle_index(ray_start_cluster, connect_to_client): @ray.remote(num_cpus=2) class Actor(object): def __init__(self): @@ -560,116 +596,125 @@ def value(self): cluster.add_node(num_cpus=4) ray.init(address=cluster.address) - placement_group = ray.util.placement_group( - name="name", strategy="SPREAD", bundles=[{ - "CPU": 2 - }, { - "CPU": 2 - }]) + with connect_to_client_or_not(connect_to_client): + placement_group = ray.util.placement_group( + name="name", strategy="SPREAD", bundles=[{ + "CPU": 2 + }, { + "CPU": 2 + }]) - error_count = 0 - try: - Actor.options( - placement_group=placement_group, - placement_group_bundle_index=3).remote() - except ValueError: - error_count = error_count + 1 - assert error_count == 1 + error_count = 0 + try: + Actor.options( + placement_group=placement_group, + placement_group_bundle_index=3).remote() + except ValueError: + error_count = error_count + 1 + assert error_count == 1 - try: - Actor.options( - placement_group=placement_group, - placement_group_bundle_index=-2).remote() - except ValueError: - error_count = error_count + 1 - assert error_count == 2 + try: + Actor.options( + placement_group=placement_group, + placement_group_bundle_index=-2).remote() + except ValueError: + error_count = error_count + 1 + assert error_count == 2 - try: - Actor.options(placement_group_bundle_index=0).remote() - except ValueError: - error_count = error_count + 1 - assert error_count == 3 + try: + Actor.options(placement_group_bundle_index=0).remote() + except ValueError: + error_count = error_count + 1 + assert error_count == 3 -def test_pending_placement_group_wait(ray_start_cluster): +@pytest.mark.parametrize("connect_to_client", [False, True]) +def test_pending_placement_group_wait(ray_start_cluster, connect_to_client): cluster = ray_start_cluster [cluster.add_node(num_cpus=2) for _ in range(1)] ray.init(address=cluster.address) cluster.wait_for_nodes() - # Wait on placement group that cannot be created. 
- placement_group = ray.util.placement_group( - name="name", - strategy="SPREAD", - bundles=[ - { - "CPU": 2 - }, - { - "CPU": 2 - }, - { - "GPU": 2 - }, - ]) - ready, unready = ray.wait([placement_group.ready()], timeout=0.1) - assert len(unready) == 1 - assert len(ready) == 0 - table = ray.util.placement_group_table(placement_group) - assert table["state"] == "PENDING" - with pytest.raises(ray.exceptions.GetTimeoutError): - ray.get(placement_group.ready(), timeout=0.1) + with connect_to_client_or_not(connect_to_client): + # Wait on placement group that cannot be created. + placement_group = ray.util.placement_group( + name="name", + strategy="SPREAD", + bundles=[ + { + "CPU": 2 + }, + { + "CPU": 2 + }, + { + "GPU": 2 + }, + ]) + ready, unready = ray.wait([placement_group.ready()], timeout=0.1) + assert len(unready) == 1 + assert len(ready) == 0 + table = ray.util.placement_group_table(placement_group) + assert table["state"] == "PENDING" + with pytest.raises(ray.exceptions.GetTimeoutError): + ray.get(placement_group.ready(), timeout=0.1) -def test_placement_group_wait(ray_start_cluster): +@pytest.mark.parametrize("connect_to_client", [False, True]) +def test_placement_group_wait(ray_start_cluster, connect_to_client): cluster = ray_start_cluster [cluster.add_node(num_cpus=2) for _ in range(2)] ray.init(address=cluster.address) cluster.wait_for_nodes() - # Wait on placement group that cannot be created. - placement_group = ray.util.placement_group( - name="name", strategy="SPREAD", bundles=[ - { - "CPU": 2 - }, - { - "CPU": 2 - }, - ]) - ready, unready = ray.wait([placement_group.ready()]) - assert len(unready) == 0 - assert len(ready) == 1 - table = ray.util.placement_group_table(placement_group) - assert table["state"] == "CREATED" + with connect_to_client_or_not(connect_to_client): + # Wait on placement group that cannot be created. + placement_group = ray.util.placement_group( + name="name", strategy="SPREAD", bundles=[ + { + "CPU": 2 + }, + { + "CPU": 2 + }, + ]) + ready, unready = ray.wait([placement_group.ready()]) + assert len(unready) == 0 + assert len(ready) == 1 + table = ray.util.placement_group_table(placement_group) + assert table["state"] == "CREATED" - pg = ray.get(placement_group.ready()) - assert pg.bundle_specs == placement_group.bundle_specs - assert pg.id.binary() == placement_group.id.binary() + pg = ray.get(placement_group.ready()) + assert pg.bundle_specs == placement_group.bundle_specs + assert pg.id.binary() == placement_group.id.binary() -def test_schedule_placement_group_when_node_add(ray_start_cluster): +@pytest.mark.parametrize("connect_to_client", [False, True]) +def test_schedule_placement_group_when_node_add(ray_start_cluster, + connect_to_client): cluster = ray_start_cluster cluster.add_node(num_cpus=4) ray.init(address=cluster.address) - # Creating a placement group that cannot be satisfied yet. - placement_group = ray.util.placement_group([{"GPU": 2}, {"CPU": 2}]) + with connect_to_client_or_not(connect_to_client): + # Creating a placement group that cannot be satisfied yet. + placement_group = ray.util.placement_group([{"GPU": 2}, {"CPU": 2}]) - def is_placement_group_created(): - table = ray.util.placement_group_table(placement_group) - if "state" not in table: - return False - return table["state"] == "CREATED" + def is_placement_group_created(): + table = ray.util.placement_group_table(placement_group) + if "state" not in table: + return False + return table["state"] == "CREATED" - # Add a node that has GPU. 
- cluster.add_node(num_cpus=4, num_gpus=4) + # Add a node that has GPU. + cluster.add_node(num_cpus=4, num_gpus=4) - # Make sure the placement group is created. - wait_for_condition(is_placement_group_created) + # Make sure the placement group is created. + wait_for_condition(is_placement_group_created) -def test_atomic_creation(ray_start_cluster): +@pytest.mark.parametrize("connect_to_client", [False, True]) +def test_atomic_creation(ray_start_cluster, connect_to_client): # Setup cluster. cluster = ray_start_cluster bundle_cpu_size = 2 @@ -692,91 +737,94 @@ def bothering_task(): time.sleep(6) return True - # Schedule tasks to fail initial placement group creation. - tasks = [bothering_task.remote() for _ in range(2)] - - # Make sure the two common task has scheduled. - def tasks_scheduled(): - return ray.available_resources()["CPU"] == 2.0 - - wait_for_condition(tasks_scheduled) - - # Create an actor that will fail bundle scheduling. - # It is important to use pack strategy to make test less flaky. - pg = ray.util.placement_group( - name="name", - strategy="SPREAD", - bundles=[{ - "CPU": bundle_cpu_size - } for _ in range(num_nodes * bundle_per_node)]) - - # Create a placement group actor. - # This shouldn't be scheduled because atomic - # placement group creation should've failed. - pg_actor = NormalActor.options( - placement_group=pg, - placement_group_bundle_index=num_nodes * bundle_per_node - 1).remote() - - # Wait on the placement group now. It should be unready - # because normal actor takes resources that are required - # for one of bundle creation. - ready, unready = ray.wait([pg.ready()], timeout=0.5) - assert len(ready) == 0 - assert len(unready) == 1 - # Wait until all tasks are done. - assert all(ray.get(tasks)) - - # Wait on the placement group creation. Since resources are now available, - # it should be ready soon. - ready, unready = ray.wait([pg.ready()]) - assert len(ready) == 1 - assert len(unready) == 0 - - # Confirm that the placement group actor is created. It will - # raise an exception if actor was scheduled before placement - # group was created thus it checks atomicity. - ray.get(pg_actor.ping.remote(), timeout=3.0) - ray.kill(pg_actor) - - # Make sure atomic creation failure didn't impact resources. - @ray.remote(num_cpus=bundle_cpu_size) - def resource_check(): - return True + with connect_to_client_or_not(connect_to_client): + # Schedule tasks to fail initial placement group creation. + tasks = [bothering_task.remote() for _ in range(2)] + + # Make sure the two common task has scheduled. + def tasks_scheduled(): + return ray.available_resources()["CPU"] == 2.0 + + wait_for_condition(tasks_scheduled) + + # Create an actor that will fail bundle scheduling. + # It is important to use pack strategy to make test less flaky. + pg = ray.util.placement_group( + name="name", + strategy="SPREAD", + bundles=[{ + "CPU": bundle_cpu_size + } for _ in range(num_nodes * bundle_per_node)]) + + # Create a placement group actor. + # This shouldn't be scheduled because atomic + # placement group creation should've failed. + pg_actor = NormalActor.options( + placement_group=pg, + placement_group_bundle_index=num_nodes * bundle_per_node - + 1).remote() + + # Wait on the placement group now. It should be unready + # because normal actor takes resources that are required + # for one of bundle creation. + ready, unready = ray.wait([pg.ready()], timeout=0.5) + assert len(ready) == 0 + assert len(unready) == 1 + # Wait until all tasks are done. 
+ assert all(ray.get(tasks)) + + # Wait on the placement group creation. Since resources are now + # available, it should be ready soon. + ready, unready = ray.wait([pg.ready()]) + assert len(ready) == 1 + assert len(unready) == 0 + + # Confirm that the placement group actor is created. It will + # raise an exception if actor was scheduled before placement + # group was created thus it checks atomicity. + ray.get(pg_actor.ping.remote(), timeout=3.0) + ray.kill(pg_actor) + + # Make sure atomic creation failure didn't impact resources. + @ray.remote(num_cpus=bundle_cpu_size) + def resource_check(): + return True - # This should hang because every resources - # are claimed by placement group. - check_without_pg = [ - resource_check.remote() for _ in range(bundle_per_node * num_nodes) - ] + # This should hang because every resources + # are claimed by placement group. + check_without_pg = [ + resource_check.remote() for _ in range(bundle_per_node * num_nodes) + ] - # This all should scheduled on each bundle. - check_with_pg = [ - resource_check.options( - placement_group=pg, placement_group_bundle_index=i).remote() - for i in range(bundle_per_node * num_nodes) - ] + # This all should scheduled on each bundle. + check_with_pg = [ + resource_check.options( + placement_group=pg, placement_group_bundle_index=i).remote() + for i in range(bundle_per_node * num_nodes) + ] - # Make sure these are hanging. - ready, unready = ray.wait(check_without_pg, timeout=0) - assert len(ready) == 0 - assert len(unready) == bundle_per_node * num_nodes + # Make sure these are hanging. + ready, unready = ray.wait(check_without_pg, timeout=0) + assert len(ready) == 0 + assert len(unready) == bundle_per_node * num_nodes - # Make sure these are all scheduled. - assert all(ray.get(check_with_pg)) + # Make sure these are all scheduled. + assert all(ray.get(check_with_pg)) - ray.util.remove_placement_group(pg) + ray.util.remove_placement_group(pg) - def pg_removed(): - return ray.util.placement_group_table(pg)["state"] == "REMOVED" + def pg_removed(): + return ray.util.placement_group_table(pg)["state"] == "REMOVED" - wait_for_condition(pg_removed) + wait_for_condition(pg_removed) - # Make sure check without pgs are all - # scheduled properly because resources are cleaned up. - assert all(ray.get(check_without_pg)) + # Make sure check without pgs are all + # scheduled properly because resources are cleaned up. + assert all(ray.get(check_without_pg)) -def test_mini_integration(ray_start_cluster): +@pytest.mark.parametrize("connect_to_client", [False, True]) +def test_mini_integration(ray_start_cluster, connect_to_client): # Create bundles as many as number of gpus in the cluster. # Do some random work and make sure all resources are properly recovered. @@ -798,217 +846,224 @@ def test_mini_integration(ray_start_cluster): cluster.wait_for_nodes() ray.init(address=cluster.address) - @ray.remote(num_cpus=0, num_gpus=1) - def random_tasks(): - import time - import random - sleep_time = random.uniform(0.1, 0.2) - time.sleep(sleep_time) - return True - - pgs = [] - pg_tasks = [] - # total bundle gpu usage = bundles_per_pg * total_num_pg * per_bundle_gpus - # Note this is half of total - for index in range(total_num_pg): - pgs.append( - ray.util.placement_group( - name=f"name{index}", - strategy="PACK", - bundles=[{ - "GPU": per_bundle_gpus - } for _ in range(bundles_per_pg)])) - - # Schedule tasks. 
- for i in range(total_num_pg): - pg = pgs[i] - pg_tasks.append([ - random_tasks.options( - placement_group=pg, - placement_group_bundle_index=bundle_index).remote() - for bundle_index in range(bundles_per_pg) - ]) - - # Make sure tasks are done and we remove placement groups. - num_removed_pg = 0 - pg_indexes = [2, 3, 1, 7, 8, 9, 0, 6, 4, 5] - while num_removed_pg < total_num_pg: - index = pg_indexes[num_removed_pg] - pg = pgs[index] - assert all(ray.get(pg_tasks[index])) - ray.util.remove_placement_group(pg) - num_removed_pg += 1 + with connect_to_client_or_not(connect_to_client): - @ray.remote(num_cpus=2, num_gpus=per_node_gpus) - class A: - def ping(self): + @ray.remote(num_cpus=0, num_gpus=1) + def random_tasks(): + import time + import random + sleep_time = random.uniform(0.1, 0.2) + time.sleep(sleep_time) return True - # Make sure all resources are properly returned by scheduling - # actors that take up all existing resources. - actors = [A.remote() for _ in range(num_nodes)] - assert all(ray.get([a.ping.remote() for a in actors])) + pgs = [] + pg_tasks = [] + # total bundle gpu usage = bundles_per_pg*total_num_pg*per_bundle_gpus + # Note this is half of total + for index in range(total_num_pg): + pgs.append( + ray.util.placement_group( + name=f"name{index}", + strategy="PACK", + bundles=[{ + "GPU": per_bundle_gpus + } for _ in range(bundles_per_pg)])) + + # Schedule tasks. + for i in range(total_num_pg): + pg = pgs[i] + pg_tasks.append([ + random_tasks.options( + placement_group=pg, + placement_group_bundle_index=bundle_index).remote() + for bundle_index in range(bundles_per_pg) + ]) + + # Make sure tasks are done and we remove placement groups. + num_removed_pg = 0 + pg_indexes = [2, 3, 1, 7, 8, 9, 0, 6, 4, 5] + while num_removed_pg < total_num_pg: + index = pg_indexes[num_removed_pg] + pg = pgs[index] + assert all(ray.get(pg_tasks[index])) + ray.util.remove_placement_group(pg) + num_removed_pg += 1 + + @ray.remote(num_cpus=2, num_gpus=per_node_gpus) + class A: + def ping(self): + return True + + # Make sure all resources are properly returned by scheduling + # actors that take up all existing resources. + actors = [A.remote() for _ in range(num_nodes)] + assert all(ray.get([a.ping.remote() for a in actors])) -def test_capture_child_actors(ray_start_cluster): +@pytest.mark.parametrize("connect_to_client", [False, True]) +def test_capture_child_actors(ray_start_cluster, connect_to_client): cluster = ray_start_cluster total_num_actors = 4 for _ in range(2): cluster.add_node(num_cpus=total_num_actors) ray.init(address=cluster.address) - pg = ray.util.placement_group( - [{ - "CPU": 2 - }, { - "CPU": 2 - }], strategy="STRICT_PACK") - ray.get(pg.ready()) + with connect_to_client_or_not(connect_to_client): + pg = ray.util.placement_group( + [{ + "CPU": 2 + }, { + "CPU": 2 + }], strategy="STRICT_PACK") + ray.get(pg.ready()) - # If get_current_placement_group is used when the current worker/driver - # doesn't belong to any of placement group, it should return None. - assert get_current_placement_group() is None + # If get_current_placement_group is used when the current worker/driver + # doesn't belong to any of placement group, it should return None. + assert get_current_placement_group() is None - # Test actors first. - @ray.remote(num_cpus=1) - class NestedActor: - def ready(self): - return True + # Test actors first. 
+ @ray.remote(num_cpus=1) + class NestedActor: + def ready(self): + return True - @ray.remote(num_cpus=1) - class Actor: - def __init__(self): - self.actors = [] + @ray.remote(num_cpus=1) + class Actor: + def __init__(self): + self.actors = [] - def ready(self): - return True + def ready(self): + return True - def schedule_nested_actor(self): - # Make sure we can capture the current placement group. - assert get_current_placement_group() is not None - # Actors should be implicitly captured. - actor = NestedActor.remote() - ray.get(actor.ready.remote()) - self.actors.append(actor) - - def schedule_nested_actor_outside_pg(self): - # Don't use placement group. - actor = NestedActor.options(placement_group=None).remote() - ray.get(actor.ready.remote()) - self.actors.append(actor) - - a = Actor.options(placement_group=pg).remote() - ray.get(a.ready.remote()) - # 1 top level actor + 3 children. - for _ in range(total_num_actors - 1): - ray.get(a.schedule_nested_actor.remote()) - # Make sure all the actors are scheduled on the same node. - # (why? The placement group has STRICT_PACK strategy). - node_id_set = set() - for actor_info in ray.actors().values(): - node_id = actor_info["Address"]["NodeID"] - node_id_set.add(node_id) - - # Since all node id should be identical, set should be equal to 1. - assert len(node_id_set) == 1 + def schedule_nested_actor(self): + # Make sure we can capture the current placement group. + assert get_current_placement_group() is not None + # Actors should be implicitly captured. + actor = NestedActor.remote() + ray.get(actor.ready.remote()) + self.actors.append(actor) - # Kill an actor and wait until it is killed. - kill_actor_and_wait_for_failure(a) - with pytest.raises(ray.exceptions.RayActorError): + def schedule_nested_actor_outside_pg(self): + # Don't use placement group. + actor = NestedActor.options(placement_group=None).remote() + ray.get(actor.ready.remote()) + self.actors.append(actor) + + a = Actor.options(placement_group=pg).remote() ray.get(a.ready.remote()) + # 1 top level actor + 3 children. + for _ in range(total_num_actors - 1): + ray.get(a.schedule_nested_actor.remote()) + # Make sure all the actors are scheduled on the same node. + # (why? The placement group has STRICT_PACK strategy). + node_id_set = set() + for actor_info in ray.actors().values(): + node_id = actor_info["Address"]["NodeID"] + node_id_set.add(node_id) - # Now create an actor, but do not capture the current tasks - a = Actor.options( - placement_group=pg, - placement_group_capture_child_tasks=False).remote() - ray.get(a.ready.remote()) - # 1 top level actor + 3 children. - for _ in range(total_num_actors - 1): - ray.get(a.schedule_nested_actor.remote()) - # Make sure all the actors are not scheduled on the same node. - # It is because the child tasks are not scheduled on the same - # placement group. - node_id_set = set() - for actor_info in ray.actors().values(): - node_id = actor_info["Address"]["NodeID"] - node_id_set.add(node_id) - - assert len(node_id_set) == 2 + # Since all node id should be identical, set should be equal to 1. + assert len(node_id_set) == 1 - # Kill an actor and wait until it is killed. - kill_actor_and_wait_for_failure(a) - with pytest.raises(ray.exceptions.RayActorError): + # Kill an actor and wait until it is killed. 
+ kill_actor_and_wait_for_failure(a) + with pytest.raises(ray.exceptions.RayActorError): + ray.get(a.ready.remote()) + + # Now create an actor, but do not capture the current tasks + a = Actor.options( + placement_group=pg, + placement_group_capture_child_tasks=False).remote() ray.get(a.ready.remote()) + # 1 top level actor + 3 children. + for _ in range(total_num_actors - 1): + ray.get(a.schedule_nested_actor.remote()) + # Make sure all the actors are not scheduled on the same node. + # It is because the child tasks are not scheduled on the same + # placement group. + node_id_set = set() + for actor_info in ray.actors().values(): + node_id = actor_info["Address"]["NodeID"] + node_id_set.add(node_id) - # Lastly, make sure when None is specified, actors are not scheduled - # on the same placement group. - a = Actor.options(placement_group=pg).remote() - ray.get(a.ready.remote()) - # 1 top level actor + 3 children. - for _ in range(total_num_actors - 1): - ray.get(a.schedule_nested_actor_outside_pg.remote()) - # Make sure all the actors are not scheduled on the same node. - # It is because the child tasks are not scheduled on the same - # placement group. - node_id_set = set() - for actor_info in ray.actors().values(): - node_id = actor_info["Address"]["NodeID"] - node_id_set.add(node_id) + assert len(node_id_set) == 2 + + # Kill an actor and wait until it is killed. + kill_actor_and_wait_for_failure(a) + with pytest.raises(ray.exceptions.RayActorError): + ray.get(a.ready.remote()) + + # Lastly, make sure when None is specified, actors are not scheduled + # on the same placement group. + a = Actor.options(placement_group=pg).remote() + ray.get(a.ready.remote()) + # 1 top level actor + 3 children. + for _ in range(total_num_actors - 1): + ray.get(a.schedule_nested_actor_outside_pg.remote()) + # Make sure all the actors are not scheduled on the same node. + # It is because the child tasks are not scheduled on the same + # placement group. + node_id_set = set() + for actor_info in ray.actors().values(): + node_id = actor_info["Address"]["NodeID"] + node_id_set.add(node_id) - assert len(node_id_set) == 2 + assert len(node_id_set) == 2 -def test_capture_child_tasks(ray_start_cluster): +@pytest.mark.parametrize("connect_to_client", [False, True]) +def test_capture_child_tasks(ray_start_cluster, connect_to_client): cluster = ray_start_cluster total_num_tasks = 4 for _ in range(2): cluster.add_node(num_cpus=total_num_tasks, num_gpus=total_num_tasks) ray.init(address=cluster.address) - pg = ray.util.placement_group( - [{ - "CPU": 2, - "GPU": 2, - }, { - "CPU": 2, - "GPU": 2, - }], - strategy="STRICT_PACK") - ray.get(pg.ready()) - - # If get_current_placement_group is used when the current worker/driver - # doesn't belong to any of placement group, it should return None. - assert get_current_placement_group() is None - - # Test if tasks capture child tasks. - @ray.remote - def task(): - return get_current_placement_group() - - @ray.remote - def create_nested_task(child_cpu, child_gpu): - assert get_current_placement_group() is not None - return ray.get([ - task.options(num_cpus=child_cpu, num_gpus=child_gpu).remote() - for _ in range(3) - ]) - - t = create_nested_task.options( - num_cpus=1, num_gpus=0, placement_group=pg).remote(1, 0) - pgs = ray.get(t) - # Every task should have current placement group because they - # should be implicitly captured by default. - assert None not in pgs - - # Test if tasks don't capture child tasks when the option is off. 
- t2 = create_nested_task.options( - num_cpus=0, - num_gpus=1, - placement_group=pg, - placement_group_capture_child_tasks=False).remote(0, 1) - pgs = ray.get(t2) - # All placement group should be None because we don't capture child tasks. - assert not all(pgs) + with connect_to_client_or_not(connect_to_client): + pg = ray.util.placement_group( + [{ + "CPU": 2, + "GPU": 2, + }, { + "CPU": 2, + "GPU": 2, + }], + strategy="STRICT_PACK") + ray.get(pg.ready()) + + # If get_current_placement_group is used when the current worker/driver + # doesn't belong to any of placement group, it should return None. + assert get_current_placement_group() is None + + # Test if tasks capture child tasks. + @ray.remote + def task(): + return get_current_placement_group() + + @ray.remote + def create_nested_task(child_cpu, child_gpu): + assert get_current_placement_group() is not None + return ray.get([ + task.options(num_cpus=child_cpu, num_gpus=child_gpu).remote() + for _ in range(3) + ]) + + t = create_nested_task.options( + num_cpus=1, num_gpus=0, placement_group=pg).remote(1, 0) + pgs = ray.get(t) + # Every task should have current placement group because they + # should be implicitly captured by default. + assert None not in pgs + + # Test if tasks don't capture child tasks when the option is off. + t2 = create_nested_task.options( + num_cpus=0, + num_gpus=1, + placement_group=pg, + placement_group_capture_child_tasks=False).remote(0, 1) + pgs = ray.get(t2) + # All placement groups should be None since we don't capture child + # tasks. + assert not all(pgs) def test_ready_warning_suppressed(ray_start_regular, error_pubsub): @@ -1313,25 +1368,27 @@ def test_placement_group_wait_api(ray_start_cluster_head): placement_group1.wait(10) -def test_schedule_placement_groups_at_the_same_time(): +@pytest.mark.parametrize("connect_to_client", [False, True]) +def test_schedule_placement_groups_at_the_same_time(connect_to_client): ray.init(num_cpus=4) - pgs = [placement_group([{"CPU": 2}]) for _ in range(6)] + with connect_to_client_or_not(connect_to_client): + pgs = [placement_group([{"CPU": 2}]) for _ in range(6)] - wait_pgs = {pg.ready(): pg for pg in pgs} + wait_pgs = {pg.ready(): pg for pg in pgs} - def is_all_placement_group_removed(): - ready, _ = ray.wait(list(wait_pgs.keys()), timeout=0.5) - if ready: - ready_pg = wait_pgs[ready[0]] - remove_placement_group(ready_pg) - del wait_pgs[ready[0]] + def is_all_placement_group_removed(): + ready, _ = ray.wait(list(wait_pgs.keys()), timeout=0.5) + if ready: + ready_pg = wait_pgs[ready[0]] + remove_placement_group(ready_pg) + del wait_pgs[ready[0]] - if len(wait_pgs) == 0: - return True - return False + if len(wait_pgs) == 0: + return True + return False - wait_for_condition(is_all_placement_group_removed) + wait_for_condition(is_all_placement_group_removed) ray.shutdown() @@ -1532,37 +1589,41 @@ def ping(self): assert error_count == 1 -def test_placement_group_synchronous_registration(ray_start_cluster): +@pytest.mark.parametrize("connect_to_client", [False, True]) +def test_placement_group_synchronous_registration(ray_start_cluster, + connect_to_client): cluster = ray_start_cluster # One node which only has one CPU. cluster.add_node(num_cpus=1) cluster.wait_for_nodes() ray.init(address=cluster.address) - # Create a placement group that has two bundles and `STRICT_PACK` strategy, - # so, its registration will successful but scheduling failed. 
-    placement_group = ray.util.placement_group(
-        name="name",
-        strategy="STRICT_PACK",
-        bundles=[{
-            "CPU": 1,
-        }, {
-            "CPU": 1
-        }])
-    # Make sure we can properly remove it immediately
-    # as its registration is synchronous.
-    ray.util.remove_placement_group(placement_group)
+    with connect_to_client_or_not(connect_to_client):
+        # Create a placement group that has two bundles and `STRICT_PACK`
+        # strategy, so its registration will succeed but scheduling will fail.
+        placement_group = ray.util.placement_group(
+            name="name",
+            strategy="STRICT_PACK",
+            bundles=[{
+                "CPU": 1,
+            }, {
+                "CPU": 1
+            }])
+        # Make sure we can properly remove it immediately
+        # as its registration is synchronous.
+        ray.util.remove_placement_group(placement_group)
 
-    def is_placement_group_removed():
-        table = ray.util.placement_group_table(placement_group)
-        if "state" not in table:
-            return False
-        return table["state"] == "REMOVED"
+        def is_placement_group_removed():
+            table = ray.util.placement_group_table(placement_group)
+            if "state" not in table:
+                return False
+            return table["state"] == "REMOVED"
 
-    wait_for_condition(is_placement_group_removed)
+        wait_for_condition(is_placement_group_removed)
 
 
-def test_placement_group_gpu_set(ray_start_cluster):
+@pytest.mark.parametrize("connect_to_client", [False, True])
+def test_placement_group_gpu_set(ray_start_cluster, connect_to_client):
     cluster = ray_start_cluster
     # One node which only has one CPU.
     cluster.add_node(num_cpus=1, num_gpus=1)
@@ -1570,35 +1631,37 @@ def test_placement_group_gpu_set(ray_start_cluster):
     cluster.wait_for_nodes()
     ray.init(address=cluster.address)
 
-    placement_group = ray.util.placement_group(
-        name="name",
-        strategy="PACK",
-        bundles=[{
-            "CPU": 1,
-            "GPU": 1
-        }, {
-            "CPU": 1,
-            "GPU": 1
-        }])
-
-    @ray.remote(num_gpus=1)
-    def get_gpus():
-        return ray.get_gpu_ids()
-
-    result = get_gpus.options(
-        placement_group=placement_group,
-        placement_group_bundle_index=0).remote()
-    result = ray.get(result)
-    assert result == [0]
+    with connect_to_client_or_not(connect_to_client):
+        placement_group = ray.util.placement_group(
+            name="name",
+            strategy="PACK",
+            bundles=[{
+                "CPU": 1,
+                "GPU": 1
+            }, {
+                "CPU": 1,
+                "GPU": 1
+            }])
+
+        @ray.remote(num_gpus=1)
+        def get_gpus():
+            return ray.get_gpu_ids()
+
+        result = get_gpus.options(
+            placement_group=placement_group,
+            placement_group_bundle_index=0).remote()
+        result = ray.get(result)
+        assert result == [0]
 
-    result = get_gpus.options(
-        placement_group=placement_group,
-        placement_group_bundle_index=1).remote()
-    result = ray.get(result)
-    assert result == [0]
+        result = get_gpus.options(
+            placement_group=placement_group,
+            placement_group_bundle_index=1).remote()
+        result = ray.get(result)
+        assert result == [0]
 
 
-def test_placement_group_gpu_assigned(ray_start_cluster):
+@pytest.mark.parametrize("connect_to_client", [False, True])
+def test_placement_group_gpu_assigned(ray_start_cluster, connect_to_client):
     cluster = ray_start_cluster
     cluster.add_node(num_gpus=2)
     ray.init(address=cluster.address)
@@ -1609,17 +1672,57 @@ def f():
         import os
         return os.environ["CUDA_VISIBLE_DEVICES"]
 
-    pg1 = ray.util.placement_group([{"GPU": 1}])
-    pg2 = ray.util.placement_group([{"GPU": 1}])
+    with connect_to_client_or_not(connect_to_client):
+        pg1 = ray.util.placement_group([{"GPU": 1}])
+        pg2 = ray.util.placement_group([{"GPU": 1}])
+
+        assert pg1.wait(10)
+        assert pg2.wait(10)
+
+        gpu_ids_res.add(ray.get(f.options(placement_group=pg1).remote()))
+
gpu_ids_res.add(ray.get(f.options(placement_group=pg2).remote())) + + assert len(gpu_ids_res) == 2 + + +def test_placement_group_client_option_serialization(): + """Tests conversion of placement group to json-serializable dict and back. + + Tests conversion + placement_group -> dict -> placement_group and + dict -> placement_group -> dict + with and without non-null bundle cache. + """ + + # Tests conversion from dict to placement group and back. + def dict_to_pg_to_dict(pg_dict_in): + pg = PlacementGroup.from_dict(pg_dict_in) + pg_dict_out = pg.to_dict() + assert pg_dict_in == pg_dict_out + + # Tests conversion from placement group to dict and back. + def pg_to_dict_to_pg(pg_in): + pg_dict = pg_in.to_dict() + pg_out = PlacementGroup.from_dict(pg_dict) + assert pg_out.id == pg_in.id + assert pg_out.bundle_cache == pg_in.bundle_cache + + pg_id = PlacementGroupID(id=bytes(16)) + id_string = bytes(16).hex() + bundle_cache = [{"CPU": 2}, {"custom_resource": 5}] + + pg_with_bundles = PlacementGroup(id=pg_id, bundle_cache=bundle_cache) + pg_to_dict_to_pg(pg_with_bundles) - assert pg1.wait(10) - assert pg2.wait(10) + pg_no_bundles = PlacementGroup(id=pg_id) + pg_to_dict_to_pg(pg_no_bundles) - gpu_ids_res.add(ray.get(f.options(placement_group=pg1).remote())) - gpu_ids_res.add(ray.get(f.options(placement_group=pg2).remote())) + pg_dict_with_bundles = {"id": id_string, "bundle_cache": bundle_cache} + dict_to_pg_to_dict(pg_dict_with_bundles) - assert len(gpu_ids_res) == 2 + pg_dict_no_bundles = {"id": id_string, "bundle_cache": None} + dict_to_pg_to_dict(pg_dict_no_bundles) if __name__ == "__main__": - sys.exit(pytest.main(["-v", __file__])) + sys.exit(pytest.main(["-sv", __file__])) diff --git a/python/ray/util/client/api.py b/python/ray/util/client/api.py index febc84111534..a827c4d8fb2c 100644 --- a/python/ray/util/client/api.py +++ b/python/ray/util/client/api.py @@ -287,6 +287,10 @@ def _get_converted(self, key: str) -> "ClientStub": """Given a UUID, return the converted object""" return self.worker._get_converted(key) + def _converted_key_exists(self, key: str) -> bool: + """Check if a key UUID is present in the store of converted objects.""" + return self.worker._converted_key_exists(key) + def __getattr__(self, key: str): if not key.startswith("_"): raise NotImplementedError( diff --git a/python/ray/util/client/common.py b/python/ray/util/client/common.py index 2023e9140c78..16ec11042385 100644 --- a/python/ray/util/client/common.py +++ b/python/ray/util/client/common.py @@ -307,6 +307,12 @@ def set_task_options(task: ray_client_pb2.ClientTask, if options is None: task.ClearField(field) return + + # If there's a non-null "placement_group" in `options`, convert the + # placement group to a dict so that `options` can be passed to json.dumps. 
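+    # Rough sketch of the conversion (the bundle values are only illustrative):
+    #   {"placement_group": <PlacementGroup>} is sent over the wire as
+    #   {"placement_group": {"id": "<hex id>", "bundle_cache": [{"CPU": 1}]}}.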
+ if options.get("placement_group", None): + options["placement_group"] = options["placement_group"].to_dict() + options_str = json.dumps(options) getattr(task, field).json_options = options_str diff --git a/python/ray/util/client/ray_client_helpers.py b/python/ray/util/client/ray_client_helpers.py index 54984709971e..cc6b3fa26201 100644 --- a/python/ray/util/client/ray_client_helpers.py +++ b/python/ray/util/client/ray_client_helpers.py @@ -4,6 +4,7 @@ import ray.cloudpickle as pickle import ray.util.client.server.server as ray_client_server from ray.util.client import ray +from ray._private.client_mode_hook import enable_client_mode @contextmanager @@ -62,3 +63,36 @@ def _CloudPicklerReducer(obj): # construct a reducer pickle.CloudPickler.dispatch[cls] = _CloudPicklerReducer + + +@contextmanager +def connect_to_client_or_not(connect_to_client: bool): + """Utility for running test logic with and without a Ray client connection. + + If client_connect is True, will connect to Ray client in context. + If client_connect is False, does nothing. + + How to use: + Given a test of the following form: + + def test_(args): + + + + Modify the test to + + @pytest.mark.parametrize("connect_to_client", [False, True]) + def test_(args, connect_to_client) + + with connect_to_client_or_not(connect_to_client): + + + Parameterize the argument connect over True, False to run the test with and + without a Ray client connection. + """ + + if connect_to_client: + with ray_start_client_server(), enable_client_mode(): + yield + else: + yield diff --git a/python/ray/util/client/server/server.py b/python/ray/util/client/server/server.py index c358a33012b0..28b437a63dc2 100644 --- a/python/ray/util/client/server/server.py +++ b/python/ray/util/client/server/server.py @@ -31,6 +31,7 @@ from ray.util.client.server.dataservicer import DataServicer from ray.util.client.server.logservicer import LogstreamServicer from ray.util.client.server.server_stubs import current_server +from ray.util.placement_group import PlacementGroup from ray._private.client_mode_hook import disable_client_hook logger = logging.getLogger(__name__) @@ -510,6 +511,13 @@ def decode_options( return None opts = json.loads(options.json_options) assert isinstance(opts, dict) + + if opts.get("placement_group", None): + # Placement groups in Ray client options are serialized as dicts. + # Convert the dict to a PlacementGroup. 
+ opts["placement_group"] = PlacementGroup.from_dict( + opts["placement_group"]) + return opts diff --git a/python/ray/util/client/worker.py b/python/ray/util/client/worker.py index 4e9b807f55d6..ded6657ced8f 100644 --- a/python/ray/util/client/worker.py +++ b/python/ray/util/client/worker.py @@ -274,7 +274,7 @@ def wait(self, data = { "object_ids": [object_ref.id for object_ref in object_refs], "num_returns": num_returns, - "timeout": timeout if timeout else -1, + "timeout": timeout if (timeout is not None) else -1, "client_id": self._client_id, } req = ray_client_pb2.WaitRequest(**data) @@ -512,6 +512,10 @@ def _get_converted(self, key: str) -> "ClientStub": """Given a UUID, return the converted object""" return self._converted[key] + def _converted_key_exists(self, key: str) -> bool: + """Check if a key UUID is present in the store of converted objects.""" + return key in self._converted + def make_client_id() -> str: id = uuid.uuid4() diff --git a/python/ray/util/placement_group.py b/python/ray/util/placement_group.py index 35deefaeb3c2..a07688def36b 100644 --- a/python/ray/util/placement_group.py +++ b/python/ray/util/placement_group.py @@ -1,11 +1,20 @@ import time - -from typing import (List, Dict, Optional, Union) +from typing import Dict +from typing import List +from typing import Optional +from typing import Union +from typing import TYPE_CHECKING import ray -from ray._raylet import PlacementGroupID, ObjectRef +from ray._raylet import ObjectRef +from ray._raylet import PlacementGroupID from ray._private.utils import hex_to_binary from ray.ray_constants import (to_memory_units, MEMORY_RESOURCE_UNIT_BYTES) +from ray._private.client_mode_hook import client_mode_should_convert +from ray._private.client_mode_hook import client_mode_wrap + +if TYPE_CHECKING: + from ray.util.common import ClientObjectRef # noqa bundle_reservation_check = None @@ -31,14 +40,16 @@ class PlacementGroup: """A handle to a placement group.""" @staticmethod - def empty(): + def empty() -> "PlacementGroup": return PlacementGroup(PlacementGroupID.nil()) - def __init__(self, id: PlacementGroupID): + def __init__(self, + id: PlacementGroupID, + bundle_cache: Optional[List[Dict]] = None): self.id = id - self.bundle_cache = None + self.bundle_cache = bundle_cache - def ready(self) -> ObjectRef: + def ready(self) -> Union[ObjectRef, "ClientObjectRef"]: """Returns an ObjectRef to check ready status. This API runs a small dummy task to wait for placement group creation. @@ -67,7 +78,7 @@ def ready(self) -> ObjectRef: bundle_index = 0 bundle = self.bundle_cache[bundle_index] - resource_name, value = self._get_none_zero_resource(bundle) + resource_name, value = self._get_a_non_zero_resource(bundle) num_cpus = 0 num_gpus = 0 memory = 0 @@ -96,11 +107,7 @@ def wait(self, timeout_seconds: Union[float, int]) -> bool: Return: True if the placement group is created. False otherwise. """ - worker = ray.worker.global_worker - worker.check_connected() - - return worker.core_worker.wait_placement_group_ready( - self.id, timeout_seconds) + return _call_placement_group_ready(self.id, timeout_seconds) @property def bundle_specs(self) -> List[Dict]: @@ -109,12 +116,54 @@ def bundle_specs(self) -> List[Dict]: return self.bundle_cache @property - def bundle_count(self): + def bundle_count(self) -> int: self._fill_bundle_cache_if_needed() return len(self.bundle_cache) - def _get_none_zero_resource(self, bundle: List[Dict]): - # Set a mock value to schedule a dummy task. 
+    def to_dict(self) -> dict:
+        """Convert this placement group into a dict for purposes of json
+        serialization.
+
+        Used when passing a placement group as an option to a Ray client remote
+        function. See set_task_options in util/client/common.py.
+
+        Return:
+            Dictionary with json-serializable keys representing the placement
+            group.
+        """
+        # Placement group id is converted to a hex /string/ to make it
+        # serializable.
+        return {"id": self.id.hex(), "bundle_cache": self.bundle_cache}
+
+    @staticmethod
+    def from_dict(pg_dict: dict) -> "PlacementGroup":
+        """Instantiate and return a PlacementGroup from its json-serializable
+        dict representation.
+
+        Used by Ray Client on server-side to deserialize placement group
+        option. See decode_options in util/client/server/server.py.
+
+        Args:
+            pg_dict (dict): Dictionary representing a placement group.
+        Return:
+            A placement group made from the data in the input dict.
+        """
+        # Validate serialized dict
+        assert isinstance(pg_dict, dict)
+        assert pg_dict.keys() == {"id", "bundle_cache"}
+        # The value associated with key "id" is a hex string.
+        assert isinstance(pg_dict["id"], str)
+        if pg_dict["bundle_cache"] is not None:
+            assert isinstance(pg_dict["bundle_cache"], list)
+
+        # Deserialize and return a Placement Group.
+        id_bytes = bytes.fromhex(pg_dict["id"])
+        pg_id = PlacementGroupID(id_bytes)
+        bundle_cache = pg_dict["bundle_cache"]
+        return PlacementGroup(pg_id, bundle_cache)
+
+    def _get_a_non_zero_resource(self, bundle: Dict):
+        # Any number between 0-1 should be fine.
         MOCK_VALUE = 0.001
         for key, value in bundle.items():
             if value > 0:
@@ -123,31 +172,46 @@ def _get_none_zero_resource(self, bundle: List[Dict]):
                 return key, value
         assert False, "This code should be unreachable."
 
-    def _fill_bundle_cache_if_needed(self):
+    def _fill_bundle_cache_if_needed(self) -> None:
         if not self.bundle_cache:
-            # Since creating placement group is async, it is
-            # possible table is not ready yet. To avoid the
-            # problem, we should keep trying with timeout.
-            TIMEOUT_SECOND = 30
-            WAIT_INTERVAL = 0.05
-            timeout_cnt = 0
-            worker = ray.worker.global_worker
-            worker.check_connected()
-
-            while timeout_cnt < int(TIMEOUT_SECOND / WAIT_INTERVAL):
-                pg_info = ray.state.state.placement_group_table(self.id)
-                if pg_info:
-                    self.bundle_cache = list(pg_info["bundles"].values())
-                    return
-                time.sleep(WAIT_INTERVAL)
-                timeout_cnt += 1
-
-            raise RuntimeError(
-                "Couldn't get the bundle information of placement group id "
-                f"{self.id} in {TIMEOUT_SECOND} seconds. It is likely "
-                "because GCS server is too busy.")
+            self.bundle_cache = _get_bundle_cache(self.id)
+
+
+@client_mode_wrap
+def _call_placement_group_ready(pg_id: PlacementGroupID,
+                                timeout_seconds: int) -> bool:
+    worker = ray.worker.global_worker
+    worker.check_connected()
+    return worker.core_worker.wait_placement_group_ready(
+        pg_id, timeout_seconds)
+
+
+@client_mode_wrap
+def _get_bundle_cache(pg_id: PlacementGroupID) -> List[Dict]:
+    # Since creating placement group is async, it is
+    # possible table is not ready yet. To avoid the
+    # problem, we should keep trying with timeout.
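+    # With the constants below this amounts to roughly
+    # TIMEOUT_SECOND / WAIT_INTERVAL = 30 / 0.05 = 600 polls of the GCS table
+    # before giving up.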
+    TIMEOUT_SECOND = 30
+    WAIT_INTERVAL = 0.05
+    timeout_cnt = 0
+    worker = ray.worker.global_worker
+    worker.check_connected()
+
+    while timeout_cnt < int(TIMEOUT_SECOND / WAIT_INTERVAL):
+        pg_info = ray.state.state.placement_group_table(pg_id)
+        if pg_info:
+            return list(pg_info["bundles"].values())
+        time.sleep(WAIT_INTERVAL)
+        timeout_cnt += 1
+
+    raise RuntimeError(
+        "Couldn't get the bundle information of placement group id "
+        f"{pg_id} in {TIMEOUT_SECOND} seconds. It is likely "
+        "because GCS server is too busy.")
+
+
+@client_mode_wrap
 def placement_group(bundles: List[Dict[str, float]],
                     strategy: str = "PACK",
                     name: str = "",
@@ -208,7 +272,8 @@ def placement_group(bundles: List[Dict[str, float]],
     return PlacementGroup(placement_group_id)
 
 
-def remove_placement_group(placement_group: PlacementGroup):
+@client_mode_wrap
+def remove_placement_group(placement_group: PlacementGroup) -> None:
     """Asynchronously remove placement group.
 
     Args:
@@ -221,7 +286,8 @@ def remove_placement_group(placement_group: PlacementGroup):
     worker.core_worker.remove_placement_group(placement_group.id)
 
 
-def get_placement_group(placement_group_name: str):
+@client_mode_wrap
+def get_placement_group(placement_group_name: str) -> PlacementGroup:
     """Get a placement group object with a global name.
 
     Returns:
@@ -244,6 +310,7 @@ def get_placement_group(placement_group_name: str):
             hex_to_binary(placement_group_info["placement_group_id"])))
 
 
+@client_mode_wrap
 def placement_group_table(placement_group: PlacementGroup = None) -> dict:
     """Get the state of the placement group from GCS.
 
@@ -286,6 +353,9 @@ def get_current_placement_group() -> Optional[PlacementGroup]:
         None if the current task or actor wasn't created with any placement
            group.
     """
+    if client_mode_should_convert():
+        # Client mode is only a driver.
+        return None
     worker = ray.worker.global_worker
     worker.check_connected()
     pg_id = worker.placement_group_id
@@ -295,7 +365,7 @@ def get_current_placement_group() -> Optional[PlacementGroup]:
 def check_placement_group_index(placement_group: PlacementGroup,
-                                bundle_index: int):
+                                bundle_index: int) -> None:
     assert placement_group is not None
     if placement_group.id.is_nil():
         if bundle_index != -1: