diff --git a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py index 5225508fb..ff6e53892 100644 --- a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py +++ b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py @@ -15,13 +15,19 @@ from typing import Any import ray -from ray.experimental.state.api import list_actors from data_processing.utils import GB, UnrecoverableException from ray.actor import ActorHandle from ray.exceptions import RayError +from ray.experimental.state.api import list_actors from ray.util.actor_pool import ActorPool +# This value matches the constant `RAY_MAX_LIMIT_FROM_API_SERVER` defined in the ray source code here: +# https://github.com/ray-project/ray/blob/569f7df9067c5654fb57ba7bc4792b3ba5aaa846/python/ray/util/state/common.py#L50-L53 + +RAY_MAX_ACTOR_LIMIT = 10000 + + class RayUtils: """ Class implementing support methods for Ray execution @@ -109,11 +115,13 @@ def operator() -> ActorHandle: time.sleep(creation_delay) return clazz.options(**actor_options).remote(params) - cls_name = clazz.__class__.__name__.replace('ActorClass(', '').replace(')','') + cls_name = clazz.__class__.__name__.replace("ActorClass(", "").replace(")", "") actors = [operator() for _ in range(n_actors)] for i in range(120): time.sleep(1) - alive = list_actors(filters=[("class_name", "=", cls_name), ("state", "=", "ALIVE")]) + alive = list_actors( + filters=[("class_name", "=", cls_name), ("state", "=", "ALIVE")], limit=RAY_MAX_ACTOR_LIMIT + ) if len(actors) == len(alive): return actors # failed - raise an exception