From 4dc514ee538d60dae67ea43d8d3d67dbb395ad9d Mon Sep 17 00:00:00 2001 From: Dhyey Shah Date: Thu, 14 Nov 2024 19:31:35 -0500 Subject: [PATCH] [core] Stop waiting for dependencies when cancelling tasks (#48661) Signed-off-by: dayshah --- python/ray/tests/test_cancel.py | 20 +++++++++++++++++++ src/mock/ray/core_worker/task_manager.h | 1 + src/ray/core_worker/task_manager.h | 4 +++- .../test/dependency_resolver_test.cc | 2 ++ .../test/normal_task_submitter_test.cc | 2 ++ .../transport/actor_task_submitter.cc | 3 ++- .../transport/normal_task_submitter.cc | 11 +++++----- 7 files changed, 35 insertions(+), 8 deletions(-) diff --git a/python/ray/tests/test_cancel.py b/python/ray/tests/test_cancel.py index c46175670e60..e9cf6cfdb012 100644 --- a/python/ray/tests/test_cancel.py +++ b/python/ray/tests/test_cancel.py @@ -636,6 +636,26 @@ def verify(): wait_for_condition(verify) +@pytest.mark.parametrize("use_force", [True, False]) +def test_cancel_with_dependency(shutdown_only, use_force): + ray.init(num_cpus=4) + + @ray.remote(num_cpus=1) + def wait_forever_task(): + while True: + time.sleep(1000) + + @ray.remote(num_cpus=1) + def square(x): + return x * x + + wait_forever_obj = wait_forever_task.remote() + wait_forever_as_dep = square.remote(wait_forever_obj) + ray.cancel(wait_forever_as_dep, force=use_force) + with pytest.raises(valid_exceptions(use_force)): + ray.get(wait_forever_as_dep) + + @pytest.mark.skip("Actor cancelation works now.") def test_recursive_cancel_error_messages(shutdown_only, capsys): """ diff --git a/src/mock/ray/core_worker/task_manager.h b/src/mock/ray/core_worker/task_manager.h index 5dc1f469ee2c..488e0ac3c235 100644 --- a/src/mock/ray/core_worker/task_manager.h +++ b/src/mock/ray/core_worker/task_manager.h @@ -61,6 +61,7 @@ class MockTaskFinisherInterface : public TaskFinisherInterface { MarkTaskWaitingForExecution, (const TaskID &task_id, const NodeID &node_id, const WorkerID &worker_id), (override)); + MOCK_METHOD(bool, IsTaskPending, (const TaskID &task_id), (const, override)); }; } // namespace core diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index df0b26f3d0d9..196e18beb277 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -66,6 +66,8 @@ class TaskFinisherInterface { virtual absl::optional GetTaskSpec(const TaskID &task_id) const = 0; + virtual bool IsTaskPending(const TaskID &task_id) const = 0; + virtual ~TaskFinisherInterface() = default; }; @@ -551,7 +553,7 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa /// /// \param[in] task_id ID of the task to query. /// \return Whether the task is pending. - bool IsTaskPending(const TaskID &task_id) const; + bool IsTaskPending(const TaskID &task_id) const override; /// Return whether the task is scheduled adn waiting for execution. /// diff --git a/src/ray/core_worker/test/dependency_resolver_test.cc b/src/ray/core_worker/test/dependency_resolver_test.cc index 4d8e51a6faec..9fbe28445126 100644 --- a/src/ray/core_worker/test/dependency_resolver_test.cc +++ b/src/ray/core_worker/test/dependency_resolver_test.cc @@ -114,6 +114,8 @@ class MockTaskFinisher : public TaskFinisherInterface { const NodeID &node_id, const WorkerID &worker_id) override {} + bool IsTaskPending(const TaskID &task_id) const override { return true; } + int num_tasks_complete = 0; int num_tasks_failed = 0; int num_inlined_dependencies = 0; diff --git a/src/ray/core_worker/test/normal_task_submitter_test.cc b/src/ray/core_worker/test/normal_task_submitter_test.cc index f7feb5ae26e0..4179d512ea47 100644 --- a/src/ray/core_worker/test/normal_task_submitter_test.cc +++ b/src/ray/core_worker/test/normal_task_submitter_test.cc @@ -179,6 +179,8 @@ class MockTaskFinisher : public TaskFinisherInterface { const NodeID &node_id, const WorkerID &worker_id) override {} + bool IsTaskPending(const TaskID &task_id) const override { return true; } + int num_tasks_complete = 0; int num_tasks_failed = 0; int num_inlined_dependencies = 0; diff --git a/src/ray/core_worker/transport/actor_task_submitter.cc b/src/ray/core_worker/transport/actor_task_submitter.cc index 2f18f6299bb8..c54b9deb16ec 100644 --- a/src/ray/core_worker/transport/actor_task_submitter.cc +++ b/src/ray/core_worker/transport/actor_task_submitter.cc @@ -876,7 +876,8 @@ Status ActorTaskSubmitter::CancelTask(TaskSpecification task_spec, bool recursiv // Shouldn't hold a lock while accessing task_finisher_. // Task is already canceled or finished. - if (!GetTaskFinisherWithoutMu().MarkTaskCanceled(task_id)) { + if (!GetTaskFinisherWithoutMu().MarkTaskCanceled(task_id) || + !GetTaskFinisherWithoutMu().IsTaskPending(task_id)) { RAY_LOG(DEBUG).WithField(task_id) << "Task is already finished or canceled"; return Status::OK(); } diff --git a/src/ray/core_worker/transport/normal_task_submitter.cc b/src/ray/core_worker/transport/normal_task_submitter.cc index 9e8ac970db1b..a1fca860da26 100644 --- a/src/ray/core_worker/transport/normal_task_submitter.cc +++ b/src/ray/core_worker/transport/normal_task_submitter.cc @@ -82,10 +82,6 @@ Status NormalTaskSubmitter::SubmitTask(TaskSpecification task_spec) { RequestNewWorkerIfNeeded(scheduling_key); } } - if (!keep_executing) { - RAY_UNUSED(task_finisher_->FailOrRetryPendingTask( - task_spec.TaskId(), rpc::ErrorType::TASK_CANCELLED, nullptr)); - } }); return Status::OK(); } @@ -709,7 +705,8 @@ Status NormalTaskSubmitter::CancelTask(TaskSpecification task_spec, { absl::MutexLock lock(&mu_); if (cancelled_tasks_.find(task_spec.TaskId()) != cancelled_tasks_.end() || - !task_finisher_->MarkTaskCanceled(task_spec.TaskId())) { + !task_finisher_->MarkTaskCanceled(task_spec.TaskId()) || + !task_finisher_->IsTaskPending(task_spec.TaskId())) { return Status::OK(); } @@ -736,7 +733,9 @@ Status NormalTaskSubmitter::CancelTask(TaskSpecification task_spec, if (rpc_client == executing_tasks_.end()) { // This case is reached for tasks that have unresolved dependencies. - // No executing tasks, so cancelling is a noop. + resolver_.CancelDependencyResolution(task_spec.TaskId()); + RAY_UNUSED(task_finisher_->FailPendingTask(task_spec.TaskId(), + rpc::ErrorType::TASK_CANCELLED)); if (scheduling_key_entry.CanDelete()) { // We can safely remove the entry keyed by scheduling_key from the // scheduling_key_entries_ hashmap.