From 3bb9a8a83a9a7aa5fda1b3122c20e516ab38a1be Mon Sep 17 00:00:00 2001 From: rickyyx Date: Wed, 1 Feb 2023 02:52:59 +0000 Subject: [PATCH 1/5] init Signed-off-by: rickyyx --- .../runtime/task/local_mode_task_submitter.cc | 3 +- python/ray/tests/test_task_events.py | 206 ++++++++++++++++++ src/ray/common/task/task_spec.cc | 7 + src/ray/common/task/task_spec.h | 2 + src/ray/common/task/task_util.h | 6 +- src/ray/core_worker/context.cc | 10 + src/ray/core_worker/context.h | 5 + src/ray/core_worker/core_worker.cc | 8 +- src/ray/core_worker/core_worker.h | 1 + src/ray/core_worker/task_manager.cc | 2 +- src/ray/core_worker/test/core_worker_test.cc | 12 +- .../test/dependency_resolver_test.cc | 3 +- .../test/direct_task_transport_test.cc | 3 +- src/ray/gcs/test/gcs_test_util.h | 3 +- src/ray/protobuf/common.proto | 4 + .../scheduling/cluster_task_manager_test.cc | 1 + 16 files changed, 268 insertions(+), 8 deletions(-) diff --git a/cpp/src/ray/runtime/task/local_mode_task_submitter.cc b/cpp/src/ray/runtime/task/local_mode_task_submitter.cc index 7740bc186d25..145e8130fe15 100644 --- a/cpp/src/ray/runtime/task/local_mode_task_submitter.cc +++ b/cpp/src/ray/runtime/task/local_mode_task_submitter.cc @@ -64,7 +64,8 @@ ObjectID LocalModeTaskSubmitter::Submit(InvocationSpec &invocation, required_resources, required_placement_resources, "", - /*depth=*/0); + /*depth=*/0, + local_mode_ray_tuntime_.GetCurrentTaskId()); if (invocation.task_type == TaskType::NORMAL_TASK) { } else if (invocation.task_type == TaskType::ACTOR_CREATION_TASK) { invocation.actor_id = local_mode_ray_tuntime_.GetNextActorID(); diff --git a/python/ray/tests/test_task_events.py b/python/ray/tests/test_task_events.py index 425f1f5931cc..20f0c6f36182 100644 --- a/python/ray/tests/test_task_events.py +++ b/python/ray/tests/test_task_events.py @@ -1,12 +1,14 @@ from collections import defaultdict from typing import Dict import pytest +import threading import time import ray from ray.experimental.state.common import ListApiOptions, StateResource from ray._private.test_utils import ( raw_metrics, + run_string_as_driver, run_string_as_driver_nonblocking, wait_for_condition, ) @@ -124,6 +126,210 @@ def verify(): ) +def test_parent_task_id_threaded_task(shutdown_only): + ray.init(_system_config=_SYSTEM_CONFIG) + + # Task starts a thread + @ray.remote + def main_task(): + def thd_task(): + @ray.remote + def thd_task(): + pass + + ray.get(thd_task.remote()) + + thd = threading.Thread(target=thd_task) + thd.start() + thd.join() + + ray.get(main_task.remote()) + + def verify(): + tasks = list_tasks() + assert len(tasks) == 2 + expect_parent_task_id = None + actual_parent_task_id = None + for task in tasks: + if task["name"] == "main_task": + expect_parent_task_id = task["task_id"] + elif task["name"] == "thd_task": + actual_parent_task_id = task["parent_task_id"] + assert actual_parent_task_id is not None + assert expect_parent_task_id == actual_parent_task_id + + return True + + wait_for_condition(verify) + + +def test_parent_task_id_actor(shutdown_only): + ray.init(_system_config=_SYSTEM_CONFIG) + + def run_task_in_thread(): + def thd_task(): + @ray.remote + def thd_task(): + pass + + ray.get(thd_task.remote()) + + thd = threading.Thread(target=thd_task) + thd.start() + thd.join() + + @ray.remote + class Actor: + def main_task(self): + run_task_in_thread() + + a = Actor.remote() + ray.get(a.main_task.remote()) + + def verify(): + tasks = list_tasks() + expect_parent_task_id = None + actual_parent_task_id = None + for task in tasks: + if "main_task" in task["name"]: + expect_parent_task_id = task["task_id"] + elif "thd_task" in task["name"]: + actual_parent_task_id = task["parent_task_id"] + print(tasks) + assert actual_parent_task_id is not None + assert expect_parent_task_id == actual_parent_task_id + + return True + + wait_for_condition(verify) + + +@pytest.mark.parametrize("actor_concurrency", [3, 10]) +def test_parent_task_id_multi_thread_actors(shutdown_only, actor_concurrency): + ray.init(_system_config=_SYSTEM_CONFIG) + + def run_task_in_thread(name, i): + def thd_task(): + @ray.remote + def thd_task(): + pass + + ray.get(thd_task.options(name=f"{name}_{i}").remote()) + + thd = threading.Thread(target=thd_task) + thd.start() + thd.join() + + @ray.remote + class AsyncActor: + async def main_task(self, i): + run_task_in_thread("async_thd_task", i) + + @ray.remote + class ThreadedActor: + def main_task(self, i): + run_task_in_thread("threaded_thd_task", i) + + def verify(actor_method_name, actor_class_name): + tasks = list_tasks() + print(tasks) + expect_parent_task_id = None + actual_parent_task_id = None + for task in tasks: + if f"{actor_class_name}.__init__" in task["name"]: + expect_parent_task_id = task["task_id"] + + assert expect_parent_task_id is not None + for task in tasks: + if f"{actor_method_name}" in task["name"]: + actual_parent_task_id = task["parent_task_id"] + assert expect_parent_task_id == actual_parent_task_id, task + + return True + + async_actor = AsyncActor.options(max_concurrency=actor_concurrency).remote() + ray.get([async_actor.main_task.remote(i) for i in range(20)]) + wait_for_condition( + verify, actor_class_name="AsyncActor", actor_method_name="async_thd_task" + ) + + thd_actor = ThreadedActor.options(max_concurrency=actor_concurrency).remote() + ray.get([thd_actor.main_task.remote(i) for i in range(20)]) + wait_for_condition( + verify, actor_class_name="ThreadedActor", actor_method_name="threaded_thd_task" + ) + + +def test_parent_task_id_tune_e2e(shutdown_only): + ray.init(_system_config=_SYSTEM_CONFIG) + job_id = ray.get_runtime_context().get_job_id() + script = """ +import numpy as np +import ray +from ray import tune +import time + +ray.init("auto") + +@ray.remote +def train_step_1(): + time.sleep(0.5) + return 1 + +def train_function(config): + for i in range(5): + loss = config["mean"] * np.random.randn() + ray.get( + train_step_1.remote()) + tune.report(loss=loss, nodes=ray.nodes()) + + +def tune_function(): + analysis = tune.run( + train_function, + metric="loss", + mode="min", + config={ + "mean": tune.grid_search([1, 2, 3, 4, 5]), + }, + resources_per_trial=tune.PlacementGroupFactory([{ + 'CPU': 1.0 + }] + [{ + 'CPU': 1.0 + }] * 3), + ) + return analysis.best_config + + +tune_function() + """ + + run_string_as_driver(script) + client = StateApiClient() + + def list_tasks(): + return client.list( + StateResource.TASKS, + # Filter out this driver + options=ListApiOptions( + exclude_driver=False, filters=[("job_id", "!=", job_id)], limit=1000 + ), + raise_on_missing_output=True, + ) + + def verify(): + tasks = list_tasks() + + task_id_map = {task["task_id"]: task for task in tasks} + for task in tasks: + if task["type"] == "DRIVER_TASK": + continue + assert task_id_map.get(task["parent_task_id"], None) is not None, task + + return True + + wait_for_condition(verify) + + def test_handle_driver_tasks(shutdown_only): ray.init(_system_config=_SYSTEM_CONFIG) diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index 0c31dd0f466a..dcdaef5f9855 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -160,6 +160,13 @@ TaskID TaskSpecification::ParentTaskId() const { return TaskID::FromBinary(message_->parent_task_id()); } +TaskID TaskSpecification::MainThreadParentTaskId() const { + if (message_->main_thread_parent_task_id().empty() /* e.g., empty proto default */) { + return TaskID::Nil(); + } + return TaskID::FromBinary(message_->main_thread_parent_task_id()); +} + size_t TaskSpecification::ParentCounter() const { return message_->parent_counter(); } ray::FunctionDescriptor TaskSpecification::FunctionDescriptor() const { diff --git a/src/ray/common/task/task_spec.h b/src/ray/common/task/task_spec.h index 778678ced951..d304e9472a26 100644 --- a/src/ray/common/task/task_spec.h +++ b/src/ray/common/task/task_spec.h @@ -222,6 +222,8 @@ class TaskSpecification : public MessageWrapper { TaskID ParentTaskId() const; + TaskID MainThreadParentTaskId() const; + size_t ParentCounter() const; ray::FunctionDescriptor FunctionDescriptor() const; diff --git a/src/ray/common/task/task_util.h b/src/ray/common/task/task_util.h index 652ddd307a28..428f87fa799a 100644 --- a/src/ray/common/task/task_util.h +++ b/src/ray/common/task/task_util.h @@ -130,6 +130,7 @@ class TaskSpecBuilder { const std::unordered_map &required_placement_resources, const std::string &debugger_breakpoint, int64_t depth, + const TaskID &main_thread_parent_task_id, const std::shared_ptr runtime_env_info = nullptr, const std::string &concurrency_group_name = "") { message_->set_type(TaskType::NORMAL_TASK); @@ -142,6 +143,7 @@ class TaskSpecBuilder { } message_->set_task_id(task_id.Binary()); message_->set_parent_task_id(parent_task_id.Binary()); + message_->set_main_thread_parent_task_id(main_thread_parent_task_id.Binary()); message_->set_parent_counter(parent_counter); message_->set_caller_id(caller_id.Binary()); message_->mutable_caller_address()->CopyFrom(caller_address); @@ -182,12 +184,14 @@ class TaskSpecBuilder { const JobID &job_id, const TaskID &parent_task_id, const TaskID &caller_id, - const rpc::Address &caller_address) { + const rpc::Address &caller_address, + const TaskID &main_thread_parent_task_id) { message_->set_type(TaskType::DRIVER_TASK); message_->set_language(language); message_->set_job_id(job_id.Binary()); message_->set_task_id(task_id.Binary()); message_->set_parent_task_id(parent_task_id.Binary()); + message_->set_main_thread_parent_task_id(main_thread_parent_task_id.Binary()); message_->set_parent_counter(0); message_->set_caller_id(caller_id.Binary()); message_->mutable_caller_address()->CopyFrom(caller_address); diff --git a/src/ray/core_worker/context.cc b/src/ray/core_worker/context.cc index 89a79cd5ba21..4395823f0475 100644 --- a/src/ray/core_worker/context.cc +++ b/src/ray/core_worker/context.cc @@ -160,6 +160,8 @@ WorkerContext::WorkerContext(WorkerType worker_type, RAY_CHECK(!current_job_id_.IsNil()); GetThreadContext().SetCurrentTaskId(TaskID::ForDriverTask(job_id), /*attempt_number=*/0); + // Driver runs in the main thread. + main_thread_current_task_id_ = TaskID::ForDriverTask(job_id); } } @@ -266,6 +268,9 @@ void WorkerContext::SetTaskDepth(int64_t depth) { task_depth_ = depth; } void WorkerContext::SetCurrentTask(const TaskSpecification &task_spec) { GetThreadContext().SetCurrentTask(task_spec); absl::WriterMutexLock lock(&mutex_); + if (CurrentThreadIsMain()) { + main_thread_current_task_id_ = task_spec.TaskId(); + } SetTaskDepth(task_spec.GetDepth()); RAY_CHECK(current_job_id_ == task_spec.JobId()); if (task_spec.IsNormalTask()) { @@ -314,6 +319,11 @@ bool WorkerContext::CurrentThreadIsMain() const { return boost::this_thread::get_id() == main_thread_id_; } +const TaskID WorkerContext::GetMainThreadCurrentTaskID() const { + absl::ReaderMutexLock lock(&mutex_); + return main_thread_current_task_id_; +} + bool WorkerContext::ShouldReleaseResourcesOnBlockingCalls() const { // Check if we need to release resources when we block: // - Driver doesn't acquire resources and thus doesn't need to release. diff --git a/src/ray/core_worker/context.h b/src/ray/core_worker/context.h index 09719f57654d..6642fc2a1f07 100644 --- a/src/ray/core_worker/context.h +++ b/src/ray/core_worker/context.h @@ -40,6 +40,8 @@ class WorkerContext { const TaskID &GetCurrentTaskID() const; + const TaskID GetMainThreadCurrentTaskID() const; + const PlacementGroupID &GetCurrentPlacementGroupId() const LOCKS_EXCLUDED(mutex_); bool ShouldCaptureChildTasksInPlacementGroup() const LOCKS_EXCLUDED(mutex_); @@ -130,6 +132,9 @@ class WorkerContext { std::shared_ptr runtime_env_info_ GUARDED_BY(mutex_); /// The id of the (main) thread that constructed this worker context. const boost::thread::id main_thread_id_; + /// The currently executing main thread's task id. Used merely for observability + /// purposes to track task hierarchy. + TaskID main_thread_current_task_id_; // To protect access to mutable members; mutable absl::Mutex mutex_; diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index c3ea072b52db..e2ce9d779424 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -363,7 +363,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ // Driver has no parent task /* parent_task_id */ TaskID::Nil(), GetCallerId(), - rpc_address_); + rpc_address_, + TaskID::Nil()); // Drivers are never re-executed. SetCurrentTaskId(task_id, /*attempt_number=*/0, "driver"); @@ -1748,6 +1749,7 @@ void CoreWorker::BuildCommonTaskSpec( const std::string &debugger_breakpoint, int64_t depth, const std::string &serialized_runtime_env_info, + const TaskID &main_thread_current_task_id, const std::string &concurrency_group_name, bool include_job_config) { // Build common task spec. @@ -1780,6 +1782,7 @@ void CoreWorker::BuildCommonTaskSpec( required_placement_resources, debugger_breakpoint, depth, + main_thread_current_task_id, override_runtime_env_info, concurrency_group_name); // Set task arguments. @@ -1830,6 +1833,7 @@ std::vector CoreWorker::SubmitTask( debugger_breakpoint, depth, task_options.serialized_runtime_env_info, + worker_context_.GetMainThreadCurrentTaskID(), /*concurrency_group_name*/ "", /*include_job_config*/ true); builder.SetNormalTaskSpec(max_retries, @@ -1913,6 +1917,7 @@ Status CoreWorker::CreateActor(const RayFunction &function, "" /* debugger_breakpoint */, depth, actor_creation_options.serialized_runtime_env_info, + worker_context_.GetMainThreadCurrentTaskID(), /*concurrency_group_name*/ "", /*include_job_config*/ true); @@ -2137,6 +2142,7 @@ std::optional> CoreWorker::SubmitActorTask( "", /* debugger_breakpoint */ depth, /*depth*/ "{}", /* serialized_runtime_env_info */ + worker_context_.GetMainThreadCurrentTaskID(), task_options.concurrency_group_name, /*include_job_config*/ false); // NOTE: placement_group_capture_child_tasks and runtime_env will diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index d16f82003a86..8d926a7eead7 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -1145,6 +1145,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { const std::string &debugger_breakpoint, int64_t depth, const std::string &serialized_runtime_env_info, + const TaskID &main_thread_current_task_id, const std::string &concurrency_group_name = "", bool include_job_config = false); void SetCurrentTaskId(const TaskID &task_id, diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 3cf71e383d19..c9b593010ceb 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -861,7 +861,7 @@ rpc::TaskInfoEntry TaskManager::MakeTaskInfoEntry( task_info.set_job_id(task_spec.JobId().Binary()); task_info.set_task_id(task_spec.TaskId().Binary()); - task_info.set_parent_task_id(task_spec.ParentTaskId().Binary()); + task_info.set_parent_task_id(task_spec.MainThreadParentTaskId().Binary()); const auto &resources_map = task_spec.GetRequiredResources().GetResourceMap(); task_info.mutable_required_resources()->insert(resources_map.begin(), resources_map.end()); diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index 302c7e877dc8..2f1281c57745 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -565,7 +565,8 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) { resources, resources, "", - 0); + 0, + RandomTaskId()); // Set task arguments. for (const auto &arg : args) { builder.AddArg(*arg); @@ -631,6 +632,7 @@ TEST_F(ZeroNodeTest, TestWorkerContext) { auto job_id = NextJobId(); WorkerContext context(WorkerType::WORKER, WorkerID::FromRandom(), job_id); + // This fails locally somehow. Guess we reuse the thread in testing. ASSERT_TRUE(context.GetCurrentTaskID().IsNil()); ASSERT_EQ(context.GetNextTaskIndex(), 1); ASSERT_EQ(context.GetNextTaskIndex(), 2); @@ -658,6 +660,14 @@ TEST_F(ZeroNodeTest, TestWorkerContext) { context.SetCurrentTask(task_spec); ASSERT_EQ(context.GetCurrentTaskID(), task_spec.TaskId()); + auto main_thread_task_id = task_spec.TaskId(); + auto thread_func2 = [&context, &main_thread_task_id]() { + // Verify the main thread task id matches. + ASSERT_EQ(context.GetMainThreadCurrentTaskID(), main_thread_task_id); + }; + std::thread async_thread2(thread_func2); + async_thread2.join(); + // Verify that put index doesn't confict with the return object range. ASSERT_EQ(context.GetNextPutIndex(), num_returns + 1); ASSERT_EQ(context.GetNextPutIndex(), num_returns + 2); diff --git a/src/ray/core_worker/test/dependency_resolver_test.cc b/src/ray/core_worker/test/dependency_resolver_test.cc index d83318a1e82b..241cf0e5e31d 100644 --- a/src/ray/core_worker/test/dependency_resolver_test.cc +++ b/src/ray/core_worker/test/dependency_resolver_test.cc @@ -47,7 +47,8 @@ TaskSpecification BuildTaskSpec(const std::unordered_map &r resources, resources, serialized_runtime_env, - depth); + depth, + TaskID::Nil()); return builder.Build(); } TaskSpecification BuildEmptyTaskSpec() { diff --git a/src/ray/core_worker/test/direct_task_transport_test.cc b/src/ray/core_worker/test/direct_task_transport_test.cc index bc90f82c1824..2abc88cf87ac 100644 --- a/src/ray/core_worker/test/direct_task_transport_test.cc +++ b/src/ray/core_worker/test/direct_task_transport_test.cc @@ -52,7 +52,8 @@ TaskSpecification BuildTaskSpec(const std::unordered_map &r resources, resources, serialized_runtime_env, - depth); + depth, + TaskID::Nil()); return builder.Build(); } // Calls BuildTaskSpec with empty resources map and empty function descriptor diff --git a/src/ray/gcs/test/gcs_test_util.h b/src/ray/gcs/test/gcs_test_util.h index ea5b4e02b8c5..4708eab96ace 100644 --- a/src/ray/gcs/test/gcs_test_util.h +++ b/src/ray/gcs/test/gcs_test_util.h @@ -61,7 +61,8 @@ struct Mocker { required_resources, required_placement_resources, "", - 0); + 0, + TaskID::Nil()); rpc::SchedulingStrategy scheduling_strategy; scheduling_strategy.mutable_default_scheduling_strategy(); builder.SetActorCreationTaskSpec(actor_id, diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index e471dc4adaff..bc5cad219965 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -401,6 +401,10 @@ message TaskSpec { repeated bytes dynamic_return_ids = 31; // Job config for the task. Only set for normal task or actor creation task. optional JobConfig job_config = 32; + // TODO(rickyx): Remove this once we figure out a way to handle task ids + // across multiple threads properly. + // The task id of the CoreWorker's main thread from which the task is submitted. + bytes main_thread_parent_task_id = 33; } message TaskInfoEntry { diff --git a/src/ray/raylet/scheduling/cluster_task_manager_test.cc b/src/ray/raylet/scheduling/cluster_task_manager_test.cc index 846fc282c08a..602cd70df5f2 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager_test.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager_test.cc @@ -168,6 +168,7 @@ RayTask CreateTask( {}, "", 0, + TaskID::Nil(), runtime_env_info); if (!args.empty()) { From 5aada4d79e7c24fa5837380f69b091a02b300b84 Mon Sep 17 00:00:00 2001 From: rickyyx Date: Wed, 1 Feb 2023 06:24:36 +0000 Subject: [PATCH 2/5] changed Signed-off-by: rickyyx --- python/ray/tests/test_task_events.py | 49 +++----------------- src/ray/common/task/task_spec.cc | 6 +-- src/ray/common/task/task_spec.h | 2 +- src/ray/common/task/task_util.h | 8 ++-- src/ray/core_worker/context.cc | 13 +++--- src/ray/core_worker/context.h | 9 ++-- src/ray/core_worker/core_worker.cc | 6 +-- src/ray/core_worker/task_manager.cc | 2 +- src/ray/core_worker/test/core_worker_test.cc | 2 +- src/ray/protobuf/common.proto | 4 +- 10 files changed, 34 insertions(+), 67 deletions(-) diff --git a/python/ray/tests/test_task_events.py b/python/ray/tests/test_task_events.py index 20f0c6f36182..78b0897c1ae0 100644 --- a/python/ray/tests/test_task_events.py +++ b/python/ray/tests/test_task_events.py @@ -163,49 +163,10 @@ def verify(): wait_for_condition(verify) -def test_parent_task_id_actor(shutdown_only): - ray.init(_system_config=_SYSTEM_CONFIG) - - def run_task_in_thread(): - def thd_task(): - @ray.remote - def thd_task(): - pass - - ray.get(thd_task.remote()) - - thd = threading.Thread(target=thd_task) - thd.start() - thd.join() - - @ray.remote - class Actor: - def main_task(self): - run_task_in_thread() - - a = Actor.remote() - ray.get(a.main_task.remote()) - - def verify(): - tasks = list_tasks() - expect_parent_task_id = None - actual_parent_task_id = None - for task in tasks: - if "main_task" in task["name"]: - expect_parent_task_id = task["task_id"] - elif "thd_task" in task["name"]: - actual_parent_task_id = task["parent_task_id"] - print(tasks) - assert actual_parent_task_id is not None - assert expect_parent_task_id == actual_parent_task_id - - return True - - wait_for_condition(verify) - - -@pytest.mark.parametrize("actor_concurrency", [3, 10]) -def test_parent_task_id_multi_thread_actors(shutdown_only, actor_concurrency): +@pytest.mark.parametrize("actor_concurrency", [1, 3, 10]) +def test_parent_task_id_actors(shutdown_only, actor_concurrency): + # Test tasks runs in user started thread from actors have a parent_task_id + # as the actor's creation task. ray.init(_system_config=_SYSTEM_CONFIG) def run_task_in_thread(name, i): @@ -261,6 +222,8 @@ def verify(actor_method_name, actor_class_name): def test_parent_task_id_tune_e2e(shutdown_only): + # Test a tune e2e workload should not have any task with parent_task_id that's + # not found. ray.init(_system_config=_SYSTEM_CONFIG) job_id = ray.get_runtime_context().get_job_id() script = """ diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index dcdaef5f9855..71000748cb44 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -160,11 +160,11 @@ TaskID TaskSpecification::ParentTaskId() const { return TaskID::FromBinary(message_->parent_task_id()); } -TaskID TaskSpecification::MainThreadParentTaskId() const { - if (message_->main_thread_parent_task_id().empty() /* e.g., empty proto default */) { +TaskID TaskSpecification::SubmitterTaskId() const { + if (message_->submitter_task_id().empty() /* e.g., empty proto default */) { return TaskID::Nil(); } - return TaskID::FromBinary(message_->main_thread_parent_task_id()); + return TaskID::FromBinary(message_->submitter_task_id()); } size_t TaskSpecification::ParentCounter() const { return message_->parent_counter(); } diff --git a/src/ray/common/task/task_spec.h b/src/ray/common/task/task_spec.h index d304e9472a26..e1aea6fc4459 100644 --- a/src/ray/common/task/task_spec.h +++ b/src/ray/common/task/task_spec.h @@ -222,7 +222,7 @@ class TaskSpecification : public MessageWrapper { TaskID ParentTaskId() const; - TaskID MainThreadParentTaskId() const; + TaskID SubmitterTaskId() const; size_t ParentCounter() const; diff --git a/src/ray/common/task/task_util.h b/src/ray/common/task/task_util.h index 428f87fa799a..c260745b7161 100644 --- a/src/ray/common/task/task_util.h +++ b/src/ray/common/task/task_util.h @@ -130,7 +130,7 @@ class TaskSpecBuilder { const std::unordered_map &required_placement_resources, const std::string &debugger_breakpoint, int64_t depth, - const TaskID &main_thread_parent_task_id, + const TaskID &submitter_task_id, const std::shared_ptr runtime_env_info = nullptr, const std::string &concurrency_group_name = "") { message_->set_type(TaskType::NORMAL_TASK); @@ -143,7 +143,7 @@ class TaskSpecBuilder { } message_->set_task_id(task_id.Binary()); message_->set_parent_task_id(parent_task_id.Binary()); - message_->set_main_thread_parent_task_id(main_thread_parent_task_id.Binary()); + message_->set_submitter_task_id(submitter_task_id.Binary()); message_->set_parent_counter(parent_counter); message_->set_caller_id(caller_id.Binary()); message_->mutable_caller_address()->CopyFrom(caller_address); @@ -185,13 +185,13 @@ class TaskSpecBuilder { const TaskID &parent_task_id, const TaskID &caller_id, const rpc::Address &caller_address, - const TaskID &main_thread_parent_task_id) { + const TaskID &submitter_task_id) { message_->set_type(TaskType::DRIVER_TASK); message_->set_language(language); message_->set_job_id(job_id.Binary()); message_->set_task_id(task_id.Binary()); message_->set_parent_task_id(parent_task_id.Binary()); - message_->set_main_thread_parent_task_id(main_thread_parent_task_id.Binary()); + message_->set_submitter_task_id(submitter_task_id.Binary()); message_->set_parent_counter(0); message_->set_caller_id(caller_id.Binary()); message_->mutable_caller_address()->CopyFrom(caller_address); diff --git a/src/ray/core_worker/context.cc b/src/ray/core_worker/context.cc index 4395823f0475..9b0afbef3c54 100644 --- a/src/ray/core_worker/context.cc +++ b/src/ray/core_worker/context.cc @@ -161,7 +161,7 @@ WorkerContext::WorkerContext(WorkerType worker_type, GetThreadContext().SetCurrentTaskId(TaskID::ForDriverTask(job_id), /*attempt_number=*/0); // Driver runs in the main thread. - main_thread_current_task_id_ = TaskID::ForDriverTask(job_id); + main_thread_or_actor_creation_task_id = TaskID::ForDriverTask(job_id); } } @@ -268,17 +268,18 @@ void WorkerContext::SetTaskDepth(int64_t depth) { task_depth_ = depth; } void WorkerContext::SetCurrentTask(const TaskSpecification &task_spec) { GetThreadContext().SetCurrentTask(task_spec); absl::WriterMutexLock lock(&mutex_); - if (CurrentThreadIsMain()) { - main_thread_current_task_id_ = task_spec.TaskId(); - } SetTaskDepth(task_spec.GetDepth()); RAY_CHECK(current_job_id_ == task_spec.JobId()); if (task_spec.IsNormalTask()) { current_task_is_direct_call_ = true; + if (CurrentThreadIsMain()) { + main_thread_or_actor_creation_task_id = task_spec.TaskId(); + } } else if (task_spec.IsActorCreationTask()) { if (!current_actor_id_.IsNil()) { RAY_CHECK(current_actor_id_ == task_spec.ActorCreationId()); } + main_thread_or_actor_creation_task_id = task_spec.TaskId(); current_actor_id_ = task_spec.ActorCreationId(); current_actor_is_direct_call_ = true; current_actor_max_concurrency_ = task_spec.MaxActorConcurrency(); @@ -319,9 +320,9 @@ bool WorkerContext::CurrentThreadIsMain() const { return boost::this_thread::get_id() == main_thread_id_; } -const TaskID WorkerContext::GetMainThreadCurrentTaskID() const { +const TaskID WorkerContext::GetMainThreadOrActorCreationTaskID() const { absl::ReaderMutexLock lock(&mutex_); - return main_thread_current_task_id_; + return main_thread_or_actor_creation_task_id; } bool WorkerContext::ShouldReleaseResourcesOnBlockingCalls() const { diff --git a/src/ray/core_worker/context.h b/src/ray/core_worker/context.h index 6642fc2a1f07..82c53398cdc4 100644 --- a/src/ray/core_worker/context.h +++ b/src/ray/core_worker/context.h @@ -40,7 +40,7 @@ class WorkerContext { const TaskID &GetCurrentTaskID() const; - const TaskID GetMainThreadCurrentTaskID() const; + const TaskID GetMainThreadOrActorCreationTaskID() const; const PlacementGroupID &GetCurrentPlacementGroupId() const LOCKS_EXCLUDED(mutex_); @@ -132,9 +132,10 @@ class WorkerContext { std::shared_ptr runtime_env_info_ GUARDED_BY(mutex_); /// The id of the (main) thread that constructed this worker context. const boost::thread::id main_thread_id_; - /// The currently executing main thread's task id. Used merely for observability - /// purposes to track task hierarchy. - TaskID main_thread_current_task_id_; + /// The currently executing main thread's task id. It's the actor creation task id + /// for actor, or the main thread's task id for normal tasks. + /// Used merely for observability purposes to track task hierarchy. + TaskID main_thread_or_actor_creation_task_id; // To protect access to mutable members; mutable absl::Mutex mutex_; diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index e2ce9d779424..a8bc2ffb055e 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1833,7 +1833,7 @@ std::vector CoreWorker::SubmitTask( debugger_breakpoint, depth, task_options.serialized_runtime_env_info, - worker_context_.GetMainThreadCurrentTaskID(), + worker_context_.GetMainThreadOrActorCreationTaskID(), /*concurrency_group_name*/ "", /*include_job_config*/ true); builder.SetNormalTaskSpec(max_retries, @@ -1917,7 +1917,7 @@ Status CoreWorker::CreateActor(const RayFunction &function, "" /* debugger_breakpoint */, depth, actor_creation_options.serialized_runtime_env_info, - worker_context_.GetMainThreadCurrentTaskID(), + worker_context_.GetMainThreadOrActorCreationTaskID(), /*concurrency_group_name*/ "", /*include_job_config*/ true); @@ -2142,7 +2142,7 @@ std::optional> CoreWorker::SubmitActorTask( "", /* debugger_breakpoint */ depth, /*depth*/ "{}", /* serialized_runtime_env_info */ - worker_context_.GetMainThreadCurrentTaskID(), + worker_context_.GetMainThreadOrActorCreationTaskID(), task_options.concurrency_group_name, /*include_job_config*/ false); // NOTE: placement_group_capture_child_tasks and runtime_env will diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index c9b593010ceb..ec22f85be0c3 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -861,7 +861,7 @@ rpc::TaskInfoEntry TaskManager::MakeTaskInfoEntry( task_info.set_job_id(task_spec.JobId().Binary()); task_info.set_task_id(task_spec.TaskId().Binary()); - task_info.set_parent_task_id(task_spec.MainThreadParentTaskId().Binary()); + task_info.set_parent_task_id(task_spec.SubmitterTaskId().Binary()); const auto &resources_map = task_spec.GetRequiredResources().GetResourceMap(); task_info.mutable_required_resources()->insert(resources_map.begin(), resources_map.end()); diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index 2f1281c57745..cba46a14d733 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -663,7 +663,7 @@ TEST_F(ZeroNodeTest, TestWorkerContext) { auto main_thread_task_id = task_spec.TaskId(); auto thread_func2 = [&context, &main_thread_task_id]() { // Verify the main thread task id matches. - ASSERT_EQ(context.GetMainThreadCurrentTaskID(), main_thread_task_id); + ASSERT_EQ(context.GetMainThreadOrActorCreationTaskID(), main_thread_task_id); }; std::thread async_thread2(thread_func2); async_thread2.join(); diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index bc5cad219965..65fdc8909263 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -404,7 +404,9 @@ message TaskSpec { // TODO(rickyx): Remove this once we figure out a way to handle task ids // across multiple threads properly. // The task id of the CoreWorker's main thread from which the task is submitted. - bytes main_thread_parent_task_id = 33; + // This will be the main thread's task id for normal task, or the actor creation + // task's task id for actors. + bytes submitter_task_id = 33; } message TaskInfoEntry { From 83123871176a4595f83476e9e188a6ba2ff91e2a Mon Sep 17 00:00:00 2001 From: rickyyx Date: Wed, 1 Feb 2023 07:28:06 +0000 Subject: [PATCH 3/5] address comments Signed-off-by: rickyyx --- python/ray/experimental/state/common.py | 3 ++- src/ray/core_worker/context.cc | 8 ++++---- src/ray/core_worker/context.h | 2 +- src/ray/core_worker/task_manager.cc | 5 +++++ 4 files changed, 12 insertions(+), 6 deletions(-) diff --git a/python/ray/experimental/state/common.py b/python/ray/experimental/state/common.py index 05d00da4cdc2..f09116a6a1e8 100644 --- a/python/ray/experimental/state/common.py +++ b/python/ray/experimental/state/common.py @@ -544,7 +544,8 @@ class TaskState(StateSchema): required_resources: dict = state_column(detail=True, filterable=False) #: The runtime environment information for the task. runtime_env_info: str = state_column(detail=True, filterable=False) - #: The parent task id. + #: The parent task id. If the parent is a normal task, it will be the task's id. + #: If the parent runs in an actor, it will be the actor's creation task id. parent_task_id: str = state_column(filterable=True) #: The placement group id that's associated with this task. placement_group_id: str = state_column(detail=True, filterable=True) diff --git a/src/ray/core_worker/context.cc b/src/ray/core_worker/context.cc index 9b0afbef3c54..264529b597d7 100644 --- a/src/ray/core_worker/context.cc +++ b/src/ray/core_worker/context.cc @@ -161,7 +161,7 @@ WorkerContext::WorkerContext(WorkerType worker_type, GetThreadContext().SetCurrentTaskId(TaskID::ForDriverTask(job_id), /*attempt_number=*/0); // Driver runs in the main thread. - main_thread_or_actor_creation_task_id = TaskID::ForDriverTask(job_id); + main_thread_or_actor_creation_task_id_ = TaskID::ForDriverTask(job_id); } } @@ -273,13 +273,13 @@ void WorkerContext::SetCurrentTask(const TaskSpecification &task_spec) { if (task_spec.IsNormalTask()) { current_task_is_direct_call_ = true; if (CurrentThreadIsMain()) { - main_thread_or_actor_creation_task_id = task_spec.TaskId(); + main_thread_or_actor_creation_task_id_ = task_spec.TaskId(); } } else if (task_spec.IsActorCreationTask()) { if (!current_actor_id_.IsNil()) { RAY_CHECK(current_actor_id_ == task_spec.ActorCreationId()); } - main_thread_or_actor_creation_task_id = task_spec.TaskId(); + main_thread_or_actor_creation_task_id_ = task_spec.TaskId(); current_actor_id_ = task_spec.ActorCreationId(); current_actor_is_direct_call_ = true; current_actor_max_concurrency_ = task_spec.MaxActorConcurrency(); @@ -322,7 +322,7 @@ bool WorkerContext::CurrentThreadIsMain() const { const TaskID WorkerContext::GetMainThreadOrActorCreationTaskID() const { absl::ReaderMutexLock lock(&mutex_); - return main_thread_or_actor_creation_task_id; + return main_thread_or_actor_creation_task_id_; } bool WorkerContext::ShouldReleaseResourcesOnBlockingCalls() const { diff --git a/src/ray/core_worker/context.h b/src/ray/core_worker/context.h index 82c53398cdc4..9831691ac12f 100644 --- a/src/ray/core_worker/context.h +++ b/src/ray/core_worker/context.h @@ -135,7 +135,7 @@ class WorkerContext { /// The currently executing main thread's task id. It's the actor creation task id /// for actor, or the main thread's task id for normal tasks. /// Used merely for observability purposes to track task hierarchy. - TaskID main_thread_or_actor_creation_task_id; + TaskID main_thread_or_actor_creation_task_id_ GUARDED_BY(mutex_); // To protect access to mutable members; mutable absl::Mutex mutex_; diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index ec22f85be0c3..bfa957a79c51 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -861,6 +861,11 @@ rpc::TaskInfoEntry TaskManager::MakeTaskInfoEntry( task_info.set_job_id(task_spec.JobId().Binary()); task_info.set_task_id(task_spec.TaskId().Binary()); + // NOTE: we set the parent task id of a task to be submitter's task id, where + // the submitter depends on the owner coreworker's: + // - if the owner coreworker runs a normal task, the submitter's task id is the task id. + // - if the owner coreworker runs an actor, the submitter's task id will be the actor's + // creation task id. task_info.set_parent_task_id(task_spec.SubmitterTaskId().Binary()); const auto &resources_map = task_spec.GetRequiredResources().GetResourceMap(); task_info.mutable_required_resources()->insert(resources_map.begin(), From 5103131a15d39bd86584336622686cd8e367c4f0 Mon Sep 17 00:00:00 2001 From: rickyyx Date: Wed, 1 Feb 2023 07:34:50 +0000 Subject: [PATCH 4/5] lock lcok Signed-off-by: rickyyx --- src/ray/core_worker/context.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/ray/core_worker/context.cc b/src/ray/core_worker/context.cc index 264529b597d7..b4c2bbd47d1d 100644 --- a/src/ray/core_worker/context.cc +++ b/src/ray/core_worker/context.cc @@ -161,7 +161,10 @@ WorkerContext::WorkerContext(WorkerType worker_type, GetThreadContext().SetCurrentTaskId(TaskID::ForDriverTask(job_id), /*attempt_number=*/0); // Driver runs in the main thread. - main_thread_or_actor_creation_task_id_ = TaskID::ForDriverTask(job_id); + { + absl::WriterMutexLock lock(&mutex_); + main_thread_or_actor_creation_task_id_ = TaskID::ForDriverTask(job_id); + } } } From ff987283af3d26270668c13d2af9965adea8cd88 Mon Sep 17 00:00:00 2001 From: rickyyx Date: Wed, 1 Feb 2023 08:34:51 +0000 Subject: [PATCH 5/5] use main task id for non-concurrent actors Signed-off-by: rickyyx --- python/ray/experimental/state/common.py | 3 +- python/ray/tests/test_task_events.py | 45 +++++++++++++++++++++++-- src/ray/core_worker/context.cc | 7 ++-- src/ray/core_worker/context.h | 2 +- src/ray/protobuf/common.proto | 4 +-- 5 files changed, 51 insertions(+), 10 deletions(-) diff --git a/python/ray/experimental/state/common.py b/python/ray/experimental/state/common.py index f09116a6a1e8..84b234192981 100644 --- a/python/ray/experimental/state/common.py +++ b/python/ray/experimental/state/common.py @@ -545,7 +545,8 @@ class TaskState(StateSchema): #: The runtime environment information for the task. runtime_env_info: str = state_column(detail=True, filterable=False) #: The parent task id. If the parent is a normal task, it will be the task's id. - #: If the parent runs in an actor, it will be the actor's creation task id. + #: If the parent runs in a concurrent actor (async actor or threaded actor), + #: it will be the actor's creation task id. parent_task_id: str = state_column(filterable=True) #: The placement group id that's associated with this task. placement_group_id: str = state_column(detail=True, filterable=True) diff --git a/python/ray/tests/test_task_events.py b/python/ray/tests/test_task_events.py index 78b0897c1ae0..48790844faaf 100644 --- a/python/ray/tests/test_task_events.py +++ b/python/ray/tests/test_task_events.py @@ -163,8 +163,49 @@ def verify(): wait_for_condition(verify) -@pytest.mark.parametrize("actor_concurrency", [1, 3, 10]) -def test_parent_task_id_actors(shutdown_only, actor_concurrency): +def test_parent_task_id_non_concurrent_actor(shutdown_only): + ray.init(_system_config=_SYSTEM_CONFIG) + + def run_task_in_thread(): + def thd_task(): + @ray.remote + def thd_task(): + pass + + ray.get(thd_task.remote()) + + thd = threading.Thread(target=thd_task) + thd.start() + thd.join() + + @ray.remote + class Actor: + def main_task(self): + run_task_in_thread() + + a = Actor.remote() + ray.get(a.main_task.remote()) + + def verify(): + tasks = list_tasks() + expect_parent_task_id = None + actual_parent_task_id = None + for task in tasks: + if "main_task" in task["name"]: + expect_parent_task_id = task["task_id"] + elif "thd_task" in task["name"]: + actual_parent_task_id = task["parent_task_id"] + print(tasks) + assert actual_parent_task_id is not None + assert expect_parent_task_id == actual_parent_task_id + + return True + + wait_for_condition(verify) + + +@pytest.mark.parametrize("actor_concurrency", [3, 10]) +def test_parent_task_id_concurrent_actor(shutdown_only, actor_concurrency): # Test tasks runs in user started thread from actors have a parent_task_id # as the actor's creation task. ray.init(_system_config=_SYSTEM_CONFIG) diff --git a/src/ray/core_worker/context.cc b/src/ray/core_worker/context.cc index b4c2bbd47d1d..125f42d17e39 100644 --- a/src/ray/core_worker/context.cc +++ b/src/ray/core_worker/context.cc @@ -272,17 +272,16 @@ void WorkerContext::SetCurrentTask(const TaskSpecification &task_spec) { GetThreadContext().SetCurrentTask(task_spec); absl::WriterMutexLock lock(&mutex_); SetTaskDepth(task_spec.GetDepth()); + if (CurrentThreadIsMain()) { + main_thread_or_actor_creation_task_id_ = task_spec.TaskId(); + } RAY_CHECK(current_job_id_ == task_spec.JobId()); if (task_spec.IsNormalTask()) { current_task_is_direct_call_ = true; - if (CurrentThreadIsMain()) { - main_thread_or_actor_creation_task_id_ = task_spec.TaskId(); - } } else if (task_spec.IsActorCreationTask()) { if (!current_actor_id_.IsNil()) { RAY_CHECK(current_actor_id_ == task_spec.ActorCreationId()); } - main_thread_or_actor_creation_task_id_ = task_spec.TaskId(); current_actor_id_ = task_spec.ActorCreationId(); current_actor_is_direct_call_ = true; current_actor_max_concurrency_ = task_spec.MaxActorConcurrency(); diff --git a/src/ray/core_worker/context.h b/src/ray/core_worker/context.h index 9831691ac12f..692063900526 100644 --- a/src/ray/core_worker/context.h +++ b/src/ray/core_worker/context.h @@ -133,7 +133,7 @@ class WorkerContext { /// The id of the (main) thread that constructed this worker context. const boost::thread::id main_thread_id_; /// The currently executing main thread's task id. It's the actor creation task id - /// for actor, or the main thread's task id for normal tasks. + /// for concurrent actor, or the main thread's task id for other cases. /// Used merely for observability purposes to track task hierarchy. TaskID main_thread_or_actor_creation_task_id_ GUARDED_BY(mutex_); // To protect access to mutable members; diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 65fdc8909263..1db13e682db1 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -404,8 +404,8 @@ message TaskSpec { // TODO(rickyx): Remove this once we figure out a way to handle task ids // across multiple threads properly. // The task id of the CoreWorker's main thread from which the task is submitted. - // This will be the main thread's task id for normal task, or the actor creation - // task's task id for actors. + // This will be the actor creation task's task id for concurrent actors. Or + // the main thread's task id for other cases. bytes submitter_task_id = 33; }