Skip to content

Commit

Permalink
[core] Fix test log race on async actor task (#41563) (#41681)
Browse files Browse the repository at this point in the history
This closes #40959 a flaky test, and it fixes a real bug in the async actor log recording.

---------

Signed-off-by: rickyyx <rickyx@anyscale.com>
  • Loading branch information
rickyyx authored Dec 7, 2023
1 parent f519b60 commit 267ae9f
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 29 deletions.
13 changes: 9 additions & 4 deletions python/ray/_private/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
import ray.remote_function
from ray import ActorID, JobID, Language, ObjectRef
from ray._raylet import raise_sys_exit_with_custom_error_message
from ray._raylet import ObjectRefGenerator
from ray._raylet import ObjectRefGenerator, TaskID
from ray.runtime_env.runtime_env import _merge_runtime_env
from ray._private import ray_option_utils
from ray._private.client_mode_hook import client_mode_hook
Expand Down Expand Up @@ -594,7 +594,7 @@ def set_out_file(self, out_file=Optional[IO[AnyStr]]) -> None:
"""Set the worker's out file where stdout is redirected to"""
self._out_file = out_file

def record_task_log_start(self):
def record_task_log_start(self, task_id: TaskID, attempt_number: int):
"""Record the task log info when task starts executing for
non concurrent actor tasks."""
if not self._enable_record_actor_task_log and not self.actor_id.is_nil():
Expand All @@ -608,13 +608,15 @@ def record_task_log_start(self):
return

self.core_worker.record_task_log_start(
task_id,
attempt_number,
self.get_out_file_path(),
self.get_err_file_path(),
self.get_current_out_offset(),
self.get_current_err_offset(),
)

def record_task_log_end(self):
def record_task_log_end(self, task_id: TaskID, attempt_number: int):
"""Record the task log info when task finishes executing for
non concurrent actor tasks."""
if not self._enable_record_actor_task_log and not self.actor_id.is_nil():
Expand All @@ -628,7 +630,10 @@ def record_task_log_end(self):
return

self.core_worker.record_task_log_end(
self.get_current_out_offset(), self.get_current_err_offset()
task_id,
attempt_number,
self.get_current_out_offset(),
self.get_current_err_offset(),
)

def get_err_file_path(self) -> str:
Expand Down
37 changes: 26 additions & 11 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1783,8 +1783,7 @@ cdef void execute_task(
actor_title = f"{class_name}({args!r}, {kwargs!r})"
core_worker.set_actor_title(actor_title.encode("utf-8"))

# Record the log file offsets if applicable.
worker.record_task_log_start()
worker.record_task_log_start(task_id, attempt_number)

# Execute the task.
with core_worker.profile_event(b"task:execute"):
Expand Down Expand Up @@ -1890,7 +1889,7 @@ cdef void execute_task(
raise e
finally:
# Record the end of the task log.
worker.record_task_log_end()
worker.record_task_log_end(task_id, attempt_number)

if (returns[0].size() == 1
and not inspect.isgenerator(outputs)
Expand Down Expand Up @@ -4700,15 +4699,31 @@ cdef class CoreWorker:

return (num_tasks_submitted, num_leases_requested)

def record_task_log_start(self, stdout_path, stderr_path,
int64_t out_start_offset, int64_t err_start_offset):
CCoreWorkerProcess.GetCoreWorker() \
.RecordTaskLogStart(stdout_path, stderr_path,
out_start_offset, err_start_offset)
def record_task_log_start(
self, task_id: TaskID, int attempt_number,
stdout_path, stderr_path,
int64_t out_start_offset, int64_t err_start_offset):
cdef:
CTaskID c_task_id = task_id.native()
c_string c_stdout_path = stdout_path.encode("utf-8")
c_string c_stderr_path = stderr_path.encode("utf-8")

def record_task_log_end(self, int64_t out_end_offset, int64_t err_end_offset):
CCoreWorkerProcess.GetCoreWorker() \
.RecordTaskLogEnd(out_end_offset, err_end_offset)
with nogil:
CCoreWorkerProcess.GetCoreWorker() \
.RecordTaskLogStart(c_task_id, attempt_number,
c_stdout_path, c_stderr_path,
out_start_offset, err_start_offset)

def record_task_log_end(
self, task_id: TaskID, int attempt_number,
int64_t out_end_offset, int64_t err_end_offset):
cdef:
CTaskID c_task_id = task_id.native()

with nogil:
CCoreWorkerProcess.GetCoreWorker() \
.RecordTaskLogEnd(c_task_id, attempt_number,
out_end_offset, err_end_offset)

cdef CObjectID allocate_dynamic_return_id_for_generator(
self,
Expand Down
9 changes: 7 additions & 2 deletions python/ray/includes/libcoreworker.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -296,13 +296,18 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
unordered_map[c_string, c_vector[int64_t]] GetActorCallStats() const

void RecordTaskLogStart(
const CTaskID &task_id,
int attempt_number,
const c_string& stdout_path,
const c_string& stderr_path,
int64_t stdout_start_offset,
int64_t stderr_start_offset) const

void RecordTaskLogEnd(int64_t stdout_end_offset,
int64_t stderr_end_offset) const
void RecordTaskLogEnd(
const CTaskID &task_id,
int attempt_number,
int64_t stdout_end_offset,
int64_t stderr_end_offset) const

void Exit(const CWorkerExitType exit_type,
const c_string &detail,
Expand Down
2 changes: 0 additions & 2 deletions python/ray/tests/test_state_api_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
format_web_url,
wait_for_condition,
wait_until_server_available,
skip_flaky_core_test_premerge,
)

from ray._private.ray_constants import (
Expand Down Expand Up @@ -1358,7 +1357,6 @@ def verify():
@pytest.mark.skipif(
sys.platform == "win32", reason="Windows has logging race from tasks."
)
@skip_flaky_core_test_premerge("https://github.com/ray-project/ray/issues/40959")
def test_log_task(shutdown_only):
from ray.runtime_env import RuntimeEnv

Expand Down
20 changes: 12 additions & 8 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4300,7 +4300,9 @@ std::vector<ObjectID> CoreWorker::GetCurrentReturnIds(int num_returns,
return return_ids;
}

void CoreWorker::RecordTaskLogStart(const std::string &stdout_path,
void CoreWorker::RecordTaskLogStart(const TaskID &task_id,
int32_t attempt_number,
const std::string &stdout_path,
const std::string &stderr_path,
int64_t stdout_start_offset,
int64_t stderr_start_offset) const {
Expand All @@ -4317,14 +4319,16 @@ void CoreWorker::RecordTaskLogStart(const std::string &stdout_path,
RAY_CHECK(current_task)
<< "We should have set the current task spec while executing the task.";
task_manager_->RecordTaskStatusEvent(
current_task->AttemptNumber(),
*current_task,
task_id,
worker_context_.GetCurrentJobID(),
attempt_number,
rpc::TaskStatus::NIL,
/* include_task_info */ false,
worker::TaskStatusEvent::TaskStateUpdate(task_log_info));
}

void CoreWorker::RecordTaskLogEnd(int64_t stdout_end_offset,
void CoreWorker::RecordTaskLogEnd(const TaskID &task_id,
int32_t attempt_number,
int64_t stdout_end_offset,
int64_t stderr_end_offset) const {
if (options_.is_local_mode) {
return;
Expand All @@ -4337,10 +4341,10 @@ void CoreWorker::RecordTaskLogEnd(int64_t stdout_end_offset,
RAY_CHECK(current_task)
<< "We should have set the current task spec before executing the task.";
task_manager_->RecordTaskStatusEvent(
current_task->AttemptNumber(),
*current_task,
task_id,
worker_context_.GetCurrentJobID(),
attempt_number,
rpc::TaskStatus::NIL,
/* include_task_info */ false,
worker::TaskStatusEvent::TaskStateUpdate(task_log_info));
}

Expand Down
9 changes: 7 additions & 2 deletions src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -1288,7 +1288,9 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// \param stderr_path Path to stderr log file.
/// \param stdout_start_offset Start offset of the stdout for this task.
/// \param stderr_start_offset Start offset of the stderr for this task.
void RecordTaskLogStart(const std::string &stdout_path,
void RecordTaskLogStart(const TaskID &task_id,
int32_t attempt_number,
const std::string &stdout_path,
const std::string &stderr_path,
int64_t stdout_start_offset,
int64_t stderr_start_offset) const;
Expand All @@ -1299,7 +1301,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
///
/// \param stdout_end_offset End offset of the stdout for this task.
/// \param stderr_end_offset End offset of the stderr for this task.
void RecordTaskLogEnd(int64_t stdout_end_offset, int64_t stderr_end_offset) const;
void RecordTaskLogEnd(const TaskID &task_id,
int32_t attempt_number,
int64_t stdout_end_offset,
int64_t stderr_end_offset) const;

/// (WORKER mode only) Gracefully exit the worker. `Graceful` means the worker will
/// exit when it drains all tasks and cleans all owned objects.
Expand Down
21 changes: 21 additions & 0 deletions src/ray/core_worker/task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1426,6 +1426,27 @@ void TaskManager::RecordMetrics() {
task_counter_.FlushOnChangeCallbacks();
}

void TaskManager::RecordTaskStatusEvent(
const TaskID &task_id,
const JobID &job_id,
int32_t attempt_number,
rpc::TaskStatus status,
worker::TaskStatusEvent::TaskStateUpdate &&state_update) {
if (!task_event_buffer_.Enabled()) {
return;
}
auto task_event = std::make_unique<worker::TaskStatusEvent>(
task_id,
job_id,
attempt_number,
status,
/* timestamp */ absl::GetCurrentTimeNanos(),
/* task_spec */ nullptr,
std::move(state_update));

task_event_buffer_.AddTaskEvent(std::move(task_event));
}

void TaskManager::RecordTaskStatusEvent(
int32_t attempt_number,
const TaskSpecification &spec,
Expand Down
13 changes: 13 additions & 0 deletions src/ray/core_worker/task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,19 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
absl::optional<const worker::TaskStatusEvent::TaskStateUpdate> state_update =
absl::nullopt);

/// Update task status change for the task attempt in TaskEventBuffer.
///
/// \param task_id ID of the task to query.
/// \param job_id ID of the job to query.
/// \param attempt_number Attempt number for the task attempt.
/// \param status the changed status.
/// \param state_update task state updates.
void RecordTaskStatusEvent(const TaskID &task_id,
const JobID &job_id,
int32_t attempt_number,
rpc::TaskStatus status,
worker::TaskStatusEvent::TaskStateUpdate &&state_update);

private:
struct TaskEntry {
TaskEntry(const TaskSpecification &spec_arg,
Expand Down

0 comments on commit 267ae9f

Please sign in to comment.