Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Datasets] Add logical operator for sort() #32133

Merged
merged 1 commit into from
Feb 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Any, Dict, Optional

from ray.data._internal.logical.interfaces import LogicalOperator
from ray.data.block import KeyFn


class AbstractAllToAll(LogicalOperator):
Expand Down Expand Up @@ -79,3 +80,20 @@ def __init__(
num_outputs=num_outputs,
)
self._shuffle = shuffle


class Sort(AbstractAllToAll):
"""Logical operator for sort."""

def __init__(
self,
input_op: LogicalOperator,
key: Optional[KeyFn],
descending: bool,
):
super().__init__(
"Sort",
input_op,
)
self._key = key
self._descending = descending
125 changes: 125 additions & 0 deletions python/ray/data/_internal/planner/exchange/sort_task_spec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
from typing import Any, Callable, List, Tuple, TypeVar, Union

import numpy as np

from ray.data._internal.progress_bar import ProgressBar
from ray.data._internal.remote_fn import cached_remote_fn
from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
from ray.data._internal.planner.exchange.interfaces import ExchangeTaskSpec
from ray.data.block import Block, BlockAccessor, BlockExecStats, BlockMetadata
from ray.types import ObjectRef


T = TypeVar("T")

# Data can be sorted by value (None), a list of columns and
# ascending/descending orders (List), or a custom transform function
# (Callable).
SortKeyT = Union[None, List[Tuple[str, str]], Callable[[T], Any]]


class SortTaskSpec(ExchangeTaskSpec):
"""
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file is copied from https://github.com/ray-project/ray/blob/master/python/ray/data/_internal/sort.py, with change from BlockList to RefBundle.

The implementation for distributed sort tasks.

The algorithm is similar to [External Merge Sort]
(https://en.wikipedia.org/wiki/External_sorting).
Sorting is done in 3 steps: sampling, sorting individual blocks, and
merging sorted blocks.

Sampling (`sample_boundaries`): we get a number of sample items from each block,
sort them, and use them to compute boundaries that would partition all items into
approximately equal ranges.

Sorting (`map`): each block is sorted locally, then partitioned into smaller
blocks according to the boundaries. Each partitioned block is passed to a merge
task.

Merging (`reduce`): a merge task would receive a block from every worker that
consists of items in a certain range. It then merges the sorted blocks into one
sorted block and becomes part of the new, sorted block.
"""

def __init__(
self,
boundaries: List[T],
key: SortKeyT,
descending: bool,
):
super().__init__(
map_args=[boundaries, key, descending],
reduce_args=[key, descending],
)

@staticmethod
def map(
idx: int,
block: Block,
output_num_blocks: int,
boundaries: List[T],
key: SortKeyT,
descending: bool,
) -> List[Union[BlockMetadata, Block]]:
stats = BlockExecStats.builder()
out = BlockAccessor.for_block(block).sort_and_partition(
boundaries, key, descending
)
meta = BlockAccessor.for_block(block).get_metadata(
input_files=None, exec_stats=stats.build()
)
return out + [meta]

@staticmethod
def reduce(
key: SortKeyT,
descending: bool,
*mapper_outputs: List[Block],
partial_reduce: bool = False,
) -> Tuple[Block, BlockMetadata]:
return BlockAccessor.for_block(mapper_outputs[0]).merge_sorted_blocks(
mapper_outputs, key, descending
)

@staticmethod
def sample_boundaries(
blocks: List[ObjectRef[Block]], key: SortKeyT, num_reducers: int
) -> List[T]:
"""
Return (num_reducers - 1) items in ascending order from the blocks that
partition the domain into ranges with approximately equally many elements.
"""
# TODO(Clark): Support multiple boundary sampling keys.
if isinstance(key, list) and len(key) > 1:
raise ValueError("Multiple boundary sampling keys not supported.")

n_samples = int(num_reducers * 10 / len(blocks))

sample_block = cached_remote_fn(_sample_block)

sample_results = [
sample_block.remote(block, n_samples, key) for block in blocks
]
sample_bar = ProgressBar("Sort Sample", len(sample_results))
samples = sample_bar.fetch_until_complete(sample_results)
sample_bar.close()
del sample_results
samples = [s for s in samples if len(s) > 0]
# The dataset is empty
if len(samples) == 0:
return [None] * (num_reducers - 1)
builder = DelegatingBlockBuilder()
for sample in samples:
builder.add_block(sample)
samples = builder.build()
column = key[0][0] if isinstance(key, list) else None
sample_items = BlockAccessor.for_block(samples).to_numpy(column)
sample_items = np.sort(sample_items)
ret = [
np.quantile(sample_items, q, interpolation="nearest")
for q in np.linspace(0, 1, num_reducers)
]
return ret[1:]


def _sample_block(block: Block[T], n_samples: int, key: SortKeyT) -> Block[T]:
return BlockAccessor.for_block(block).sample(n_samples, key)
4 changes: 4 additions & 0 deletions python/ray/data/_internal/planner/plan_all_to_all_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
RandomShuffle,
RandomizeBlocks,
Repartition,
Sort,
)
from ray.data._internal.planner.random_shuffle import generate_random_shuffle_fn
from ray.data._internal.planner.randomize_blocks import generate_randomize_blocks_fn
from ray.data._internal.planner.repartition import generate_repartition_fn
from ray.data._internal.planner.sort import generate_sort_fn


def _plan_all_to_all_op(
Expand All @@ -26,6 +28,8 @@ def _plan_all_to_all_op(
fn = generate_random_shuffle_fn(op._seed, op._num_outputs)
elif isinstance(op, Repartition):
fn = generate_repartition_fn(op._num_outputs, op._shuffle)
elif isinstance(op, Sort):
fn = generate_sort_fn(op._key, op._descending)
else:
raise ValueError(f"Found unknown logical operator during planning: {op}")

Expand Down
65 changes: 65 additions & 0 deletions python/ray/data/_internal/planner/sort.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from functools import partial
from typing import List, Tuple

from ray.data._internal.execution.interfaces import (
AllToAllTransformFn,
RefBundle,
TaskContext,
)
from ray.data._internal.planner.exchange.push_based_shuffle_task_scheduler import (
PushBasedShuffleTaskScheduler,
)
from ray.data._internal.planner.exchange.pull_based_shuffle_task_scheduler import (
PullBasedShuffleTaskScheduler,
)
from ray.data._internal.planner.exchange.sort_task_spec import SortKeyT, SortTaskSpec
from ray.data._internal.stats import StatsDict
from ray.data.context import DatasetContext


def generate_sort_fn(
key: SortKeyT,
descending: bool,
) -> AllToAllTransformFn:
"""Generate function to sort blocks by the specified key column or key function."""
# TODO: validate key with block._validate_key_fn.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created #32137 for followup. This needs more refactoring, as it involves Dataset object.


def fn(
key: SortKeyT,
descending: bool,
refs: List[RefBundle],
ctx: TaskContext,
) -> Tuple[List[RefBundle], StatsDict]:
blocks = []
for ref_bundle in refs:
for block, _ in ref_bundle.blocks:
blocks.append(block)
if len(blocks) == 0:
return (blocks, {})

if isinstance(key, str):
key = [(key, "descending" if descending else "ascending")]
if isinstance(key, list):
descending = key[0][1] == "descending"

num_mappers = len(blocks)
# Use same number of output partitions.
num_outputs = num_mappers

# Sample boundaries for sort key.
boundaries = SortTaskSpec.sample_boundaries(blocks, key, num_outputs)
if descending:
boundaries.reverse()
sort_spec = SortTaskSpec(boundaries=boundaries, key=key, descending=descending)

if DatasetContext.get_current().use_push_based_shuffle:
scheduler = PushBasedShuffleTaskScheduler(sort_spec)
else:
scheduler = PullBasedShuffleTaskScheduler(sort_spec)

return scheduler.execute(refs, num_outputs)

# NOTE: use partial function to pass parameters to avoid error like
# "UnboundLocalError: local variable ... referenced before assignment",
# because `key` and `descending` variables are reassigned in `fn()`.
return partial(fn, key, descending)
12 changes: 11 additions & 1 deletion python/ray/data/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
RandomShuffle,
RandomizeBlocks,
Repartition,
Sort,
)
from ray.data._internal.logical.optimizers import LogicalPlan
from ray.data._internal.logical.operators.map_operator import (
Expand Down Expand Up @@ -2006,7 +2007,16 @@ def sort(
"""

plan = self._plan.with_stage(SortStage(self, key, descending))
return Dataset(plan, self._epoch, self._lazy)

logical_plan = self._logical_plan
if logical_plan is not None:
op = Sort(
logical_plan.dag,
key=key,
descending=descending,
)
logical_plan = LogicalPlan(op)
return Dataset(plan, self._epoch, self._lazy, logical_plan)

def zip(self, other: "Dataset[U]") -> "Dataset[(T, U)]":
"""Zip this dataset with the elements of another.
Expand Down
45 changes: 45 additions & 0 deletions python/ray/data/tests/test_execution_optimizer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import os
import pytest
import pandas as pd

import ray
from ray.data._internal.execution.operators.map_operator import MapOperator
Expand All @@ -10,6 +12,7 @@
RandomShuffle,
RandomizeBlocks,
Repartition,
Sort,
)
from ray.data._internal.logical.operators.read_operator import Read
from ray.data._internal.logical.operators.map_operator import (
Expand All @@ -21,6 +24,7 @@
from ray.data._internal.planner.planner import Planner
from ray.data.datasource.parquet_datasource import ParquetDatasource

from ray.data.tests.conftest import * # noqa
from ray.tests.conftest import * # noqa


Expand Down Expand Up @@ -516,6 +520,47 @@ def test_read_map_chain_operator_fusion_e2e(ray_start_regular_shared, enable_opt
assert name in ds.stats()


def test_sort_operator(ray_start_regular_shared, enable_optimizer):
planner = Planner()
read_op = Read(ParquetDatasource())
op = Sort(
read_op,
key="col1",
descending=False,
)
plan = LogicalPlan(op)
physical_op = planner.plan(plan).dag

assert op.name == "Sort"
assert isinstance(physical_op, AllToAllOperator)
assert len(physical_op.input_dependencies) == 1
assert isinstance(physical_op.input_dependencies[0], MapOperator)


def test_sort_e2e(
ray_start_regular_shared, enable_optimizer, use_push_based_shuffle, local_path
):
ds = ray.data.range(100, parallelism=4)
ds = ds.random_shuffle()
ds = ds.sort()
assert ds.take_all() == list(range(100))

df = pd.DataFrame({"one": list(range(100)), "two": ["a"] * 100})
ds = ray.data.from_pandas([df])
path = os.path.join(local_path, "test_parquet_dir")
os.mkdir(path)
ds.write_parquet(path)

ds = ray.data.read_parquet(path)
ds = ds.random_shuffle()
ds1 = ds.sort("one")
ds2 = ds.sort("one", descending=True)
r1 = ds1.select_columns(["one"]).take_all()
r2 = ds2.select_columns(["one"]).take_all()
assert [d["one"] for d in r1] == list(range(100))
assert [d["one"] for d in r2] == list(reversed(range(100)))


if __name__ == "__main__":
import sys

Expand Down