diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py index 4e058824f79f..60758d4ee652 100644 --- a/python/ray/dag/compiled_dag_node.py +++ b/python/ray/dag/compiled_dag_node.py @@ -9,6 +9,7 @@ import traceback from ray.experimental.channel.cached_channel import CachedChannel +from ray.experimental.channel.gpu_communicator import GPUCommunicator import ray from ray.exceptions import RayTaskError, RayChannelError from ray.experimental.compiled_dag_ref import ( @@ -711,6 +712,9 @@ def __init__( # Mapping from the actor handle to the node ID that the actor is on. self.actor_to_node_id: Dict["ray.actor.ActorHandle", str] = {} + # Type hints specified by the user for DAG (intermediate) outputs. + self._type_hints = [] + # This is set to true when type hint of `transport="nccl"`` is used self._use_default_nccl_group = False # This is set to the specified custom nccl group diff --git a/python/ray/experimental/channel/torch_tensor_nccl_channel.py b/python/ray/experimental/channel/torch_tensor_nccl_channel.py index c97f9913838e..da7448137c8a 100644 --- a/python/ray/experimental/channel/torch_tensor_nccl_channel.py +++ b/python/ray/experimental/channel/torch_tensor_nccl_channel.py @@ -480,9 +480,8 @@ def _get_ranks( actors: List[ray.actor.ActorHandle], custom_nccl_group: Optional[GPUCommunicator] ) -> List[int]: """ - Get ranks for the NCCL group to use. If custom_nccl_group is specified, - return the ranks of the actors in the custom NCCL group, in the same - order of the actors; otherwise, return list(range(len(actors))). + Get sorted ranks for the NCCL group to use. If custom_nccl_group is specified, + return all ranks from it, otherwise, return list(range(len(actors))). Args: actors: A list of actors that participate in the NCCL group. @@ -495,18 +494,18 @@ def _get_ranks( "The world size of the custom NCCL group does not match the number " "of actors." ) - ranks = [] + ranks = set() for actor in actors: rank = custom_nccl_group.get_rank(actor) assert rank not in ranks, "Duplicate rank in custom NCCL group" - ranks.append(rank) + ranks.add(rank) assert custom_nccl_group.get_world_size() == len(actors), ( "The world size of the custom NCCL group " f"({custom_nccl_group.get_world_size()}) " "does not match the number of actors " f"({len(actors)})." ) - return ranks + return sorted(ranks) def _init_nccl_group(