From 23281998590e5fb18b57530bc037bbdb366c98d6 Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Thu, 19 Sep 2024 15:56:11 -0700 Subject: [PATCH 1/3] Add placement group id and format required resources without pg id in the REST API response that the dashboard calls Signed-off-by: Mengjin Yan --- python/ray/dashboard/datacenter.py | 6 ++- .../ray/dashboard/modules/actor/actor_head.py | 4 +- .../modules/actor/tests/test_actor.py | 43 ++++++++++++++++++- 3 files changed, 49 insertions(+), 4 deletions(-) diff --git a/python/ray/dashboard/datacenter.py b/python/ray/dashboard/datacenter.py index b0c663733f2b..8c99a0db9759 100644 --- a/python/ray/dashboard/datacenter.py +++ b/python/ray/dashboard/datacenter.py @@ -3,7 +3,7 @@ from typing import Any, List, Optional import ray.dashboard.consts as dashboard_consts -from ray._private.utils import get_or_create_event_loop +from ray._private.utils import get_or_create_event_loop, parse_pg_formatted_resources_to_original from ray.dashboard.utils import ( Dict, MutableNotificationDict, @@ -264,4 +264,8 @@ async def _get_actor(actor): actor["gpus"] = actor_process_gpu_stats actor["processStats"] = actor_process_stats actor["mem"] = node_physical_stats.get("mem", []) + + required_resources = parse_pg_formatted_resources_to_original(actor["requiredResources"]) + actor["requiredResources"] = required_resources + return actor diff --git a/python/ray/dashboard/modules/actor/actor_head.py b/python/ray/dashboard/modules/actor/actor_head.py index 4ce372a751a3..18dedef989e5 100644 --- a/python/ray/dashboard/modules/actor/actor_head.py +++ b/python/ray/dashboard/modules/actor/actor_head.py @@ -35,6 +35,7 @@ def actor_table_data_to_dict(message): "taskId", "parentTaskId", "sourceActorId", + "placementGroupId" }, always_print_fields_with_no_presence=True, ) @@ -55,6 +56,7 @@ def actor_table_data_to_dict(message): "startTime", "endTime", "reprName", + "placementGroupId" } light_message = {k: v for (k, v) in orig_message.items() if k in fields} 
light_message["actorClass"] = orig_message["className"] @@ -75,7 +77,7 @@ def actor_table_data_to_dict(message): light_message["startTime"] = int(light_message["startTime"]) light_message["endTime"] = int(light_message["endTime"]) light_message["requiredResources"] = dict(message.required_resources) - + return light_message diff --git a/python/ray/dashboard/modules/actor/tests/test_actor.py b/python/ray/dashboard/modules/actor/tests/test_actor.py index f150904ec9d8..c011f84c4f87 100644 --- a/python/ray/dashboard/modules/actor/tests/test_actor.py +++ b/python/ray/dashboard/modules/actor/tests/test_actor.py @@ -11,11 +11,23 @@ from ray._private.test_utils import format_web_url, wait_until_server_available from ray.dashboard.modules.actor import actor_consts from ray.dashboard.tests.conftest import * # noqa +from ray.util.placement_group import ( + placement_group +) +from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy logger = logging.getLogger(__name__) def test_actors(disable_aiohttp_cache, ray_start_with_dashboard): + """ + Tests the REST API dashboard calls on: + - alive actors + - infeasible actors + - dead actors + - pg actors (with pg_id set and required_resources formatted) + """ + @ray.remote class Foo: def __init__(self, num): @@ -39,17 +51,35 @@ def __repr__(self) -> str: class InfeasibleActor: pass + pg = placement_group([{"CPU": 1}]) + @ray.remote(num_cpus=1) + class PgActor: + def __init__(self): + pass + + def do_task(self): + return 1 + + def get_placement_group_id(self): + return ray.get_runtime_context().get_placement_group_id() + foo_actors = [Foo.options(name="first").remote(4), Foo.remote(5)] infeasible_actor = InfeasibleActor.options(name="infeasible").remote() # noqa dead_actor = Foo.options(name="dead").remote(1) + pg_actor = PgActor.options(name="pg", scheduling_strategy=PlacementGroupSchedulingStrategy( + placement_group=pg, + )).remote() + ray.kill(dead_actor) - results = [actor.do_task.remote() for actor in 
foo_actors] # noqa + results_foo = [actor.do_task.remote() for actor in foo_actors] # noqa + results_pg = pg_actor.do_task.remote() webui_url = ray_start_with_dashboard["webui_url"] assert wait_until_server_available(webui_url) webui_url = format_web_url(webui_url) job_id = ray.get_runtime_context().get_job_id() node_id = ray.get(foo_actors[0].get_node_id.remote()) pid = ray.get(foo_actors[0].get_pid.remote()) + placement_group_id = ray.get(pg_actor.get_placement_group_id.remote()) timeout_seconds = 5 start_time = time.time() @@ -61,7 +91,8 @@ class InfeasibleActor: resp_json = resp.json() resp_data = resp_json["data"] actors = resp_data["actors"] - assert len(actors) == 4 + assert len(actors) == 5 + for a in actors.values(): if a["name"] == "first": actor_response = a @@ -104,6 +135,14 @@ class InfeasibleActor: all_pids = {entry["pid"] for entry in actors.values()} assert 0 in all_pids # The infeasible actor assert len(all_pids) > 1 + + # Check the pg actor metadata. + for a in actors.values(): + if a["name"] == "pg": + pg_actor_response = a + assert pg_actor_response["placementGroupId"] == placement_group_id + assert pg_actor_response["requiredResources"] == {'CPU': 1.0} + break except Exception as ex: last_ex = ex From 49a9cbf52d75184e77c0cd9c4b63b6d4ba3fe26a Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Thu, 19 Sep 2024 16:27:17 -0700 Subject: [PATCH 2/3] Code reformat by lint Signed-off-by: Mengjin Yan --- python/ray/dashboard/datacenter.py | 11 ++++++-- .../ray/dashboard/modules/actor/actor_head.py | 6 ++-- .../modules/actor/tests/test_actor.py | 28 ++++++++++--------- 3 files changed, 26 insertions(+), 19 deletions(-) diff --git a/python/ray/dashboard/datacenter.py b/python/ray/dashboard/datacenter.py index 8c99a0db9759..82f937b912c9 100644 --- a/python/ray/dashboard/datacenter.py +++ b/python/ray/dashboard/datacenter.py @@ -3,7 +3,10 @@ from typing import Any, List, Optional import ray.dashboard.consts as dashboard_consts -from ray._private.utils import 
get_or_create_event_loop, parse_pg_formatted_resources_to_original +from ray._private.utils import ( + get_or_create_event_loop, + parse_pg_formatted_resources_to_original, +) from ray.dashboard.utils import ( Dict, MutableNotificationDict, @@ -264,8 +267,10 @@ async def _get_actor(actor): actor["gpus"] = actor_process_gpu_stats actor["processStats"] = actor_process_stats actor["mem"] = node_physical_stats.get("mem", []) - - required_resources = parse_pg_formatted_resources_to_original(actor["requiredResources"]) + + required_resources = parse_pg_formatted_resources_to_original( + actor["requiredResources"] + ) actor["requiredResources"] = required_resources return actor diff --git a/python/ray/dashboard/modules/actor/actor_head.py b/python/ray/dashboard/modules/actor/actor_head.py index 18dedef989e5..49c6e0f58368 100644 --- a/python/ray/dashboard/modules/actor/actor_head.py +++ b/python/ray/dashboard/modules/actor/actor_head.py @@ -35,7 +35,7 @@ def actor_table_data_to_dict(message): "taskId", "parentTaskId", "sourceActorId", - "placementGroupId" + "placementGroupId", }, always_print_fields_with_no_presence=True, ) @@ -56,7 +56,7 @@ def actor_table_data_to_dict(message): "startTime", "endTime", "reprName", - "placementGroupId" + "placementGroupId", } light_message = {k: v for (k, v) in orig_message.items() if k in fields} light_message["actorClass"] = orig_message["className"] @@ -77,7 +77,7 @@ def actor_table_data_to_dict(message): light_message["startTime"] = int(light_message["startTime"]) light_message["endTime"] = int(light_message["endTime"]) light_message["requiredResources"] = dict(message.required_resources) - + return light_message diff --git a/python/ray/dashboard/modules/actor/tests/test_actor.py b/python/ray/dashboard/modules/actor/tests/test_actor.py index c011f84c4f87..bd55b412ffab 100644 --- a/python/ray/dashboard/modules/actor/tests/test_actor.py +++ b/python/ray/dashboard/modules/actor/tests/test_actor.py @@ -11,9 +11,7 @@ from 
ray._private.test_utils import format_web_url, wait_until_server_available from ray.dashboard.modules.actor import actor_consts from ray.dashboard.tests.conftest import * # noqa -from ray.util.placement_group import ( - placement_group -) +from ray.util.placement_group import placement_group from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy logger = logging.getLogger(__name__) @@ -52,24 +50,28 @@ class InfeasibleActor: pass pg = placement_group([{"CPU": 1}]) - @ray.remote(num_cpus=1) + + @ray.remote(num_cpus=1) class PgActor: def __init__(self): pass - + def do_task(self): return 1 - + def get_placement_group_id(self): return ray.get_runtime_context().get_placement_group_id() foo_actors = [Foo.options(name="first").remote(4), Foo.remote(5)] infeasible_actor = InfeasibleActor.options(name="infeasible").remote() # noqa dead_actor = Foo.options(name="dead").remote(1) - pg_actor = PgActor.options(name="pg", scheduling_strategy=PlacementGroupSchedulingStrategy( - placement_group=pg, - )).remote() - + pg_actor = PgActor.options( + name="pg", + scheduling_strategy=PlacementGroupSchedulingStrategy( + placement_group=pg, + ), + ).remote() + ray.kill(dead_actor) results_foo = [actor.do_task.remote() for actor in foo_actors] # noqa results_pg = pg_actor.do_task.remote() @@ -92,7 +94,7 @@ def get_placement_group_id(self): resp_data = resp_json["data"] actors = resp_data["actors"] assert len(actors) == 5 - + for a in actors.values(): if a["name"] == "first": actor_response = a @@ -135,13 +137,13 @@ def get_placement_group_id(self): all_pids = {entry["pid"] for entry in actors.values()} assert 0 in all_pids # The infeasible actor assert len(all_pids) > 1 - + # Check the pg actor metadata. 
for a in actors.values(): if a["name"] == "pg": pg_actor_response = a assert pg_actor_response["placementGroupId"] == placement_group_id - assert pg_actor_response["requiredResources"] == {'CPU': 1.0} + assert pg_actor_response["requiredResources"] == {"CPU": 1.0} break except Exception as ex: From 7263bb0a83b485ea54895397f11ee1920280959f Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Thu, 19 Sep 2024 17:53:23 -0700 Subject: [PATCH 3/3] fix lint error Signed-off-by: Mengjin Yan --- python/ray/dashboard/modules/actor/tests/test_actor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/dashboard/modules/actor/tests/test_actor.py b/python/ray/dashboard/modules/actor/tests/test_actor.py index bd55b412ffab..d5a8fd8c89be 100644 --- a/python/ray/dashboard/modules/actor/tests/test_actor.py +++ b/python/ray/dashboard/modules/actor/tests/test_actor.py @@ -73,8 +73,8 @@ def get_placement_group_id(self): ).remote() ray.kill(dead_actor) - results_foo = [actor.do_task.remote() for actor in foo_actors] # noqa - results_pg = pg_actor.do_task.remote() + [actor.do_task.remote() for actor in foo_actors] + pg_actor.do_task.remote() webui_url = ray_start_with_dashboard["webui_url"] assert wait_until_server_available(webui_url) webui_url = format_web_url(webui_url)