Skip to content

Commit

Permalink
Add the boundary param for sort in ray.data.Dataset
Browse files Browse the repository at this point in the history
Signed-off-by: lile18 <lile18@jd.com>
  • Loading branch information
lile18 committed Nov 24, 2023
1 parent 5a7071e commit e8a21ac
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 4 deletions.
6 changes: 5 additions & 1 deletion python/ray/data/_internal/planner/sort.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ def fn(
num_outputs = num_mappers

# Sample boundaries for sort key.
boundaries = SortTaskSpec.sample_boundaries(blocks, sort_key, num_outputs)
if not sort_key.boundaries:
boundaries = SortTaskSpec.sample_boundaries(blocks, sort_key, num_outputs)
else:
boundaries = [(b, ) for b in sort_key.boundaries]
num_outputs = len(boundaries) + 1
_, ascending = sort_key.to_pandas_sort_args()
if not ascending:
boundaries.reverse()
Expand Down
13 changes: 12 additions & 1 deletion python/ray/data/_internal/sort.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def __init__(
self,
key: Optional[Union[str, List[str]]] = None,
descending: Union[bool, List[bool]] = False,
boundaries: Optional[list] = None,
):
if key is None:
key = []
Expand All @@ -64,6 +65,7 @@ def __init__(
raise ValueError("Sorting with mixed key orders not supported yet.")
self._columns = key
self._descending = descending
self._boundaries = boundaries

def get_columns(self) -> List[str]:
return self._columns
Expand Down Expand Up @@ -94,6 +96,10 @@ def validate_schema(self, schema: Optional[Union[type, "pyarrow.lib.Schema"]]):
"schema '{}'.".format(column, schema)
)

@property
def boundaries(self):
return self._boundaries


class _SortOp(ShuffleOp):
@staticmethod
Expand Down Expand Up @@ -209,7 +215,12 @@ def sort_impl(
# Use same number of output partitions.
num_reducers = num_mappers
# TODO(swang): sample_boundaries could be fused with a previous stage.
boundaries = sample_boundaries(blocks_list, sort_key, num_reducers, ctx)
if not sort_key.boundaries:
boundaries = sample_boundaries(blocks_list, sort_key, num_reducers, ctx)
else:
boundaries = [(b, ) for b in sort_key.boundaries]
num_mappers = len(boundaries) + 1
num_reducers = num_mappers
_, ascending = sort_key.to_pandas_sort_args()
if not ascending:
boundaries.reverse()
Expand Down
3 changes: 2 additions & 1 deletion python/ray/data/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2212,6 +2212,7 @@ def sort(
self,
key: Union[str, List[str], None] = None,
descending: Union[bool, List[bool]] = False,
boundaries: Optional[list] = None,
) -> "Dataset":
"""Sort the dataset by the specified key column or key function.
Expand All @@ -2237,7 +2238,7 @@ def sort(
A new, sorted :class:`Dataset`.
"""

sort_key = SortKey(key, descending)
sort_key = SortKey(key, descending, boundaries)
plan = self._plan.with_stage(SortStage(self, sort_key))

logical_plan = self._logical_plan
Expand Down
72 changes: 71 additions & 1 deletion python/ray/data/tests/test_sort.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pandas as pd
import pyarrow as pa
import pytest

from ray.data.context import DataContext
import ray
from ray.data._internal.planner.exchange.push_based_shuffle_task_scheduler import (
PushBasedShuffleTaskScheduler,
Expand All @@ -18,6 +18,76 @@
from ray.tests.conftest import * # noqa


@pytest.mark.parametrize("new_backend", [True, False])
@pytest.mark.parametrize("descending,boundaries", [(True, list(range(100,1000,200))), (False, list(range(100,1000,200))), (True, [1,998]), (False, [1,998])])
def test_sort_with_specified_boundaries(descending, boundaries, new_backend):
def check_id_in_block(dataset, boundaries_, ordered_id, descending):
id_exp = []
for i in range(len(boundaries_)):
if boundaries_[i] > ordered_id[0] and boundaries_[i] <= ordered_id[-1]:
if i == 0:
id_exp.append(list(range(ordered_id[0], boundaries_[i])))
else:
if boundaries_[i - 1] <= ordered_id[0]:
id_exp.append(list(range(ordered_id[0], boundaries_[i])))
else:
id_exp.append(list(range(boundaries_[i - 1], boundaries_[i])))
elif boundaries_[i] <= ordered_id[0]:
id_exp.append([])
else:
if len(id_exp) == 0:
id_exp.append([])
else:
if len(id_exp[-1]) > 0:
if id_exp[-1][-1] >= ordered_id[-1]:
id_exp.append([])
else:
id_exp.append(list(range(ordered_id[-1][-1] + 1, ordered_id[-1] + 1)))
else:
id_exp.append([])

if boundaries_[-1] <= ordered_id[0]:
id_exp.append(ordered_id)
elif boundaries_[0] > ordered_id[-1]:
id_exp.insert(0, ordered_id)
else:
id_exp.append(list(range(boundaries_[-1], ordered_id[-1] + 1)))


dfs = ray.get(dataset.to_pandas_refs())
if descending:
for i in range(len(dfs)):
if dfs[i].shape[0] == 0:
assert id_exp[-1-i] != []
else:
idx = dfs[i]["id"].values.tolist()
idx.sort()
assert idx == id_exp[-1-i]
else:
for i in range(len(dfs)):
if dfs[i].shape[0] == 0:
assert id_exp[i] != []
else:
idx = dfs[i]["id"].values.tolist()
#print(idx, id_exp[i])
assert idx == id_exp[i]


x = np.random.randn(1000, 2)
idx = np.asarray(range(1000)).reshape(-1, 1)
np.random.shuffle(idx)
x = np.concatenate([idx, x], axis=1)
x = pd.DataFrame(x, columns=["id", "a", "b"])
ds = ray.data.from_pandas(x)
if new_backend:
DataContext.get_current().new_execution_backend = False
ds = ds.sort("id", descending, boundaries)
ordered_ids = x["id"].values.tolist()
ordered_ids.sort()
check_id_in_block(ds, boundaries, list(range(1000)), descending)
print("GOOD!")


def test_sort_simple(ray_start_regular, use_push_based_shuffle):
num_items = 100
parallelism = 4
Expand Down

0 comments on commit e8a21ac

Please sign in to comment.