Skip to content

Commit d57b582

Browse files
authored
Improving message for access to unknown objects (#28209)
Signed-off-by: Pablo E <pabloem@apache.org>

When accessing unknown objects, Ray kills the Python interpreter without raising any error. This change rephrases the error message so that the problem is easier to diagnose and fix, and changes the check so that an error is raised instead of terminating the whole process.
1 parent 4267ac2 commit d57b582

File tree

14 files changed

+333
-74
lines changed

14 files changed

+333
-74
lines changed

cpp/src/ray/runtime/abstract_ray_runtime.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ std::vector<std::unique_ptr<::ray::TaskArg>> TransformArgs(
141141
auto owner_address = ray::rpc::Address{};
142142
if (ConfigInternal::Instance().run_mode == RunMode::CLUSTER) {
143143
auto &core_worker = CoreWorkerProcess::GetCoreWorker();
144-
owner_address = core_worker.GetOwnerAddress(id);
144+
owner_address = core_worker.GetOwnerAddressOrDie(id);
145145
}
146146
ray_arg = absl::make_unique<ray::TaskArgByReference>(id,
147147
owner_address,

cpp/src/ray/test/cluster/cluster_mode_test.cc

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ TEST(RayClusterModeTest, FullTest) {
7373
config.redis_password_ = password;
7474
}
7575
ray::Init(config, cmd_argc, cmd_argv);
76+
7677
/// put and get object
7778
auto obj = ray::Put(12345);
7879
auto get_result = *(ray::Get(obj));
@@ -88,9 +89,9 @@ TEST(RayClusterModeTest, FullTest) {
8889
EXPECT_EQ(1, task_result);
8990

9091
/// common task with args
91-
task_obj = ray::Task(Plus1).Remote(5);
92-
task_result = *(ray::Get(task_obj));
93-
EXPECT_EQ(6, task_result);
92+
auto task_obj1 = ray::Task(Plus1).Remote(5);
93+
auto task_result1 = *(ray::Get(task_obj1));
94+
EXPECT_EQ(6, task_result1);
9495

9596
ray::ActorHandle<Counter> actor = ray::Actor(RAY_FUNC(Counter::FactoryCreate))
9697
.SetMaxRestarts(1)
@@ -495,6 +496,10 @@ TEST(RayClusterModeTest, TaskWithPlacementGroup) {
495496
}
496497

497498
TEST(RayClusterModeTest, NamespaceTest) {
499+
if (ray::IsInitialized()) {
500+
ray::Shutdown();
501+
}
502+
ray::Init();
498503
// Create a named actor in namespace `isolated_ns`.
499504
std::string actor_name_in_isolated_ns = "named_actor_in_isolated_ns";
500505
std::string isolated_ns_name = "isolated_ns";
@@ -516,11 +521,11 @@ TEST(RayClusterModeTest, NamespaceTest) {
516521

517522
// Create a named actor in job default namespace.
518523
std::string actor_name_in_default_ns = "actor_name_in_default_ns";
519-
actor = ray::Actor(RAY_FUNC(Counter::FactoryCreate))
520-
.SetName(actor_name_in_default_ns)
521-
.Remote();
522-
initialized_obj = actor.Task(&Counter::Initialized).Remote();
523-
EXPECT_TRUE(*initialized_obj.Get());
524+
auto actor1 = ray::Actor(RAY_FUNC(Counter::FactoryCreate))
525+
.SetName(actor_name_in_default_ns)
526+
.Remote();
527+
auto initialized_obj1 = actor1.Task(&Counter::Initialized).Remote();
528+
EXPECT_TRUE(*initialized_obj1.Get());
524529
// It is visible to job default namespace.
525530
actor_optional = ray::GetActor<Counter>(actor_name_in_default_ns);
526531
EXPECT_TRUE(actor_optional);
@@ -534,6 +539,9 @@ TEST(RayClusterModeTest, GetNamespaceApiTest) {
534539
std::string ns = "test_get_current_namespace";
535540
ray::RayConfig config;
536541
config.ray_namespace = ns;
542+
if (ray::IsInitialized()) {
543+
ray::Shutdown();
544+
}
537545
ray::Init(config, cmd_argc, cmd_argv);
538546
// Get namespace in driver.
539547
EXPECT_EQ(ray::GetNamespace(), ns);

java/test/src/main/java/io/ray/test/ReferenceCountingTest.java

Lines changed: 38 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import io.ray.api.ActorHandle;
77
import io.ray.api.ObjectRef;
88
import io.ray.api.Ray;
9+
import io.ray.api.exception.RayException;
910
import io.ray.api.id.ObjectId;
1011
import io.ray.runtime.object.NativeObjectStore;
1112
import io.ray.runtime.object.ObjectRefImpl;
@@ -112,9 +113,14 @@ private static void fillObjectStoreAndGet(
112113
if (succeed) {
113114
TestUtils.getRuntime().getObjectStore().getRaw(ImmutableList.of(objectId), Long.MAX_VALUE);
114115
} else {
115-
List<Boolean> result =
116-
TestUtils.getRuntime().getObjectStore().wait(ImmutableList.of(objectId), 1, 100, true);
117-
Assert.assertFalse(result.get(0));
116+
try {
117+
List<Boolean> result =
118+
TestUtils.getRuntime().getObjectStore().wait(ImmutableList.of(objectId), 0, 100, true);
119+
Assert.fail(
120+
"Ray did not fail when waiting for an object that does not belong in this session");
121+
} catch (RayException e) {
122+
// This is the expected outcome for succeed=false, as we wait for non-existent objects.
123+
}
118124
}
119125
}
120126

@@ -165,27 +171,7 @@ private static void sendSignal(ActorHandle<SignalActor> signal) {
165171
}
166172

167173
/** Based on Python test case `test_dependency_refcounts`. */
168-
public void testDependencyRefCounts() {
169-
{
170-
// Test that regular plasma dependency refcounts are decremented once the
171-
// task finishes.
172-
ActorHandle<SignalActor> signal = SignalActor.create();
173-
ObjectRefImpl<TestUtils.LargeObject> largeDep =
174-
(ObjectRefImpl<TestUtils.LargeObject>) Ray.put(new TestUtils.LargeObject());
175-
ObjectRefImpl<Object> result =
176-
(ObjectRefImpl<Object>)
177-
Ray.<TestUtils.LargeObject, ActorHandle<SignalActor>, Object>task(
178-
ReferenceCountingTest::oneDep, largeDep, signal)
179-
.remote();
180-
checkRefCounts(largeDep.getId(), 1, 1, result.getId(), 1, 0);
181-
sendSignal(signal);
182-
// Reference count should be removed once the task finishes.
183-
checkRefCounts(largeDep.getId(), 1, 0, result.getId(), 1, 0);
184-
del(largeDep);
185-
del(result);
186-
checkRefCounts(ImmutableMap.of());
187-
}
188-
174+
public void testDependencyRefCounts1() {
189175
{
190176
// Test that inlined dependency refcounts are decremented once they are
191177
// inlined.
@@ -207,7 +193,9 @@ public void testDependencyRefCounts() {
207193
del(result);
208194
checkRefCounts(ImmutableMap.of());
209195
}
196+
}
210197

198+
public void testDependencyRefCounts2() {
211199
{
212200
// Test that spilled plasma dependency refcounts are decremented once
213201
// the task finishes.
@@ -236,7 +224,9 @@ public void testDependencyRefCounts() {
236224
del(result);
237225
checkRefCounts(ImmutableMap.of());
238226
}
227+
}
239228

229+
public void testDependencyRefCounts3() {
240230
{
241231
// Test that regular plasma dependency refcounts are decremented if a task
242232
// fails.
@@ -257,7 +247,9 @@ public void testDependencyRefCounts() {
257247
del(result);
258248
checkRefCounts(ImmutableMap.of());
259249
}
250+
}
260251

252+
public void testDependencyRefCounts4() {
261253
{
262254
// Test that spilled plasma dependency refcounts are decremented if a task
263255
// fails.
@@ -288,6 +280,28 @@ public void testDependencyRefCounts() {
288280
}
289281
}
290282

283+
public void testDependencyRefCounts5() {
284+
{
285+
// Test that regular plasma dependency refcounts are decremented once the
286+
// task finishes.
287+
ActorHandle<SignalActor> signal = SignalActor.create();
288+
ObjectRefImpl<TestUtils.LargeObject> largeDep =
289+
(ObjectRefImpl<TestUtils.LargeObject>) Ray.put(new TestUtils.LargeObject());
290+
ObjectRefImpl<Object> result =
291+
(ObjectRefImpl<Object>)
292+
Ray.<TestUtils.LargeObject, ActorHandle<SignalActor>, Object>task(
293+
ReferenceCountingTest::oneDep, largeDep, signal)
294+
.remote();
295+
checkRefCounts(largeDep.getId(), 1, 1, result.getId(), 1, 0);
296+
sendSignal(signal);
297+
// Reference count should be removed once the task finishes.
298+
checkRefCounts(largeDep.getId(), 1, 0, result.getId(), 1, 0);
299+
del(largeDep);
300+
del(result);
301+
checkRefCounts(ImmutableMap.of());
302+
}
303+
}
304+
291305
private static int fooBasicPinning(Object arg) {
292306
return 0;
293307
}

python/ray/_raylet.pyx

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,10 @@ cdef int check_status(const CRayStatus& status) nogil except -1:
198198
raise GetTimeoutError(message)
199199
elif status.IsNotFound():
200200
raise ValueError(message)
201+
elif status.IsObjectNotFound():
202+
raise ValueError(message)
203+
elif status.IsObjectUnknownOwner():
204+
raise ValueError(message)
201205
else:
202206
raise RaySystemError(message)
203207

@@ -421,6 +425,8 @@ cdef prepare_args_internal(
421425
c_vector[CObjectID] inlined_ids
422426
c_string put_arg_call_site
423427
c_vector[CObjectReference] inlined_refs
428+
CAddress c_owner_address
429+
CRayStatus op_status
424430

425431
worker = ray._private.worker.global_worker
426432
put_threshold = RayConfig.instance().max_direct_call_object_size()
@@ -429,11 +435,13 @@ cdef prepare_args_internal(
429435
for arg in args:
430436
if isinstance(arg, ObjectRef):
431437
c_arg = (<ObjectRef>arg).native()
438+
op_status = CCoreWorkerProcess.GetCoreWorker().GetOwnerAddress(
439+
c_arg, &c_owner_address)
440+
check_status(op_status)
432441
args_vector.push_back(
433442
unique_ptr[CTaskArg](new CTaskArgByReference(
434443
c_arg,
435-
CCoreWorkerProcess.GetCoreWorker().GetOwnerAddress(
436-
c_arg),
444+
c_owner_address,
437445
arg.call_site())))
438446

439447
else:
@@ -1565,8 +1573,9 @@ cdef class CoreWorker:
15651573
CTaskID c_task_id = current_task_id.native()
15661574
c_vector[CObjectID] c_object_ids = ObjectRefsToVector(object_refs)
15671575
with nogil:
1568-
check_status(CCoreWorkerProcess.GetCoreWorker().Get(
1569-
c_object_ids, timeout_ms, &results))
1576+
op_status = CCoreWorkerProcess.GetCoreWorker().Get(
1577+
c_object_ids, timeout_ms, &results)
1578+
check_status(op_status)
15701579

15711580
return RayObjectsToDataMetadataPairs(results)
15721581

@@ -1770,8 +1779,9 @@ cdef class CoreWorker:
17701779

17711780
wait_ids = ObjectRefsToVector(object_refs)
17721781
with nogil:
1773-
check_status(CCoreWorkerProcess.GetCoreWorker().Wait(
1774-
wait_ids, num_returns, timeout_ms, &results, fetch_local))
1782+
op_status = CCoreWorkerProcess.GetCoreWorker().Wait(
1783+
wait_ids, num_returns, timeout_ms, &results, fetch_local)
1784+
check_status(op_status)
17751785

17761786
assert len(results) == len(object_refs)
17771787

@@ -2322,16 +2332,20 @@ cdef class CoreWorker:
23222332
def get_owner_address(self, ObjectRef object_ref):
23232333
cdef:
23242334
CObjectID c_object_id = object_ref.native()
2325-
return CCoreWorkerProcess.GetCoreWorker().GetOwnerAddress(
2326-
c_object_id).SerializeAsString()
2335+
CAddress c_owner_address
2336+
op_status = CCoreWorkerProcess.GetCoreWorker().GetOwnerAddress(
2337+
c_object_id, &c_owner_address)
2338+
check_status(op_status)
2339+
return c_owner_address.SerializeAsString()
23272340

23282341
def serialize_object_ref(self, ObjectRef object_ref):
23292342
cdef:
23302343
CObjectID c_object_id = object_ref.native()
23312344
CAddress c_owner_address = CAddress()
23322345
c_string serialized_object_status
2333-
CCoreWorkerProcess.GetCoreWorker().GetOwnershipInfo(
2346+
op_status = CCoreWorkerProcess.GetCoreWorker().GetOwnershipInfo(
23342347
c_object_id, &c_owner_address, &serialized_object_status)
2348+
check_status(op_status)
23352349
return (object_ref,
23362350
c_owner_address.SerializeAsString(),
23372351
serialized_object_status)

python/ray/includes/common.pxd

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,9 @@ cdef extern from "ray/common/status.h" namespace "ray" nogil:
113113
c_bool IsTimedOut()
114114
c_bool IsInterrupted()
115115
c_bool ShouldExitWorker()
116+
c_bool IsObjectNotFound()
116117
c_bool IsNotFound()
118+
c_bool IsObjectUnknownOwner()
117119

118120
c_string ToString()
119121
c_string CodeAsString()

python/ray/includes/libcoreworker.pxd

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -185,13 +185,14 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
185185
void PutObjectIntoPlasma(const CRayObject &object,
186186
const CObjectID &object_id)
187187
const CAddress &GetRpcAddress() const
188-
CAddress GetOwnerAddress(const CObjectID &object_id) const
188+
CRayStatus GetOwnerAddress(const CObjectID &object_id,
189+
CAddress *owner_address) const
189190
c_vector[CObjectReference] GetObjectRefs(
190191
const c_vector[CObjectID] &object_ids) const
191192

192-
void GetOwnershipInfo(const CObjectID &object_id,
193-
CAddress *owner_address,
194-
c_string *object_status)
193+
CRayStatus GetOwnershipInfo(const CObjectID &object_id,
194+
CAddress *owner_address,
195+
c_string *object_status)
195196
void RegisterOwnershipInfoAndResolveFuture(
196197
const CObjectID &object_id,
197198
const CObjectID &outer_object_id,

python/ray/tests/BUILD

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ py_test_module_list(
5858
"test_get_locations.py",
5959
"test_global_state.py",
6060
"test_healthcheck.py",
61-
"test_ray_shutdown.py",
6261
"test_kill_raylet_signal_log.py",
6362
"test_memstat.py",
6463
"test_protobuf_compatibility.py"
@@ -128,6 +127,7 @@ py_test_module_list(
128127
"test_placement_group_failover.py",
129128
"test_ray_init.py",
130129
"test_ray_init_2.py",
130+
"test_ray_shutdown.py",
131131
"test_resource_demand_scheduler.py",
132132
"test_resource_metrics.py",
133133
"test_runtime_context.py",

python/ray/tests/test_basic.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -650,8 +650,9 @@ def h():
650650

651651
assert ray.get(f.remote()) == (1, 2)
652652

653-
# Test a remote function that recursively calls itself.
654653

654+
def test_recursive_remote_call(ray_start_shared_local_modes):
655+
# Test a remote function that recursively calls itself.
655656
@ray.remote
656657
def factorial(n):
657658
if n == 0:

0 commit comments

Comments
 (0)