Skip to content

Commit

Permalink
Revert "[core] Weakref semantics for actor handles retrieved from sel…
Browse files Browse the repository at this point in the history
…f or by name (ray-project#45699)"

This reverts commit d729815.
  • Loading branch information
jjyao committed Jul 21, 2024
1 parent 104bb49 commit aaa6364
Show file tree
Hide file tree
Showing 19 changed files with 98 additions and 301 deletions.
4 changes: 1 addition & 3 deletions cpp/src/ray/runtime/abstract_ray_runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -389,9 +389,7 @@ std::string AbstractRayRuntime::DeserializeAndRegisterActorHandle(
const std::string &serialized_actor_handle) {
auto &core_worker = CoreWorkerProcess::GetCoreWorker();
return core_worker
.DeserializeAndRegisterActorHandle(serialized_actor_handle,
ObjectID::Nil(),
/*add_local_ref=*/true)
.DeserializeAndRegisterActorHandle(serialized_actor_handle, ObjectID::Nil())
.Binary();
}

Expand Down
29 changes: 9 additions & 20 deletions python/ray/_private/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,12 @@ def _object_ref_deserializer(binary, call_site, owner_address, object_status):
return obj_ref


def _actor_handle_deserializer(serialized_obj, weak_ref):
def _actor_handle_deserializer(serialized_obj):
# If this actor handle was stored in another object, then tell the
# core worker.
context = ray._private.worker.global_worker.get_serialization_context()
outer_id = context.get_outer_object_ref()
return ray.actor.ActorHandle._deserialization_helper(
serialized_obj, weak_ref, outer_id
)
return ray.actor.ActorHandle._deserialization_helper(serialized_obj, outer_id)


class SerializationContext:
Expand All @@ -124,11 +122,10 @@ def __init__(self, worker):

def actor_handle_reducer(obj):
ray._private.worker.global_worker.check_connected()
serialized, actor_handle_id, weak_ref = obj._serialization_helper()
serialized, actor_handle_id = obj._serialization_helper()
# Update ref counting for the actor handle
if not weak_ref:
self.add_contained_object_ref(actor_handle_id)
return _actor_handle_deserializer, (serialized, weak_ref)
self.add_contained_object_ref(actor_handle_id)
return _actor_handle_deserializer, (serialized,)

self._register_cloudpickle_reducer(ray.actor.ActorHandle, actor_handle_reducer)

Expand Down Expand Up @@ -285,9 +282,7 @@ def _deserialize_object(self, data, metadata, object_ref):
return data.to_pybytes()
elif metadata_fields[0] == ray_constants.OBJECT_METADATA_TYPE_ACTOR_HANDLE:
obj = self._deserialize_msgpack_data(data, metadata_fields)
# The last character is a 1 if weak_ref=True and 0 else.
serialized, weak_ref = obj[:-1], obj[-1:] == b"1"
return _actor_handle_deserializer(serialized, weak_ref)
return _actor_handle_deserializer(obj)
# Otherwise, return an exception object based on
# the error type.
try:
Expand Down Expand Up @@ -469,17 +464,11 @@ def _serialize_to_msgpack(self, value):
elif isinstance(value, ray.actor.ActorHandle):
# TODO(fyresone): ActorHandle should be serialized via the
# custom type feature of cross-language.
serialized, actor_handle_id, weak_ref = value._serialization_helper()
if not weak_ref:
contained_object_refs.append(actor_handle_id)
serialized, actor_handle_id = value._serialization_helper()
contained_object_refs.append(actor_handle_id)
# Update ref counting for the actor handle
metadata = ray_constants.OBJECT_METADATA_TYPE_ACTOR_HANDLE
# Append a 1 to mean weak ref or 0 for strong ref.
# We do this here instead of in the main serialization helper
# because msgpack expects a bytes object. We cannot serialize
# `weak_ref` in the C++ code because the weak_ref property is only
# available in the Python ActorHandle instance.
value = serialized + (b"1" if weak_ref else b"0")
value = serialized
else:
metadata = ray_constants.OBJECT_METADATA_TYPE_CROSS_LANGUAGE

Expand Down
3 changes: 1 addition & 2 deletions python/ray/_raylet.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,7 @@ cdef class CoreWorker:
const CAddress &caller_address,
c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]] *returns,
CObjectID ref_generator_id=*)
cdef make_actor_handle(self, ActorHandleSharedPtr c_actor_handle,
c_bool weak_ref)
cdef make_actor_handle(self, ActorHandleSharedPtr c_actor_handle)
cdef c_function_descriptors_to_python(
self, const c_vector[CFunctionDescriptor] &c_function_descriptors)
cdef initialize_eventloops_for_actor_concurrency_group(
Expand Down
25 changes: 8 additions & 17 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -4375,8 +4375,7 @@ cdef class CoreWorker:
CCoreWorkerProcess.GetCoreWorker().RemoveActorHandleReference(
c_actor_id)

cdef make_actor_handle(self, ActorHandleSharedPtr c_actor_handle,
c_bool weak_ref):
cdef make_actor_handle(self, ActorHandleSharedPtr c_actor_handle):
worker = ray._private.worker.global_worker
worker.check_connected()
manager = worker.function_actor_manager
Expand Down Expand Up @@ -4414,8 +4413,7 @@ cdef class CoreWorker:
method_meta.enable_task_events,
actor_method_cpu,
actor_creation_function_descriptor,
worker.current_cluster_and_job,
weak_ref=weak_ref)
worker.current_cluster_and_job)
else:
return ray.actor.ActorHandle(language, actor_id,
0, # max_task_retries,
Expand All @@ -4430,26 +4428,21 @@ cdef class CoreWorker:
{}, # enable_task_events
0, # actor method cpu
actor_creation_function_descriptor,
worker.current_cluster_and_job,
weak_ref=weak_ref,
)
worker.current_cluster_and_job)

def deserialize_and_register_actor_handle(self, const c_string &bytes,
ObjectRef
outer_object_ref,
c_bool weak_ref):
outer_object_ref):
cdef:
CObjectID c_outer_object_id = (outer_object_ref.native() if
outer_object_ref else
CObjectID.Nil())
c_actor_id = (CCoreWorkerProcess
.GetCoreWorker()
.DeserializeAndRegisterActorHandle(
bytes, c_outer_object_id,
add_local_ref=not weak_ref))
bytes, c_outer_object_id))
return self.make_actor_handle(
CCoreWorkerProcess.GetCoreWorker().GetActorHandle(c_actor_id),
weak_ref)
CCoreWorkerProcess.GetCoreWorker().GetActorHandle(c_actor_id))

def get_named_actor_handle(self, const c_string &name,
const c_string &ray_namespace):
Expand All @@ -4464,15 +4457,13 @@ cdef class CoreWorker:
name, ray_namespace))
check_status(named_actor_handle_pair.second)

return self.make_actor_handle(named_actor_handle_pair.first,
weak_ref=True)
return self.make_actor_handle(named_actor_handle_pair.first)

def get_actor_handle(self, ActorID actor_id):
cdef:
CActorID c_actor_id = actor_id.native()
return self.make_actor_handle(
CCoreWorkerProcess.GetCoreWorker().GetActorHandle(c_actor_id),
weak_ref=True)
CCoreWorkerProcess.GetCoreWorker().GetActorHandle(c_actor_id))

def list_named_actors(self, c_bool all_namespaces):
"""Returns (namespace, name) for named actors in the system.
Expand Down
28 changes: 5 additions & 23 deletions python/ray/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1259,13 +1259,6 @@ class ActorHandle:
_ray_original_handle: True if this is the original actor handle for a
given actor. If this is true, then the actor will be destroyed when
this handle goes out of scope.
_ray_weak_ref: True means that this handle does not count towards the
distributed ref count for the actor, i.e. the actor may be GCed
while this handle is still in scope. This is set to True if the
handle was created by getting an actor by name or by getting the
self handle. It is set to False if this is the original handle or
if it was created by passing the original handle through task args
and returns.
_ray_is_cross_language: Whether this actor is cross language.
_ray_actor_creation_function_descriptor: The function descriptor
of the actor creation task.
Expand All @@ -1289,13 +1282,11 @@ def __init__(
actor_creation_function_descriptor,
cluster_and_job,
original_handle=False,
weak_ref: bool = False,
):
self._ray_actor_language = language
self._ray_actor_id = actor_id
self._ray_max_task_retries = max_task_retries
self._ray_original_handle = original_handle
self._ray_weak_ref = weak_ref
self._ray_enable_task_events = enable_task_events

self._ray_method_is_generator = method_is_generator
Expand Down Expand Up @@ -1354,11 +1345,6 @@ def __init__(
setattr(self, method_name, method)

def __del__(self):
# Weak references don't count towards the distributed ref count, so no
# need to decrement the ref count.
if self._ray_weak_ref:
return

try:
# Mark that this actor handle has gone out of scope. Once all actor
# handles are out of scope, the actor will exit.
Expand Down Expand Up @@ -1579,19 +1565,17 @@ def _serialization_helper(self):
None,
)

return (*state, self._ray_weak_ref)
return state

@classmethod
def _deserialization_helper(cls, state, weak_ref: bool, outer_object_ref=None):
def _deserialization_helper(cls, state, outer_object_ref=None):
"""This is defined in order to make pickling work.
Args:
state: The serialized state of the actor handle.
outer_object_ref: The ObjectRef that the serialized actor handle
was contained in, if any. This is used for counting references
to the actor handle.
weak_ref: Whether this was serialized from an actor handle with a
weak ref to the actor.
"""
worker = ray._private.worker.global_worker
Expand All @@ -1600,9 +1584,7 @@ def _deserialization_helper(cls, state, weak_ref: bool, outer_object_ref=None):
if hasattr(worker, "core_worker"):
# Non-local mode
return worker.core_worker.deserialize_and_register_actor_handle(
state,
outer_object_ref,
weak_ref,
state, outer_object_ref
)
else:
# Local mode
Expand All @@ -1629,10 +1611,10 @@ def _deserialization_helper(cls, state, weak_ref: bool, outer_object_ref=None):

def __reduce__(self):
"""This code path is used by pickling but not by Ray forking."""
(serialized, _, weak_ref) = self._serialization_helper()
(serialized, _) = self._serialization_helper()
# There is no outer object ref when the actor handle is
# deserialized out-of-band using pickle.
return ActorHandle._deserialization_helper, (serialized, weak_ref, None)
return ActorHandle._deserialization_helper, (serialized, None)


def _modify_class(cls):
Expand Down
3 changes: 1 addition & 2 deletions python/ray/includes/libcoreworker.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
const ResourceMappingType &GetResourceIDs() const
void RemoveActorHandleReference(const CActorID &actor_id)
CActorID DeserializeAndRegisterActorHandle(const c_string &bytes, const
CObjectID &outer_object_id,
c_bool add_local_ref)
CObjectID &outer_object_id)
CRayStatus SerializeActorHandle(const CActorID &actor_id, c_string
*bytes,
CObjectID *c_actor_handle_id)
Expand Down
Loading

0 comments on commit aaa6364

Please sign in to comment.