[core] surface OOM error when actor is killed due to OOM (#32107)
Right now we show an actor error when an actor is killed due to OOM. This PR changes it so that an OOM error is surfaced instead.

It does not add actor / actor task OOM retry; the goal of this PR is to improve observability by setting the actor's death cause to OOM.
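For illustration, a minimal sketch of the user-visible change, mirroring the updated doc_code snippet in this diff; the `Leaker` actor and the byte count below are stand-ins for the memory-monitor setup used in the tests, not the exact values from the repository:

```python
import ray

ray.init()


@ray.remote(max_restarts=0, max_task_retries=0)
class Leaker:
    def __init__(self):
        self.mem = []

    def allocate(self, num_bytes: int) -> None:
        # Keep the allocation alive so the worker's memory usage grows
        # until the memory monitor kills the actor.
        self.mem.append(bytearray(num_bytes))


leaker = Leaker.remote()
try:
    # An allocation large enough to push the node past the memory usage
    # threshold (illustrative value).
    ray.get(leaker.allocate.remote(20 * 1024**3))
except ray.exceptions.OutOfMemoryError:
    # Before this PR the failure surfaced as ray.exceptions.RayActorError.
    print("actor was killed by the memory monitor")
```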

Related issue number: #29736
Signed-off-by: Aviv Haber <aviv@anyscale.com>
Signed-off-by: Clarence Ng <clarence@anyscale.com>
clarng authored Feb 1, 2023
1 parent 12d7d7d commit 174f157
Showing 6 changed files with 94 additions and 21 deletions.
2 changes: 1 addition & 1 deletion doc/source/ray-core/doc_code/ray_oom_prevention.py
@@ -76,7 +76,7 @@ def allocate(self, bytes_to_allocate: float) -> None:
error_thrown = False
try:
ray.get(first_actor_task)
except ray.exceptions.RayActorError as ex:
except ray.exceptions.OutOfMemoryError as ex:
error_thrown = True
print("first actor was killed by memory monitor")
assert error_thrown
40 changes: 33 additions & 7 deletions python/ray/tests/test_memory_pressure.py
@@ -139,7 +139,7 @@ def has_metric_tagged_with_value(addr, tag, value) -> bool:
sys.platform != "linux" and sys.platform != "linux2",
reason="memory monitor only on linux currently",
)
def test_memory_pressure_kill_actor(ray_with_memory_monitor):
def test_non_restartable_actor_throws_oom_error(ray_with_memory_monitor):
addr = ray_with_memory_monitor
leaker = Leaker.options(max_restarts=0, max_task_retries=0).remote()

@@ -151,7 +151,7 @@ def test_memory_pressure_kill_actor(ray_with_memory_monitor):
bytes_to_alloc = get_additional_bytes_to_reach_memory_usage_pct(
memory_usage_threshold + 0.1
)
with pytest.raises(ray.exceptions.RayActorError) as _:
with pytest.raises(ray.exceptions.OutOfMemoryError) as _:
ray.get(leaker.allocate.remote(bytes_to_alloc, memory_monitor_refresh_ms * 3))

wait_for_condition(
@@ -168,7 +168,7 @@ def test_restartable_actor_killed_by_memory_monitor_with_actor_error(
sys.platform != "linux" and sys.platform != "linux2",
reason="memory monitor only on linux currently",
)
def test_restartable_actor_killed_by_memory_monitor_with_actor_error(
def test_restartable_actor_throws_oom_error(
ray_with_memory_monitor,
):
addr = ray_with_memory_monitor
@@ -177,7 +177,33 @@ def test_restartable_actor_killed_by_memory_monitor_with_actor_error(
bytes_to_alloc = get_additional_bytes_to_reach_memory_usage_pct(
memory_usage_threshold + 0.1
)
with pytest.raises(ray.exceptions.RayActorError) as _:
with pytest.raises(ray.exceptions.OutOfMemoryError) as _:
ray.get(leaker.allocate.remote(bytes_to_alloc, memory_monitor_refresh_ms * 3))

wait_for_condition(
has_metric_tagged_with_value,
timeout=10,
retry_interval_ms=100,
addr=addr,
tag="MemoryManager.ActorEviction.Total",
value=2.0,
)


@pytest.mark.skipif(
sys.platform != "linux" and sys.platform != "linux2",
reason="memory monitor only on linux currently",
)
def test_restartable_actor_oom_retry_off_throws_oom_error(
ray_with_memory_monitor_no_oom_retry,
):
addr = ray_with_memory_monitor_no_oom_retry
leaker = Leaker.options(max_restarts=1, max_task_retries=1).remote()

bytes_to_alloc = get_additional_bytes_to_reach_memory_usage_pct(
memory_usage_threshold + 0.1
)
with pytest.raises(ray.exceptions.OutOfMemoryError) as _:
ray.get(leaker.allocate.remote(bytes_to_alloc, memory_monitor_refresh_ms * 3))

wait_for_condition(
@@ -274,7 +300,7 @@ async def test_actor_oom_logs_error(ray_with_memory_monitor):
actor_id = ray.get(oom_actor.get_actor_id.remote())

bytes_to_alloc = get_additional_bytes_to_reach_memory_usage_pct(1)
with pytest.raises(ray.exceptions.RayActorError) as _:
with pytest.raises(ray.exceptions.OutOfMemoryError) as _:
ray.get(
oom_actor.allocate.remote(bytes_to_alloc, memory_monitor_refresh_ms * 3)
)
@@ -293,10 +319,10 @@ async def test_actor_oom_logs_error(ray_with_memory_monitor):
for actor in result.actor_table_data:
if actor.actor_id.hex() == actor_id:
assert actor.death_cause
assert actor.death_cause.actor_died_error_context
assert actor.death_cause.oom_context
assert (
expected_worker_eviction_message
in actor.death_cause.actor_died_error_context.error_message
in actor.death_cause.oom_context.error_message
)
verified = True
assert verified
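The new `test_restartable_actor_oom_retry_off_throws_oom_error` test relies on a `ray_with_memory_monitor_no_oom_retry` fixture defined outside this hunk. A hedged sketch of what such a fixture could look like; the system-config keys, and the retry-disabling knob in particular, are assumptions based on the constants used elsewhere in this file, not the repository's exact fixture:

```python
import pytest
import ray


@pytest.fixture
def ray_with_memory_monitor_no_oom_retry():
    # Sketch only: start a single-node cluster with the memory monitor
    # enabled and OOM retries disabled, so an OOM-killed actor task fails
    # immediately with OutOfMemoryError instead of being retried.
    ctx = ray.init(
        num_cpus=1,
        _system_config={
            "memory_usage_threshold": 0.9,     # assumed threshold
            "memory_monitor_refresh_ms": 100,  # assumed refresh interval
            "task_oom_retries": 0,             # hypothetical retry-off knob
        },
    )
    try:
        yield ctx.address_info["address"]
    finally:
        ray.shutdown()
```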
30 changes: 25 additions & 5 deletions src/ray/core_worker/transport/direct_actor_task_submitter.cc
@@ -146,8 +146,16 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spe
auto status = Status::IOError("cancelling task of dead actor");
// No need to increment the number of completed tasks since the actor is
// dead.
GetTaskFinisherWithoutMu().FailOrRetryPendingTask(
task_id, error_type, &status, &error_info);
bool fail_immediatedly =
error_info.has_actor_died_error() &&
error_info.actor_died_error().has_oom_context() &&
error_info.actor_died_error().oom_context().fail_immediately();
GetTaskFinisherWithoutMu().FailOrRetryPendingTask(task_id,
error_type,
&status,
&error_info,
/*mark_task_object_failed*/ true,
fail_immediatedly);
}

// If the task submission subsequently fails, then the client will receive
@@ -306,8 +314,16 @@ void CoreWorkerDirectActorTaskSubmitter::DisconnectActor(
// This task may have been waiting for dependency resolution, so cancel
// this first.
resolver_.CancelDependencyResolution(task_id);
GetTaskFinisherWithoutMu().FailOrRetryPendingTask(
task_id, error_type, &status, &error_info);
bool fail_immediatedly =
error_info.has_actor_died_error() &&
error_info.actor_died_error().has_oom_context() &&
error_info.actor_died_error().oom_context().fail_immediately();
GetTaskFinisherWithoutMu().FailOrRetryPendingTask(task_id,
error_type,
&status,
&error_info,
/*mark_task_object_failed*/ true,
fail_immediatedly);
}
if (!wait_for_death_info_tasks.empty()) {
RAY_LOG(DEBUG) << "Failing tasks waiting for death info, size="
@@ -496,6 +512,7 @@ void CoreWorkerDirectActorTaskSubmitter::HandlePushTaskReply(
task_id, reply, addr, reply.is_application_error());
} else {
bool is_actor_dead = false;
bool fail_immediatedly = false;
rpc::ErrorType error_type;
rpc::RayErrorInfo error_info;
{
@@ -512,6 +529,9 @@
const auto &death_cause = queue.death_cause;
error_info = GetErrorInfoFromActorDeathCause(death_cause);
error_type = GenErrorTypeFromDeathCause(death_cause);
fail_immediatedly = error_info.has_actor_died_error() &&
error_info.actor_died_error().has_oom_context() &&
error_info.actor_died_error().oom_context().fail_immediately();
}

// This task may have been waiting for dependency resolution, so cancel
@@ -524,7 +544,7 @@ void CoreWorkerDirectActorTaskSubmitter::HandlePushTaskReply(
&status,
&error_info,
/*mark_task_object_failed*/ is_actor_dead,
/*fail_immediatedly*/ false);
fail_immediatedly);

if (!is_actor_dead && !will_retry) {
// No retry == actor is dead.
24 changes: 17 additions & 7 deletions src/ray/gcs/gcs_server/gcs_actor_manager.cc
@@ -62,13 +62,21 @@ const ray::rpc::ActorDeathCause GenWorkerDiedCause(
const ray::rpc::WorkerExitType &disconnect_type,
const std::string &disconnect_detail) {
ray::rpc::ActorDeathCause death_cause;
auto actor_died_error_ctx = death_cause.mutable_actor_died_error_context();
AddActorInfo(actor, actor_died_error_ctx);
actor_died_error_ctx->set_error_message(absl::StrCat(
"The actor is dead because its worker process has died. Worker exit type: ",
ray::rpc::WorkerExitType_Name(disconnect_type),
" Worker exit detail: ",
disconnect_detail));
if (disconnect_type == ray::rpc::WorkerExitType::NODE_OUT_OF_MEMORY) {
auto oom_ctx = death_cause.mutable_oom_context();
/// TODO(clarng): actors typically don't retry. Support actor (task) oom retry
/// and set this value from raylet here.
oom_ctx->set_fail_immediately(false);
oom_ctx->set_error_message(disconnect_detail);
} else {
auto actor_died_error_ctx = death_cause.mutable_actor_died_error_context();
AddActorInfo(actor, actor_died_error_ctx);
actor_died_error_ctx->set_error_message(absl::StrCat(
"The actor is dead because its worker process has died. Worker exit type: ",
ray::rpc::WorkerExitType_Name(disconnect_type),
" Worker exit detail: ",
disconnect_detail));
}
return death_cause;
}
const ray::rpc::ActorDeathCause GenOwnerDiedCause(
@@ -908,6 +916,8 @@ void GcsActorManager::OnWorkerDead(const ray::NodeID &node_id,
rpc::WorkerExitType_Name(disconnect_type),
", has creation_task_exception = ",
(creation_task_exception != nullptr));
RAY_LOG(DEBUG) << "on worker dead worker id " << worker_id << " disconnect detail "
<< disconnect_detail;
if (disconnect_type == rpc::WorkerExitType::INTENDED_USER_EXIT ||
disconnect_type == rpc::WorkerExitType::INTENDED_SYSTEM_EXIT) {
RAY_LOG(DEBUG) << message;
8 changes: 7 additions & 1 deletion src/ray/gcs/pb_util.h
@@ -147,6 +147,8 @@ inline rpc::ErrorType GenErrorTypeFromDeathCause(
return rpc::ErrorType::RUNTIME_ENV_SETUP_FAILED;
} else if (death_cause.context_case() == ContextCase::kActorUnschedulableContext) {
return rpc::ErrorType::ACTOR_UNSCHEDULABLE_ERROR;
} else if (death_cause.context_case() == ContextCase::kOomContext) {
return rpc::ErrorType::OUT_OF_MEMORY;
} else {
return rpc::ErrorType::ACTOR_DIED;
}
@@ -159,7 +161,8 @@ inline const std::string &GetActorDeathCauseString(
{ContextCase::kRuntimeEnvFailedContext, "RuntimeEnvFailedContext"},
{ContextCase::kCreationTaskFailureContext, "CreationTaskFailureContext"},
{ContextCase::kActorUnschedulableContext, "ActorUnschedulableContext"},
{ContextCase::kActorDiedErrorContext, "ActorDiedErrorContext"}};
{ContextCase::kActorDiedErrorContext, "ActorDiedErrorContext"},
{ContextCase::kOomContext, "OOMContext"}};
auto it = death_cause_string.find(death_cause.context_case());
RAY_CHECK(it != death_cause_string.end())
<< "Given death cause case " << death_cause.context_case() << " doesn't exist.";
@@ -182,6 +185,9 @@ inline rpc::RayErrorInfo GetErrorInfoFromActorDeathCause(
} else if (death_cause.context_case() == ContextCase::kActorUnschedulableContext) {
*(error_info.mutable_error_message()) =
death_cause.actor_unschedulable_context().error_message();
} else if (death_cause.context_case() == ContextCase::kOomContext) {
error_info.mutable_actor_died_error()->CopyFrom(death_cause);
*(error_info.mutable_error_message()) = death_cause.oom_context().error_message();
} else {
RAY_CHECK(death_cause.context_case() == ContextCase::CONTEXT_NOT_SET);
}
11 changes: 11 additions & 0 deletions src/ray/protobuf/common.proto
@@ -261,6 +261,7 @@ message ActorDeathCause {
RuntimeEnvFailedContext runtime_env_failed_context = 2;
ActorDiedErrorContext actor_died_error_context = 3;
ActorUnschedulableContext actor_unschedulable_context = 4;
OomContext oom_context = 5;
}
}
// ---Actor death contexts start----
@@ -296,6 +297,16 @@ message ActorDiedErrorContext {
// before scheduling had completed.
bool never_started = 10;
}

// Context for task OOM.
message OomContext {
// The error message
string error_message = 1;

// Whether the task / actor should fail immediately and not be retried.
bool fail_immediately = 2;
}

// ---Actor death contexts end----

message JobConfig {
