Skip to content

Commit

Permalink
[Core] Add object back to memory store when object recovery is skipped (
Browse files Browse the repository at this point in the history
#46460)

Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
  • Loading branch information
jjyao authored Jul 8, 2024
1 parent 05067f4 commit 33ee732
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 2 deletions.
10 changes: 8 additions & 2 deletions src/ray/core_worker/object_recovery_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,14 @@ bool ObjectRecoveryManager::RecoverObject(const ObjectID &object_id) {
} else if (requires_recovery) {
RAY_LOG(DEBUG) << "Recovery already started for object " << object_id;
} else {
RAY_LOG(DEBUG) << "Object " << object_id
<< " has a pinned or spilled location, skipping recovery";
RAY_LOG(INFO) << "Object " << object_id
<< " has a pinned or spilled location, skipping recovery " << pinned_at;
// If the object doesn't exist in the memory store
// (core_worker.cc removes the object from memory store before calling this method),
// we need to add it back to indicate that it's available.
// If the object is already in the memory store then the put is a no-op.
RAY_CHECK(
in_memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id));
}
return true;
}
Expand Down
26 changes: 26 additions & 0 deletions src/ray/core_worker/test/object_recovery_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,32 @@ TEST_F(ObjectRecoveryManagerTest, TestLineageEvicted) {
rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE_LINEAGE_EVICTED);
}

TEST_F(ObjectRecoveryManagerTest, TestReconstructionSkipped) {
// Test that if the object is already pinned or spilled,
// reconstruction is skipped.
ObjectID object_id = ObjectID::FromRandom();
ref_counter_->AddOwnedObject(object_id,
{},
rpc::Address(),
"",
0,
true,
/*add_local_ref=*/true);
ref_counter_->UpdateObjectPinnedAtRaylet(object_id, NodeID::FromRandom());

memory_store_->Delete({object_id});
ASSERT_TRUE(manager_.RecoverObject(object_id));
ASSERT_TRUE(failed_reconstructions_.empty());
ASSERT_EQ(object_directory_->Flush(), 0);
ASSERT_EQ(raylet_client_->Flush(), 0);
ASSERT_EQ(task_resubmitter_->num_tasks_resubmitted, 0);
// The object should be added back to the memory store
// indicating the object is available again.
bool in_plasma = false;
ASSERT_TRUE(memory_store_->Contains(object_id, &in_plasma));
ASSERT_TRUE(in_plasma);
}

} // namespace core
} // namespace ray

Expand Down

0 comments on commit 33ee732

Please sign in to comment.