From eb8924e265e13cbeb5b00a72765652f78f4d99c4 Mon Sep 17 00:00:00 2001 From: tianyi-ge Date: Mon, 29 Sep 2025 23:29:50 +0800 Subject: [PATCH 01/23] [core] add get pid rpc to node manager Signed-off-by: tianyi-ge --- python/ray/dashboard/consts.py | 2 + .../modules/reporter/reporter_agent.py | 41 ++++++++++++++++++- src/ray/protobuf/node_manager.proto | 11 +++++ src/ray/raylet/node_manager.cc | 18 ++++++++ .../rpc/node_manager/node_manager_server.h | 7 +++- 5 files changed, 76 insertions(+), 3 deletions(-) diff --git a/python/ray/dashboard/consts.py b/python/ray/dashboard/consts.py index 30505878cb80..4671632eb034 100644 --- a/python/ray/dashboard/consts.py +++ b/python/ray/dashboard/consts.py @@ -102,3 +102,5 @@ SUBPROCESS_MODULE_WAIT_READY_TIMEOUT = env_float( "RAY_DASHBOARD_SUBPROCESS_MODULE_WAIT_READY_TIMEOUT", 30.0 ) + +NODE_MANAGER_RPC_TIMEOUT_SECONDS = 1 diff --git a/python/ray/dashboard/modules/reporter/reporter_agent.py b/python/ray/dashboard/modules/reporter/reporter_agent.py index 663a897794e1..97492c50e5c1 100644 --- a/python/ray/dashboard/modules/reporter/reporter_agent.py +++ b/python/ray/dashboard/modules/reporter/reporter_agent.py @@ -25,7 +25,7 @@ import ray._private.prometheus_exporter as prometheus_exporter import ray.dashboard.modules.reporter.reporter_consts as reporter_consts import ray.dashboard.utils as dashboard_utils -from ray._common.network_utils import parse_address +from ray._common.network_utils import build_address, parse_address from ray._common.utils import ( get_or_create_event_loop, get_user_temp_dir, @@ -34,6 +34,7 @@ from ray._private.metrics_agent import Gauge, MetricsAgent, Record from ray._private.ray_constants import ( DEBUG_AUTOSCALING_STATUS, + GLOBAL_GRPC_OPTIONS, RAY_ENABLE_OPEN_TELEMETRY, env_integer, ) @@ -42,7 +43,12 @@ ) from ray._private.utils import get_system_memory from ray._raylet import GCS_PID_KEY, WorkerID -from ray.core.generated import reporter_pb2, reporter_pb2_grpc +from ray.core.generated import ( + node_manager_pb2, + node_manager_pb2_grpc, + reporter_pb2, + reporter_pb2_grpc, +) from ray.dashboard import k8s_utils from ray.dashboard.consts import ( CLUSTER_TAG_KEYS, @@ -50,6 +56,7 @@ COMPONENT_METRICS_TAG_KEYS, GCS_RPC_TIMEOUT_SECONDS, GPU_TAG_KEYS, + NODE_MANAGER_RPC_TIMEOUT_SECONDS, NODE_TAG_KEYS, TPU_TAG_KEYS, ) @@ -483,6 +490,10 @@ def __init__(self, dashboard_agent): # Create GPU metric provider instance self._gpu_metric_provider = GpuMetricProvider() + self._node_manager_address = build_address( + self._ip, self._dashboard_agent.node_manager_port + ) + async def GetTraceback(self, request, context): pid = request.pid native = request.native @@ -885,6 +896,20 @@ def _get_disk_io_stats(): stats.write_count, ) + def _get_worker_pids_from_raylet(self): + channel = ray._private.utils.init_grpc_channel( + self._node_manager_address, GLOBAL_GRPC_OPTIONS, asynchronous=True + ) + timeout = NODE_MANAGER_RPC_TIMEOUT_SECONDS + stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel) + try: + reply = await stub.GetDriverAndWorkers( + node_manager_pb2.GetDriverAndWorkersRequest(), timeout=timeout + ) + return reply.pids + except Exception: + return None + def _get_agent_proc(self) -> psutil.Process: # Agent is the current process. # This method is not necessary, but we have it for mock testing. 
@@ -894,6 +919,18 @@ def _generate_worker_key(self, proc: psutil.Process) -> Tuple[int, float]: return (proc.pid, proc.create_time()) def _get_worker_processes(self): + pids = self._get_worker_pids_from_raylet() + if pids is not None: + workers = {} + for pid in pids: + try: + proc = psutil.Process(pid) + workers[self._generate_worker_key(proc)] = proc + except (psutil.NoSuchProcess, psutil.AccessDenied): + continue + return workers + + logger.debug("fallback to get worker processes from raylet children") raylet_proc = self._get_raylet_proc() if raylet_proc is None: return [] diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto index b8e2f9f1c0a9..5a63a3cc123d 100644 --- a/src/ray/protobuf/node_manager.proto +++ b/src/ray/protobuf/node_manager.proto @@ -399,6 +399,13 @@ message IsLocalWorkerDeadReply { bool is_dead = 1; } +message GetDriverAndWorkerPidsRequest {} + +message GetDriverAndWorkerPidsReply { + // PIDs of all drivers and workers managed by the local raylet. + repeated uint32 pids = 1; +} + // Service for inter-node-manager communication. service NodeManagerService { // Handle the case when GCS restarted. @@ -513,4 +520,8 @@ service NodeManagerService { // Failure: Is currently only used when grpc channel is unavailable for retryable core // worker clients. The unavailable callback will eventually be retried so if this fails. rpc IsLocalWorkerDead(IsLocalWorkerDeadRequest) returns (IsLocalWorkerDeadReply); + // Get the worker managed by local raylet. + // Failure: Sends to local raylet, so should never fail. + rpc GetDriverAndWorkerPids(GetDriverAndWorkerPidsRequest) + returns (GetDriverAndWorkerPidsReply); } diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index acd4bbea357a..ec6a588009ab 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2723,6 +2723,24 @@ void NodeManager::TriggerGlobalGC() { should_local_gc_ = true; } +void NodeManager::HandleGetDriverAndWorkerPids( + rpc::GetDriverAndWorkerPidsRequest request, + rpc::GetDriverAndWorkerPidsReply *reply, + rpc::SendReplyCallback send_reply_callback) { + auto all_workers = worker_pool_.GetAllRegisteredWorkers(/* filter_dead_worker */ true); + for (const auto &driver : + worker_pool_.GetAllRegisteredDrivers(/* filter_dead_driver */ true)) { + all_workers.push_back(driver); + } + for (const auto &worker : all_workers) { + if (worker->IsDead()) { + continue; + } + reply->add_pids(worker->GetProcess()->GetId()); + } + send_reply_callback(Status::OK(), nullptr, nullptr); +} + void NodeManager::Stop() { // This never fails. RAY_CHECK_OK(store_client_.Disconnect()); diff --git a/src/ray/rpc/node_manager/node_manager_server.h b/src/ray/rpc/node_manager/node_manager_server.h index eba08ba9b0af..566194765545 100644 --- a/src/ray/rpc/node_manager/node_manager_server.h +++ b/src/ray/rpc/node_manager/node_manager_server.h @@ -58,7 +58,8 @@ class ServerCallFactory; RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(GetObjectsInfo) \ RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(GetWorkerFailureCause) \ RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(RegisterMutableObject) \ - RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(PushMutableObject) + RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(PushMutableObject) \ + RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(GetDriverAndWorkerPids) /// Interface of the `NodeManagerService`, see `src/ray/protobuf/node_manager.proto`. 
class NodeManagerServiceHandler { @@ -182,6 +183,10 @@ class NodeManagerServiceHandler { virtual void HandlePushMutableObject(PushMutableObjectRequest request, PushMutableObjectReply *reply, SendReplyCallback send_reply_callback) = 0; + + virtual void HandleGetDriverAndWorkerPids(GetDriverAndWorkerPidsRequest request, + GetDriverAndWorkerPidsReply *reply, + SendReplyCallback send_reply_callback) = 0; }; /// The `GrpcService` for `NodeManagerService`. From d28c9050122379a5f12ca5e545c8d073db3af01f Mon Sep 17 00:00:00 2001 From: tianyi-ge Date: Mon, 29 Sep 2025 23:34:08 +0800 Subject: [PATCH 02/23] [core] add get pid rpc to node manager Signed-off-by: tianyi-ge --- src/ray/raylet/node_manager.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index ec6a588009ab..61f70915b180 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2736,7 +2736,7 @@ void NodeManager::HandleGetDriverAndWorkerPids( if (worker->IsDead()) { continue; } - reply->add_pids(worker->GetProcess()->GetId()); + reply->add_pids(worker->GetProcess().GetId()); } send_reply_callback(Status::OK(), nullptr, nullptr); } From 5927027ff7f96c7656f2a6eddb7cfeba33748b66 Mon Sep 17 00:00:00 2001 From: tianyi-ge Date: Mon, 29 Sep 2025 23:51:13 +0800 Subject: [PATCH 03/23] [core] add get pid rpc to node manager Signed-off-by: tianyi-ge --- .../ray/dashboard/modules/reporter/reporter_agent.py | 11 ++++++----- src/ray/raylet/node_manager.cc | 10 +++------- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/python/ray/dashboard/modules/reporter/reporter_agent.py b/python/ray/dashboard/modules/reporter/reporter_agent.py index 97492c50e5c1..6f3920bc967e 100644 --- a/python/ray/dashboard/modules/reporter/reporter_agent.py +++ b/python/ray/dashboard/modules/reporter/reporter_agent.py @@ -896,18 +896,19 @@ def _get_disk_io_stats(): stats.write_count, ) - def _get_worker_pids_from_raylet(self): + async def _get_worker_pids_from_raylet(self): channel = ray._private.utils.init_grpc_channel( self._node_manager_address, GLOBAL_GRPC_OPTIONS, asynchronous=True ) timeout = NODE_MANAGER_RPC_TIMEOUT_SECONDS stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel) try: - reply = await stub.GetDriverAndWorkers( - node_manager_pb2.GetDriverAndWorkersRequest(), timeout=timeout + reply = await stub.GetDriverAndWorkerPids( + node_manager_pb2.GetDriverAndWorkerPidsRequest(), timeout=timeout ) return reply.pids - except Exception: + except Exception as e: + logger.debug(f"Failed to get worker pids from raylet via gRPC: {e}") return None def _get_agent_proc(self) -> psutil.Process: @@ -919,7 +920,7 @@ def _generate_worker_key(self, proc: psutil.Process) -> Tuple[int, float]: return (proc.pid, proc.create_time()) def _get_worker_processes(self): - pids = self._get_worker_pids_from_raylet() + pids = asyncio.run(self._get_worker_pids_from_raylet()) if pids is not None: workers = {} for pid in pids: diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 61f70915b180..63e6991f6927 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2728,14 +2728,10 @@ void NodeManager::HandleGetDriverAndWorkerPids( rpc::GetDriverAndWorkerPidsReply *reply, rpc::SendReplyCallback send_reply_callback) { auto all_workers = worker_pool_.GetAllRegisteredWorkers(/* filter_dead_worker */ true); - for (const auto &driver : - worker_pool_.GetAllRegisteredDrivers(/* filter_dead_driver */ true)) { - 
all_workers.push_back(driver); - } + const auto &drivers = + worker_pool_.GetAllRegisteredDrivers(/* filter_dead_driver */ true); + all_workers.insert(all_workers.end(), drivers.begin(), drivers.end()); for (const auto &worker : all_workers) { - if (worker->IsDead()) { - continue; - } reply->add_pids(worker->GetProcess().GetId()); } send_reply_callback(Status::OK(), nullptr, nullptr); From f76f6336a0f94314b09860f4fc043eb439e421ce Mon Sep 17 00:00:00 2001 From: tianyi-ge Date: Wed, 1 Oct 2025 00:11:57 +0800 Subject: [PATCH 04/23] [core] add cython wrapper for raylet client Signed-off-by: tianyi-ge --- python/ray/_raylet.pyx | 1 + python/ray/dashboard/consts.py | 2 +- .../modules/reporter/reporter_agent.py | 49 ++++-------------- python/ray/includes/common.pxd | 5 ++ python/ray/includes/raylet_client.pxi | 27 ++++++++++ src/ray/protobuf/node_manager.proto | 14 ++--- src/ray/raylet/node_manager.cc | 17 ++++--- src/ray/raylet/node_manager.h | 6 +++ src/ray/raylet_rpc_client/raylet_client.cc | 51 +++++++++++++++++++ src/ray/raylet_rpc_client/raylet_client.h | 19 ++++++- .../rpc/node_manager/node_manager_server.h | 8 +-- 11 files changed, 138 insertions(+), 61 deletions(-) create mode 100644 python/ray/includes/raylet_client.pxi diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index f417d3a567f3..f4bc2077c94a 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -199,6 +199,7 @@ include "includes/libcoreworker.pxi" include "includes/global_state_accessor.pxi" include "includes/metric.pxi" include "includes/setproctitle.pxi" +include "includes/raylet_client.pxi" import ray from ray.exceptions import ( diff --git a/python/ray/dashboard/consts.py b/python/ray/dashboard/consts.py index 4671632eb034..a2efad930920 100644 --- a/python/ray/dashboard/consts.py +++ b/python/ray/dashboard/consts.py @@ -103,4 +103,4 @@ "RAY_DASHBOARD_SUBPROCESS_MODULE_WAIT_READY_TIMEOUT", 30.0 ) -NODE_MANAGER_RPC_TIMEOUT_SECONDS = 1 +RAYLET_RPC_TIMEOUT_SECONDS = 1 diff --git a/python/ray/dashboard/modules/reporter/reporter_agent.py b/python/ray/dashboard/modules/reporter/reporter_agent.py index 6f3920bc967e..e7d2c18217ce 100644 --- a/python/ray/dashboard/modules/reporter/reporter_agent.py +++ b/python/ray/dashboard/modules/reporter/reporter_agent.py @@ -25,7 +25,7 @@ import ray._private.prometheus_exporter as prometheus_exporter import ray.dashboard.modules.reporter.reporter_consts as reporter_consts import ray.dashboard.utils as dashboard_utils -from ray._common.network_utils import build_address, parse_address +from ray._common.network_utils import parse_address from ray._common.utils import ( get_or_create_event_loop, get_user_temp_dir, @@ -34,7 +34,6 @@ from ray._private.metrics_agent import Gauge, MetricsAgent, Record from ray._private.ray_constants import ( DEBUG_AUTOSCALING_STATUS, - GLOBAL_GRPC_OPTIONS, RAY_ENABLE_OPEN_TELEMETRY, env_integer, ) @@ -42,10 +41,8 @@ OpenTelemetryMetricRecorder, ) from ray._private.utils import get_system_memory -from ray._raylet import GCS_PID_KEY, WorkerID +from ray._raylet import GCS_PID_KEY, RayletClient, WorkerID from ray.core.generated import ( - node_manager_pb2, - node_manager_pb2_grpc, reporter_pb2, reporter_pb2_grpc, ) @@ -56,8 +53,8 @@ COMPONENT_METRICS_TAG_KEYS, GCS_RPC_TIMEOUT_SECONDS, GPU_TAG_KEYS, - NODE_MANAGER_RPC_TIMEOUT_SECONDS, NODE_TAG_KEYS, + RAYLET_RPC_TIMEOUT_SECONDS, TPU_TAG_KEYS, ) from ray.dashboard.modules.reporter.gpu_profile_manager import GpuProfilingManager @@ -490,10 +487,6 @@ def __init__(self, dashboard_agent): # Create GPU 
metric provider instance self._gpu_metric_provider = GpuMetricProvider() - self._node_manager_address = build_address( - self._ip, self._dashboard_agent.node_manager_port - ) - async def GetTraceback(self, request, context): pid = request.pid native = request.native @@ -896,17 +889,14 @@ def _get_disk_io_stats(): stats.write_count, ) - async def _get_worker_pids_from_raylet(self): - channel = ray._private.utils.init_grpc_channel( - self._node_manager_address, GLOBAL_GRPC_OPTIONS, asynchronous=True + def _get_worker_pids_from_raylet(self) -> Optional[List[int]]: + # Get worker pids from raylet via gRPC. + timeout = RAYLET_RPC_TIMEOUT_SECONDS * 1000 # in milliseconds + raylet_client = RayletClient( + ip_address=self._ip, port=self._dashboard_agent.node_manager_port ) - timeout = NODE_MANAGER_RPC_TIMEOUT_SECONDS - stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel) try: - reply = await stub.GetDriverAndWorkerPids( - node_manager_pb2.GetDriverAndWorkerPidsRequest(), timeout=timeout - ) - return reply.pids + return raylet_client.get_worker_pids(timeout=timeout) except Exception as e: logger.debug(f"Failed to get worker pids from raylet via gRPC: {e}") return None @@ -920,7 +910,7 @@ def _generate_worker_key(self, proc: psutil.Process) -> Tuple[int, float]: return (proc.pid, proc.create_time()) def _get_worker_processes(self): - pids = asyncio.run(self._get_worker_pids_from_raylet()) + pids = self._get_worker_pids_from_raylet() if pids is not None: workers = {} for pid in pids: @@ -931,25 +921,6 @@ def _get_worker_processes(self): continue return workers - logger.debug("fallback to get worker processes from raylet children") - raylet_proc = self._get_raylet_proc() - if raylet_proc is None: - return [] - else: - workers = {} - if sys.platform == "win32": - # windows, get the child process not the runner - for child in raylet_proc.children(): - if child.children(): - child = child.children()[0] - workers[self._generate_worker_key(child)] = child - else: - workers = { - self._generate_worker_key(proc): proc - for proc in raylet_proc.children() - } - return workers - def _get_workers(self, gpus: Optional[List[GpuUtilizationInfo]] = None): workers = self._get_worker_processes() if not workers: diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index eda3a38f14a9..6c7c6ee36235 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -763,6 +763,11 @@ cdef extern from "src/ray/protobuf/autoscaler.pb.h" nogil: void ParseFromString(const c_string &serialized) const c_string &SerializeAsString() const +cdef extern from "ray/raylet_rpc_client/raylet_client.h" nogil: + cdef cppclass CRayletClient "ray::rpc::RayletClient": + CRayletClient(const c_string &ip_address, int port) + CRayStatus GetWorkerPIDs(c_vector[int32_t] &worker_pids, int64_t timeout_ms) + cdef extern from "ray/common/task/task_spec.h" nogil: cdef cppclass CConcurrencyGroup "ray::ConcurrencyGroup": CConcurrencyGroup( diff --git a/python/ray/includes/raylet_client.pxi b/python/ray/includes/raylet_client.pxi new file mode 100644 index 000000000000..27d5721368b9 --- /dev/null +++ b/python/ray/includes/raylet_client.pxi @@ -0,0 +1,27 @@ +from libcpp.vector cimport vector as c_vector +from libcpp.string cimport string as c_string +from libc.stdint cimport int32_t as c_int32_t +from libcpp.memory cimport unique_ptr, make_unique +from ray.includes.common cimport CRayletClient, CRayStatus, CAddress + +cdef class RayletClient: + cdef: + unique_ptr[CRayletClient] inner + + def 
__cinit__(self, ip_address: str, port: int): + cdef: + c_string c_ip_address + c_int32_t c_port + c_ip_address = ip_address.encode('utf-8') + c_port = port + self.inner = make_unique[CRayletClient](c_ip_address, c_port) + + cdef list get_worker_pids(self, timeout_ms: int): + cdef: + c_vector[c_int32_t] pids + CRayStatus status + assert self.inner.get() is not NULL + status = self.inner.get().GetWorkerPIDs(pids, timeout_ms) + if not status.ok(): + raise RuntimeError("Failed to get worker PIDs from raylet: " + status.message()) + return [pid for pid in pids] diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto index 5a63a3cc123d..9a711a9dfeac 100644 --- a/src/ray/protobuf/node_manager.proto +++ b/src/ray/protobuf/node_manager.proto @@ -399,11 +399,11 @@ message IsLocalWorkerDeadReply { bool is_dead = 1; } -message GetDriverAndWorkerPidsRequest {} +message GetWorkerPIDsRequest {} -message GetDriverAndWorkerPidsReply { +message GetWorkerPIDsReply { // PIDs of all drivers and workers managed by the local raylet. - repeated uint32 pids = 1; + repeated int32 pids = 1; } // Service for inter-node-manager communication. @@ -520,8 +520,8 @@ service NodeManagerService { // Failure: Is currently only used when grpc channel is unavailable for retryable core // worker clients. The unavailable callback will eventually be retried so if this fails. rpc IsLocalWorkerDead(IsLocalWorkerDeadRequest) returns (IsLocalWorkerDeadReply); - // Get the worker managed by local raylet. - // Failure: Sends to local raylet, so should never fail. - rpc GetDriverAndWorkerPids(GetDriverAndWorkerPidsRequest) - returns (GetDriverAndWorkerPidsReply); + // Get the PIDs of all workers currently alive that are managed by the local Raylet. + // This includes connected driver processes. 
+ // Failure: Will retry on failure with logging + rpc GetWorkerPIDs(GetWorkerPIDsRequest) returns (GetWorkerPIDsReply); } diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 63e6991f6927..4d0830275436 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2723,14 +2723,15 @@ void NodeManager::TriggerGlobalGC() { should_local_gc_ = true; } -void NodeManager::HandleGetDriverAndWorkerPids( - rpc::GetDriverAndWorkerPidsRequest request, - rpc::GetDriverAndWorkerPidsReply *reply, - rpc::SendReplyCallback send_reply_callback) { - auto all_workers = worker_pool_.GetAllRegisteredWorkers(/* filter_dead_worker */ true); - const auto &drivers = - worker_pool_.GetAllRegisteredDrivers(/* filter_dead_driver */ true); - all_workers.insert(all_workers.end(), drivers.begin(), drivers.end()); +void NodeManager::HandleGetWorkerPIDs(rpc::GetWorkerPIDsRequest request, + rpc::GetWorkerPIDsReply *reply, + rpc::SendReplyCallback send_reply_callback) { + auto all_workers = worker_pool_.GetAllRegisteredWorkers(/* filter_dead_worker */ true, + /* filter_io_workers */ true); + auto drivers = worker_pool_.GetAllRegisteredDrivers(/* filter_dead_driver */ true); + all_workers.insert(all_workers.end(), + std::make_move_iterator(drivers.begin()), + std::make_move_iterator(drivers.end())); for (const auto &worker : all_workers) { reply->add_pids(worker->GetProcess().GetId()); } diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 6b564d442012..bd3f15074607 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -636,6 +636,12 @@ class NodeManager : public rpc::NodeManagerServiceHandler, rpc::NotifyGCSRestartReply *reply, rpc::SendReplyCallback send_reply_callback) override; + /// Handle a `GetWorkerPIDs` request. + void NodeManager::HandleGetWorkerPIDs( + rpc::GetWorkerPIDsRequest request, + rpc::GetWorkerPIDsReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + /// Checks the local socket connection for all registered workers and drivers. /// If any of them have disconnected unexpectedly (i.e., we receive a SIGHUP), /// we disconnect and kill the worker process. 
diff --git a/src/ray/raylet_rpc_client/raylet_client.cc b/src/ray/raylet_rpc_client/raylet_client.cc index 8bf6790d45e8..76a489e05d14 100644 --- a/src/ray/raylet_rpc_client/raylet_client.cc +++ b/src/ray/raylet_rpc_client/raylet_client.cc @@ -47,6 +47,32 @@ RayletClient::RayletClient(const rpc::Address &address, std::move(raylet_unavailable_timeout_callback), /*server_name=*/std::string("Raylet ") + address.ip_address())) {} +RayletClient::RayletClient(const std::string &ip_address, int port) { + io_service_ = std::make_unique(); + client_call_manager_ = + std::make_unique(*io_service_, /*record_stats=*/false); + grpc_client_ = std::make_unique>( + ip_address, port, *client_call_manager_); + auto raylet_unavailable_timeout_callback = []() { + RAY_LOG(WARNING) << "Raylet is unavailable for " + << ::RayConfig::instance().raylet_rpc_server_reconnect_timeout_s() + << "s"; + }; + retryable_grpc_client_ = rpc::RetryableGrpcClient::Create( + grpc_client_->Channel(), + client_call_manager_->GetMainService(), + /*max_pending_requests_bytes=*/ + std::numeric_limits::max(), + /*check_channel_status_interval_milliseconds=*/ + ::RayConfig::instance().grpc_client_check_connection_status_interval_milliseconds(), + /*server_unavailable_timeout_seconds=*/ + ::RayConfig::instance().raylet_rpc_server_reconnect_timeout_s(), + /*server_unavailable_timeout_callback=*/ + raylet_unavailable_timeout_callback, + /*server_name=*/ + std::string("Raylet ") + ip_address); +} + void RayletClient::RequestWorkerLease( const rpc::LeaseSpec &lease_spec, bool grant_or_reject, @@ -465,5 +491,30 @@ void RayletClient::GetNodeStats( /*method_timeout_ms*/ -1); } +Status RayletClient::GetWorkerPIDs(std::vector &worker_pids, + int64_t timeout_ms) { + rpc::GetWorkerPIDsRequest request; + std::promise promise; + auto future = promise.get_future(); + auto callback = [&promise, &worker_pids](const Status &status, + rpc::GetWorkerPIDsReply &&reply) { + if (status.ok()) { + worker_pids = std::vector(reply.pids().begin(), reply.pids().end()); + } + promise.set_value(status); + }; + INVOKE_RPC_CALL(NodeManagerService, + GetWorkerPIDs, + request, + callback, + grpc_client_, + /*method_timeout_ms*/ timeout_ms); + if (future.wait_for(std::chrono::milliseconds(timeout_ms)) == + std::future_status::timeout) { + return Status::TimedOut("Timed out getting worker PIDs from raylet"); + } + return future.get(); +} + } // namespace rpc } // namespace ray diff --git a/src/ray/raylet_rpc_client/raylet_client.h b/src/ray/raylet_rpc_client/raylet_client.h index a053dd400fa3..91d9d2d52da1 100644 --- a/src/ray/raylet_rpc_client/raylet_client.h +++ b/src/ray/raylet_rpc_client/raylet_client.h @@ -43,13 +43,20 @@ class RayletClient : public RayletClientInterface { /// Connect to the raylet. /// /// \param address The IP address of the worker. - /// \param port The port that the worker should listen on for gRPC requests. If - /// 0, the worker should choose a random port. /// \param client_call_manager The client call manager to use for the grpc connection. + /// \param raylet_unavailable_timeout_callback callback to be called when the raylet is + /// unavailable for a certain period of time. explicit RayletClient(const rpc::Address &address, rpc::ClientCallManager &client_call_manager, std::function raylet_unavailable_timeout_callback); + /// Connect to the raylet. only used for cython wrapper `CRayletClient` + /// `client_call_manager` will be created inside. + /// + /// \param ip_address The IP address of the worker. 
+ /// \param port The port of the worker. + explicit RayletClient(const std::string &ip_address, int port); + std::shared_ptr GetChannel() const override; void RequestWorkerLease( @@ -163,6 +170,8 @@ class RayletClient : public RayletClientInterface { void GetNodeStats(const rpc::GetNodeStatsRequest &request, const rpc::ClientCallback &callback) override; + Status GetWorkerPIDs(std::vector &worker_pids, int64_t timeout_ms); + private: /// gRPC client to the NodeManagerService. std::shared_ptr> grpc_client_; @@ -177,6 +186,12 @@ class RayletClient : public RayletClientInterface { /// The number of object ID pin RPCs currently in flight. std::atomic pins_in_flight_ = 0; + + private: + /// if io context and client call manager are created inside the raylet client, they + /// should be kept active during the whole lifetime of client. + std::unique_ptr io_service_; + std::unique_ptr client_call_manager_; }; } // namespace rpc diff --git a/src/ray/rpc/node_manager/node_manager_server.h b/src/ray/rpc/node_manager/node_manager_server.h index 566194765545..fba7780afc69 100644 --- a/src/ray/rpc/node_manager/node_manager_server.h +++ b/src/ray/rpc/node_manager/node_manager_server.h @@ -59,7 +59,7 @@ class ServerCallFactory; RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(GetWorkerFailureCause) \ RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(RegisterMutableObject) \ RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(PushMutableObject) \ - RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(GetDriverAndWorkerPids) + RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(GetWorkerPIDs) /// Interface of the `NodeManagerService`, see `src/ray/protobuf/node_manager.proto`. class NodeManagerServiceHandler { @@ -184,9 +184,9 @@ class NodeManagerServiceHandler { PushMutableObjectReply *reply, SendReplyCallback send_reply_callback) = 0; - virtual void HandleGetDriverAndWorkerPids(GetDriverAndWorkerPidsRequest request, - GetDriverAndWorkerPidsReply *reply, - SendReplyCallback send_reply_callback) = 0; + virtual void HandleGetWorkerPIDs(GetWorkerPIDsRequest request, + GetWorkerPIDsReply *reply, + SendReplyCallback send_reply_callback) = 0; }; /// The `GrpcService` for `NodeManagerService`. 
From c3dca66c76706670088ff03481f40b93e21bea6a Mon Sep 17 00:00:00 2001 From: tianyi-ge Date: Sun, 5 Oct 2025 17:05:31 +0800 Subject: [PATCH 05/23] [core] add cython wrapper for raylet client Signed-off-by: tianyi-ge --- BUILD.bazel | 1 + python/ray/dashboard/consts.py | 2 - .../modules/reporter/reporter_agent.py | 33 ++++--- python/ray/includes/common.pxd | 9 +- python/ray/includes/raylet_client.pxi | 25 +++-- src/ray/protobuf/node_manager.proto | 3 +- src/ray/raylet/node_manager.cc | 2 +- src/ray/raylet/node_manager.h | 7 +- src/ray/raylet_rpc_client/BUILD.bazel | 17 ++++ src/ray/raylet_rpc_client/raylet_client.cc | 52 ---------- src/ray/raylet_rpc_client/raylet_client.h | 19 +--- .../threaded_raylet_client.cc | 99 +++++++++++++++++++ .../threaded_raylet_client.h | 51 ++++++++++ 13 files changed, 216 insertions(+), 104 deletions(-) create mode 100644 src/ray/raylet_rpc_client/threaded_raylet_client.cc create mode 100644 src/ray/raylet_rpc_client/threaded_raylet_client.h diff --git a/BUILD.bazel b/BUILD.bazel index 7a2e89b50ab8..f0fdceaa06c8 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -241,6 +241,7 @@ pyx_library( "//src/ray/gcs_rpc_client:global_state_accessor_lib", "//src/ray/protobuf:serialization_cc_proto", "//src/ray/pubsub:python_gcs_subscriber", + "//src/ray/raylet_rpc_client:threaded_raylet_client_lib", "//src/ray/thirdparty/setproctitle", "//src/ray/util:memory", "//src/ray/util:raii", diff --git a/python/ray/dashboard/consts.py b/python/ray/dashboard/consts.py index a2efad930920..30505878cb80 100644 --- a/python/ray/dashboard/consts.py +++ b/python/ray/dashboard/consts.py @@ -102,5 +102,3 @@ SUBPROCESS_MODULE_WAIT_READY_TIMEOUT = env_float( "RAY_DASHBOARD_SUBPROCESS_MODULE_WAIT_READY_TIMEOUT", 30.0 ) - -RAYLET_RPC_TIMEOUT_SECONDS = 1 diff --git a/python/ray/dashboard/modules/reporter/reporter_agent.py b/python/ray/dashboard/modules/reporter/reporter_agent.py index e7d2c18217ce..8a6641d8fa50 100644 --- a/python/ray/dashboard/modules/reporter/reporter_agent.py +++ b/python/ray/dashboard/modules/reporter/reporter_agent.py @@ -42,10 +42,7 @@ ) from ray._private.utils import get_system_memory from ray._raylet import GCS_PID_KEY, RayletClient, WorkerID -from ray.core.generated import ( - reporter_pb2, - reporter_pb2_grpc, -) +from ray.core.generated import reporter_pb2, reporter_pb2_grpc from ray.dashboard import k8s_utils from ray.dashboard.consts import ( CLUSTER_TAG_KEYS, @@ -54,7 +51,6 @@ GCS_RPC_TIMEOUT_SECONDS, GPU_TAG_KEYS, NODE_TAG_KEYS, - RAYLET_RPC_TIMEOUT_SECONDS, TPU_TAG_KEYS, ) from ray.dashboard.modules.reporter.gpu_profile_manager import GpuProfilingManager @@ -889,17 +885,23 @@ def _get_disk_io_stats(): stats.write_count, ) - def _get_worker_pids_from_raylet(self) -> Optional[List[int]]: - # Get worker pids from raylet via gRPC. - timeout = RAYLET_RPC_TIMEOUT_SECONDS * 1000 # in milliseconds - raylet_client = RayletClient( - ip_address=self._ip, port=self._dashboard_agent.node_manager_port - ) + def _get_raylet_client(self): + if self._raylet_client is None: + self._raylet_client = RayletClient( + ip_address=self._ip, port=self._dashboard_agent.node_manager_port + ) + return self._raylet_client + + def _get_worker_pids_from_raylet(self) -> List[int]: try: - return raylet_client.get_worker_pids(timeout=timeout) + # Get worker pids from raylet via gRPC. 
+ return self._get_raylet_client().get_worker_pids() + except TimeoutError as e: + logger.debug(f"Failed to get worker pids from raylet: {e}") + return [] except Exception as e: - logger.debug(f"Failed to get worker pids from raylet via gRPC: {e}") - return None + logger.error(f"Unexpectedly failed to get worker pids from raylet: {e}") + raise def _get_agent_proc(self) -> psutil.Process: # Agent is the current process. @@ -911,7 +913,8 @@ def _generate_worker_key(self, proc: psutil.Process) -> Tuple[int, float]: def _get_worker_processes(self): pids = self._get_worker_pids_from_raylet() - if pids is not None: + logger.debug(f"Worker PIDs from raylet: {pids}") + if pids: workers = {} for pid in pids: try: diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 6c7c6ee36235..bc75b22e5ca1 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -763,10 +763,11 @@ cdef extern from "src/ray/protobuf/autoscaler.pb.h" nogil: void ParseFromString(const c_string &serialized) const c_string &SerializeAsString() const -cdef extern from "ray/raylet_rpc_client/raylet_client.h" nogil: - cdef cppclass CRayletClient "ray::rpc::RayletClient": - CRayletClient(const c_string &ip_address, int port) - CRayStatus GetWorkerPIDs(c_vector[int32_t] &worker_pids, int64_t timeout_ms) +cdef extern from "ray/raylet_rpc_client/threaded_raylet_client.h" nogil: + cdef cppclass CThreadedRayletClient "ray::rpc::ThreadedRayletClient": + CThreadedRayletClient(const c_string &ip_address, int port) + CRayStatus GetWorkerPIDs(shared_ptr[c_vector[int32_t]] worker_pids, + int64_t timeout_ms) cdef extern from "ray/common/task/task_spec.h" nogil: cdef cppclass CConcurrencyGroup "ray::ConcurrencyGroup": diff --git a/python/ray/includes/raylet_client.pxi b/python/ray/includes/raylet_client.pxi index 27d5721368b9..a6111c65a19b 100644 --- a/python/ray/includes/raylet_client.pxi +++ b/python/ray/includes/raylet_client.pxi @@ -1,12 +1,12 @@ from libcpp.vector cimport vector as c_vector from libcpp.string cimport string as c_string from libc.stdint cimport int32_t as c_int32_t -from libcpp.memory cimport unique_ptr, make_unique -from ray.includes.common cimport CRayletClient, CRayStatus, CAddress +from libcpp.memory cimport unique_ptr, make_unique, shared_ptr +from ray.includes.common cimport CThreadedRayletClient, CRayStatus, CAddress cdef class RayletClient: cdef: - unique_ptr[CRayletClient] inner + unique_ptr[CThreadedRayletClient] inner def __cinit__(self, ip_address: str, port: int): cdef: @@ -14,14 +14,21 @@ cdef class RayletClient: c_int32_t c_port c_ip_address = ip_address.encode('utf-8') c_port = port - self.inner = make_unique[CRayletClient](c_ip_address, c_port) + self.inner = make_unique[CThreadedRayletClient](c_ip_address, c_port) - cdef list get_worker_pids(self, timeout_ms: int): + def get_worker_pids(self, timeout_ms: int = 1000) -> list[int]: + """Get the PIDs of all workers registered with the raylet.""" cdef: - c_vector[c_int32_t] pids + shared_ptr[c_vector[c_int32_t]] pids CRayStatus status + pids = make_shared[c_vector[c_int32_t]]() assert self.inner.get() is not NULL status = self.inner.get().GetWorkerPIDs(pids, timeout_ms) - if not status.ok(): - raise RuntimeError("Failed to get worker PIDs from raylet: " + status.message()) - return [pid for pid in pids] + if status.IsTimedOut(): + raise TimeoutError(status.message()) + elif not status.ok(): + raise RuntimeError( + "Failed to get worker PIDs from raylet: " + status.message() + ) + assert pids.get() is not NULL + 
return [pid for pid in pids.get()[0]] diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto index 9a711a9dfeac..2826bed13960 100644 --- a/src/ray/protobuf/node_manager.proto +++ b/src/ray/protobuf/node_manager.proto @@ -522,6 +522,7 @@ service NodeManagerService { rpc IsLocalWorkerDead(IsLocalWorkerDeadRequest) returns (IsLocalWorkerDeadReply); // Get the PIDs of all workers currently alive that are managed by the local Raylet. // This includes connected driver processes. - // Failure: Will retry on failure with logging + // Failure: Will retry with the default timeout 1000ms. + // If fails, reply return an empty list. rpc GetWorkerPIDs(GetWorkerPIDsRequest) returns (GetWorkerPIDsReply); } diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 4d0830275436..572f29970b07 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2735,7 +2735,7 @@ void NodeManager::HandleGetWorkerPIDs(rpc::GetWorkerPIDsRequest request, for (const auto &worker : all_workers) { reply->add_pids(worker->GetProcess().GetId()); } - send_reply_callback(Status::OK(), nullptr, nullptr); + send_reply_callback(Status::OK(), /* success */ nullptr, /* failure */ nullptr); } void NodeManager::Stop() { diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index bd3f15074607..66032cb9f84d 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -637,10 +637,9 @@ class NodeManager : public rpc::NodeManagerServiceHandler, rpc::SendReplyCallback send_reply_callback) override; /// Handle a `GetWorkerPIDs` request. - void NodeManager::HandleGetWorkerPIDs( - rpc::GetWorkerPIDsRequest request, - rpc::GetWorkerPIDsReply *reply, - rpc::SendReplyCallback send_reply_callback) override; + void HandleGetWorkerPIDs(rpc::GetWorkerPIDsRequest request, + rpc::GetWorkerPIDsReply *reply, + rpc::SendReplyCallback send_reply_callback) override; /// Checks the local socket connection for all registered workers and drivers. 
/// If any of them have disconnected unexpectedly (i.e., we receive a SIGHUP), diff --git a/src/ray/raylet_rpc_client/BUILD.bazel b/src/ray/raylet_rpc_client/BUILD.bazel index 48764472d777..da7fd1a9ff79 100644 --- a/src/ray/raylet_rpc_client/BUILD.bazel +++ b/src/ray/raylet_rpc_client/BUILD.bazel @@ -43,6 +43,23 @@ ray_cc_library( ], ) +ray_cc_library( + name = "threaded_raylet_client_lib", + srcs = ["threaded_raylet_client.cc"], + hdrs = ["threaded_raylet_client.h"], + visibility = ["//visibility:public"], + deps = [ + ":raylet_client_interface", + ":raylet_client_lib", + "//src/ray/common:bundle_spec", + "//src/ray/common:ray_config", + "//src/ray/protobuf:node_manager_cc_grpc", + "//src/ray/rpc:retryable_grpc_client", + "//src/ray/rpc:rpc_callback_types", + "//src/ray/util:logging", + ], +) + ray_cc_library( name = "fake_raylet_client", hdrs = [ diff --git a/src/ray/raylet_rpc_client/raylet_client.cc b/src/ray/raylet_rpc_client/raylet_client.cc index 76a489e05d14..35fe8df51cbc 100644 --- a/src/ray/raylet_rpc_client/raylet_client.cc +++ b/src/ray/raylet_rpc_client/raylet_client.cc @@ -47,32 +47,6 @@ RayletClient::RayletClient(const rpc::Address &address, std::move(raylet_unavailable_timeout_callback), /*server_name=*/std::string("Raylet ") + address.ip_address())) {} -RayletClient::RayletClient(const std::string &ip_address, int port) { - io_service_ = std::make_unique(); - client_call_manager_ = - std::make_unique(*io_service_, /*record_stats=*/false); - grpc_client_ = std::make_unique>( - ip_address, port, *client_call_manager_); - auto raylet_unavailable_timeout_callback = []() { - RAY_LOG(WARNING) << "Raylet is unavailable for " - << ::RayConfig::instance().raylet_rpc_server_reconnect_timeout_s() - << "s"; - }; - retryable_grpc_client_ = rpc::RetryableGrpcClient::Create( - grpc_client_->Channel(), - client_call_manager_->GetMainService(), - /*max_pending_requests_bytes=*/ - std::numeric_limits::max(), - /*check_channel_status_interval_milliseconds=*/ - ::RayConfig::instance().grpc_client_check_connection_status_interval_milliseconds(), - /*server_unavailable_timeout_seconds=*/ - ::RayConfig::instance().raylet_rpc_server_reconnect_timeout_s(), - /*server_unavailable_timeout_callback=*/ - raylet_unavailable_timeout_callback, - /*server_name=*/ - std::string("Raylet ") + ip_address); -} - void RayletClient::RequestWorkerLease( const rpc::LeaseSpec &lease_spec, bool grant_or_reject, @@ -490,31 +464,5 @@ void RayletClient::GetNodeStats( grpc_client_, /*method_timeout_ms*/ -1); } - -Status RayletClient::GetWorkerPIDs(std::vector &worker_pids, - int64_t timeout_ms) { - rpc::GetWorkerPIDsRequest request; - std::promise promise; - auto future = promise.get_future(); - auto callback = [&promise, &worker_pids](const Status &status, - rpc::GetWorkerPIDsReply &&reply) { - if (status.ok()) { - worker_pids = std::vector(reply.pids().begin(), reply.pids().end()); - } - promise.set_value(status); - }; - INVOKE_RPC_CALL(NodeManagerService, - GetWorkerPIDs, - request, - callback, - grpc_client_, - /*method_timeout_ms*/ timeout_ms); - if (future.wait_for(std::chrono::milliseconds(timeout_ms)) == - std::future_status::timeout) { - return Status::TimedOut("Timed out getting worker PIDs from raylet"); - } - return future.get(); -} - } // namespace rpc } // namespace ray diff --git a/src/ray/raylet_rpc_client/raylet_client.h b/src/ray/raylet_rpc_client/raylet_client.h index 91d9d2d52da1..9ae0f9b6cf60 100644 --- a/src/ray/raylet_rpc_client/raylet_client.h +++ b/src/ray/raylet_rpc_client/raylet_client.h @@ 
-50,12 +50,7 @@ class RayletClient : public RayletClientInterface { rpc::ClientCallManager &client_call_manager, std::function raylet_unavailable_timeout_callback); - /// Connect to the raylet. only used for cython wrapper `CRayletClient` - /// `client_call_manager` will be created inside. - /// - /// \param ip_address The IP address of the worker. - /// \param port The port of the worker. - explicit RayletClient(const std::string &ip_address, int port); + RayletClient() = default; std::shared_ptr GetChannel() const override; @@ -170,9 +165,7 @@ class RayletClient : public RayletClientInterface { void GetNodeStats(const rpc::GetNodeStatsRequest &request, const rpc::ClientCallback &callback) override; - Status GetWorkerPIDs(std::vector &worker_pids, int64_t timeout_ms); - - private: + protected: /// gRPC client to the NodeManagerService. std::shared_ptr> grpc_client_; @@ -185,13 +178,7 @@ class RayletClient : public RayletClientInterface { ResourceMappingType resource_ids_; /// The number of object ID pin RPCs currently in flight. - std::atomic pins_in_flight_ = 0; - - private: - /// if io context and client call manager are created inside the raylet client, they - /// should be kept active during the whole lifetime of client. - std::unique_ptr io_service_; - std::unique_ptr client_call_manager_; + std::atomic pins_in_flight_{0}; }; } // namespace rpc diff --git a/src/ray/raylet_rpc_client/threaded_raylet_client.cc b/src/ray/raylet_rpc_client/threaded_raylet_client.cc new file mode 100644 index 000000000000..932457a32223 --- /dev/null +++ b/src/ray/raylet_rpc_client/threaded_raylet_client.cc @@ -0,0 +1,99 @@ +// Copyright 2025 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "ray/raylet_rpc_client/threaded_raylet_client.h" + +#include +#include +#include +#include + +#include "ray/common/ray_config.h" +#include "ray/util/logging.h" +#include "src/ray/protobuf/node_manager.grpc.pb.h" + +namespace ray { +namespace rpc { + +ThreadedRayletClient::ThreadedRayletClient(const std::string &ip_address, int port) + : RayletClient() { + io_service_ = std::make_unique(); + std::promise promise; + thread_io_service_ = std::make_unique([this, &promise] { + SetThreadName("raylet.client"); + boost::asio::executor_work_guard work( + io_service_->get_executor()); + promise.set_value(true); + io_service_->run(); + }); + promise.get_future().get(); + + client_call_manager_ = + std::make_unique(*io_service_, /*record_stats=*/false); + grpc_client_ = std::make_unique>( + ip_address, port, *client_call_manager_); + auto raylet_unavailable_timeout_callback = []() { + RAY_LOG(WARNING) << "Raylet is unavailable for " + << ::RayConfig::instance().raylet_rpc_server_reconnect_timeout_s() + << "s"; + }; + retryable_grpc_client_ = rpc::RetryableGrpcClient::Create( + grpc_client_->Channel(), + client_call_manager_->GetMainService(), + /*max_pending_requests_bytes=*/ + std::numeric_limits::max(), + /*check_channel_status_interval_milliseconds=*/ + ::RayConfig::instance().grpc_client_check_connection_status_interval_milliseconds(), + /*server_unavailable_timeout_seconds=*/ + ::RayConfig::instance().raylet_rpc_server_reconnect_timeout_s(), + /*server_unavailable_timeout_callback=*/ + raylet_unavailable_timeout_callback, + /*server_name=*/ + std::string("Raylet ") + ip_address); +} + +Status ThreadedRayletClient::GetWorkerPIDs( + std::shared_ptr> worker_pids, int64_t timeout_ms) { + rpc::GetWorkerPIDsRequest request; + auto promise = std::make_shared>(); + std::weak_ptr> weak_promise = promise; + std::weak_ptr> weak_worker_pids = worker_pids; + auto future = promise->get_future(); + auto callback = [weak_promise, weak_worker_pids](const Status &status, + rpc::GetWorkerPIDsReply &&reply) { + auto p = weak_promise.lock(); + auto workers = weak_worker_pids.lock(); + if (p != nullptr && workers != nullptr) { + if (status.ok()) { + *workers = std::vector(reply.pids().begin(), reply.pids().end()); + } + p->set_value(status); + } + }; + INVOKE_RETRYABLE_RPC_CALL(retryable_grpc_client_, + NodeManagerService, + GetWorkerPIDs, + request, + callback, + grpc_client_, + timeout_ms); + if (future.wait_for(std::chrono::milliseconds(timeout_ms)) == + std::future_status::timeout) { + return Status::TimedOut("Timed out getting worker PIDs from raylet"); + } + return future.get(); +} + +} // namespace rpc +} // namespace ray diff --git a/src/ray/raylet_rpc_client/threaded_raylet_client.h b/src/ray/raylet_rpc_client/threaded_raylet_client.h new file mode 100644 index 000000000000..175c358071a2 --- /dev/null +++ b/src/ray/raylet_rpc_client/threaded_raylet_client.h @@ -0,0 +1,51 @@ +// Copyright 2025 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include "ray/raylet_rpc_client/raylet_client.h" +#include "ray/rpc/grpc_client.h" + +namespace ray { +namespace rpc { + +/// Threaded raylet client is provided for python (e.g. ReporterAgent) to communicate with +/// raylet. It creates and manages a separate thread to run the grpc event loop +class ThreadedRayletClient : public RayletClient { + public: + /// Connect to the raylet. Only used for cython wrapper `CThreadedRayletClient` + /// new io service and new thread will be created inside. + /// + /// \param ip_address The IP address of raylet. + /// \param port The port of raylet. + ThreadedRayletClient(const std::string &ip_address, int port); + + /// Get the worker pids from raylet. + /// \param worker_pids The output worker pids. + /// \param timeout_ms The timeout in milliseconds. + /// \return ray::Status + Status GetWorkerPIDs(std::shared_ptr> worker_pids, + int64_t timeout_ms); + + private: + /// if io context and client call manager are created inside the raylet client, they + /// should be kept active during the whole lifetime of client. + std::unique_ptr io_service_; + std::unique_ptr thread_io_service_; + std::unique_ptr client_call_manager_; + std::unique_ptr raylet_client_; +}; + +} // namespace rpc +} // namespace ray From af615067273dd4a4c6abbbb71d89ca9738959d18 Mon Sep 17 00:00:00 2001 From: tianyi-ge Date: Mon, 6 Oct 2025 00:03:20 +0800 Subject: [PATCH 06/23] [core] fix cursor comments Signed-off-by: tianyi-ge --- python/ray/dashboard/modules/reporter/reporter_agent.py | 2 ++ src/ray/raylet_rpc_client/threaded_raylet_client.cc | 6 +++--- src/ray/raylet_rpc_client/threaded_raylet_client.h | 1 - 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/python/ray/dashboard/modules/reporter/reporter_agent.py b/python/ray/dashboard/modules/reporter/reporter_agent.py index 8a6641d8fa50..1fc8750e16c6 100644 --- a/python/ray/dashboard/modules/reporter/reporter_agent.py +++ b/python/ray/dashboard/modules/reporter/reporter_agent.py @@ -483,6 +483,8 @@ def __init__(self, dashboard_agent): # Create GPU metric provider instance self._gpu_metric_provider = GpuMetricProvider() + self._raylet_client = None + async def GetTraceback(self, request, context): pid = request.pid native = request.native diff --git a/src/ray/raylet_rpc_client/threaded_raylet_client.cc b/src/ray/raylet_rpc_client/threaded_raylet_client.cc index 932457a32223..4a903b399661 100644 --- a/src/ray/raylet_rpc_client/threaded_raylet_client.cc +++ b/src/ray/raylet_rpc_client/threaded_raylet_client.cc @@ -34,13 +34,13 @@ ThreadedRayletClient::ThreadedRayletClient(const std::string &ip_address, int po SetThreadName("raylet.client"); boost::asio::executor_work_guard work( io_service_->get_executor()); - promise.set_value(true); io_service_->run(); + promise.set_value(true); }); promise.get_future().get(); - client_call_manager_ = - std::make_unique(*io_service_, /*record_stats=*/false); + client_call_manager_ = std::make_unique( + *io_service_, /*record_stats=*/false, ip_address); grpc_client_ = std::make_unique>( ip_address, port, *client_call_manager_); auto raylet_unavailable_timeout_callback = []() { diff --git a/src/ray/raylet_rpc_client/threaded_raylet_client.h b/src/ray/raylet_rpc_client/threaded_raylet_client.h index 175c358071a2..476097eb3278 100644 --- a/src/ray/raylet_rpc_client/threaded_raylet_client.h +++ b/src/ray/raylet_rpc_client/threaded_raylet_client.h @@ -44,7 +44,6 @@ class ThreadedRayletClient : public RayletClient { std::unique_ptr io_service_; std::unique_ptr 
thread_io_service_; std::unique_ptr client_call_manager_; - std::unique_ptr raylet_client_; }; } // namespace rpc From 204caef309603f758f150d4015d42f106680f2d2 Mon Sep 17 00:00:00 2001 From: tianyi-ge Date: Wed, 8 Oct 2025 11:34:10 +0800 Subject: [PATCH 07/23] [core] reuse singleton io context helper class to start a threaded raylet client Signed-off-by: tianyi-ge --- .../modules/reporter/reporter_agent.py | 16 +++--------- python/ray/includes/common.pxd | 1 + python/ray/includes/raylet_client.pxi | 11 +++++++- src/ray/raylet_rpc_client/raylet_client.h | 2 +- .../threaded_raylet_client.cc | 26 +++++++++---------- .../threaded_raylet_client.h | 14 +++++++--- 6 files changed, 38 insertions(+), 32 deletions(-) diff --git a/python/ray/dashboard/modules/reporter/reporter_agent.py b/python/ray/dashboard/modules/reporter/reporter_agent.py index 1fc8750e16c6..9d479b1f4e52 100644 --- a/python/ray/dashboard/modules/reporter/reporter_agent.py +++ b/python/ray/dashboard/modules/reporter/reporter_agent.py @@ -483,7 +483,9 @@ def __init__(self, dashboard_agent): # Create GPU metric provider instance self._gpu_metric_provider = GpuMetricProvider() - self._raylet_client = None + self._raylet_client = RayletClient( + ip_address=self._ip, port=self._dashboard_agent.node_manager_port + ) async def GetTraceback(self, request, context): pid = request.pid @@ -887,17 +889,10 @@ def _get_disk_io_stats(): stats.write_count, ) - def _get_raylet_client(self): - if self._raylet_client is None: - self._raylet_client = RayletClient( - ip_address=self._ip, port=self._dashboard_agent.node_manager_port - ) - return self._raylet_client - def _get_worker_pids_from_raylet(self) -> List[int]: try: # Get worker pids from raylet via gRPC. - return self._get_raylet_client().get_worker_pids() + return self._raylet_client.get_worker_pids() except TimeoutError as e: logger.debug(f"Failed to get worker pids from raylet: {e}") return [] @@ -947,9 +942,6 @@ def _get_workers(self, gpus: Optional[List[GpuUtilizationInfo]] = None): for k in keys_to_pop: self._workers.pop(k) - # Remove the current process (reporter agent), which is also a child of - # the Raylet. 
- self._workers.pop(self._generate_worker_key(self._get_agent_proc())) # Build process ID -> GPU info mapping for faster lookups gpu_pid_mapping = defaultdict(list) if gpus is not None: diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 3d5cb30172b3..17b35842c4fd 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -768,6 +768,7 @@ cdef extern from "ray/raylet_rpc_client/threaded_raylet_client.h" nogil: CThreadedRayletClient(const c_string &ip_address, int port) CRayStatus GetWorkerPIDs(shared_ptr[c_vector[int32_t]] worker_pids, int64_t timeout_ms) + cdef void ConnectOnSingletonIoContext(CThreadedRayletClient &raylet_client) cdef extern from "ray/common/task/task_spec.h" nogil: cdef cppclass CConcurrencyGroup "ray::ConcurrencyGroup": diff --git a/python/ray/includes/raylet_client.pxi b/python/ray/includes/raylet_client.pxi index a6111c65a19b..0618f5fedf3f 100644 --- a/python/ray/includes/raylet_client.pxi +++ b/python/ray/includes/raylet_client.pxi @@ -2,7 +2,13 @@ from libcpp.vector cimport vector as c_vector from libcpp.string cimport string as c_string from libc.stdint cimport int32_t as c_int32_t from libcpp.memory cimport unique_ptr, make_unique, shared_ptr -from ray.includes.common cimport CThreadedRayletClient, CRayStatus, CAddress +from ray.includes.common cimport ( + CThreadedRayletClient, + CRayStatus, + CAddress, + ConnectOnSingletonIoContext +) +from cython.operator import dereference cdef class RayletClient: cdef: @@ -15,6 +21,9 @@ cdef class RayletClient: c_ip_address = ip_address.encode('utf-8') c_port = port self.inner = make_unique[CThreadedRayletClient](c_ip_address, c_port) + with nogil: + # connects on singleton io context even if multiple raylet clients are created + ConnectOnSingletonIoContext(dereference(self.inner)) def get_worker_pids(self, timeout_ms: int = 1000) -> list[int]: """Get the PIDs of all workers registered with the raylet.""" diff --git a/src/ray/raylet_rpc_client/raylet_client.h b/src/ray/raylet_rpc_client/raylet_client.h index 9ae0f9b6cf60..5533a39e66c6 100644 --- a/src/ray/raylet_rpc_client/raylet_client.h +++ b/src/ray/raylet_rpc_client/raylet_client.h @@ -178,7 +178,7 @@ class RayletClient : public RayletClientInterface { ResourceMappingType resource_ids_; /// The number of object ID pin RPCs currently in flight. 
- std::atomic pins_in_flight_{0}; + std::atomic pins_in_flight_ = 0; }; } // namespace rpc diff --git a/src/ray/raylet_rpc_client/threaded_raylet_client.cc b/src/ray/raylet_rpc_client/threaded_raylet_client.cc index 4a903b399661..b266782f10e7 100644 --- a/src/ray/raylet_rpc_client/threaded_raylet_client.cc +++ b/src/ray/raylet_rpc_client/threaded_raylet_client.cc @@ -19,6 +19,7 @@ #include #include +#include "ray/common/asio/asio_util.h" #include "ray/common/ray_config.h" #include "ray/util/logging.h" #include "src/ray/protobuf/node_manager.grpc.pb.h" @@ -27,22 +28,13 @@ namespace ray { namespace rpc { ThreadedRayletClient::ThreadedRayletClient(const std::string &ip_address, int port) - : RayletClient() { - io_service_ = std::make_unique(); - std::promise promise; - thread_io_service_ = std::make_unique([this, &promise] { - SetThreadName("raylet.client"); - boost::asio::executor_work_guard work( - io_service_->get_executor()); - io_service_->run(); - promise.set_value(true); - }); - promise.get_future().get(); + : RayletClient(), ip_address_(ip_address), port_(port) {} +void ThreadedRayletClient::Connect(instrumented_io_context &io_service) { client_call_manager_ = std::make_unique( - *io_service_, /*record_stats=*/false, ip_address); + io_service, /*record_stats=*/false, ip_address_); grpc_client_ = std::make_unique>( - ip_address, port, *client_call_manager_); + ip_address_, port_, *client_call_manager_); auto raylet_unavailable_timeout_callback = []() { RAY_LOG(WARNING) << "Raylet is unavailable for " << ::RayConfig::instance().raylet_rpc_server_reconnect_timeout_s() @@ -60,7 +52,7 @@ ThreadedRayletClient::ThreadedRayletClient(const std::string &ip_address, int po /*server_unavailable_timeout_callback=*/ raylet_unavailable_timeout_callback, /*server_name=*/ - std::string("Raylet ") + ip_address); + std::string("Raylet ") + ip_address_); } Status ThreadedRayletClient::GetWorkerPIDs( @@ -95,5 +87,11 @@ Status ThreadedRayletClient::GetWorkerPIDs( return future.get(); } +void ConnectOnSingletonIoContext(ThreadedRayletClient &raylet_client) { + static InstrumentedIOContextWithThread io_context("raylet_client_io_service"); + instrumented_io_context &io_service = io_context.GetIoService(); + raylet_client.Connect(io_service); +} + } // namespace rpc } // namespace ray diff --git a/src/ray/raylet_rpc_client/threaded_raylet_client.h b/src/ray/raylet_rpc_client/threaded_raylet_client.h index 476097eb3278..701aef2b89db 100644 --- a/src/ray/raylet_rpc_client/threaded_raylet_client.h +++ b/src/ray/raylet_rpc_client/threaded_raylet_client.h @@ -38,13 +38,19 @@ class ThreadedRayletClient : public RayletClient { Status GetWorkerPIDs(std::shared_ptr> worker_pids, int64_t timeout_ms); + /// Connect to the raylet. The io service is provided from outside. + /// \param io_service The io service to run the grpc event loop. + void Connect(instrumented_io_context &io_service); + private: - /// if io context and client call manager are created inside the raylet client, they - /// should be kept active during the whole lifetime of client. - std::unique_ptr io_service_; - std::unique_ptr thread_io_service_; + /// client call manager is created inside the raylet client, it should be kept active + /// during the whole lifetime of client. 
std::unique_ptr client_call_manager_; + std::string ip_address_; + int port_; }; +void ConnectOnSingletonIoContext(ThreadedRayletClient &raylet_client); + } // namespace rpc } // namespace ray From 2b326156dbcf254dae3ad10d54e02eeb21a19392 Mon Sep 17 00:00:00 2001 From: tianyi-ge Date: Thu, 9 Oct 2025 16:05:58 +0800 Subject: [PATCH 08/23] [core] fix reporter agent unittest Signed-off-by: tianyi-ge --- .../modules/reporter/tests/test_reporter.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/python/ray/dashboard/modules/reporter/tests/test_reporter.py b/python/ray/dashboard/modules/reporter/tests/test_reporter.py index af0e54c373cb..6cef2129f973 100644 --- a/python/ray/dashboard/modules/reporter/tests/test_reporter.py +++ b/python/ray/dashboard/modules/reporter/tests/test_reporter.py @@ -307,7 +307,8 @@ def test_worker_stats(): wait_for_condition(test_worker_stats, retry_interval_ms=1000) -def test_report_stats(): +@patch("ray.dashboard.modules.reporter.reporter_agent.RayletClient") +def test_report_stats(mock_raylet_client): dashboard_agent = MagicMock() dashboard_agent.gcs_address = build_address("127.0.0.1", 6379) agent = ReporterAgent(dashboard_agent) @@ -369,7 +370,8 @@ def test_report_stats(): assert len(records) == 44 -def test_report_stats_gpu(): +@patch("ray.dashboard.modules.reporter.reporter_agent.RayletClient") +def test_report_stats_gpu(mock_raylet_client): dashboard_agent = MagicMock() dashboard_agent.gcs_address = build_address("127.0.0.1", 6379) agent = ReporterAgent(dashboard_agent) @@ -490,7 +492,8 @@ def test_report_stats_gpu(): assert gpu_metrics_aggregatd["node_gram_available"] == GPU_MEMORY * 4 - 6 -def test_get_tpu_usage(): +@patch("ray.dashboard.modules.reporter.reporter_agent.RayletClient") +def test_get_tpu_usage(mock_raylet_client): dashboard_agent = MagicMock() dashboard_agent.gcs_address = build_address("127.0.0.1", 6379) agent = ReporterAgent(dashboard_agent) @@ -547,7 +550,8 @@ def test_get_tpu_usage(): assert tpu_utilizations == expected_utilizations -def test_report_stats_tpu(): +@patch("ray.dashboard.modules.reporter.reporter_agent.RayletClient") +def test_report_stats_tpu(mock_raylet_client): dashboard_agent = MagicMock() dashboard_agent.gcs_address = build_address("127.0.0.1", 6379) agent = ReporterAgent(dashboard_agent) @@ -620,7 +624,8 @@ def test_report_stats_tpu(): assert tpu_metrics_aggregated["tpu_memory_total"] == 8000 -def test_report_per_component_stats(): +@patch("ray.dashboard.modules.reporter.reporter_agent.RayletClient") +def test_report_per_component_stats(mock_raylet_client): dashboard_agent = MagicMock() dashboard_agent.gcs_address = build_address("127.0.0.1", 6379) agent = ReporterAgent(dashboard_agent) @@ -882,6 +887,9 @@ def _get_agent_proc(self): def _generate_worker_key(self, proc): return (proc.pid, proc.create_time()) + def _get_worker_pids_from_raylet(self): + return [p.pid for p in children] + def _get_worker_processes(self): return ReporterAgent._get_worker_processes(self) From c8f0fa44b1bd2e4869f153ae2b91bfa7b0a18eef Mon Sep 17 00:00:00 2001 From: tianyi-ge Date: Thu, 9 Oct 2025 22:09:24 +0800 Subject: [PATCH 09/23] [core] add reporter agent unittest Signed-off-by: tianyi-ge --- .../modules/reporter/tests/test_reporter.py | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/python/ray/dashboard/modules/reporter/tests/test_reporter.py b/python/ray/dashboard/modules/reporter/tests/test_reporter.py index 6cef2129f973..b25a48bdcf79 100644 --- 
a/python/ray/dashboard/modules/reporter/tests/test_reporter.py +++ b/python/ray/dashboard/modules/reporter/tests/test_reporter.py @@ -1184,5 +1184,25 @@ def test_get_cluster_metadata(ray_start_with_dashboard): assert resp_data["rayInitCluster"] == meta["ray_init_cluster"] +def test_reporter_raylet_agent(ray_start_with_dashboard): + @ray.remote + class MyActor: + def ping(self): + return "pong" + + a = MyActor.remote() + assert ray.get(a.ping.remote()) == "pong" + dashboard_agent = MagicMock() + dashboard_agent.gcs_address = build_address("127.0.0.1", 6379) + dashboard_agent.ip = "127.0.0.1" + dashboard_agent.node_manager_port = ( + ray._private.worker.global_worker.node.node_manager_port + ) + agent = ReporterAgent(dashboard_agent) + pids = agent._get_worker_pids_from_raylet() + assert len(pids) == 2 + assert "ray::MyActor" in [psutil.Process(pid).cmdline()[0] for pid in pids] + + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__])) From ac20283f93b949d0f87e8eb9a5af66744516433d Mon Sep 17 00:00:00 2001 From: tianyi-ge Date: Fri, 10 Oct 2025 09:58:58 +0800 Subject: [PATCH 10/23] [core] connect to raylet inside client constructor Signed-off-by: tianyi-ge --- python/ray/includes/common.pxd | 1 - python/ray/includes/raylet_client.pxi | 3 --- .../threaded_raylet_client.cc | 18 +++++++++++------- .../raylet_rpc_client/threaded_raylet_client.h | 7 ++++--- 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 17b35842c4fd..3d5cb30172b3 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -768,7 +768,6 @@ cdef extern from "ray/raylet_rpc_client/threaded_raylet_client.h" nogil: CThreadedRayletClient(const c_string &ip_address, int port) CRayStatus GetWorkerPIDs(shared_ptr[c_vector[int32_t]] worker_pids, int64_t timeout_ms) - cdef void ConnectOnSingletonIoContext(CThreadedRayletClient &raylet_client) cdef extern from "ray/common/task/task_spec.h" nogil: cdef cppclass CConcurrencyGroup "ray::ConcurrencyGroup": diff --git a/python/ray/includes/raylet_client.pxi b/python/ray/includes/raylet_client.pxi index 0618f5fedf3f..5a7ed0d8b161 100644 --- a/python/ray/includes/raylet_client.pxi +++ b/python/ray/includes/raylet_client.pxi @@ -21,9 +21,6 @@ cdef class RayletClient: c_ip_address = ip_address.encode('utf-8') c_port = port self.inner = make_unique[CThreadedRayletClient](c_ip_address, c_port) - with nogil: - # connects on singleton io context even if multiple raylet clients are created - ConnectOnSingletonIoContext(dereference(self.inner)) def get_worker_pids(self, timeout_ms: int = 1000) -> list[int]: """Get the PIDs of all workers registered with the raylet.""" diff --git a/src/ray/raylet_rpc_client/threaded_raylet_client.cc b/src/ray/raylet_rpc_client/threaded_raylet_client.cc index b266782f10e7..cca061b56873 100644 --- a/src/ray/raylet_rpc_client/threaded_raylet_client.cc +++ b/src/ray/raylet_rpc_client/threaded_raylet_client.cc @@ -28,7 +28,17 @@ namespace ray { namespace rpc { ThreadedRayletClient::ThreadedRayletClient(const std::string &ip_address, int port) - : RayletClient(), ip_address_(ip_address), port_(port) {} + : RayletClient(), ip_address_(ip_address), port_(port) { + // Connect to the raylet on a singleton io service with a dedicated thread. + // This is to avoid creating multiple threads for multiple clients in python. 
+ ConnectOnSingletonIoContext(); +} + +void ThreadedRayletClient::ConnectOnSingletonIoContext() { + static InstrumentedIOContextWithThread io_context("raylet_client_io_service"); + instrumented_io_context &io_service = io_context.GetIoService(); + Connect(io_service); +} void ThreadedRayletClient::Connect(instrumented_io_context &io_service) { client_call_manager_ = std::make_unique( @@ -87,11 +97,5 @@ Status ThreadedRayletClient::GetWorkerPIDs( return future.get(); } -void ConnectOnSingletonIoContext(ThreadedRayletClient &raylet_client) { - static InstrumentedIOContextWithThread io_context("raylet_client_io_service"); - instrumented_io_context &io_service = io_context.GetIoService(); - raylet_client.Connect(io_service); -} - } // namespace rpc } // namespace ray diff --git a/src/ray/raylet_rpc_client/threaded_raylet_client.h b/src/ray/raylet_rpc_client/threaded_raylet_client.h index 701aef2b89db..5a8286c85898 100644 --- a/src/ray/raylet_rpc_client/threaded_raylet_client.h +++ b/src/ray/raylet_rpc_client/threaded_raylet_client.h @@ -38,11 +38,14 @@ class ThreadedRayletClient : public RayletClient { Status GetWorkerPIDs(std::shared_ptr> worker_pids, int64_t timeout_ms); + private: + /// Connect to the raylet on a singleton io service with a dedicated thread. + void ConnectOnSingletonIoContext(); + /// Connect to the raylet. The io service is provided from outside. /// \param io_service The io service to run the grpc event loop. void Connect(instrumented_io_context &io_service); - private: /// client call manager is created inside the raylet client, it should be kept active /// during the whole lifetime of client. std::unique_ptr client_call_manager_; @@ -50,7 +53,5 @@ class ThreadedRayletClient : public RayletClient { int port_; }; -void ConnectOnSingletonIoContext(ThreadedRayletClient &raylet_client); - } // namespace rpc } // namespace ray From b4e9a1d0f2a4efc048492bf953239b248753d95b Mon Sep 17 00:00:00 2001 From: tianyi-ge Date: Sat, 11 Oct 2025 15:21:59 +0800 Subject: [PATCH 11/23] [core] filter out system drivers for reporter Signed-off-by: tianyi-ge --- src/ray/common/constants.h | 3 +++ src/ray/gcs/gcs_job_manager.h | 3 --- src/ray/raylet/node_manager.cc | 3 ++- src/ray/raylet/worker_pool.cc | 14 +++++++++++++- src/ray/raylet/worker_pool.h | 5 +++-- 5 files changed, 21 insertions(+), 7 deletions(-) diff --git a/src/ray/common/constants.h b/src/ray/common/constants.h index aa9d858f2811..e4ed3812a9a8 100644 --- a/src/ray/common/constants.h +++ b/src/ray/common/constants.h @@ -150,3 +150,6 @@ constexpr char kImplicitResourcePrefix[] = "node:__internal_implicit_resource_"; /// PID of GCS process to record metrics. constexpr char kGcsPidKey[] = "gcs_pid"; + +// Please keep this in sync with the definition in ray_constants.py. +const std::string kRayInternalNamespacePrefix = "_ray_internal_"; // NOLINT diff --git a/src/ray/gcs/gcs_job_manager.h b/src/ray/gcs/gcs_job_manager.h index 81eb78db2c7c..23435c615b39 100644 --- a/src/ray/gcs/gcs_job_manager.h +++ b/src/ray/gcs/gcs_job_manager.h @@ -36,9 +36,6 @@ namespace ray { namespace gcs { -// Please keep this in sync with the definition in ray_constants.py. -const std::string kRayInternalNamespacePrefix = "_ray_internal_"; // NOLINT - // Please keep these in sync with the definition in dashboard/modules/job/common.py. 
// NOLINTNEXTLINE const std::string kJobDataKeyPrefix = kRayInternalNamespacePrefix + "job_info_"; diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 4bbd4a343187..f25be84e96a8 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2792,7 +2792,8 @@ void NodeManager::HandleGetWorkerPIDs(rpc::GetWorkerPIDsRequest request, rpc::SendReplyCallback send_reply_callback) { auto all_workers = worker_pool_.GetAllRegisteredWorkers(/* filter_dead_worker */ true, /* filter_io_workers */ true); - auto drivers = worker_pool_.GetAllRegisteredDrivers(/* filter_dead_driver */ true); + auto drivers = worker_pool_.GetAllRegisteredDrivers(/* filter_dead_drivers */ true, + /* filter_system_driver */ true); all_workers.insert(all_workers.end(), std::make_move_iterator(drivers.begin()), std::make_move_iterator(drivers.end())); diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 65bcdb774fc6..b6e56e72f2d0 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -1655,8 +1655,12 @@ bool WorkerPool::IsWorkerAvailableForScheduling() const { return false; } +bool IsInternalNamespace(const std::string &ray_namespace) { + return absl::StartsWith(ray_namespace, kRayInternalNamespacePrefix); +} + std::vector> WorkerPool::GetAllRegisteredDrivers( - bool filter_dead_drivers) const { + bool filter_dead_drivers, bool filter_system_drivers) const { std::vector> drivers; for (const auto &entry : states_by_lang_) { @@ -1668,6 +1672,14 @@ std::vector> WorkerPool::GetAllRegisteredDriver if (filter_dead_drivers && driver->IsDead()) { continue; } + + if (filter_system_drivers) { + auto job_config = GetJobConfig(driver->GetAssignedJobId()); + if (job_config.has_value() && IsInternalNamespace(job_config->ray_namespace())) { + continue; + } + } + drivers.push_back(driver); } } diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 71eeb09e1122..9c44233a9d90 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -219,7 +219,7 @@ class WorkerPoolInterface : public IOWorkerPoolInterface { std::unique_ptr runtime_env_agent_client) = 0; virtual std::vector> GetAllRegisteredDrivers( - bool filter_dead_drivers = false) const = 0; + bool filter_dead_drivers = false, bool filter_system_drivers = false) const = 0; virtual Status RegisterDriver(const std::shared_ptr &worker, const rpc::JobConfig &job_config, @@ -522,7 +522,8 @@ class WorkerPool : public WorkerPoolInterface { /// /// \return A list containing all the drivers. std::vector> GetAllRegisteredDrivers( - bool filter_dead_drivers = false) const override; + bool filter_dead_drivers = false, + bool filter_system_drivers = false) const override; /// Returns debug string for class. 
/// From 53e693c56af3e3dc91c3f7062db30394cafcdac0 Mon Sep 17 00:00:00 2001 From: tianyi-ge Date: Sat, 11 Oct 2025 23:03:18 +0800 Subject: [PATCH 12/23] [core] fix mock WorkerPool compilation Signed-off-by: tianyi-ge --- src/mock/ray/raylet/worker_pool.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mock/ray/raylet/worker_pool.h b/src/mock/ray/raylet/worker_pool.h index 6e8337aef2d4..dd8ff59b1904 100644 --- a/src/mock/ray/raylet/worker_pool.h +++ b/src/mock/ray/raylet/worker_pool.h @@ -62,7 +62,7 @@ class MockWorkerPool : public WorkerPoolInterface { (override)); MOCK_METHOD((std::vector>), GetAllRegisteredDrivers, - (bool filter_dead_drivers), + (bool filter_dead_drivers, bool filter_system_drivers), (const, override)); MOCK_METHOD(Status, RegisterDriver, From 62b48cf66bb6499c2be49c1d06ebd3b1daa2d5c9 Mon Sep 17 00:00:00 2001 From: tianyi-ge Date: Sun, 12 Oct 2025 10:48:13 +0800 Subject: [PATCH 13/23] [core] fix mock WorkerPool compilation Signed-off-by: tianyi-ge --- src/ray/raylet/node_manager.cc | 12 ++++++------ .../scheduling/tests/cluster_lease_manager_test.cc | 2 +- src/ray/raylet/tests/local_lease_manager_test.cc | 2 +- src/ray/raylet/tests/node_manager_test.cc | 6 +++--- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index f25be84e96a8..020eb9a7c0a2 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -659,7 +659,7 @@ void NodeManager::QueryAllWorkerStates( bool include_task_info, int64_t limit, const std::function &on_all_replied) { - auto all_workers = worker_pool_.GetAllRegisteredWorkers(/* filter_dead_worker */ true, + auto all_workers = worker_pool_.GetAllRegisteredWorkers(/* filter_dead_workers */ true, /*filter_io_workers*/ true); for (auto &driver : worker_pool_.GetAllRegisteredDrivers(/* filter_dead_driver */ true)) { @@ -957,7 +957,7 @@ void NodeManager::HandleNotifyGCSRestart(rpc::NotifyGCSRestartRequest request, // registered to raylet first (blocking call) and then connect to GCS, so there is no // race condition here. gcs_client_.AsyncResubscribe(); - auto workers = worker_pool_.GetAllRegisteredWorkers(/* filter_dead_worker */ true); + auto workers = worker_pool_.GetAllRegisteredWorkers(/* filter_dead_workers */ true); for (auto worker : workers) { worker->AsyncNotifyGCSRestart(); } @@ -2540,7 +2540,7 @@ void NodeManager::HandleGetNodeStats(rpc::GetNodeStatsRequest node_stats_request // and return the information from HandleNodesStatsRequest. The caller of // HandleGetNodeStats should set a timeout so that the rpc finishes even if not all // workers have replied. 
- auto all_workers = worker_pool_.GetAllRegisteredWorkers(/* filter_dead_worker */ true); + auto all_workers = worker_pool_.GetAllRegisteredWorkers(/* filter_dead_workers */ true); absl::flat_hash_set driver_ids; for (const auto &driver : worker_pool_.GetAllRegisteredDrivers(/* filter_dead_driver */ true)) { @@ -2790,10 +2790,10 @@ void NodeManager::TriggerGlobalGC() { void NodeManager::HandleGetWorkerPIDs(rpc::GetWorkerPIDsRequest request, rpc::GetWorkerPIDsReply *reply, rpc::SendReplyCallback send_reply_callback) { - auto all_workers = worker_pool_.GetAllRegisteredWorkers(/* filter_dead_worker */ true, + auto all_workers = worker_pool_.GetAllRegisteredWorkers(/* filter_dead_workers */ true, /* filter_io_workers */ true); auto drivers = worker_pool_.GetAllRegisteredDrivers(/* filter_dead_drivers */ true, - /* filter_system_driver */ true); + /* filter_system_drivers */ true); all_workers.insert(all_workers.end(), std::make_move_iterator(drivers.begin()), std::make_move_iterator(drivers.end())); @@ -2809,7 +2809,7 @@ void NodeManager::Stop() { #if !defined(_WIN32) // Best-effort process-group cleanup for any remaining workers before shutdown. if (RayConfig::instance().process_group_cleanup_enabled()) { - auto workers = worker_pool_.GetAllRegisteredWorkers(/* filter_dead_worker */ true, + auto workers = worker_pool_.GetAllRegisteredWorkers(/* filter_dead_workers */ true, /* filter_io_workers */ false); for (const auto &w : workers) { auto saved = w->GetSavedProcessGroupId(); diff --git a/src/ray/raylet/scheduling/tests/cluster_lease_manager_test.cc b/src/ray/raylet/scheduling/tests/cluster_lease_manager_test.cc index aabdab0c559e..bbd717a52008 100644 --- a/src/ray/raylet/scheduling/tests/cluster_lease_manager_test.cc +++ b/src/ray/raylet/scheduling/tests/cluster_lease_manager_test.cc @@ -158,7 +158,7 @@ class MockWorkerPool : public WorkerPoolInterface { } std::vector> GetAllRegisteredDrivers( - bool filter_dead_drivers) const override { + bool filter_dead_drivers, bool filter_system_drivers) const override { RAY_CHECK(false) << "Not used."; return {}; } diff --git a/src/ray/raylet/tests/local_lease_manager_test.cc b/src/ray/raylet/tests/local_lease_manager_test.cc index 7dabaebbc7fe..c0a175c78f4e 100644 --- a/src/ray/raylet/tests/local_lease_manager_test.cc +++ b/src/ray/raylet/tests/local_lease_manager_test.cc @@ -162,7 +162,7 @@ class MockWorkerPool : public WorkerPoolInterface { } std::vector> GetAllRegisteredDrivers( - bool filter_dead_drivers) const override { + bool filter_dead_drivers, bool filter_system_drivers) const override { RAY_CHECK(false) << "Not used."; return {}; } diff --git a/src/ray/raylet/tests/node_manager_test.cc b/src/ray/raylet/tests/node_manager_test.cc index 5b94420df01f..37fd034966e7 100644 --- a/src/ray/raylet/tests/node_manager_test.cc +++ b/src/ray/raylet/tests/node_manager_test.cc @@ -458,7 +458,7 @@ TEST_F(NodeManagerTest, TestRegisterGcsAndCheckSelfAlive) { .WillOnce(Return(Status::OK())); EXPECT_CALL(mock_worker_pool_, GetAllRegisteredWorkers(_, _)) .WillRepeatedly(Return(std::vector>{})); - EXPECT_CALL(mock_worker_pool_, GetAllRegisteredDrivers(_)) + EXPECT_CALL(mock_worker_pool_, GetAllRegisteredDrivers(_, _)) .WillRepeatedly(Return(std::vector>{})); EXPECT_CALL(mock_worker_pool_, IsWorkerAvailableForScheduling()) .WillRepeatedly(Return(false)); @@ -484,7 +484,7 @@ TEST_F(NodeManagerTest, TestDetachedWorkerIsKilledByFailedWorker) { .WillOnce(Return(Status::OK())); EXPECT_CALL(mock_worker_pool_, GetAllRegisteredWorkers(_, _)) 
.WillRepeatedly(Return(std::vector>{})); - EXPECT_CALL(mock_worker_pool_, GetAllRegisteredDrivers(_)) + EXPECT_CALL(mock_worker_pool_, GetAllRegisteredDrivers(_, _)) .WillRepeatedly(Return(std::vector>{})); EXPECT_CALL(mock_worker_pool_, IsWorkerAvailableForScheduling()) .WillRepeatedly(Return(false)); @@ -562,7 +562,7 @@ TEST_F(NodeManagerTest, TestDetachedWorkerIsKilledByFailedNode) { .WillOnce(Return(Status::OK())); EXPECT_CALL(mock_worker_pool_, GetAllRegisteredWorkers(_, _)) .WillRepeatedly(Return(std::vector>{})); - EXPECT_CALL(mock_worker_pool_, GetAllRegisteredDrivers(_)) + EXPECT_CALL(mock_worker_pool_, GetAllRegisteredDrivers(_, _)) .WillRepeatedly(Return(std::vector>{})); EXPECT_CALL(mock_worker_pool_, IsWorkerAvailableForScheduling()) .WillRepeatedly(Return(false)); From be70b12907b980a372d54cd22033fdea612d1daf Mon Sep 17 00:00:00 2001 From: tianyi-ge Date: Tue, 14 Oct 2025 10:45:12 +0800 Subject: [PATCH 14/23] [core] nit updates Signed-off-by: tianyi-ge --- .../modules/reporter/reporter_agent.py | 19 ++++++++++--------- .../modules/reporter/tests/test_reporter.py | 3 +++ src/ray/raylet/worker_pool.h | 3 +++ src/ray/raylet_rpc_client/raylet_client.cc | 1 + .../threaded_raylet_client.cc | 8 -------- .../threaded_raylet_client.h | 7 ------- 6 files changed, 17 insertions(+), 24 deletions(-) diff --git a/python/ray/dashboard/modules/reporter/reporter_agent.py b/python/ray/dashboard/modules/reporter/reporter_agent.py index 9d479b1f4e52..be7e30483829 100644 --- a/python/ray/dashboard/modules/reporter/reporter_agent.py +++ b/python/ray/dashboard/modules/reporter/reporter_agent.py @@ -911,15 +911,16 @@ def _generate_worker_key(self, proc: psutil.Process) -> Tuple[int, float]: def _get_worker_processes(self): pids = self._get_worker_pids_from_raylet() logger.debug(f"Worker PIDs from raylet: {pids}") - if pids: - workers = {} - for pid in pids: - try: - proc = psutil.Process(pid) - workers[self._generate_worker_key(proc)] = proc - except (psutil.NoSuchProcess, psutil.AccessDenied): - continue - return workers + if not pids: + return [] + workers = {} + for pid in pids: + try: + proc = psutil.Process(pid) + workers[self._generate_worker_key(proc)] = proc + except (psutil.NoSuchProcess, psutil.AccessDenied): + continue + return workers def _get_workers(self, gpus: Optional[List[GpuUtilizationInfo]] = None): workers = self._get_worker_processes() diff --git a/python/ray/dashboard/modules/reporter/tests/test_reporter.py b/python/ray/dashboard/modules/reporter/tests/test_reporter.py index b25a48bdcf79..79c81e1fb781 100644 --- a/python/ray/dashboard/modules/reporter/tests/test_reporter.py +++ b/python/ray/dashboard/modules/reporter/tests/test_reporter.py @@ -1201,7 +1201,10 @@ def ping(self): agent = ReporterAgent(dashboard_agent) pids = agent._get_worker_pids_from_raylet() assert len(pids) == 2 + # check if worker is reported assert "ray::MyActor" in [psutil.Process(pid).cmdline()[0] for pid in pids] + # check if driver is reported + assert os.getpid() in pids if __name__ == "__main__": diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 9c44233a9d90..c9423c96cdad 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -519,6 +519,9 @@ class WorkerPool : public WorkerPoolInterface { /// /// \param filter_dead_drivers whether or not if this method will filter dead drivers /// that are still registered. + /// \param filter_system_drivers whether or not if this method will filter system + /// drivers. 
A system driver is a driver with job config namespace starting with + /// "__ray_internal__". /// /// \return A list containing all the drivers. std::vector> GetAllRegisteredDrivers( diff --git a/src/ray/raylet_rpc_client/raylet_client.cc b/src/ray/raylet_rpc_client/raylet_client.cc index 35fe8df51cbc..8bf6790d45e8 100644 --- a/src/ray/raylet_rpc_client/raylet_client.cc +++ b/src/ray/raylet_rpc_client/raylet_client.cc @@ -464,5 +464,6 @@ void RayletClient::GetNodeStats( grpc_client_, /*method_timeout_ms*/ -1); } + } // namespace rpc } // namespace ray diff --git a/src/ray/raylet_rpc_client/threaded_raylet_client.cc b/src/ray/raylet_rpc_client/threaded_raylet_client.cc index cca061b56873..f7dad3ba8cb1 100644 --- a/src/ray/raylet_rpc_client/threaded_raylet_client.cc +++ b/src/ray/raylet_rpc_client/threaded_raylet_client.cc @@ -31,16 +31,8 @@ ThreadedRayletClient::ThreadedRayletClient(const std::string &ip_address, int po : RayletClient(), ip_address_(ip_address), port_(port) { // Connect to the raylet on a singleton io service with a dedicated thread. // This is to avoid creating multiple threads for multiple clients in python. - ConnectOnSingletonIoContext(); -} - -void ThreadedRayletClient::ConnectOnSingletonIoContext() { static InstrumentedIOContextWithThread io_context("raylet_client_io_service"); instrumented_io_context &io_service = io_context.GetIoService(); - Connect(io_service); -} - -void ThreadedRayletClient::Connect(instrumented_io_context &io_service) { client_call_manager_ = std::make_unique( io_service, /*record_stats=*/false, ip_address_); grpc_client_ = std::make_unique>( diff --git a/src/ray/raylet_rpc_client/threaded_raylet_client.h b/src/ray/raylet_rpc_client/threaded_raylet_client.h index 5a8286c85898..9f6e7fd45d4d 100644 --- a/src/ray/raylet_rpc_client/threaded_raylet_client.h +++ b/src/ray/raylet_rpc_client/threaded_raylet_client.h @@ -39,13 +39,6 @@ class ThreadedRayletClient : public RayletClient { int64_t timeout_ms); private: - /// Connect to the raylet on a singleton io service with a dedicated thread. - void ConnectOnSingletonIoContext(); - - /// Connect to the raylet. The io service is provided from outside. - /// \param io_service The io service to run the grpc event loop. - void Connect(instrumented_io_context &io_service); - /// client call manager is created inside the raylet client, it should be kept active /// during the whole lifetime of client. 
std::unique_ptr client_call_manager_; From f09822a28b8efab29377fae32afa7edda43a133a Mon Sep 17 00:00:00 2001 From: tianyi-ge Date: Wed, 15 Oct 2025 10:28:40 +0800 Subject: [PATCH 15/23] [core] replace inheritance ThreadedRayletClient with wrapper RayletClientWithIoContext Signed-off-by: tianyi-ge --- BUILD.bazel | 2 +- python/ray/includes/common.pxd | 6 +- python/ray/includes/raylet_client.pxi | 6 +- src/ray/raylet_rpc_client/BUILD.bazel | 6 +- src/ray/raylet_rpc_client/raylet_client.cc | 32 +++++++ src/ray/raylet_rpc_client/raylet_client.h | 9 +- .../raylet_client_with_io_context.cc | 56 +++++++++++ ...ient.h => raylet_client_with_io_context.h} | 14 +-- .../threaded_raylet_client.cc | 93 ------------------- 9 files changed, 112 insertions(+), 112 deletions(-) create mode 100644 src/ray/raylet_rpc_client/raylet_client_with_io_context.cc rename src/ray/raylet_rpc_client/{threaded_raylet_client.h => raylet_client_with_io_context.h} (76%) delete mode 100644 src/ray/raylet_rpc_client/threaded_raylet_client.cc diff --git a/BUILD.bazel b/BUILD.bazel index f0fdceaa06c8..cc326746c5f8 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -241,7 +241,7 @@ pyx_library( "//src/ray/gcs_rpc_client:global_state_accessor_lib", "//src/ray/protobuf:serialization_cc_proto", "//src/ray/pubsub:python_gcs_subscriber", - "//src/ray/raylet_rpc_client:threaded_raylet_client_lib", + "//src/ray/raylet_rpc_client:raylet_client_with_io_context_lib", "//src/ray/thirdparty/setproctitle", "//src/ray/util:memory", "//src/ray/util:raii", diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 3d5cb30172b3..43efb6edbf87 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -763,9 +763,9 @@ cdef extern from "src/ray/protobuf/autoscaler.pb.h" nogil: void ParseFromString(const c_string &serialized) const c_string &SerializeAsString() const -cdef extern from "ray/raylet_rpc_client/threaded_raylet_client.h" nogil: - cdef cppclass CThreadedRayletClient "ray::rpc::ThreadedRayletClient": - CThreadedRayletClient(const c_string &ip_address, int port) +cdef extern from "ray/raylet_rpc_client/raylet_client_with_io_context.h" nogil: + cdef cppclass CRayletClientWithIoContext "ray::rpc::RayletClientWithIoContext": + CRayletClientWithIoContext(const c_string &ip_address, int port) CRayStatus GetWorkerPIDs(shared_ptr[c_vector[int32_t]] worker_pids, int64_t timeout_ms) diff --git a/python/ray/includes/raylet_client.pxi b/python/ray/includes/raylet_client.pxi index 5a7ed0d8b161..c5fa9d1da5e6 100644 --- a/python/ray/includes/raylet_client.pxi +++ b/python/ray/includes/raylet_client.pxi @@ -3,7 +3,7 @@ from libcpp.string cimport string as c_string from libc.stdint cimport int32_t as c_int32_t from libcpp.memory cimport unique_ptr, make_unique, shared_ptr from ray.includes.common cimport ( - CThreadedRayletClient, + CRayletClientWithIoContext, CRayStatus, CAddress, ConnectOnSingletonIoContext @@ -12,7 +12,7 @@ from cython.operator import dereference cdef class RayletClient: cdef: - unique_ptr[CThreadedRayletClient] inner + unique_ptr[CRayletClientWithIoContext] inner def __cinit__(self, ip_address: str, port: int): cdef: @@ -20,7 +20,7 @@ cdef class RayletClient: c_int32_t c_port c_ip_address = ip_address.encode('utf-8') c_port = port - self.inner = make_unique[CThreadedRayletClient](c_ip_address, c_port) + self.inner = make_unique[CRayletClientWithIoContext](c_ip_address, c_port) def get_worker_pids(self, timeout_ms: int = 1000) -> list[int]: """Get the PIDs of all workers registered with the 
raylet.""" diff --git a/src/ray/raylet_rpc_client/BUILD.bazel b/src/ray/raylet_rpc_client/BUILD.bazel index da7fd1a9ff79..99bdb9052937 100644 --- a/src/ray/raylet_rpc_client/BUILD.bazel +++ b/src/ray/raylet_rpc_client/BUILD.bazel @@ -44,9 +44,9 @@ ray_cc_library( ) ray_cc_library( - name = "threaded_raylet_client_lib", - srcs = ["threaded_raylet_client.cc"], - hdrs = ["threaded_raylet_client.h"], + name = "raylet_client_with_io_context_lib", + srcs = ["raylet_client_with_io_context.cc"], + hdrs = ["raylet_client_with_io_context.h"], visibility = ["//visibility:public"], deps = [ ":raylet_client_interface", diff --git a/src/ray/raylet_rpc_client/raylet_client.cc b/src/ray/raylet_rpc_client/raylet_client.cc index 8bf6790d45e8..543c9d1636ea 100644 --- a/src/ray/raylet_rpc_client/raylet_client.cc +++ b/src/ray/raylet_rpc_client/raylet_client.cc @@ -465,5 +465,37 @@ void RayletClient::GetNodeStats( /*method_timeout_ms*/ -1); } +Status RayletClient::GetWorkerPIDs(std::shared_ptr> worker_pids, + int64_t timeout_ms) { + rpc::GetWorkerPIDsRequest request; + auto promise = std::make_shared>(); + std::weak_ptr> weak_promise = promise; + std::weak_ptr> weak_worker_pids = worker_pids; + auto future = promise->get_future(); + auto callback = [weak_promise, weak_worker_pids](const Status &status, + rpc::GetWorkerPIDsReply &&reply) { + auto p = weak_promise.lock(); + auto workers = weak_worker_pids.lock(); + if (p != nullptr && workers != nullptr) { + if (status.ok()) { + *workers = std::vector(reply.pids().begin(), reply.pids().end()); + } + p->set_value(status); + } + }; + INVOKE_RETRYABLE_RPC_CALL(retryable_grpc_client_, + NodeManagerService, + GetWorkerPIDs, + request, + callback, + grpc_client_, + timeout_ms); + if (future.wait_for(std::chrono::milliseconds(timeout_ms)) == + std::future_status::timeout) { + return Status::TimedOut("Timed out getting worker PIDs from raylet"); + } + return future.get(); +} + } // namespace rpc } // namespace ray diff --git a/src/ray/raylet_rpc_client/raylet_client.h b/src/ray/raylet_rpc_client/raylet_client.h index 5533a39e66c6..b1b804d7fb41 100644 --- a/src/ray/raylet_rpc_client/raylet_client.h +++ b/src/ray/raylet_rpc_client/raylet_client.h @@ -50,8 +50,6 @@ class RayletClient : public RayletClientInterface { rpc::ClientCallManager &client_call_manager, std::function raylet_unavailable_timeout_callback); - RayletClient() = default; - std::shared_ptr GetChannel() const override; void RequestWorkerLease( @@ -165,6 +163,13 @@ class RayletClient : public RayletClientInterface { void GetNodeStats(const rpc::GetNodeStatsRequest &request, const rpc::ClientCallback &callback) override; + /// Get the worker pids from raylet. + /// \param worker_pids The output worker pids. + /// \param timeout_ms The timeout in milliseconds. + /// \return ray::Status + Status GetWorkerPIDs(std::shared_ptr> worker_pids, + int64_t timeout_ms); + protected: /// gRPC client to the NodeManagerService. std::shared_ptr> grpc_client_; diff --git a/src/ray/raylet_rpc_client/raylet_client_with_io_context.cc b/src/ray/raylet_rpc_client/raylet_client_with_io_context.cc new file mode 100644 index 000000000000..2bb1f78c234e --- /dev/null +++ b/src/ray/raylet_rpc_client/raylet_client_with_io_context.cc @@ -0,0 +1,56 @@ +// Copyright 2025 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ray/raylet_rpc_client/raylet_client_with_io_context.h" + +#include +#include +#include +#include + +#include "ray/common/asio/asio_util.h" +#include "ray/common/ray_config.h" +#include "ray/util/logging.h" +#include "src/ray/protobuf/node_manager.grpc.pb.h" + +namespace ray { +namespace rpc { + +RayletClientWithIoContext::RayletClientWithIoContext(const std::string &ip_address, + int port) { + // Connect to the raylet on a singleton io service with a dedicated thread. + // This is to avoid creating multiple threads for multiple clients in python. + static InstrumentedIOContextWithThread io_context("raylet_client_io_service"); + instrumented_io_context &io_service = io_context.GetIoService(); + client_call_manager_ = std::make_unique( + io_service, /*record_stats=*/false, ip_address); + auto raylet_unavailable_timeout_callback = []() { + RAY_LOG(WARNING) << "Raylet is unavailable for " + << ::RayConfig::instance().raylet_rpc_server_reconnect_timeout_s() + << "s"; + }; + rpc::Address rpc_address; + rpc_address.set_ip_address(ip_address); + rpc_address.set_port(port); + raylet_client_ = std::make_unique( + rpc_address, *client_call_manager_, std::move(raylet_unavailable_timeout_callback)); +} + +Status RayletClientWithIoContext::GetWorkerPIDs( + std::shared_ptr> worker_pids, int64_t timeout_ms) { + return raylet_client_->GetWorkerPIDs(worker_pids, timeout_ms); +} + +} // namespace rpc +} // namespace ray diff --git a/src/ray/raylet_rpc_client/threaded_raylet_client.h b/src/ray/raylet_rpc_client/raylet_client_with_io_context.h similarity index 76% rename from src/ray/raylet_rpc_client/threaded_raylet_client.h rename to src/ray/raylet_rpc_client/raylet_client_with_io_context.h index 9f6e7fd45d4d..2ca74d27b188 100644 --- a/src/ray/raylet_rpc_client/threaded_raylet_client.h +++ b/src/ray/raylet_rpc_client/raylet_client_with_io_context.h @@ -20,16 +20,17 @@ namespace ray { namespace rpc { -/// Threaded raylet client is provided for python (e.g. ReporterAgent) to communicate with -/// raylet. It creates and manages a separate thread to run the grpc event loop -class ThreadedRayletClient : public RayletClient { +/// Raylet client with io context is provided for python (e.g. ReporterAgent) to +/// communicate with raylet. It creates and manages a separate thread to run the grpc +/// event loop +class RayletClientWithIoContext { public: - /// Connect to the raylet. Only used for cython wrapper `CThreadedRayletClient` + /// Connect to the raylet. Only used for cython wrapper `CRayletClientWithIoContext` /// new io service and new thread will be created inside. /// /// \param ip_address The IP address of raylet. /// \param port The port of raylet. - ThreadedRayletClient(const std::string &ip_address, int port); + RayletClientWithIoContext(const std::string &ip_address, int port); /// Get the worker pids from raylet. /// \param worker_pids The output worker pids. @@ -42,8 +43,7 @@ class ThreadedRayletClient : public RayletClient { /// client call manager is created inside the raylet client, it should be kept active /// during the whole lifetime of client. 
std::unique_ptr client_call_manager_; - std::string ip_address_; - int port_; + std::unique_ptr raylet_client_; }; } // namespace rpc diff --git a/src/ray/raylet_rpc_client/threaded_raylet_client.cc b/src/ray/raylet_rpc_client/threaded_raylet_client.cc deleted file mode 100644 index f7dad3ba8cb1..000000000000 --- a/src/ray/raylet_rpc_client/threaded_raylet_client.cc +++ /dev/null @@ -1,93 +0,0 @@ -// Copyright 2025 The Ray Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "ray/raylet_rpc_client/threaded_raylet_client.h" - -#include -#include -#include -#include - -#include "ray/common/asio/asio_util.h" -#include "ray/common/ray_config.h" -#include "ray/util/logging.h" -#include "src/ray/protobuf/node_manager.grpc.pb.h" - -namespace ray { -namespace rpc { - -ThreadedRayletClient::ThreadedRayletClient(const std::string &ip_address, int port) - : RayletClient(), ip_address_(ip_address), port_(port) { - // Connect to the raylet on a singleton io service with a dedicated thread. - // This is to avoid creating multiple threads for multiple clients in python. - static InstrumentedIOContextWithThread io_context("raylet_client_io_service"); - instrumented_io_context &io_service = io_context.GetIoService(); - client_call_manager_ = std::make_unique( - io_service, /*record_stats=*/false, ip_address_); - grpc_client_ = std::make_unique>( - ip_address_, port_, *client_call_manager_); - auto raylet_unavailable_timeout_callback = []() { - RAY_LOG(WARNING) << "Raylet is unavailable for " - << ::RayConfig::instance().raylet_rpc_server_reconnect_timeout_s() - << "s"; - }; - retryable_grpc_client_ = rpc::RetryableGrpcClient::Create( - grpc_client_->Channel(), - client_call_manager_->GetMainService(), - /*max_pending_requests_bytes=*/ - std::numeric_limits::max(), - /*check_channel_status_interval_milliseconds=*/ - ::RayConfig::instance().grpc_client_check_connection_status_interval_milliseconds(), - /*server_unavailable_timeout_seconds=*/ - ::RayConfig::instance().raylet_rpc_server_reconnect_timeout_s(), - /*server_unavailable_timeout_callback=*/ - raylet_unavailable_timeout_callback, - /*server_name=*/ - std::string("Raylet ") + ip_address_); -} - -Status ThreadedRayletClient::GetWorkerPIDs( - std::shared_ptr> worker_pids, int64_t timeout_ms) { - rpc::GetWorkerPIDsRequest request; - auto promise = std::make_shared>(); - std::weak_ptr> weak_promise = promise; - std::weak_ptr> weak_worker_pids = worker_pids; - auto future = promise->get_future(); - auto callback = [weak_promise, weak_worker_pids](const Status &status, - rpc::GetWorkerPIDsReply &&reply) { - auto p = weak_promise.lock(); - auto workers = weak_worker_pids.lock(); - if (p != nullptr && workers != nullptr) { - if (status.ok()) { - *workers = std::vector(reply.pids().begin(), reply.pids().end()); - } - p->set_value(status); - } - }; - INVOKE_RETRYABLE_RPC_CALL(retryable_grpc_client_, - NodeManagerService, - GetWorkerPIDs, - request, - callback, - grpc_client_, - timeout_ms); - if 
(future.wait_for(std::chrono::milliseconds(timeout_ms)) == - std::future_status::timeout) { - return Status::TimedOut("Timed out getting worker PIDs from raylet"); - } - return future.get(); -} - -} // namespace rpc -} // namespace ray From 6a6b7428f2409e05a606b034057d8a6009301e25 Mon Sep 17 00:00:00 2001 From: tianyi-ge Date: Wed, 15 Oct 2025 10:32:23 +0800 Subject: [PATCH 16/23] [core] remove exception-catch-all Signed-off-by: tianyi-ge --- python/ray/dashboard/modules/reporter/reporter_agent.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/python/ray/dashboard/modules/reporter/reporter_agent.py b/python/ray/dashboard/modules/reporter/reporter_agent.py index be7e30483829..399b076eb681 100644 --- a/python/ray/dashboard/modules/reporter/reporter_agent.py +++ b/python/ray/dashboard/modules/reporter/reporter_agent.py @@ -896,9 +896,6 @@ def _get_worker_pids_from_raylet(self) -> List[int]: except TimeoutError as e: logger.debug(f"Failed to get worker pids from raylet: {e}") return [] - except Exception as e: - logger.error(f"Unexpectedly failed to get worker pids from raylet: {e}") - raise def _get_agent_proc(self) -> psutil.Process: # Agent is the current process. From 02656c0880d9a40ba6f100c62adac95c73387e0e Mon Sep 17 00:00:00 2001 From: tianyi-ge Date: Thu, 16 Oct 2025 10:30:24 +0800 Subject: [PATCH 17/23] [core] catch RuntimeError and log with exception Signed-off-by: tianyi-ge --- python/ray/dashboard/modules/reporter/reporter_agent.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/ray/dashboard/modules/reporter/reporter_agent.py b/python/ray/dashboard/modules/reporter/reporter_agent.py index 399b076eb681..76ae046427a5 100644 --- a/python/ray/dashboard/modules/reporter/reporter_agent.py +++ b/python/ray/dashboard/modules/reporter/reporter_agent.py @@ -893,8 +893,8 @@ def _get_worker_pids_from_raylet(self) -> List[int]: try: # Get worker pids from raylet via gRPC. 
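# --- Illustrative aside (not part of this patch series) ---
# This hunk widens the except clause to (TimeoutError, RuntimeError) and swaps
# logger.debug(f"... {e}") for logger.exception(...), which logs at ERROR level
# and automatically appends the traceback of the exception being handled.
# A minimal standalone sketch of that difference (names here are made up):
import logging

logging.basicConfig(level=logging.DEBUG)
sketch_logger = logging.getLogger("raylet_client_sketch")

try:
    raise TimeoutError("raylet did not answer within the deadline")
except (TimeoutError, RuntimeError) as e:
    # Formats the exception into the message but drops the stack trace.
    sketch_logger.debug("Failed to get worker pids from raylet: %s", e)
    # Same message, ERROR level, full traceback attached automatically.
    sketch_logger.exception("Failed to get worker pids from raylet")
# --- end of aside ---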
return self._raylet_client.get_worker_pids() - except TimeoutError as e: - logger.debug(f"Failed to get worker pids from raylet: {e}") + except (TimeoutError, RuntimeError): + logger.exception("Failed to get worker pids from raylet") return [] def _get_agent_proc(self) -> psutil.Process: @@ -916,6 +916,7 @@ def _get_worker_processes(self): proc = psutil.Process(pid) workers[self._generate_worker_key(proc)] = proc except (psutil.NoSuchProcess, psutil.AccessDenied): + logger.error(f"Failed to access worker process {pid}") continue return workers From 3521a915cb0fc4db8f4c8746a4fe598f9691dd64 Mon Sep 17 00:00:00 2001 From: tianyi-ge Date: Thu, 16 Oct 2025 21:53:45 +0800 Subject: [PATCH 18/23] [core] update config raylet_rpc_server_reconnect_timeout_max_s Signed-off-by: tianyi-ge --- src/ray/raylet_rpc_client/raylet_client_with_io_context.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/ray/raylet_rpc_client/raylet_client_with_io_context.cc b/src/ray/raylet_rpc_client/raylet_client_with_io_context.cc index 2bb1f78c234e..a68bcf105726 100644 --- a/src/ray/raylet_rpc_client/raylet_client_with_io_context.cc +++ b/src/ray/raylet_rpc_client/raylet_client_with_io_context.cc @@ -36,9 +36,9 @@ RayletClientWithIoContext::RayletClientWithIoContext(const std::string &ip_addre client_call_manager_ = std::make_unique( io_service, /*record_stats=*/false, ip_address); auto raylet_unavailable_timeout_callback = []() { - RAY_LOG(WARNING) << "Raylet is unavailable for " - << ::RayConfig::instance().raylet_rpc_server_reconnect_timeout_s() - << "s"; + RAY_LOG(WARNING) + << "Raylet is unavailable for " + << ::RayConfig::instance().raylet_rpc_server_reconnect_timeout_max_s() << "s"; }; rpc::Address rpc_address; rpc_address.set_ip_address(ip_address); From 9940d37693abf358477a23fc58198a0e1342ab93 Mon Sep 17 00:00:00 2001 From: tianyi-ge Date: Sat, 18 Oct 2025 00:24:12 +0800 Subject: [PATCH 19/23] [core] change GetWorkerPIDs to async Signed-off-by: tianyi-ge --- python/ray/_private/ray_constants.py | 8 ---- python/ray/_private/worker.py | 2 +- python/ray/dashboard/modules/job/common.py | 8 ++-- python/ray/dashboard/modules/job/utils.py | 4 +- .../modules/reporter/reporter_agent.py | 12 +++-- .../modules/reporter/tests/test_reporter.py | 12 ++++- python/ray/dashboard/optional_utils.py | 3 +- python/ray/includes/common.pxd | 3 +- python/ray/includes/common.pxi | 7 +++ python/ray/includes/raylet_client.pxi | 46 ++++++++++++------- src/ray/common/constants.h | 4 +- src/ray/gcs/gcs_job_manager.h | 3 +- src/ray/protobuf/node_manager.proto | 7 +-- src/ray/raylet_rpc_client/BUILD.bazel | 1 + src/ray/raylet_rpc_client/raylet_client.cc | 31 ++++--------- src/ray/raylet_rpc_client/raylet_client.h | 8 ++-- .../raylet_client_with_io_context.cc | 6 +-- .../raylet_client_with_io_context.h | 7 ++- 18 files changed, 93 insertions(+), 79 deletions(-) diff --git a/python/ray/_private/ray_constants.py b/python/ray/_private/ray_constants.py index 973c5d490042..576b0aeccfd5 100644 --- a/python/ray/_private/ray_constants.py +++ b/python/ray/_private/ray_constants.py @@ -459,14 +459,6 @@ def env_set_by_user(key): # Default max_concurrency option in @ray.remote for threaded actors. DEFAULT_MAX_CONCURRENCY_THREADED = 1 -# Prefix for namespaces which are used internally by ray. -# Jobs within these namespaces should be hidden from users -# and should not be considered user activity. -# Please keep this in sync with the definition kRayInternalNamespacePrefix -# in /src/ray/gcs/gcs_server/gcs_job_manager.h. 
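# --- Illustrative aside (not part of this patch series) ---
# This commit removes the Python-side copy of the "_ray_internal_" prefix and makes
# the C++ constant kRayInternalNamespacePrefix the single definition, so the raylet
# and the dashboard filter "system" drivers with the same rule. A standalone sketch
# of that rule; the constant and helper below are hypothetical stand-ins:
RAY_INTERNAL_NAMESPACE_PREFIX_SKETCH = "_ray_internal_"

def is_user_namespace(ray_namespace: str) -> bool:
    # Jobs whose namespace starts with the internal prefix (for example the
    # dashboard's own driver) are hidden from user-facing PID and activity reports.
    return not ray_namespace.startswith(RAY_INTERNAL_NAMESPACE_PREFIX_SKETCH)

assert is_user_namespace("my_app")
assert not is_user_namespace("_ray_internal_dashboard")
# --- end of aside ---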
-RAY_INTERNAL_NAMESPACE_PREFIX = "_ray_internal_" -RAY_INTERNAL_DASHBOARD_NAMESPACE = f"{RAY_INTERNAL_NAMESPACE_PREFIX}dashboard" - # Ray internal flags. These flags should not be set by users, and we strip them on job # submission. # This should be consistent with src/ray/common/ray_internal_flag_def.h diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index 6b015cdcdc81..d9d210c1710c 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -2625,7 +2625,7 @@ def connect( # We also want to skip adding script directory when running from dashboard. code_paths = [] if not interactive_mode and not ( - namespace and namespace == ray_constants.RAY_INTERNAL_DASHBOARD_NAMESPACE + namespace and namespace == ray._raylet.RAY_INTERNAL_DASHBOARD_NAMESPACE ): script_directory = os.path.dirname(os.path.realpath(sys.argv[0])) # If driver's sys.path doesn't include the script directory diff --git a/python/ray/dashboard/modules/job/common.py b/python/ray/dashboard/modules/job/common.py index 4466740f0ed8..47cff07b6a5a 100644 --- a/python/ray/dashboard/modules/job/common.py +++ b/python/ray/dashboard/modules/job/common.py @@ -14,7 +14,7 @@ get_export_event_logger, ) from ray._private.runtime_env.packaging import parse_uri -from ray._raylet import GcsClient +from ray._raylet import RAY_INTERNAL_NAMESPACE_PREFIX, GcsClient from ray.core.generated.export_event_pb2 import ExportEvent from ray.core.generated.export_submission_job_event_pb2 import ( ExportSubmissionJobEventData, @@ -25,9 +25,7 @@ # they're exposed in the snapshot API. JOB_ID_METADATA_KEY = "job_submission_id" JOB_NAME_METADATA_KEY = "job_name" -JOB_ACTOR_NAME_TEMPLATE = ( - f"{ray_constants.RAY_INTERNAL_NAMESPACE_PREFIX}job_actor_" + "{job_id}" -) +JOB_ACTOR_NAME_TEMPLATE = f"{RAY_INTERNAL_NAMESPACE_PREFIX}job_actor_" + "{job_id}" # In order to get information about SupervisorActors launched by different jobs, # they must be set to the same namespace. SUPERVISOR_ACTOR_RAY_NAMESPACE = "SUPERVISOR_ACTOR_RAY_NAMESPACE" @@ -227,7 +225,7 @@ class JobInfoStorageClient: # Please keep this format in sync with JobDataKey() # in src/ray/gcs/gcs_server/gcs_job_manager.h. 
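# --- Illustrative aside (not part of this patch series) ---
# After this change JOB_DATA_KEY_PREFIX is built from the prefix exported by the
# C++ layer, so the GCS key format stays identical to JobDataKey() in
# gcs_job_manager.h. A standalone sketch of how such a key is composed
# (the submission id below is hypothetical):
prefix_sketch = "_ray_internal_" + "job_info_"

def job_data_key(submission_id: str) -> str:
    # Mirrors JobDataKey(): shared internal prefix + "job_info_" + submission id.
    return f"{prefix_sketch}{submission_id}"

assert job_data_key("raysubmit_123") == "_ray_internal_job_info_raysubmit_123"
# --- end of aside ---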
- JOB_DATA_KEY_PREFIX = f"{ray_constants.RAY_INTERNAL_NAMESPACE_PREFIX}job_info_" + JOB_DATA_KEY_PREFIX = f"{RAY_INTERNAL_NAMESPACE_PREFIX}job_info_" JOB_DATA_KEY = f"{JOB_DATA_KEY_PREFIX}{{job_id}}" def __init__( diff --git a/python/ray/dashboard/modules/job/utils.py b/python/ray/dashboard/modules/job/utils.py index c798d2a8631f..e2eb60876695 100644 --- a/python/ray/dashboard/modules/job/utils.py +++ b/python/ray/dashboard/modules/job/utils.py @@ -8,7 +8,7 @@ from typing import Any, AsyncIterator, Dict, List, Optional, Tuple, Union from ray._private import ray_constants -from ray._raylet import GcsClient +from ray._raylet import RAY_INTERNAL_NAMESPACE_PREFIX, GcsClient from ray.dashboard.modules.job.common import ( JOB_ID_METADATA_KEY, JobInfoStorageClient, @@ -178,7 +178,7 @@ async def get_driver_jobs( submission_job_drivers = {} for job_table_entry in sorted_job_infos: if job_table_entry.config.ray_namespace.startswith( - ray_constants.RAY_INTERNAL_NAMESPACE_PREFIX + RAY_INTERNAL_NAMESPACE_PREFIX ): # Skip jobs in any _ray_internal_ namespace continue diff --git a/python/ray/dashboard/modules/reporter/reporter_agent.py b/python/ray/dashboard/modules/reporter/reporter_agent.py index b8de5c248a71..b680217e99b1 100644 --- a/python/ray/dashboard/modules/reporter/reporter_agent.py +++ b/python/ray/dashboard/modules/reporter/reporter_agent.py @@ -66,6 +66,10 @@ from ray.dashboard.modules.reporter.reporter_models import ( StatsPayload, ) +from ray.exceptions import ( + GetTimeoutError, + RpcError, +) import psutil @@ -892,11 +896,11 @@ def _get_disk_io_stats(): stats.write_count, ) - def _get_worker_pids_from_raylet(self) -> List[int]: + async def _get_worker_pids_from_raylet(self) -> List[int]: try: # Get worker pids from raylet via gRPC. - return self._raylet_client.get_worker_pids() - except (TimeoutError, RuntimeError): + return await self._raylet_client.async_get_worker_pids() + except (GetTimeoutError, RpcError): logger.exception("Failed to get worker pids from raylet") return [] @@ -909,7 +913,7 @@ def _generate_worker_key(self, proc: psutil.Process) -> Tuple[int, float]: return (proc.pid, proc.create_time()) def _get_worker_processes(self): - pids = self._get_worker_pids_from_raylet() + pids = asyncio.run(self._get_worker_pids_from_raylet()) logger.debug(f"Worker PIDs from raylet: {pids}") if not pids: return [] diff --git a/python/ray/dashboard/modules/reporter/tests/test_reporter.py b/python/ray/dashboard/modules/reporter/tests/test_reporter.py index 330e22975e90..ea8942f7947d 100644 --- a/python/ray/dashboard/modules/reporter/tests/test_reporter.py +++ b/python/ray/dashboard/modules/reporter/tests/test_reporter.py @@ -1213,7 +1213,15 @@ def test_get_cluster_metadata(ray_start_with_dashboard): assert resp_data["rayInitCluster"] == meta["ray_init_cluster"] -def test_reporter_raylet_agent(ray_start_with_dashboard): +@pytest.mark.asyncio +@pytest.mark.parametrize( + "ray_start_with_dashboard", + [ + {"num_cpus": 1}, + ], + indirect=True, +) +async def test_reporter_raylet_agent(ray_start_with_dashboard): @ray.remote class MyActor: def ping(self): @@ -1228,7 +1236,7 @@ def ping(self): ray._private.worker.global_worker.node.node_manager_port ) agent = ReporterAgent(dashboard_agent) - pids = agent._get_worker_pids_from_raylet() + pids = await agent._get_worker_pids_from_raylet() assert len(pids) == 2 # check if worker is reported assert "ray::MyActor" in [psutil.Process(pid).cmdline()[0] for pid in pids] diff --git a/python/ray/dashboard/optional_utils.py 
b/python/ray/dashboard/optional_utils.py index 94311565e246..191f64f61c46 100644 --- a/python/ray/dashboard/optional_utils.py +++ b/python/ray/dashboard/optional_utils.py @@ -18,7 +18,8 @@ import ray import ray.dashboard.consts as dashboard_consts import ray.dashboard.utils as dashboard_utils -from ray._private.ray_constants import RAY_INTERNAL_DASHBOARD_NAMESPACE, env_bool +from ray._private.ray_constants import env_bool +from ray._raylet import RAY_INTERNAL_DASHBOARD_NAMESPACE # All third-party dependencies that are not included in the minimal Ray # installation must be included in this file. This allows us to determine if diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 573bcea38599..3ec069ab333b 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -767,7 +767,7 @@ cdef extern from "src/ray/protobuf/autoscaler.pb.h" nogil: cdef extern from "ray/raylet_rpc_client/raylet_client_with_io_context.h" nogil: cdef cppclass CRayletClientWithIoContext "ray::rpc::RayletClientWithIoContext": CRayletClientWithIoContext(const c_string &ip_address, int port) - CRayStatus GetWorkerPIDs(shared_ptr[c_vector[int32_t]] worker_pids, + CRayStatus GetWorkerPIDs(const OptionalItemPyCallback[c_vector[int32_t]] &callback, int64_t timeout_ms) cdef extern from "ray/common/task/task_spec.h" nogil: @@ -803,3 +803,4 @@ cdef extern from "ray/common/constants.h" nogil: cdef const char[] kLabelKeyTpuSliceName cdef const char[] kLabelKeyTpuWorkerId cdef const char[] kLabelKeyTpuPodType + cdef const char[] kRayInternalNamespacePrefix diff --git a/python/ray/includes/common.pxi b/python/ray/includes/common.pxi index 636a4600b0ea..47dc390c2bbe 100644 --- a/python/ray/includes/common.pxi +++ b/python/ray/includes/common.pxi @@ -27,6 +27,7 @@ from ray.includes.common cimport ( kLabelKeyTpuSliceName, kLabelKeyTpuWorkerId, kLabelKeyTpuPodType, + kRayInternalNamespacePrefix, ) from ray.exceptions import ( @@ -159,3 +160,9 @@ RAY_NODE_TPU_TOPOLOGY_KEY = kLabelKeyTpuTopology.decode() RAY_NODE_TPU_SLICE_NAME_KEY = kLabelKeyTpuSliceName.decode() RAY_NODE_TPU_WORKER_ID_KEY = kLabelKeyTpuWorkerId.decode() RAY_NODE_TPU_POD_TYPE_KEY = kLabelKeyTpuPodType.decode() + +RAY_INTERNAL_NAMESPACE_PREFIX = kRayInternalNamespacePrefix.decode() +# Prefix for namespaces which are used internally by ray. +# Jobs within these namespaces should be hidden from users +# and should not be considered user activity. 
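# --- Illustrative aside (not part of this patch series) ---
# With this commit applied, the prefix defined in src/ray/common/constants.h is the
# single source of truth and Python reads it from the compiled extension module,
# exactly as the job/common.py and optional_utils.py hunks in this commit do.
# Usage sketch (requires a Ray build that includes this series):
from ray._raylet import (
    RAY_INTERNAL_DASHBOARD_NAMESPACE,
    RAY_INTERNAL_NAMESPACE_PREFIX,
)

# The dashboard driver namespace is just the shared prefix plus "dashboard".
assert RAY_INTERNAL_DASHBOARD_NAMESPACE == f"{RAY_INTERNAL_NAMESPACE_PREFIX}dashboard"
# --- end of aside ---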
+RAY_INTERNAL_DASHBOARD_NAMESPACE = f"{RAY_INTERNAL_NAMESPACE_PREFIX}dashboard" diff --git a/python/ray/includes/raylet_client.pxi b/python/ray/includes/raylet_client.pxi index c5fa9d1da5e6..11538f82d2c5 100644 --- a/python/ray/includes/raylet_client.pxi +++ b/python/ray/includes/raylet_client.pxi @@ -1,14 +1,28 @@ +from asyncio import Future +import concurrent.futures from libcpp.vector cimport vector as c_vector from libcpp.string cimport string as c_string -from libc.stdint cimport int32_t as c_int32_t +from libc.stdint cimport int32_t +from libcpp.utility cimport move from libcpp.memory cimport unique_ptr, make_unique, shared_ptr from ray.includes.common cimport ( CRayletClientWithIoContext, CRayStatus, CAddress, - ConnectOnSingletonIoContext + OptionalItemPyCallback, ) -from cython.operator import dereference +from ray.includes.optional cimport optional + + +cdef convert_optional_vector_int32( + CRayStatus status, optional[c_vector[int32_t]] vec) with gil: + try: + check_status_timeout_as_rpc_error(status) + assert vec.has_value() + return move(vec.value()), None + except Exception as e: + return None, e + cdef class RayletClient: cdef: @@ -17,24 +31,22 @@ cdef class RayletClient: def __cinit__(self, ip_address: str, port: int): cdef: c_string c_ip_address - c_int32_t c_port + int32_t c_port c_ip_address = ip_address.encode('utf-8') c_port = port self.inner = make_unique[CRayletClientWithIoContext](c_ip_address, c_port) - def get_worker_pids(self, timeout_ms: int = 1000) -> list[int]: + def async_get_worker_pids(self, timeout_ms: int = 1000) -> Future[list[int]]: """Get the PIDs of all workers registered with the raylet.""" cdef: - shared_ptr[c_vector[c_int32_t]] pids - CRayStatus status - pids = make_shared[c_vector[c_int32_t]]() + fut = incremented_fut() + int32_t timeout = timeout_ms assert self.inner.get() is not NULL - status = self.inner.get().GetWorkerPIDs(pids, timeout_ms) - if status.IsTimedOut(): - raise TimeoutError(status.message()) - elif not status.ok(): - raise RuntimeError( - "Failed to get worker PIDs from raylet: " + status.message() - ) - assert pids.get() is not NULL - return [pid for pid in pids.get()[0]] + with nogil: + self.inner.get().GetWorkerPIDs( + OptionalItemPyCallback[c_vector[int32_t]]( + &convert_optional_vector_int32, + assign_and_decrement_fut, + fut), + timeout) + return asyncio.wrap_future(fut) diff --git a/src/ray/common/constants.h b/src/ray/common/constants.h index e4ed3812a9a8..bfd06e677e7e 100644 --- a/src/ray/common/constants.h +++ b/src/ray/common/constants.h @@ -151,5 +151,5 @@ constexpr char kImplicitResourcePrefix[] = "node:__internal_implicit_resource_"; /// PID of GCS process to record metrics. constexpr char kGcsPidKey[] = "gcs_pid"; -// Please keep this in sync with the definition in ray_constants.py. -const std::string kRayInternalNamespacePrefix = "_ray_internal_"; // NOLINT +// Prefix for namespaces which are used internally by ray. +constexpr char kRayInternalNamespacePrefix[] = "_ray_internal_"; // NOLINT diff --git a/src/ray/gcs/gcs_job_manager.h b/src/ray/gcs/gcs_job_manager.h index 4db4429916ab..f62091f52f56 100644 --- a/src/ray/gcs/gcs_job_manager.h +++ b/src/ray/gcs/gcs_job_manager.h @@ -38,7 +38,8 @@ namespace gcs { // Please keep these in sync with the definition in dashboard/modules/job/common.py. 
// NOLINTNEXTLINE -const std::string kJobDataKeyPrefix = kRayInternalNamespacePrefix + "job_info_"; +const std::string kJobDataKeyPrefix = + std::string(kRayInternalNamespacePrefix) + "job_info_"; inline std::string JobDataKey(const std::string &submission_id) { return kJobDataKeyPrefix + submission_id; } diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto index 2f8d38457871..6ee66ca0b1cd 100644 --- a/src/ray/protobuf/node_manager.proto +++ b/src/ray/protobuf/node_manager.proto @@ -520,8 +520,9 @@ service NodeManagerService { // worker clients. The unavailable callback will eventually be retried so if this fails. rpc IsLocalWorkerDead(IsLocalWorkerDeadRequest) returns (IsLocalWorkerDeadReply); // Get the PIDs of all workers currently alive that are managed by the local Raylet. - // This includes connected driver processes. - // Failure: Will retry with the default timeout 1000ms. - // If fails, reply return an empty list. + // This includes connected driver processes but excludes system drivers (with namespace + // prefix "_ray_internal_") + // Failure: Will retry with the default timeout 1000ms. If fails, reply return an empty + // list. rpc GetWorkerPIDs(GetWorkerPIDsRequest) returns (GetWorkerPIDsReply); } diff --git a/src/ray/raylet_rpc_client/BUILD.bazel b/src/ray/raylet_rpc_client/BUILD.bazel index 99bdb9052937..f3208a4a9f27 100644 --- a/src/ray/raylet_rpc_client/BUILD.bazel +++ b/src/ray/raylet_rpc_client/BUILD.bazel @@ -35,6 +35,7 @@ ray_cc_library( deps = [ ":raylet_client_interface", "//src/ray/common:bundle_spec", + "//src/ray/common:gcs_callback_types", "//src/ray/common:ray_config", "//src/ray/protobuf:node_manager_cc_grpc", "//src/ray/rpc:retryable_grpc_client", diff --git a/src/ray/raylet_rpc_client/raylet_client.cc b/src/ray/raylet_rpc_client/raylet_client.cc index 39adc84bc6cd..1dc2e591334f 100644 --- a/src/ray/raylet_rpc_client/raylet_client.cc +++ b/src/ray/raylet_rpc_client/raylet_client.cc @@ -467,36 +467,25 @@ void RayletClient::GetNodeStats( /*method_timeout_ms*/ -1); } -Status RayletClient::GetWorkerPIDs(std::shared_ptr> worker_pids, - int64_t timeout_ms) { +void RayletClient::GetWorkerPIDs( + const gcs::OptionalItemCallback> &callback, int64_t timeout_ms) { rpc::GetWorkerPIDsRequest request; - auto promise = std::make_shared>(); - std::weak_ptr> weak_promise = promise; - std::weak_ptr> weak_worker_pids = worker_pids; - auto future = promise->get_future(); - auto callback = [weak_promise, weak_worker_pids](const Status &status, - rpc::GetWorkerPIDsReply &&reply) { - auto p = weak_promise.lock(); - auto workers = weak_worker_pids.lock(); - if (p != nullptr && workers != nullptr) { - if (status.ok()) { - *workers = std::vector(reply.pids().begin(), reply.pids().end()); - } - p->set_value(status); + auto client_callback = [callback](const Status &status, + rpc::GetWorkerPIDsReply &&reply) { + if (status.ok()) { + std::vector workers(reply.pids().begin(), reply.pids().end()); + callback(status, workers); + } else { + callback(status, std::nullopt); } }; INVOKE_RETRYABLE_RPC_CALL(retryable_grpc_client_, NodeManagerService, GetWorkerPIDs, request, - callback, + client_callback, grpc_client_, timeout_ms); - if (future.wait_for(std::chrono::milliseconds(timeout_ms)) == - std::future_status::timeout) { - return Status::TimedOut("Timed out getting worker PIDs from raylet"); - } - return future.get(); } } // namespace rpc diff --git a/src/ray/raylet_rpc_client/raylet_client.h b/src/ray/raylet_rpc_client/raylet_client.h index 
b1b804d7fb41..e5273c8ce36c 100644 --- a/src/ray/raylet_rpc_client/raylet_client.h +++ b/src/ray/raylet_rpc_client/raylet_client.h @@ -22,6 +22,7 @@ #include #include +#include "ray/common/gcs_callback_types.h" #include "ray/raylet_rpc_client/raylet_client_interface.h" #include "ray/rpc/grpc_client.h" #include "ray/rpc/retryable_grpc_client.h" @@ -164,11 +165,10 @@ class RayletClient : public RayletClientInterface { const rpc::ClientCallback &callback) override; /// Get the worker pids from raylet. - /// \param worker_pids The output worker pids. + /// \param callback The callback to set the worker pids. /// \param timeout_ms The timeout in milliseconds. - /// \return ray::Status - Status GetWorkerPIDs(std::shared_ptr> worker_pids, - int64_t timeout_ms); + void GetWorkerPIDs(const gcs::OptionalItemCallback> &callback, + int64_t timeout_ms); protected: /// gRPC client to the NodeManagerService. diff --git a/src/ray/raylet_rpc_client/raylet_client_with_io_context.cc b/src/ray/raylet_rpc_client/raylet_client_with_io_context.cc index a68bcf105726..6fbb971099f1 100644 --- a/src/ray/raylet_rpc_client/raylet_client_with_io_context.cc +++ b/src/ray/raylet_rpc_client/raylet_client_with_io_context.cc @@ -47,9 +47,9 @@ RayletClientWithIoContext::RayletClientWithIoContext(const std::string &ip_addre rpc_address, *client_call_manager_, std::move(raylet_unavailable_timeout_callback)); } -Status RayletClientWithIoContext::GetWorkerPIDs( - std::shared_ptr> worker_pids, int64_t timeout_ms) { - return raylet_client_->GetWorkerPIDs(worker_pids, timeout_ms); +void RayletClientWithIoContext::GetWorkerPIDs( + const gcs::OptionalItemCallback> &callback, int64_t timeout_ms) { + raylet_client_->GetWorkerPIDs(callback, timeout_ms); } } // namespace rpc diff --git a/src/ray/raylet_rpc_client/raylet_client_with_io_context.h b/src/ray/raylet_rpc_client/raylet_client_with_io_context.h index 2ca74d27b188..8800ea2c7e9b 100644 --- a/src/ray/raylet_rpc_client/raylet_client_with_io_context.h +++ b/src/ray/raylet_rpc_client/raylet_client_with_io_context.h @@ -33,11 +33,10 @@ class RayletClientWithIoContext { RayletClientWithIoContext(const std::string &ip_address, int port); /// Get the worker pids from raylet. - /// \param worker_pids The output worker pids. + /// \param callback The callback to set the worker pids. /// \param timeout_ms The timeout in milliseconds. 
-  /// \return ray::Status
-  Status GetWorkerPIDs(std::shared_ptr<std::vector<uint32_t>> worker_pids,
-                       int64_t timeout_ms);
+  void GetWorkerPIDs(const gcs::OptionalItemCallback<std::vector<uint32_t>> &callback,
+                     int64_t timeout_ms);
 
 private:
   /// client call manager is created inside the raylet client, it should be kept active

From e3c79a25f3e19380a9d63633a39c1af439fdf718 Mon Sep 17 00:00:00 2001
From: tianyi-ge
Date: Sat, 18 Oct 2025 10:52:35 +0800
Subject: [PATCH 20/23] [core] update async logic

Signed-off-by: tianyi-ge
---
 .../modules/reporter/reporter_agent.py        | 23 ++++++++++++-------
 .../modules/reporter/tests/test_reporter.py   | 19 +++++++--------
 2 files changed, 25 insertions(+), 17 deletions(-)

diff --git a/python/ray/dashboard/modules/reporter/reporter_agent.py b/python/ray/dashboard/modules/reporter/reporter_agent.py
index b680217e99b1..086076fac035 100644
--- a/python/ray/dashboard/modules/reporter/reporter_agent.py
+++ b/python/ray/dashboard/modules/reporter/reporter_agent.py
@@ -896,7 +896,7 @@ def _get_disk_io_stats():
             stats.write_count,
         )
 
-    async def _get_worker_pids_from_raylet(self) -> List[int]:
+    async def _async_get_worker_pids_from_raylet(self) -> List[int]:
         try:
             # Get worker pids from raylet via gRPC.
             return await self._raylet_client.async_get_worker_pids()
@@ -912,8 +912,8 @@ def _get_agent_proc(self) -> psutil.Process:
     def _generate_worker_key(self, proc: psutil.Process) -> Tuple[int, float]:
         return (proc.pid, proc.create_time())
 
-    def _get_worker_processes(self):
-        pids = asyncio.run(self._get_worker_pids_from_raylet())
+    async def _async_get_worker_processes(self):
+        pids = await self._async_get_worker_pids_from_raylet()
         logger.debug(f"Worker PIDs from raylet: {pids}")
         if not pids:
             return []
@@ -927,8 +927,8 @@ def _get_worker_processes(self):
                 continue
         return workers
 
-    def _get_workers(self, gpus: Optional[List[GpuUtilizationInfo]] = None):
-        workers = self._get_worker_processes()
+    async def _async_get_workers(self, gpus: Optional[List[GpuUtilizationInfo]] = None):
+        workers = await self._async_get_worker_processes()
         if not workers:
             return []
         else:
@@ -1067,7 +1067,7 @@ def _get_shm_usage(self):
             return None
         return mem.shared
 
-    def _collect_stats(self):
+    async def _async_collect_stats(self):
         now = dashboard_utils.to_posix_time(datetime.datetime.utcnow())
         network_stats = self._get_network_stats()
         self._network_stats_hist.append((now, network_stats))
@@ -1088,7 +1088,7 @@ def _collect_stats(self):
             "mem": self._get_mem_usage(),
             # Unit is in bytes. None if
             "shm": self._get_shm_usage(),
-            "workers": self._get_workers(gpus),
+            "workers": await self._async_get_workers(gpus),
             "raylet": raylet,
             "agent": self._get_agent(),
             "bootTime": self._get_boot_time(),
@@ -1751,7 +1751,14 @@ async def _run_loop(self):
 
     def _compose_stats_payload(
         self, cluster_autoscaling_stats_json: Optional[bytes]
     ) -> str:
-        stats = self._collect_stats()
+        return asyncio.run(
+            self._async_compose_stats_payload(cluster_autoscaling_stats_json)
+        )
+
+    async def _async_compose_stats_payload(
+        self, cluster_autoscaling_stats_json: Optional[bytes]
+    ) -> str:
+        stats = await self._async_collect_stats()
         # Report stats only when metrics collection is enabled.
         if not self._metrics_collection_disabled:
diff --git a/python/ray/dashboard/modules/reporter/tests/test_reporter.py b/python/ray/dashboard/modules/reporter/tests/test_reporter.py
index ea8942f7947d..e88d7d83548a 100644
--- a/python/ray/dashboard/modules/reporter/tests/test_reporter.py
+++ b/python/ray/dashboard/modules/reporter/tests/test_reporter.py
@@ -899,7 +899,8 @@ def test_enable_k8s_disk_usage(enable_k8s_disk_usage: bool):
     assert root_usage.free == 1
 
 
-def test_reporter_worker_cpu_percent():
+@pytest.mark.asyncio
+async def test_reporter_worker_cpu_percent():
     raylet_dummy_proc_f = psutil.Process
     agent_mock = Process(target=random_work)
     children = [Process(target=random_work) for _ in range(2)]
@@ -916,11 +917,11 @@ def _get_agent_proc(self):
         def _generate_worker_key(self, proc):
             return (proc.pid, proc.create_time())
 
-        def _get_worker_pids_from_raylet(self):
+        async def _async_get_worker_pids_from_raylet(self):
             return [p.pid for p in children]
 
-        def _get_worker_processes(self):
-            return ReporterAgent._get_worker_processes(self)
+        async def _async_get_worker_processes(self):
+            return await ReporterAgent._async_get_worker_processes(self)
 
     obj = ReporterAgentDummy()
 
@@ -929,12 +930,12 @@ def _get_worker_processes(self):
         for child_proc in children:
             child_proc.start()
         children_pids = {p.pid for p in children}
-        workers = ReporterAgent._get_workers(obj)
+        workers = await ReporterAgent._async_get_workers(obj)
         # In the first run, the percent should be 0.
         assert all([worker["cpu_percent"] == 0.0 for worker in workers])
         for _ in range(10):
            time.sleep(0.1)
-            workers = ReporterAgent._get_workers(obj)
+            workers = await ReporterAgent._async_get_workers(obj)
            workers_pids = {w["pid"] for w in workers}

            # Make sure all children are registered.
@@ -950,14 +951,14 @@ def _get_worker_processes(self):
         print("killed ", children[0].pid)
         children[0].kill()
         wait_for_condition(lambda: not children[0].is_alive())
-        workers = ReporterAgent._get_workers(obj)
+        workers = await ReporterAgent._async_get_workers(obj)
         workers_pids = {w["pid"] for w in workers}
         assert children[0].pid not in workers_pids
         assert children[1].pid in workers_pids
 
         children[1].kill()
         wait_for_condition(lambda: not children[1].is_alive())
-        workers = ReporterAgent._get_workers(obj)
+        workers = await ReporterAgent._async_get_workers(obj)
         workers_pids = {w["pid"] for w in workers}
         assert children[0].pid not in workers_pids
         assert children[1].pid not in workers_pids
@@ -1236,7 +1237,7 @@ def ping(self):
         ray._private.worker.global_worker.node.node_manager_port
     )
     agent = ReporterAgent(dashboard_agent)
-    pids = await agent._get_worker_pids_from_raylet()
+    pids = await agent._async_get_worker_pids_from_raylet()
     assert len(pids) == 2
     # check if worker is reported
     assert "ray::MyActor" in [psutil.Process(pid).cmdline()[0] for pid in pids]

From 5edc6f08574d318b297525749002cac14daedfd5 Mon Sep 17 00:00:00 2001
From: tianyi-ge
Date: Tue, 21 Oct 2025 00:17:13 +0800
Subject: [PATCH 21/23] [core] inject mocked raylet client and add e2e test

Signed-off-by: tianyi-ge
---
 .../dashboard/modules/node/tests/test_node.py | 36 ++++++++++++++++++
 .../modules/reporter/reporter_agent.py        | 12 ++++--
 .../modules/reporter/tests/test_reporter.py   | 38 +++++++++----------
 3 files changed, 63 insertions(+), 23 deletions(-)

diff --git a/python/ray/dashboard/modules/node/tests/test_node.py b/python/ray/dashboard/modules/node/tests/test_node.py
index df4f980f4125..6cf9c10d01e3 100644
--- a/python/ray/dashboard/modules/node/tests/test_node.py
+++ b/python/ray/dashboard/modules/node/tests/test_node.py
@@ -283,5 +283,41 @@ def _check_workers():
     wait_for_condition(_check_workers, timeout=10)
 
+def test_worker_pids_reported(enable_test_module, ray_start_with_dashboard):
+    assert wait_until_server_available(ray_start_with_dashboard["webui_url"]) is True
+    webui_url = ray_start_with_dashboard["webui_url"]
+    webui_url = format_web_url(webui_url)
+    node_id = ray_start_with_dashboard["node_id"]
+
+    @ray.remote(runtime_env={"uv": {"packages": ["requests==2.3.0"]}})
+    class UvActor:
+        def get_pid(self):
+            return os.getpid()
+
+    uv_actor = UvActor.remote()
+    uv_actor_pid = ray.get(uv_actor.get_pid.remote())
+    driver_pid = os.getpid()
+
+    def _check_worker_pids():
+        try:
+            response = requests.get(webui_url + f"/nodes/{node_id}")
+            response.raise_for_status()
+            dump_info = response.json()
+            assert dump_info["result"] is True
+            detail = dump_info["data"]["detail"]
+            pids = [worker["pid"] for worker in detail["workers"]]
+            if len(pids) < 2:  # might include idle worker
+                time.sleep(1)
+                return False
+            assert uv_actor_pid in pids
+            assert driver_pid in pids
+            return True
+        except Exception as ex:
+            logger.info(ex)
+            return False
+
+    wait_for_condition(_check_worker_pids, timeout=20)
+
+
 if __name__ == "__main__":
     sys.exit(pytest.main(["-v", __file__]))
diff --git a/python/ray/dashboard/modules/reporter/reporter_agent.py b/python/ray/dashboard/modules/reporter/reporter_agent.py
index 086076fac035..937875c3a2a0 100644
--- a/python/ray/dashboard/modules/reporter/reporter_agent.py
+++ b/python/ray/dashboard/modules/reporter/reporter_agent.py
@@ -399,9 +399,10 @@ class ReporterAgent(
 
     Attributes:
         dashboard_agent: The DashboardAgent object contains global config
+        raylet_client: The RayletClient object used to access the raylet server
     """
 
-    def __init__(self, dashboard_agent):
+    def __init__(self, dashboard_agent, raylet_client=None):
         """Initialize the reporter object."""
         super().__init__(dashboard_agent)
 
@@ -490,9 +491,12 @@ def __init__(self, dashboard_agent):
         # Create GPU metric provider instance
         self._gpu_metric_provider = GpuMetricProvider()
 
-        self._raylet_client = RayletClient(
-            ip_address=self._ip, port=self._dashboard_agent.node_manager_port
-        )
+        if raylet_client:
+            self._raylet_client = raylet_client
+        else:
+            self._raylet_client = RayletClient(
+                ip_address=self._ip, port=self._dashboard_agent.node_manager_port
+            )
 
     async def GetTraceback(self, request, context):
         pid = request.pid
diff --git a/python/ray/dashboard/modules/reporter/tests/test_reporter.py b/python/ray/dashboard/modules/reporter/tests/test_reporter.py
index e88d7d83548a..f281653d7ab6 100644
--- a/python/ray/dashboard/modules/reporter/tests/test_reporter.py
+++ b/python/ray/dashboard/modules/reporter/tests/test_reporter.py
@@ -312,11 +312,11 @@ def test_worker_stats():
 
     wait_for_condition(test_worker_stats, retry_interval_ms=1000)
 
 
-@patch("ray.dashboard.modules.reporter.reporter_agent.RayletClient")
-def test_report_stats(mock_raylet_client):
+def test_report_stats():
     dashboard_agent = MagicMock()
     dashboard_agent.gcs_address = build_address("127.0.0.1", 6379)
-    agent = ReporterAgent(dashboard_agent)
+    raylet_client = MagicMock()
+    agent = ReporterAgent(dashboard_agent, raylet_client)
     # Assume it is a head node.
     agent._is_head_node = True
@@ -388,11 +388,11 @@ def test_report_stats(mock_raylet_client):
     assert isinstance(stats_payload, str)
 
 
-@patch("ray.dashboard.modules.reporter.reporter_agent.RayletClient")
-def test_report_stats_gpu(mock_raylet_client):
+def test_report_stats_gpu():
     dashboard_agent = MagicMock()
     dashboard_agent.gcs_address = build_address("127.0.0.1", 6379)
-    agent = ReporterAgent(dashboard_agent)
+    raylet_client = MagicMock()
+    agent = ReporterAgent(dashboard_agent, raylet_client)
     # Assume it is a head node.
     agent._is_head_node = True
     # GPUstats query output example.
@@ -499,11 +499,11 @@ def test_report_stats_gpu(mock_raylet_client):
     assert isinstance(stats_payload, str)
 
 
-@patch("ray.dashboard.modules.reporter.reporter_agent.RayletClient")
-def test_get_tpu_usage(mock_raylet_client):
+def test_get_tpu_usage():
     dashboard_agent = MagicMock()
     dashboard_agent.gcs_address = build_address("127.0.0.1", 6379)
-    agent = ReporterAgent(dashboard_agent)
+    raylet_client = MagicMock()
+    agent = ReporterAgent(dashboard_agent, raylet_client)
 
     fake_metrics_content = """
 duty_cycle{accelerator_id="1234-0",container="ray-head",make="cloud-tpu",model="tpu-v6e-slice",namespace="default",pod="test",tpu_topology="2x2"} 20.0
@@ -557,11 +557,11 @@ def test_get_tpu_usage(mock_raylet_client):
     assert tpu_utilizations == expected_utilizations
 
 
-@patch("ray.dashboard.modules.reporter.reporter_agent.RayletClient")
-def test_report_stats_tpu(mock_raylet_client):
+def test_report_stats_tpu():
     dashboard_agent = MagicMock()
     dashboard_agent.gcs_address = build_address("127.0.0.1", 6379)
-    agent = ReporterAgent(dashboard_agent)
+    raylet_client = MagicMock()
+    agent = ReporterAgent(dashboard_agent, raylet_client)
 
     stats = copy.deepcopy(STATS_TEMPLATE)
 
@@ -637,11 +637,11 @@ def test_report_stats_tpu(mock_raylet_client):
     assert isinstance(stats_payload, str)
 
 
-@patch("ray.dashboard.modules.reporter.reporter_agent.RayletClient")
-def test_report_per_component_stats(mock_raylet_client):
+def test_report_per_component_stats():
     dashboard_agent = MagicMock()
     dashboard_agent.gcs_address = build_address("127.0.0.1", 6379)
-    agent = ReporterAgent(dashboard_agent)
+    raylet_client = MagicMock()
+    agent = ReporterAgent(dashboard_agent, raylet_client)
     # Assume it is a head node.
     agent._is_head_node = True
@@ -1225,11 +1225,11 @@ def test_get_cluster_metadata(ray_start_with_dashboard):
 async def test_reporter_raylet_agent(ray_start_with_dashboard):
     @ray.remote
     class MyActor:
-        def ping(self):
-            return "pong"
+        def get_pid(self):
+            return os.getpid()
 
     a = MyActor.remote()
-    assert ray.get(a.ping.remote()) == "pong"
+    worker_pid = ray.get(a.get_pid.remote())
     dashboard_agent = MagicMock()
     dashboard_agent.gcs_address = build_address("127.0.0.1", 6379)
     dashboard_agent.ip = "127.0.0.1"
@@ -1240,7 +1240,7 @@ def ping(self):
     pids = await agent._async_get_worker_pids_from_raylet()
     assert len(pids) == 2
     # check if worker is reported
-    assert "ray::MyActor" in [psutil.Process(pid).cmdline()[0] for pid in pids]
+    assert worker_pid in pids
     # check if driver is reported
     assert os.getpid() in pids

From e281cf4efa7088ccced86d8cb05fc9507f42238f Mon Sep 17 00:00:00 2001
From: tianyi-ge
Date: Tue, 21 Oct 2025 09:00:33 +0800
Subject: [PATCH 22/23] [core] prettify e2e reporter test

Signed-off-by: tianyi-ge
---
 python/ray/dashboard/modules/node/tests/test_node.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/python/ray/dashboard/modules/node/tests/test_node.py b/python/ray/dashboard/modules/node/tests/test_node.py
index 6cf9c10d01e3..8ffc04c1ded2 100644
--- a/python/ray/dashboard/modules/node/tests/test_node.py
+++ b/python/ray/dashboard/modules/node/tests/test_node.py
@@ -306,9 +306,7 @@ def _check_worker_pids():
             assert dump_info["result"] is True
             detail = dump_info["data"]["detail"]
             pids = [worker["pid"] for worker in detail["workers"]]
-            if len(pids) < 2:  # might include idle worker
-                time.sleep(1)
-                return False
+            assert len(pids) >= 2  # might include idle worker
             assert uv_actor_pid in pids
             assert driver_pid in pids
             return True

From 3b97fb63a1cdc71196d82eec07ff553f79323249 Mon Sep 17 00:00:00 2001
From: tianyi-ge
Date: Tue, 21 Oct 2025 09:49:40 +0800
Subject: [PATCH 23/23] [core] rename sync _compose_stats_payload to _run_in_executor

Signed-off-by: tianyi-ge
---
 python/ray/dashboard/modules/reporter/reporter_agent.py | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/python/ray/dashboard/modules/reporter/reporter_agent.py b/python/ray/dashboard/modules/reporter/reporter_agent.py
index 937875c3a2a0..fff076e4cf51 100644
--- a/python/ray/dashboard/modules/reporter/reporter_agent.py
+++ b/python/ray/dashboard/modules/reporter/reporter_agent.py
@@ -1739,7 +1739,7 @@ async def _run_loop(self):
                 # executor (TPE) to avoid blocking the Agent's event-loop
                 json_payload = await loop.run_in_executor(
                     self._executor,
-                    self._compose_stats_payload,
+                    self._run_in_executor,
                     autoscaler_status_json_bytes,
                 )
 
@@ -1752,9 +1752,7 @@ async def _run_loop(self):
 
             await asyncio.sleep(reporter_consts.REPORTER_UPDATE_INTERVAL_MS / 1000)
 
-    def _compose_stats_payload(
-        self, cluster_autoscaling_stats_json: Optional[bytes]
-    ) -> str:
+    def _run_in_executor(self, cluster_autoscaling_stats_json: Optional[bytes]) -> str:
        return asyncio.run(
            self._async_compose_stats_payload(cluster_autoscaling_stats_json)
        )
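
Note on the final async structure (illustrative, not part of the patches above): by the end of the series, ReporterAgent collects stats through an async pipeline (_async_collect_stats -> _async_compose_stats_payload) that is driven by asyncio.run() inside the synchronous wrapper _run_in_executor, which _run_loop dispatches to a thread-pool executor so the agent's own event loop is never blocked. The standalone sketch below reproduces that pattern in isolation; the names collect_stats, compose_payload, and run_loop are hypothetical stand-ins, not Ray APIs.

# Illustrative sketch only: hypothetical stand-ins for the ReporterAgent
# methods described above.
import asyncio
import json
from concurrent.futures import ThreadPoolExecutor


async def collect_stats() -> dict:
    # Stand-in for _async_collect_stats(): awaits RPCs such as GetWorkerPIDs.
    await asyncio.sleep(0.01)
    return {"workers": [], "bootTime": 0}


def compose_payload() -> str:
    # Stand-in for _run_in_executor(): drives the async pipeline on a fresh
    # event loop owned by the executor thread.
    return json.dumps(asyncio.run(collect_stats()))


async def run_loop() -> None:
    # Stand-in for _run_loop(): offloads the blocking asyncio.run() call so
    # the main event loop keeps serving the agent's RPC handlers.
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=1) as executor:
        payload = await loop.run_in_executor(executor, compose_payload)
        print(payload)


if __name__ == "__main__":
    asyncio.run(run_loop())

The extra asyncio.run() gives the executor thread its own short-lived event loop, so an awaited gRPC call to the raylet can block that thread without stalling the loop that serves the agent's handlers.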