From 0426ee4d414f212e27496c72129f167eed56faf7 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 19 Sep 2024 09:57:48 -0700 Subject: [PATCH] [Dashboard] Optimizing performance of Ray Dashboard (#47617) Signed-off-by: Alexey Kudinkin Signed-off-by: ujjawal-khare --- python/ray/dashboard/datacenter.py | 1 + .../ray/dashboard/modules/actor/actor_head.py | 49 +++----------- .../ray/dashboard/modules/node/node_head.py | 65 +++++++++++-------- .../dashboard/modules/node/tests/test_node.py | 1 + 4 files changed, 49 insertions(+), 67 deletions(-) diff --git a/python/ray/dashboard/datacenter.py b/python/ray/dashboard/datacenter.py index 4bb2d9d3a06fc..25ca83a8663bb 100644 --- a/python/ray/dashboard/datacenter.py +++ b/python/ray/dashboard/datacenter.py @@ -122,6 +122,7 @@ def _extract_workers_for_node(cls, node_physical_stats, node_stats): for worker in node_physical_stats.get("workers", []): worker = dict(worker) pid = worker["pid"] + core_worker_stats = pid_to_worker_stats.get(pid) # Empty list means core worker stats is not available. worker["coreWorkerStats"] = [core_worker_stats] if core_worker_stats else [] diff --git a/python/ray/dashboard/modules/actor/actor_head.py b/python/ray/dashboard/modules/actor/actor_head.py index 3036d099327a9..990579a9c6051 100644 --- a/python/ray/dashboard/modules/actor/actor_head.py +++ b/python/ray/dashboard/modules/actor/actor_head.py @@ -181,16 +181,16 @@ async def _update_actors(self): Processes actor info. First gets all actors from GCS, then subscribes to actor updates. For each actor update, updates DataSource.node_actors and DataSource.actors. + """ - To prevent Time-of-check to time-of-use issue [1], the get-all-actor-info - happens after the subscription. That is, an update between get-all-actor-info - and the subscription is not missed. + # To prevent Time-of-check to time-of-use issue [1], the get-all-actor-info + # happens after the subscription. That is, an update between get-all-actor-info + # and the subscription is not missed. + # # [1] https://en.wikipedia.org/wiki/Time-of-check_to_time-of-use - """ - # Receive actors from channel. gcs_addr = self._dashboard_head.gcs_address - subscriber = GcsAioActorSubscriber(address=gcs_addr) - await subscriber.subscribe() + actor_channel_subscriber = GcsAioActorSubscriber(address=gcs_addr) + await actor_channel_subscriber.subscribe() # Get all actor info. while True: @@ -222,40 +222,7 @@ async def _update_actors(self): actor_consts.RETRY_GET_ALL_ACTOR_INFO_INTERVAL_SECONDS ) - state_keys = ( - "state", - "address", - "numRestarts", - "timestamp", - "pid", - "exitDetail", - "startTime", - "endTime", - "reprName", - ) - - def process_actor_data_from_pubsub(actor_id, actor_table_data): - actor_table_data = actor_table_data_to_dict(actor_table_data) - # If actor is not new registered but updated, we only update - # states related fields. - if actor_table_data["state"] != "DEPENDENCIES_UNREADY": - actors = DataSource.actors[actor_id] - for k in state_keys: - if k in actor_table_data: - actors[k] = actor_table_data[k] - actor_table_data = actors - actor_id = actor_table_data["actorId"] - node_id = actor_table_data["address"]["rayletId"] - if actor_table_data["state"] == "DEAD": - self.dead_actors_queue.append(actor_id) - # Update actors. - DataSource.actors[actor_id] = actor_table_data - # Update node actors (only when node_id is not Nil). - if node_id != actor_consts.NIL_NODE_ID: - node_actors = DataSource.node_actors.get(node_id, {}) - node_actors[actor_id] = actor_table_data - DataSource.node_actors[node_id] = node_actors - + # Pull incremental updates from the GCS channel while True: try: updated_actor_table_entries = await self._poll_updated_actor_table_data( diff --git a/python/ray/dashboard/modules/node/node_head.py b/python/ray/dashboard/modules/node/node_head.py index 15cea516325e2..e146d9409cb6a 100644 --- a/python/ray/dashboard/modules/node/node_head.py +++ b/python/ray/dashboard/modules/node/node_head.py @@ -5,8 +5,9 @@ import os import time from collections import deque +from concurrent.futures import ThreadPoolExecutor from itertools import chain -from typing import AsyncGenerator, Dict, List, Tuple +from typing import AsyncGenerator, Dict, Iterable, List, Optional import aiohttp.web import grpc @@ -17,8 +18,13 @@ import ray.dashboard.utils as dashboard_utils from ray import NodeID from ray._private import ray_constants +from ray._private.collections_utils import split from ray._private.gcs_pubsub import GcsAioNodeInfoSubscriber -from ray._private.ray_constants import DEBUG_AUTOSCALING_ERROR, DEBUG_AUTOSCALING_STATUS +from ray._private.ray_constants import ( + DEBUG_AUTOSCALING_ERROR, + DEBUG_AUTOSCALING_STATUS, + env_integer, +) from ray._private.utils import get_or_create_event_loop from ray.autoscaler._private.util import ( LoadMetricsSummary, @@ -44,24 +50,20 @@ routes = dashboard_optional_utils.DashboardHeadRouteTable +# NOTE: Executor in this head is intentionally constrained to just 1 thread by +# default to limit its concurrency, therefore reducing potential for +# GIL contention +RAY_DASHBOARD_NODE_HEAD_TPE_MAX_WORKERS = env_integer( + "RAY_DASHBOARD_NODE_HEAD_TPE_MAX_WORKERS", 1 +) + + def _gcs_node_info_to_dict(message: gcs_pb2.GcsNodeInfo) -> dict: return dashboard_utils.message_to_dict( message, {"nodeId"}, always_print_fields_with_no_presence=True ) -def _map_batch_node_info_to_dict( - messages: Dict[NodeID, gcs_pb2.GcsNodeInfo] -) -> List[dict]: - return [_gcs_node_info_to_dict(message) for message in messages.values()] - - -def _list_gcs_node_info_to_dict( - messages: List[Tuple[bytes, gcs_pb2.GcsNodeInfo]] -) -> List[dict]: - return [_gcs_node_info_to_dict(node_info) for _, node_info in messages] - - def node_stats_to_dict(message): decode_keys = { "actorId", @@ -211,27 +213,38 @@ async def _subscribe_for_node_updates(self) -> AsyncGenerator[dict, None]: # it happens after the subscription. That is, an update between # get-all-node-info and the subscription is not missed. # [1] https://en.wikipedia.org/wiki/Time-of-check_to_time-of-use - all_node_info = await self.get_all_node_info(timeout=None) + all_node_info = await self._get_all_node_info_client(timeout=None) + + def _convert_to_dict(messages: Iterable[gcs_pb2.GcsNodeInfo]) -> List[dict]: + return [_gcs_node_info_to_dict(m) for m in messages] - all_node_dicts = await get_or_create_event_loop().run_in_executor( - self._dashboard_head._thread_pool_executor, - _map_batch_node_info_to_dict, - all_node_info, + all_node_infos = await get_or_create_event_loop().run_in_executor( + self._executor, + _convert_to_dict, + all_node_info.values(), ) - for node in all_node_dicts: + + for node in all_node_infos: yield node while True: try: - published = await subscriber.poll( + node_id_updated_info_tuples = await subscriber.poll( batch_size=node_consts.RAY_DASHBOARD_NODE_SUBSCRIBER_POLL_SIZE ) - updated_dicts = await get_or_create_event_loop().run_in_executor( - self._dashboard_head._thread_pool_executor, - _list_gcs_node_info_to_dict, - published, + + if node_id_updated_info_tuples: + _, updated_infos_proto = zip(*node_id_updated_info_tuples) + else: + updated_infos_proto = [] + + updated_infos = await get_or_create_event_loop().run_in_executor( + self._executor, + _convert_to_dict, + updated_infos_proto, ) - for node in updated_dicts: + + for node in updated_infos: yield node except Exception: logger.exception("Failed handling updated nodes.") diff --git a/python/ray/dashboard/modules/node/tests/test_node.py b/python/ray/dashboard/modules/node/tests/test_node.py index edc703910242e..cb71105046e3d 100644 --- a/python/ray/dashboard/modules/node/tests/test_node.py +++ b/python/ray/dashboard/modules/node/tests/test_node.py @@ -17,6 +17,7 @@ wait_until_server_available, ) from ray.cluster_utils import Cluster +from ray.dashboard.consts import RAY_DASHBOARD_STATS_UPDATING_INTERVAL from ray.dashboard.tests.conftest import * # noqa logger = logging.getLogger(__name__)