diff --git a/BUILD.bazel b/BUILD.bazel index f7133c52ed4b..e4eae1399ffd 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -242,6 +242,7 @@ pyx_library( "//src/ray/gcs_rpc_client:global_state_accessor_lib", "//src/ray/protobuf:serialization_cc_proto", "//src/ray/pubsub:python_gcs_subscriber", + "//src/ray/raylet_rpc_client:raylet_client_with_io_context_lib", "//src/ray/thirdparty/setproctitle", "//src/ray/util:memory", "//src/ray/util:raii", diff --git a/python/ray/_private/ray_constants.py b/python/ray/_private/ray_constants.py index 973c5d490042..576b0aeccfd5 100644 --- a/python/ray/_private/ray_constants.py +++ b/python/ray/_private/ray_constants.py @@ -459,14 +459,6 @@ def env_set_by_user(key): # Default max_concurrency option in @ray.remote for threaded actors. DEFAULT_MAX_CONCURRENCY_THREADED = 1 -# Prefix for namespaces which are used internally by ray. -# Jobs within these namespaces should be hidden from users -# and should not be considered user activity. -# Please keep this in sync with the definition kRayInternalNamespacePrefix -# in /src/ray/gcs/gcs_server/gcs_job_manager.h. -RAY_INTERNAL_NAMESPACE_PREFIX = "_ray_internal_" -RAY_INTERNAL_DASHBOARD_NAMESPACE = f"{RAY_INTERNAL_NAMESPACE_PREFIX}dashboard" - # Ray internal flags. These flags should not be set by users, and we strip them on job # submission. # This should be consistent with src/ray/common/ray_internal_flag_def.h diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index 6b015cdcdc81..d9d210c1710c 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -2625,7 +2625,7 @@ def connect( # We also want to skip adding script directory when running from dashboard. code_paths = [] if not interactive_mode and not ( - namespace and namespace == ray_constants.RAY_INTERNAL_DASHBOARD_NAMESPACE + namespace and namespace == ray._raylet.RAY_INTERNAL_DASHBOARD_NAMESPACE ): script_directory = os.path.dirname(os.path.realpath(sys.argv[0])) # If driver's sys.path doesn't include the script directory diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 2ef7be64adeb..06f77fa0ec31 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -199,6 +199,7 @@ include "includes/libcoreworker.pxi" include "includes/global_state_accessor.pxi" include "includes/metric.pxi" include "includes/setproctitle.pxi" +include "includes/raylet_client.pxi" include "includes/gcs_subscriber.pxi" import ray diff --git a/python/ray/dashboard/modules/job/common.py b/python/ray/dashboard/modules/job/common.py index 4466740f0ed8..47cff07b6a5a 100644 --- a/python/ray/dashboard/modules/job/common.py +++ b/python/ray/dashboard/modules/job/common.py @@ -14,7 +14,7 @@ get_export_event_logger, ) from ray._private.runtime_env.packaging import parse_uri -from ray._raylet import GcsClient +from ray._raylet import RAY_INTERNAL_NAMESPACE_PREFIX, GcsClient from ray.core.generated.export_event_pb2 import ExportEvent from ray.core.generated.export_submission_job_event_pb2 import ( ExportSubmissionJobEventData, @@ -25,9 +25,7 @@ # they're exposed in the snapshot API. JOB_ID_METADATA_KEY = "job_submission_id" JOB_NAME_METADATA_KEY = "job_name" -JOB_ACTOR_NAME_TEMPLATE = ( - f"{ray_constants.RAY_INTERNAL_NAMESPACE_PREFIX}job_actor_" + "{job_id}" -) +JOB_ACTOR_NAME_TEMPLATE = f"{RAY_INTERNAL_NAMESPACE_PREFIX}job_actor_" + "{job_id}" # In order to get information about SupervisorActors launched by different jobs, # they must be set to the same namespace. 
SUPERVISOR_ACTOR_RAY_NAMESPACE = "SUPERVISOR_ACTOR_RAY_NAMESPACE" @@ -227,7 +225,7 @@ class JobInfoStorageClient: # Please keep this format in sync with JobDataKey() # in src/ray/gcs/gcs_server/gcs_job_manager.h. - JOB_DATA_KEY_PREFIX = f"{ray_constants.RAY_INTERNAL_NAMESPACE_PREFIX}job_info_" + JOB_DATA_KEY_PREFIX = f"{RAY_INTERNAL_NAMESPACE_PREFIX}job_info_" JOB_DATA_KEY = f"{JOB_DATA_KEY_PREFIX}{{job_id}}" def __init__( diff --git a/python/ray/dashboard/modules/job/utils.py b/python/ray/dashboard/modules/job/utils.py index c798d2a8631f..e2eb60876695 100644 --- a/python/ray/dashboard/modules/job/utils.py +++ b/python/ray/dashboard/modules/job/utils.py @@ -8,7 +8,7 @@ from typing import Any, AsyncIterator, Dict, List, Optional, Tuple, Union from ray._private import ray_constants -from ray._raylet import GcsClient +from ray._raylet import RAY_INTERNAL_NAMESPACE_PREFIX, GcsClient from ray.dashboard.modules.job.common import ( JOB_ID_METADATA_KEY, JobInfoStorageClient, @@ -178,7 +178,7 @@ async def get_driver_jobs( submission_job_drivers = {} for job_table_entry in sorted_job_infos: if job_table_entry.config.ray_namespace.startswith( - ray_constants.RAY_INTERNAL_NAMESPACE_PREFIX + RAY_INTERNAL_NAMESPACE_PREFIX ): # Skip jobs in any _ray_internal_ namespace continue diff --git a/python/ray/dashboard/modules/node/tests/test_node.py b/python/ray/dashboard/modules/node/tests/test_node.py index df4f980f4125..8ffc04c1ded2 100644 --- a/python/ray/dashboard/modules/node/tests/test_node.py +++ b/python/ray/dashboard/modules/node/tests/test_node.py @@ -283,5 +283,39 @@ def _check_workers(): wait_for_condition(_check_workers, timeout=10) +def test_worker_pids_reported(enable_test_module, ray_start_with_dashboard): + assert wait_until_server_available(ray_start_with_dashboard["webui_url"]) is True + webui_url = ray_start_with_dashboard["webui_url"] + webui_url = format_web_url(webui_url) + node_id = ray_start_with_dashboard["node_id"] + + @ray.remote(runtime_env={"uv": {"packages": ["requests==2.3.0"]}}) + class UvActor: + def get_pid(self): + return os.getpid() + + uv_actor = UvActor.remote() + uv_actor_pid = ray.get(uv_actor.get_pid.remote()) + driver_pid = os.getpid() + + def _check_worker_pids(): + try: + response = requests.get(webui_url + f"/nodes/{node_id}") + response.raise_for_status() + dump_info = response.json() + assert dump_info["result"] is True + detail = dump_info["data"]["detail"] + pids = [worker["pid"] for worker in detail["workers"]] + assert len(pids) >= 2 # might include idle worker + assert uv_actor_pid in pids + assert driver_pid in pids + return True + except Exception as ex: + logger.info(ex) + return False + + wait_for_condition(_check_worker_pids, timeout=20) + + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/dashboard/modules/reporter/reporter_agent.py b/python/ray/dashboard/modules/reporter/reporter_agent.py index 6bee5eb12efa..fff076e4cf51 100644 --- a/python/ray/dashboard/modules/reporter/reporter_agent.py +++ b/python/ray/dashboard/modules/reporter/reporter_agent.py @@ -41,7 +41,7 @@ OpenTelemetryMetricRecorder, ) from ray._private.utils import get_system_memory -from ray._raylet import GCS_PID_KEY, WorkerID +from ray._raylet import GCS_PID_KEY, RayletClient, WorkerID from ray.core.generated import reporter_pb2, reporter_pb2_grpc from ray.dashboard import k8s_utils from ray.dashboard.consts import ( @@ -66,6 +66,10 @@ from ray.dashboard.modules.reporter.reporter_models import ( StatsPayload, ) +from ray.exceptions import ( + 
GetTimeoutError, + RpcError, +) import psutil @@ -395,9 +399,10 @@ class ReporterAgent( Attributes: dashboard_agent: The DashboardAgent object contains global config + raylet_client: The RayletClient object to access raylet server """ - def __init__(self, dashboard_agent): + def __init__(self, dashboard_agent, raylet_client=None): """Initialize the reporter object.""" super().__init__(dashboard_agent) @@ -486,6 +491,13 @@ def __init__(self, dashboard_agent): # Create GPU metric provider instance self._gpu_metric_provider = GpuMetricProvider() + if raylet_client: + self._raylet_client = raylet_client + else: + self._raylet_client = RayletClient( + ip_address=self._ip, port=self._dashboard_agent.node_manager_port + ) + async def GetTraceback(self, request, context): pid = request.pid native = request.native @@ -888,6 +900,14 @@ def _get_disk_io_stats(): stats.write_count, ) + async def _async_get_worker_pids_from_raylet(self) -> List[int]: + try: + # Get worker pids from raylet via gRPC. + return await self._raylet_client.async_get_worker_pids() + except (GetTimeoutError, RpcError): + logger.exception("Failed to get worker pids from raylet") + return [] + def _get_agent_proc(self) -> psutil.Process: # Agent is the current process. # This method is not necessary, but we have it for mock testing. @@ -896,27 +916,23 @@ def _get_agent_proc(self) -> psutil.Process: def _generate_worker_key(self, proc: psutil.Process) -> Tuple[int, float]: return (proc.pid, proc.create_time()) - def _get_worker_processes(self): - raylet_proc = self._get_raylet_proc() - if raylet_proc is None: + async def _async_get_worker_processes(self): + pids = await self._async_get_worker_pids_from_raylet() + logger.debug(f"Worker PIDs from raylet: {pids}") + if not pids: return [] - else: - workers = {} - if sys.platform == "win32": - # windows, get the child process not the runner - for child in raylet_proc.children(): - if child.children(): - child = child.children()[0] - workers[self._generate_worker_key(child)] = child - else: - workers = { - self._generate_worker_key(proc): proc - for proc in raylet_proc.children() - } - return workers - - def _get_workers(self, gpus: Optional[List[GpuUtilizationInfo]] = None): - workers = self._get_worker_processes() + workers = {} + for pid in pids: + try: + proc = psutil.Process(pid) + workers[self._generate_worker_key(proc)] = proc + except (psutil.NoSuchProcess, psutil.AccessDenied): + logger.error(f"Failed to access worker process {pid}") + continue + return workers + + async def _async_get_workers(self, gpus: Optional[List[GpuUtilizationInfo]] = None): + workers = await self._async_get_worker_processes() if not workers: return [] else: @@ -936,9 +952,6 @@ def _get_workers(self, gpus: Optional[List[GpuUtilizationInfo]] = None): for k in keys_to_pop: self._workers.pop(k) - # Remove the current process (reporter agent), which is also a child of - # the Raylet. - self._workers.pop(self._generate_worker_key(self._get_agent_proc())) # Build process ID -> GPU info mapping for faster lookups gpu_pid_mapping = defaultdict(list) if gpus is not None: @@ -1058,7 +1071,7 @@ def _get_shm_usage(self): return None return mem.shared - def _collect_stats(self): + async def _async_collect_stats(self): now = dashboard_utils.to_posix_time(datetime.datetime.utcnow()) network_stats = self._get_network_stats() self._network_stats_hist.append((now, network_stats)) @@ -1079,7 +1092,7 @@ def _collect_stats(self): "mem": self._get_mem_usage(), # Unit is in bytes. 
None if "shm": self._get_shm_usage(), - "workers": self._get_workers(gpus), + "workers": await self._async_get_workers(gpus), "raylet": raylet, "agent": self._get_agent(), "bootTime": self._get_boot_time(), @@ -1726,7 +1739,7 @@ async def _run_loop(self): # executor (TPE) to avoid blocking the Agent's event-loop json_payload = await loop.run_in_executor( self._executor, - self._compose_stats_payload, + self._run_in_executor, autoscaler_status_json_bytes, ) @@ -1739,10 +1752,15 @@ async def _run_loop(self): await asyncio.sleep(reporter_consts.REPORTER_UPDATE_INTERVAL_MS / 1000) - def _compose_stats_payload( + def _run_in_executor(self, cluster_autoscaling_stats_json: Optional[bytes]) -> str: + return asyncio.run( + self._async_compose_stats_payload(cluster_autoscaling_stats_json) + ) + + async def _async_compose_stats_payload( self, cluster_autoscaling_stats_json: Optional[bytes] ) -> str: - stats = self._collect_stats() + stats = await self._async_collect_stats() # Report stats only when metrics collection is enabled. if not self._metrics_collection_disabled: diff --git a/python/ray/dashboard/modules/reporter/tests/test_reporter.py b/python/ray/dashboard/modules/reporter/tests/test_reporter.py index 8a7704358ded..f281653d7ab6 100644 --- a/python/ray/dashboard/modules/reporter/tests/test_reporter.py +++ b/python/ray/dashboard/modules/reporter/tests/test_reporter.py @@ -315,7 +315,8 @@ def test_worker_stats(): def test_report_stats(): dashboard_agent = MagicMock() dashboard_agent.gcs_address = build_address("127.0.0.1", 6379) - agent = ReporterAgent(dashboard_agent) + raylet_client = MagicMock() + agent = ReporterAgent(dashboard_agent, raylet_client) # Assume it is a head node. agent._is_head_node = True @@ -390,7 +391,8 @@ def test_report_stats(): def test_report_stats_gpu(): dashboard_agent = MagicMock() dashboard_agent.gcs_address = build_address("127.0.0.1", 6379) - agent = ReporterAgent(dashboard_agent) + raylet_client = MagicMock() + agent = ReporterAgent(dashboard_agent, raylet_client) # Assume it is a head node. agent._is_head_node = True # GPUstats query output example. @@ -500,7 +502,8 @@ def test_report_stats_gpu(): def test_get_tpu_usage(): dashboard_agent = MagicMock() dashboard_agent.gcs_address = build_address("127.0.0.1", 6379) - agent = ReporterAgent(dashboard_agent) + raylet_client = MagicMock() + agent = ReporterAgent(dashboard_agent, raylet_client) fake_metrics_content = """ duty_cycle{accelerator_id="1234-0",container="ray-head",make="cloud-tpu",model="tpu-v6e-slice",namespace="default",pod="test",tpu_topology="2x2"} 20.0 @@ -557,7 +560,8 @@ def test_get_tpu_usage(): def test_report_stats_tpu(): dashboard_agent = MagicMock() dashboard_agent.gcs_address = build_address("127.0.0.1", 6379) - agent = ReporterAgent(dashboard_agent) + raylet_client = MagicMock() + agent = ReporterAgent(dashboard_agent, raylet_client) stats = copy.deepcopy(STATS_TEMPLATE) @@ -636,7 +640,8 @@ def test_report_stats_tpu(): def test_report_per_component_stats(): dashboard_agent = MagicMock() dashboard_agent.gcs_address = build_address("127.0.0.1", 6379) - agent = ReporterAgent(dashboard_agent) + raylet_client = MagicMock() + agent = ReporterAgent(dashboard_agent, raylet_client) # Assume it is a head node. 
agent._is_head_node = True @@ -894,7 +899,8 @@ def test_enable_k8s_disk_usage(enable_k8s_disk_usage: bool): assert root_usage.free == 1 -def test_reporter_worker_cpu_percent(): +@pytest.mark.asyncio +async def test_reporter_worker_cpu_percent(): raylet_dummy_proc_f = psutil.Process agent_mock = Process(target=random_work) children = [Process(target=random_work) for _ in range(2)] @@ -911,8 +917,11 @@ def _get_agent_proc(self): def _generate_worker_key(self, proc): return (proc.pid, proc.create_time()) - def _get_worker_processes(self): - return ReporterAgent._get_worker_processes(self) + async def _async_get_worker_pids_from_raylet(self): + return [p.pid for p in children] + + async def _async_get_worker_processes(self): + return await ReporterAgent._async_get_worker_processes(self) obj = ReporterAgentDummy() @@ -921,12 +930,12 @@ def _get_worker_processes(self): for child_proc in children: child_proc.start() children_pids = {p.pid for p in children} - workers = ReporterAgent._get_workers(obj) + workers = await ReporterAgent._async_get_workers(obj) # In the first run, the percent should be 0. assert all([worker["cpu_percent"] == 0.0 for worker in workers]) for _ in range(10): time.sleep(0.1) - workers = ReporterAgent._get_workers(obj) + workers = await ReporterAgent._async_get_workers(obj) workers_pids = {w["pid"] for w in workers} # Make sure all children are registered. @@ -942,14 +951,14 @@ def _get_worker_processes(self): print("killed ", children[0].pid) children[0].kill() wait_for_condition(lambda: not children[0].is_alive()) - workers = ReporterAgent._get_workers(obj) + workers = await ReporterAgent._async_get_workers(obj) workers_pids = {w["pid"] for w in workers} assert children[0].pid not in workers_pids assert children[1].pid in workers_pids children[1].kill() wait_for_condition(lambda: not children[1].is_alive()) - workers = ReporterAgent._get_workers(obj) + workers = await ReporterAgent._async_get_workers(obj) workers_pids = {w["pid"] for w in workers} assert children[0].pid not in workers_pids assert children[1].pid not in workers_pids @@ -1205,5 +1214,36 @@ def test_get_cluster_metadata(ray_start_with_dashboard): assert resp_data["rayInitCluster"] == meta["ray_init_cluster"] +@pytest.mark.asyncio +@pytest.mark.parametrize( + "ray_start_with_dashboard", + [ + {"num_cpus": 1}, + ], + indirect=True, +) +async def test_reporter_raylet_agent(ray_start_with_dashboard): + @ray.remote + class MyActor: + def get_pid(self): + return os.getpid() + + a = MyActor.remote() + worker_pid = ray.get(a.get_pid.remote()) + dashboard_agent = MagicMock() + dashboard_agent.gcs_address = build_address("127.0.0.1", 6379) + dashboard_agent.ip = "127.0.0.1" + dashboard_agent.node_manager_port = ( + ray._private.worker.global_worker.node.node_manager_port + ) + agent = ReporterAgent(dashboard_agent) + pids = await agent._async_get_worker_pids_from_raylet() + assert len(pids) == 2 + # check if worker is reported + assert worker_pid in pids + # check if driver is reported + assert os.getpid() in pids + + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/dashboard/optional_utils.py b/python/ray/dashboard/optional_utils.py index 94311565e246..191f64f61c46 100644 --- a/python/ray/dashboard/optional_utils.py +++ b/python/ray/dashboard/optional_utils.py @@ -18,7 +18,8 @@ import ray import ray.dashboard.consts as dashboard_consts import ray.dashboard.utils as dashboard_utils -from ray._private.ray_constants import RAY_INTERNAL_DASHBOARD_NAMESPACE, env_bool +from 
ray._private.ray_constants import env_bool +from ray._raylet import RAY_INTERNAL_DASHBOARD_NAMESPACE # All third-party dependencies that are not included in the minimal Ray # installation must be included in this file. This allows us to determine if diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index e7e25071e5de..3ec069ab333b 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -764,6 +764,12 @@ cdef extern from "src/ray/protobuf/autoscaler.pb.h" nogil: void ParseFromString(const c_string &serialized) const c_string &SerializeAsString() const +cdef extern from "ray/raylet_rpc_client/raylet_client_with_io_context.h" nogil: + cdef cppclass CRayletClientWithIoContext "ray::rpc::RayletClientWithIoContext": + CRayletClientWithIoContext(const c_string &ip_address, int port) + CRayStatus GetWorkerPIDs(const OptionalItemPyCallback[c_vector[int32_t]] &callback, + int64_t timeout_ms) + cdef extern from "ray/common/task/task_spec.h" nogil: cdef cppclass CConcurrencyGroup "ray::ConcurrencyGroup": CConcurrencyGroup( @@ -797,3 +803,4 @@ cdef extern from "ray/common/constants.h" nogil: cdef const char[] kLabelKeyTpuSliceName cdef const char[] kLabelKeyTpuWorkerId cdef const char[] kLabelKeyTpuPodType + cdef const char[] kRayInternalNamespacePrefix diff --git a/python/ray/includes/common.pxi b/python/ray/includes/common.pxi index 636a4600b0ea..47dc390c2bbe 100644 --- a/python/ray/includes/common.pxi +++ b/python/ray/includes/common.pxi @@ -27,6 +27,7 @@ from ray.includes.common cimport ( kLabelKeyTpuSliceName, kLabelKeyTpuWorkerId, kLabelKeyTpuPodType, + kRayInternalNamespacePrefix, ) from ray.exceptions import ( @@ -159,3 +160,9 @@ RAY_NODE_TPU_TOPOLOGY_KEY = kLabelKeyTpuTopology.decode() RAY_NODE_TPU_SLICE_NAME_KEY = kLabelKeyTpuSliceName.decode() RAY_NODE_TPU_WORKER_ID_KEY = kLabelKeyTpuWorkerId.decode() RAY_NODE_TPU_POD_TYPE_KEY = kLabelKeyTpuPodType.decode() + +RAY_INTERNAL_NAMESPACE_PREFIX = kRayInternalNamespacePrefix.decode() +# Prefix for namespaces which are used internally by ray. +# Jobs within these namespaces should be hidden from users +# and should not be considered user activity. 
+RAY_INTERNAL_DASHBOARD_NAMESPACE = f"{RAY_INTERNAL_NAMESPACE_PREFIX}dashboard" diff --git a/python/ray/includes/raylet_client.pxi b/python/ray/includes/raylet_client.pxi new file mode 100644 index 000000000000..11538f82d2c5 --- /dev/null +++ b/python/ray/includes/raylet_client.pxi @@ -0,0 +1,52 @@ +from asyncio import Future +import concurrent.futures +from libcpp.vector cimport vector as c_vector +from libcpp.string cimport string as c_string +from libc.stdint cimport int32_t +from libcpp.utility cimport move +from libcpp.memory cimport unique_ptr, make_unique, shared_ptr +from ray.includes.common cimport ( + CRayletClientWithIoContext, + CRayStatus, + CAddress, + OptionalItemPyCallback, +) +from ray.includes.optional cimport optional + + +cdef convert_optional_vector_int32( + CRayStatus status, optional[c_vector[int32_t]] vec) with gil: + try: + check_status_timeout_as_rpc_error(status) + assert vec.has_value() + return move(vec.value()), None + except Exception as e: + return None, e + + +cdef class RayletClient: + cdef: + unique_ptr[CRayletClientWithIoContext] inner + + def __cinit__(self, ip_address: str, port: int): + cdef: + c_string c_ip_address + int32_t c_port + c_ip_address = ip_address.encode('utf-8') + c_port = port + self.inner = make_unique[CRayletClientWithIoContext](c_ip_address, c_port) + + def async_get_worker_pids(self, timeout_ms: int = 1000) -> Future[list[int]]: + """Get the PIDs of all workers registered with the raylet.""" + cdef: + fut = incremented_fut() + int32_t timeout = timeout_ms + assert self.inner.get() is not NULL + with nogil: + self.inner.get().GetWorkerPIDs( + OptionalItemPyCallback[c_vector[int32_t]]( + &convert_optional_vector_int32, + assign_and_decrement_fut, + fut), + timeout) + return asyncio.wrap_future(fut) diff --git a/src/mock/ray/raylet/worker_pool.h b/src/mock/ray/raylet/worker_pool.h index 6e8337aef2d4..dd8ff59b1904 100644 --- a/src/mock/ray/raylet/worker_pool.h +++ b/src/mock/ray/raylet/worker_pool.h @@ -62,7 +62,7 @@ class MockWorkerPool : public WorkerPoolInterface { (override)); MOCK_METHOD((std::vector<std::shared_ptr<WorkerInterface>>), GetAllRegisteredDrivers, - (bool filter_dead_drivers), + (bool filter_dead_drivers, bool filter_system_drivers), (const, override)); MOCK_METHOD(Status, RegisterDriver, diff --git a/src/ray/common/constants.h b/src/ray/common/constants.h index aa9d858f2811..bfd06e677e7e 100644 --- a/src/ray/common/constants.h +++ b/src/ray/common/constants.h @@ -150,3 +150,6 @@ constexpr char kImplicitResourcePrefix[] = "node:__internal_implicit_resource_"; /// PID of GCS process to record metrics. constexpr char kGcsPidKey[] = "gcs_pid"; + +// Prefix for namespaces which are used internally by ray. +constexpr char kRayInternalNamespacePrefix[] = "_ray_internal_"; // NOLINT diff --git a/src/ray/gcs/gcs_job_manager.h b/src/ray/gcs/gcs_job_manager.h index 95f841177569..f62091f52f56 100644 --- a/src/ray/gcs/gcs_job_manager.h +++ b/src/ray/gcs/gcs_job_manager.h @@ -36,12 +36,10 @@ namespace ray { namespace gcs { -// Please keep this in sync with the definition in ray_constants.py. -const std::string kRayInternalNamespacePrefix = "_ray_internal_"; // NOLINT - // Please keep these in sync with the definition in dashboard/modules/job/common.py.
// NOLINTNEXTLINE -const std::string kJobDataKeyPrefix = kRayInternalNamespacePrefix + "job_info_"; +const std::string kJobDataKeyPrefix = + std::string(kRayInternalNamespacePrefix) + "job_info_"; inline std::string JobDataKey(const std::string &submission_id) { return kJobDataKeyPrefix + submission_id; } diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto index 8f7cf7476b3d..05a3ea9be154 100644 --- a/src/ray/protobuf/node_manager.proto +++ b/src/ray/protobuf/node_manager.proto @@ -399,6 +399,13 @@ message IsLocalWorkerDeadReply { bool is_dead = 1; } +message GetWorkerPIDsRequest {} + +message GetWorkerPIDsReply { + // PIDs of all drivers and workers managed by the local raylet. + repeated int32 pids = 1; +} + // Service for inter-node-manager communication. service NodeManagerService { // Handle the case when GCS restarted. @@ -512,4 +519,10 @@ service NodeManagerService { // Failure: Is currently only used when grpc channel is unavailable for retryable core // worker clients. The unavailable callback will eventually be retried so if this fails. rpc IsLocalWorkerDead(IsLocalWorkerDeadRequest) returns (IsLocalWorkerDeadReply); + // Get the PIDs of all workers currently alive that are managed by the local Raylet. + // This includes connected driver processes but excludes system drivers (with namespace + // prefix "_ray_internal_"). + // Failure: The client retries with a default timeout of 1000 ms; if it still fails, an + // empty list is returned. + rpc GetWorkerPIDs(GetWorkerPIDsRequest) returns (GetWorkerPIDsReply); } diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 45740919dbb6..17fb2a08904c 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -763,7 +763,7 @@ void NodeManager::QueryAllWorkerStates( bool include_task_info, int64_t limit, const std::function<void()> &on_all_replied) { - auto all_workers = worker_pool_.GetAllRegisteredWorkers(/* filter_dead_worker */ true, + auto all_workers = worker_pool_.GetAllRegisteredWorkers(/* filter_dead_workers */ true, /*filter_io_workers*/ true); for (auto &driver : worker_pool_.GetAllRegisteredDrivers(/* filter_dead_driver */ true)) { @@ -1057,7 +1057,7 @@ // registered to raylet first (blocking call) and then connect to GCS, so there is no // race condition here. gcs_client_.AsyncResubscribe(); - auto workers = worker_pool_.GetAllRegisteredWorkers(/* filter_dead_worker */ true); + auto workers = worker_pool_.GetAllRegisteredWorkers(/* filter_dead_workers */ true); for (const auto &worker : workers) { worker->AsyncNotifyGCSRestart(); } @@ -2669,7 +2669,7 @@ void NodeManager::HandleGetNodeStats(rpc::GetNodeStatsRequest node_stats_request // and return the information from HandleNodesStatsRequest. The caller of // HandleGetNodeStats should set a timeout so that the rpc finishes even if not all // workers have replied.
- auto all_workers = worker_pool_.GetAllRegisteredWorkers(/* filter_dead_worker */ true); + auto all_workers = worker_pool_.GetAllRegisteredWorkers(/* filter_dead_workers */ true); absl::flat_hash_set<WorkerID> driver_ids; for (const auto &driver : worker_pool_.GetAllRegisteredDrivers(/* filter_dead_driver */ true)) { @@ -2914,6 +2914,22 @@ void NodeManager::TriggerGlobalGC() { should_local_gc_ = true; } +void NodeManager::HandleGetWorkerPIDs(rpc::GetWorkerPIDsRequest request, + rpc::GetWorkerPIDsReply *reply, + rpc::SendReplyCallback send_reply_callback) { + auto all_workers = worker_pool_.GetAllRegisteredWorkers(/* filter_dead_workers */ true, + /* filter_io_workers */ true); + auto drivers = worker_pool_.GetAllRegisteredDrivers(/* filter_dead_drivers */ true, + /* filter_system_drivers */ true); + all_workers.insert(all_workers.end(), + std::make_move_iterator(drivers.begin()), + std::make_move_iterator(drivers.end())); + for (const auto &worker : all_workers) { + reply->add_pids(worker->GetProcess().GetId()); + } + send_reply_callback(Status::OK(), /* success */ nullptr, /* failure */ nullptr); +} + void NodeManager::Stop() { store_client_->Disconnect(); #if !defined(_WIN32) diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index b53370d3da4e..62eb9b509816 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -648,6 +648,11 @@ class NodeManager : public rpc::NodeManagerServiceHandler, rpc::NotifyGCSRestartReply *reply, rpc::SendReplyCallback send_reply_callback) override; + /// Handle a `GetWorkerPIDs` request. + void HandleGetWorkerPIDs(rpc::GetWorkerPIDsRequest request, + rpc::GetWorkerPIDsReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + /// Checks the local socket connection for all registered workers and drivers. /// If any of them have disconnected unexpectedly (i.e., we receive a SIGHUP), /// we disconnect and kill the worker process.
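Taken together, the server-side pieces above (the GetWorkerPIDs RPC, the NodeManager handler, and its declaration) are consumed from Python through the RayletClient wrapper added in python/ray/includes/raylet_client.pxi. A minimal usage sketch, assuming a raylet is running locally; the port value below is illustrative and would normally come from the node's node_manager_port:

    import asyncio

    from ray._raylet import RayletClient

    async def main():
        # Connect to the local raylet's NodeManagerService (port is a placeholder).
        client = RayletClient(ip_address="127.0.0.1", port=58000)
        # Resolves to the PIDs of live workers plus non-system drivers, as
        # collected by NodeManager::HandleGetWorkerPIDs above.
        pids = await client.async_get_worker_pids(timeout_ms=1000)
        print(pids)

    asyncio.run(main())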
diff --git a/src/ray/raylet/scheduling/tests/cluster_lease_manager_test.cc b/src/ray/raylet/scheduling/tests/cluster_lease_manager_test.cc index 59b5adfaebdc..50a0bd23e649 100644 --- a/src/ray/raylet/scheduling/tests/cluster_lease_manager_test.cc +++ b/src/ray/raylet/scheduling/tests/cluster_lease_manager_test.cc @@ -158,7 +158,7 @@ class MockWorkerPool : public WorkerPoolInterface { } std::vector<std::shared_ptr<WorkerInterface>> GetAllRegisteredDrivers( - bool filter_dead_drivers) const override { + bool filter_dead_drivers, bool filter_system_drivers) const override { RAY_CHECK(false) << "Not used."; return {}; } diff --git a/src/ray/raylet/tests/local_lease_manager_test.cc b/src/ray/raylet/tests/local_lease_manager_test.cc index bb8f82bca0a9..94047e18ff6c 100644 --- a/src/ray/raylet/tests/local_lease_manager_test.cc +++ b/src/ray/raylet/tests/local_lease_manager_test.cc @@ -162,7 +162,7 @@ class MockWorkerPool : public WorkerPoolInterface { } std::vector<std::shared_ptr<WorkerInterface>> GetAllRegisteredDrivers( - bool filter_dead_drivers) const override { + bool filter_dead_drivers, bool filter_system_drivers) const override { RAY_CHECK(false) << "Not used."; return {}; } diff --git a/src/ray/raylet/tests/node_manager_test.cc b/src/ray/raylet/tests/node_manager_test.cc index b390f933dc17..24390aa432b8 100644 --- a/src/ray/raylet/tests/node_manager_test.cc +++ b/src/ray/raylet/tests/node_manager_test.cc @@ -481,7 +481,7 @@ TEST_F(NodeManagerTest, TestRegisterGcsAndCheckSelfAlive) { .WillOnce(Return(Status::OK())); EXPECT_CALL(mock_worker_pool_, GetAllRegisteredWorkers(_, _)) .WillRepeatedly(Return(std::vector<std::shared_ptr<WorkerInterface>>{})); - EXPECT_CALL(mock_worker_pool_, GetAllRegisteredDrivers(_)) + EXPECT_CALL(mock_worker_pool_, GetAllRegisteredDrivers(_, _)) .WillRepeatedly(Return(std::vector<std::shared_ptr<WorkerInterface>>{})); EXPECT_CALL(mock_worker_pool_, IsWorkerAvailableForScheduling()) .WillRepeatedly(Return(false)); @@ -509,7 +509,7 @@ TEST_F(NodeManagerTest, TestDetachedWorkerIsKilledByFailedWorker) { .WillOnce(Return(Status::OK())); EXPECT_CALL(mock_worker_pool_, GetAllRegisteredWorkers(_, _)) .WillRepeatedly(Return(std::vector<std::shared_ptr<WorkerInterface>>{})); - EXPECT_CALL(mock_worker_pool_, GetAllRegisteredDrivers(_)) + EXPECT_CALL(mock_worker_pool_, GetAllRegisteredDrivers(_, _)) .WillRepeatedly(Return(std::vector<std::shared_ptr<WorkerInterface>>{})); EXPECT_CALL(mock_worker_pool_, IsWorkerAvailableForScheduling()) .WillRepeatedly(Return(false)); @@ -587,7 +587,7 @@ TEST_F(NodeManagerTest, TestDetachedWorkerIsKilledByFailedNode) { .WillOnce(Return(Status::OK())); EXPECT_CALL(mock_worker_pool_, GetAllRegisteredWorkers(_, _)) .WillRepeatedly(Return(std::vector<std::shared_ptr<WorkerInterface>>{})); - EXPECT_CALL(mock_worker_pool_, GetAllRegisteredDrivers(_)) + EXPECT_CALL(mock_worker_pool_, GetAllRegisteredDrivers(_, _)) .WillRepeatedly(Return(std::vector<std::shared_ptr<WorkerInterface>>{})); EXPECT_CALL(mock_worker_pool_, IsWorkerAvailableForScheduling()) .WillRepeatedly(Return(false)); diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 18c3a1fdfac7..5520aade4c74 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -1657,8 +1657,12 @@ bool WorkerPool::IsWorkerAvailableForScheduling() const { return false; } +bool IsInternalNamespace(const std::string &ray_namespace) { + return absl::StartsWith(ray_namespace, kRayInternalNamespacePrefix); +} + std::vector<std::shared_ptr<WorkerInterface>> WorkerPool::GetAllRegisteredDrivers( - bool filter_dead_drivers) const { + bool filter_dead_drivers, bool filter_system_drivers) const { std::vector<std::shared_ptr<WorkerInterface>> drivers; for (const auto &entry : states_by_lang_) { @@ -1670,6 +1674,14 @@ std::vector<std::shared_ptr<WorkerInterface>> WorkerPool::GetAllRegisteredDriver if (filter_dead_drivers &&
driver->IsDead()) { continue; } + + if (filter_system_drivers) { + auto job_config = GetJobConfig(driver->GetAssignedJobId()); + if (job_config.has_value() && IsInternalNamespace(job_config->ray_namespace())) { + continue; + } + } + drivers.push_back(driver); } } diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 6f12af1b097f..f64acbc028dc 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -219,7 +219,7 @@ class WorkerPoolInterface : public IOWorkerPoolInterface { std::unique_ptr runtime_env_agent_client) = 0; virtual std::vector<std::shared_ptr<WorkerInterface>> GetAllRegisteredDrivers( - bool filter_dead_drivers = false) const = 0; + bool filter_dead_drivers = false, bool filter_system_drivers = false) const = 0; virtual Status RegisterDriver(const std::shared_ptr<WorkerInterface> &worker, const rpc::JobConfig &job_config, @@ -519,10 +519,14 @@ class WorkerPool : public WorkerPoolInterface { /// /// \param filter_dead_drivers whether or not if this method will filter dead drivers /// that are still registered. + /// \param filter_system_drivers whether or not this method will filter system + /// drivers. A system driver is a driver whose job config namespace starts with + /// "_ray_internal_". /// /// \return A list containing all the drivers. std::vector<std::shared_ptr<WorkerInterface>> GetAllRegisteredDrivers( - bool filter_dead_drivers = false) const override; + bool filter_dead_drivers = false, + bool filter_system_drivers = false) const override; /// Returns debug string for class. /// diff --git a/src/ray/raylet_rpc_client/BUILD.bazel b/src/ray/raylet_rpc_client/BUILD.bazel index 48764472d777..f3208a4a9f27 100644 --- a/src/ray/raylet_rpc_client/BUILD.bazel +++ b/src/ray/raylet_rpc_client/BUILD.bazel @@ -35,6 +35,24 @@ ray_cc_library( deps = [ ":raylet_client_interface", "//src/ray/common:bundle_spec", + "//src/ray/common:gcs_callback_types", + "//src/ray/common:ray_config", + "//src/ray/protobuf:node_manager_cc_grpc", + "//src/ray/rpc:retryable_grpc_client", + "//src/ray/rpc:rpc_callback_types", + "//src/ray/util:logging", + ], +) + +ray_cc_library( + name = "raylet_client_with_io_context_lib", + srcs = ["raylet_client_with_io_context.cc"], + hdrs = ["raylet_client_with_io_context.h"], + visibility = ["//visibility:public"], + deps = [ + ":raylet_client_interface", + ":raylet_client_lib", + "//src/ray/common:bundle_spec", "//src/ray/common:ray_config", "//src/ray/protobuf:node_manager_cc_grpc", "//src/ray/rpc:retryable_grpc_client", diff --git a/src/ray/raylet_rpc_client/raylet_client.cc b/src/ray/raylet_rpc_client/raylet_client.cc index a657e6dbfe5c..1a937c51d521 100644 --- a/src/ray/raylet_rpc_client/raylet_client.cc +++ b/src/ray/raylet_rpc_client/raylet_client.cc @@ -470,5 +470,26 @@ void RayletClient::GetNodeStats( /*method_timeout_ms*/ -1); } +void RayletClient::GetWorkerPIDs( + const gcs::OptionalItemCallback<std::vector<int32_t>> &callback, int64_t timeout_ms) { + rpc::GetWorkerPIDsRequest request; + auto client_callback = [callback](const Status &status, + rpc::GetWorkerPIDsReply &&reply) { + if (status.ok()) { + std::vector<int32_t> workers(reply.pids().begin(), reply.pids().end()); + callback(status, workers); + } else { + callback(status, std::nullopt); + } + }; + INVOKE_RETRYABLE_RPC_CALL(retryable_grpc_client_, + NodeManagerService, + GetWorkerPIDs, + request, + client_callback, + grpc_client_, + timeout_ms); + } + } // namespace rpc } // namespace ray diff --git a/src/ray/raylet_rpc_client/raylet_client.h b/src/ray/raylet_rpc_client/raylet_client.h index a053dd400fa3..e5273c8ce36c 100644 ---
a/src/ray/raylet_rpc_client/raylet_client.h +++ b/src/ray/raylet_rpc_client/raylet_client.h @@ -22,6 +22,7 @@ #include #include +#include "ray/common/gcs_callback_types.h" #include "ray/raylet_rpc_client/raylet_client_interface.h" #include "ray/rpc/grpc_client.h" #include "ray/rpc/retryable_grpc_client.h" @@ -43,9 +44,9 @@ class RayletClient : public RayletClientInterface { /// Connect to the raylet. /// /// \param address The IP address of the worker. - /// \param port The port that the worker should listen on for gRPC requests. If - /// 0, the worker should choose a random port. /// \param client_call_manager The client call manager to use for the grpc connection. + /// \param raylet_unavailable_timeout_callback Callback invoked when the raylet has + /// been unavailable for a certain period of time. explicit RayletClient(const rpc::Address &address, rpc::ClientCallManager &client_call_manager, std::function<void()> raylet_unavailable_timeout_callback); @@ -163,7 +164,13 @@ class RayletClient : public RayletClientInterface { void GetNodeStats(const rpc::GetNodeStatsRequest &request, const rpc::ClientCallback<rpc::GetNodeStatsReply> &callback) override; - private: + /// Get the worker PIDs from the raylet. + /// \param callback The callback invoked with the worker PIDs. + /// \param timeout_ms The timeout in milliseconds. + void GetWorkerPIDs(const gcs::OptionalItemCallback<std::vector<int32_t>> &callback, + int64_t timeout_ms); + + protected: /// gRPC client to the NodeManagerService. std::shared_ptr<GrpcClient<NodeManagerService>> grpc_client_; diff --git a/src/ray/raylet_rpc_client/raylet_client_with_io_context.cc b/src/ray/raylet_rpc_client/raylet_client_with_io_context.cc new file mode 100644 index 000000000000..6fbb971099f1 --- /dev/null +++ b/src/ray/raylet_rpc_client/raylet_client_with_io_context.cc @@ -0,0 +1,56 @@ +// Copyright 2025 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ray/raylet_rpc_client/raylet_client_with_io_context.h" + +#include +#include +#include +#include + +#include "ray/common/asio/asio_util.h" +#include "ray/common/ray_config.h" +#include "ray/util/logging.h" +#include "src/ray/protobuf/node_manager.grpc.pb.h" + +namespace ray { +namespace rpc { + +RayletClientWithIoContext::RayletClientWithIoContext(const std::string &ip_address, + int port) { + // Connect to the raylet on a singleton io service with a dedicated thread. + // This is to avoid creating multiple threads for multiple clients in python.
+ static InstrumentedIOContextWithThread io_context("raylet_client_io_service"); + instrumented_io_context &io_service = io_context.GetIoService(); + client_call_manager_ = std::make_unique<ClientCallManager>( + io_service, /*record_stats=*/false, ip_address); + auto raylet_unavailable_timeout_callback = []() { + RAY_LOG(WARNING) + << "Raylet is unavailable for " + << ::RayConfig::instance().raylet_rpc_server_reconnect_timeout_max_s() << "s"; + }; + rpc::Address rpc_address; + rpc_address.set_ip_address(ip_address); + rpc_address.set_port(port); + raylet_client_ = std::make_unique<RayletClient>( + rpc_address, *client_call_manager_, std::move(raylet_unavailable_timeout_callback)); +} + +void RayletClientWithIoContext::GetWorkerPIDs( + const gcs::OptionalItemCallback<std::vector<int32_t>> &callback, int64_t timeout_ms) { + raylet_client_->GetWorkerPIDs(callback, timeout_ms); +} + +} // namespace rpc +} // namespace ray diff --git a/src/ray/raylet_rpc_client/raylet_client_with_io_context.h b/src/ray/raylet_rpc_client/raylet_client_with_io_context.h new file mode 100644 index 000000000000..8800ea2c7e9b --- /dev/null +++ b/src/ray/raylet_rpc_client/raylet_client_with_io_context.h @@ -0,0 +1,49 @@ +// Copyright 2025 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "ray/raylet_rpc_client/raylet_client.h" +#include "ray/rpc/grpc_client.h" + +namespace ray { +namespace rpc { + +/// A raylet client with its own io context, provided for Python callers (e.g. the +/// ReporterAgent) to communicate with the raylet. It creates and manages a separate +/// thread to run the grpc event loop. +class RayletClientWithIoContext { + public: + /// Connect to the raylet. Only used by the Cython wrapper + /// `CRayletClientWithIoContext`; a new io service and a new thread are created inside. + /// + /// \param ip_address The IP address of the raylet. + /// \param port The port of the raylet. + RayletClientWithIoContext(const std::string &ip_address, int port); + + /// Get the worker PIDs from the raylet. + /// \param callback The callback invoked with the worker PIDs. + /// \param timeout_ms The timeout in milliseconds. + void GetWorkerPIDs(const gcs::OptionalItemCallback<std::vector<int32_t>> &callback, + int64_t timeout_ms); + + private: + /// The client call manager is created inside this class; it must be kept alive + /// for the whole lifetime of the client.
+ std::unique_ptr<ClientCallManager> client_call_manager_; + std::unique_ptr<RayletClient> raylet_client_; + +}; + +} // namespace rpc +} // namespace ray diff --git a/src/ray/rpc/node_manager/node_manager_server.h b/src/ray/rpc/node_manager/node_manager_server.h index eba08ba9b0af..fba7780afc69 100644 --- a/src/ray/rpc/node_manager/node_manager_server.h +++ b/src/ray/rpc/node_manager/node_manager_server.h @@ -58,7 +58,8 @@ class ServerCallFactory; RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(GetObjectsInfo) \ RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(GetWorkerFailureCause) \ RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(RegisterMutableObject) \ - RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(PushMutableObject) + RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(PushMutableObject) \ + RAY_NODE_MANAGER_RPC_SERVICE_HANDLER(GetWorkerPIDs) /// Interface of the `NodeManagerService`, see `src/ray/protobuf/node_manager.proto`. class NodeManagerServiceHandler { @@ -182,6 +183,10 @@ class NodeManagerServiceHandler { virtual void HandlePushMutableObject(PushMutableObjectRequest request, PushMutableObjectReply *reply, SendReplyCallback send_reply_callback) = 0; + + virtual void HandleGetWorkerPIDs(GetWorkerPIDsRequest request, + GetWorkerPIDsReply *reply, + SendReplyCallback send_reply_callback) = 0; }; /// The `GrpcService` for `NodeManagerService`.
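For unit testing, the new ReporterAgent constructor parameter provides a seam to inject a stub client instead of opening a real gRPC channel, mirroring the MagicMock-based tests in test_reporter.py above. A minimal sketch; the AsyncMock stub and the PID values are illustrative, and a MagicMock dashboard_agent is assumed to satisfy whatever attributes __init__ touches, as in the existing tests:

    import asyncio
    from unittest.mock import AsyncMock, MagicMock

    from ray.dashboard.modules.reporter.reporter_agent import ReporterAgent

    dashboard_agent = MagicMock()
    dashboard_agent.gcs_address = "127.0.0.1:6379"

    raylet_client = MagicMock()
    # async_get_worker_pids is awaited by the agent, so stub it with an AsyncMock.
    raylet_client.async_get_worker_pids = AsyncMock(return_value=[1234, 5678])

    agent = ReporterAgent(dashboard_agent, raylet_client)
    pids = asyncio.run(agent._async_get_worker_pids_from_raylet())
    assert pids == [1234, 5678]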