Conversation

@tianyi-ge
Contributor

@tianyi-ge tianyi-ge commented Sep 29, 2025

Why are these changes needed?

  1. Currently, the reporter agent is spawned by the raylet process. It assumes that all core workers are direct children of the raylet, but that is not the case with new features (uv, image_url). The reporter agent needs another way to find all core workers.
    for proc in raylet_proc.children()
  2. The driver is not spawned by the raylet, so it is never monitored.

Implementation:

  1. Add a gRPC endpoint to the raylet process (node manager) and allow the reporter agent to connect to it.
  2. The reporter agent fetches the worker list, including the driver, via the gRPC reply. It creates a raylet client with a dedicated thread (see the sketch below).
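
A minimal sketch of the reporter-agent side of that flow, assuming the new Cython RayletClient binding is importable from ray._raylet and that get_worker_pids takes a timeout in seconds (both assumptions; the review below later makes this call async):

import psutil

from ray._raylet import RayletClient  # import path assumed; exposed via includes/raylet_client.pxi

RAYLET_RPC_TIMEOUT_SECONDS = 1  # mirrors the constant added to dashboard/consts.py


def get_worker_processes(node_ip, node_manager_port):
    """Ask the local raylet for worker/driver PIDs and wrap them in psutil handles."""
    client = RayletClient(node_ip, node_manager_port)
    pids = client.get_worker_pids(timeout=RAYLET_RPC_TIMEOUT_SECONDS)
    procs = []
    for pid in pids:
        try:
            procs.append(psutil.Process(pid))
        except psutil.NoSuchProcess:
            # The worker may have exited between the RPC reply and this lookup.
            continue
    return procs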

Related issue number

Closes #56739

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Note

The reporter agent now fetches worker/driver PIDs via a new Raylet GetWorkerPIDs RPC using a new RayletClient binding, replacing psutil child-process scanning.

  • Backend (Raylet RPC):
    • Add GetWorkerPIDs RPC in node_manager.proto and wire it into NodeManagerService.
    • Implement NodeManager::HandleGetWorkerPIDs to return PIDs of all alive workers and drivers.
    • Extend RayletClient (C++) with GetWorkerPIDs(timeout_ms) and an alternate ctor (ip, port); expose to Python via Cython (includes/raylet_client.pxi, includes/common.pxd).
  • Python/Cython plumbing:
    • Include includes/raylet_client.pxi in _raylet.pyx to expose RayletClient to Python.
  • Dashboard Reporter:
    • Update reporter_agent.py to use RayletClient(ip, node_manager_port).get_worker_pids(timeout) to discover workers; build psutil.Process objects from returned PIDs.
    • Add RAYLET_RPC_TIMEOUT_SECONDS = 1 in dashboard/consts.py and use it for RPC timeout.
  • Server registration:
    • Register new handler in node_manager_server.h macro list.

Written by Cursor Bugbot for commit f76f633.

Signed-off-by: tianyi-ge <tianyig@outlook.com>
@tianyi-ge tianyi-ge requested a review from a team as a code owner September 29, 2025 15:33
Signed-off-by: tianyi-ge <tianyig@outlook.com>
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request adds a new gRPC endpoint to the node manager for fetching worker and driver PIDs, which is a solid approach for discovering all worker processes. The changes to the protobuf definition and the C++ implementation are mostly correct. However, I've found a critical issue in the Python client code due to a typo that would cause the RPC call to fail. I've also included a few suggestions for improving error handling and code efficiency.

@tianyi-ge tianyi-ge changed the title [core] add get pid rpc to node manager [core] allow reporter agent to get pid via rpc to raylet Sep 29, 2025

Signed-off-by: tianyi-ge <tianyig@outlook.com>

@ray-gardener ray-gardener bot added the core (Issues that should be addressed in Ray Core), observability (Issues related to the Ray Dashboard, Logging, Metrics, Tracing, and/or Profiling), and community-contribution (Contributed by the community) labels Sep 29, 2025
Comment on lines 523 to 524
// Get the worker managed by local raylet.
// Failure: Sends to local raylet, so should never fail.
Collaborator

we should still add error handling & retries just in case (there could be a logical bug in the raylet)
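
For illustration, a bounded retry with logging at the Python caller could look like the sketch below; the retry count and the empty-list fallback are assumptions, not necessarily what the PR lands:

import logging

logger = logging.getLogger(__name__)


def get_worker_pids_with_retry(raylet_client, timeout, max_attempts=3):
    """Retry the GetWorkerPIDs RPC a bounded number of times, logging each failure."""
    for attempt in range(1, max_attempts + 1):
        try:
            return raylet_client.get_worker_pids(timeout=timeout)
        except Exception as e:  # narrowed to specific RPC errors in the discussion below
            logger.warning(
                "GetWorkerPIDs attempt %d/%d failed: %s", attempt, max_attempts, e
            )
    # Degrade gracefully so one bad poll does not kill the reporter loop.
    logger.error("Giving up on GetWorkerPIDs after %d attempts", max_attempts)
    return []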

@tianyi-ge
Contributor Author

@edoakes thank you for the prompt comments; I'll fix it soon. Also, after discussing with @can-anyscale, I'll replace the Python grpcio lib with a Cython wrapper, "RayletClient"

Signed-off-by: tianyi-ge <tianyig@outlook.com>

@can-anyscale can-anyscale self-assigned this Oct 1, 2025
Contributor

@can-anyscale can-anyscale left a comment

Let's figure out a way to test that the solution works

)
try:
    return raylet_client.get_worker_pids(timeout=timeout)
except Exception as e:
Contributor

let's not catch all exceptions; be explicit about which exceptions can acceptably be thrown from get_worker_pids and which exceptions Ray should just fail out loud on

Contributor

yes, this should be RPC exceptions or something; try not to catch all exceptions if possible
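
A sketch of the narrower handling being asked for, assuming the binding surfaces transient RPC failures as ray.exceptions.RpcError (whether get_worker_pids raises exactly that type is an assumption):

import logging

from ray.exceptions import RpcError

logger = logging.getLogger(__name__)


def get_worker_pids_or_empty(raylet_client, timeout):
    try:
        return raylet_client.get_worker_pids(timeout=timeout)
    except RpcError as e:
        # Expected transient failure (raylet busy or briefly unreachable): degrade gracefully.
        logger.warning("GetWorkerPIDs RPC failed: %s", e)
        return []
    # Anything else (TypeError, AttributeError, ...) is a programming error and
    # should propagate so it fails loudly instead of being silently swallowed.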

rpc IsLocalWorkerDead(IsLocalWorkerDeadRequest) returns (IsLocalWorkerDeadReply);
// Get the PIDs of all workers currently alive that are managed by the local Raylet.
// This includes connected driver processes.
// Failure: Will retry on failure with logging
Contributor

nit: what does "with logging" mean? More useful information would be how many times it retries, and what the reply looks like on failure (partial results, an empty list, etc.).

Signed-off-by: tianyi-ge <tianyig@outlook.com>


ThreadedRayletClient::ThreadedRayletClient(const std::string &ip_address, int port)
    : RayletClient() {
  io_service_ = std::make_unique<instrumented_io_context>();
Contributor

maybe you can just use this https://github.com/ray-project/ray/blob/master/src/ray/common/asio/asio_util.h#L53 so you don't need to maintain the thread yourself

Contributor

there are also patterns here to make sure the io_context is reused across raylet clients within one process

Contributor Author

thanks for your suggestions

Contributor Author

I guess in the future, if the raylet client is used in multiple places, reusing the io_context is important.
But to use IOContextProvider, I have to create a "default io context" anyway. It's also manually maintained, right?

Contributor

oh dang, sorry, I forgot to include the link to the pattern; you can create a static InstrumentedIOContextWithThread and reuse it across constructions of ThreadedRayletClient: https://github.com/ray-project/ray/blob/master/src/ray/gcs_rpc_client/gcs_client.cc#L219

Contributor Author

I ran a test creating 5 actors. The RPC reply has 12 processes: 5 actors, 5 idle workers, the driver (python in the following screenshot), and a dashboard server head, which aligns with the dashboard.

2025-10-08 11:29:33,355	INFO reporter_agent.py:913 -- Worker PIDs from raylet: [41692, 41694, 41685, 41689, 41693, 41688, 41690, 41691, 41687, 41686, 41676, 41618]
(screenshot: dashboard process list)

should the dashboard server head be here?

Collaborator

ah -- I don't think the dashboard server head should be there... the reason it's showing up is because it is connecting to ray with ray.init. We'll need some way of filtering it. I believe it is started in a namespace prefixed with _ray_internal. We do other such filtering here:

# This includes the _ray_internal_dashboard job that gets automatically

If the namespace is available in the raylet, we can add the filtering there and exclude any workers that are associated with a _ray_internal* namespace
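
A Python-level sketch of that filtering idea; the prefix constant matches the one added later in this PR, but the worker objects and their job_namespace attribute are purely illustrative (the real check ends up behind a filter_system_drivers option):

RAY_INTERNAL_NAMESPACE_PREFIX = "_ray_internal_"


def filter_system_drivers(workers):
    """Drop drivers whose job namespace is Ray-internal (e.g. the dashboard head)."""
    return [
        w
        for w in workers
        if not w.job_namespace.startswith(RAY_INTERNAL_NAMESPACE_PREFIX)
    ]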

Collaborator

Agreed, we should hide system driver processes.

Contributor Author

thanks @edoakes. I added a new option, filter_system_drivers. It finds the corresponding namespace and checks its prefix. Now the dashboard server head is gone
(screenshot: dashboard process list without the dashboard head)

…ylet client

Signed-off-by: tianyi-ge <tianyig@outlook.com>

Signed-off-by: tianyi-ge <tianyig@outlook.com>

Signed-off-by: tianyi-ge <tianyig@outlook.com>

Signed-off-by: tianyi-ge <tianyig@outlook.com>
Signed-off-by: tianyi-ge <tianyig@outlook.com>
@tianyi-ge
Contributor Author

@jjyao Is the runtime env agent a special driver or a core worker? Is it possible to assert on it in my unit test?

    return Status::TimedOut("Timed out getting worker PIDs from raylet");
  }
  return future.get();
}

Bug: Race Condition in Dual Timeout Handling

The GetWorkerPIDs method has a race condition due to dual timeout handling. Both the RPC call and future.wait_for use the same timeout_ms, which can cause future.wait_for to incorrectly report a timeout even if the RPC successfully completed.



def test_report_stats():
@patch("ray.dashboard.modules.reporter.reporter_agent.RayletClient")
def test_report_stats(mock_raylet_client):
Collaborator

the mock_raylet_client is not used?

Contributor Author

it will be used in ReporterAgent constructor to avoid creating a real grpc client

Collaborator

let's dependency inject the client instead of using patch to hijack the import path... makes it explicit and less likely to break and confuse people down the line.
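
A sketch of what that dependency injection could look like, based on the optional raylet_client constructor argument that appears further down in this thread; the fake client and the make_dashboard_agent fixture are illustrative only:

from ray.dashboard.modules.reporter.reporter_agent import ReporterAgent


class FakeRayletClient:
    """Test double that returns a fixed PID list instead of making an RPC."""

    def __init__(self, pids):
        self._pids = pids

    def get_worker_pids(self, timeout=None):
        return self._pids


def test_report_stats_with_injected_client(make_dashboard_agent):
    # make_dashboard_agent is a hypothetical fixture standing in for however the
    # existing tests construct the DashboardAgent dependency.
    fake = FakeRayletClient(pids=[101, 102])
    agent = ReporterAgent(make_dashboard_agent(), raylet_client=fake)
    # The agent now talks to the fake instead of opening a real gRPC connection.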

assert resp_data["rayInitCluster"] == meta["ray_init_cluster"]


def test_reporter_raylet_agent(ray_start_with_dashboard):
Collaborator

I think this test depends on the fact that the total CPU resource of the node is 1, so we don't create extra idle workers. Could you make it explicit by doing

@pytest.mark.parametrize(
    "ray_start_with_dashboard",
    [
        {
            "num_cpus": 1,
        }
    ],
    indirect=True,
)

/// PID of GCS process to record metrics.
constexpr char kGcsPidKey[] = "gcs_pid";

// Please keep this in sync with the definition in ray_constants.py.
Collaborator

We can enforce the sync by exposing the C++ constant to Python via Cython. We have examples in common.pxi and common.pxd:

RAY_NODE_TPU_POD_TYPE_KEY = kLabelKeyTpuPodType.decode()

// worker clients. The unavailable callback will eventually be retried so if this fails.
rpc IsLocalWorkerDead(IsLocalWorkerDeadRequest) returns (IsLocalWorkerDeadReply);
// Get the PIDs of all workers currently alive that are managed by the local Raylet.
// This includes connected driver processes.
Collaborator

We should mention system drivers are excluded

Comment on lines 474 to 475
std::weak_ptr<std::promise<Status>> weak_promise = promise;
std::weak_ptr<std::vector<int32_t>> weak_worker_pids = worker_pids;
Collaborator

why do we need weak_ptr and promise here?

def _get_worker_pids_from_raylet(self) -> List[int]:
    try:
        # Get worker pids from raylet via gRPC.
        return self._raylet_client.get_worker_pids()
Collaborator

this is an RPC so we should make it async and change get_worker_pids_from_raylet to async.

Contributor

ah yes, @tianyi-ge, there is a pattern to turn this async grpc call into a await/future method in python, example here https://github.com/ray-project/ray/blob/master/python/ray/includes/gcs_client.pxi#L177-L191
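
That gcs_client.pxi pattern, restated as a plain-Python sketch of turning a callback-style RPC into an awaitable; the callback-based get_worker_pids_async method here is hypothetical, used only to show the shape of the bridge:

import asyncio


async def async_get_worker_pids(raylet_client):
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    def _on_reply(ok, pids):
        # The reply arrives on the client's dedicated I/O thread, so hop back
        # onto the event-loop thread before touching the asyncio future.
        if ok:
            loop.call_soon_threadsafe(fut.set_result, pids)
        else:
            loop.call_soon_threadsafe(
                fut.set_exception, RuntimeError("GetWorkerPIDs RPC failed")
            )

    raylet_client.get_worker_pids_async(_on_reply)  # hypothetical callback API
    return await fut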

Signed-off-by: tianyi-ge <tianyig@outlook.com>
raylet_proc = self._get_raylet_proc()
if raylet_proc is None:
pids = asyncio.run(self._get_worker_pids_from_raylet())
logger.debug(f"Worker PIDs from raylet: {pids}")

Bug: Asyncio Loop Conflict in Worker Process Retrieval

The _get_worker_processes method uses asyncio.run() to execute _get_worker_pids_from_raylet(). Since the ReporterAgent runs within an existing asyncio event loop, calling asyncio.run() from it raises a RuntimeError and crashes the application.

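A minimal repro of the failure mode flagged here: asyncio.run() refuses to start a nested loop from inside a running one, so inside an async context the coroutine has to be awaited instead (the PR ultimately resolves this differently, by running the sync path on an executor thread, as discussed further down):

import asyncio


async def fetch_pids():
    return [1, 2, 3]


async def broken_caller():
    # Raises: RuntimeError: asyncio.run() cannot be called from a running event loop
    return asyncio.run(fetch_pids())


async def working_caller():
    # Inside an async context, just await the coroutine.
    return await fetch_pids()


if __name__ == "__main__":
    print(asyncio.run(working_caller()))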

Signed-off-by: tianyi-ge <tianyig@outlook.com>
Collaborator

@edoakes edoakes left a comment

Code changes LGTM, just one minor comment.

Also -- is it possible to add an e2e integration test? We can run an application using uv runtime_env and check that metrics are exported for the expected PIDs (both driver and the uv worker)



@tianyi-ge
Contributor Author

tianyi-ge commented Oct 20, 2025

Hi @edoakes, how about dependency injecting by passing an optional raylet_client=None to ReporterAgent.__init__?

also, could you give me some references for how to check PIDs from the exported metrics in the dashboard?

@edoakes
Collaborator

edoakes commented Oct 20, 2025

Hi @edoakes how about dependency inject by passing an optional raylet_client=None to ReporterAgent.__init__?

Sounds great 👍

also, could you give me some references about how to check pids from exported metrics in dashboard?

Looks like the metrics are written to the GCS here:

await self._gcs_client.async_publish_node_resource_usage(

Then they are read here:

key, data = await subscriber.poll()

And finally they're used to serve the GET /nodes/{node_id} endpoint:

So let's write a test that runs a driver & uv actor, then queries the GET /nodes/{node_id} endpoint and verifies that stats for their PIDs are populated. You can get the uv actor PID by adding a method to it that returns os.getpid().

There are similar tests in test_node.py. You can add this there and follow a similar pattern.
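
A rough sketch of that test, reusing the data.detail.workers[*].pid response shape that appears later in this thread; the uv package, URL helper, and exact fixture wiring are assumptions to adapt to the existing test_node.py patterns:

import os

import requests

import ray
from ray._private.test_utils import format_web_url, wait_for_condition


def test_worker_and_driver_pids_reported(ray_start_with_dashboard):
    webui_url = format_web_url(ray_start_with_dashboard["webui_url"])
    node_id = ray.get_runtime_context().get_node_id()

    @ray.remote(runtime_env={"uv": ["emoji"]})  # any small package works here
    class UvActor:
        def get_pid(self):
            return os.getpid()

    actor_pid = ray.get(UvActor.remote().get_pid.remote())
    driver_pid = os.getpid()

    def _pids_reported():
        resp = requests.get(f"{webui_url}/nodes/{node_id}")
        resp.raise_for_status()
        detail = resp.json()["data"]["detail"]
        reported = {worker["pid"] for worker in detail["workers"]}
        return {actor_pid, driver_pid} <= reported

    wait_for_condition(_pids_reported, timeout=30)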

Signed-off-by: tianyi-ge <tianyig@outlook.com>
@tianyi-ge
Contributor Author

updated!

stats = self._collect_stats()
return asyncio.run(
    self._async_compose_stats_payload(cluster_autoscaling_stats_json)
)

Bug: Async Loop Conflict in Reporting Method

The _compose_stats_payload method calls asyncio.run() from within the async reporting loop. This raises a RuntimeError because asyncio.run() cannot be invoked when an event loop is already running.


# and should not be considered user activity.
# Please keep this in sync with the definition kRayInternalNamespacePrefix
# in /src/ray/gcs/gcs_server/gcs_job_manager.h.
RAY_INTERNAL_NAMESPACE_PREFIX = "_ray_internal_"
Contributor

what is this refactoring for?

Contributor Author

#57004 (comment)
@jjyao suggested exposing this C++ constant to Python, avoiding a duplicate definition and potential inconsistency

assert dump_info["result"] is True
detail = dump_info["data"]["detail"]
pids = [worker["pid"] for worker in detail["workers"]]
if len(pids) < 2: # might include idle worker
Contributor

you can remove this if condition, wait_for_condition already handles retry interval
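
For reference, the retry loop already lives inside wait_for_condition, so the predicate can simply return False until enough workers show up; fetch_node_detail below is a hypothetical stand-in for the test's HTTP call:

from ray._private.test_utils import wait_for_condition


def wait_for_worker_pids(fetch_node_detail, expected=2):
    """Poll until the node detail reports at least `expected` worker PIDs."""

    def _enough_worker_pids():
        detail = fetch_node_detail()["data"]["detail"]
        pids = [worker["pid"] for worker in detail["workers"]]
        return len(pids) >= expected  # driver plus workers; idle workers may add more

    # wait_for_condition handles the retry interval and timeout itself.
    wait_for_condition(_enough_worker_pids, timeout=20, retry_interval_ms=500)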

"""

def __init__(self, dashboard_agent):
def __init__(self, dashboard_agent, raylet_client=None):
Contributor

Is raylet_client=None only used inside test cases?

Contributor Author

yes


await asyncio.sleep(reporter_consts.REPORTER_UPDATE_INTERVAL_MS / 1000)

def _compose_stats_payload(
Contributor

Is there a reason you didn’t make this async as well and only call asyncio.run inside loop.run_in_executor? That approach seems less error-prone. Calling asyncio.run here makes _compose_stats_payload unsafe to reuse elsewhere.

Contributor Author

yeah, I tested only calling asyncio.run inside loop.run_in_executor but it does not work. I guess in my approach, the creation of the coroutine (the call to asyncio.run) is delayed until the executor actually runs it. For the reuse concern, how about moving _compose_stats_payload to be a nested function?
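
A sketch of why the shape that landed is legal: loop.run_in_executor runs the synchronous _compose_stats_payload on a worker thread that has no running event loop, so the asyncio.run() inside it can create a fresh loop there (names simplified; this illustrates the pattern rather than the exact reporter code):

import asyncio


async def _async_compose_stats_payload():
    # Placeholder for the part that awaits the GetWorkerPIDs RPC.
    await asyncio.sleep(0)
    return {"worker_pids": [1, 2, 3]}


def _compose_stats_payload():
    # Runs on an executor thread: no event loop is running there, so
    # asyncio.run() can safely create and drive a fresh one.
    return asyncio.run(_async_compose_stats_payload())


async def report_once():
    loop = asyncio.get_running_loop()
    # Offload the blocking composition (psutil calls plus the nested asyncio.run)
    # so the agent's main event loop is not blocked.
    return await loop.run_in_executor(None, _compose_stats_payload)


if __name__ == "__main__":
    print(asyncio.run(report_once()))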

Signed-off-by: tianyi-ge <tianyig@outlook.com>
return asyncio.run(
    self._async_compose_stats_payload(cluster_autoscaling_stats_json)
)


Bug: Async Method Causes Event Loop Conflicts

The _compose_stats_payload method uses asyncio.run() to execute an async function. This causes a RuntimeError if called from an existing event loop, which is likely for the ReporterAgent within the dashboard. This can lead to application crashes or failures in stats collection.


Contributor

@can-anyscale can-anyscale left a comment

LGTM


await asyncio.sleep(reporter_consts.REPORTER_UPDATE_INTERVAL_MS / 1000)

def _compose_stats_payload(
Contributor

nit: let's rename this to _run_in_executor (to mark it as the entry function and use a similar run naming convention to the rest of this class); the rest looks good

@edoakes
Collaborator

edoakes commented Oct 21, 2025

Thanks for the contribution and sticking through all of the feedback @tianyi-ge!

@edoakes edoakes merged commit 47f6b87 into ray-project:master Oct 21, 2025
6 checks passed
Collaborator

@jjyao jjyao left a comment

Great work! You can fix these comments in your next PR.

Comment on lines +525 to +526
// Failure: Will retry with the default timeout 1000ms. If fails, reply return an empty
// list.
Collaborator

Do we actually retry? I think it's a local RPC so it should never fail unless the raylet crashes?

}

void RayletClient::GetWorkerPIDs(
    const gcs::OptionalItemCallback<std::vector<int32_t>> &callback, int64_t timeout_ms) {
Collaborator

I think we should move this out of gcs namespace since it's a common util now.

    int port) {
  // Connect to the raylet on a singleton io service with a dedicated thread.
  // This is to avoid creating multiple threads for multiple clients in python.
  static InstrumentedIOContextWithThread io_context("raylet_client_io_service");
Collaborator

I think this should be a member field of RayClientWithIoContext

xinyuangui2 pushed a commit to xinyuangui2/ray that referenced this pull request Oct 22, 2025
elliot-barn pushed a commit that referenced this pull request Oct 23, 2025
landscapepainter pushed a commit to landscapepainter/ray that referenced this pull request Nov 17, 2025
Aydin-ab pushed a commit to Aydin-ab/ray-aydin that referenced this pull request Nov 19, 2025

Labels

community-contribution (Contributed by the community), core (Issues that should be addressed in Ray Core), go (add ONLY when ready to merge, run all tests), observability (Issues related to the Ray Dashboard, Logging, Metrics, Tracing, and/or Profiling)
