Skip to content

Commit

Permalink
[Core] Fix object reconstruction hang on arguments pending creation (#…
Browse files Browse the repository at this point in the history
…47645)

Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
  • Loading branch information
jjyao authored Sep 27, 2024
1 parent c9fa046 commit 1003da0
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 2 deletions.
53 changes: 52 additions & 1 deletion python/ray/tests/test_reconstruction_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
from ray._private.internal_api import memory_summary
from ray._private.test_utils import Semaphore, SignalActor, wait_for_condition
import ray.exceptions
from ray.util.state import list_tasks

# Task status.
WAITING_FOR_DEPENDENCIES = "PENDING_ARGS_AVAIL"
SCHEDULED = "PENDING_NODE_ASSIGNMENT"
FINISHED = "FINISHED"
WAITING_FOR_EXECUTION = "SUBMITTED_TO_WORKER"

Expand Down Expand Up @@ -505,6 +505,57 @@ def dependent_task(x):
ray.get(obj)


def test_object_reconstruction_dead_actor(config, ray_start_cluster):
# Test to make sure that if object reconstruction fails
# due to dead actor, pending_creation is set back to false.
# https://github.com/ray-project/ray/issues/47606
cluster = ray_start_cluster
cluster.add_node(num_cpus=0, _system_config=config)
ray.init(address=cluster.address)
node1 = cluster.add_node(resources={"node1": 1})
node2 = cluster.add_node(resources={"node2": 1})

@ray.remote(max_restarts=0, max_task_retries=-1, resources={"node1": 0.1})
class Worker:
def func_in(self):
return np.random.rand(1024000)

@ray.remote(max_retries=-1, resources={"node2": 0.1})
def func_out(data):
return np.random.rand(1024000)

worker = Worker.remote()

ref_in = worker.func_in.remote()
ref_out = func_out.remote(ref_in)

ray.wait([ref_in, ref_out], num_returns=2, timeout=None, fetch_local=False)

def func_out_resubmitted():
tasks = list_tasks(filters=[("name", "=", "func_out")])
assert len(tasks) == 2
assert (
tasks[0]["state"] == "PENDING_NODE_ASSIGNMENT"
or tasks[1]["state"] == "PENDING_NODE_ASSIGNMENT"
)
return True

cluster.remove_node(node2, allow_graceful=False)
# ref_out will reconstruct, wait for the lease request to reach raylet.
wait_for_condition(func_out_resubmitted)

cluster.remove_node(node1, allow_graceful=False)
# ref_in is lost and the reconstruction will
# fail with ActorDiedError

node1 = cluster.add_node(resources={"node1": 1})
node2 = cluster.add_node(resources={"node2": 1})

with pytest.raises(ray.exceptions.RayTaskError) as exc_info:
ray.get(ref_out)
assert "input arguments for this task could not be computed" in str(exc_info.value)


def test_object_reconstruction_pending_creation(config, ray_start_cluster):
# Test to make sure that an object being reconstructured
# has pending_creation set to true.
Expand Down
8 changes: 7 additions & 1 deletion src/ray/core_worker/object_recovery_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,15 @@ void ObjectRecoveryManager::ReconstructObject(const ObjectID &object_id) {
// object.
const auto task_id = object_id.TaskId();
std::vector<ObjectID> task_deps;
// pending_creation needs to be set to true BEFORE calling ResubmitTask,
// since it might be set back to false inside ResubmitTask if the task is
// an actor task and the actor is dead. If we set pending_creation to true
// after ResubmitTask, then it will remain true forever.
// see https://github.com/ray-project/ray/issues/47606 for more details.
reference_counter_->UpdateObjectPendingCreation(object_id, true);
auto resubmitted = task_resubmitter_->ResubmitTask(task_id, &task_deps);

if (resubmitted) {
reference_counter_->UpdateObjectPendingCreation(object_id, true);
// Try to recover the task's dependencies.
for (const auto &dep : task_deps) {
auto recovered = RecoverObject(dep);
Expand All @@ -189,6 +194,7 @@ void ObjectRecoveryManager::ReconstructObject(const ObjectID &object_id) {
} else {
RAY_LOG(INFO) << "Failed to reconstruct object " << object_id
<< " because lineage has already been deleted";
reference_counter_->UpdateObjectPendingCreation(object_id, false);
recovery_failure_callback_(
object_id,
rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE_MAX_ATTEMPTS_EXCEEDED,
Expand Down

0 comments on commit 1003da0

Please sign in to comment.