Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions python/ray/tests/test_core_worker_fault_tolerance.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,5 +207,32 @@ def remote_wait(sg):
ray.get(inner, timeout=10)


def test_double_borrowing_with_rpc_failure(monkeypatch, shutdown_only):
    """Regression test for https://github.com/ray-project/ray/issues/57997

    An actor that borrows an object returns that same object nested inside a
    list while PushTask RPC failures are injected, and the caller must still
    be able to resolve the nested reference.
    """
    # Failure injection has to be configured before ray.init() is called.
    # NOTE(review): the "3:0:100" spec is presumably
    # max_failures:request_failure_pct:response_failure_pct -- confirm.
    rpc_failure_spec = "CoreWorkerService.grpc_client.PushTask=3:0:100"
    monkeypatch.setenv("RAY_testing_rpc_failure", rpc_failure_spec)

    ray.init()

    @ray.remote(max_task_retries=-1, max_restarts=-1)
    class Actor:
        def __init__(self, objs):
            # Keeping the first ref turns this actor into a borrower of it.
            self.obj = objs[0]

        def test(self):
            # Hand the borrowed ref back wrapped in a list, which makes the
            # caller a borrower too. The PushTask RPC response for this call
            # gets lost, so this actor task ends up being retried.
            return [self.obj]

    put_ref = ray.put(31)
    borrower = Actor.remote([put_ref])
    returned_refs = ray.get(borrower.test.remote())
    assert ray.get(returned_refs[0]) == 31


if __name__ == "__main__":
    # Local import: only script-mode execution needs it.
    import sys

    # pytest.main() returns the session exit code rather than exiting on its
    # own. Forward it so that running this file directly (e.g. from a test
    # runner that executes the script) reports failures via a non-zero
    # process status.
    sys.exit(pytest.main([__file__, "-v", "-s"]))
1 change: 0 additions & 1 deletion python/ray/tests/test_reference_counting_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import ray
import ray._private.gcs_utils as gcs_utils
import ray.cluster_utils
from ray._common.test_utils import SignalActor, wait_for_condition
from ray._private.internal_api import memory_summary
from ray._private.test_utils import (
Expand Down
4 changes: 4 additions & 0 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2713,6 +2713,10 @@ Status CoreWorker::AllocateReturnObject(const ObjectID &object_id,
// Mark this object as containing other object IDs. The ref counter will
// keep the inner IDs in scope until the outer one is out of scope.
if (!contained_object_ids.empty() && !options_.is_local_mode) {
// Due to response loss caused by network failures,
// this method may be called multiple times for the same return object
// but it's fine since AddNestedObjectIds is idempotent.
// See https://github.com/ray-project/ray/issues/57997
reference_counter_->AddNestedObjectIds(
object_id, contained_object_ids, owner_address);
}
Expand Down
8 changes: 2 additions & 6 deletions src/ray/core_worker/reference_counter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1261,12 +1261,8 @@ void ReferenceCounter::AddNestedObjectIdsInternal(const ObjectID &object_id,
WaitForRefRemoved(inner_it, owner_address, object_id);
}
} else {
auto inserted = inner_it->second.mutable_borrow()
->stored_in_objects.emplace(object_id, owner_address)
.second;
// This should be the first time that we have stored this object ID
// inside this return ID.
RAY_CHECK(inserted);
inner_it->second.mutable_borrow()->stored_in_objects.emplace(object_id,
owner_address);
}
PRINT_REF_COUNT(inner_it);
}
Expand Down
2 changes: 2 additions & 0 deletions src/ray/core_worker/reference_counter_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,8 @@ class ReferenceCounterInterface {
/// and we are processing the worker's reply. In this case, we own the task's
/// return objects and are borrowing the nested IDs.
///
/// This method is idempotent.
///
/// \param[in] object_id The ID of the object that contains other ObjectIDs.
/// \param[in] inner_ids The object IDs that are nested in object_id's value.
/// \param[in] owner_address The owner address of the outer object_id. If
Expand Down
59 changes: 59 additions & 0 deletions src/ray/core_worker/tests/reference_counter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2988,6 +2988,65 @@ TEST_F(ReferenceCountTest, TestOwnDynamicStreamingTaskReturnRef) {
ASSERT_FALSE(rc->HasReference(object_id_2));
}

// Checks that invoking AddNestedObjectIds twice with the same arguments is
// harmless in three situations: a nested ray.put, a task returning an object
// it owns, and a task returning an object it borrows.
TEST(DistributedReferenceCountTest, TestAddNestedObjectIdsIdempotency) {
  constexpr int kRepeatedCalls = 2;

  auto caller_client = std::make_shared<MockWorkerClient>("1");
  auto executor_client = std::make_shared<MockWorkerClient>(
      "2", [&caller_client](const rpc::Address &) { return caller_client; });
  auto &rc = executor_client->rc_;

  {
    // Scenario 1: a nested object created through ray.put, i.e.
    //   outer_id = ray.put([inner_id])
    auto outer_id = ObjectID::FromRandom();
    auto inner_id = ObjectID::FromRandom();
    rc.AddOwnedObject(outer_id,
                      {},
                      executor_client->address_,
                      "",
                      0,
                      false,
                      /*add_local_ref=*/true);
    for (int call = 0; call < kRepeatedCalls; ++call) {
      rc.AddNestedObjectIds(outer_id, {inner_id}, executor_client->address_);
    }
    ASSERT_TRUE(rc.HasReference(outer_id));
    ASSERT_TRUE(rc.HasReference(inner_id));

    // Dropping the local references must clear both entries.
    rc.RemoveLocalReference(outer_id, nullptr);
    rc.RemoveLocalReference(inner_id, nullptr);
    ASSERT_FALSE(rc.HasReference(outer_id));
    ASSERT_FALSE(rc.HasReference(inner_id));
  }

  {
    // Scenario 2: the task's return value nests an object owned by the
    // executor.
    auto owned_inner_id = ObjectID::FromRandom();
    auto return_id = ObjectID::FromRandom();
    rc.AddOwnedObject(owned_inner_id,
                      {},
                      executor_client->address_,
                      "",
                      0,
                      false,
                      /*add_local_ref=*/true);
    for (int call = 0; call < kRepeatedCalls; ++call) {
      rc.AddNestedObjectIds(return_id, {owned_inner_id}, caller_client->address_);
    }
    ASSERT_TRUE(rc.HasReference(owned_inner_id));
    // Idempotency means the repeated call must not have produced a second
    // WaitForRefRemoved request.
    ASSERT_EQ(caller_client->num_requests_, 1);

    rc.RemoveLocalReference(owned_inner_id, nullptr);
    // The caller still borrows the object at this point.
    ASSERT_TRUE(rc.HasReference(owned_inner_id));
    // Once the borrower callbacks run, the caller no longer borrows it.
    caller_client->FlushBorrowerCallbacks();
    ASSERT_FALSE(rc.HasReference(owned_inner_id));
  }

  {
    // Scenario 3: the task's return value nests an object the executor is
    // only borrowing.
    auto borrowed_inner_id = ObjectID::FromRandom();
    auto return_id = ObjectID::FromRandom();
    rc.AddBorrowedObject(borrowed_inner_id, ObjectID::Nil(), caller_client->address_);
    for (int call = 0; call < kRepeatedCalls; ++call) {
      rc.AddNestedObjectIds(return_id, {borrowed_inner_id}, caller_client->address_);
    }
    ASSERT_TRUE(rc.HasReference(borrowed_inner_id));

    // The task is done: pop the borrower info that goes back to the owner.
    ReferenceCounterInterface::ReferenceTableProto borrower_refs;
    rc.PopAndClearLocalBorrowers({borrowed_inner_id}, &borrower_refs, nullptr);
    ASSERT_EQ(borrower_refs.size(), 1);
    // The containing return object must be recorded exactly once.
    ASSERT_EQ(borrower_refs[0].stored_in_objects().size(), 1);
    ASSERT_EQ(borrower_refs[0].stored_in_objects()[0].object_id(), return_id.Binary());
    ASSERT_FALSE(rc.HasReference(borrowed_inner_id));
  }
}

} // namespace core
} // namespace ray

Expand Down