From 363197829f0269da403e52573a841160887eed2f Mon Sep 17 00:00:00 2001
From: Jiajun Yao
Date: Sat, 20 Jul 2024 09:29:14 -0700
Subject: [PATCH 1/5] up

Signed-off-by: Jiajun Yao
---
 release/serve_tests/workloads/microbenchmarks.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/release/serve_tests/workloads/microbenchmarks.py b/release/serve_tests/workloads/microbenchmarks.py
index b6a42f562dc7b..f4e93cffc32dc 100644
--- a/release/serve_tests/workloads/microbenchmarks.py
+++ b/release/serve_tests/workloads/microbenchmarks.py
@@ -117,7 +117,7 @@ async def _main(output_path: Optional[str]):
     payload_1mb = generate_payload(1000000)
     payload_10mb = generate_payload(10000000)
 
-    # HTP
+    # HTTP
     serve.run(Noop.bind())
    # Microbenchmark: HTTP noop latencies
     latencies = await run_latency_benchmark(

From aaa63641bdede6cc691c2c12614bc4b81cda906b Mon Sep 17 00:00:00 2001
From: Jiajun Yao
Date: Sat, 20 Jul 2024 21:41:03 -0700
Subject: [PATCH 2/5] Revert "[core] Weakref semantics for actor handles
 retrieved from self or by name (#45699)"

This reverts commit d729815c4b88232dcb20860ff5ee1e7f871111f4.
---
 cpp/src/ray/runtime/abstract_ray_runtime.cc   |   4 +-
 python/ray/_private/serialization.py          |  29 ++--
 python/ray/_raylet.pxd                        |   3 +-
 python/ray/_raylet.pyx                        |  25 ++--
 python/ray/actor.py                           |  28 +---
 python/ray/includes/libcoreworker.pxd         |   3 +-
 python/ray/tests/test_actor.py                | 131 +-----------------
 python/ray/tests/test_memstat.py              |   7 +-
 python/ray/util/client/common.py              |  14 +-
 python/ray/util/client/server/dataservicer.py |   2 -
 python/ray/util/client/server/server.py       |   3 +-
 python/ray/util/client/worker.py              |   2 +-
 src/ray/common/id.h                           |   5 -
 src/ray/core_worker/actor_manager.cc          |  90 ++++++------
 src/ray/core_worker/actor_manager.h           |  14 +-
 src/ray/core_worker/core_worker.cc            |  13 +-
 src/ray/core_worker/core_worker.h             |   8 +-
 .../io_ray_runtime_actor_NativeActorHandle.cc |   4 +-
 .../core_worker/test/actor_manager_test.cc    |  14 +-
 19 files changed, 98 insertions(+), 301 deletions(-)

diff --git a/cpp/src/ray/runtime/abstract_ray_runtime.cc b/cpp/src/ray/runtime/abstract_ray_runtime.cc
index f9e7b83277d96..f0649fe1af87e 100644
--- a/cpp/src/ray/runtime/abstract_ray_runtime.cc
+++ b/cpp/src/ray/runtime/abstract_ray_runtime.cc
@@ -389,9 +389,7 @@ std::string AbstractRayRuntime::DeserializeAndRegisterActorHandle(
     const std::string &serialized_actor_handle) {
   auto &core_worker = CoreWorkerProcess::GetCoreWorker();
   return core_worker
-      .DeserializeAndRegisterActorHandle(serialized_actor_handle,
-                                         ObjectID::Nil(),
-                                         /*add_local_ref=*/true)
+      .DeserializeAndRegisterActorHandle(serialized_actor_handle, ObjectID::Nil())
       .Binary();
 }
 
diff --git a/python/ray/_private/serialization.py b/python/ray/_private/serialization.py
index 397005b45f412..31c439a934931 100644
--- a/python/ray/_private/serialization.py
+++ b/python/ray/_private/serialization.py
@@ -101,14 +101,12 @@ def _object_ref_deserializer(binary, call_site, owner_address, object_status):
     return obj_ref
 
 
-def _actor_handle_deserializer(serialized_obj, weak_ref):
+def _actor_handle_deserializer(serialized_obj):
     # If this actor handle was stored in another object, then tell the
    # core worker.
context = ray._private.worker.global_worker.get_serialization_context() outer_id = context.get_outer_object_ref() - return ray.actor.ActorHandle._deserialization_helper( - serialized_obj, weak_ref, outer_id - ) + return ray.actor.ActorHandle._deserialization_helper(serialized_obj, outer_id) class SerializationContext: @@ -124,11 +122,10 @@ def __init__(self, worker): def actor_handle_reducer(obj): ray._private.worker.global_worker.check_connected() - serialized, actor_handle_id, weak_ref = obj._serialization_helper() + serialized, actor_handle_id = obj._serialization_helper() # Update ref counting for the actor handle - if not weak_ref: - self.add_contained_object_ref(actor_handle_id) - return _actor_handle_deserializer, (serialized, weak_ref) + self.add_contained_object_ref(actor_handle_id) + return _actor_handle_deserializer, (serialized,) self._register_cloudpickle_reducer(ray.actor.ActorHandle, actor_handle_reducer) @@ -285,9 +282,7 @@ def _deserialize_object(self, data, metadata, object_ref): return data.to_pybytes() elif metadata_fields[0] == ray_constants.OBJECT_METADATA_TYPE_ACTOR_HANDLE: obj = self._deserialize_msgpack_data(data, metadata_fields) - # The last character is a 1 if weak_ref=True and 0 else. - serialized, weak_ref = obj[:-1], obj[-1:] == b"1" - return _actor_handle_deserializer(serialized, weak_ref) + return _actor_handle_deserializer(obj) # Otherwise, return an exception object based on # the error type. try: @@ -469,17 +464,11 @@ def _serialize_to_msgpack(self, value): elif isinstance(value, ray.actor.ActorHandle): # TODO(fyresone): ActorHandle should be serialized via the # custom type feature of cross-language. - serialized, actor_handle_id, weak_ref = value._serialization_helper() - if not weak_ref: - contained_object_refs.append(actor_handle_id) + serialized, actor_handle_id = value._serialization_helper() + contained_object_refs.append(actor_handle_id) # Update ref counting for the actor handle metadata = ray_constants.OBJECT_METADATA_TYPE_ACTOR_HANDLE - # Append a 1 to mean weak ref or 0 for strong ref. - # We do this here instead of in the main serialization helper - # because msgpack expects a bytes object. We cannot serialize - # `weak_ref` in the C++ code because the weak_ref property is only - # available in the Python ActorHandle instance. 
- value = serialized + (b"1" if weak_ref else b"0") + value = serialized else: metadata = ray_constants.OBJECT_METADATA_TYPE_CROSS_LANGUAGE diff --git a/python/ray/_raylet.pxd b/python/ray/_raylet.pxd index acb025e43cc02..155030ef4fe98 100644 --- a/python/ray/_raylet.pxd +++ b/python/ray/_raylet.pxd @@ -151,8 +151,7 @@ cdef class CoreWorker: const CAddress &caller_address, c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]] *returns, CObjectID ref_generator_id=*) - cdef make_actor_handle(self, ActorHandleSharedPtr c_actor_handle, - c_bool weak_ref) + cdef make_actor_handle(self, ActorHandleSharedPtr c_actor_handle) cdef c_function_descriptors_to_python( self, const c_vector[CFunctionDescriptor] &c_function_descriptors) cdef initialize_eventloops_for_actor_concurrency_group( diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 96fb723c39a57..466cc907fa1ad 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -4375,8 +4375,7 @@ cdef class CoreWorker: CCoreWorkerProcess.GetCoreWorker().RemoveActorHandleReference( c_actor_id) - cdef make_actor_handle(self, ActorHandleSharedPtr c_actor_handle, - c_bool weak_ref): + cdef make_actor_handle(self, ActorHandleSharedPtr c_actor_handle): worker = ray._private.worker.global_worker worker.check_connected() manager = worker.function_actor_manager @@ -4414,8 +4413,7 @@ cdef class CoreWorker: method_meta.enable_task_events, actor_method_cpu, actor_creation_function_descriptor, - worker.current_cluster_and_job, - weak_ref=weak_ref) + worker.current_cluster_and_job) else: return ray.actor.ActorHandle(language, actor_id, 0, # max_task_retries, @@ -4430,14 +4428,11 @@ cdef class CoreWorker: {}, # enable_task_events 0, # actor method cpu actor_creation_function_descriptor, - worker.current_cluster_and_job, - weak_ref=weak_ref, - ) + worker.current_cluster_and_job) def deserialize_and_register_actor_handle(self, const c_string &bytes, ObjectRef - outer_object_ref, - c_bool weak_ref): + outer_object_ref): cdef: CObjectID c_outer_object_id = (outer_object_ref.native() if outer_object_ref else @@ -4445,11 +4440,9 @@ cdef class CoreWorker: c_actor_id = (CCoreWorkerProcess .GetCoreWorker() .DeserializeAndRegisterActorHandle( - bytes, c_outer_object_id, - add_local_ref=not weak_ref)) + bytes, c_outer_object_id)) return self.make_actor_handle( - CCoreWorkerProcess.GetCoreWorker().GetActorHandle(c_actor_id), - weak_ref) + CCoreWorkerProcess.GetCoreWorker().GetActorHandle(c_actor_id)) def get_named_actor_handle(self, const c_string &name, const c_string &ray_namespace): @@ -4464,15 +4457,13 @@ cdef class CoreWorker: name, ray_namespace)) check_status(named_actor_handle_pair.second) - return self.make_actor_handle(named_actor_handle_pair.first, - weak_ref=True) + return self.make_actor_handle(named_actor_handle_pair.first) def get_actor_handle(self, ActorID actor_id): cdef: CActorID c_actor_id = actor_id.native() return self.make_actor_handle( - CCoreWorkerProcess.GetCoreWorker().GetActorHandle(c_actor_id), - weak_ref=True) + CCoreWorkerProcess.GetCoreWorker().GetActorHandle(c_actor_id)) def list_named_actors(self, c_bool all_namespaces): """Returns (namespace, name) for named actors in the system. diff --git a/python/ray/actor.py b/python/ray/actor.py index a9dd729288346..cba953650bac0 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -1259,13 +1259,6 @@ class ActorHandle: _ray_original_handle: True if this is the original actor handle for a given actor. 
If this is true, then the actor will be destroyed when this handle goes out of scope. - _ray_weak_ref: True means that this handle does not count towards the - distributed ref count for the actor, i.e. the actor may be GCed - while this handle is still in scope. This is set to True if the - handle was created by getting an actor by name or by getting the - self handle. It is set to False if this is the original handle or - if it was created by passing the original handle through task args - and returns. _ray_is_cross_language: Whether this actor is cross language. _ray_actor_creation_function_descriptor: The function descriptor of the actor creation task. @@ -1289,13 +1282,11 @@ def __init__( actor_creation_function_descriptor, cluster_and_job, original_handle=False, - weak_ref: bool = False, ): self._ray_actor_language = language self._ray_actor_id = actor_id self._ray_max_task_retries = max_task_retries self._ray_original_handle = original_handle - self._ray_weak_ref = weak_ref self._ray_enable_task_events = enable_task_events self._ray_method_is_generator = method_is_generator @@ -1354,11 +1345,6 @@ def __init__( setattr(self, method_name, method) def __del__(self): - # Weak references don't count towards the distributed ref count, so no - # need to decrement the ref count. - if self._ray_weak_ref: - return - try: # Mark that this actor handle has gone out of scope. Once all actor # handles are out of scope, the actor will exit. @@ -1579,10 +1565,10 @@ def _serialization_helper(self): None, ) - return (*state, self._ray_weak_ref) + return state @classmethod - def _deserialization_helper(cls, state, weak_ref: bool, outer_object_ref=None): + def _deserialization_helper(cls, state, outer_object_ref=None): """This is defined in order to make pickling work. Args: @@ -1590,8 +1576,6 @@ def _deserialization_helper(cls, state, weak_ref: bool, outer_object_ref=None): outer_object_ref: The ObjectRef that the serialized actor handle was contained in, if any. This is used for counting references to the actor handle. - weak_ref: Whether this was serialized from an actor handle with a - weak ref to the actor. """ worker = ray._private.worker.global_worker @@ -1600,9 +1584,7 @@ def _deserialization_helper(cls, state, weak_ref: bool, outer_object_ref=None): if hasattr(worker, "core_worker"): # Non-local mode return worker.core_worker.deserialize_and_register_actor_handle( - state, - outer_object_ref, - weak_ref, + state, outer_object_ref ) else: # Local mode @@ -1629,10 +1611,10 @@ def _deserialization_helper(cls, state, weak_ref: bool, outer_object_ref=None): def __reduce__(self): """This code path is used by pickling but not by Ray forking.""" - (serialized, _, weak_ref) = self._serialization_helper() + (serialized, _) = self._serialization_helper() # There is no outer object ref when the actor handle is # deserialized out-of-band using pickle. 
- return ActorHandle._deserialization_helper, (serialized, weak_ref, None) + return ActorHandle._deserialization_helper, (serialized, None) def _modify_class(cls): diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 1533c3e8dd3bc..fe84242ee12ba 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -200,8 +200,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: const ResourceMappingType &GetResourceIDs() const void RemoveActorHandleReference(const CActorID &actor_id) CActorID DeserializeAndRegisterActorHandle(const c_string &bytes, const - CObjectID &outer_object_id, - c_bool add_local_ref) + CObjectID &outer_object_id) CRayStatus SerializeActorHandle(const CActorID &actor_id, c_string *bytes, CObjectID *c_actor_handle_id) diff --git a/python/ray/tests/test_actor.py b/python/ray/tests/test_actor.py index 706ad29c30e25..ca05fff9e172c 100644 --- a/python/ray/tests/test_actor.py +++ b/python/ray/tests/test_actor.py @@ -935,7 +935,6 @@ def inc_and_get(self): a = Counter.options(name="hi").remote() first_get = ray.get_actor("hi") assert ray.get(first_get.inc_and_get.remote()) == 1 - second_get = ray.get_actor("hi") assert ray.get(second_get.inc_and_get.remote()) == 2 ray.kill(a, no_restart=True) @@ -1222,10 +1221,9 @@ def hello(self): actor = Actor.options(name="ABC").remote() assert ray.get(actor.hello.remote()) == "hello" - # Getting the actor by name acts as a weakref. for _ in range(10): - named_actor = ray.get_actor("ABC") - assert ray.get(named_actor.hello.remote()) == "hello" + actor = ray.get_actor("ABC") + assert ray.get(actor.hello.remote()) == "hello" del actor @@ -1390,131 +1388,6 @@ def get_actor(actor): assert origin == remote -def test_actor_handle_weak_ref_counting(ray_start_regular_shared): - """ - Actors can get handles to themselves or to named actors but these count - only as weak refs. Check that this pattern does not crash the normal ref - counting protocol, which tracks handles passed through task args and return - values. - """ - - @ray.remote - class WeakReferenceHolder: - def pass_weak_ref(self, handle): - self.handle = handle - - @ray.remote - class Actor: - def read_self_handle(self, self_handle): - # This actor has a strong reference to itself through the arg - # self_handle. - - # Get and delete a weak reference to ourselves. This should not - # crash the distributed ref counting protocol. - # TODO(swang): Commenting these lines out currently causes the - # actor handle to leak. - weak_self_handle = ray.get_runtime_context().current_actor - del weak_self_handle - - def pass_self_handle(self, self_handle, weak_ref_holder): - # This actor has a strong reference to itself through the arg - # self_handle. - - # Pass a weak reference to ourselves to another actor. This should - # not count towards the distributed ref counting protocol. - weak_self_handle = ray.get_runtime_context().current_actor - ray.get(weak_ref_holder.pass_weak_ref.remote(weak_self_handle)) - - def read_handle_by_name(self, handle, name): - # This actor has a strong reference to another actor through the - # arg handle. - - # Get and delete a weak reference to the same actor as the one - # passed through handle. This should not crash the distributed ref - # counting protocol. - weak_handle = ray.get_actor(name=name) - del weak_handle - - def pass_named_handle(self, handle, name, weak_ref_holder): - # This actor has a strong reference to another actor through the - # arg handle. 
- - # Pass a weak reference to the actor to another actor. This should - # not count towards the distributed ref counting protocol. - weak_handle = ray.get_actor(name=name) - ray.get(weak_ref_holder.pass_weak_ref.remote(weak_handle)) - - def getpid(self): - return os.getpid() - - # Check ref counting when getting actors via self handle. - a = Actor.remote() - pid = ray.get(a.getpid.remote()) - for _ in range(3): - ray.get(a.read_self_handle.remote(a)) - # Check that there are no leaks after all handles have gone out of scope. - a = None - wait_for_pid_to_exit(pid) - - # Check that passing a weak ref to the self actor to other actors does not - # count towards the ref count. - weak_ref_holder = WeakReferenceHolder.remote() - a = Actor.remote() - pid = ray.get(a.getpid.remote()) - for _ in range(3): - ray.get(a.pass_self_handle.remote(a, weak_ref_holder)) - # Check that there are no leaks after all strong refs have gone out of - # scope. - a = None - wait_for_pid_to_exit(pid) - - # Check ref counting when getting actors by name. - a = Actor.remote() - b = Actor.options(name="actor").remote() - pid = ray.get(b.getpid.remote()) - for _ in range(3): - ray.get(a.read_handle_by_name.remote(b, "actor")) - # Check that there are no leaks after all handles have gone out of scope. - b = None - wait_for_pid_to_exit(pid) - - # Check that passing a weak ref to an actor handle that was gotten by name - # to other actors does not count towards the ref count. - a = Actor.remote() - b = Actor.options(name="actor").remote() - pid = ray.get(b.getpid.remote()) - for _ in range(3): - ray.get(a.pass_named_handle.remote(b, "actor", weak_ref_holder)) - # Check that there are no leaks after all strong refs have gone out of - # scope. - b = None - wait_for_pid_to_exit(pid) - - -def test_self_handle_leak(ray_start_regular_shared): - """ - Actors can get handles to themselves. Check that holding such a reference - does not cause the actor to leak. - """ - - @ray.remote - class Actor: - def read_self_handle(self, self_handle): - pass - - def getpid(self): - return os.getpid() - - # Check ref counting when getting actors via self handle. - a = Actor.remote() - pid = ray.get(a.getpid.remote()) - for _ in range(3): - ray.get(a.read_self_handle.remote(a)) - # Check that there are no leaks after all handles have gone out of scope. - a = None - wait_for_pid_to_exit(pid) - - if __name__ == "__main__": if os.environ.get("PARALLEL_CI"): sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) diff --git a/python/ray/tests/test_memstat.py b/python/ray/tests/test_memstat.py index 923d608df4bf1..44c09987aa43a 100644 --- a/python/ray/tests/test_memstat.py +++ b/python/ray/tests/test_memstat.py @@ -161,15 +161,16 @@ def make_actor(): x_id = actor.f.remote(np.zeros(100000)) info = ray.get(x_id) print(info) - assert num_objects(info) == 4, info + # Note, the actor will always hold a handle to the actor itself. + assert num_objects(info) == 5, info # Actor handle, task argument id, task return id. 
assert count(info, ACTOR_TASK_CALL_OBJ) == 3, info assert count(info, DRIVER_PID) == 3, info - assert count(info, WORKER_PID) == 1, info + assert count(info, WORKER_PID) == 2, info assert count(info, LOCAL_REF) == 1, info assert count(info, PINNED_IN_MEMORY) == 1, info assert count(info, USED_BY_PENDING_TASK) == 1, info - assert count(info, ACTOR_HANDLE) == 1, info + assert count(info, ACTOR_HANDLE) == 2, info assert count(info, DESER_ACTOR_TASK_ARG) == 1, info del x_id diff --git a/python/ray/util/client/common.py b/python/ray/util/client/common.py index caf5572c69d0b..8262ba09b71d2 100644 --- a/python/ray/util/client/common.py +++ b/python/ray/util/client/common.py @@ -196,12 +196,7 @@ def _wait_for_id(self, timeout=None): class ClientActorRef(raylet.ActorID): - def __init__( - self, - id: Union[bytes, Future], - weak_ref: Optional[bool] = False, - ): - self._weak_ref = weak_ref + def __init__(self, id: Union[bytes, Future]): self._mutex = threading.Lock() self._worker = ray.get_context().client_worker if isinstance(id, bytes): @@ -213,9 +208,6 @@ def __init__( raise TypeError("Unexpected type for id {}".format(id)) def __del__(self): - if self._weak_ref: - return - if self._worker is not None and self._worker.is_connected(): try: if not self.is_nil(): @@ -440,9 +432,7 @@ class ClientActorHandle(ClientStub): """ def __init__( - self, - actor_ref: ClientActorRef, - actor_class: Optional[ClientActorClass] = None, + self, actor_ref: ClientActorRef, actor_class: Optional[ClientActorClass] = None ): self.actor_ref = actor_ref self._dir: Optional[List[str]] = None diff --git a/python/ray/util/client/server/dataservicer.py b/python/ray/util/client/server/dataservicer.py index 9ce816856e4df..f874c34e291d8 100644 --- a/python/ray/util/client/server/dataservicer.py +++ b/python/ray/util/client/server/dataservicer.py @@ -274,8 +274,6 @@ def Datapath(self, request_iterator, context): req.task, arglist, kwargs, context ) resp = ray_client_pb2.DataResponse(task_ticket=resp_ticket) - del arglist - del kwargs elif req_type == "terminate": with self.clients_lock: response = self.basic_service.Terminate(req.terminate, context) diff --git a/python/ray/util/client/server/server.py b/python/ray/util/client/server/server.py index a0d9b89f5caec..d7a0db9a956e5 100644 --- a/python/ray/util/client/server/server.py +++ b/python/ray/util/client/server/server.py @@ -687,8 +687,7 @@ def _schedule_named_actor( # Convert empty string back to None. actor = ray.get_actor(task.name, task.namespace or None) bin_actor_id = actor._actor_id.binary() - if bin_actor_id not in self.actor_refs: - self.actor_refs[bin_actor_id] = actor + self.actor_refs[bin_actor_id] = actor self.actor_owners[task.client_id].add(bin_actor_id) self.named_actors.add(bin_actor_id) return ray_client_pb2.ClientTaskTicket(return_ids=[actor._actor_id.binary()]) diff --git a/python/ray/util/client/worker.py b/python/ray/util/client/worker.py index 57acede6bd4d5..dc9e42c76cc72 100644 --- a/python/ray/util/client/worker.py +++ b/python/ray/util/client/worker.py @@ -672,7 +672,7 @@ def get_actor( task.data = dumps_from_client(([], {}), self._client_id) futures = self._call_schedule_for_task(task, 1) assert len(futures) == 1 - handle = ClientActorHandle(ClientActorRef(futures[0], weak_ref=True)) + handle = ClientActorHandle(ClientActorRef(futures[0])) # `actor_ref.is_nil()` waits until the underlying ID is resolved. # This is needed because `get_actor` is often used to check the # existence of an actor. 
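[Not part of the patch: a minimal sketch of the user-visible behavior this
revert changes. Under #45699, a handle returned by ray.get_actor() is a weak
reference and does not keep the actor alive; after this revert it counts
toward the distributed ref count again. The Counter actor is hypothetical.]

    import ray

    @ray.remote
    class Counter:
        def ping(self):
            return "pong"

    a = Counter.options(name="c").remote()  # original handle: strong ref
    h = ray.get_actor("c")                  # weak ref under #45699
    del a
    # Under weakref semantics the actor can now be torn down even though h
    # is still in scope; with this revert applied, h keeps the actor alive.
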
diff --git a/src/ray/common/id.h b/src/ray/common/id.h index 289e7560836eb..1f39ace7771f4 100644 --- a/src/ray/common/id.h +++ b/src/ray/common/id.h @@ -318,12 +318,7 @@ class ObjectID : public BaseID { /// \return The computed object ID. static ObjectID ForActorHandle(const ActorID &actor_id); - /// Whether this ObjectID represents an actor handle. This is the ObjectID - /// returned by the actor's creation task. static bool IsActorID(const ObjectID &object_id); - /// Return the ID of the actor that produces this object. For the actor - /// creation task and for tasks executed by the actor, this will return a - /// non-nil ActorID. static ActorID ToActorID(const ObjectID &object_id); MSGPACK_DEFINE(id_); diff --git a/src/ray/core_worker/actor_manager.cc b/src/ray/core_worker/actor_manager.cc index efca63feb86f5..0d1dbf8e1c097 100644 --- a/src/ray/core_worker/actor_manager.cc +++ b/src/ray/core_worker/actor_manager.cc @@ -23,7 +23,6 @@ ActorID ActorManager::RegisterActorHandle(std::unique_ptr actor_han const ObjectID &outer_object_id, const std::string &call_site, const rpc::Address &caller_address, - bool add_local_ref, bool is_self) { const ActorID actor_id = actor_handle->GetActorID(); const rpc::Address owner_address = actor_handle->GetOwnerAddress(); @@ -36,7 +35,6 @@ ActorID ActorManager::RegisterActorHandle(std::unique_ptr actor_han caller_address, actor_id, actor_creation_return_id, - add_local_ref, is_self)); ObjectID actor_handle_id = ObjectID::ForActorHandle(actor_id); reference_counter_->AddBorrowedObject(actor_handle_id, outer_object_id, owner_address); @@ -58,39 +56,49 @@ std::pair, Status> ActorManager::GetNamedActo const std::string &call_site, const rpc::Address &caller_address) { ActorID actor_id = GetCachedNamedActorID(GenerateCachedActorName(ray_namespace, name)); - if (!actor_id.IsNil()) { - return std::make_pair(GetActorHandle(actor_id), Status::OK()); - } + if (actor_id.IsNil()) { + // This call needs to be blocking because we can't return until the actor + // handle is created, which requires the response from the RPC. This is + // implemented using a promise that's captured in the RPC callback. + // There should be no risk of deadlock because we don't hold any + // locks during the call and the RPCs run on a separate thread. + rpc::ActorTableData actor_table_data; + rpc::TaskSpec task_spec; + const auto status = gcs_client_->Actors().SyncGetByName( + name, ray_namespace, actor_table_data, task_spec); + if (status.ok()) { + auto actor_handle = std::make_unique(actor_table_data, task_spec); + actor_id = actor_handle->GetActorID(); + AddNewActorHandle(std::move(actor_handle), + call_site, + caller_address, + /*is_detached*/ true); + } else { + // Use a NIL actor ID to signal that the actor wasn't found. + RAY_LOG(DEBUG) << "Failed to look up actor with name: " << name; + actor_id = ActorID::Nil(); + } - // This call needs to be blocking because we can't return until the actor - // handle is created, which requires the response from the RPC. This is - // implemented using a promise that's captured in the RPC callback. - // There should be no risk of deadlock because we don't hold any - // locks during the call and the RPCs run on a separate thread. 
- rpc::ActorTableData actor_table_data; - rpc::TaskSpec task_spec; - const auto status = gcs_client_->Actors().SyncGetByName( - name, ray_namespace, actor_table_data, task_spec); - if (status.ok()) { - auto actor_handle = std::make_unique(actor_table_data, task_spec); - actor_id = actor_handle->GetActorID(); - AddNewActorHandle(std::move(actor_handle), - call_site, - caller_address, - /*owned*/ false); + if (status.IsTimedOut()) { + std::ostringstream stream; + stream << "There was timeout in getting the actor handle, " + "probably because the GCS server is dead or under high load ."; + std::string error_str = stream.str(); + RAY_LOG(ERROR) << error_str; + return std::make_pair(nullptr, Status::TimedOut(error_str)); + } } else { - // Use a NIL actor ID to signal that the actor wasn't found. - RAY_LOG(DEBUG) << "Failed to look up actor with name: " << name; - actor_id = ActorID::Nil(); - } - - if (status.IsTimedOut()) { - std::ostringstream stream; - stream << "There was timeout in getting the actor handle, " - "probably because the GCS server is dead or under high load ."; - std::string error_str = stream.str(); - RAY_LOG(ERROR) << error_str; - return std::make_pair(nullptr, Status::TimedOut(error_str)); + // When the named actor is already cached, the reference of actor_creation_return_id + // must be increased, so we call AddActorHandle here to ensure that. + std::string serialized_actor_handle; + auto actor_handle = GetActorHandle(actor_id); + actor_handle->Serialize(&serialized_actor_handle); + + AddActorHandle(std::make_unique(serialized_actor_handle), + call_site, + caller_address, + actor_id, + ObjectID::ForActorHandle(actor_id)); } if (actor_id.IsNil()) { @@ -116,26 +124,27 @@ bool ActorManager::CheckActorHandleExists(const ActorID &actor_id) { bool ActorManager::AddNewActorHandle(std::unique_ptr actor_handle, const std::string &call_site, const rpc::Address &caller_address, - bool owned) { + bool is_detached) { const auto &actor_id = actor_handle->GetActorID(); const auto actor_creation_return_id = ObjectID::ForActorHandle(actor_id); // Detached actor doesn't need ref counting. - if (owned) { + if (!is_detached) { + // We don't need to add an initial local ref here because it will get added + // in AddActorHandle. 
reference_counter_->AddOwnedObject(actor_creation_return_id, /*inner_ids=*/{}, caller_address, call_site, /*object_size*/ -1, /*is_reconstructable=*/true, - /*add_local_ref=*/true); + /*add_local_ref=*/false); } return AddActorHandle(std::move(actor_handle), call_site, caller_address, actor_id, - actor_creation_return_id, - /*add_local_ref=*/false); + actor_creation_return_id); } bool ActorManager::AddActorHandle(std::unique_ptr actor_handle, @@ -143,11 +152,8 @@ bool ActorManager::AddActorHandle(std::unique_ptr actor_handle, const rpc::Address &caller_address, const ActorID &actor_id, const ObjectID &actor_creation_return_id, - bool add_local_ref, bool is_self) { - if (add_local_ref) { - reference_counter_->AddLocalReference(actor_creation_return_id, call_site); - } + reference_counter_->AddLocalReference(actor_creation_return_id, call_site); direct_actor_submitter_->AddActorQueueIfNotExists( actor_id, actor_handle->MaxPendingCalls(), diff --git a/src/ray/core_worker/actor_manager.h b/src/ray/core_worker/actor_manager.h index 3d3f7dd827629..373bf11fdd39f 100644 --- a/src/ray/core_worker/actor_manager.h +++ b/src/ray/core_worker/actor_manager.h @@ -53,9 +53,6 @@ class ActorManager { /// \param[in] outer_object_id The object ID that contained the serialized /// actor handle, if any. /// \param[in] call_site The caller's site. - /// \param[in] Whether to add a local ref for this actor handle. Ref count - /// should be incremented for strong refs, i.e. ones where the actor handle - /// was passed from the original handle via task arguments or returns. /// \param[in] is_self Whether this handle is current actor's handle. If true, actor /// manager won't subscribe actor info from GCS. /// \return The ActorID of the deserialized handle. @@ -63,7 +60,6 @@ class ActorManager { const ObjectID &outer_object_id, const std::string &call_site, const rpc::Address &caller_address, - bool add_local_ref, bool is_self = false); /// Get a handle to an actor. @@ -105,15 +101,13 @@ class ActorManager { /// /// \param actor_handle The handle to the actor. /// \param[in] call_site The caller's site. - /// \param[in] owned Whether or not we own the this actor, i.e. the actor is - /// not detached and we were the process that submitted the actor creation - /// task. - /// \return True if the handle was added and False if we already had a handle to + /// \param[in] is_detached Whether or not the actor of a handle is detached (named) + /// actor. \return True if the handle was added and False if we already had a handle to /// the same actor. bool AddNewActorHandle(std::unique_ptr actor_handle, const std::string &call_site, const rpc::Address &caller_address, - bool owned); + bool is_detached); /// Wait for actor out of scope. /// @@ -151,7 +145,6 @@ class ActorManager { /// \param[in] call_site The caller's site. /// \param[in] actor_id The id of an actor /// \param[in] actor_creation_return_id object id of this actor creation - /// \param[in] Whether to add a local reference for this actor. /// \param[in] is_self Whether this handle is current actor's handle. If true, actor /// to the same actor. /// manager won't subscribe actor info from GCS. @@ -162,7 +155,6 @@ class ActorManager { const rpc::Address &caller_address, const ActorID &actor_id, const ObjectID &actor_creation_return_id, - bool add_local_ref, bool is_self = false); /// Check if named actor is cached locally. 
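[Not part of the patch: the is_detached flag restored above matters because
detached actors are excluded from ref counting entirely. A short sketch of
that behavior from the Python API, using a hypothetical Worker actor:]

    import ray

    @ray.remote
    class Worker:
        pass

    d = Worker.options(name="d", lifetime="detached").remote()
    del d  # detached actors are not ref counted, so the actor keeps running
    ray.kill(ray.get_actor("d"))  # they must be terminated explicitly
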
diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index d3a4b476fb298..2e72b725d16e9 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -2351,7 +2351,7 @@ Status CoreWorker::CreateActor(const RayFunction &function, // actor handle must be in scope by the time the GCS sends the // WaitForActorOutOfScopeRequest. RAY_CHECK(actor_manager_->AddNewActorHandle( - std::move(actor_handle), CurrentCallSite(), rpc_address_, /*owned=*/!is_detached)) + std::move(actor_handle), CurrentCallSite(), rpc_address_, is_detached)) << "Actor " << actor_id << " already exists"; *return_actor_id = actor_id; TaskSpecification task_spec = builder.Build(); @@ -2726,14 +2726,10 @@ void CoreWorker::RemoveActorHandleReference(const ActorID &actor_id) { } ActorID CoreWorker::DeserializeAndRegisterActorHandle(const std::string &serialized, - const ObjectID &outer_object_id, - bool add_local_ref) { + const ObjectID &outer_object_id) { std::unique_ptr actor_handle(new ActorHandle(serialized)); - return actor_manager_->RegisterActorHandle(std::move(actor_handle), - outer_object_id, - CurrentCallSite(), - rpc_address_, - add_local_ref); + return actor_manager_->RegisterActorHandle( + std::move(actor_handle), outer_object_id, CurrentCallSite(), rpc_address_); } Status CoreWorker::SerializeActorHandle(const ActorID &actor_id, @@ -3009,7 +3005,6 @@ Status CoreWorker::ExecuteTask( ObjectID::Nil(), CurrentCallSite(), rpc_address_, - /*add_local_ref=*/false, /*is_self=*/true); } RAY_LOG(INFO) << "Creating actor: " << task_spec.ActorCreationId(); diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index b4bed09ceb7e2..bb80d0234211d 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -1070,15 +1070,9 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// \param[in] serialized The serialized actor handle. /// \param[in] outer_object_id The object ID that contained the serialized /// actor handle, if any. - /// \param[in] add_local_ref Whether to add a local reference for this actor - /// handle. Handles that were created out-of-band (i.e. via getting actor by - /// name or getting a handle to self) should not add a local reference - /// because the distributed reference counting protocol does not ensure that - /// the owner will learn of this reference. /// \return The ActorID of the deserialized handle. ActorID DeserializeAndRegisterActorHandle(const std::string &serialized, - const ObjectID &outer_object_id, - bool add_local_ref); + const ObjectID &outer_object_id); /// Serialize an actor handle. 
/// diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_actor_NativeActorHandle.cc b/src/ray/core_worker/lib/java/io_ray_runtime_actor_NativeActorHandle.cc index 957c77321ba35..97a0ca4074cfc 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_actor_NativeActorHandle.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_actor_NativeActorHandle.cc @@ -68,7 +68,7 @@ Java_io_ray_runtime_actor_NativeActorHandle_nativeDeserialize(JNIEnv *env, RAY_CHECK(buffer->Size() > 0); auto binary = std::string(reinterpret_cast(buffer->Data()), buffer->Size()); auto actor_id = CoreWorkerProcess::GetCoreWorker().DeserializeAndRegisterActorHandle( - binary, /*outer_object_id=*/ObjectID::Nil(), /*add_local_ref=*/true); + binary, /*outer_object_id=*/ObjectID::Nil()); return IdToJavaByteArray(env, actor_id); } @@ -79,7 +79,7 @@ Java_io_ray_runtime_actor_NativeActorHandle_nativeRemoveActorHandleReference( // We can't control the timing of Java GC, so it's normal that this method is called but // core worker is shutting down (or already shut down). If we can't get a core worker // instance here, skip calling the `RemoveLocalReference` method. - auto core_worker = CoreWorkerProcess::TryGetWorker(); + auto core_worker = CoreWorkerProcess::TryGetWorker(); if (core_worker != nullptr) { const auto actor_id = JavaByteArrayToId(env, actorId); core_worker->RemoveActorHandleReference(actor_id); diff --git a/src/ray/core_worker/test/actor_manager_test.cc b/src/ray/core_worker/test/actor_manager_test.cc index 05203ae04bd37..13af0a0c6412a 100644 --- a/src/ray/core_worker/test/actor_manager_test.cc +++ b/src/ray/core_worker/test/actor_manager_test.cc @@ -208,7 +208,7 @@ class ActorManagerTest : public ::testing::Test { actor_manager_->AddNewActorHandle(std::move(actor_handle), call_site, caller_address, - /*owned*/ true); + /*is_detached*/ false); actor_manager_->SubscribeActorState(actor_id); return actor_id; } @@ -247,7 +247,7 @@ TEST_F(ActorManagerTest, TestAddAndGetActorHandleEndToEnd) { // Add an actor handle. ASSERT_TRUE(actor_manager_->AddNewActorHandle( - std::move(actor_handle), call_site, caller_address, true)); + std::move(actor_handle), call_site, caller_address, false)); actor_manager_->SubscribeActorState(actor_id); // Make sure the subscription request is sent to GCS. @@ -269,7 +269,7 @@ TEST_F(ActorManagerTest, TestAddAndGetActorHandleEndToEnd) { false); // Make sure the same actor id adding will return false. ASSERT_FALSE(actor_manager_->AddNewActorHandle( - std::move(actor_handle2), call_site, caller_address, true)); + std::move(actor_handle2), call_site, caller_address, false)); actor_manager_->SubscribeActorState(actor_id); // Make sure we can get an actor handle correctly. @@ -326,12 +326,8 @@ TEST_F(ActorManagerTest, RegisterActorHandles) { // Sinece RegisterActor happens in a non-owner worker, we should // make sure it borrows an object. EXPECT_CALL(*reference_counter_, AddBorrowedObject(_, _, _, _)); - EXPECT_CALL(*reference_counter_, AddLocalReference(_, _)); - ActorID returned_actor_id = actor_manager_->RegisterActorHandle(std::move(actor_handle), - outer_object_id, - call_site, - caller_address, - /*add_local_ref=*/true); + ActorID returned_actor_id = actor_manager_->RegisterActorHandle( + std::move(actor_handle), outer_object_id, call_site, caller_address); ASSERT_TRUE(returned_actor_id == actor_id); // Let's try to get the handle and make sure it works. 
const std::shared_ptr actor_handle_to_get = From 433d8128dde4acdca73b96d0ad34e078d17da0f1 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Thu, 25 Jul 2024 14:16:27 -0700 Subject: [PATCH 3/5] Revert "Revert "[core] Weakref semantics for actor handles retrieved from self or by name (#45699)"" This reverts commit aaa63641bdede6cc691c2c12614bc4b81cda906b. --- cpp/src/ray/runtime/abstract_ray_runtime.cc | 4 +- python/ray/_private/serialization.py | 29 ++-- python/ray/_raylet.pxd | 3 +- python/ray/_raylet.pyx | 25 ++-- python/ray/actor.py | 28 +++- python/ray/includes/libcoreworker.pxd | 3 +- python/ray/tests/test_actor.py | 131 +++++++++++++++++- python/ray/tests/test_memstat.py | 7 +- python/ray/util/client/common.py | 14 +- python/ray/util/client/server/dataservicer.py | 2 + python/ray/util/client/server/server.py | 3 +- python/ray/util/client/worker.py | 2 +- src/ray/common/id.h | 5 + src/ray/core_worker/actor_manager.cc | 90 ++++++------ src/ray/core_worker/actor_manager.h | 14 +- src/ray/core_worker/core_worker.cc | 13 +- src/ray/core_worker/core_worker.h | 8 +- .../io_ray_runtime_actor_NativeActorHandle.cc | 4 +- .../core_worker/test/actor_manager_test.cc | 14 +- 19 files changed, 301 insertions(+), 98 deletions(-) diff --git a/cpp/src/ray/runtime/abstract_ray_runtime.cc b/cpp/src/ray/runtime/abstract_ray_runtime.cc index f0649fe1af87e..f9e7b83277d96 100644 --- a/cpp/src/ray/runtime/abstract_ray_runtime.cc +++ b/cpp/src/ray/runtime/abstract_ray_runtime.cc @@ -389,7 +389,9 @@ std::string AbstractRayRuntime::DeserializeAndRegisterActorHandle( const std::string &serialized_actor_handle) { auto &core_worker = CoreWorkerProcess::GetCoreWorker(); return core_worker - .DeserializeAndRegisterActorHandle(serialized_actor_handle, ObjectID::Nil()) + .DeserializeAndRegisterActorHandle(serialized_actor_handle, + ObjectID::Nil(), + /*add_local_ref=*/true) .Binary(); } diff --git a/python/ray/_private/serialization.py b/python/ray/_private/serialization.py index 31c439a934931..397005b45f412 100644 --- a/python/ray/_private/serialization.py +++ b/python/ray/_private/serialization.py @@ -101,12 +101,14 @@ def _object_ref_deserializer(binary, call_site, owner_address, object_status): return obj_ref -def _actor_handle_deserializer(serialized_obj): +def _actor_handle_deserializer(serialized_obj, weak_ref): # If this actor handle was stored in another object, then tell the # core worker. 
context = ray._private.worker.global_worker.get_serialization_context() outer_id = context.get_outer_object_ref() - return ray.actor.ActorHandle._deserialization_helper(serialized_obj, outer_id) + return ray.actor.ActorHandle._deserialization_helper( + serialized_obj, weak_ref, outer_id + ) class SerializationContext: @@ -122,10 +124,11 @@ def __init__(self, worker): def actor_handle_reducer(obj): ray._private.worker.global_worker.check_connected() - serialized, actor_handle_id = obj._serialization_helper() + serialized, actor_handle_id, weak_ref = obj._serialization_helper() # Update ref counting for the actor handle - self.add_contained_object_ref(actor_handle_id) - return _actor_handle_deserializer, (serialized,) + if not weak_ref: + self.add_contained_object_ref(actor_handle_id) + return _actor_handle_deserializer, (serialized, weak_ref) self._register_cloudpickle_reducer(ray.actor.ActorHandle, actor_handle_reducer) @@ -282,7 +285,9 @@ def _deserialize_object(self, data, metadata, object_ref): return data.to_pybytes() elif metadata_fields[0] == ray_constants.OBJECT_METADATA_TYPE_ACTOR_HANDLE: obj = self._deserialize_msgpack_data(data, metadata_fields) - return _actor_handle_deserializer(obj) + # The last character is a 1 if weak_ref=True and 0 else. + serialized, weak_ref = obj[:-1], obj[-1:] == b"1" + return _actor_handle_deserializer(serialized, weak_ref) # Otherwise, return an exception object based on # the error type. try: @@ -464,11 +469,17 @@ def _serialize_to_msgpack(self, value): elif isinstance(value, ray.actor.ActorHandle): # TODO(fyresone): ActorHandle should be serialized via the # custom type feature of cross-language. - serialized, actor_handle_id = value._serialization_helper() - contained_object_refs.append(actor_handle_id) + serialized, actor_handle_id, weak_ref = value._serialization_helper() + if not weak_ref: + contained_object_refs.append(actor_handle_id) # Update ref counting for the actor handle metadata = ray_constants.OBJECT_METADATA_TYPE_ACTOR_HANDLE - value = serialized + # Append a 1 to mean weak ref or 0 for strong ref. + # We do this here instead of in the main serialization helper + # because msgpack expects a bytes object. We cannot serialize + # `weak_ref` in the C++ code because the weak_ref property is only + # available in the Python ActorHandle instance. 
+ value = serialized + (b"1" if weak_ref else b"0") else: metadata = ray_constants.OBJECT_METADATA_TYPE_CROSS_LANGUAGE diff --git a/python/ray/_raylet.pxd b/python/ray/_raylet.pxd index 155030ef4fe98..acb025e43cc02 100644 --- a/python/ray/_raylet.pxd +++ b/python/ray/_raylet.pxd @@ -151,7 +151,8 @@ cdef class CoreWorker: const CAddress &caller_address, c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]] *returns, CObjectID ref_generator_id=*) - cdef make_actor_handle(self, ActorHandleSharedPtr c_actor_handle) + cdef make_actor_handle(self, ActorHandleSharedPtr c_actor_handle, + c_bool weak_ref) cdef c_function_descriptors_to_python( self, const c_vector[CFunctionDescriptor] &c_function_descriptors) cdef initialize_eventloops_for_actor_concurrency_group( diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 466cc907fa1ad..96fb723c39a57 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -4375,7 +4375,8 @@ cdef class CoreWorker: CCoreWorkerProcess.GetCoreWorker().RemoveActorHandleReference( c_actor_id) - cdef make_actor_handle(self, ActorHandleSharedPtr c_actor_handle): + cdef make_actor_handle(self, ActorHandleSharedPtr c_actor_handle, + c_bool weak_ref): worker = ray._private.worker.global_worker worker.check_connected() manager = worker.function_actor_manager @@ -4413,7 +4414,8 @@ cdef class CoreWorker: method_meta.enable_task_events, actor_method_cpu, actor_creation_function_descriptor, - worker.current_cluster_and_job) + worker.current_cluster_and_job, + weak_ref=weak_ref) else: return ray.actor.ActorHandle(language, actor_id, 0, # max_task_retries, @@ -4428,11 +4430,14 @@ cdef class CoreWorker: {}, # enable_task_events 0, # actor method cpu actor_creation_function_descriptor, - worker.current_cluster_and_job) + worker.current_cluster_and_job, + weak_ref=weak_ref, + ) def deserialize_and_register_actor_handle(self, const c_string &bytes, ObjectRef - outer_object_ref): + outer_object_ref, + c_bool weak_ref): cdef: CObjectID c_outer_object_id = (outer_object_ref.native() if outer_object_ref else @@ -4440,9 +4445,11 @@ cdef class CoreWorker: c_actor_id = (CCoreWorkerProcess .GetCoreWorker() .DeserializeAndRegisterActorHandle( - bytes, c_outer_object_id)) + bytes, c_outer_object_id, + add_local_ref=not weak_ref)) return self.make_actor_handle( - CCoreWorkerProcess.GetCoreWorker().GetActorHandle(c_actor_id)) + CCoreWorkerProcess.GetCoreWorker().GetActorHandle(c_actor_id), + weak_ref) def get_named_actor_handle(self, const c_string &name, const c_string &ray_namespace): @@ -4457,13 +4464,15 @@ cdef class CoreWorker: name, ray_namespace)) check_status(named_actor_handle_pair.second) - return self.make_actor_handle(named_actor_handle_pair.first) + return self.make_actor_handle(named_actor_handle_pair.first, + weak_ref=True) def get_actor_handle(self, ActorID actor_id): cdef: CActorID c_actor_id = actor_id.native() return self.make_actor_handle( - CCoreWorkerProcess.GetCoreWorker().GetActorHandle(c_actor_id)) + CCoreWorkerProcess.GetCoreWorker().GetActorHandle(c_actor_id), + weak_ref=True) def list_named_actors(self, c_bool all_namespaces): """Returns (namespace, name) for named actors in the system. diff --git a/python/ray/actor.py b/python/ray/actor.py index cba953650bac0..a9dd729288346 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -1259,6 +1259,13 @@ class ActorHandle: _ray_original_handle: True if this is the original actor handle for a given actor. If this is true, then the actor will be destroyed when this handle goes out of scope. 
+ _ray_weak_ref: True means that this handle does not count towards the + distributed ref count for the actor, i.e. the actor may be GCed + while this handle is still in scope. This is set to True if the + handle was created by getting an actor by name or by getting the + self handle. It is set to False if this is the original handle or + if it was created by passing the original handle through task args + and returns. _ray_is_cross_language: Whether this actor is cross language. _ray_actor_creation_function_descriptor: The function descriptor of the actor creation task. @@ -1282,11 +1289,13 @@ def __init__( actor_creation_function_descriptor, cluster_and_job, original_handle=False, + weak_ref: bool = False, ): self._ray_actor_language = language self._ray_actor_id = actor_id self._ray_max_task_retries = max_task_retries self._ray_original_handle = original_handle + self._ray_weak_ref = weak_ref self._ray_enable_task_events = enable_task_events self._ray_method_is_generator = method_is_generator @@ -1345,6 +1354,11 @@ def __init__( setattr(self, method_name, method) def __del__(self): + # Weak references don't count towards the distributed ref count, so no + # need to decrement the ref count. + if self._ray_weak_ref: + return + try: # Mark that this actor handle has gone out of scope. Once all actor # handles are out of scope, the actor will exit. @@ -1565,10 +1579,10 @@ def _serialization_helper(self): None, ) - return state + return (*state, self._ray_weak_ref) @classmethod - def _deserialization_helper(cls, state, outer_object_ref=None): + def _deserialization_helper(cls, state, weak_ref: bool, outer_object_ref=None): """This is defined in order to make pickling work. Args: @@ -1576,6 +1590,8 @@ def _deserialization_helper(cls, state, outer_object_ref=None): outer_object_ref: The ObjectRef that the serialized actor handle was contained in, if any. This is used for counting references to the actor handle. + weak_ref: Whether this was serialized from an actor handle with a + weak ref to the actor. """ worker = ray._private.worker.global_worker @@ -1584,7 +1600,9 @@ def _deserialization_helper(cls, state, outer_object_ref=None): if hasattr(worker, "core_worker"): # Non-local mode return worker.core_worker.deserialize_and_register_actor_handle( - state, outer_object_ref + state, + outer_object_ref, + weak_ref, ) else: # Local mode @@ -1611,10 +1629,10 @@ def _deserialization_helper(cls, state, outer_object_ref=None): def __reduce__(self): """This code path is used by pickling but not by Ray forking.""" - (serialized, _) = self._serialization_helper() + (serialized, _, weak_ref) = self._serialization_helper() # There is no outer object ref when the actor handle is # deserialized out-of-band using pickle. 
- return ActorHandle._deserialization_helper, (serialized, None) + return ActorHandle._deserialization_helper, (serialized, weak_ref, None) def _modify_class(cls): diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index fe84242ee12ba..1533c3e8dd3bc 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -200,7 +200,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: const ResourceMappingType &GetResourceIDs() const void RemoveActorHandleReference(const CActorID &actor_id) CActorID DeserializeAndRegisterActorHandle(const c_string &bytes, const - CObjectID &outer_object_id) + CObjectID &outer_object_id, + c_bool add_local_ref) CRayStatus SerializeActorHandle(const CActorID &actor_id, c_string *bytes, CObjectID *c_actor_handle_id) diff --git a/python/ray/tests/test_actor.py b/python/ray/tests/test_actor.py index ca05fff9e172c..706ad29c30e25 100644 --- a/python/ray/tests/test_actor.py +++ b/python/ray/tests/test_actor.py @@ -935,6 +935,7 @@ def inc_and_get(self): a = Counter.options(name="hi").remote() first_get = ray.get_actor("hi") assert ray.get(first_get.inc_and_get.remote()) == 1 + second_get = ray.get_actor("hi") assert ray.get(second_get.inc_and_get.remote()) == 2 ray.kill(a, no_restart=True) @@ -1221,9 +1222,10 @@ def hello(self): actor = Actor.options(name="ABC").remote() assert ray.get(actor.hello.remote()) == "hello" + # Getting the actor by name acts as a weakref. for _ in range(10): - actor = ray.get_actor("ABC") - assert ray.get(actor.hello.remote()) == "hello" + named_actor = ray.get_actor("ABC") + assert ray.get(named_actor.hello.remote()) == "hello" del actor @@ -1388,6 +1390,131 @@ def get_actor(actor): assert origin == remote +def test_actor_handle_weak_ref_counting(ray_start_regular_shared): + """ + Actors can get handles to themselves or to named actors but these count + only as weak refs. Check that this pattern does not crash the normal ref + counting protocol, which tracks handles passed through task args and return + values. + """ + + @ray.remote + class WeakReferenceHolder: + def pass_weak_ref(self, handle): + self.handle = handle + + @ray.remote + class Actor: + def read_self_handle(self, self_handle): + # This actor has a strong reference to itself through the arg + # self_handle. + + # Get and delete a weak reference to ourselves. This should not + # crash the distributed ref counting protocol. + # TODO(swang): Commenting these lines out currently causes the + # actor handle to leak. + weak_self_handle = ray.get_runtime_context().current_actor + del weak_self_handle + + def pass_self_handle(self, self_handle, weak_ref_holder): + # This actor has a strong reference to itself through the arg + # self_handle. + + # Pass a weak reference to ourselves to another actor. This should + # not count towards the distributed ref counting protocol. + weak_self_handle = ray.get_runtime_context().current_actor + ray.get(weak_ref_holder.pass_weak_ref.remote(weak_self_handle)) + + def read_handle_by_name(self, handle, name): + # This actor has a strong reference to another actor through the + # arg handle. + + # Get and delete a weak reference to the same actor as the one + # passed through handle. This should not crash the distributed ref + # counting protocol. + weak_handle = ray.get_actor(name=name) + del weak_handle + + def pass_named_handle(self, handle, name, weak_ref_holder): + # This actor has a strong reference to another actor through the + # arg handle. 
+ + # Pass a weak reference to the actor to another actor. This should + # not count towards the distributed ref counting protocol. + weak_handle = ray.get_actor(name=name) + ray.get(weak_ref_holder.pass_weak_ref.remote(weak_handle)) + + def getpid(self): + return os.getpid() + + # Check ref counting when getting actors via self handle. + a = Actor.remote() + pid = ray.get(a.getpid.remote()) + for _ in range(3): + ray.get(a.read_self_handle.remote(a)) + # Check that there are no leaks after all handles have gone out of scope. + a = None + wait_for_pid_to_exit(pid) + + # Check that passing a weak ref to the self actor to other actors does not + # count towards the ref count. + weak_ref_holder = WeakReferenceHolder.remote() + a = Actor.remote() + pid = ray.get(a.getpid.remote()) + for _ in range(3): + ray.get(a.pass_self_handle.remote(a, weak_ref_holder)) + # Check that there are no leaks after all strong refs have gone out of + # scope. + a = None + wait_for_pid_to_exit(pid) + + # Check ref counting when getting actors by name. + a = Actor.remote() + b = Actor.options(name="actor").remote() + pid = ray.get(b.getpid.remote()) + for _ in range(3): + ray.get(a.read_handle_by_name.remote(b, "actor")) + # Check that there are no leaks after all handles have gone out of scope. + b = None + wait_for_pid_to_exit(pid) + + # Check that passing a weak ref to an actor handle that was gotten by name + # to other actors does not count towards the ref count. + a = Actor.remote() + b = Actor.options(name="actor").remote() + pid = ray.get(b.getpid.remote()) + for _ in range(3): + ray.get(a.pass_named_handle.remote(b, "actor", weak_ref_holder)) + # Check that there are no leaks after all strong refs have gone out of + # scope. + b = None + wait_for_pid_to_exit(pid) + + +def test_self_handle_leak(ray_start_regular_shared): + """ + Actors can get handles to themselves. Check that holding such a reference + does not cause the actor to leak. + """ + + @ray.remote + class Actor: + def read_self_handle(self, self_handle): + pass + + def getpid(self): + return os.getpid() + + # Check ref counting when getting actors via self handle. + a = Actor.remote() + pid = ray.get(a.getpid.remote()) + for _ in range(3): + ray.get(a.read_self_handle.remote(a)) + # Check that there are no leaks after all handles have gone out of scope. + a = None + wait_for_pid_to_exit(pid) + + if __name__ == "__main__": if os.environ.get("PARALLEL_CI"): sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) diff --git a/python/ray/tests/test_memstat.py b/python/ray/tests/test_memstat.py index 44c09987aa43a..923d608df4bf1 100644 --- a/python/ray/tests/test_memstat.py +++ b/python/ray/tests/test_memstat.py @@ -161,16 +161,15 @@ def make_actor(): x_id = actor.f.remote(np.zeros(100000)) info = ray.get(x_id) print(info) - # Note, the actor will always hold a handle to the actor itself. - assert num_objects(info) == 5, info + assert num_objects(info) == 4, info # Actor handle, task argument id, task return id. 
assert count(info, ACTOR_TASK_CALL_OBJ) == 3, info assert count(info, DRIVER_PID) == 3, info - assert count(info, WORKER_PID) == 2, info + assert count(info, WORKER_PID) == 1, info assert count(info, LOCAL_REF) == 1, info assert count(info, PINNED_IN_MEMORY) == 1, info assert count(info, USED_BY_PENDING_TASK) == 1, info - assert count(info, ACTOR_HANDLE) == 2, info + assert count(info, ACTOR_HANDLE) == 1, info assert count(info, DESER_ACTOR_TASK_ARG) == 1, info del x_id diff --git a/python/ray/util/client/common.py b/python/ray/util/client/common.py index 8262ba09b71d2..caf5572c69d0b 100644 --- a/python/ray/util/client/common.py +++ b/python/ray/util/client/common.py @@ -196,7 +196,12 @@ def _wait_for_id(self, timeout=None): class ClientActorRef(raylet.ActorID): - def __init__(self, id: Union[bytes, Future]): + def __init__( + self, + id: Union[bytes, Future], + weak_ref: Optional[bool] = False, + ): + self._weak_ref = weak_ref self._mutex = threading.Lock() self._worker = ray.get_context().client_worker if isinstance(id, bytes): @@ -208,6 +213,9 @@ def __init__(self, id: Union[bytes, Future]): raise TypeError("Unexpected type for id {}".format(id)) def __del__(self): + if self._weak_ref: + return + if self._worker is not None and self._worker.is_connected(): try: if not self.is_nil(): @@ -432,7 +440,9 @@ class ClientActorHandle(ClientStub): """ def __init__( - self, actor_ref: ClientActorRef, actor_class: Optional[ClientActorClass] = None + self, + actor_ref: ClientActorRef, + actor_class: Optional[ClientActorClass] = None, ): self.actor_ref = actor_ref self._dir: Optional[List[str]] = None diff --git a/python/ray/util/client/server/dataservicer.py b/python/ray/util/client/server/dataservicer.py index f874c34e291d8..9ce816856e4df 100644 --- a/python/ray/util/client/server/dataservicer.py +++ b/python/ray/util/client/server/dataservicer.py @@ -274,6 +274,8 @@ def Datapath(self, request_iterator, context): req.task, arglist, kwargs, context ) resp = ray_client_pb2.DataResponse(task_ticket=resp_ticket) + del arglist + del kwargs elif req_type == "terminate": with self.clients_lock: response = self.basic_service.Terminate(req.terminate, context) diff --git a/python/ray/util/client/server/server.py b/python/ray/util/client/server/server.py index d7a0db9a956e5..a0d9b89f5caec 100644 --- a/python/ray/util/client/server/server.py +++ b/python/ray/util/client/server/server.py @@ -687,7 +687,8 @@ def _schedule_named_actor( # Convert empty string back to None. actor = ray.get_actor(task.name, task.namespace or None) bin_actor_id = actor._actor_id.binary() - self.actor_refs[bin_actor_id] = actor + if bin_actor_id not in self.actor_refs: + self.actor_refs[bin_actor_id] = actor self.actor_owners[task.client_id].add(bin_actor_id) self.named_actors.add(bin_actor_id) return ray_client_pb2.ClientTaskTicket(return_ids=[actor._actor_id.binary()]) diff --git a/python/ray/util/client/worker.py b/python/ray/util/client/worker.py index dc9e42c76cc72..57acede6bd4d5 100644 --- a/python/ray/util/client/worker.py +++ b/python/ray/util/client/worker.py @@ -672,7 +672,7 @@ def get_actor( task.data = dumps_from_client(([], {}), self._client_id) futures = self._call_schedule_for_task(task, 1) assert len(futures) == 1 - handle = ClientActorHandle(ClientActorRef(futures[0])) + handle = ClientActorHandle(ClientActorRef(futures[0], weak_ref=True)) # `actor_ref.is_nil()` waits until the underlying ID is resolved. # This is needed because `get_actor` is often used to check the # existence of an actor. 
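[Not part of the patch: elsewhere in this re-applied commit, the
serialization change encodes the weak_ref flag as a trailing byte on the
msgpack payload, since the flag only exists on the Python ActorHandle. A
self-contained sketch of that round trip:]

    def encode(serialized: bytes, weak_ref: bool) -> bytes:
        # Append b"1" for a weak ref, b"0" for a strong ref.
        return serialized + (b"1" if weak_ref else b"0")

    def decode(data: bytes):
        # The last byte is b"1" iff weak_ref=True.
        return data[:-1], data[-1:] == b"1"

    assert decode(encode(b"payload", True)) == (b"payload", True)
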
diff --git a/src/ray/common/id.h b/src/ray/common/id.h
index 1f39ace7771f4..289e7560836eb 100644
--- a/src/ray/common/id.h
+++ b/src/ray/common/id.h
@@ -318,7 +318,12 @@ class ObjectID : public BaseID {
 /// \return The computed object ID.
 static ObjectID ForActorHandle(const ActorID &actor_id);
 
+ /// Whether this ObjectID represents an actor handle. This is the ObjectID
+ /// returned by the actor's creation task.
 static bool IsActorID(const ObjectID &object_id);
+ /// Return the ID of the actor that produces this object. For the actor
+ /// creation task and for tasks executed by the actor, this will return a
+ /// non-nil ActorID.
 static ActorID ToActorID(const ObjectID &object_id);
 
 MSGPACK_DEFINE(id_);
diff --git a/src/ray/core_worker/actor_manager.cc b/src/ray/core_worker/actor_manager.cc
index 0d1dbf8e1c097..efca63feb86f5 100644
--- a/src/ray/core_worker/actor_manager.cc
+++ b/src/ray/core_worker/actor_manager.cc
@@ -23,6 +23,7 @@ ActorID ActorManager::RegisterActorHandle(std::unique_ptr<ActorHandle> actor_han
 const ObjectID &outer_object_id,
 const std::string &call_site,
 const rpc::Address &caller_address,
+ bool add_local_ref,
 bool is_self) {
 const ActorID actor_id = actor_handle->GetActorID();
 const rpc::Address owner_address = actor_handle->GetOwnerAddress();
@@ -35,6 +36,7 @@
 caller_address,
 actor_id,
 actor_creation_return_id,
+ add_local_ref,
 is_self));
 ObjectID actor_handle_id = ObjectID::ForActorHandle(actor_id);
 reference_counter_->AddBorrowedObject(actor_handle_id, outer_object_id, owner_address);
@@ -56,49 +58,39 @@ std::pair<std::shared_ptr<ActorHandle>, Status> ActorManager::GetNamedActo
 const std::string &call_site,
 const rpc::Address &caller_address) {
 ActorID actor_id = GetCachedNamedActorID(GenerateCachedActorName(ray_namespace, name));
- if (actor_id.IsNil()) {
- // This call needs to be blocking because we can't return until the actor
- // handle is created, which requires the response from the RPC. This is
- // implemented using a promise that's captured in the RPC callback.
- // There should be no risk of deadlock because we don't hold any
- // locks during the call and the RPCs run on a separate thread.
- rpc::ActorTableData actor_table_data;
- rpc::TaskSpec task_spec;
- const auto status = gcs_client_->Actors().SyncGetByName(
- name, ray_namespace, actor_table_data, task_spec);
- if (status.ok()) {
- auto actor_handle = std::make_unique<ActorHandle>(actor_table_data, task_spec);
- actor_id = actor_handle->GetActorID();
- AddNewActorHandle(std::move(actor_handle),
- call_site,
- caller_address,
- /*is_detached*/ true);
- } else {
- // Use a NIL actor ID to signal that the actor wasn't found.
- RAY_LOG(DEBUG) << "Failed to look up actor with name: " << name;
- actor_id = ActorID::Nil();
- }
+ if (!actor_id.IsNil()) {
+ return std::make_pair(GetActorHandle(actor_id), Status::OK());
+ }
 
- if (status.IsTimedOut()) {
- std::ostringstream stream;
- stream << "There was timeout in getting the actor handle, "
- "probably because the GCS server is dead or under high load .";
- std::string error_str = stream.str();
- RAY_LOG(ERROR) << error_str;
- return std::make_pair(nullptr, Status::TimedOut(error_str));
- }
+ // This call needs to be blocking because we can't return until the actor
+ // handle is created, which requires the response from the RPC. This is
+ // implemented using a promise that's captured in the RPC callback.
+ // There should be no risk of deadlock because we don't hold any
+ // locks during the call and the RPCs run on a separate thread.
+ rpc::ActorTableData actor_table_data;
+ rpc::TaskSpec task_spec;
+ const auto status = gcs_client_->Actors().SyncGetByName(
+ name, ray_namespace, actor_table_data, task_spec);
+ if (status.ok()) {
+ auto actor_handle = std::make_unique<ActorHandle>(actor_table_data, task_spec);
+ actor_id = actor_handle->GetActorID();
+ AddNewActorHandle(std::move(actor_handle),
+ call_site,
+ caller_address,
+ /*owned*/ false);
 } else {
- // When the named actor is already cached, the reference of actor_creation_return_id
- // must be increased, so we call AddActorHandle here to ensure that.
- std::string serialized_actor_handle;
- auto actor_handle = GetActorHandle(actor_id);
- actor_handle->Serialize(&serialized_actor_handle);
-
- AddActorHandle(std::make_unique<ActorHandle>(serialized_actor_handle),
- call_site,
- caller_address,
- actor_id,
- ObjectID::ForActorHandle(actor_id));
+ // Use a NIL actor ID to signal that the actor wasn't found.
+ RAY_LOG(DEBUG) << "Failed to look up actor with name: " << name;
+ actor_id = ActorID::Nil();
+ }
+
+ if (status.IsTimedOut()) {
+ std::ostringstream stream;
+ stream << "There was a timeout in getting the actor handle, "
+ "probably because the GCS server is dead or under high load.";
+ std::string error_str = stream.str();
+ RAY_LOG(ERROR) << error_str;
+ return std::make_pair(nullptr, Status::TimedOut(error_str));
 }
 
 if (actor_id.IsNil()) {
@@ -124,27 +116,26 @@ bool ActorManager::CheckActorHandleExists(const ActorID &actor_id) {
 bool ActorManager::AddNewActorHandle(std::unique_ptr<ActorHandle> actor_handle,
 const std::string &call_site,
 const rpc::Address &caller_address,
- bool is_detached) {
+ bool owned) {
 const auto &actor_id = actor_handle->GetActorID();
 const auto actor_creation_return_id = ObjectID::ForActorHandle(actor_id);
 // Detached actor doesn't need ref counting.
- if (!is_detached) {
- // We don't need to add an initial local ref here because it will get added
- // in AddActorHandle.
+ if (owned) {
 reference_counter_->AddOwnedObject(actor_creation_return_id,
 /*inner_ids=*/{},
 caller_address,
 call_site,
 /*object_size*/ -1,
 /*is_reconstructable=*/true,
- /*add_local_ref=*/false);
+ /*add_local_ref=*/true);
 }
 
 return AddActorHandle(std::move(actor_handle),
 call_site,
 caller_address,
 actor_id,
- actor_creation_return_id);
+ actor_creation_return_id,
+ /*add_local_ref=*/false);
 }
 
 bool ActorManager::AddActorHandle(std::unique_ptr<ActorHandle> actor_handle,
@@ -152,8 +143,11 @@ bool ActorManager::AddActorHandle(std::unique_ptr<ActorHandle> actor_handle,
 const rpc::Address &caller_address,
 const ActorID &actor_id,
 const ObjectID &actor_creation_return_id,
+ bool add_local_ref,
 bool is_self) {
- reference_counter_->AddLocalReference(actor_creation_return_id, call_site);
+ if (add_local_ref) {
+ reference_counter_->AddLocalReference(actor_creation_return_id, call_site);
+ }
 direct_actor_submitter_->AddActorQueueIfNotExists(
 actor_id,
 actor_handle->MaxPendingCalls(),
diff --git a/src/ray/core_worker/actor_manager.h b/src/ray/core_worker/actor_manager.h
index 373bf11fdd39f..3d3f7dd827629 100644
--- a/src/ray/core_worker/actor_manager.h
+++ b/src/ray/core_worker/actor_manager.h
@@ -53,6 +53,9 @@ class ActorManager {
 /// \param[in] outer_object_id The object ID that contained the serialized
 /// actor handle, if any.
 /// \param[in] call_site The caller's site.
+ /// \param[in] add_local_ref Whether to add a local ref for this actor handle.
+ /// The ref count should be incremented for strong refs, i.e. ones where the
+ /// actor handle was passed from the original handle via task arguments or returns.
 /// \param[in] is_self Whether this handle is current actor's handle. If true, actor
 /// manager won't subscribe actor info from GCS.
 /// \return The ActorID of the deserialized handle.
@@ -60,6 +63,7 @@ class ActorManager {
 const ObjectID &outer_object_id,
 const std::string &call_site,
 const rpc::Address &caller_address,
+ bool add_local_ref,
 bool is_self = false);
 
 /// Get a handle to an actor.
@@ -101,13 +105,15 @@ class ActorManager {
 ///
 /// \param actor_handle The handle to the actor.
 /// \param[in] call_site The caller's site.
- /// \param[in] is_detached Whether or not the actor of a handle is detached (named)
- /// actor. \return True if the handle was added and False if we already had a handle to
+ /// \param[in] owned Whether or not we own this actor, i.e. the actor is
+ /// not detached and we were the process that submitted the actor creation
+ /// task.
+ /// \return True if the handle was added and False if we already had a handle to
 /// the same actor.
 bool AddNewActorHandle(std::unique_ptr<ActorHandle> actor_handle,
 const std::string &call_site,
 const rpc::Address &caller_address,
- bool is_detached);
+ bool owned);
 
 /// Wait for actor out of scope.
 ///
@@ -145,6 +151,7 @@ class ActorManager {
 /// \param[in] call_site The caller's site.
 /// \param[in] actor_id The id of an actor
 /// \param[in] actor_creation_return_id object id of this actor creation
+ /// \param[in] add_local_ref Whether to add a local reference for this actor.
 /// \param[in] is_self Whether this handle is current actor's handle. If true, actor
 /// to the same actor.
 /// manager won't subscribe actor info from GCS.
@@ -155,6 +162,7 @@ class ActorManager {
 const rpc::Address &caller_address,
 const ActorID &actor_id,
 const ObjectID &actor_creation_return_id,
+ bool add_local_ref,
 bool is_self = false);
 
 /// Check if named actor is cached locally.
diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc
index 2e72b725d16e9..d3a4b476fb298 100644
--- a/src/ray/core_worker/core_worker.cc
+++ b/src/ray/core_worker/core_worker.cc
@@ -2351,7 +2351,7 @@ Status CoreWorker::CreateActor(const RayFunction &function,
 // actor handle must be in scope by the time the GCS sends the
 // WaitForActorOutOfScopeRequest.
 RAY_CHECK(actor_manager_->AddNewActorHandle(
- std::move(actor_handle), CurrentCallSite(), rpc_address_, is_detached))
+ std::move(actor_handle), CurrentCallSite(), rpc_address_, /*owned=*/!is_detached))
 << "Actor " << actor_id << " already exists";
 *return_actor_id = actor_id;
 TaskSpecification task_spec = builder.Build();
@@ -2726,10 +2726,14 @@ void CoreWorker::RemoveActorHandleReference(const ActorID &actor_id) {
 }
 
 ActorID CoreWorker::DeserializeAndRegisterActorHandle(const std::string &serialized,
- const ObjectID &outer_object_id) {
+ const ObjectID &outer_object_id,
+ bool add_local_ref) {
 std::unique_ptr<ActorHandle> actor_handle(new ActorHandle(serialized));
- return actor_manager_->RegisterActorHandle(
- std::move(actor_handle), outer_object_id, CurrentCallSite(), rpc_address_);
+ return actor_manager_->RegisterActorHandle(std::move(actor_handle),
+ outer_object_id,
+ CurrentCallSite(),
+ rpc_address_,
+ add_local_ref);
 }
 
 Status CoreWorker::SerializeActorHandle(const ActorID &actor_id,
@@ -3005,6 +3009,7 @@ Status CoreWorker::ExecuteTask(
 ObjectID::Nil(),
 CurrentCallSite(),
 rpc_address_,
+ /*add_local_ref=*/false,
 /*is_self=*/true);
 }
 RAY_LOG(INFO) << "Creating actor: " << task_spec.ActorCreationId();
diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h
index bb80d0234211d..b4bed09ceb7e2 100644
--- a/src/ray/core_worker/core_worker.h
+++ b/src/ray/core_worker/core_worker.h
@@ -1070,9 +1070,15 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
 /// \param[in] serialized The serialized actor handle.
 /// \param[in] outer_object_id The object ID that contained the serialized
 /// actor handle, if any.
+ /// \param[in] add_local_ref Whether to add a local reference for this actor
+ /// handle. Handles that were created out-of-band (i.e. via getting an actor
+ /// by name or getting a handle to self) should not add a local reference
+ /// because the distributed reference counting protocol does not ensure that
+ /// the owner will learn of this reference.
 /// \return The ActorID of the deserialized handle.
 ActorID DeserializeAndRegisterActorHandle(const std::string &serialized,
- const ObjectID &outer_object_id);
+ const ObjectID &outer_object_id,
+ bool add_local_ref);
 
 /// Serialize an actor handle.
 ///
diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_actor_NativeActorHandle.cc b/src/ray/core_worker/lib/java/io_ray_runtime_actor_NativeActorHandle.cc
index 97a0ca4074cfc..957c77321ba35 100644
--- a/src/ray/core_worker/lib/java/io_ray_runtime_actor_NativeActorHandle.cc
+++ b/src/ray/core_worker/lib/java/io_ray_runtime_actor_NativeActorHandle.cc
@@ -68,7 +68,7 @@ Java_io_ray_runtime_actor_NativeActorHandle_nativeDeserialize(JNIEnv *env,
 RAY_CHECK(buffer->Size() > 0);
 auto binary = std::string(reinterpret_cast<char *>(buffer->Data()), buffer->Size());
 auto actor_id = CoreWorkerProcess::GetCoreWorker().DeserializeAndRegisterActorHandle(
- binary, /*outer_object_id=*/ObjectID::Nil());
+ binary, /*outer_object_id=*/ObjectID::Nil(), /*add_local_ref=*/true);
 return IdToJavaByteArray<ActorID>(env, actor_id);
 }
 
@@ -79,7 +79,7 @@ Java_io_ray_runtime_actor_NativeActorHandle_nativeRemoveActorHandleReference(
 // We can't control the timing of Java GC, so it's normal that this method is called but
 // core worker is shutting down (or already shut down). If we can't get a core worker
 // instance here, skip calling the `RemoveLocalReference` method.
- auto core_worker = CoreWorkerProcess::TryGetWorker();
+ auto core_worker = CoreWorkerProcess::TryGetWorker();
 if (core_worker != nullptr) {
 const auto actor_id = JavaByteArrayToId<ActorID>(env, actorId);
 core_worker->RemoveActorHandleReference(actor_id);
diff --git a/src/ray/core_worker/test/actor_manager_test.cc b/src/ray/core_worker/test/actor_manager_test.cc
index 13af0a0c6412a..05203ae04bd37 100644
--- a/src/ray/core_worker/test/actor_manager_test.cc
+++ b/src/ray/core_worker/test/actor_manager_test.cc
@@ -208,7 +208,7 @@ class ActorManagerTest : public ::testing::Test {
 actor_manager_->AddNewActorHandle(std::move(actor_handle),
 call_site,
 caller_address,
- /*is_detached*/ false);
+ /*owned*/ true);
 actor_manager_->SubscribeActorState(actor_id);
 return actor_id;
 }
@@ -247,7 +247,7 @@ TEST_F(ActorManagerTest, TestAddAndGetActorHandleEndToEnd) {
 
 // Add an actor handle.
 ASSERT_TRUE(actor_manager_->AddNewActorHandle(
- std::move(actor_handle), call_site, caller_address, false));
+ std::move(actor_handle), call_site, caller_address, true));
 actor_manager_->SubscribeActorState(actor_id);
 
 // Make sure the subscription request is sent to GCS.
@@ -269,7 +269,7 @@ TEST_F(ActorManagerTest, TestAddAndGetActorHandleEndToEnd) {
 false);
 // Make sure the same actor id adding will return false.
 ASSERT_FALSE(actor_manager_->AddNewActorHandle(
- std::move(actor_handle2), call_site, caller_address, false));
+ std::move(actor_handle2), call_site, caller_address, true));
 actor_manager_->SubscribeActorState(actor_id);
 
 // Make sure we can get an actor handle correctly.
@@ -326,8 +326,12 @@ TEST_F(ActorManagerTest, RegisterActorHandles) {
 // Since RegisterActor happens in a non-owner worker, we should
 // make sure it borrows an object.
 EXPECT_CALL(*reference_counter_, AddBorrowedObject(_, _, _, _));
- ActorID returned_actor_id = actor_manager_->RegisterActorHandle(
- std::move(actor_handle), outer_object_id, call_site, caller_address);
+ EXPECT_CALL(*reference_counter_, AddLocalReference(_, _));
+ ActorID returned_actor_id = actor_manager_->RegisterActorHandle(std::move(actor_handle),
+ outer_object_id,
+ call_site,
+ caller_address,
+ /*add_local_ref=*/true);
 ASSERT_TRUE(returned_actor_id == actor_id);
 
 // Let's try to get the handle and make sure it works.
 const std::shared_ptr<ActorHandle> actor_handle_to_get =

From 7ca547602e8f825c976020619c557eb5c7ff4d9e Mon Sep 17 00:00:00 2001
From: Jiajun Yao
Date: Thu, 25 Jul 2024 16:24:27 -0700
Subject: [PATCH 4/5] up

Signed-off-by: Jiajun Yao
---
 release/serve_tests/workloads/microbenchmarks.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/release/serve_tests/workloads/microbenchmarks.py b/release/serve_tests/workloads/microbenchmarks.py
index bb042a767ac34..6fced5903a428 100644
--- a/release/serve_tests/workloads/microbenchmarks.py
+++ b/release/serve_tests/workloads/microbenchmarks.py
@@ -117,7 +117,7 @@ async def _main(output_path: Optional[str]):
 payload_1mb = generate_payload(1000000)
 payload_10mb = generate_payload(10000000)
 
- # HTP
+ # HTTP
 serve.run(Noop.bind())
 # Microbenchmark: HTTP noop latencies
 latencies = await run_latency_benchmark(

From 910c4053ce79f931245863fa73ae9aaa3388367f Mon Sep 17 00:00:00 2001
From: Jiajun Yao
Date: Thu, 25 Jul 2024 17:03:31 -0700
Subject: [PATCH 5/5] Emit total lineage bytes metric

Signed-off-by: Jiajun Yao
---
 src/ray/core_worker/task_manager.cc | 1 +
 src/ray/stats/metric_defs.cc        | 9 +++++++++
 src/ray/stats/metric_defs.h         | 3 +++
 3 files changed, 13 insertions(+)

diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc
index e50d8c9fe5ffa..cd991afbda380 100644
--- a/src/ray/core_worker/task_manager.cc
+++ b/src/ray/core_worker/task_manager.cc
@@ -1525,6 +1525,7 @@ void TaskManager::FillTaskInfo(rpc::GetCoreWorkerStatsReply *reply,
 
 void TaskManager::RecordMetrics() {
 absl::MutexLock lock(&mu_);
+ ray::stats::STATS_total_lineage_bytes.Record(total_lineage_footprint_bytes_);
 task_counter_.FlushOnChangeCallbacks();
 }
 
diff --git a/src/ray/stats/metric_defs.cc b/src/ray/stats/metric_defs.cc
index c63100e44d81b..114e6c07434d5 100644
--- a/src/ray/stats/metric_defs.cc
+++ b/src/ray/stats/metric_defs.cc
@@ -357,6 +357,15 @@ DEFINE_stats(
 ("Type", "Name"),
 (),
 ray::stats::COUNT);
+
+/// Core Worker Task Manager
+DEFINE_stats(
+ total_lineage_bytes,
+ "Total amount of memory used to store task specs for lineage reconstruction.",
+ (),
+ (),
+ ray::stats::GAUGE);
+
 } // namespace stats
 } // namespace ray
 
diff --git a/src/ray/stats/metric_defs.h b/src/ray/stats/metric_defs.h
index ea3d1165cb41c..44d77b8171594 100644
--- a/src/ray/stats/metric_defs.h
+++ b/src/ray/stats/metric_defs.h
@@ -119,6 +119,9 @@ DECLARE_stats(gcs_actors_count);
 /// Memory Manager
 DECLARE_stats(memory_manager_worker_eviction_total);
 
+/// Core Worker Task Manager
+DECLARE_stats(total_lineage_bytes);
+
 /// The below items are legacy implementation of metrics.
 /// TODO(sang): Use DEFINE_stats instead.
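The new `total_lineage_bytes` gauge is recorded from `TaskManager::RecordMetrics` and exported alongside the other stats defined above. A minimal sketch for reading it from a node's Prometheus endpoint; the exported sample name (`ray_total_lineage_bytes`, i.e. the stat name with Ray's usual `ray_` prefix) and the URL are assumptions, with the port configurable via `ray start --metrics-export-port`:

    import urllib.request

    def read_total_lineage_bytes(url: str = "http://127.0.0.1:8080/metrics"):
        # Scan the Prometheus text exposition and sum the gauge's samples
        # (one sample may be reported per worker process).
        body = urllib.request.urlopen(url).read().decode("utf-8")
        values = [
            float(line.rsplit(" ", 1)[-1])
            for line in body.splitlines()
            if line.startswith("ray_total_lineage_bytes")
        ]
        return sum(values) if values else None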