From 25b3a7bd37f0744a327479f9440baac8db700c49 Mon Sep 17 00:00:00 2001 From: Constantin M Adam Date: Fri, 22 Nov 2024 11:15:22 -0500 Subject: [PATCH] Print correctly actor class name in exception Signed-off-by: Constantin M Adam --- .../runtime/ray/ray_utils.py | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py index a854e8ec3..0480cef70 100644 --- a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py +++ b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py @@ -15,12 +15,13 @@ from typing import Any import ray -from ray.experimental.state.api import list_actors from data_processing.utils import GB, UnrecoverableException, get_logger from ray.actor import ActorHandle from ray.exceptions import RayError +from ray.experimental.state.api import list_actors from ray.util.actor_pool import ActorPool + MAX_LIST = 10000 # Max number of actors returned by list @@ -112,7 +113,7 @@ def operator() -> ActorHandle: return clazz.options(**actor_options).remote(params) logger = get_logger(__name__) - cls_name = clazz.__class__.__name__.replace('ActorClass(', '').replace(')','') + cls_name = clazz.__class__.__name__.replace("ActorClass(", "").replace(")", "") current = list_actors(filters=[("class_name", "=", cls_name), ("state", "=", "ALIVE")], limit=MAX_LIST) c_len = len(current) actors = [operator() for _ in range(n_actors)] @@ -126,20 +127,23 @@ def operator() -> ActorHandle: if len(alive) >= n_actors + c_len: return actors # failed - if len(alive) >= n_actors/2 + c_len: + if len(alive) >= n_actors / 2 + c_len: # At least half of the actors were created logger.info(f"created {n_actors}, alive {len(alive)} Running with less actors") created_ids = [item.actor_id for item in alive if item not in current] return [ - actor for actor in actors - if (str(actor._ray_actor_id). - replace('ActorID(', '').replace(')','') in created_ids)] + actor + for actor in actors + if (str(actor._ray_actor_id).replace("ActorID(", "").replace(")", "") in created_ids) + ] else: # too few actors created raise UnrecoverableException(f"Created {n_actors}, alive {len(alive)}. Too few actors were created") else: - raise UnrecoverableException(f"Overall number of actors of class {clazz.__name__} is {overall}. " - f"Too many actors to create, greater then {MAX_LIST}") + raise UnrecoverableException( + f"Overall number of actors of class {cls_name} is {overall}. " + f"Too many actors to create, greater then {MAX_LIST}" + ) @staticmethod def process_files(