Skip to content

Commit 545c1ec

Browse files
dayshahedoakes
authored andcommitted
[core] Warning when creating actor with restarts and arguments in plasma (#53713)
Currently actor restarts won't because lineage ref counting doesn't work for actors with restarts. See description here #51653 (comment). Minimal repro ``` cluster = ray_start_cluster cluster.add_node(num_cpus=0) # head ray.init(address=cluster.address) worker1 = cluster.add_node(num_cpus=1) @ray.remote(num_cpus=1, max_restarts=1) class Actor: def __init__(self, config): self.config = config def ping(self): return self.config # Arg is >100kb so will go in the object store actor = Actor.remote(np.zeros(100 * 1024 * 1024, dtype=np.uint8)) ray.get(actor.ping.remote()) worker2 = cluster.add_node(num_cpus=1) cluster.remove_node(worker1, allow_graceful=True) # This line will break ray.get(actor.ping.remote()) ``` --------- Signed-off-by: dayshah <dhyey2019@gmail.com> Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com> Co-authored-by: Edward Oakes <ed.nmi.oakes@gmail.com> Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
1 parent 3590ba7 commit 545c1ec

File tree

1 file changed

+57
-37
lines changed

1 file changed

+57
-37
lines changed

src/ray/core_worker/core_worker.cc

Lines changed: 57 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2723,45 +2723,65 @@ Status CoreWorker::CreateActor(const RayFunction &function,
27232723
local_mode_named_actor_registry_.emplace(actor_name, actor_id);
27242724
}
27252725
ExecuteTaskLocalMode(task_spec);
2726-
} else {
2727-
task_manager_->AddPendingTask(
2728-
rpc_address_,
2729-
task_spec,
2730-
CurrentCallSite(),
2731-
// Actor creation task retry happens on GCS not on core worker.
2732-
/*max_retries*/ 0);
2733-
2734-
if (actor_name.empty()) {
2735-
io_service_.post(
2736-
[this, task_spec = std::move(task_spec)]() {
2737-
RAY_UNUSED(actor_creator_->AsyncRegisterActor(
2738-
task_spec, [this, task_spec](Status status) {
2739-
if (!status.ok()) {
2740-
RAY_LOG(ERROR).WithField(task_spec.ActorCreationId())
2741-
<< "Failed to register actor. Error message: " << status;
2742-
task_manager_->FailPendingTask(task_spec.TaskId(),
2743-
rpc::ErrorType::ACTOR_CREATION_FAILED,
2744-
&status);
2745-
} else {
2746-
RAY_UNUSED(actor_task_submitter_->SubmitActorCreationTask(task_spec));
2747-
}
2748-
}));
2749-
},
2750-
"ActorCreator.AsyncRegisterActor");
2751-
} else {
2752-
// For named actor, we still go through the sync way because for
2753-
// functions like list actors these actors need to be there, especially
2754-
// for local driver. But the current code all go through the gcs right now.
2755-
auto status = actor_creator_->RegisterActor(task_spec);
2756-
if (!status.ok()) {
2757-
return status;
2726+
return Status::OK();
2727+
}
2728+
2729+
if (task_spec.MaxActorRestarts() != 0) {
2730+
bool actor_restart_warning = false;
2731+
for (size_t i = 0; i < task_spec.NumArgs(); i++) {
2732+
if (task_spec.ArgByRef(i) || !task_spec.ArgInlinedRefs(i).empty()) {
2733+
actor_restart_warning = true;
2734+
break;
27582735
}
2759-
io_service_.post(
2760-
[this, task_spec = std::move(task_spec)]() {
2761-
RAY_UNUSED(actor_task_submitter_->SubmitActorCreationTask(task_spec));
2762-
},
2763-
"CoreWorker.SubmitTask");
27642736
}
2737+
if (actor_restart_warning) {
2738+
RAY_LOG(ERROR)
2739+
<< "Actor " << (actor_name.empty() ? "" : (actor_name + " "))
2740+
<< "with class name: '" << function.GetFunctionDescriptor()->ClassName()
2741+
<< "' and ID: '" << task_spec.ActorCreationId()
2742+
<< "' has constructor arguments in the object store and max_restarts > 0. If "
2743+
"the arguments in the object store go out of scope or are lost, the "
2744+
"actor restart will fail. See "
2745+
"https://github.com/ray-project/ray/issues/53727 for more details.";
2746+
}
2747+
}
2748+
2749+
task_manager_->AddPendingTask(
2750+
rpc_address_,
2751+
task_spec,
2752+
CurrentCallSite(),
2753+
// Actor creation task retry happens on GCS not on core worker.
2754+
/*max_retries*/ 0);
2755+
2756+
if (actor_name.empty()) {
2757+
io_service_.post(
2758+
[this, task_spec = std::move(task_spec)]() {
2759+
RAY_UNUSED(actor_creator_->AsyncRegisterActor(
2760+
task_spec, [this, task_spec](Status status) {
2761+
if (!status.ok()) {
2762+
RAY_LOG(ERROR).WithField(task_spec.ActorCreationId())
2763+
<< "Failed to register actor. Error message: " << status;
2764+
task_manager_->FailPendingTask(
2765+
task_spec.TaskId(), rpc::ErrorType::ACTOR_CREATION_FAILED, &status);
2766+
} else {
2767+
RAY_UNUSED(actor_task_submitter_->SubmitActorCreationTask(task_spec));
2768+
}
2769+
}));
2770+
},
2771+
"ActorCreator.AsyncRegisterActor");
2772+
} else {
2773+
// For named actor, we still go through the sync way because for
2774+
// functions like list actors these actors need to be there, especially
2775+
// for local driver. But the current code all go through the gcs right now.
2776+
auto status = actor_creator_->RegisterActor(task_spec);
2777+
if (!status.ok()) {
2778+
return status;
2779+
}
2780+
io_service_.post(
2781+
[this, task_spec = std::move(task_spec)]() {
2782+
RAY_UNUSED(actor_task_submitter_->SubmitActorCreationTask(task_spec));
2783+
},
2784+
"CoreWorker.SubmitTask");
27652785
}
27662786
return Status::OK();
27672787
}

0 commit comments

Comments
 (0)