
Commit 0582425

tianyi-ge authored and elliot-barn committed
[core] allow reporter agent to get pid via rpc to raylet (#57004)
Problem:
1. Currently, the reporter agent is spawned by the raylet process, and it assumes that all core workers are direct children of the raylet. That is no longer the case with new features (uv, image_url), so the reporter agent needs another way to find all core workers. See https://github.com/ray-project/ray/blob/10eacfd6ddf3b84827d016e37294bc5f2577ad3f/python/ray/dashboard/modules/reporter/reporter_agent.py#L911
2. The driver is not spawned by the raylet, so it is never monitored.

Implementation:
1. Add a gRPC endpoint in the raylet process (node manager) and allow the reporter agent to connect to it.
2. The reporter agent fetches the worker list, including the driver, from the gRPC reply. It creates a raylet client with a dedicated thread.

Closes #56739

Signed-off-by: tianyi-ge <tianyig@outlook.com>
Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
1 parent 13e0416 · commit 0582425

30 files changed: +437 -79 lines
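As a rough illustration of the agent-side flow described in the commit message, the sketch below shows a reporter-agent-style loop that asks the raylet for every worker PID (driver included) and resolves each PID to a psutil process keyed by (pid, create_time). The `async_get_worker_pids` name mirrors the diff further down; `FakeRayletClient` and the rest of the scaffolding are illustrative stand-ins, not Ray's real API surface.

```python
# Hypothetical sketch of the agent-side flow introduced by this PR.
# `async_get_worker_pids` mirrors the method name in the diff below;
# `FakeRayletClient` is a stand-in for ray._raylet.RayletClient.
import asyncio
from typing import Dict, List, Tuple

import psutil


class FakeRayletClient:
    """Stand-in for the raylet client; the real one issues a gRPC call."""

    async def async_get_worker_pids(self) -> List[int]:
        # The real reply from the raylet (node manager) lists every worker
        # PID, including the driver. Here we just grab a few local PIDs.
        return [p.pid for p in psutil.process_iter()][:3]


async def get_worker_processes(
    client: FakeRayletClient,
) -> Dict[Tuple[int, float], psutil.Process]:
    pids = await client.async_get_worker_pids()
    workers: Dict[Tuple[int, float], psutil.Process] = {}
    for pid in pids:
        try:
            proc = psutil.Process(pid)
            # Key on (pid, create_time) so a recycled PID is not mistaken
            # for the old worker process.
            workers[(proc.pid, proc.create_time())] = proc
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
    return workers


if __name__ == "__main__":
    print(asyncio.run(get_worker_processes(FakeRayletClient())))
```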

BUILD.bazel

Lines changed: 1 addition & 0 deletions
```diff
@@ -242,6 +242,7 @@ pyx_library(
         "//src/ray/gcs_rpc_client:global_state_accessor_lib",
         "//src/ray/protobuf:serialization_cc_proto",
         "//src/ray/pubsub:python_gcs_subscriber",
+        "//src/ray/raylet_rpc_client:raylet_client_with_io_context_lib",
         "//src/ray/thirdparty/setproctitle",
         "//src/ray/util:memory",
         "//src/ray/util:raii",
```

python/ray/_private/ray_constants.py

Lines changed: 0 additions & 8 deletions
```diff
@@ -459,14 +459,6 @@ def env_set_by_user(key):
 # Default max_concurrency option in @ray.remote for threaded actors.
 DEFAULT_MAX_CONCURRENCY_THREADED = 1
 
-# Prefix for namespaces which are used internally by ray.
-# Jobs within these namespaces should be hidden from users
-# and should not be considered user activity.
-# Please keep this in sync with the definition kRayInternalNamespacePrefix
-# in /src/ray/gcs/gcs_server/gcs_job_manager.h.
-RAY_INTERNAL_NAMESPACE_PREFIX = "_ray_internal_"
-RAY_INTERNAL_DASHBOARD_NAMESPACE = f"{RAY_INTERNAL_NAMESPACE_PREFIX}dashboard"
-
 # Ray internal flags. These flags should not be set by users, and we strip them on job
 # submission.
 # This should be consistent with src/ray/common/ray_internal_flag_def.h
```

python/ray/_private/worker.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -2625,7 +2625,7 @@ def connect(
     # We also want to skip adding script directory when running from dashboard.
     code_paths = []
     if not interactive_mode and not (
-        namespace and namespace == ray_constants.RAY_INTERNAL_DASHBOARD_NAMESPACE
+        namespace and namespace == ray._raylet.RAY_INTERNAL_DASHBOARD_NAMESPACE
     ):
         script_directory = os.path.dirname(os.path.realpath(sys.argv[0]))
         # If driver's sys.path doesn't include the script directory
```

python/ray/_raylet.pyx

Lines changed: 1 addition & 0 deletions
```diff
@@ -199,6 +199,7 @@ include "includes/libcoreworker.pxi"
 include "includes/global_state_accessor.pxi"
 include "includes/metric.pxi"
 include "includes/setproctitle.pxi"
+include "includes/raylet_client.pxi"
 include "includes/gcs_subscriber.pxi"
 
 import ray
```

python/ray/dashboard/modules/job/common.py

Lines changed: 3 additions & 5 deletions
```diff
@@ -14,7 +14,7 @@
     get_export_event_logger,
 )
 from ray._private.runtime_env.packaging import parse_uri
-from ray._raylet import GcsClient
+from ray._raylet import RAY_INTERNAL_NAMESPACE_PREFIX, GcsClient
 from ray.core.generated.export_event_pb2 import ExportEvent
 from ray.core.generated.export_submission_job_event_pb2 import (
     ExportSubmissionJobEventData,
@@ -25,9 +25,7 @@
 # they're exposed in the snapshot API.
 JOB_ID_METADATA_KEY = "job_submission_id"
 JOB_NAME_METADATA_KEY = "job_name"
-JOB_ACTOR_NAME_TEMPLATE = (
-    f"{ray_constants.RAY_INTERNAL_NAMESPACE_PREFIX}job_actor_" + "{job_id}"
-)
+JOB_ACTOR_NAME_TEMPLATE = f"{RAY_INTERNAL_NAMESPACE_PREFIX}job_actor_" + "{job_id}"
 # In order to get information about SupervisorActors launched by different jobs,
 # they must be set to the same namespace.
 SUPERVISOR_ACTOR_RAY_NAMESPACE = "SUPERVISOR_ACTOR_RAY_NAMESPACE"
@@ -227,7 +225,7 @@ class JobInfoStorageClient:
 
     # Please keep this format in sync with JobDataKey()
     # in src/ray/gcs/gcs_server/gcs_job_manager.h.
-    JOB_DATA_KEY_PREFIX = f"{ray_constants.RAY_INTERNAL_NAMESPACE_PREFIX}job_info_"
+    JOB_DATA_KEY_PREFIX = f"{RAY_INTERNAL_NAMESPACE_PREFIX}job_info_"
     JOB_DATA_KEY = f"{JOB_DATA_KEY_PREFIX}{{job_id}}"
 
     def __init__(
```

python/ray/dashboard/modules/job/utils.py

Lines changed: 2 additions & 2 deletions
```diff
@@ -8,7 +8,7 @@
 from typing import Any, AsyncIterator, Dict, List, Optional, Tuple, Union
 
 from ray._private import ray_constants
-from ray._raylet import GcsClient
+from ray._raylet import RAY_INTERNAL_NAMESPACE_PREFIX, GcsClient
 from ray.dashboard.modules.job.common import (
     JOB_ID_METADATA_KEY,
     JobInfoStorageClient,
@@ -178,7 +178,7 @@ async def get_driver_jobs(
     submission_job_drivers = {}
     for job_table_entry in sorted_job_infos:
         if job_table_entry.config.ray_namespace.startswith(
-            ray_constants.RAY_INTERNAL_NAMESPACE_PREFIX
+            RAY_INTERNAL_NAMESPACE_PREFIX
         ):
             # Skip jobs in any _ray_internal_ namespace
             continue
```

python/ray/dashboard/modules/node/tests/test_node.py

Lines changed: 34 additions & 0 deletions
```diff
@@ -283,5 +283,39 @@ def _check_workers():
     wait_for_condition(_check_workers, timeout=10)
 
 
+def test_worker_pids_reported(enable_test_module, ray_start_with_dashboard):
+    assert wait_until_server_available(ray_start_with_dashboard["webui_url"]) is True
+    webui_url = ray_start_with_dashboard["webui_url"]
+    webui_url = format_web_url(webui_url)
+    node_id = ray_start_with_dashboard["node_id"]
+
+    @ray.remote(runtime_env={"uv": {"packages": ["requests==2.3.0"]}})
+    class UvActor:
+        def get_pid(self):
+            return os.getpid()
+
+    uv_actor = UvActor.remote()
+    uv_actor_pid = ray.get(uv_actor.get_pid.remote())
+    driver_pid = os.getpid()
+
+    def _check_worker_pids():
+        try:
+            response = requests.get(webui_url + f"/nodes/{node_id}")
+            response.raise_for_status()
+            dump_info = response.json()
+            assert dump_info["result"] is True
+            detail = dump_info["data"]["detail"]
+            pids = [worker["pid"] for worker in detail["workers"]]
+            assert len(pids) >= 2  # might include idle worker
+            assert uv_actor_pid in pids
+            assert driver_pid in pids
+            return True
+        except Exception as ex:
+            logger.info(ex)
+            return False
+
+    wait_for_condition(_check_worker_pids, timeout=20)
+
+
 if __name__ == "__main__":
     sys.exit(pytest.main(["-v", __file__]))
```

python/ray/dashboard/modules/reporter/reporter_agent.py

Lines changed: 48 additions & 30 deletions
```diff
@@ -41,7 +41,7 @@
     OpenTelemetryMetricRecorder,
 )
 from ray._private.utils import get_system_memory
-from ray._raylet import GCS_PID_KEY, WorkerID
+from ray._raylet import GCS_PID_KEY, RayletClient, WorkerID
 from ray.core.generated import reporter_pb2, reporter_pb2_grpc
 from ray.dashboard import k8s_utils
 from ray.dashboard.consts import (
@@ -66,6 +66,10 @@
 from ray.dashboard.modules.reporter.reporter_models import (
     StatsPayload,
 )
+from ray.exceptions import (
+    GetTimeoutError,
+    RpcError,
+)
 
 import psutil
 
@@ -395,9 +399,10 @@ class ReporterAgent(
 
     Attributes:
         dashboard_agent: The DashboardAgent object contains global config
+        raylet_client: The RayletClient object to access raylet server
     """
 
-    def __init__(self, dashboard_agent):
+    def __init__(self, dashboard_agent, raylet_client=None):
         """Initialize the reporter object."""
         super().__init__(dashboard_agent)
 
@@ -486,6 +491,13 @@ def __init__(self, dashboard_agent):
         # Create GPU metric provider instance
        self._gpu_metric_provider = GpuMetricProvider()
 
+        if raylet_client:
+            self._raylet_client = raylet_client
+        else:
+            self._raylet_client = RayletClient(
+                ip_address=self._ip, port=self._dashboard_agent.node_manager_port
+            )
+
     async def GetTraceback(self, request, context):
         pid = request.pid
         native = request.native
@@ -888,6 +900,14 @@ def _get_disk_io_stats():
             stats.write_count,
         )
 
+    async def _async_get_worker_pids_from_raylet(self) -> List[int]:
+        try:
+            # Get worker pids from raylet via gRPC.
+            return await self._raylet_client.async_get_worker_pids()
+        except (GetTimeoutError, RpcError):
+            logger.exception("Failed to get worker pids from raylet")
+            return []
+
     def _get_agent_proc(self) -> psutil.Process:
         # Agent is the current process.
         # This method is not necessary, but we have it for mock testing.
@@ -896,27 +916,23 @@ def _get_agent_proc(self) -> psutil.Process:
     def _generate_worker_key(self, proc: psutil.Process) -> Tuple[int, float]:
         return (proc.pid, proc.create_time())
 
-    def _get_worker_processes(self):
-        raylet_proc = self._get_raylet_proc()
-        if raylet_proc is None:
+    async def _async_get_worker_processes(self):
+        pids = await self._async_get_worker_pids_from_raylet()
+        logger.debug(f"Worker PIDs from raylet: {pids}")
+        if not pids:
             return []
-        else:
-            workers = {}
-            if sys.platform == "win32":
-                # windows, get the child process not the runner
-                for child in raylet_proc.children():
-                    if child.children():
-                        child = child.children()[0]
-                    workers[self._generate_worker_key(child)] = child
-            else:
-                workers = {
-                    self._generate_worker_key(proc): proc
-                    for proc in raylet_proc.children()
-                }
-            return workers
-
-    def _get_workers(self, gpus: Optional[List[GpuUtilizationInfo]] = None):
-        workers = self._get_worker_processes()
+        workers = {}
+        for pid in pids:
+            try:
+                proc = psutil.Process(pid)
+                workers[self._generate_worker_key(proc)] = proc
+            except (psutil.NoSuchProcess, psutil.AccessDenied):
+                logger.error(f"Failed to access worker process {pid}")
+                continue
+        return workers
+
+    async def _async_get_workers(self, gpus: Optional[List[GpuUtilizationInfo]] = None):
+        workers = await self._async_get_worker_processes()
         if not workers:
             return []
         else:
@@ -936,9 +952,6 @@ def _get_workers(self, gpus: Optional[List[GpuUtilizationInfo]] = None):
             for k in keys_to_pop:
                 self._workers.pop(k)
 
-            # Remove the current process (reporter agent), which is also a child of
-            # the Raylet.
-            self._workers.pop(self._generate_worker_key(self._get_agent_proc()))
             # Build process ID -> GPU info mapping for faster lookups
             gpu_pid_mapping = defaultdict(list)
             if gpus is not None:
@@ -1058,7 +1071,7 @@ def _get_shm_usage(self):
             return None
         return mem.shared
 
-    def _collect_stats(self):
+    async def _async_collect_stats(self):
         now = dashboard_utils.to_posix_time(datetime.datetime.utcnow())
         network_stats = self._get_network_stats()
         self._network_stats_hist.append((now, network_stats))
@@ -1079,7 +1092,7 @@ def _collect_stats(self):
             "mem": self._get_mem_usage(),
             # Unit is in bytes. None if
             "shm": self._get_shm_usage(),
-            "workers": self._get_workers(gpus),
+            "workers": await self._async_get_workers(gpus),
             "raylet": raylet,
             "agent": self._get_agent(),
             "bootTime": self._get_boot_time(),
@@ -1726,7 +1739,7 @@ async def _run_loop(self):
            # executor (TPE) to avoid blocking the Agent's event-loop
            json_payload = await loop.run_in_executor(
                self._executor,
-                self._compose_stats_payload,
+                self._run_in_executor,
                autoscaler_status_json_bytes,
            )
 
@@ -1739,10 +1752,15 @@ async def _run_loop(self):
 
            await asyncio.sleep(reporter_consts.REPORTER_UPDATE_INTERVAL_MS / 1000)
 
-    def _compose_stats_payload(
+    def _run_in_executor(self, cluster_autoscaling_stats_json: Optional[bytes]) -> str:
+        return asyncio.run(
+            self._async_compose_stats_payload(cluster_autoscaling_stats_json)
+        )
+
+    async def _async_compose_stats_payload(
         self, cluster_autoscaling_stats_json: Optional[bytes]
     ) -> str:
-        stats = self._collect_stats()
+        stats = await self._async_collect_stats()
 
         # Report stats only when metrics collection is enabled.
         if not self._metrics_collection_disabled:
```
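The last two hunks bridge the agent's thread-pool executor with the new async collection path: the synchronous wrapper handed to `run_in_executor` starts its own event loop via `asyncio.run()`, so the gRPC call to the raylet can be awaited without blocking the agent's main loop. A minimal standalone sketch of that pattern, with illustrative names rather than Ray's own:

```python
# Minimal sketch (not from the PR) of the executor/event-loop bridging pattern:
# a synchronous wrapper runs a coroutine on its own event loop inside a
# thread-pool executor, keeping the caller's event loop unblocked.
import asyncio
from concurrent.futures import ThreadPoolExecutor


async def _collect() -> str:
    # Placeholder for the async stats collection (e.g. awaiting a gRPC reply).
    await asyncio.sleep(0.1)
    return "stats-json"


def _run_in_worker_thread() -> str:
    # The executor thread gets its own event loop via asyncio.run().
    return asyncio.run(_collect())


async def main() -> None:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=1) as executor:
        payload = await loop.run_in_executor(executor, _run_in_worker_thread)
        print(payload)


if __name__ == "__main__":
    asyncio.run(main())
```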
