Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion python/ray/_private/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
)
from ray._raylet import GcsClient, get_session_key_from_storage

import psutil

# Logger for this module. It should be configured at the entry point
# into the program using Ray. Ray configures it by default automatically
# using logging.basicConfig in its entry/init points.
Expand Down Expand Up @@ -1438,7 +1440,20 @@ def _get_system_processes_for_resource_isolation(self) -> str:
added to self.all_processes so it can be moved into the raylet's managed cgroup
hierarchy.
"""
return ",".join(str(p[0].process.pid) for p in self.all_processes.values())
system_process_pids = [
str(p[0].process.pid) for p in self.all_processes.values()
]

# If the dashboard api server was started on the head node, then include all of the api server's
# child processes.
if ray_constants.PROCESS_TYPE_DASHBOARD in self.all_processes:
dashboard_pid = self.all_processes[ray_constants.PROCESS_TYPE_DASHBOARD][
0
].process.pid
dashboard_process = psutil.Process(dashboard_pid)
system_process_pids += [str(p.pid) for p in dashboard_process.children()]

return ",".join(system_process_pids)

def _kill_process_type(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,22 @@

_MOUNT_FILE_PATH = "/proc/mounts"

# The names are here to help debug test failures. Tests should
# only use the size of this list. These processes are expected to be moved
# into the the system cgroup.
_EXPECTED_DASHBOARD_MODULES = [
"ray.dashboard.modules.usage_stats.usage_stats_head.UsageStatsHead",
"ray.dashboard.modules.metrics.metrics_head.MetricsHead",
"ray.dashboard.modules.data.data_head.DataHead",
"ray.dashboard.modules.event.event_head.EventHead",
"ray.dashboard.modules.job.job_head.JobHead",
"ray.dashboard.modules.node.node_head.NodeHead",
"ray.dashboard.modules.reporter.reporter_head.ReportHead",
"ray.dashboard.modules.serve.serve_head.ServeHead",
"ray.dashboard.modules.state.state_head.StateHead",
"ray.dashboard.modules.train.train_head.TrainHead",
]

# The list of processes expected to be started in the system cgroup
# with default params for 'ray start' and 'ray.init(...)'
_EXPECTED_SYSTEM_PROCESSES_RAY_START = [
Expand Down Expand Up @@ -345,7 +361,7 @@ def assert_system_processes_are_in_system_cgroup(
lines = cgroup_procs_file.readlines()
assert (
len(lines) == expected_count
), f"Expected only system process passed into the raylet. Found {lines}"
), f"Expected only system process passed into the raylet. Found {lines}. You may have added a new dashboard module in which case you need to update _EXPECTED_DASHBOARD_MODULES"


def assert_worker_processes_are_in_workers_cgroup(
Expand Down Expand Up @@ -457,7 +473,9 @@ def get_pid(self):
for actor in actor_refs:
worker_pids.add(str(ray.get(actor.get_pid.remote())))
assert_system_processes_are_in_system_cgroup(
node_id, resource_isolation_config, len(_EXPECTED_SYSTEM_PROCESSES_RAY_START)
node_id,
resource_isolation_config,
len(_EXPECTED_SYSTEM_PROCESSES_RAY_START) + len(_EXPECTED_DASHBOARD_MODULES),
)
assert_worker_processes_are_in_workers_cgroup(
node_id, resource_isolation_config, worker_pids
Expand Down Expand Up @@ -520,7 +538,9 @@ def get_pid(self):
for actor in actor_refs:
worker_pids.add(str(ray.get(actor.get_pid.remote())))
assert_system_processes_are_in_system_cgroup(
node_id, resource_isolation_config, len(_EXPECTED_SYSTEM_PROCESSES_RAY_INIT)
node_id,
resource_isolation_config,
len(_EXPECTED_SYSTEM_PROCESSES_RAY_INIT) + len(_EXPECTED_DASHBOARD_MODULES),
)
assert_worker_processes_are_in_workers_cgroup(
node_id, resource_isolation_config, worker_pids
Expand Down
2 changes: 1 addition & 1 deletion src/ray/common/cgroup2/cgroup_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ StatusOr<std::unique_ptr<CgroupManager>> CgroupManager::Create(
std::unique_ptr<CgroupDriverInterface> cgroup_driver) {
if (!cpu_weight_constraint_.IsValid(system_reserved_cpu_weight)) {
return Status::InvalidArgument(
absl::StrFormat("Invalid constraint %s=%d. %s must be in the range [%d, %d].",
absl::StrFormat(" Invalid constraint %s=%d. %s must be in the range [%d, %d].",
cpu_weight_constraint_.name_,
system_reserved_cpu_weight,
cpu_weight_constraint_.name_,
Expand Down