Skip to content

Commit 5927027

Browse files
committed
[core] add get pid rpc to node manager
Signed-off-by: tianyi-ge <tianyig@outlook.com>
1 parent d28c905 commit 5927027

File tree

2 files changed

+9
-12
lines changed

2 files changed

+9
-12
lines changed

python/ray/dashboard/modules/reporter/reporter_agent.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -896,18 +896,19 @@ def _get_disk_io_stats():
896896
stats.write_count,
897897
)
898898

899-
def _get_worker_pids_from_raylet(self):
899+
async def _get_worker_pids_from_raylet(self):
900900
channel = ray._private.utils.init_grpc_channel(
901901
self._node_manager_address, GLOBAL_GRPC_OPTIONS, asynchronous=True
902902
)
903903
timeout = NODE_MANAGER_RPC_TIMEOUT_SECONDS
904904
stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel)
905905
try:
906-
reply = await stub.GetDriverAndWorkers(
907-
node_manager_pb2.GetDriverAndWorkersRequest(), timeout=timeout
906+
reply = await stub.GetDriverAndWorkerPids(
907+
node_manager_pb2.GetDriverAndWorkerPidsRequest(), timeout=timeout
908908
)
909909
return reply.pids
910-
except Exception:
910+
except Exception as e:
911+
logger.debug(f"Failed to get worker pids from raylet via gRPC: {e}")
911912
return None
912913

913914
def _get_agent_proc(self) -> psutil.Process:
@@ -919,7 +920,7 @@ def _generate_worker_key(self, proc: psutil.Process) -> Tuple[int, float]:
919920
return (proc.pid, proc.create_time())
920921

921922
def _get_worker_processes(self):
922-
pids = self._get_worker_pids_from_raylet()
923+
pids = asyncio.run(self._get_worker_pids_from_raylet())
923924
if pids is not None:
924925
workers = {}
925926
for pid in pids:

src/ray/raylet/node_manager.cc

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2728,14 +2728,10 @@ void NodeManager::HandleGetDriverAndWorkerPids(
27282728
rpc::GetDriverAndWorkerPidsReply *reply,
27292729
rpc::SendReplyCallback send_reply_callback) {
27302730
auto all_workers = worker_pool_.GetAllRegisteredWorkers(/* filter_dead_worker */ true);
2731-
for (const auto &driver :
2732-
worker_pool_.GetAllRegisteredDrivers(/* filter_dead_driver */ true)) {
2733-
all_workers.push_back(driver);
2734-
}
2731+
const auto &drivers =
2732+
worker_pool_.GetAllRegisteredDrivers(/* filter_dead_driver */ true);
2733+
all_workers.insert(all_workers.end(), drivers.begin(), drivers.end());
27352734
for (const auto &worker : all_workers) {
2736-
if (worker->IsDead()) {
2737-
continue;
2738-
}
27392735
reply->add_pids(worker->GetProcess().GetId());
27402736
}
27412737
send_reply_callback(Status::OK(), nullptr, nullptr);

0 commit comments

Comments
 (0)