Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the boundary param for sort in ray.data.Dataset #41269

Merged
merged 17 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion python/ray/data/_internal/planner/sort.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ def fn(
num_outputs = num_mappers

# Sample boundaries for sort key.
boundaries = SortTaskSpec.sample_boundaries(blocks, sort_key, num_outputs)
if not sort_key.boundaries:
boundaries = SortTaskSpec.sample_boundaries(blocks, sort_key, num_outputs)
else:
boundaries = [(b, ) for b in sort_key.boundaries]
num_outputs = len(boundaries) + 1
_, ascending = sort_key.to_pandas_sort_args()
if not ascending:
boundaries.reverse()
Expand Down
13 changes: 12 additions & 1 deletion python/ray/data/_internal/sort.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def __init__(
self,
key: Optional[Union[str, List[str]]] = None,
descending: Union[bool, List[bool]] = False,
boundaries: Optional[list] = None,
):
if key is None:
key = []
Expand All @@ -64,6 +65,7 @@ def __init__(
raise ValueError("Sorting with mixed key orders not supported yet.")
self._columns = key
self._descending = descending
self._boundaries = boundaries

def get_columns(self) -> List[str]:
return self._columns
Expand Down Expand Up @@ -94,6 +96,10 @@ def validate_schema(self, schema: Optional[Union[type, "pyarrow.lib.Schema"]]):
"schema '{}'.".format(column, schema)
)

@property
def boundaries(self):
return self._boundaries


class _SortOp(ShuffleOp):
@staticmethod
Expand Down Expand Up @@ -209,7 +215,12 @@ def sort_impl(
# Use same number of output partitions.
num_reducers = num_mappers
# TODO(swang): sample_boundaries could be fused with a previous stage.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment should follow closely with the related code. In this case, I think it should be moved into the if block.

boundaries = sample_boundaries(blocks_list, sort_key, num_reducers, ctx)
if not sort_key.boundaries:
boundaries = sample_boundaries(blocks_list, sort_key, num_reducers, ctx)
else:
boundaries = [(b, ) for b in sort_key.boundaries]
stephanie-wang marked this conversation as resolved.
Show resolved Hide resolved
num_mappers = len(boundaries) + 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
num_mappers = len(boundaries) + 1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, according to the current implementation, line 222 cannot be deleted because the block of the output dataset is determined by user-defined boundaries. For example, if I split the list L=[0,1,2,3,4,5] and the defined boundaries are [2,4], then L will be divided into 3 parts, which are [0,1 ],[2,3],[4,5].

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that is quite right... the number of boundaries should determine the number of reducers, while the number of input blocks determines the number of mappers, no?

For example, say the input is in two blocks L = [[0, 1, 2], [3, 4, 5]]. In your example, we will use num_mappers=2 and num_reducers=3.

num_reducers = num_mappers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now, sort requires num_mappers == num_reducers == num input blocks, so instead we should add a check that the length of boundaries is equal to the length of block_list.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the user customizes the boundaries parameter, then after the sort is executed, the block_num of the output dataset is equal to len(boundaries) + 1.

Copy link
Contributor

@stephanie-wang stephanie-wang Nov 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, what I meant is that right now we assume that num input blocks == num output blocks, so it would be good to assert num_mappers == len(boundaries) + 1 instead of setting num_mappers = len(boundaries) + 1.

However, it does seem like num input blocks != num output blocks is working, so maybe it's okay as is. Still, we should not modify num_mappers (this should be decided based on the number of input blocks, not by the user-provided boundaries).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original intention of letting users define boundaries here is to allow users to decide the number of blocks in the output dataset, so I made adjustments to num_mappers here. Of course, if the user does not pass the boundaries parameter, it will not affect the original logic.😊

_, ascending = sort_key.to_pandas_sort_args()
if not ascending:
boundaries.reverse()
Expand Down
36 changes: 32 additions & 4 deletions python/ray/data/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2212,32 +2212,60 @@ def sort(
self,
key: Union[str, List[str], None] = None,
descending: Union[bool, List[bool]] = False,
boundaries: List[Union[int, float]] = None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will it work for non-numeric columns?

Copy link
Contributor Author

@veryhannibal veryhannibal Nov 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, this function cannot currently process non-numeric columns. However, in our business, if we encounter a non-numeric column, we will process it and convert it to a numeric type.
For example, for a non-numeric column, calculate the hash value and then take modulo 3. Then the value of this column becomes 0, 1 or 2. Then, if the parameter boundaries is set to [1,2], then the rows with values 0, 1, and 2 will be divided into three blocks respectively.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds good for now; could you just update the docstring to say that this only supports numeric columns right now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I have added code comments to explain that the boundaries parameter currently supports numeric types.😁😁😁

) -> "Dataset":
"""Sort the dataset by the specified key column or key function.

.. note::
The `descending` parameter must be a boolean, or a list of booleans.
If it is a list, all items in the list must share the same direction.
Multi-directional sort is not supported yet.
The type of element in boundaries should be int or float currently.
veryhannibal marked this conversation as resolved.
Show resolved Hide resolved

Examples:
>>> import ray
>>> ds = ray.data.range(100)
>>> ds.sort("id", descending=True).take(3)
[{'id': 99}, {'id': 98}, {'id': 97}]
>>> ds = ray.data.range(15)
>>> ds = ds.sort("id", descending=False, boundaries=[5, 10])
>>> for df in ray.get(ds.to_pandas_refs()):
>>> print(df)
id
0 0
1 1
2 2
3 3
4 4
id
0 5
1 6
2 7
3 8
4 9
id
0 10
1 11
2 12
3 13
4 14

Time complexity: O(dataset size * log(dataset size / parallelism))

Args:
key: The column or a list of columns to sort by.
descending: Whether to sort in descending order. Must be a boolean or a list
of booleans matching the number of the columns.
boundaries: The list of values based on which to repartition the dataset.
For example, if the input boundary is [10,20], rows with values less
than 10 will be divided into the first block, rows with values greater
than or equal to 10 and less than 20 will be divided into the second block,
and rows with values greater than or equal to 20 will be divided into into
the third block.
veryhannibal marked this conversation as resolved.
Show resolved Hide resolved


Returns:
A new, sorted :class:`Dataset`.
"""

sort_key = SortKey(key, descending)
sort_key = SortKey(key, descending, boundaries)
plan = self._plan.with_stage(SortStage(self, sort_key))

logical_plan = self._logical_plan
Expand Down
72 changes: 71 additions & 1 deletion python/ray/data/tests/test_sort.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pandas as pd
import pyarrow as pa
import pytest

from ray.data.context import DataContext
import ray
from ray.data._internal.planner.exchange.push_based_shuffle_task_scheduler import (
PushBasedShuffleTaskScheduler,
Expand All @@ -18,6 +18,76 @@
from ray.tests.conftest import * # noqa


@pytest.mark.parametrize("new_backend", [True, False])
@pytest.mark.parametrize("descending,boundaries", [(True, list(range(100,1000,200))), (False, list(range(100,1000,200))), (True, [1,998]), (False, [1,998])])
def test_sort_with_specified_boundaries(descending, boundaries, new_backend):
def check_id_in_block(dataset, boundaries_, ordered_id, descending):
id_exp = []
for i in range(len(boundaries_)):
if boundaries_[i] > ordered_id[0] and boundaries_[i] <= ordered_id[-1]:
if i == 0:
id_exp.append(list(range(ordered_id[0], boundaries_[i])))
else:
if boundaries_[i - 1] <= ordered_id[0]:
id_exp.append(list(range(ordered_id[0], boundaries_[i])))
else:
id_exp.append(list(range(boundaries_[i - 1], boundaries_[i])))
elif boundaries_[i] <= ordered_id[0]:
id_exp.append([])
else:
if len(id_exp) == 0:
id_exp.append([])
else:
if len(id_exp[-1]) > 0:
if id_exp[-1][-1] >= ordered_id[-1]:
id_exp.append([])
else:
id_exp.append(list(range(ordered_id[-1][-1] + 1, ordered_id[-1] + 1)))
else:
id_exp.append([])

if boundaries_[-1] <= ordered_id[0]:
id_exp.append(ordered_id)
elif boundaries_[0] > ordered_id[-1]:
id_exp.insert(0, ordered_id)
else:
id_exp.append(list(range(boundaries_[-1], ordered_id[-1] + 1)))


dfs = ray.get(dataset.to_pandas_refs())
if descending:
for i in range(len(dfs)):
if dfs[i].shape[0] == 0:
assert id_exp[-1-i] != []
else:
idx = dfs[i]["id"].values.tolist()
idx.sort()
assert idx == id_exp[-1-i]
else:
for i in range(len(dfs)):
if dfs[i].shape[0] == 0:
assert id_exp[i] != []
else:
idx = dfs[i]["id"].values.tolist()
#print(idx, id_exp[i])
assert idx == id_exp[i]


x = np.random.randn(1000, 2)
veryhannibal marked this conversation as resolved.
Show resolved Hide resolved
idx = np.asarray(range(1000)).reshape(-1, 1)
np.random.shuffle(idx)
x = np.concatenate([idx, x], axis=1)
x = pd.DataFrame(x, columns=["id", "a", "b"])
ds = ray.data.from_pandas(x)
if new_backend:
DataContext.get_current().new_execution_backend = False
ds = ds.sort("id", descending, boundaries)
ordered_ids = x["id"].values.tolist()
ordered_ids.sort()
check_id_in_block(ds, boundaries, list(range(1000)), descending)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding this test! But it's a bit hard to read; could you instead follow / extend the test_sort_simple example? I think all we really need to do is add checks to make sure that ds._block_num_rows() is as expected when different boundaries are passed in.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Give me the same feel.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion. I have added a relatively simple test to test_sort_simple.
However, test_sort_with_specified_boundaries will be a more comprehensive test and takes into account some more complex situations, such as some values of boundaries not being in the key column of the dataset.😁😁😁

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm I think we can test that without needing this code? For example, something like ds.range(100).sort(boundaries=[10, 200]) would work, right?

In any case, I think the test sounds like a good idea but let's please try to simplify it. It is quite hard to read right now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm I think we can test that without needing this code? For example, something like ds.range(100).sort(boundaries=[10, 200]) would work, right?

In any case, I think the test sounds like a good idea but let's please try to simplify it. It is quite hard to read right now.

Yeah, it would work, and I updated the unit tests, mainly adding a few simple examples in test_sort_simple.😁

print("GOOD!")
veryhannibal marked this conversation as resolved.
Show resolved Hide resolved


def test_sort_simple(ray_start_regular, use_push_based_shuffle):
num_items = 100
parallelism = 4
Expand Down
Loading