[Core][Bug fix] Trigger local task scheduling after deleting bundle. #51125
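For orientation before the diff: a minimal sketch of the ordering this change enforces in `NodeManager::HandleCancelResourceReserve`, reconstructed from the hunks below. The loop variable name and the placement of the final `ScheduleAndDispatchTasks()` call are assumptions — the title says scheduling is triggered after the bundle is deleted, but that call itself is not visible in this excerpt.

```cpp
// Sketch only, not the PR's literal code (simplified from
// src/ray/raylet/node_manager.cc). workers_on_bundle is an assumed
// name for the set of workers leased on the bundle being cancelled.
for (const auto &worker : workers_on_bundle) {
  // Destroy each worker but keep local task scheduling disabled, so no
  // queued task is dispatched onto resources that are about to be returned.
  DestroyWorker(worker,
                rpc::WorkerExitType::INTENDED_SYSTEM_EXIT,
                message,
                /*force=*/false,
                /*disable_schedule=*/true);
}
// Safe now: nothing was scheduled onto the bundle in the meantime, so this
// check cannot fire.
RAY_CHECK_OK(placement_group_resource_manager_->ReturnBundle(bundle_spec));
// Finally, trigger local task scheduling after the bundle is gone.
cluster_task_manager_->ScheduleAndDispatchTasks();
```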

Open · wants to merge 1 commit into master
48 changes: 48 additions & 0 deletions python/ray/tests/test_placement_group_5.py
```diff
@@ -1,6 +1,8 @@
 import asyncio
 import sys
 import time
+import os
+import psutil
 from functools import reduce
 from itertools import chain
```

```diff
@@ -651,6 +653,52 @@ def get_node_id():
     )


+def test_remove_placement_group_when_an_actor_queued(ray_start_cluster):
+    cluster = ray_start_cluster
+    cluster.add_node(num_cpus=1)
+    cluster.wait_for_nodes()
+    ray.init(address=cluster.address)
+
+    pg = ray.util.placement_group([{"CPU": 1}])
+
+    @ray.remote
+    class Actor:
+        def get_raylet_pid(self):
+            # A worker's parent process is the raylet that launched it.
+            return os.getppid()
+
+    actor = Actor.options(
+        lifetime="detached",
+        num_cpus=1.0,
+        scheduling_strategy=PlacementGroupSchedulingStrategy(
+            placement_group=pg, placement_group_bundle_index=0
+        ),
+    ).remote()
+
+    raylet = psutil.Process(ray.get(actor.get_raylet_pid.remote()))
+
+    # The bundle's single CPU is taken by the first actor, so this one queues.
+    _ = Actor.options(
+        # Detached so that it can still be scheduled when the job ends.
+        lifetime="detached",
+        num_cpus=1.0,
+        scheduling_strategy=PlacementGroupSchedulingStrategy(
+            placement_group=pg, placement_group_bundle_index=0
+        ),
+    ).remote()
+
+    assert raylet.is_running()
+
+    # Trigger placement group removal in the GCS.
+    ray.shutdown()
+
+    # Give the GCS time to finish removing the placement group; the raylet
+    # must stay alive for the full timeout.
+    with pytest.raises(psutil.TimeoutExpired):
+        raylet.wait(10)
+
+    # The raylet survived the check in
+    # `RAY_CHECK_OK(placement_group_resource_manager_->ReturnBundle(bundle_spec))`.
+    assert raylet.is_running()


 if __name__ == "__main__":
     import os
```
22 changes: 17 additions & 5 deletions src/ray/raylet/node_manager.cc
```diff
@@ -601,11 +601,16 @@ void NodeManager::KillWorker(std::shared_ptr<WorkerInterface> worker, bool force
 void NodeManager::DestroyWorker(std::shared_ptr<WorkerInterface> worker,
                                 rpc::WorkerExitType disconnect_type,
                                 const std::string &disconnect_detail,
-                                bool force) {
+                                bool force,
+                                bool disable_schedule) {
   // We should disconnect the client first. Otherwise, we'll remove bundle resources
   // before actual resources are returned. Subsequent disconnect request that comes
   // due to worker dead will be ignored.
-  DisconnectClient(worker->Connection(), disconnect_type, disconnect_detail);
+  DisconnectClient(worker->Connection(),
+                   disconnect_type,
+                   disconnect_detail,
+                   /*creation_task_exception=*/nullptr,
+                   disable_schedule);
   worker->MarkDead();
   KillWorker(worker, force);
   if (disconnect_type == rpc::WorkerExitType::SYSTEM_ERROR) {
```
```diff
@@ -1541,7 +1546,8 @@ void NodeManager::HandleWorkerAvailable(const std::shared_ptr<WorkerInterface> &
 void NodeManager::DisconnectClient(const std::shared_ptr<ClientConnection> &client,
                                    rpc::WorkerExitType disconnect_type,
                                    const std::string &disconnect_detail,
-                                   const rpc::RayException *creation_task_exception) {
+                                   const rpc::RayException *creation_task_exception,
+                                   bool disable_schedule) {
   RAY_LOG(INFO) << "NodeManager::DisconnectClient, disconnect_type=" << disconnect_type
                 << ", has creation task exception = " << std::boolalpha
                 << bool(creation_task_exception != nullptr);
```
```diff
@@ -1639,7 +1645,7 @@ void NodeManager::DisconnectClient(const std::shared_ptr<ClientConnection> &clie
     local_task_manager_->ReleaseWorkerResources(worker);

     // Since some resources may have been released, we can try to dispatch more tasks.
-    cluster_task_manager_->ScheduleAndDispatchTasks();
+    if (!disable_schedule) cluster_task_manager_->ScheduleAndDispatchTasks();
   } else if (is_driver) {
     // The client is a driver.
     const auto job_id = worker->GetAssignedJobId();
```
```diff
@@ -2086,7 +2092,13 @@ void NodeManager::HandleCancelResourceReserve(
                << ", worker id: " << worker->WorkerId();
     const auto &message = stream.str();
     RAY_LOG(DEBUG) << message;
-    DestroyWorker(worker, rpc::WorkerExitType::INTENDED_SYSTEM_EXIT, message);
+    // Note(hejialing): We need to prohibit scheduling until we have finished
+    // modifying the bundle below.
+    DestroyWorker(worker,
+                  rpc::WorkerExitType::INTENDED_SYSTEM_EXIT,
+                  message,
+                  /*force=*/false,
+                  /*disable_schedule=*/true);
   }

   RAY_CHECK_OK(placement_group_resource_manager_->ReturnBundle(bundle_spec));
```
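The node_manager.cc change above threads a defer-the-side-effect flag through two layers: releasing a worker's resources normally re-triggers dispatch immediately, but during bundle cancellation that dispatch must wait until `ReturnBundle` completes. A self-contained toy (hypothetical names, not Ray code) of the same pattern:

```cpp
// Toy model of the disable_schedule pattern (hypothetical names, not Ray code).
#include <iostream>

class MiniNodeManager {
 public:
  // Mirrors DisconnectClient: release resources, then normally reschedule.
  void DisconnectWorker(bool disable_schedule) {
    std::cout << "worker resources released\n";
    if (!disable_schedule) {
      ScheduleAndDispatch();  // pre-fix path: may dispatch onto a dying bundle
    }
  }

  // Mirrors HandleCancelResourceReserve: keep scheduling off while the
  // bundle is updated, then reschedule exactly once afterwards.
  void CancelBundle() {
    DisconnectWorker(/*disable_schedule=*/true);  // step 1: no dispatch yet
    std::cout << "bundle returned\n";             // step 2: safe to return
    ScheduleAndDispatch();                        // step 3: reschedule
  }

 private:
  void ScheduleAndDispatch() { std::cout << "schedule + dispatch tasks\n"; }
};

int main() {
  MiniNodeManager node_manager;
  node_manager.CancelBundle();
  return 0;
}
```

Running it prints the three steps in the only safe order: release, return, dispatch.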
8 changes: 6 additions & 2 deletions src/ray/raylet/node_manager.h
```diff
@@ -371,12 +371,14 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
   /// \param disconnect_type The reason why this worker process is disconnected.
   /// \param disconnect_detail The detailed reason for a given exit.
   /// \param force true to destroy immediately, false to give time for the worker to
   /// clean up and exit gracefully.
+  /// \param disable_schedule true to skip triggering local task scheduling.
   /// \return Void.
   void DestroyWorker(std::shared_ptr<WorkerInterface> worker,
                      rpc::WorkerExitType disconnect_type,
                      const std::string &disconnect_detail,
-                     bool force = false);
+                     bool force = false,
+                     bool disable_schedule = false);

   /// When a job finished, loop over all of the queued tasks for that job and
   /// treat them as failed.
```
```diff
@@ -699,11 +701,13 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
   /// \param disconnect_type The reason to disconnect the specified client.
   /// \param disconnect_detail Disconnection information in details.
   /// \param client_error_message Extra error messages about this disconnection
+  /// \param disable_schedule true to skip triggering local task scheduling.
   /// \return Void.
   void DisconnectClient(const std::shared_ptr<ClientConnection> &client,
                         rpc::WorkerExitType disconnect_type,
                         const std::string &disconnect_detail,
-                        const rpc::RayException *creation_task_exception = nullptr);
+                        const rpc::RayException *creation_task_exception = nullptr,
+                        bool disable_schedule = false);

   bool TryLocalGC();
```