diff --git a/python/ray/_private/resource_isolation_config.py b/python/ray/_private/resource_isolation_config.py index 1044e5c63721..24837bf23796 100644 --- a/python/ray/_private/resource_isolation_config.py +++ b/python/ray/_private/resource_isolation_config.py @@ -179,7 +179,7 @@ def _validate_and_get_system_reserved_cpu( does not have enough available cpus. """ - available_system_cpus = utils.get_num_cpus() + available_system_cpus = utils.get_num_cpus(truncate=False) if available_system_cpus < ray_constants.DEFAULT_MIN_SYSTEM_RESERVED_CPU_CORES: raise ValueError( @@ -220,9 +220,9 @@ def _validate_and_get_system_reserved_cpu( f"greater than or equal to {ray_constants.DEFAULT_MIN_SYSTEM_RESERVED_CPU_CORES}" ) - if system_reserved_cpu > available_system_cpus: + if system_reserved_cpu >= available_system_cpus: raise ValueError( - f"The requested system_reserved_cpu={system_reserved_cpu} is greater than " + f"The requested system_reserved_cpu={system_reserved_cpu} is greater than or equal to " f"the number of cpus available={available_system_cpus}. " "Pick a smaller number of cpu cores to reserve for ray system processes." ) diff --git a/python/ray/_private/utils.py b/python/ray/_private/utils.py index 871bda337bdd..6a29c711ef7f 100644 --- a/python/ray/_private/utils.py +++ b/python/ray/_private/utils.py @@ -421,7 +421,8 @@ def _get_docker_cpus( def get_num_cpus( override_docker_cpu_warning: bool = ENV_DISABLE_DOCKER_CPU_WARNING, -) -> int: + truncate: bool = True, +) -> float: """ Get the number of CPUs available on this node. Depending on the situation, use multiprocessing.cpu_count() or cgroups. @@ -432,6 +433,7 @@ def get_num_cpus( RAY_DISABLE_DOCKER_CPU_WARNING. By default, whether or not to log the warning is determined by the env variable RAY_DISABLE_DOCKER_CPU_WARNING. + truncate: truncates the return value and drops the decimal part. """ cpu_count = multiprocessing.cpu_count() if os.environ.get("RAY_USE_MULTIPROCESSING_CPU_COUNT"): @@ -473,7 +475,8 @@ def get_num_cpus( f"truncated from {docker_count} to " f"{int(docker_count)}." ) - docker_count = int(docker_count) + if truncate: + docker_count = int(docker_count) cpu_count = docker_count except Exception: diff --git a/python/ray/tests/resource_isolation/test_resource_isolation_config.py b/python/ray/tests/resource_isolation/test_resource_isolation_config.py index d22c9368fa45..5cdf034f0a26 100644 --- a/python/ray/tests/resource_isolation/test_resource_isolation_config.py +++ b/python/ray/tests/resource_isolation/test_resource_isolation_config.py @@ -89,12 +89,12 @@ def test_enabled_resource_isolation_with_default_config_picks_min_values(monkeyp # NOTE: if you change the DEFAULT_MIN_SYSTEM_* constants, you may need to modify this test. # if the total number of cpus is between [1,19] the system cgroup will a weight that is equal to 1 cpu core. # if the total amount of memory is between [0.5GB, 4.8GB] the system cgroup will get 0.5GB + object store memory. - monkeypatch.setattr(utils, "get_num_cpus", lambda *args, **kwargs: 1) + monkeypatch.setattr(utils, "get_num_cpus", lambda *args, **kwargs: 2) monkeypatch.setattr( common_utils, "get_system_memory", lambda *args, **kwargs: 0.5 * (1024**3) ) config = ResourceIsolationConfig(enable_resource_isolation=True) - assert config.system_reserved_cpu_weight == 10000 + assert config.system_reserved_cpu_weight == 5000 assert config.system_reserved_memory == 500 * (1024**2) monkeypatch.setattr(utils, "get_num_cpus", lambda *args, **kwargs: 19) @@ -176,7 +176,7 @@ def test_enabled_with_resource_overrides_less_than_minimum_defaults_raise_value_ ) -def test_enabled_with_resource_overrides_greater_than_available_resources_raise_value_error( +def test_enabled_with_resource_overrides_gte_than_available_resources_raise_value_error( monkeypatch, ): # The following values in ray_constants define the maximum reserved values to run ray with resource isolation. @@ -186,11 +186,9 @@ def test_enabled_with_resource_overrides_greater_than_available_resources_raise_ monkeypatch.setattr(utils, "get_num_cpus", lambda *args, **kwargs: 32) with pytest.raises( ValueError, - match="The requested system_reserved_cpu=32.1 is greater than the number of cpus available=32", + match="The requested system_reserved_cpu=32.0 is greater than or equal to the number of cpus available=32", ): - ResourceIsolationConfig( - enable_resource_isolation=True, system_reserved_cpu=32.1 - ) + ResourceIsolationConfig(enable_resource_isolation=True, system_reserved_cpu=32) monkeypatch.setattr( common_utils, "get_system_memory", lambda *args, **kwargs: 10 * (1024**3) diff --git a/python/ray/tests/resource_isolation/test_resource_isolation_integration.py b/python/ray/tests/resource_isolation/test_resource_isolation_integration.py index 8fc1668af3e9..fd858f1169a4 100644 --- a/python/ray/tests/resource_isolation/test_resource_isolation_integration.py +++ b/python/ray/tests/resource_isolation/test_resource_isolation_integration.py @@ -224,7 +224,7 @@ def cleanup_test_suite(): ) as base_subtree_control_file: base_subtree_control_file.write("-cpu -memory") base_subtree_control_file.flush() - # 2) Move processes back into the leaf cgroup. + # 2) Move processes back into the root cgroup. with open(_ROOT_CGROUP / "cgroup.procs", "w") as root_procs_file, open( _LEAF_GROUP / "cgroup.procs", "r" ) as leaf_procs_file: @@ -232,6 +232,15 @@ def cleanup_test_suite(): for line in leaf_cgroup_lines: root_procs_file.write(line.strip()) root_procs_file.flush() + # 3) Move the current process back into the _ROOT_CGROUP + with open(_ROOT_CGROUP / "cgroup.procs", "w") as root_procs_file, open( + _TEST_CGROUP / "cgroup.procs", "r" + ) as test_procs_file: + test_cgroup_lines = test_procs_file.readlines() + for line in test_cgroup_lines: + root_procs_file.write(line.strip()) + root_procs_file.flush() + # 3) Delete the cgroups. os.rmdir(_LEAF_GROUP) os.rmdir(_TEST_CGROUP) @@ -431,9 +440,6 @@ def test_ray_cli_start_resource_isolation_creates_cgroup_hierarchy_and_cleans_up assert result.exit_code == 0 resource_isolation_config.add_object_store_memory(object_store_memory) assert_cgroup_hierarchy_exists_for_node(node_id, resource_isolation_config) - assert_system_processes_are_in_system_cgroup( - node_id, resource_isolation_config, len(_EXPECTED_SYSTEM_PROCESSES_RAY_START) - ) @ray.remote(num_cpus=1) class Actor: @@ -447,12 +453,17 @@ def get_pid(self): for _ in range(num_cpus): actor_refs.append(Actor.remote()) worker_pids = set() + worker_pids.add(str(os.getpid())) for actor in actor_refs: worker_pids.add(str(ray.get(actor.get_pid.remote()))) + assert_system_processes_are_in_system_cgroup( + node_id, resource_isolation_config, len(_EXPECTED_SYSTEM_PROCESSES_RAY_START) + ) assert_worker_processes_are_in_workers_cgroup( node_id, resource_isolation_config, worker_pids ) runner.invoke(scripts.stop) + assert_cgroup_hierarchy_cleaned_up_for_node(node_id, resource_isolation_config) @@ -492,9 +503,6 @@ def test_ray_init_resource_isolation_creates_cgroup_hierarchy_and_cleans_up( object_store_memory=object_store_memory, ) assert_cgroup_hierarchy_exists_for_node(node_id, resource_isolation_config) - assert_system_processes_are_in_system_cgroup( - node_id, resource_isolation_config, len(_EXPECTED_SYSTEM_PROCESSES_RAY_INIT) - ) @ray.remote(num_cpus=1) class Actor: @@ -508,8 +516,12 @@ def get_pid(self): for _ in range(num_cpus): actor_refs.append(Actor.remote()) worker_pids = set() + worker_pids.add(str(os.getpid())) for actor in actor_refs: worker_pids.add(str(ray.get(actor.get_pid.remote()))) + assert_system_processes_are_in_system_cgroup( + node_id, resource_isolation_config, len(_EXPECTED_SYSTEM_PROCESSES_RAY_INIT) + ) assert_worker_processes_are_in_workers_cgroup( node_id, resource_isolation_config, worker_pids ) diff --git a/src/ray/common/cgroup2/cgroup_manager.cc b/src/ray/common/cgroup2/cgroup_manager.cc index 6a332fb89c7c..12126c0f4f1b 100644 --- a/src/ray/common/cgroup2/cgroup_manager.cc +++ b/src/ray/common/cgroup2/cgroup_manager.cc @@ -268,10 +268,11 @@ Status CgroupManager::Initialize(int64_t system_reserved_cpu_weight, RAY_RETURN_NOT_OK(cgroup_driver_->MoveAllProcesses(base_cgroup_, non_ray_cgroup_)); RegisterMoveAllProcesses(non_ray_cgroup_, base_cgroup_); - // NOTE: Since the raylet does not own the lifecycle of all system processes, - // there's no guarantee that there are no pids in the system leaf cgroup. + // NOTE: Since the raylet does not own the lifecycle of all system or worker processes, + // there's no guarantee that there are no pids in the system leaf or the workers cgroup. // Therefore, pids need to be migrated out of the system cgroup to delete it. RegisterMoveAllProcesses(system_leaf_cgroup_, base_cgroup_); + RegisterMoveAllProcesses(workers_cgroup_, base_cgroup_); std::array cpu_controlled_cgroups{&base_cgroup_, &node_cgroup_}; std::array memory_controlled_cgroups{ diff --git a/src/ray/common/cgroup2/integration_tests/sysfs_cgroup_driver_integration_test.cc b/src/ray/common/cgroup2/integration_tests/sysfs_cgroup_driver_integration_test.cc index 3c990b5a3cf8..87aa166b1309 100644 --- a/src/ray/common/cgroup2/integration_tests/sysfs_cgroup_driver_integration_test.cc +++ b/src/ray/common/cgroup2/integration_tests/sysfs_cgroup_driver_integration_test.cc @@ -679,4 +679,44 @@ TEST_F(SysFsCgroupDriverIntegrationTest, ASSERT_TRUE(terminate_s.ok()) << terminate_s.ToString(); } +TEST_F(SysFsCgroupDriverIntegrationTest, + AddProcessToCgroupSucceedsIfProcessAlreadyInCgroup) { + auto cgroup_or_status = TempCgroupDirectory::Create(test_cgroup_path_, S_IRWXU); + ASSERT_TRUE(cgroup_or_status.ok()) << cgroup_or_status.ToString(); + auto cgroup = std::move(cgroup_or_status.value()); + auto child_cgroup_or_status = TempCgroupDirectory::Create(cgroup->GetPath(), S_IRWXU); + ASSERT_TRUE(child_cgroup_or_status.ok()) << child_cgroup_or_status.ToString(); + auto child_cgroup = std::move(child_cgroup_or_status.value()); + StatusOr> child_process_s = + StartChildProcessInCgroup(cgroup->GetPath()); + ASSERT_TRUE(child_process_s.ok()) << child_process_s.ToString(); + auto [child_pid, child_pidfd] = child_process_s.value(); + SysFsCgroupDriver driver; + Status s = + driver.AddProcessToCgroup(child_cgroup->GetPath(), std::to_string(child_pid)); + ASSERT_TRUE(s.ok()) << s.ToString(); + Status s2 = + driver.AddProcessToCgroup(child_cgroup->GetPath(), std::to_string(child_pid)); + ASSERT_TRUE(s2.ok()) << s2.ToString(); + // Assert that the child's pid is actually in the new file. + std::string child_cgroup_procs_file_path = child_cgroup->GetPath() + + std::filesystem::path::preferred_separator + + "cgroup.procs"; + std::ifstream child_cgroup_procs_file(child_cgroup_procs_file_path); + ASSERT_TRUE(child_cgroup_procs_file.is_open()) + << "Could not open file " << child_cgroup_procs_file_path << "."; + std::unordered_set child_cgroup_pids; + int pid = -1; + while (child_cgroup_procs_file >> pid) { + ASSERT_FALSE(child_cgroup_procs_file.fail()) + << "Unable to read pid from file " << child_cgroup_procs_file_path; + child_cgroup_pids.emplace(pid); + } + EXPECT_EQ(child_cgroup_pids.size(), 1); + EXPECT_TRUE(child_cgroup_pids.find(child_pid) != child_cgroup_pids.end()); + Status terminate_s = + TerminateChildProcessAndWaitForTimeout(child_pid, child_pidfd, 5000); + ASSERT_TRUE(terminate_s.ok()) << terminate_s.ToString(); +} + } // namespace ray diff --git a/src/ray/common/cgroup2/sysfs_cgroup_driver.cc b/src/ray/common/cgroup2/sysfs_cgroup_driver.cc index ddc845368151..2e973113e582 100644 --- a/src/ray/common/cgroup2/sysfs_cgroup_driver.cc +++ b/src/ray/common/cgroup2/sysfs_cgroup_driver.cc @@ -331,8 +331,6 @@ Status SysFsCgroupDriver::DisableController(const std::string &cgroup_path, return Status::OK(); } -// What's the right thing here? If the controller is specified? -// The correct API would be specify where the controller should be enabled. Status SysFsCgroupDriver::AddConstraint(const std::string &cgroup_path, const std::string &constraint, const std::string &constraint_value) { diff --git a/src/ray/common/cgroup2/tests/cgroup_manager_test.cc b/src/ray/common/cgroup2/tests/cgroup_manager_test.cc index 00aae668d39a..7b2f07d4c793 100644 --- a/src/ray/common/cgroup2/tests/cgroup_manager_test.cc +++ b/src/ray/common/cgroup2/tests/cgroup_manager_test.cc @@ -282,11 +282,11 @@ TEST(CgroupManagerTest, CreateSucceedsWithCleanupInOrder) { } // Processes must be moved third. - // Processes were moved both out of the system_leaf cgroup and the non_ray - // cgroup. - ASSERT_EQ(processes_moved->size(), 2); - std::array process_moved_cgroups{system_leaf_cgroup_path, - non_ray_cgroup_path}; + // Processes were moved both out of the system_leaf, workers, and non_ray + // cgroups. + ASSERT_EQ(processes_moved->size(), 3); + std::array process_moved_cgroups{ + system_leaf_cgroup_path, non_ray_cgroup_path, workers_cgroup_path}; // The order in which processes were moved back from leaf nodes to the base_cgroup // does not matter. diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 858ff5566115..f5e7c765d443 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1183,7 +1183,10 @@ Status NodeManager::RegisterForNewDriver( worker->SetProcess(Process::FromPid(pid)); rpc::JobConfig job_config; job_config.ParseFromString(message->serialized_job_config()->str()); - + Status s = cgroup_manager_->AddProcessToWorkersCgroup(std::to_string(pid)); + RAY_CHECK(s.ok()) << absl::StrFormat( + "Failed to move the driver process into the workers cgroup with error %s", + s.ToString()); return worker_pool_.RegisterDriver(worker, job_config, send_reply_callback); }