From fe7c9adebee648135bcfdf45decdf709073fc153 Mon Sep 17 00:00:00 2001
From: hejialing
Date: Thu, 6 Mar 2025 17:28:34 +0800
Subject: [PATCH] save

Signed-off-by: hejialing.hjl
---
 python/ray/tests/test_placement_group_5.py | 48 ++++++++++++++++++++++
 src/ray/raylet/node_manager.cc             | 22 +++++++---
 src/ray/raylet/node_manager.h              |  8 +++-
 3 files changed, 71 insertions(+), 7 deletions(-)

diff --git a/python/ray/tests/test_placement_group_5.py b/python/ray/tests/test_placement_group_5.py
index 4af2894a99f3d..71c593a4fa9ca 100644
--- a/python/ray/tests/test_placement_group_5.py
+++ b/python/ray/tests/test_placement_group_5.py
@@ -1,6 +1,8 @@
 import asyncio
 import sys
 import time
+import os
+import psutil
 from functools import reduce
 from itertools import chain
 
@@ -651,6 +653,52 @@ def get_node_id():
     )
 
 
+def test_remove_placement_group_when_an_actor_queued(ray_start_cluster):
+    cluster = ray_start_cluster
+    cluster.add_node(num_cpus=1)
+    cluster.wait_for_nodes()
+    ray.init(address=cluster.address)
+
+    pg = ray.util.placement_group([{"CPU": 1}])
+
+    @ray.remote
+    class Actor:
+        def get_raylet_pid(self):
+            return os.getppid()
+
+    actor = Actor.options(
+        lifetime="detached",
+        num_cpus=1.0,
+        scheduling_strategy=PlacementGroupSchedulingStrategy(
+            placement_group=pg, placement_group_bundle_index=0
+        ),
+    ).remote()
+
+    raylet = psutil.Process(ray.get(actor.get_raylet_pid.remote()))
+
+    _ = Actor.options(
+        # Detached so the queued actor can still be scheduled after the job ends.
+        lifetime="detached",
+        num_cpus=1.0,
+        scheduling_strategy=PlacementGroupSchedulingStrategy(
+            placement_group=pg, placement_group_bundle_index=0
+        ),
+    ).remote()
+
+    assert raylet.is_running()
+
+    # Shutting down the driver triggers the GCS to remove the placement group.
+    ray.shutdown()
+
+    # Give the GCS 10 seconds to finish removing the placement group.
+    with pytest.raises(psutil.TimeoutExpired):
+        raylet.wait(10)
+
+    # Check that the raylet survived the RAY_CHECK:
+    # `RAY_CHECK_OK(placement_group_resource_manager_->ReturnBundle(bundle_spec))`
+    assert raylet.is_running()
+
+
 if __name__ == "__main__":
     import os
 
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index 152a259b3a5a8..b4d2f234ac065 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -601,11 +601,16 @@ void NodeManager::KillWorker(std::shared_ptr<WorkerInterface> worker, bool force
 void NodeManager::DestroyWorker(std::shared_ptr<WorkerInterface> worker,
                                 rpc::WorkerExitType disconnect_type,
                                 const std::string &disconnect_detail,
-                                bool force) {
+                                bool force,
+                                bool disable_schedule) {
   // We should disconnect the client first. Otherwise, we'll remove bundle resources
   // before actual resources are returned. Subsequent disconnect request that comes
   // due to worker dead will be ignored.
-  DisconnectClient(worker->Connection(), disconnect_type, disconnect_detail);
+  DisconnectClient(worker->Connection(),
+                   disconnect_type,
+                   disconnect_detail,
+                   /*creation_task_exception=*/nullptr,
+                   disable_schedule);
   worker->MarkDead();
   KillWorker(worker, force);
   if (disconnect_type == rpc::WorkerExitType::SYSTEM_ERROR) {
@@ -1541,7 +1546,8 @@ void NodeManager::HandleWorkerAvailable(const std::shared_ptr<WorkerInterface> &
 void NodeManager::DisconnectClient(const std::shared_ptr<ClientConnection> &client,
                                    rpc::WorkerExitType disconnect_type,
                                    const std::string &disconnect_detail,
-                                   const rpc::RayException *creation_task_exception) {
+                                   const rpc::RayException *creation_task_exception,
+                                   bool disable_schedule) {
   RAY_LOG(INFO) << "NodeManager::DisconnectClient, disconnect_type=" << disconnect_type
                 << ", has creation task exception = " << std::boolalpha
                 << bool(creation_task_exception != nullptr);
@@ -1639,7 +1645,7 @@ void NodeManager::DisconnectClient(const std::shared_ptr<ClientConnection> &client,
     local_task_manager_->ReleaseWorkerResources(worker);
 
     // Since some resources may have been released, we can try to dispatch more tasks.
-    cluster_task_manager_->ScheduleAndDispatchTasks();
+    if (!disable_schedule) cluster_task_manager_->ScheduleAndDispatchTasks();
   } else if (is_driver) {
     // The client is a driver.
     const auto job_id = worker->GetAssignedJobId();
@@ -2086,7 +2092,13 @@ void NodeManager::HandleCancelResourceReserve(
            << ", worker id: " << worker->WorkerId();
     const auto &message = stream.str();
     RAY_LOG(DEBUG) << message;
-    DestroyWorker(worker, rpc::WorkerExitType::INTENDED_SYSTEM_EXIT, message);
+    // Note(hejialing): We must prevent new tasks from being scheduled until the
+    // bundle's resources have been returned.
+    DestroyWorker(worker,
+                  rpc::WorkerExitType::INTENDED_SYSTEM_EXIT,
+                  message,
+                  /*force=*/false,
+                  /*disable_schedule=*/true);
   }
 
   RAY_CHECK_OK(placement_group_resource_manager_->ReturnBundle(bundle_spec));
diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h
index a199c84fc0740..1898887de6fb0 100644
--- a/src/ray/raylet/node_manager.h
+++ b/src/ray/raylet/node_manager.h
@@ -371,12 +371,14 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
   /// \param disconnect_type The reason why this worker process is disconnected.
   /// \param disconnect_detail The detailed reason for a given exit.
   /// \param force true to destroy immediately, false to give time for the worker to
   /// clean up and exit gracefully.
+  /// \param disable_schedule true to disable scheduling of local tasks.
   /// \return Void.
   void DestroyWorker(std::shared_ptr<WorkerInterface> worker,
                      rpc::WorkerExitType disconnect_type,
                      const std::string &disconnect_detail,
-                     bool force = false);
+                     bool force = false,
+                     bool disable_schedule = false);
 
   /// When a job finished, loop over all of the queued tasks for that job and
   /// treat them as failed.
@@ -699,11 +701,13 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
   /// \param disconnect_type The reason to disconnect the specified client.
   /// \param disconnect_detail Disconnection information in details.
   /// \param client_error_message Extra error messages about this disconnection
+  /// \param disable_schedule true to disable scheduling of local tasks.
   /// \return Void.
   void DisconnectClient(const std::shared_ptr<ClientConnection> &client,
                         rpc::WorkerExitType disconnect_type,
                         const std::string &disconnect_detail,
-                        const rpc::RayException *creation_task_exception = nullptr);
+                        const rpc::RayException *creation_task_exception = nullptr,
+                        bool disable_schedule = false);
 
   bool TryLocalGC();
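
For reviewers, a minimal sketch of the race this patch closes, using illustrative
stand-in types rather than Ray's real NodeManager/ClusterTaskManager classes: if
DisconnectClient re-dispatches queued work the moment the destroyed worker's
resources are released, a queued actor can grab the bundle's resources before
ReturnBundle runs, and the raylet dies on the RAY_CHECK_OK. With
disable_schedule=true, dispatch stays suppressed for that window.

// Illustrative stand-ins only; not Ray's actual classes or call signatures.
#include <functional>
#include <iostream>
#include <vector>

struct FakeScheduler {
  bool dispatch_disabled = false;
  std::vector<std::function<void()>> queued;

  // Models cluster_task_manager_->ScheduleAndDispatchTasks() being gated.
  void ScheduleAndDispatchTasks() {
    if (dispatch_disabled) return;  // disable_schedule == true: do nothing.
    for (auto &task : queued) task();
    queued.clear();
  }
};

int main() {
  FakeScheduler scheduler;
  scheduler.queued.push_back(
      [] { std::cout << "queued actor dispatched onto bundle\n"; });

  // HandleCancelResourceReserve, with the fix:
  // 1. DestroyWorker(..., /*disable_schedule=*/true) releases the dying
  //    worker's resources but suppresses dispatch, so the queued actor cannot
  //    take the bundle's resources in this window.
  scheduler.dispatch_disabled = true;
  scheduler.ScheduleAndDispatchTasks();  // no-op: the window stays closed

  // 2. ReturnBundle(bundle_spec) now sees the bundle's resources unoccupied,
  //    so the RAY_CHECK_OK passes instead of crashing the raylet.
  std::cout << "bundle returned safely\n";

  // 3. The next time scheduling runs, the queued actor is re-evaluated
  //    against the remaining cluster resources instead of the removed bundle.
  scheduler.dispatch_disabled = false;
  scheduler.ScheduleAndDispatchTasks();
  return 0;
}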