From 5e6d7caa6041162d6b76f12cef5ec0ffbb523923 Mon Sep 17 00:00:00 2001 From: Rui Qiao <161574667+ruisearch42@users.noreply.github.com> Date: Fri, 6 Sep 2024 13:16:23 -0700 Subject: [PATCH] [aDAG] Allow custom NCCL group for aDAG (#47141) Allow custom NCCL group for aDAG so that we can reuse what the user already created. Marking NcclGroupInterface as DeveloperAPI for now. After validation by using it in vLLM we can change to alpha stability. vLLM prototype: vllm-project/vllm#7568 Signed-off-by: ujjawal-khare --- python/ray/dag/compiled_dag_node.py | 4 ++++ .../experimental/channel/torch_tensor_nccl_channel.py | 11 +++++------ 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py index 4e058824f79f..60758d4ee652 100644 --- a/python/ray/dag/compiled_dag_node.py +++ b/python/ray/dag/compiled_dag_node.py @@ -9,6 +9,7 @@ import traceback from ray.experimental.channel.cached_channel import CachedChannel +from ray.experimental.channel.gpu_communicator import GPUCommunicator import ray from ray.exceptions import RayTaskError, RayChannelError from ray.experimental.compiled_dag_ref import ( @@ -711,6 +712,9 @@ def __init__( # Mapping from the actor handle to the node ID that the actor is on. self.actor_to_node_id: Dict["ray.actor.ActorHandle", str] = {} + # Type hints specified by the user for DAG (intermediate) outputs. 
+ self._type_hints = [] + # This is set to true when type hint of `transport="nccl"`` is used self._use_default_nccl_group = False # This is set to the specified custom nccl group diff --git a/python/ray/experimental/channel/torch_tensor_nccl_channel.py b/python/ray/experimental/channel/torch_tensor_nccl_channel.py index c97f9913838e..da7448137c8a 100644 --- a/python/ray/experimental/channel/torch_tensor_nccl_channel.py +++ b/python/ray/experimental/channel/torch_tensor_nccl_channel.py @@ -480,9 +480,8 @@ def _get_ranks( actors: List[ray.actor.ActorHandle], custom_nccl_group: Optional[GPUCommunicator] ) -> List[int]: """ - Get ranks for the NCCL group to use. If custom_nccl_group is specified, - return the ranks of the actors in the custom NCCL group, in the same - order of the actors; otherwise, return list(range(len(actors))). + Get sorted ranks for the NCCL group to use. If custom_nccl_group is specified, + return all ranks from it; otherwise, return list(range(len(actors))). Args: actors: A list of actors that participate in the NCCL group. @@ -495,18 +494,18 @@ def _get_ranks( "The world size of the custom NCCL group does not match the number " "of actors." ) - ranks = [] + ranks = set() for actor in actors: rank = custom_nccl_group.get_rank(actor) assert rank not in ranks, "Duplicate rank in custom NCCL group" - ranks.append(rank) + ranks.add(rank) assert custom_nccl_group.get_world_size() == len(actors), ( "The world size of the custom NCCL group " f"({custom_nccl_group.get_world_size()}) " "does not match the number of actors " f"({len(actors)})." ) - return ranks + return sorted(ranks) def _init_nccl_group(