From e8a21ac2a522f73949839ed8451d755563e0a903 Mon Sep 17 00:00:00 2001 From: lile18 Date: Fri, 24 Nov 2023 18:10:23 +0800 Subject: [PATCH 01/14] Add the boundary param for sort in ray.data.Dataset Signed-off-by: lile18 --- python/ray/data/_internal/planner/sort.py | 6 +- python/ray/data/_internal/sort.py | 13 +++- python/ray/data/dataset.py | 3 +- python/ray/data/tests/test_sort.py | 72 ++++++++++++++++++++++- 4 files changed, 90 insertions(+), 4 deletions(-) diff --git a/python/ray/data/_internal/planner/sort.py b/python/ray/data/_internal/planner/sort.py index 5f1cc7a83ec8..5f2764bbbc8d 100644 --- a/python/ray/data/_internal/planner/sort.py +++ b/python/ray/data/_internal/planner/sort.py @@ -44,7 +44,11 @@ def fn( num_outputs = num_mappers # Sample boundaries for sort key. - boundaries = SortTaskSpec.sample_boundaries(blocks, sort_key, num_outputs) + if not sort_key.boundaries: + boundaries = SortTaskSpec.sample_boundaries(blocks, sort_key, num_outputs) + else: + boundaries = [(b, ) for b in sort_key.boundaries] + num_outputs = len(boundaries) + 1 _, ascending = sort_key.to_pandas_sort_args() if not ascending: boundaries.reverse() diff --git a/python/ray/data/_internal/sort.py b/python/ray/data/_internal/sort.py index 25fec3bf80d2..ab0e9fba8c49 100644 --- a/python/ray/data/_internal/sort.py +++ b/python/ray/data/_internal/sort.py @@ -44,6 +44,7 @@ def __init__( self, key: Optional[Union[str, List[str]]] = None, descending: Union[bool, List[bool]] = False, + boundaries: Optional[list] = None, ): if key is None: key = [] @@ -64,6 +65,7 @@ def __init__( raise ValueError("Sorting with mixed key orders not supported yet.") self._columns = key self._descending = descending + self._boundaries = boundaries def get_columns(self) -> List[str]: return self._columns @@ -94,6 +96,10 @@ def validate_schema(self, schema: Optional[Union[type, "pyarrow.lib.Schema"]]): "schema '{}'.".format(column, schema) ) + @property + def boundaries(self): + return self._boundaries + class _SortOp(ShuffleOp): @staticmethod @@ -209,7 +215,12 @@ def sort_impl( # Use same number of output partitions. num_reducers = num_mappers # TODO(swang): sample_boundaries could be fused with a previous stage. - boundaries = sample_boundaries(blocks_list, sort_key, num_reducers, ctx) + if not sort_key.boundaries: + boundaries = sample_boundaries(blocks_list, sort_key, num_reducers, ctx) + else: + boundaries = [(b, ) for b in sort_key.boundaries] + num_mappers = len(boundaries) + 1 + num_reducers = num_mappers _, ascending = sort_key.to_pandas_sort_args() if not ascending: boundaries.reverse() diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 6a1d5b07a67b..b5da7bc6f4b7 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -2212,6 +2212,7 @@ def sort( self, key: Union[str, List[str], None] = None, descending: Union[bool, List[bool]] = False, + boundaries: Optional[list] = None, ) -> "Dataset": """Sort the dataset by the specified key column or key function. @@ -2237,7 +2238,7 @@ def sort( A new, sorted :class:`Dataset`. """ - sort_key = SortKey(key, descending) + sort_key = SortKey(key, descending, boundaries) plan = self._plan.with_stage(SortStage(self, sort_key)) logical_plan = self._logical_plan diff --git a/python/ray/data/tests/test_sort.py b/python/ray/data/tests/test_sort.py index 3fcc0bfb096a..e2b2b18f3f11 100644 --- a/python/ray/data/tests/test_sort.py +++ b/python/ray/data/tests/test_sort.py @@ -5,7 +5,7 @@ import pandas as pd import pyarrow as pa import pytest - +from ray.data.context import DataContext import ray from ray.data._internal.planner.exchange.push_based_shuffle_task_scheduler import ( PushBasedShuffleTaskScheduler, @@ -18,6 +18,76 @@ from ray.tests.conftest import * # noqa +@pytest.mark.parametrize("new_backend", [True, False]) +@pytest.mark.parametrize("descending,boundaries", [(True, list(range(100,1000,200))), (False, list(range(100,1000,200))), (True, [1,998]), (False, [1,998])]) +def test_sort_with_specified_boundaries(descending, boundaries, new_backend): + def check_id_in_block(dataset, boundaries_, ordered_id, descending): + id_exp = [] + for i in range(len(boundaries_)): + if boundaries_[i] > ordered_id[0] and boundaries_[i] <= ordered_id[-1]: + if i == 0: + id_exp.append(list(range(ordered_id[0], boundaries_[i]))) + else: + if boundaries_[i - 1] <= ordered_id[0]: + id_exp.append(list(range(ordered_id[0], boundaries_[i]))) + else: + id_exp.append(list(range(boundaries_[i - 1], boundaries_[i]))) + elif boundaries_[i] <= ordered_id[0]: + id_exp.append([]) + else: + if len(id_exp) == 0: + id_exp.append([]) + else: + if len(id_exp[-1]) > 0: + if id_exp[-1][-1] >= ordered_id[-1]: + id_exp.append([]) + else: + id_exp.append(list(range(ordered_id[-1][-1] + 1, ordered_id[-1] + 1))) + else: + id_exp.append([]) + + if boundaries_[-1] <= ordered_id[0]: + id_exp.append(ordered_id) + elif boundaries_[0] > ordered_id[-1]: + id_exp.insert(0, ordered_id) + else: + id_exp.append(list(range(boundaries_[-1], ordered_id[-1] + 1))) + + + dfs = ray.get(dataset.to_pandas_refs()) + if descending: + for i in range(len(dfs)): + if dfs[i].shape[0] == 0: + assert id_exp[-1-i] != [] + else: + idx = dfs[i]["id"].values.tolist() + idx.sort() + assert idx == id_exp[-1-i] + else: + for i in range(len(dfs)): + if dfs[i].shape[0] == 0: + assert id_exp[i] != [] + else: + idx = dfs[i]["id"].values.tolist() + #print(idx, id_exp[i]) + assert idx == id_exp[i] + + + x = np.random.randn(1000, 2) + idx = np.asarray(range(1000)).reshape(-1, 1) + np.random.shuffle(idx) + x = np.concatenate([idx, x], axis=1) + x = pd.DataFrame(x, columns=["id", "a", "b"]) + ds = ray.data.from_pandas(x) + if new_backend: + DataContext.get_current().new_execution_backend = False + ds = ds.sort("id", descending, boundaries) + ordered_ids = x["id"].values.tolist() + ordered_ids.sort() + check_id_in_block(ds, boundaries, list(range(1000)), descending) + print("GOOD!") + + def test_sort_simple(ray_start_regular, use_push_based_shuffle): num_items = 100 parallelism = 4 From 1efd05a9d6786b839eb9502362fc02dcb5ec7319 Mon Sep 17 00:00:00 2001 From: lile18 Date: Fri, 24 Nov 2023 18:59:11 +0800 Subject: [PATCH 02/14] Signed-off-by: lile18 Detailed annotation and instructions are provided. --- python/ray/data/dataset.py | 35 +++++++++++++++++++++++++++++++---- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index b5da7bc6f4b7..f2d123770148 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -2212,7 +2212,7 @@ def sort( self, key: Union[str, List[str], None] = None, descending: Union[bool, List[bool]] = False, - boundaries: Optional[list] = None, + boundaries: List[Union[int, float]] = None, ) -> "Dataset": """Sort the dataset by the specified key column or key function. @@ -2220,12 +2220,32 @@ def sort( The `descending` parameter must be a boolean, or a list of booleans. If it is a list, all items in the list must share the same direction. Multi-directional sort is not supported yet. + The type of element in boundaries should be int or float currently. Examples: >>> import ray - >>> ds = ray.data.range(100) - >>> ds.sort("id", descending=True).take(3) - [{'id': 99}, {'id': 98}, {'id': 97}] + >>> ds = ray.data.range(15) + >>> ds = ds.sort("id", descending=False, boundaries=[5, 10]) + >>> for df in ray.get(ds.to_pandas_refs()): + >>> print(df) + id + 0 0 + 1 1 + 2 2 + 3 3 + 4 4 + id + 0 5 + 1 6 + 2 7 + 3 8 + 4 9 + id + 0 10 + 1 11 + 2 12 + 3 13 + 4 14 Time complexity: O(dataset size * log(dataset size / parallelism)) @@ -2233,6 +2253,13 @@ def sort( key: The column or a list of columns to sort by. descending: Whether to sort in descending order. Must be a boolean or a list of booleans matching the number of the columns. + boundaries: The list of values based on which to repartition the dataset. + For example, if the input boundary is [10,20], rows with values less + than 10 will be divided into the first block, rows with values greater + than or equal to 10 and less than 20 will be divided into the second block, + and rows with values greater than or equal to 20 will be divided into into + the third block. + Returns: A new, sorted :class:`Dataset`. From a1b3adbec8429837f9e112cace66bf98dad11740 Mon Sep 17 00:00:00 2001 From: Rony Lee <43735106+veryhannibal@users.noreply.github.com> Date: Thu, 30 Nov 2023 10:56:18 +0800 Subject: [PATCH 03/14] Update python/ray/data/dataset.py Co-authored-by: Stephanie Wang Signed-off-by: Rony Lee <43735106+veryhannibal@users.noreply.github.com> --- python/ray/data/dataset.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index f2d123770148..e79c6c0702a9 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -2220,7 +2220,6 @@ def sort( The `descending` parameter must be a boolean, or a list of booleans. If it is a list, all items in the list must share the same direction. Multi-directional sort is not supported yet. - The type of element in boundaries should be int or float currently. Examples: >>> import ray From e1fa5a440fb56c13b58bc37b0acd9577140daab1 Mon Sep 17 00:00:00 2001 From: Rony Lee <43735106+veryhannibal@users.noreply.github.com> Date: Thu, 30 Nov 2023 10:59:32 +0800 Subject: [PATCH 04/14] Update python/ray/data/dataset.py Co-authored-by: Stephanie Wang Signed-off-by: Rony Lee <43735106+veryhannibal@users.noreply.github.com> --- python/ray/data/dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index e79c6c0702a9..eb4085a5d2e2 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -2257,7 +2257,7 @@ def sort( than 10 will be divided into the first block, rows with values greater than or equal to 10 and less than 20 will be divided into the second block, and rows with values greater than or equal to 20 will be divided into into - the third block. + the third block. If not provided, the boundaries will be sampled from the input blocks. Returns: From e9455c245ba216d3435b97e41a55fda18ecf06be Mon Sep 17 00:00:00 2001 From: lile18 Date: Thu, 30 Nov 2023 11:00:27 +0800 Subject: [PATCH 05/14] Signed-off-by: lile18 Added a relatively simple test to test_sort_simple --- python/ray/data/tests/test_sort.py | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/python/ray/data/tests/test_sort.py b/python/ray/data/tests/test_sort.py index e2b2b18f3f11..e7497f717bdf 100644 --- a/python/ray/data/tests/test_sort.py +++ b/python/ray/data/tests/test_sort.py @@ -19,9 +19,13 @@ @pytest.mark.parametrize("new_backend", [True, False]) -@pytest.mark.parametrize("descending,boundaries", [(True, list(range(100,1000,200))), (False, list(range(100,1000,200))), (True, [1,998]), (False, [1,998])]) +@pytest.mark.parametrize("descending,boundaries", [(True, list(range(100,1000,200))), + (False, list(range(100,1000,200))), + (True, [1,998]), (False, [1,998])]) def test_sort_with_specified_boundaries(descending, boundaries, new_backend): - def check_id_in_block(dataset, boundaries_, ordered_id, descending): + # This function is used to build expected sorted result, and the returned result will + # be compared with the result of ray.data.Dataset.sort(). + def expected_sorted_result_in_blocks(boundaries_, ordered_id): id_exp = [] for i in range(len(boundaries_)): if boundaries_[i] > ordered_id[0] and boundaries_[i] <= ordered_id[-1]: @@ -45,15 +49,15 @@ def check_id_in_block(dataset, boundaries_, ordered_id, descending): id_exp.append(list(range(ordered_id[-1][-1] + 1, ordered_id[-1] + 1))) else: id_exp.append([]) - if boundaries_[-1] <= ordered_id[0]: id_exp.append(ordered_id) elif boundaries_[0] > ordered_id[-1]: id_exp.insert(0, ordered_id) else: id_exp.append(list(range(boundaries_[-1], ordered_id[-1] + 1))) + return id_exp - + def check_sort_result_in_blocks(dataset, id_exp, descending): dfs = ray.get(dataset.to_pandas_refs()) if descending: for i in range(len(dfs)): @@ -71,8 +75,6 @@ def check_id_in_block(dataset, boundaries_, ordered_id, descending): idx = dfs[i]["id"].values.tolist() #print(idx, id_exp[i]) assert idx == id_exp[i] - - x = np.random.randn(1000, 2) idx = np.asarray(range(1000)).reshape(-1, 1) np.random.shuffle(idx) @@ -82,9 +84,11 @@ def check_id_in_block(dataset, boundaries_, ordered_id, descending): if new_backend: DataContext.get_current().new_execution_backend = False ds = ds.sort("id", descending, boundaries) - ordered_ids = x["id"].values.tolist() - ordered_ids.sort() - check_id_in_block(ds, boundaries, list(range(1000)), descending) + # Get the expected value in each block + id_exp = expected_sorted_result_in_blocks(boundaries, list(range(1000))) + # After sorting, check whether the number of blocks, the number of samples + # in each block, and the sorting of IDs in the blocks are as expected. + check_sort_result_in_blocks(ds, id_exp, descending) print("GOOD!") @@ -99,6 +103,12 @@ def test_sort_simple(ray_start_regular, use_push_based_shuffle): ) # Make sure we have rows in each block. assert len([n for n in ds.sort("item")._block_num_rows() if n > 0]) == parallelism + # Make sure that the number of samples in each block is as expected, + # when boundaries is given. + boundaries = [25, 50, 75] + for n in ds.sort("item", boundaries=boundaries)._block_num_rows(): + assert n == 25 + assert extract_values( "item", ds.sort("item", descending=True).take(num_items) ) == list(reversed(range(num_items))) From d6f0e644ff856f1b27003edefc50d957c6d55af9 Mon Sep 17 00:00:00 2001 From: lile18 Date: Thu, 30 Nov 2023 11:04:13 +0800 Subject: [PATCH 06/14] Signed-off-by: lile18 Modified code annotation. --- python/ray/data/dataset.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index eb4085a5d2e2..31dc983502a1 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -2256,8 +2256,9 @@ def sort( For example, if the input boundary is [10,20], rows with values less than 10 will be divided into the first block, rows with values greater than or equal to 10 and less than 20 will be divided into the second block, - and rows with values greater than or equal to 20 will be divided into into - the third block. If not provided, the boundaries will be sampled from the input blocks. + and rows with values greater than or equal to 20 will be divided into + the third block. If not provided, the boundaries will be sampled from + the input blocks. Returns: From 448072f043e3104a1c92eaef8d0f080eb079301a Mon Sep 17 00:00:00 2001 From: Rony Lee <43735106+veryhannibal@users.noreply.github.com> Date: Tue, 5 Dec 2023 10:51:44 +0800 Subject: [PATCH 07/14] Update python/ray/data/tests/test_sort.py Co-authored-by: Wanqiang Ji Signed-off-by: Rony Lee <43735106+veryhannibal@users.noreply.github.com> --- python/ray/data/tests/test_sort.py | 23 +++++++---------------- 1 file changed, 7 insertions(+), 16 deletions(-) diff --git a/python/ray/data/tests/test_sort.py b/python/ray/data/tests/test_sort.py index e7497f717bdf..9aaabea7668b 100644 --- a/python/ray/data/tests/test_sort.py +++ b/python/ray/data/tests/test_sort.py @@ -59,22 +59,13 @@ def expected_sorted_result_in_blocks(boundaries_, ordered_id): def check_sort_result_in_blocks(dataset, id_exp, descending): dfs = ray.get(dataset.to_pandas_refs()) - if descending: - for i in range(len(dfs)): - if dfs[i].shape[0] == 0: - assert id_exp[-1-i] != [] - else: - idx = dfs[i]["id"].values.tolist() - idx.sort() - assert idx == id_exp[-1-i] - else: - for i in range(len(dfs)): - if dfs[i].shape[0] == 0: - assert id_exp[i] != [] - else: - idx = dfs[i]["id"].values.tolist() - #print(idx, id_exp[i]) - assert idx == id_exp[i] + for i in range(len(dfs)): + if dfs[i].shape[0] == 0: + assert id_exp[-1-i] != [] if descending else assert id_exp[i] != [] + else: + idx = dfs[i]["id"].values.tolist() + idx.sort() if descending else None + assert idx == id_exp[-1-i] if descending else assert idx == id_exp[i] x = np.random.randn(1000, 2) idx = np.asarray(range(1000)).reshape(-1, 1) np.random.shuffle(idx) From ab10605f4b712ce4788f2b448f5af7c79eae287e Mon Sep 17 00:00:00 2001 From: Rony Lee <43735106+veryhannibal@users.noreply.github.com> Date: Tue, 5 Dec 2023 11:16:29 +0800 Subject: [PATCH 08/14] Update python/ray/data/tests/test_sort.py Co-authored-by: Wanqiang Ji Signed-off-by: Rony Lee <43735106+veryhannibal@users.noreply.github.com> --- python/ray/data/tests/test_sort.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/ray/data/tests/test_sort.py b/python/ray/data/tests/test_sort.py index 9aaabea7668b..11d46b8de9ff 100644 --- a/python/ray/data/tests/test_sort.py +++ b/python/ray/data/tests/test_sort.py @@ -80,7 +80,6 @@ def check_sort_result_in_blocks(dataset, id_exp, descending): # After sorting, check whether the number of blocks, the number of samples # in each block, and the sorting of IDs in the blocks are as expected. check_sort_result_in_blocks(ds, id_exp, descending) - print("GOOD!") def test_sort_simple(ray_start_regular, use_push_based_shuffle): From 5f7b579130d58e5f0cc24747dc1d8da89f6c84a3 Mon Sep 17 00:00:00 2001 From: Rony Lee <43735106+veryhannibal@users.noreply.github.com> Date: Tue, 5 Dec 2023 11:18:01 +0800 Subject: [PATCH 09/14] Update python/ray/data/tests/test_sort.py Co-authored-by: Wanqiang Ji Signed-off-by: Rony Lee <43735106+veryhannibal@users.noreply.github.com> --- python/ray/data/tests/test_sort.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/ray/data/tests/test_sort.py b/python/ray/data/tests/test_sort.py index 11d46b8de9ff..27965aacc27c 100644 --- a/python/ray/data/tests/test_sort.py +++ b/python/ray/data/tests/test_sort.py @@ -66,6 +66,8 @@ def check_sort_result_in_blocks(dataset, id_exp, descending): idx = dfs[i]["id"].values.tolist() idx.sort() if descending else None assert idx == id_exp[-1-i] if descending else assert idx == id_exp[i] + + x = np.random.randn(1000, 2) idx = np.asarray(range(1000)).reshape(-1, 1) np.random.shuffle(idx) From 59e999ec6e25bfc1261ece88af68fe867d513d5f Mon Sep 17 00:00:00 2001 From: lile18 Date: Tue, 5 Dec 2023 15:26:28 +0800 Subject: [PATCH 10/14] Signed-off-by: lile18 1. Updated unit test in test_sort_simple. 2. Added outlier detection for parameter boundaries in the ray.data.Dataset.sort function. 3. Added code comments to explain that the boundaries parameter currently only supports numeric types. --- python/ray/data/_internal/sort.py | 6 ++++++ python/ray/data/dataset.py | 4 +--- python/ray/data/tests/test_sort.py | 29 +++++++++++++++++++++++++++-- 3 files changed, 34 insertions(+), 5 deletions(-) diff --git a/python/ray/data/_internal/sort.py b/python/ray/data/_internal/sort.py index ab0e9fba8c49..68e656a932a0 100644 --- a/python/ray/data/_internal/sort.py +++ b/python/ray/data/_internal/sort.py @@ -65,6 +65,12 @@ def __init__( raise ValueError("Sorting with mixed key orders not supported yet.") self._columns = key self._descending = descending + if boundaries: + for item in boundaries: + if not isinstance(item, (int, float)): + raise ValueError("The type of items in boundaries must be int or float.") + boundaries = list(set(boundaries)) + boundaries.sort() self._boundaries = boundaries def get_columns(self) -> List[str]: diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 31dc983502a1..d00f6a63f4f0 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -2258,13 +2258,11 @@ def sort( than or equal to 10 and less than 20 will be divided into the second block, and rows with values greater than or equal to 20 will be divided into the third block. If not provided, the boundaries will be sampled from - the input blocks. - + the input blocks. This feature only supports numeric columns right now. Returns: A new, sorted :class:`Dataset`. """ - sort_key = SortKey(key, descending, boundaries) plan = self._plan.with_stage(SortStage(self, sort_key)) diff --git a/python/ray/data/tests/test_sort.py b/python/ray/data/tests/test_sort.py index e7497f717bdf..369f606d112d 100644 --- a/python/ray/data/tests/test_sort.py +++ b/python/ray/data/tests/test_sort.py @@ -105,9 +105,34 @@ def test_sort_simple(ray_start_regular, use_push_based_shuffle): assert len([n for n in ds.sort("item")._block_num_rows() if n > 0]) == parallelism # Make sure that the number of samples in each block is as expected, # when boundaries is given. - boundaries = [25, 50, 75] - for n in ds.sort("item", boundaries=boundaries)._block_num_rows(): + + # Divided into two blocks. + boundaries1 = [50] + for n in ds.sort("item", boundaries=boundaries1)._block_num_rows(): + assert n == 50 + # Divided into three blocks. + boundaries2 = [60, 30] + for i, n in enumerate(ds.sort("item", boundaries=boundaries2)._block_num_rows()): + if i == 2: + assert n == 40 + else: + assert n == 30 + # The items in boundaries are of type float + boundaries3 = [24.2, 50.0, 75.0] + for n in ds.sort("item", boundaries=boundaries3)._block_num_rows(): assert n == 25 + # When the value of an element in the boundaries exceeds the value range of + # the selected column, an empty block will be generated. + boundaries4 = [50, 200] + for i, n in enumerate(ds.sort("item", boundaries=boundaries4)._block_num_rows()): + if i == 2: + assert n == 0 + else: + assert n == 50 + # When an element of non-numeric type appears in boundaries, + # an exception will be thrown. + boundaries5 = ["a"] + ds.sort("item", boundaries=boundaries5) assert extract_values( "item", ds.sort("item", descending=True).take(num_items) From 211753246e41f0734b4d2aa097e3fc0026464f77 Mon Sep 17 00:00:00 2001 From: lile18 Date: Tue, 5 Dec 2023 15:39:04 +0800 Subject: [PATCH 11/14] modify code in test_sort_with_specified_boundaries --- python/ray/data/tests/test_sort.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/data/tests/test_sort.py b/python/ray/data/tests/test_sort.py index dc6cf7ddf2bc..4419ee021c27 100644 --- a/python/ray/data/tests/test_sort.py +++ b/python/ray/data/tests/test_sort.py @@ -61,11 +61,11 @@ def check_sort_result_in_blocks(dataset, id_exp, descending): dfs = ray.get(dataset.to_pandas_refs()) for i in range(len(dfs)): if dfs[i].shape[0] == 0: - assert id_exp[-1-i] != [] if descending else assert id_exp[i] != [] + assert id_exp[-1-i] != [] if descending else id_exp[i] != [] else: idx = dfs[i]["id"].values.tolist() idx.sort() if descending else None - assert idx == id_exp[-1-i] if descending else assert idx == id_exp[i] + assert idx == id_exp[-1-i] if descending else idx == id_exp[i] x = np.random.randn(1000, 2) From d9ea6c43037abb2991d0b5a3f98125ee86706de2 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Thu, 4 Jan 2024 08:43:57 -0500 Subject: [PATCH 12/14] Fix tests, lint Signed-off-by: Stephanie Wang --- python/ray/data/_internal/planner/sort.py | 2 +- python/ray/data/_internal/sort.py | 9 +- python/ray/data/dataset.py | 9 +- python/ray/data/tests/test_sort.py | 121 +++++----------------- 4 files changed, 37 insertions(+), 104 deletions(-) diff --git a/python/ray/data/_internal/planner/sort.py b/python/ray/data/_internal/planner/sort.py index 5f2764bbbc8d..4b691e417c56 100644 --- a/python/ray/data/_internal/planner/sort.py +++ b/python/ray/data/_internal/planner/sort.py @@ -47,7 +47,7 @@ def fn( if not sort_key.boundaries: boundaries = SortTaskSpec.sample_boundaries(blocks, sort_key, num_outputs) else: - boundaries = [(b, ) for b in sort_key.boundaries] + boundaries = [(b,) for b in sort_key.boundaries] num_outputs = len(boundaries) + 1 _, ascending = sort_key.to_pandas_sort_args() if not ascending: diff --git a/python/ray/data/_internal/sort.py b/python/ray/data/_internal/sort.py index 68e656a932a0..7eaf859a41c6 100644 --- a/python/ray/data/_internal/sort.py +++ b/python/ray/data/_internal/sort.py @@ -68,7 +68,9 @@ def __init__( if boundaries: for item in boundaries: if not isinstance(item, (int, float)): - raise ValueError("The type of items in boundaries must be int or float.") + raise ValueError( + "The type of items in boundaries must be int or float." + ) boundaries = list(set(boundaries)) boundaries.sort() self._boundaries = boundaries @@ -224,9 +226,8 @@ def sort_impl( if not sort_key.boundaries: boundaries = sample_boundaries(blocks_list, sort_key, num_reducers, ctx) else: - boundaries = [(b, ) for b in sort_key.boundaries] - num_mappers = len(boundaries) + 1 - num_reducers = num_mappers + boundaries = [(b,) for b in sort_key.boundaries] + num_reducers = len(boundaries) + 1 _, ascending = sort_key.to_pandas_sort_args() if not ascending: boundaries.reverse() diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index d00f6a63f4f0..4f171d21ad69 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -2255,10 +2255,11 @@ def sort( boundaries: The list of values based on which to repartition the dataset. For example, if the input boundary is [10,20], rows with values less than 10 will be divided into the first block, rows with values greater - than or equal to 10 and less than 20 will be divided into the second block, - and rows with values greater than or equal to 20 will be divided into - the third block. If not provided, the boundaries will be sampled from - the input blocks. This feature only supports numeric columns right now. + than or equal to 10 and less than 20 will be divided into the + second block, and rows with values greater than or equal to 20 + will be divided into the third block. If not provided, the + boundaries will be sampled from the input blocks. This feature + only supports numeric columns right now. Returns: A new, sorted :class:`Dataset`. diff --git a/python/ray/data/tests/test_sort.py b/python/ray/data/tests/test_sort.py index 4419ee021c27..671173b8bf69 100644 --- a/python/ray/data/tests/test_sort.py +++ b/python/ray/data/tests/test_sort.py @@ -5,7 +5,7 @@ import pandas as pd import pyarrow as pa import pytest -from ray.data.context import DataContext + import ray from ray.data._internal.planner.exchange.push_based_shuffle_task_scheduler import ( PushBasedShuffleTaskScheduler, @@ -18,70 +18,31 @@ from ray.tests.conftest import * # noqa -@pytest.mark.parametrize("new_backend", [True, False]) -@pytest.mark.parametrize("descending,boundaries", [(True, list(range(100,1000,200))), - (False, list(range(100,1000,200))), - (True, [1,998]), (False, [1,998])]) -def test_sort_with_specified_boundaries(descending, boundaries, new_backend): - # This function is used to build expected sorted result, and the returned result will - # be compared with the result of ray.data.Dataset.sort(). - def expected_sorted_result_in_blocks(boundaries_, ordered_id): - id_exp = [] - for i in range(len(boundaries_)): - if boundaries_[i] > ordered_id[0] and boundaries_[i] <= ordered_id[-1]: - if i == 0: - id_exp.append(list(range(ordered_id[0], boundaries_[i]))) - else: - if boundaries_[i - 1] <= ordered_id[0]: - id_exp.append(list(range(ordered_id[0], boundaries_[i]))) - else: - id_exp.append(list(range(boundaries_[i - 1], boundaries_[i]))) - elif boundaries_[i] <= ordered_id[0]: - id_exp.append([]) - else: - if len(id_exp) == 0: - id_exp.append([]) - else: - if len(id_exp[-1]) > 0: - if id_exp[-1][-1] >= ordered_id[-1]: - id_exp.append([]) - else: - id_exp.append(list(range(ordered_id[-1][-1] + 1, ordered_id[-1] + 1))) - else: - id_exp.append([]) - if boundaries_[-1] <= ordered_id[0]: - id_exp.append(ordered_id) - elif boundaries_[0] > ordered_id[-1]: - id_exp.insert(0, ordered_id) - else: - id_exp.append(list(range(boundaries_[-1], ordered_id[-1] + 1))) - return id_exp - - def check_sort_result_in_blocks(dataset, id_exp, descending): - dfs = ray.get(dataset.to_pandas_refs()) - for i in range(len(dfs)): - if dfs[i].shape[0] == 0: - assert id_exp[-1-i] != [] if descending else id_exp[i] != [] - else: - idx = dfs[i]["id"].values.tolist() - idx.sort() if descending else None - assert idx == id_exp[-1-i] if descending else idx == id_exp[i] - - - x = np.random.randn(1000, 2) - idx = np.asarray(range(1000)).reshape(-1, 1) - np.random.shuffle(idx) - x = np.concatenate([idx, x], axis=1) - x = pd.DataFrame(x, columns=["id", "a", "b"]) - ds = ray.data.from_pandas(x) - if new_backend: - DataContext.get_current().new_execution_backend = False - ds = ds.sort("id", descending, boundaries) - # Get the expected value in each block - id_exp = expected_sorted_result_in_blocks(boundaries, list(range(1000))) - # After sorting, check whether the number of blocks, the number of samples - # in each block, and the sorting of IDs in the blocks are as expected. - check_sort_result_in_blocks(ds, id_exp, descending) +@pytest.mark.parametrize( + "descending,boundaries", + [ + (True, list(range(100, 1000, 200))), + (False, list(range(100, 1000, 200))), + (True, [1, 998]), + (False, [1, 998]), + ], +) +def test_sort_with_specified_boundaries(ray_start_regular, descending, boundaries): + num_items = 1000 + ds = ray.data.range(num_items) + ds = ds.sort("id", descending, boundaries).materialize() + + items = range(num_items) + boundaries = [0] + sorted(boundaries) + [num_items] + expected_blocks = [ + items[boundaries[i] : boundaries[i + 1]] for i in range(len(boundaries) - 1) + ] + if descending: + expected_blocks = [list(reversed(block)) for block in reversed(expected_blocks)] + blocks = list(ds.iter_batches(batch_size=None)) + assert len(blocks) == len(expected_blocks) + for block, expected_block in zip(blocks, expected_blocks): + assert np.all(block["id"] == expected_block) def test_sort_simple(ray_start_regular, use_push_based_shuffle): @@ -95,36 +56,6 @@ def test_sort_simple(ray_start_regular, use_push_based_shuffle): ) # Make sure we have rows in each block. assert len([n for n in ds.sort("item")._block_num_rows() if n > 0]) == parallelism - # Make sure that the number of samples in each block is as expected, - # when boundaries is given. - - # Divided into two blocks. - boundaries1 = [50] - for n in ds.sort("item", boundaries=boundaries1)._block_num_rows(): - assert n == 50 - # Divided into three blocks. - boundaries2 = [60, 30] - for i, n in enumerate(ds.sort("item", boundaries=boundaries2)._block_num_rows()): - if i == 2: - assert n == 40 - else: - assert n == 30 - # The items in boundaries are of type float - boundaries3 = [24.2, 50.0, 75.0] - for n in ds.sort("item", boundaries=boundaries3)._block_num_rows(): - assert n == 25 - # When the value of an element in the boundaries exceeds the value range of - # the selected column, an empty block will be generated. - boundaries4 = [50, 200] - for i, n in enumerate(ds.sort("item", boundaries=boundaries4)._block_num_rows()): - if i == 2: - assert n == 0 - else: - assert n == 50 - # When an element of non-numeric type appears in boundaries, - # an exception will be thrown. - boundaries5 = ["a"] - ds.sort("item", boundaries=boundaries5) assert extract_values( "item", ds.sort("item", descending=True).take(num_items) From 90bfd6f1c181e3c873c01a0406cd3186fea89110 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Thu, 4 Jan 2024 08:46:01 -0500 Subject: [PATCH 13/14] add float test Signed-off-by: Stephanie Wang --- python/ray/data/tests/test_sort.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python/ray/data/tests/test_sort.py b/python/ray/data/tests/test_sort.py index 671173b8bf69..06f11320357e 100644 --- a/python/ray/data/tests/test_sort.py +++ b/python/ray/data/tests/test_sort.py @@ -25,6 +25,9 @@ (False, list(range(100, 1000, 200))), (True, [1, 998]), (False, [1, 998]), + # Test float. + (True, [501.5]), + (False, [501.5]), ], ) def test_sort_with_specified_boundaries(ray_start_regular, descending, boundaries): @@ -39,6 +42,7 @@ def test_sort_with_specified_boundaries(ray_start_regular, descending, boundarie ] if descending: expected_blocks = [list(reversed(block)) for block in reversed(expected_blocks)] + blocks = list(ds.iter_batches(batch_size=None)) assert len(blocks) == len(expected_blocks) for block, expected_block in zip(blocks, expected_blocks): From e2ccb6d74bc5dae5a4a5efb73c0669e4adba9990 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Thu, 4 Jan 2024 11:04:45 -0500 Subject: [PATCH 14/14] fixes Signed-off-by: Stephanie Wang --- python/ray/data/dataset.py | 38 +++++++++++++++--------------- python/ray/data/tests/test_sort.py | 2 +- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 4f171d21ad69..c9f25f8c0203 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -2226,25 +2226,25 @@ def sort( >>> ds = ray.data.range(15) >>> ds = ds.sort("id", descending=False, boundaries=[5, 10]) >>> for df in ray.get(ds.to_pandas_refs()): - >>> print(df) - id - 0 0 - 1 1 - 2 2 - 3 3 - 4 4 - id - 0 5 - 1 6 - 2 7 - 3 8 - 4 9 - id - 0 10 - 1 11 - 2 12 - 3 13 - 4 14 + ... print(df) + id + 0 0 + 1 1 + 2 2 + 3 3 + 4 4 + id + 0 5 + 1 6 + 2 7 + 3 8 + 4 9 + id + 0 10 + 1 11 + 2 12 + 3 13 + 4 14 Time complexity: O(dataset size * log(dataset size / parallelism)) diff --git a/python/ray/data/tests/test_sort.py b/python/ray/data/tests/test_sort.py index 06f11320357e..e9cf20f738dd 100644 --- a/python/ray/data/tests/test_sort.py +++ b/python/ray/data/tests/test_sort.py @@ -36,7 +36,7 @@ def test_sort_with_specified_boundaries(ray_start_regular, descending, boundarie ds = ds.sort("id", descending, boundaries).materialize() items = range(num_items) - boundaries = [0] + sorted(boundaries) + [num_items] + boundaries = [0] + sorted([round(b) for b in boundaries]) + [num_items] expected_blocks = [ items[boundaries[i] : boundaries[i + 1]] for i in range(len(boundaries) - 1) ]