-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add the boundary param for sort in ray.data.Dataset #41269
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can u add the related UTs for this changes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for contributing this change! Overall structure looks good, but had a few comments. Also agree that we should add unit tests for this. ray/python/ray/data/tests/test_sort.py
is a good place to add.
python/ray/data/dataset.py
Outdated
@@ -2212,6 +2212,7 @@ def sort( | |||
self, | |||
key: Union[str, List[str], None] = None, | |||
descending: Union[bool, List[bool]] = False, | |||
boundaries: Optional[list] = None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you document the arg in the docstring? It'd also be good to specify what the type of the List element should be.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the tip, I'll provide detailed annotation instructions.
python/ray/data/_internal/sort.py
Outdated
else: | ||
boundaries = [(b, ) for b in sort_key.boundaries] | ||
num_mappers = len(boundaries) + 1 | ||
num_reducers = num_mappers |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right now, sort requires num_mappers == num_reducers == num input blocks, so instead we should add a check that the length of boundaries is equal to the length of block_list.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the user customizes the boundaries parameter, then after the sort is executed, the block_num of the output dataset is equal to len(boundaries) + 1.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, what I meant is that right now we assume that num input blocks == num output blocks
, so it would be good to assert num_mappers == len(boundaries) + 1
instead of setting num_mappers = len(boundaries) + 1
.
However, it does seem like num input blocks != num output blocks
is working, so maybe it's okay as is. Still, we should not modify num_mappers
(this should be decided based on the number of input blocks, not by the user-provided boundaries).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The original intention of letting users define boundaries here is to allow users to decide the number of blocks in the output dataset, so I made adjustments to num_mappers here. Of course, if the user does not pass the boundaries parameter, it will not affect the original logic.😊
Signed-off-by: lile18 <lile18@jd.com>
c5ddb00
to
e8a21ac
Compare
Detailed annotation and instructions are provided.
python/ray/data/_internal/sort.py
Outdated
boundaries = sample_boundaries(blocks_list, sort_key, num_reducers, ctx) | ||
else: | ||
boundaries = [(b, ) for b in sort_key.boundaries] | ||
num_mappers = len(boundaries) + 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
num_mappers = len(boundaries) + 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, according to the current implementation, line 222 cannot be deleted because the block of the output dataset is determined by user-defined boundaries. For example, if I split the list L=[0,1,2,3,4,5] and the defined boundaries are [2,4], then L will be divided into 3 parts, which are [0,1 ],[2,3],[4,5].
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think that is quite right... the number of boundaries should determine the number of reducers, while the number of input blocks determines the number of mappers, no?
For example, say the input is in two blocks L = [[0, 1, 2], [3, 4, 5]]. In your example, we will use num_mappers=2 and num_reducers=3.
@@ -2212,32 +2212,60 @@ def sort( | |||
self, | |||
key: Union[str, List[str], None] = None, | |||
descending: Union[bool, List[bool]] = False, | |||
boundaries: List[Union[int, float]] = None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will it work for non-numeric columns?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, this function cannot currently process non-numeric columns. However, in our business, if we encounter a non-numeric column, we will process it and convert it to a numeric type.
For example, for a non-numeric column, calculate the hash value and then take modulo 3. Then the value of this column becomes 0, 1 or 2. Then, if the parameter boundaries is set to [1,2], then the rows with values 0, 1, and 2 will be divided into three blocks respectively.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That sounds good for now; could you just update the docstring to say that this only supports numeric columns right now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, I have added code comments to explain that the boundaries parameter currently supports numeric types.😁😁😁
python/ray/data/tests/test_sort.py
Outdated
ds = ds.sort("id", descending, boundaries) | ||
ordered_ids = x["id"].values.tolist() | ||
ordered_ids.sort() | ||
check_id_in_block(ds, boundaries, list(range(1000)), descending) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for adding this test! But it's a bit hard to read; could you instead follow / extend the test_sort_simple
example? I think all we really need to do is add checks to make sure that ds._block_num_rows()
is as expected when different boundaries are passed in.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Give me the same feel.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the suggestion. I have added a relatively simple test to test_sort_simple.
However, test_sort_with_specified_boundaries will be a more comprehensive test and takes into account some more complex situations, such as some values of boundaries not being in the key column of the dataset.😁😁😁
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm I think we can test that without needing this code? For example, something like ds.range(100).sort(boundaries=[10, 200])
would work, right?
In any case, I think the test sounds like a good idea but let's please try to simplify it. It is quite hard to read right now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm I think we can test that without needing this code? For example, something like
ds.range(100).sort(boundaries=[10, 200])
would work, right?In any case, I think the test sounds like a good idea but let's please try to simplify it. It is quite hard to read right now.
Yeah, it would work, and I updated the unit tests, mainly adding a few simple examples in test_sort_simple.😁
Co-authored-by: Stephanie Wang <swang@cs.berkeley.edu> Signed-off-by: Rony Lee <43735106+veryhannibal@users.noreply.github.com>
Co-authored-by: Stephanie Wang <swang@cs.berkeley.edu> Signed-off-by: Rony Lee <43735106+veryhannibal@users.noreply.github.com>
Added a relatively simple test to test_sort_simple
Modified code annotation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think some UT cases as below should be considered:
- boundaries = [15, 10, 5] or [10, 5, 15]
- use the fixed data and split to two parts and three.
- missing the test for float type
@@ -209,7 +215,12 @@ def sort_impl( | |||
# Use same number of output partitions. | |||
num_reducers = num_mappers | |||
# TODO(swang): sample_boundaries could be fused with a previous stage. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment should follow closely with the related code. In this case, I think it should be moved into the if block.
Co-authored-by: Wanqiang Ji <wanqiang.ji@gmail.com> Signed-off-by: Rony Lee <43735106+veryhannibal@users.noreply.github.com>
Co-authored-by: Wanqiang Ji <wanqiang.ji@gmail.com> Signed-off-by: Rony Lee <43735106+veryhannibal@users.noreply.github.com>
Co-authored-by: Wanqiang Ji <wanqiang.ji@gmail.com> Signed-off-by: Rony Lee <43735106+veryhannibal@users.noreply.github.com>
1. Updated unit test in test_sort_simple. 2. Added outlier detection for parameter boundaries in the ray.data.Dataset.sort function. 3. Added code comments to explain that the boundaries parameter currently only supports numeric types.
@stephanie-wang @jiwq Hello, is there anything else I need to add about this pull-request? |
cc @c21 |
Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
Sorry for the delay here. I went ahead and updated the unit tests to simplify. LGTM now. |
Thanks a lot ! 😁😁😁 |
User can specify the boundaries so the dataset will be divided into blocks according to the specified boundaries while sorting. Closes ray-project#41265 --------- Signed-off-by: lile18 <lile18@jd.com> Signed-off-by: Rony Lee <43735106+veryhannibal@users.noreply.github.com> Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu> Co-authored-by: lile18 <lile18@jd.com> Co-authored-by: Stephanie Wang <swang@cs.berkeley.edu> Co-authored-by: Wanqiang Ji <wanqiang.ji@gmail.com>
Why are these changes needed?
User can specify the boundaries so the dataset will be divided into blocks according to the specified boundaries while sorting.
Related issue number
Closes #41265
Checks
git commit -s
) in this PR.scripts/format.sh
to lint the changes in this PR.method in Tune, I've added it in
doc/source/tune/api/
under thecorresponding
.rst
file.