Skip to content

Commit

Permalink
[core] Weakref semantics for actor handles retrieved from self or by …
Browse files Browse the repository at this point in the history
…name (#45699)

The distributed ref counting protocol assumes that ObjectRefs and
ActorHandles are only ever passed in-band, through task arguments and
return values. However, it is possible to get actor handles
"out-of-band" through two APIs:
- `ray.get_runtime_context().current_actor` returns a handle to the
`self` actor
- `ray.get_actor` can get an actor handle by name

We used to assert that when a task finished, its arguments were still
registered in the ref count table. That way, we could return the
remaining ref count back to the task's caller. This assertion is too
strong and causes crash in the following case:
1. Actor receives a handle to itself through its arguments.
2. Actor gets another handle to itself via
`ray.get_runtime_context().current_actor`
3. The handle in step (2) goes out of scope -> ref removed from the
ReferenceCounter
4. Actor task finishes -> the handle in its arguments is no longer in
the ReferenceCounter -> assertion failure

This PR updates ref counting for actors to distinguish between strong
refs and weak refs. Strong refs are handles passed via task args and
return values from the original handle. These are tracked by the ref
counting protocol and will keep the actor alive. Weak refs are handles
created from the above cases and are not tracked by the ref counting
protocol (doing so would require additional protocols to notify the
owner of the new handle).

This PR also fixes the related #45704 by not incrementing the self ref
count on the actor itself when the actor is first created.

## Related issue number

Closes #45704.

---------

Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
  • Loading branch information
stephanie-wang authored Jun 14, 2024
1 parent 0e5c79a commit d729815
Show file tree
Hide file tree
Showing 19 changed files with 301 additions and 98 deletions.
4 changes: 3 additions & 1 deletion cpp/src/ray/runtime/abstract_ray_runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,9 @@ std::string AbstractRayRuntime::DeserializeAndRegisterActorHandle(
const std::string &serialized_actor_handle) {
auto &core_worker = CoreWorkerProcess::GetCoreWorker();
return core_worker
.DeserializeAndRegisterActorHandle(serialized_actor_handle, ObjectID::Nil())
.DeserializeAndRegisterActorHandle(serialized_actor_handle,
ObjectID::Nil(),
/*add_local_ref=*/true)
.Binary();
}

Expand Down
29 changes: 20 additions & 9 deletions python/ray/_private/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,14 @@ def _object_ref_deserializer(binary, call_site, owner_address, object_status):
return obj_ref


def _actor_handle_deserializer(serialized_obj):
def _actor_handle_deserializer(serialized_obj, weak_ref):
# If this actor handle was stored in another object, then tell the
# core worker.
context = ray._private.worker.global_worker.get_serialization_context()
outer_id = context.get_outer_object_ref()
return ray.actor.ActorHandle._deserialization_helper(serialized_obj, outer_id)
return ray.actor.ActorHandle._deserialization_helper(
serialized_obj, weak_ref, outer_id
)


class SerializationContext:
Expand All @@ -122,10 +124,11 @@ def __init__(self, worker):

def actor_handle_reducer(obj):
ray._private.worker.global_worker.check_connected()
serialized, actor_handle_id = obj._serialization_helper()
serialized, actor_handle_id, weak_ref = obj._serialization_helper()
# Update ref counting for the actor handle
self.add_contained_object_ref(actor_handle_id)
return _actor_handle_deserializer, (serialized,)
if not weak_ref:
self.add_contained_object_ref(actor_handle_id)
return _actor_handle_deserializer, (serialized, weak_ref)

self._register_cloudpickle_reducer(ray.actor.ActorHandle, actor_handle_reducer)

Expand Down Expand Up @@ -282,7 +285,9 @@ def _deserialize_object(self, data, metadata, object_ref):
return data.to_pybytes()
elif metadata_fields[0] == ray_constants.OBJECT_METADATA_TYPE_ACTOR_HANDLE:
obj = self._deserialize_msgpack_data(data, metadata_fields)
return _actor_handle_deserializer(obj)
# The last character is a 1 if weak_ref=True and 0 else.
serialized, weak_ref = obj[:-1], obj[-1:] == b"1"
return _actor_handle_deserializer(serialized, weak_ref)
# Otherwise, return an exception object based on
# the error type.
try:
Expand Down Expand Up @@ -464,11 +469,17 @@ def _serialize_to_msgpack(self, value):
elif isinstance(value, ray.actor.ActorHandle):
# TODO(fyresone): ActorHandle should be serialized via the
# custom type feature of cross-language.
serialized, actor_handle_id = value._serialization_helper()
contained_object_refs.append(actor_handle_id)
serialized, actor_handle_id, weak_ref = value._serialization_helper()
if not weak_ref:
contained_object_refs.append(actor_handle_id)
# Update ref counting for the actor handle
metadata = ray_constants.OBJECT_METADATA_TYPE_ACTOR_HANDLE
value = serialized
# Append a 1 to mean weak ref or 0 for strong ref.
# We do this here instead of in the main serialization helper
# because msgpack expects a bytes object. We cannot serialize
# `weak_ref` in the C++ code because the weak_ref property is only
# available in the Python ActorHandle instance.
value = serialized + (b"1" if weak_ref else b"0")
else:
metadata = ray_constants.OBJECT_METADATA_TYPE_CROSS_LANGUAGE

Expand Down
3 changes: 2 additions & 1 deletion python/ray/_raylet.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ cdef class CoreWorker:
const CAddress &caller_address,
c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]] *returns,
CObjectID ref_generator_id=*)
cdef make_actor_handle(self, ActorHandleSharedPtr c_actor_handle)
cdef make_actor_handle(self, ActorHandleSharedPtr c_actor_handle,
c_bool weak_ref)
cdef c_function_descriptors_to_python(
self, const c_vector[CFunctionDescriptor] &c_function_descriptors)
cdef initialize_eventloops_for_actor_concurrency_group(
Expand Down
25 changes: 17 additions & 8 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -4366,7 +4366,8 @@ cdef class CoreWorker:
CCoreWorkerProcess.GetCoreWorker().RemoveActorHandleReference(
c_actor_id)

cdef make_actor_handle(self, ActorHandleSharedPtr c_actor_handle):
cdef make_actor_handle(self, ActorHandleSharedPtr c_actor_handle,
c_bool weak_ref):
worker = ray._private.worker.global_worker
worker.check_connected()
manager = worker.function_actor_manager
Expand Down Expand Up @@ -4404,7 +4405,8 @@ cdef class CoreWorker:
method_meta.enable_task_events,
actor_method_cpu,
actor_creation_function_descriptor,
worker.current_cluster_and_job)
worker.current_cluster_and_job,
weak_ref=weak_ref)
else:
return ray.actor.ActorHandle(language, actor_id,
0, # max_task_retries,
Expand All @@ -4419,21 +4421,26 @@ cdef class CoreWorker:
{}, # enable_task_events
0, # actor method cpu
actor_creation_function_descriptor,
worker.current_cluster_and_job)
worker.current_cluster_and_job,
weak_ref=weak_ref,
)

def deserialize_and_register_actor_handle(self, const c_string &bytes,
ObjectRef
outer_object_ref):
outer_object_ref,
c_bool weak_ref):
cdef:
CObjectID c_outer_object_id = (outer_object_ref.native() if
outer_object_ref else
CObjectID.Nil())
c_actor_id = (CCoreWorkerProcess
.GetCoreWorker()
.DeserializeAndRegisterActorHandle(
bytes, c_outer_object_id))
bytes, c_outer_object_id,
add_local_ref=not weak_ref))
return self.make_actor_handle(
CCoreWorkerProcess.GetCoreWorker().GetActorHandle(c_actor_id))
CCoreWorkerProcess.GetCoreWorker().GetActorHandle(c_actor_id),
weak_ref)

def get_named_actor_handle(self, const c_string &name,
const c_string &ray_namespace):
Expand All @@ -4448,13 +4455,15 @@ cdef class CoreWorker:
name, ray_namespace))
check_status(named_actor_handle_pair.second)

return self.make_actor_handle(named_actor_handle_pair.first)
return self.make_actor_handle(named_actor_handle_pair.first,
weak_ref=True)

def get_actor_handle(self, ActorID actor_id):
cdef:
CActorID c_actor_id = actor_id.native()
return self.make_actor_handle(
CCoreWorkerProcess.GetCoreWorker().GetActorHandle(c_actor_id))
CCoreWorkerProcess.GetCoreWorker().GetActorHandle(c_actor_id),
weak_ref=True)

def list_named_actors(self, c_bool all_namespaces):
"""Returns (namespace, name) for named actors in the system.
Expand Down
28 changes: 23 additions & 5 deletions python/ray/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1262,6 +1262,13 @@ class ActorHandle:
_ray_original_handle: True if this is the original actor handle for a
given actor. If this is true, then the actor will be destroyed when
this handle goes out of scope.
_ray_weak_ref: True means that this handle does not count towards the
distributed ref count for the actor, i.e. the actor may be GCed
while this handle is still in scope. This is set to True if the
handle was created by getting an actor by name or by getting the
self handle. It is set to False if this is the original handle or
if it was created by passing the original handle through task args
and returns.
_ray_is_cross_language: Whether this actor is cross language.
_ray_actor_creation_function_descriptor: The function descriptor
of the actor creation task.
Expand All @@ -1285,11 +1292,13 @@ def __init__(
actor_creation_function_descriptor,
cluster_and_job,
original_handle=False,
weak_ref: bool = False,
):
self._ray_actor_language = language
self._ray_actor_id = actor_id
self._ray_max_task_retries = max_task_retries
self._ray_original_handle = original_handle
self._ray_weak_ref = weak_ref
self._ray_enable_task_events = enable_task_events

self._ray_method_is_generator = method_is_generator
Expand Down Expand Up @@ -1348,6 +1357,11 @@ def __init__(
setattr(self, method_name, method)

def __del__(self):
# Weak references don't count towards the distributed ref count, so no
# need to decrement the ref count.
if self._ray_weak_ref:
return

try:
# Mark that this actor handle has gone out of scope. Once all actor
# handles are out of scope, the actor will exit.
Expand Down Expand Up @@ -1568,17 +1582,19 @@ def _serialization_helper(self):
None,
)

return state
return (*state, self._ray_weak_ref)

@classmethod
def _deserialization_helper(cls, state, outer_object_ref=None):
def _deserialization_helper(cls, state, weak_ref: bool, outer_object_ref=None):
"""This is defined in order to make pickling work.
Args:
state: The serialized state of the actor handle.
outer_object_ref: The ObjectRef that the serialized actor handle
was contained in, if any. This is used for counting references
to the actor handle.
weak_ref: Whether this was serialized from an actor handle with a
weak ref to the actor.
"""
worker = ray._private.worker.global_worker
Expand All @@ -1587,7 +1603,9 @@ def _deserialization_helper(cls, state, outer_object_ref=None):
if hasattr(worker, "core_worker"):
# Non-local mode
return worker.core_worker.deserialize_and_register_actor_handle(
state, outer_object_ref
state,
outer_object_ref,
weak_ref,
)
else:
# Local mode
Expand All @@ -1614,10 +1632,10 @@ def _deserialization_helper(cls, state, outer_object_ref=None):

def __reduce__(self):
"""This code path is used by pickling but not by Ray forking."""
(serialized, _) = self._serialization_helper()
(serialized, _, weak_ref) = self._serialization_helper()
# There is no outer object ref when the actor handle is
# deserialized out-of-band using pickle.
return ActorHandle._deserialization_helper, (serialized, None)
return ActorHandle._deserialization_helper, (serialized, weak_ref, None)


def _modify_class(cls):
Expand Down
3 changes: 2 additions & 1 deletion python/ray/includes/libcoreworker.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
const ResourceMappingType &GetResourceIDs() const
void RemoveActorHandleReference(const CActorID &actor_id)
CActorID DeserializeAndRegisterActorHandle(const c_string &bytes, const
CObjectID &outer_object_id)
CObjectID &outer_object_id,
c_bool add_local_ref)
CRayStatus SerializeActorHandle(const CActorID &actor_id, c_string
*bytes,
CObjectID *c_actor_handle_id)
Expand Down
Loading

0 comments on commit d729815

Please sign in to comment.