From e8a21ac2a522f73949839ed8451d755563e0a903 Mon Sep 17 00:00:00 2001
From: lile18
Date: Fri, 24 Nov 2023 18:10:23 +0800
Subject: [PATCH] Add the boundary param for sort in ray.data.Dataset

Signed-off-by: lile18
---
Reviewer notes (this section is ignored when the patch is applied):
- Line breaks, indentation and blank context lines were reconstructed from
  a whitespace-collapsed copy of this patch; verify the context lines
  against the target tree before applying.
- The `index` lines are those of the original submission and no longer
  match the post-image blobs of the revised hunks.
- `From:` / `Signed-off-by:` carry no e-mail address (apparently stripped).
- `Dataset.sort` still needs an entry for `boundaries` in its docstring
  argument list, which lies outside the context visible in this patch.
- `sort_impl` no longer overwrites `num_mappers`; confirm nothing below the
  hunk relied on that assignment.
- The test now reaches DataContext through `ray.data.DataContext`, so the
  import hunk in test_sort.py was dropped.

 python/ray/data/_internal/planner/sort.py |  7 +++-
 python/ray/data/_internal/sort.py         | 17 +++++++-
 python/ray/data/dataset.py                |  3 +-
 python/ray/data/tests/test_sort.py        | 49 ++++++++++++++++++++++
 4 files changed, 73 insertions(+), 3 deletions(-)

diff --git a/python/ray/data/_internal/planner/sort.py b/python/ray/data/_internal/planner/sort.py
index 5f1cc7a83ec8..5f2764bbbc8d 100644
--- a/python/ray/data/_internal/planner/sort.py
+++ b/python/ray/data/_internal/planner/sort.py
@@ -44,7 +44,12 @@ def fn(
         num_outputs = num_mappers
 
         # Sample boundaries for sort key.
-        boundaries = SortTaskSpec.sample_boundaries(blocks, sort_key, num_outputs)
+        if not sort_key.boundaries:
+            boundaries = SortTaskSpec.sample_boundaries(blocks, sort_key, num_outputs)
+        else:
+            # Use the caller's split points (as 1-tuples) instead of sampling.
+            boundaries = [(b,) for b in sort_key.boundaries]
+            num_outputs = len(boundaries) + 1
         _, ascending = sort_key.to_pandas_sort_args()
         if not ascending:
             boundaries.reverse()
diff --git a/python/ray/data/_internal/sort.py b/python/ray/data/_internal/sort.py
index 25fec3bf80d2..ab0e9fba8c49 100644
--- a/python/ray/data/_internal/sort.py
+++ b/python/ray/data/_internal/sort.py
@@ -44,6 +44,7 @@ def __init__(
         self,
         key: Optional[Union[str, List[str]]] = None,
         descending: Union[bool, List[bool]] = False,
+        boundaries: Optional[list] = None,
     ):
         if key is None:
             key = []
@@ -64,6 +65,9 @@ def __init__(
                 raise ValueError("Sorting with mixed key orders not supported yet.")
         self._columns = key
         self._descending = descending
+        # Keep an ascending copy: callers build their partition boundaries
+        # from this list and reverse them for descending sorts.
+        self._boundaries = sorted(boundaries) if boundaries else None
 
     def get_columns(self) -> List[str]:
         return self._columns
@@ -94,6 +98,11 @@ def validate_schema(self, schema: Optional[Union[type, "pyarrow.lib.Schema"]]):
                         "schema '{}'.".format(column, schema)
                     )
 
+    @property
+    def boundaries(self) -> Optional[list]:
+        """Ascending user-specified split points, or None to sample them."""
+        return self._boundaries
+
 
 class _SortOp(ShuffleOp):
     @staticmethod
@@ -209,7 +218,13 @@ def sort_impl(
     # Use same number of output partitions.
     num_reducers = num_mappers
     # TODO(swang): sample_boundaries could be fused with a previous stage.
-    boundaries = sample_boundaries(blocks_list, sort_key, num_reducers, ctx)
+    if not sort_key.boundaries:
+        boundaries = sample_boundaries(blocks_list, sort_key, num_reducers, ctx)
+    else:
+        # Use the caller's split points (as 1-tuples) instead of sampling; only
+        # the output partition count is derived from them.
+        boundaries = [(b,) for b in sort_key.boundaries]
+        num_reducers = len(boundaries) + 1
     _, ascending = sort_key.to_pandas_sort_args()
     if not ascending:
         boundaries.reverse()
diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py
index 6a1d5b07a67b..b5da7bc6f4b7 100644
--- a/python/ray/data/dataset.py
+++ b/python/ray/data/dataset.py
@@ -2212,6 +2212,7 @@ def sort(
         self,
         key: Union[str, List[str], None] = None,
         descending: Union[bool, List[bool]] = False,
+        boundaries: Optional[list] = None,
     ) -> "Dataset":
         """Sort the dataset by the specified key column or key function.
 
@@ -2237,7 +2238,7 @@ def sort(
             A new, sorted :class:`Dataset`.
         """
 
-        sort_key = SortKey(key, descending)
+        sort_key = SortKey(key, descending, boundaries)
         plan = self._plan.with_stage(SortStage(self, sort_key))
 
         logical_plan = self._logical_plan
diff --git a/python/ray/data/tests/test_sort.py b/python/ray/data/tests/test_sort.py
index 3fcc0bfb096a..e2b2b18f3f11 100644
--- a/python/ray/data/tests/test_sort.py
+++ b/python/ray/data/tests/test_sort.py
@@ -18,6 +18,55 @@
 from ray.tests.conftest import *  # noqa
 
 
+@pytest.mark.parametrize("new_backend", [True, False])
+@pytest.mark.parametrize(
+    "descending,boundaries",
+    [
+        (True, list(range(100, 1000, 200))),
+        (False, list(range(100, 1000, 200))),
+        (True, [1, 998]),
+        (False, [1, 998]),
+    ],
+)
+def test_sort_with_specified_boundaries(
+    ray_start_regular, descending, boundaries, new_backend
+):
+    """Sorting with explicit boundaries splits the output blocks at them."""
+    num_items = 1000
+    ids = np.arange(num_items)
+    np.random.shuffle(ids)
+    df = pd.DataFrame(
+        {
+            "id": ids,
+            "a": np.random.randn(num_items),
+            "b": np.random.randn(num_items),
+        }
+    )
+    ds = ray.data.from_pandas(df)
+
+    # Every boundary opens a new block: [0, b0), [b0, b1), ..., [bn, num_items).
+    edges = [0, *sorted(boundaries), num_items]
+    expected_blocks = [list(range(lo, hi)) for lo, hi in zip(edges, edges[1:])]
+    if descending:
+        # Same partitions, but both block order and row order are reversed.
+        expected_blocks = [block[::-1] for block in reversed(expected_blocks)]
+
+    # DataContext is process-wide: switch backends only for the duration of
+    # this test so later tests see the original setting.
+    ctx = ray.data.DataContext.get_current()
+    saved_backend = ctx.new_execution_backend
+    ctx.new_execution_backend = new_backend
+    try:
+        sorted_ds = ds.sort("id", descending, boundaries)
+        dfs = ray.get(sorted_ds.to_pandas_refs())
+    finally:
+        ctx.new_execution_backend = saved_backend
+
+    assert len(dfs) == len(expected_blocks)
+    for block_df, expected in zip(dfs, expected_blocks):
+        assert block_df["id"].tolist() == expected
+
+
 def test_sort_simple(ray_start_regular, use_push_based_shuffle):
     num_items = 100
     parallelism = 4