[Core] Cancel lease requests before returning a PG bundle (#45919)
Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
jjyao authored Jun 16, 2024
1 parent 2f80503 commit f0f52fa
Showing 17 changed files with 366 additions and 237 deletions.
84 changes: 84 additions & 0 deletions python/ray/tests/test_gcs_fault_tolerance.py
@@ -1,4 +1,5 @@
import sys
import asyncio
import os
import threading
from time import sleep
@@ -22,6 +23,8 @@
)
from ray.job_submission import JobSubmissionClient, JobStatus
from ray._raylet import GcsClient
from ray._private.runtime_env.plugin import RuntimeEnvPlugin
from ray.util.state import list_placement_groups

import psutil

@@ -1213,6 +1216,87 @@ def spawn(self, name, namespace):
raise ValueError(f"Unknown case: {case}")


MyPlugin = "MyPlugin"
MY_PLUGIN_CLASS_PATH = "ray.tests.test_gcs_fault_tolerance.HangPlugin"


class HangPlugin(RuntimeEnvPlugin):
name = MyPlugin

async def create(
self,
uri,
runtime_env,
ctx,
logger, # noqa: F821
) -> float:
while True:
await asyncio.sleep(1)

@staticmethod
def validate(runtime_env_dict: dict) -> str:
return 1


@pytest.mark.parametrize(
"ray_start_regular_with_external_redis",
[
generate_system_config_map(
gcs_rpc_server_reconnect_timeout_s=60,
testing_asio_delay_us="NodeManagerService.grpc_server.CancelResourceReserve=500000000:500000000", # noqa: E501
),
],
indirect=True,
)
@pytest.mark.parametrize(
"set_runtime_env_plugins",
[
'[{"class":"' + MY_PLUGIN_CLASS_PATH + '"}]',
],
indirect=True,
)
def test_placement_group_removal_after_gcs_restarts(
set_runtime_env_plugins, ray_start_regular_with_external_redis
):
@ray.remote
def task():
pass

pg = ray.util.placement_group(bundles=[{"CPU": 1}])
_ = task.options(
max_retries=0,
num_cpus=1,
scheduling_strategy=PlacementGroupSchedulingStrategy(
placement_group=pg,
),
runtime_env={
MyPlugin: {"name": "f2"},
"config": {"setup_timeout_seconds": -1},
},
).remote()

# The task should be popping a worker.
# TODO(jjyao) Use a more deterministic way to
# decide whether the task is popping a worker.
sleep(5)

ray.util.remove_placement_group(pg)
# The PG is marked as REMOVED in Redis but not yet removed from the raylet
# due to the injected delay of the CancelResourceReserve rpc.
wait_for_condition(lambda: list_placement_groups()[0].state == "REMOVED")

ray._private.worker._global_node.kill_gcs_server()
# After GCS restarts, it will try to remove the PG resources
# again via ReleaseUnusedBundles rpc
ray._private.worker._global_node.start_gcs_server()

def verify_pg_resources_cleaned():
r_keys = ray.available_resources().keys()
return all("group" not in k for k in r_keys)

wait_for_condition(verify_pg_resources_cleaned, timeout=30)


if __name__ == "__main__":

import pytest
7 changes: 3 additions & 4 deletions python/ray/tests/test_placement_group_5.py
@@ -470,10 +470,9 @@ async def create(
) -> float:
await asyncio.sleep(PLUGIN_TIMEOUT)


@staticmethod
def validate(runtime_env_dict: dict) -> str:
return 1
@staticmethod
def validate(runtime_env_dict: dict) -> str:
return 1


@pytest.mark.parametrize(
1 change: 1 addition & 0 deletions src/ray/core_worker/core_worker_process.cc
@@ -233,6 +233,7 @@ void CoreWorkerProcessImpl::InitializeSystemConfig() {
thread.join();

RayConfig::instance().initialize(promise.get_future().get());
ray::asio::testing::init();
}

void CoreWorkerProcessImpl::RunWorkerTaskExecutionLoop() {
39 changes: 15 additions & 24 deletions src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc
@@ -243,9 +243,9 @@ void GcsPlacementGroupScheduler::CancelResourceReserve(
auto node_id = NodeID::FromBinary(node.value()->node_id());

if (max_retry == current_retry_cnt) {
RAY_LOG(INFO) << "Failed to cancel resource reserved for bundle because the max "
"retry count is reached. "
<< bundle_spec->DebugString() << " at node " << node_id;
RAY_LOG(ERROR) << "Failed to cancel resource reserved for bundle because the max "
"retry count is reached. "
<< bundle_spec->DebugString() << " at node " << node_id;
return;
}

@@ -261,11 +261,10 @@
RAY_LOG(INFO) << "Finished cancelling the resource reserved for bundle: "
<< bundle_spec->DebugString() << " at node " << node_id;
} else {
// We couldn't delete the pg resources either becuase it is in use
// or network issue. Retry.
RAY_LOG(INFO) << "Failed to cancel the resource reserved for bundle: "
<< bundle_spec->DebugString() << " at node " << node_id
<< ". Status: " << status;
// We couldn't delete the pg resources because of network issue. Retry.
RAY_LOG(WARNING) << "Failed to cancel the resource reserved for bundle: "
<< bundle_spec->DebugString() << " at node " << node_id
<< ". Status: " << status;
execute_after(
io_context_,
[this, bundle_spec, node, max_retry, current_retry_cnt] {
@@ -568,14 +567,10 @@ void GcsPlacementGroupScheduler::DestroyPlacementGroupPreparedBundleResources(
for (const auto &iter : *(leasing_bundle_locations)) {
auto &bundle_spec = iter.second.second;
auto &node_id = iter.second.first;
CancelResourceReserve(
bundle_spec,
gcs_node_manager_.GetAliveNode(node_id),
// Retry 10 * worker registeration timeout to avoid race condition.
// See https://github.com/ray-project/ray/pull/42942
// for more details.
/*max_retry*/ RayConfig::instance().worker_register_timeout_seconds() * 10,
/*num_retry*/ 0);
CancelResourceReserve(bundle_spec,
gcs_node_manager_.GetAliveNode(node_id),
/*max_retry*/ 5,
/*num_retry*/ 0);
}
}
}
@@ -594,14 +589,10 @@ void GcsPlacementGroupScheduler::DestroyPlacementGroupCommittedBundleResources(
for (const auto &iter : *(committed_bundle_locations)) {
auto &bundle_spec = iter.second.second;
auto &node_id = iter.second.first;
CancelResourceReserve(
bundle_spec,
gcs_node_manager_.GetAliveNode(node_id),
// Retry 10 * worker registeration timeout to avoid race condition.
// See https://github.com/ray-project/ray/pull/42942
// for more details.
/*max_retry*/ RayConfig::instance().worker_register_timeout_seconds() * 10,
/*num_retry*/ 0);
CancelResourceReserve(bundle_spec,
gcs_node_manager_.GetAliveNode(node_id),
/*max_retry*/ 5,
/*num_retry*/ 0);
}
committed_bundle_location_index_.Erase(placement_group_id);
cluster_resource_scheduler_.GetClusterResourceManager()
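For illustration, here is a minimal, self-contained sketch of the capped-retry pattern that CancelResourceReserve uses above. This is not Ray's actual code: the CancelResourceReserve RPC and execute_after() are stand-ins (a plain callback and a sleep), and the bundle is reduced to a string label.

#include <chrono>
#include <functional>
#include <iostream>
#include <string>
#include <thread>

// Hypothetical stand-in for the CancelResourceReserve RPC; returns success/failure.
using CancelRpc = std::function<bool(const std::string &bundle)>;

void CancelWithRetry(const std::string &bundle,
                     const CancelRpc &rpc,
                     int max_retry,
                     int current_retry_cnt) {
  if (current_retry_cnt == max_retry) {
    // Give up once the retry budget is exhausted, as the scheduler does.
    std::cerr << "Failed to cancel " << bundle << " after " << max_retry
              << " retries\n";
    return;
  }
  if (rpc(bundle)) {
    std::cout << "Cancelled " << bundle << "\n";
    return;
  }
  // Back off briefly, then retry with an incremented counter, mirroring the
  // retry that the scheduler schedules with execute_after().
  std::this_thread::sleep_for(std::chrono::milliseconds(100));
  CancelWithRetry(bundle, rpc, max_retry, current_retry_cnt + 1);
}

int main() {
  int calls = 0;
  // Fails twice, then succeeds, so two retries are observed.
  CancelRpc flaky = [&calls](const std::string &) { return ++calls > 2; };
  CancelWithRetry("bundle_0", flaky, /*max_retry=*/5, /*current_retry_cnt=*/0);
  return 0;
}

The real scheduler schedules the retry on its io_context instead of blocking, but the counter and cap logic is the same.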
1 change: 1 addition & 0 deletions src/ray/gcs/gcs_server/gcs_server_main.cc
@@ -62,6 +62,7 @@ int main(int argc, char *argv[]) {
gflags::ShutDownCommandLineFlags();

RayConfig::instance().initialize(config_list);
ray::asio::testing::init();

// IO Service for main loop.
instrumented_io_context main_service;
124 changes: 65 additions & 59 deletions src/ray/raylet/local_task_manager.cc
@@ -546,21 +546,15 @@ bool LocalTaskManager::PoppedWorkerHandler(
not_detached_with_owner_failed = true;
}

const auto &required_resource =
task.GetTaskSpecification().GetRequiredResources().GetResourceMap();
for (auto &entry : required_resource) {
if (!cluster_resource_scheduler_->GetLocalResourceManager().ResourcesExist(
scheduling::ResourceID(entry.first))) {
RAY_CHECK(task.GetTaskSpecification().PlacementGroupBundleId().first !=
PlacementGroupID::Nil());
RAY_LOG(DEBUG) << "The placement group: "
<< task.GetTaskSpecification().PlacementGroupBundleId().first
<< " was removed when poping workers for task: " << task_id
<< ", will cancel the task.";
CancelTask(
task_id,
rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_PLACEMENT_GROUP_REMOVED);
canceled = true;
if (!canceled) {
const auto &required_resource =
task.GetTaskSpecification().GetRequiredResources().GetResourceMap();
for (auto &entry : required_resource) {
// This makes sure the PG resource is not deleted while a worker is being
// popped, unless the lease request is cancelled.
RAY_CHECK(cluster_resource_scheduler_->GetLocalResourceManager().ResourcesExist(
scheduling::ResourceID(entry.first)))
<< entry.first;
}
}

@@ -855,7 +849,7 @@ void LocalTaskManager::ReleaseTaskArgs(const TaskID &task_id) {
}

namespace {
void ReplyCancelled(std::shared_ptr<internal::Work> &work,
void ReplyCancelled(const std::shared_ptr<internal::Work> &work,
rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
const std::string &scheduling_failure_message) {
auto reply = work->reply;
@@ -867,55 +861,67 @@ void ReplyCancelled(std::shared_ptr<internal::Work> &work,
}
} // namespace

bool LocalTaskManager::CancelTask(
const TaskID &task_id,
bool LocalTaskManager::CancelTasks(
std::function<bool(const std::shared_ptr<internal::Work> &)> predicate,
rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
const std::string &scheduling_failure_message) {
for (auto shapes_it = tasks_to_dispatch_.begin(); shapes_it != tasks_to_dispatch_.end();
shapes_it++) {
auto &work_queue = shapes_it->second;
for (auto work_it = work_queue.begin(); work_it != work_queue.end(); work_it++) {
const auto &task = (*work_it)->task;
if (task.GetTaskSpecification().TaskId() == task_id) {
RAY_LOG(DEBUG) << "Canceling task " << task_id << " from dispatch queue.";
ReplyCancelled(*work_it, failure_type, scheduling_failure_message);
if ((*work_it)->GetState() == internal::WorkStatus::WAITING_FOR_WORKER) {
// We've already acquired resources so we need to release them.
cluster_resource_scheduler_->GetLocalResourceManager().ReleaseWorkerResources(
(*work_it)->allocated_instances);
// Release pinned task args.
ReleaseTaskArgs(task_id);
}
if (!task.GetTaskSpecification().GetDependencies().empty()) {
task_dependency_manager_.RemoveTaskDependencies(
task.GetTaskSpecification().TaskId());
bool tasks_cancelled = false;

ray::erase_if<SchedulingClass, std::shared_ptr<internal::Work>>(
tasks_to_dispatch_, [&](const std::shared_ptr<internal::Work> &work) {
if (predicate(work)) {
const TaskID task_id = work->task.GetTaskSpecification().TaskId();
RAY_LOG(DEBUG) << "Canceling task " << task_id << " from dispatch queue.";
ReplyCancelled(work, failure_type, scheduling_failure_message);
if (work->GetState() == internal::WorkStatus::WAITING_FOR_WORKER) {
// We've already acquired resources so we need to release them.
cluster_resource_scheduler_->GetLocalResourceManager().ReleaseWorkerResources(
work->allocated_instances);
// Release pinned task args.
ReleaseTaskArgs(task_id);
}
if (!work->task.GetTaskSpecification().GetDependencies().empty()) {
task_dependency_manager_.RemoveTaskDependencies(
work->task.GetTaskSpecification().TaskId());
}
RemoveFromRunningTasksIfExists(work->task);
work->SetStateCancelled();
tasks_cancelled = true;
return true;
} else {
return false;
}
RemoveFromRunningTasksIfExists(task);
(*work_it)->SetStateCancelled();
work_queue.erase(work_it);
if (work_queue.empty()) {
tasks_to_dispatch_.erase(shapes_it);
});

ray::erase_if<std::shared_ptr<internal::Work>>(
waiting_task_queue_, [&](const std::shared_ptr<internal::Work> &work) {
if (predicate(work)) {
ReplyCancelled(work, failure_type, scheduling_failure_message);
if (!work->task.GetTaskSpecification().GetDependencies().empty()) {
task_dependency_manager_.RemoveTaskDependencies(
work->task.GetTaskSpecification().TaskId());
}
waiting_tasks_index_.erase(work->task.GetTaskSpecification().TaskId());
tasks_cancelled = true;
return true;
} else {
return false;
}
return true;
}
}
}
});

auto iter = waiting_tasks_index_.find(task_id);
if (iter != waiting_tasks_index_.end()) {
const auto &task = (*iter->second)->task;
ReplyCancelled(*iter->second, failure_type, scheduling_failure_message);
if (!task.GetTaskSpecification().GetDependencies().empty()) {
task_dependency_manager_.RemoveTaskDependencies(
task.GetTaskSpecification().TaskId());
}
waiting_task_queue_.erase(iter->second);
waiting_tasks_index_.erase(iter);

return true;
}
return tasks_cancelled;
}

return false;
bool LocalTaskManager::CancelTask(
const TaskID &task_id,
rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
const std::string &scheduling_failure_message) {
return CancelTasks(
[task_id](const std::shared_ptr<internal::Work> &work) {
return work->task.GetTaskSpecification().TaskId() == task_id;
},
failure_type,
scheduling_failure_message);
}

bool LocalTaskManager::AnyPendingTasksForResourceAcquisition(
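As a reference for the new predicate-based API, here is a self-contained sketch of the cancellation pattern that CancelTasks() introduces. The names are simplified stand-ins rather than Ray's types: Work is a small struct, the dispatch queue is a single deque, and replying to a cancelled lease request is just a print.

#include <deque>
#include <functional>
#include <iostream>
#include <memory>
#include <string>

struct Work {
  std::string task_id;
  std::string placement_group_id;  // empty if not in a placement group
};

using WorkPtr = std::shared_ptr<Work>;

// Cancels every queued lease request matching the predicate; returns true if
// anything was cancelled, mirroring the bool returned by CancelTasks().
bool CancelTasks(std::deque<WorkPtr> &queue,
                 const std::function<bool(const WorkPtr &)> &predicate) {
  bool cancelled = false;
  for (auto it = queue.begin(); it != queue.end();) {
    if (predicate(*it)) {
      std::cout << "Cancelling lease request for task " << (*it)->task_id << "\n";
      it = queue.erase(it);
      cancelled = true;
    } else {
      ++it;
    }
  }
  return cancelled;
}

int main() {
  std::deque<WorkPtr> dispatch_queue = {
      std::make_shared<Work>(Work{"t1", "pg1"}),
      std::make_shared<Work>(Work{"t2", ""}),
      std::make_shared<Work>(Work{"t3", "pg1"}),
  };
  // Before returning pg1's bundles, cancel every lease request that uses them.
  CancelTasks(dispatch_queue,
              [](const WorkPtr &w) { return w->placement_group_id == "pg1"; });
  std::cout << dispatch_queue.size() << " request(s) left in the queue\n";
  return 0;
}

The real implementation additionally releases any already-acquired worker resources and pinned task arguments and removes task dependencies before dropping the work item, as shown in the diff above.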
30 changes: 20 additions & 10 deletions src/ray/raylet/local_task_manager.h
@@ -111,17 +111,15 @@ class LocalTaskManager : public ILocalTaskManager {
/// \param task: Output parameter.
void TaskFinished(std::shared_ptr<WorkerInterface> worker, RayTask *task);

/// Attempt to cancel an already queued task.
/// Attempt to cancel all queued tasks that match the predicate.
///
/// \param task_id: The id of the task to remove.
/// \param failure_type: The failure type.
///
/// \return True if task was successfully removed. This function will return
/// false if the task is already running.
bool CancelTask(const TaskID &task_id,
rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type =
rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_INTENDED,
const std::string &scheduling_failure_message = "") override;
/// \param predicate: A function that returns true if a task needs to be cancelled.
/// \param failure_type: The reason for cancellation.
/// \param scheduling_failure_message: The reason message for cancellation.
/// \return True if any task was successfully cancelled.
bool CancelTasks(std::function<bool(const std::shared_ptr<internal::Work> &)> predicate,
rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
const std::string &scheduling_failure_message) override;

/// Return if any tasks are pending resource acquisition.
///
@@ -203,6 +201,18 @@ class LocalTaskManager : public ILocalTaskManager {
const rpc::Address &owner_address,
const std::string &runtime_env_setup_error_message);

/// Attempt to cancel an already queued task.
///
/// \param task_id: The id of the task to remove.
/// \param failure_type: The failure type.
///
/// \return True if task was successfully removed. This function will return
/// false if the task is already running.
bool CancelTask(const TaskID &task_id,
rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type =
rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_INTENDED,
const std::string &scheduling_failure_message = "");

/// Attempts to dispatch all tasks which are ready to run. A task
/// will be dispatched if it is on `tasks_to_dispatch_` and there are still
/// available resources on the node.
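The CancelTasks() implementation in local_task_manager.cc also leans on a ray::erase_if helper that walks the per-scheduling-class dispatch queues. That helper is not part of this diff; the sketch below is a hypothetical stand-in that only illustrates the shape of the operation: remove matching items and drop queues that become empty.

#include <deque>
#include <functional>
#include <iostream>
#include <iterator>
#include <map>
#include <string>

// Hypothetical stand-in for ray::erase_if over a map of per-key work queues.
template <typename K, typename V>
void EraseIf(std::map<K, std::deque<V>> &queues,
             const std::function<bool(const V &)> &predicate) {
  for (auto shape_it = queues.begin(); shape_it != queues.end();) {
    auto &queue = shape_it->second;
    // Erase every queued item that matches the predicate.
    for (auto it = queue.begin(); it != queue.end();) {
      it = predicate(*it) ? queue.erase(it) : std::next(it);
    }
    // Drop scheduling classes whose queues are now empty.
    shape_it = queue.empty() ? queues.erase(shape_it) : std::next(shape_it);
  }
}

int main() {
  std::map<int, std::deque<std::string>> queues = {
      {0, {"t1", "t2"}},
      {1, {"t3"}},
  };
  EraseIf<int, std::string>(queues, [](const std::string &t) { return t == "t3"; });
  std::cout << queues.size() << " scheduling class(es) left\n";  // prints 1
  return 0;
}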