[Dashboard] Optimizing performance of Ray Dashboard (ray-project#47617)
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: ujjawal-khare <ujjawal.khare@dream11.com>
alexeykudinkin authored and ujjawal-khare committed Oct 15, 2024
1 parent 9e28fb7 commit 0426ee4
Showing 4 changed files with 49 additions and 67 deletions.
1 change: 1 addition & 0 deletions python/ray/dashboard/datacenter.py
@@ -122,6 +122,7 @@ def _extract_workers_for_node(cls, node_physical_stats, node_stats):
         for worker in node_physical_stats.get("workers", []):
             worker = dict(worker)
             pid = worker["pid"]
+
             core_worker_stats = pid_to_worker_stats.get(pid)
             # Empty list means core worker stats is not available.
             worker["coreWorkerStats"] = [core_worker_stats] if core_worker_stats else []
49 changes: 8 additions & 41 deletions python/ray/dashboard/modules/actor/actor_head.py
@@ -181,16 +181,16 @@ async def _update_actors(self):
         Processes actor info. First gets all actors from GCS, then subscribes to
         actor updates. For each actor update, updates DataSource.node_actors and
         DataSource.actors.
+        """
 
-        To prevent Time-of-check to time-of-use issue [1], the get-all-actor-info
-        happens after the subscription. That is, an update between get-all-actor-info
-        and the subscription is not missed.
+        # To prevent Time-of-check to time-of-use issue [1], the get-all-actor-info
+        # happens after the subscription. That is, an update between get-all-actor-info
+        # and the subscription is not missed.
+        #
+        # [1] https://en.wikipedia.org/wiki/Time-of-check_to_time-of-use
-        """
         # Receive actors from channel.
         gcs_addr = self._dashboard_head.gcs_address
-        subscriber = GcsAioActorSubscriber(address=gcs_addr)
-        await subscriber.subscribe()
+        actor_channel_subscriber = GcsAioActorSubscriber(address=gcs_addr)
+        await actor_channel_subscriber.subscribe()
 
         # Get all actor info.
         while True:
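
The comment block in the hunk above captures the ordering invariant this commit relies on: subscribe first, then fetch the full snapshot, so an update landing between the two is still delivered through the subscription. A minimal sketch of that subscribe-before-snapshot pattern, using a hypothetical `client` with `subscribe`/`get_all`/`poll` methods (not the actual GCS client API):

```python
from typing import AsyncGenerator, Dict


async def stream_entries(client) -> AsyncGenerator[dict, None]:
    # Subscribe BEFORE snapshotting: an update that races with get_all()
    # is still delivered through the subscription, so it cannot be lost
    # (this is the time-of-check/time-of-use gap the comment refers to).
    await client.subscribe()

    snapshot: Dict[str, dict] = await client.get_all()
    for entry in snapshot.values():
        yield entry

    # From here on, only incremental updates arrive.
    while True:
        for entry in await client.poll():
            yield entry
```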
@@ -222,40 +222,7 @@ async def _update_actors(self):
                     actor_consts.RETRY_GET_ALL_ACTOR_INFO_INTERVAL_SECONDS
                 )
 
-        state_keys = (
-            "state",
-            "address",
-            "numRestarts",
-            "timestamp",
-            "pid",
-            "exitDetail",
-            "startTime",
-            "endTime",
-            "reprName",
-        )
-
-        def process_actor_data_from_pubsub(actor_id, actor_table_data):
-            actor_table_data = actor_table_data_to_dict(actor_table_data)
-            # If actor is not new registered but updated, we only update
-            # states related fields.
-            if actor_table_data["state"] != "DEPENDENCIES_UNREADY":
-                actors = DataSource.actors[actor_id]
-                for k in state_keys:
-                    if k in actor_table_data:
-                        actors[k] = actor_table_data[k]
-                actor_table_data = actors
-            actor_id = actor_table_data["actorId"]
-            node_id = actor_table_data["address"]["rayletId"]
-            if actor_table_data["state"] == "DEAD":
-                self.dead_actors_queue.append(actor_id)
-            # Update actors.
-            DataSource.actors[actor_id] = actor_table_data
-            # Update node actors (only when node_id is not Nil).
-            if node_id != actor_consts.NIL_NODE_ID:
-                node_actors = DataSource.node_actors.get(node_id, {})
-                node_actors[actor_id] = actor_table_data
-                DataSource.node_actors[node_id] = node_actors
-
+        # Pull incremental updates from the GCS channel
         while True:
             try:
                 updated_actor_table_entries = await self._poll_updated_actor_table_data(
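
The deleted `process_actor_data_from_pubsub` handled one pubsub message at a time; the loop that survives the hunk drains the channel in batches through `_poll_updated_actor_table_data`. A rough sketch of that batched-consumption shape, assuming a hypothetical subscriber whose `poll(batch_size=...)` returns a list of `(key, update)` pairs:

```python
import logging

logger = logging.getLogger(__name__)


async def consume_updates(subscriber, apply_update, batch_size: int = 200):
    # Drain the channel in batches instead of one callback per message,
    # amortizing per-message overhead when updates arrive in bursts.
    while True:
        try:
            entries = await subscriber.poll(batch_size=batch_size)
            for key, update in entries:
                apply_update(key, update)
        except Exception:
            # Keep the consumer alive; one bad batch must not kill the loop.
            logger.exception("Failed handling updated entries.")
```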
65 changes: 39 additions & 26 deletions python/ray/dashboard/modules/node/node_head.py
@@ -5,8 +5,9 @@
 import os
 import time
 from collections import deque
+from concurrent.futures import ThreadPoolExecutor
 from itertools import chain
-from typing import AsyncGenerator, Dict, List, Tuple
+from typing import AsyncGenerator, Dict, Iterable, List, Optional
 
 import aiohttp.web
 import grpc
@@ -17,8 +18,13 @@
 import ray.dashboard.utils as dashboard_utils
 from ray import NodeID
 from ray._private import ray_constants
+from ray._private.collections_utils import split
 from ray._private.gcs_pubsub import GcsAioNodeInfoSubscriber
-from ray._private.ray_constants import DEBUG_AUTOSCALING_ERROR, DEBUG_AUTOSCALING_STATUS
+from ray._private.ray_constants import (
+    DEBUG_AUTOSCALING_ERROR,
+    DEBUG_AUTOSCALING_STATUS,
+    env_integer,
+)
 from ray._private.utils import get_or_create_event_loop
 from ray.autoscaler._private.util import (
     LoadMetricsSummary,
@@ -44,24 +50,20 @@
 routes = dashboard_optional_utils.DashboardHeadRouteTable
 
 
+# NOTE: Executor in this head is intentionally constrained to just 1 thread by
+#       default to limit its concurrency, therefore reducing potential for
+#       GIL contention
+RAY_DASHBOARD_NODE_HEAD_TPE_MAX_WORKERS = env_integer(
+    "RAY_DASHBOARD_NODE_HEAD_TPE_MAX_WORKERS", 1
+)
+
+
 def _gcs_node_info_to_dict(message: gcs_pb2.GcsNodeInfo) -> dict:
     return dashboard_utils.message_to_dict(
         message, {"nodeId"}, always_print_fields_with_no_presence=True
     )
 
 
-def _map_batch_node_info_to_dict(
-    messages: Dict[NodeID, gcs_pb2.GcsNodeInfo]
-) -> List[dict]:
-    return [_gcs_node_info_to_dict(message) for message in messages.values()]
-
-
-def _list_gcs_node_info_to_dict(
-    messages: List[Tuple[bytes, gcs_pb2.GcsNodeInfo]]
-) -> List[dict]:
-    return [_gcs_node_info_to_dict(node_info) for _, node_info in messages]
-
-
 def node_stats_to_dict(message):
     decode_keys = {
         "actorId",
@@ -211,27 +213,38 @@ async def _subscribe_for_node_updates(self) -> AsyncGenerator[dict, None]:
         # it happens after the subscription. That is, an update between
         # get-all-node-info and the subscription is not missed.
         # [1] https://en.wikipedia.org/wiki/Time-of-check_to_time-of-use
-        all_node_info = await self.get_all_node_info(timeout=None)
+        all_node_info = await self._get_all_node_info_client(timeout=None)
+
+        def _convert_to_dict(messages: Iterable[gcs_pb2.GcsNodeInfo]) -> List[dict]:
+            return [_gcs_node_info_to_dict(m) for m in messages]
 
-        all_node_dicts = await get_or_create_event_loop().run_in_executor(
-            self._dashboard_head._thread_pool_executor,
-            _map_batch_node_info_to_dict,
-            all_node_info,
+        all_node_infos = await get_or_create_event_loop().run_in_executor(
+            self._executor,
+            _convert_to_dict,
+            all_node_info.values(),
         )
-        for node in all_node_dicts:
+
+        for node in all_node_infos:
             yield node
 
         while True:
             try:
-                published = await subscriber.poll(
+                node_id_updated_info_tuples = await subscriber.poll(
                     batch_size=node_consts.RAY_DASHBOARD_NODE_SUBSCRIBER_POLL_SIZE
                 )
-                updated_dicts = await get_or_create_event_loop().run_in_executor(
-                    self._dashboard_head._thread_pool_executor,
-                    _list_gcs_node_info_to_dict,
-                    published,
+
+                if node_id_updated_info_tuples:
+                    _, updated_infos_proto = zip(*node_id_updated_info_tuples)
+                else:
+                    updated_infos_proto = []
+
+                updated_infos = await get_or_create_event_loop().run_in_executor(
+                    self._executor,
+                    _convert_to_dict,
+                    updated_infos_proto,
                 )
-                for node in updated_dicts:
+
+                for node in updated_infos:
                     yield node
             except Exception:
                 logger.exception("Failed handling updated nodes.")
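
One detail in the loop above is easy to miss: `zip(*pairs)` produces no columns at all for an empty list, so unpacking `ids, infos = zip(*pairs)` would raise `ValueError` on an empty poll; hence the explicit empty-batch branch before the executor hand-off. A standalone illustration of that guard:

```python
from typing import List, Tuple


def split_ids_and_infos(pairs: List[Tuple[bytes, object]]):
    # zip(*[]) yields zero columns, so tuple-unpacking the result of
    # zip(*pairs) raises ValueError when the batch is empty; guard it.
    if pairs:
        ids, infos = zip(*pairs)
    else:
        ids, infos = (), ()
    return list(ids), list(infos)


assert split_ids_and_infos([]) == ([], [])
assert split_ids_and_infos([(b"a", 1), (b"b", 2)]) == ([b"a", b"b"], [1, 2])
```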
1 change: 1 addition & 0 deletions python/ray/dashboard/modules/node/tests/test_node.py
@@ -17,6 +17,7 @@
     wait_until_server_available,
 )
 from ray.cluster_utils import Cluster
+from ray.dashboard.consts import RAY_DASHBOARD_STATS_UPDATING_INTERVAL
 from ray.dashboard.tests.conftest import *  # noqa
 
 logger = logging.getLogger(__name__)
