Merged

Commits (27), changes from all commits:
eb8924e - [core] add get pid rpc to node manager (tianyi-ge, Sep 29, 2025)
d28c905 - [core] add get pid rpc to node manager (tianyi-ge, Sep 29, 2025)
5927027 - [core] add get pid rpc to node manager (tianyi-ge, Sep 29, 2025)
f76f633 - [core] add cython wrapper for raylet client (tianyi-ge, Sep 30, 2025)
c3dca66 - [core] add cython wrapper for raylet client (tianyi-ge, Oct 5, 2025)
1ea7081 - Merge branch 'master' of github.com:ray-project/ray into raylet-grpc (tianyi-ge, Oct 5, 2025)
af61506 - [core] fix cursor comments (tianyi-ge, Oct 5, 2025)
204caef - [core] reuse singleton io context helper class to start a threaded ra… (tianyi-ge, Oct 8, 2025)
2b32615 - [core] fix reporter agent unittest (tianyi-ge, Oct 9, 2025)
c8f0fa4 - [core] add reporter agent unittest (tianyi-ge, Oct 9, 2025)
ac20283 - [core] connect to raylet inside client constructor (tianyi-ge, Oct 10, 2025)
b4e9a1d - [core] filter out system drivers for reporter (tianyi-ge, Oct 11, 2025)
53e693c - [core] fix mock WorkerPool compilation (tianyi-ge, Oct 11, 2025)
62b48cf - [core] fix mock WorkerPool compilation (tianyi-ge, Oct 12, 2025)
be70b12 - [core] nit updates (tianyi-ge, Oct 14, 2025)
f09822a - [core] replace inheritance ThreadedRayletClient with wrapper RayletCl… (tianyi-ge, Oct 15, 2025)
6a6b742 - [core] remove exception-catch-all (tianyi-ge, Oct 15, 2025)
02656c0 - [core] catch RuntimeError and log with exception (tianyi-ge, Oct 16, 2025)
3ed4295 - Merge branch 'master' of github.com:ray-project/ray into raylet-grpc (tianyi-ge, Oct 16, 2025)
3521a91 - [core] update config raylet_rpc_server_reconnect_timeout_max_s (tianyi-ge, Oct 16, 2025)
a93cb28 - Merge branch 'master' of github.com:ray-project/ray into raylet-grpc (tianyi-ge, Oct 16, 2025)
9940d37 - [core] change GetWorkerPIDs to async (tianyi-ge, Oct 17, 2025)
e3c79a2 - [core] update async logic (tianyi-ge, Oct 18, 2025)
5edc6f0 - [core] inject mocked raylet client and add e2e test (tianyi-ge, Oct 20, 2025)
e281cf4 - [core] prettify e2e reporter test (tianyi-ge, Oct 21, 2025)
3b97fb6 - [core] rename sync _compose_stats_payload to _run_in_executor (tianyi-ge, Oct 21, 2025)
a6e7342 - Merge branch 'master' of github.com:ray-project/ray into raylet-grpc (tianyi-ge, Oct 21, 2025)
1 change: 1 addition & 0 deletions BUILD.bazel
@@ -242,6 +242,7 @@ pyx_library(
"//src/ray/gcs_rpc_client:global_state_accessor_lib",
"//src/ray/protobuf:serialization_cc_proto",
"//src/ray/pubsub:python_gcs_subscriber",
"//src/ray/raylet_rpc_client:raylet_client_with_io_context_lib",
"//src/ray/thirdparty/setproctitle",
"//src/ray/util:memory",
"//src/ray/util:raii",
8 changes: 0 additions & 8 deletions python/ray/_private/ray_constants.py
@@ -459,14 +459,6 @@ def env_set_by_user(key):
# Default max_concurrency option in @ray.remote for threaded actors.
DEFAULT_MAX_CONCURRENCY_THREADED = 1

# Prefix for namespaces which are used internally by ray.
# Jobs within these namespaces should be hidden from users
# and should not be considered user activity.
# Please keep this in sync with the definition kRayInternalNamespacePrefix
# in /src/ray/gcs/gcs_server/gcs_job_manager.h.
RAY_INTERNAL_NAMESPACE_PREFIX = "_ray_internal_"
Contributor:
What is this refactoring for?

Contributor Author:
#57004 (comment)
@jjyao suggested exposing this C++ constant to Python, avoiding duplicate definitions and potential inconsistency.
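For reference, the pattern referred to here looks roughly like the sketch below. The constant name and header path come from the removed comment in ray_constants.py; the exact Cython declaration style is an assumption, not the actual Ray source.

# Sketch only: re-export the C++ constant through a .pxi so Python imports it
# from ray._raylet instead of re-defining the string.
cdef extern from "ray/gcs/gcs_server/gcs_job_manager.h":
    const char* kRayInternalNamespacePrefix  # assumed declaration style

RAY_INTERNAL_NAMESPACE_PREFIX = kRayInternalNamespacePrefix.decode("utf-8")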

RAY_INTERNAL_DASHBOARD_NAMESPACE = f"{RAY_INTERNAL_NAMESPACE_PREFIX}dashboard"

# Ray internal flags. These flags should not be set by users, and we strip them on job
# submission.
# This should be consistent with src/ray/common/ray_internal_flag_def.h
2 changes: 1 addition & 1 deletion python/ray/_private/worker.py
@@ -2625,7 +2625,7 @@ def connect(
# We also want to skip adding script directory when running from dashboard.
code_paths = []
if not interactive_mode and not (
namespace and namespace == ray_constants.RAY_INTERNAL_DASHBOARD_NAMESPACE
namespace and namespace == ray._raylet.RAY_INTERNAL_DASHBOARD_NAMESPACE
):
script_directory = os.path.dirname(os.path.realpath(sys.argv[0]))
# If driver's sys.path doesn't include the script directory
1 change: 1 addition & 0 deletions python/ray/_raylet.pyx
@@ -199,6 +199,7 @@ include "includes/libcoreworker.pxi"
include "includes/global_state_accessor.pxi"
include "includes/metric.pxi"
include "includes/setproctitle.pxi"
include "includes/raylet_client.pxi"
include "includes/gcs_subscriber.pxi"

import ray
8 changes: 3 additions & 5 deletions python/ray/dashboard/modules/job/common.py
@@ -14,7 +14,7 @@
get_export_event_logger,
)
from ray._private.runtime_env.packaging import parse_uri
from ray._raylet import GcsClient
from ray._raylet import RAY_INTERNAL_NAMESPACE_PREFIX, GcsClient
from ray.core.generated.export_event_pb2 import ExportEvent
from ray.core.generated.export_submission_job_event_pb2 import (
ExportSubmissionJobEventData,
@@ -25,9 +25,7 @@
# they're exposed in the snapshot API.
JOB_ID_METADATA_KEY = "job_submission_id"
JOB_NAME_METADATA_KEY = "job_name"
JOB_ACTOR_NAME_TEMPLATE = (
f"{ray_constants.RAY_INTERNAL_NAMESPACE_PREFIX}job_actor_" + "{job_id}"
)
JOB_ACTOR_NAME_TEMPLATE = f"{RAY_INTERNAL_NAMESPACE_PREFIX}job_actor_" + "{job_id}"
# In order to get information about SupervisorActors launched by different jobs,
# they must be set to the same namespace.
SUPERVISOR_ACTOR_RAY_NAMESPACE = "SUPERVISOR_ACTOR_RAY_NAMESPACE"
@@ -227,7 +225,7 @@ class JobInfoStorageClient:

# Please keep this format in sync with JobDataKey()
# in src/ray/gcs/gcs_server/gcs_job_manager.h.
JOB_DATA_KEY_PREFIX = f"{ray_constants.RAY_INTERNAL_NAMESPACE_PREFIX}job_info_"
JOB_DATA_KEY_PREFIX = f"{RAY_INTERNAL_NAMESPACE_PREFIX}job_info_"
JOB_DATA_KEY = f"{JOB_DATA_KEY_PREFIX}{{job_id}}"

def __init__(
4 changes: 2 additions & 2 deletions python/ray/dashboard/modules/job/utils.py
@@ -8,7 +8,7 @@
from typing import Any, AsyncIterator, Dict, List, Optional, Tuple, Union

from ray._private import ray_constants
from ray._raylet import GcsClient
from ray._raylet import RAY_INTERNAL_NAMESPACE_PREFIX, GcsClient
from ray.dashboard.modules.job.common import (
JOB_ID_METADATA_KEY,
JobInfoStorageClient,
@@ -178,7 +178,7 @@ async def get_driver_jobs(
submission_job_drivers = {}
for job_table_entry in sorted_job_infos:
if job_table_entry.config.ray_namespace.startswith(
ray_constants.RAY_INTERNAL_NAMESPACE_PREFIX
RAY_INTERNAL_NAMESPACE_PREFIX
):
# Skip jobs in any _ray_internal_ namespace
continue
34 changes: 34 additions & 0 deletions python/ray/dashboard/modules/node/tests/test_node.py
@@ -283,5 +283,39 @@ def _check_workers():
wait_for_condition(_check_workers, timeout=10)


def test_worker_pids_reported(enable_test_module, ray_start_with_dashboard):
assert wait_until_server_available(ray_start_with_dashboard["webui_url"]) is True
webui_url = ray_start_with_dashboard["webui_url"]
webui_url = format_web_url(webui_url)
node_id = ray_start_with_dashboard["node_id"]

@ray.remote(runtime_env={"uv": {"packages": ["requests==2.3.0"]}})
class UvActor:
def get_pid(self):
return os.getpid()

uv_actor = UvActor.remote()
uv_actor_pid = ray.get(uv_actor.get_pid.remote())
driver_pid = os.getpid()

def _check_worker_pids():
try:
response = requests.get(webui_url + f"/nodes/{node_id}")
response.raise_for_status()
dump_info = response.json()
assert dump_info["result"] is True
detail = dump_info["data"]["detail"]
pids = [worker["pid"] for worker in detail["workers"]]
assert len(pids) >= 2 # might include idle worker
assert uv_actor_pid in pids
assert driver_pid in pids
return True
except Exception as ex:
logger.info(ex)
return False

wait_for_condition(_check_worker_pids, timeout=20)


if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))
78 changes: 48 additions & 30 deletions python/ray/dashboard/modules/reporter/reporter_agent.py
@@ -41,7 +41,7 @@
OpenTelemetryMetricRecorder,
)
from ray._private.utils import get_system_memory
from ray._raylet import GCS_PID_KEY, WorkerID
from ray._raylet import GCS_PID_KEY, RayletClient, WorkerID
from ray.core.generated import reporter_pb2, reporter_pb2_grpc
from ray.dashboard import k8s_utils
from ray.dashboard.consts import (
@@ -66,6 +66,10 @@
from ray.dashboard.modules.reporter.reporter_models import (
StatsPayload,
)
from ray.exceptions import (
GetTimeoutError,
RpcError,
)

import psutil

@@ -395,9 +399,10 @@ class ReporterAgent(

Attributes:
dashboard_agent: The DashboardAgent object contains global config
raylet_client: The RayletClient object to access raylet server
"""

def __init__(self, dashboard_agent):
def __init__(self, dashboard_agent, raylet_client=None):
Contributor:
Is raylet_client=None only used inside test cases?

Contributor Author:
Yes.
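A minimal, self-contained sketch of that test-only injection (FakeRayletClient and AgentUnderTest are illustrative stand-ins; only the raylet_client argument and async_get_worker_pids come from this diff):

import asyncio

class FakeRayletClient:
    # Test double standing in for the gRPC-backed RayletClient.
    async def async_get_worker_pids(self):
        return [1234, 5678]

class AgentUnderTest:
    # Simplified stand-in for ReporterAgent: the client is injected.
    def __init__(self, raylet_client):
        self._raylet_client = raylet_client

    async def _async_get_worker_pids_from_raylet(self):
        return await self._raylet_client.async_get_worker_pids()

agent = AgentUnderTest(raylet_client=FakeRayletClient())
assert asyncio.run(agent._async_get_worker_pids_from_raylet()) == [1234, 5678]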

"""Initialize the reporter object."""
super().__init__(dashboard_agent)

@@ -486,6 +491,13 @@ def __init__(self, dashboard_agent):
# Create GPU metric provider instance
self._gpu_metric_provider = GpuMetricProvider()

if raylet_client:
self._raylet_client = raylet_client
else:
self._raylet_client = RayletClient(
ip_address=self._ip, port=self._dashboard_agent.node_manager_port
)

async def GetTraceback(self, request, context):
pid = request.pid
native = request.native
@@ -888,6 +900,14 @@ def _get_disk_io_stats():
stats.write_count,
)

async def _async_get_worker_pids_from_raylet(self) -> List[int]:
try:
# Get worker pids from raylet via gRPC.
return await self._raylet_client.async_get_worker_pids()
except (GetTimeoutError, RpcError):
logger.exception("Failed to get worker pids from raylet")
return []

def _get_agent_proc(self) -> psutil.Process:
# Agent is the current process.
# This method is not necessary, but we have it for mock testing.
@@ -896,27 +916,23 @@ def _get_agent_proc(self) -> psutil.Process:
def _generate_worker_key(self, proc: psutil.Process) -> Tuple[int, float]:
return (proc.pid, proc.create_time())

def _get_worker_processes(self):
raylet_proc = self._get_raylet_proc()
if raylet_proc is None:
async def _async_get_worker_processes(self):
pids = await self._async_get_worker_pids_from_raylet()
logger.debug(f"Worker PIDs from raylet: {pids}")
Comment:
Bug: Asyncio Loop Conflict in Worker Process Retrieval
The _get_worker_processes method uses asyncio.run() to execute _get_worker_pids_from_raylet(). Since the ReporterAgent runs within an existing asyncio event loop, calling asyncio.run() from it raises a RuntimeError and crashes the application.
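A self-contained illustration of that failure mode (not Ray code): asyncio.run() refuses to start a second loop on a thread that already has one running, while awaiting the coroutine directly, as the current diff does, works fine.

import asyncio

async def get_worker_pids():
    return [1234]

async def caller():
    # asyncio.run(get_worker_pids())  # would raise:
    # RuntimeError: asyncio.run() cannot be called from a running event loop
    return await get_worker_pids()  # awaiting directly is the safe pattern

print(asyncio.run(caller()))  # [1234]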

if not pids:
return []
else:
workers = {}
if sys.platform == "win32":
# windows, get the child process not the runner
for child in raylet_proc.children():
if child.children():
child = child.children()[0]
workers[self._generate_worker_key(child)] = child
else:
workers = {
self._generate_worker_key(proc): proc
for proc in raylet_proc.children()
}
return workers

def _get_workers(self, gpus: Optional[List[GpuUtilizationInfo]] = None):
workers = self._get_worker_processes()
workers = {}
for pid in pids:
try:
proc = psutil.Process(pid)
workers[self._generate_worker_key(proc)] = proc
except (psutil.NoSuchProcess, psutil.AccessDenied):
Contributor:
logger.error("...")
logger.error(f"Failed to access worker process {pid}")
continue
return workers

async def _async_get_workers(self, gpus: Optional[List[GpuUtilizationInfo]] = None):
workers = await self._async_get_worker_processes()
if not workers:
return []
else:
@@ -936,9 +952,6 @@ def _get_workers(self, gpus: Optional[List[GpuUtilizationInfo]] = None):
for k in keys_to_pop:
self._workers.pop(k)

# Remove the current process (reporter agent), which is also a child of
# the Raylet.
self._workers.pop(self._generate_worker_key(self._get_agent_proc()))
# Build process ID -> GPU info mapping for faster lookups
gpu_pid_mapping = defaultdict(list)
if gpus is not None:
@@ -1058,7 +1071,7 @@ def _get_shm_usage(self):
return None
return mem.shared

def _collect_stats(self):
async def _async_collect_stats(self):
now = dashboard_utils.to_posix_time(datetime.datetime.utcnow())
network_stats = self._get_network_stats()
self._network_stats_hist.append((now, network_stats))
@@ -1079,7 +1092,7 @@ def _collect_stats(self):
"mem": self._get_mem_usage(),
# Unit is in bytes. None if
"shm": self._get_shm_usage(),
"workers": self._get_workers(gpus),
"workers": await self._async_get_workers(gpus),
"raylet": raylet,
"agent": self._get_agent(),
"bootTime": self._get_boot_time(),
@@ -1726,7 +1739,7 @@ async def _run_loop(self):
# executor (TPE) to avoid blocking the Agent's event-loop
json_payload = await loop.run_in_executor(
self._executor,
self._compose_stats_payload,
self._run_in_executor,
autoscaler_status_json_bytes,
)

@@ -1739,10 +1752,15 @@ async def _run_loop(self):

await asyncio.sleep(reporter_consts.REPORTER_UPDATE_INTERVAL_MS / 1000)

def _compose_stats_payload(
def _run_in_executor(self, cluster_autoscaling_stats_json: Optional[bytes]) -> str:
return asyncio.run(
self._async_compose_stats_payload(cluster_autoscaling_stats_json)
)
Comment:
Bug: Async Loop Conflict in Reporting Method
The _compose_stats_payload method calls asyncio.run() from within the async reporting loop. This raises a RuntimeError because asyncio.run() cannot be invoked when an event loop is already running.

Comment:
Bug: Async Method Causes Event Loop Conflicts
The _compose_stats_payload method uses asyncio.run() to execute an async function. This causes a RuntimeError if called from an existing event loop, which is likely for the ReporterAgent within the dashboard. This can lead to application crashes or failures in stats collection.
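For reference, a self-contained sketch of the dispatch pattern in _run_loop above (method bodies and arguments are simplified placeholders): asyncio.run() only fails on a thread that already has a running loop; the sync wrapper handed to loop.run_in_executor executes on a ThreadPoolExecutor worker thread, which has no loop, so starting a fresh one there is allowed.

import asyncio
from concurrent.futures import ThreadPoolExecutor

async def _async_compose_stats_payload() -> str:
    # Placeholder for the real async stats collection.
    return "payload"

def _run_in_executor() -> str:
    # Runs on an executor worker thread: no event loop is running here,
    # so asyncio.run() may start one for the async collection code.
    return asyncio.run(_async_compose_stats_payload())

async def _run_loop() -> None:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=1) as executor:
        json_payload = await loop.run_in_executor(executor, _run_in_executor)
    print(json_payload)

asyncio.run(_run_loop())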

async def _async_compose_stats_payload(
self, cluster_autoscaling_stats_json: Optional[bytes]
) -> str:
stats = self._collect_stats()
stats = await self._async_collect_stats()

# Report stats only when metrics collection is enabled.
if not self._metrics_collection_disabled: