From 70e409dd336759ad932db9841ef09f49fd97099b Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 7 Nov 2025 08:55:47 -0800 Subject: [PATCH 1/6] Tidying up Signed-off-by: Alexey Kudinkin --- .../ray/data/_internal/execution/operators/join.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/python/ray/data/_internal/execution/operators/join.py b/python/ray/data/_internal/execution/operators/join.py index 2567c1f05e89..74f5897bd789 100644 --- a/python/ray/data/_internal/execution/operators/join.py +++ b/python/ray/data/_internal/execution/operators/join.py @@ -166,6 +166,7 @@ def _preprocess( left_seq_partition: pa.Table = self._get_partition_builder( input_seq_id=0, partition_id=partition_id ).build() + right_seq_partition: pa.Table = self._get_partition_builder( input_seq_id=1, partition_id=partition_id ).build() @@ -198,7 +199,6 @@ def _preprocess( should_index_r = self._should_index_side("right", supported_r, unsupported_r) # Add index columns for back-referencing if we have unsupported columns - # TODO: what are the chances of a collision with the index column? 
if should_index_l: supported_l = self._append_index_column( table=supported_l, col_name=self._index_name("left") @@ -246,7 +246,7 @@ def _postprocess( return supported def _index_name(self, suffix: str) -> str: - return f"__ray_data_index_level_{suffix}__" + return f"__rd_index_level_{suffix}__" def clear(self, partition_id: int): self._left_input_seq_partition_builders.pop(partition_id) @@ -263,9 +263,6 @@ def _get_partition_builder(self, *, input_seq_id: int, partition_id: int): ) return partition_builder - def _get_index_col_name(self, index: int) -> str: - return f"__index_level_{index}__" - def _should_index_side( self, side: str, supported_table: "pa.Table", unsupported_table: "pa.Table" ) -> bool: @@ -318,9 +315,8 @@ def _split_unsupported_columns( """ supported, unsupported = [], [] for idx in range(len(table.columns)): - column: "pa.ChunkedArray" = table.column(idx) - - col_type = column.type + col: "pa.ChunkedArray" = table.column(idx) + col_type: "pa.DataType" = col.type if _is_pa_extension_type(col_type) or self._is_pa_join_not_supported( col_type @@ -329,7 +325,7 @@ def _split_unsupported_columns( else: supported.append(idx) - return (table.select(supported), table.select(unsupported)) + return table.select(supported), table.select(unsupported) def _add_back_unsupported_columns( self, From 3eaf98be872d739144d299f3ea5d38624957e3d6 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 7 Nov 2025 09:25:28 -0800 Subject: [PATCH 2/6] Random sample partitions for finalization Signed-off-by: Alexey Kudinkin --- .../execution/operators/hash_shuffle.py | 43 ++++++++++--------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/python/ray/data/_internal/execution/operators/hash_shuffle.py b/python/ray/data/_internal/execution/operators/hash_shuffle.py index 6a9a2f43141b..7551da0604b4 100644 --- a/python/ray/data/_internal/execution/operators/hash_shuffle.py +++ b/python/ray/data/_internal/execution/operators/hash_shuffle.py @@ -3,6 +3,7 @@ import 
itertools import logging import math +import random import threading import time from collections import defaultdict, deque @@ -17,7 +18,7 @@ List, Optional, Tuple, - Union, + Union, Set, ) import numpy as np @@ -601,8 +602,8 @@ def __init__( # aggregators (keeps track which input sequences have already broadcasted # their schemas) self._has_schemas_broadcasted: DefaultDict[int, bool] = defaultdict(bool) - # Id of the last partition finalization of which had already been scheduled - self._last_finalized_partition_id: int = -1 + # Set of partitions still pending finalization + self._pending_finalization_partition_ids: Set[int] = set(range(num_partitions)) self._output_queue: Deque[RefBundle] = deque() @@ -823,11 +824,6 @@ def _try_finalize(self): if not self._is_shuffling_done(): return - logger.debug( - f"Scheduling next shuffling finalization batch (last finalized " - f"partition id is {self._last_finalized_partition_id})" - ) - def _on_bundle_ready(partition_id: int, bundle: RefBundle): # Add finalized block to the output queue self._output_queue.append(bundle) @@ -872,10 +868,8 @@ def _on_aggregation_done(partition_id: int, exc: Optional[Exception]): or self._aggregator_pool.num_aggregators ) - num_remaining_partitions = ( - self._num_partitions - 1 - self._last_finalized_partition_id - ) num_running_finalizing_tasks = len(self._finalizing_tasks) + num_remaining_partitions = len(self._pending_finalization_partition_ids) # Finalization is executed in batches of no more than # `DataContext.max_hash_shuffle_finalization_batch_size` tasks at a time. 
@@ -899,12 +893,19 @@ def _on_aggregation_done(partition_id: int, exc: Optional[Exception]): if next_batch_size == 0: return - # Next partition to be scheduled for finalization is the one right - # after the last one scheduled - next_partition_id = self._last_finalized_partition_id + 1 - - target_partition_ids = list( - range(next_partition_id, next_partition_id + next_batch_size) + # We randomly sample the next set of partitions to be finalized + # to distribute the finalization window uniformly across the nodes of the cluster + # and avoid the "sliding lens" effect where we finalize a batch of + # N *adjacent* partitions that may be co-located on the same node: + # + # - Adjacent partitions i and i+1 are handled by adjacent + # aggregators (since membership is determined as i % num_aggregators) + # + # - Adjacent aggregators have high likelihood of running on the + # same node (when num aggregators > num nodes) + target_partition_ids = random.sample( + list(self._pending_finalization_partition_ids), + next_batch_size ) logger.debug( @@ -941,15 +942,15 @@ def _on_aggregation_done(partition_id: int, exc: Optional[Exception]): ), ) + # Pop partition id from remaining set + self._pending_finalization_partition_ids.remove(partition_id) + # Update Finalize Metrics on task submission # NOTE: This is empty because the input is directly forwarded from the # output of the shuffling stage, which we don't return. 
empty_bundle = RefBundle([], schema=None, owns_blocks=False) self.reduce_metrics.on_task_submitted(partition_id, empty_bundle) - # Update last finalized partition id - self._last_finalized_partition_id = max(target_partition_ids) - def _do_shutdown(self, force: bool = False) -> None: self._aggregator_pool.shutdown(force=True) # NOTE: It's critical for Actor Pool to release actors before calling into @@ -1021,7 +1022,7 @@ def implements_accurate_memory_accounting(self) -> bool: return True def _is_finalized(self): - return self._last_finalized_partition_id == self._num_partitions - 1 + return len(self._pending_finalization_partition_ids) == 0 def _handle_shuffled_block_metadata( self, From 2a6994f17367bd4847c2facf746605d709f1aa84 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 7 Nov 2025 09:28:13 -0800 Subject: [PATCH 3/6] Tidying up Signed-off-by: Alexey Kudinkin --- python/ray/data/_internal/execution/operators/hash_shuffle.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/ray/data/_internal/execution/operators/hash_shuffle.py b/python/ray/data/_internal/execution/operators/hash_shuffle.py index 7551da0604b4..73765055ac65 100644 --- a/python/ray/data/_internal/execution/operators/hash_shuffle.py +++ b/python/ray/data/_internal/execution/operators/hash_shuffle.py @@ -903,6 +903,9 @@ def _on_aggregation_done(partition_id: int, exc: Optional[Exception]): # # - Adjacent aggregators have high likelihood of running on the # same node (when num aggregators > num nodes) + # + # NOTE: This doesn't affect determinism, since this only impacts order + # of finalization (hence not required to be seeded) target_partition_ids = random.sample( list(self._pending_finalization_partition_ids), next_batch_size From 04b45457a5e02bbfaf3b1e32805193e4da8a77ef Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 7 Nov 2025 09:41:39 -0800 Subject: [PATCH 4/6] `lint` Signed-off-by: Alexey Kudinkin --- .../ray/data/_internal/execution/operators/hash_shuffle.py | 6 
+++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/ray/data/_internal/execution/operators/hash_shuffle.py b/python/ray/data/_internal/execution/operators/hash_shuffle.py index 73765055ac65..486a28efbd45 100644 --- a/python/ray/data/_internal/execution/operators/hash_shuffle.py +++ b/python/ray/data/_internal/execution/operators/hash_shuffle.py @@ -17,8 +17,9 @@ Dict, List, Optional, + Set, Tuple, - Union, Set, + Union, ) import numpy as np @@ -907,8 +908,7 @@ def _on_aggregation_done(partition_id: int, exc: Optional[Exception]): # NOTE: This doesn't affect determinism, since this only impacts order # of finalization (hence not required to be seeded) target_partition_ids = random.sample( - list(self._pending_finalization_partition_ids), - next_batch_size + list(self._pending_finalization_partition_ids), next_batch_size ) logger.debug( From c4bf9569782a999f8539623879d14a8bc9264898 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 7 Nov 2025 12:01:56 -0800 Subject: [PATCH 5/6] Fixed typo Signed-off-by: Alexey Kudinkin --- python/ray/data/_internal/execution/operators/hash_shuffle.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/_internal/execution/operators/hash_shuffle.py b/python/ray/data/_internal/execution/operators/hash_shuffle.py index 486a28efbd45..bcdae28d4afd 100644 --- a/python/ray/data/_internal/execution/operators/hash_shuffle.py +++ b/python/ray/data/_internal/execution/operators/hash_shuffle.py @@ -604,7 +604,7 @@ def __init__( # their schemas) self._has_schemas_broadcasted: DefaultDict[int, bool] = defaultdict(bool) # Set of partitions still pending finalization - self._pending_finalization_partition_ids: Set[int] = set(range(num_partitions)) + self._pending_finalization_partition_ids: Set[int] = set(range(target_num_partitions)) self._output_queue: Deque[RefBundle] = deque() From e9de75be333448a0e1aa91335b6813c8120edbbe Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 7 Nov 
2025 12:30:20 -0800 Subject: [PATCH 6/6] `lint` Signed-off-by: Alexey Kudinkin --- python/ray/data/_internal/execution/operators/hash_shuffle.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/ray/data/_internal/execution/operators/hash_shuffle.py b/python/ray/data/_internal/execution/operators/hash_shuffle.py index bcdae28d4afd..69219d4535f8 100644 --- a/python/ray/data/_internal/execution/operators/hash_shuffle.py +++ b/python/ray/data/_internal/execution/operators/hash_shuffle.py @@ -604,7 +604,9 @@ def __init__( # their schemas) self._has_schemas_broadcasted: DefaultDict[int, bool] = defaultdict(bool) # Set of partitions still pending finalization - self._pending_finalization_partition_ids: Set[int] = set(range(target_num_partitions)) + self._pending_finalization_partition_ids: Set[int] = set( + range(target_num_partitions) + ) self._output_queue: Deque[RefBundle] = deque()