[Datasets] Add logical operator for sort() (ray-project#32133)
This PR adds a logical operator for `sort()`; the sketch after the list shows how the pieces fit together. The changes include:
* `Sort` logical operator
* `SortTaskSpec`, copied from `sort.py`
* `generate_sort_fn`, which generates the sort transform function
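
A minimal sketch of how the new pieces compose (it mirrors `test_sort_operator` below and assumes the new-optimizer code path is enabled, as with the tests' `enable_optimizer` fixture):

```python
# Build a Sort logical operator on top of a Read, then plan it into a
# physical AllToAllOperator whose transform comes from generate_sort_fn.
from ray.data._internal.logical.operators.all_to_all_operator import Sort
from ray.data._internal.logical.operators.read_operator import Read
from ray.data._internal.logical.optimizers import LogicalPlan
from ray.data._internal.planner.planner import Planner
from ray.data.datasource.parquet_datasource import ParquetDatasource

read_op = Read(ParquetDatasource())
sort_op = Sort(read_op, key="col1", descending=False)
physical_op = Planner().plan(LogicalPlan(sort_op)).dag  # -> AllToAllOperator
```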

Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
c21 authored and edoakes committed Mar 22, 2023
1 parent 15490ab commit 68a793a
Showing 6 changed files with 268 additions and 1 deletion.
18 changes: 18 additions & 0 deletions python/ray/data/_internal/logical/operators/all_to_all_operator.py
@@ -1,6 +1,7 @@
from typing import Any, Dict, Optional

from ray.data._internal.logical.interfaces import LogicalOperator
from ray.data.block import KeyFn


class AbstractAllToAll(LogicalOperator):
@@ -79,3 +80,20 @@ def __init__(
num_outputs=num_outputs,
)
self._shuffle = shuffle


class Sort(AbstractAllToAll):
"""Logical operator for sort."""

def __init__(
self,
input_op: LogicalOperator,
key: Optional[KeyFn],
descending: bool,
):
super().__init__(
"Sort",
input_op,
)
self._key = key
self._descending = descending
125 changes: 125 additions & 0 deletions python/ray/data/_internal/planner/exchange/sort_task_spec.py
@@ -0,0 +1,125 @@
from typing import Any, Callable, List, Tuple, TypeVar, Union

import numpy as np

from ray.data._internal.progress_bar import ProgressBar
from ray.data._internal.remote_fn import cached_remote_fn
from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
from ray.data._internal.planner.exchange.interfaces import ExchangeTaskSpec
from ray.data.block import Block, BlockAccessor, BlockExecStats, BlockMetadata
from ray.types import ObjectRef


T = TypeVar("T")

# Data can be sorted by value (None), a list of columns and
# ascending/descending orders (List), or a custom transform function
# (Callable).
SortKeyT = Union[None, List[Tuple[str, str]], Callable[[T], Any]]


class SortTaskSpec(ExchangeTaskSpec):
"""
The implementation for distributed sort tasks.
The algorithm is similar to [External Merge Sort]
(https://en.wikipedia.org/wiki/External_sorting).
Sorting is done in 3 steps: sampling, sorting individual blocks, and
merging sorted blocks.
Sampling (`sample_boundaries`): we get a number of sample items from each block,
sort them, and use them to compute boundaries that would partition all items into
approximately equal ranges.
Sorting (`map`): each block is sorted locally, then partitioned into smaller
blocks according to the boundaries. Each partitioned block is passed to a merge
task.
Merging (`reduce`): a merge task receives one block from every map task, each
consisting of items in a certain range. It merges the sorted blocks into one
sorted block, which becomes part of the new, sorted dataset.
"""

def __init__(
self,
boundaries: List[T],
key: SortKeyT,
descending: bool,
):
super().__init__(
map_args=[boundaries, key, descending],
reduce_args=[key, descending],
)

@staticmethod
def map(
idx: int,
block: Block,
output_num_blocks: int,
boundaries: List[T],
key: SortKeyT,
descending: bool,
) -> List[Union[BlockMetadata, Block]]:
stats = BlockExecStats.builder()
out = BlockAccessor.for_block(block).sort_and_partition(
boundaries, key, descending
)
meta = BlockAccessor.for_block(block).get_metadata(
input_files=None, exec_stats=stats.build()
)
return out + [meta]

@staticmethod
def reduce(
key: SortKeyT,
descending: bool,
*mapper_outputs: List[Block],
partial_reduce: bool = False,
) -> Tuple[Block, BlockMetadata]:
return BlockAccessor.for_block(mapper_outputs[0]).merge_sorted_blocks(
mapper_outputs, key, descending
)

@staticmethod
def sample_boundaries(
blocks: List[ObjectRef[Block]], key: SortKeyT, num_reducers: int
) -> List[T]:
"""
Return (num_reducers - 1) items in ascending order from the blocks that
partition the domain into ranges with approximately equally many elements.
"""
# TODO(Clark): Support multiple boundary sampling keys.
if isinstance(key, list) and len(key) > 1:
raise ValueError("Multiple boundary sampling keys not supported.")

n_samples = int(num_reducers * 10 / len(blocks))

sample_block = cached_remote_fn(_sample_block)

sample_results = [
sample_block.remote(block, n_samples, key) for block in blocks
]
sample_bar = ProgressBar("Sort Sample", len(sample_results))
samples = sample_bar.fetch_until_complete(sample_results)
sample_bar.close()
del sample_results
samples = [s for s in samples if len(s) > 0]
# The dataset is empty
if len(samples) == 0:
return [None] * (num_reducers - 1)
builder = DelegatingBlockBuilder()
for sample in samples:
builder.add_block(sample)
samples = builder.build()
column = key[0][0] if isinstance(key, list) else None
sample_items = BlockAccessor.for_block(samples).to_numpy(column)
sample_items = np.sort(sample_items)
ret = [
np.quantile(sample_items, q, interpolation="nearest")
for q in np.linspace(0, 1, num_reducers)
]
return ret[1:]


def _sample_block(block: Block[T], n_samples: int, key: SortKeyT) -> Block[T]:
return BlockAccessor.for_block(block).sample(n_samples, key)
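
To make the sampling step concrete, here is a small standalone sketch of the same quantile logic as `sample_boundaries` (hypothetical sample values, plain NumPy, no Ray):

```python
import numpy as np

# With num_reducers = 4 we need 3 boundaries that cut the key domain into
# 4 roughly equal ranges. Pick them as quantiles of the sorted samples,
# exactly as sample_boundaries does above.
samples = np.array([3, 41, 7, 19, 23, 2, 30, 11])  # hypothetical sampled keys
num_reducers = 4

sample_items = np.sort(samples)
boundaries = [
    np.quantile(sample_items, q, interpolation="nearest")
    for q in np.linspace(0, 1, num_reducers)
][1:]  # drop the 0-quantile, as the code above does with ret[1:]
print(boundaries)  # [7, 23, 41]: cut points splitting the domain into 4 ranges
```

A key of `None` sorts by value and a list like `[("col1", "descending")]` sorts by columns (see `SortKeyT` above); for descending sorts, `generate_sort_fn` below reverses the sampled boundaries.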
4 changes: 4 additions & 0 deletions python/ray/data/_internal/planner/plan_all_to_all_op.py
@@ -5,10 +5,12 @@
RandomShuffle,
RandomizeBlocks,
Repartition,
Sort,
)
from ray.data._internal.planner.random_shuffle import generate_random_shuffle_fn
from ray.data._internal.planner.randomize_blocks import generate_randomize_blocks_fn
from ray.data._internal.planner.repartition import generate_repartition_fn
from ray.data._internal.planner.sort import generate_sort_fn


def _plan_all_to_all_op(
@@ -26,6 +28,8 @@ def _plan_all_to_all_op(
fn = generate_random_shuffle_fn(op._seed, op._num_outputs)
elif isinstance(op, Repartition):
fn = generate_repartition_fn(op._num_outputs, op._shuffle)
elif isinstance(op, Sort):
fn = generate_sort_fn(op._key, op._descending)
else:
raise ValueError(f"Found unknown logical operator during planning: {op}")

65 changes: 65 additions & 0 deletions python/ray/data/_internal/planner/sort.py
@@ -0,0 +1,65 @@
from functools import partial
from typing import List, Tuple

from ray.data._internal.execution.interfaces import (
AllToAllTransformFn,
RefBundle,
TaskContext,
)
from ray.data._internal.planner.exchange.push_based_shuffle_task_scheduler import (
PushBasedShuffleTaskScheduler,
)
from ray.data._internal.planner.exchange.pull_based_shuffle_task_scheduler import (
PullBasedShuffleTaskScheduler,
)
from ray.data._internal.planner.exchange.sort_task_spec import SortKeyT, SortTaskSpec
from ray.data._internal.stats import StatsDict
from ray.data.context import DatasetContext


def generate_sort_fn(
key: SortKeyT,
descending: bool,
) -> AllToAllTransformFn:
"""Generate function to sort blocks by the specified key column or key function."""
# TODO: validate key with block._validate_key_fn.

def fn(
key: SortKeyT,
descending: bool,
refs: List[RefBundle],
ctx: TaskContext,
) -> Tuple[List[RefBundle], StatsDict]:
blocks = []
for ref_bundle in refs:
for block, _ in ref_bundle.blocks:
blocks.append(block)
if len(blocks) == 0:
return (blocks, {})

if isinstance(key, str):
key = [(key, "descending" if descending else "ascending")]
if isinstance(key, list):
descending = key[0][1] == "descending"

num_mappers = len(blocks)
# Use same number of output partitions.
num_outputs = num_mappers

# Sample boundaries for sort key.
boundaries = SortTaskSpec.sample_boundaries(blocks, key, num_outputs)
if descending:
boundaries.reverse()
sort_spec = SortTaskSpec(boundaries=boundaries, key=key, descending=descending)

if DatasetContext.get_current().use_push_based_shuffle:
scheduler = PushBasedShuffleTaskScheduler(sort_spec)
else:
scheduler = PullBasedShuffleTaskScheduler(sort_spec)

return scheduler.execute(refs, num_outputs)

# NOTE: use partial function to pass parameters to avoid error like
# "UnboundLocalError: local variable ... referenced before assignment",
# because `key` and `descending` variables are reassigned in `fn()`.
return partial(fn, key, descending)
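
The NOTE above refers to a Python scoping pitfall: assigning to a closed-over name anywhere inside a function makes that name local to the function, so reading it before the assignment fails. A minimal hypothetical illustration (not from this diff) of the bug and the `partial` fix:

```python
from functools import partial

def make_fn_buggy(key):
    def fn(refs):
        # Reassigning `key` below makes it local to fn, so this read raises
        # "UnboundLocalError: local variable 'key' referenced before assignment".
        if isinstance(key, str):
            key = [(key, "ascending")]
        return key
    return fn

def make_fn_ok(key):
    def fn(key, refs):
        # `key` is a parameter here, so reassigning it is harmless.
        if isinstance(key, str):
            key = [(key, "ascending")]
        return key
    return partial(fn, key)  # bind key up front, as generate_sort_fn does

make_fn_ok("col")(refs=None)   # returns [('col', 'ascending')]
# make_fn_buggy("col")(None)   # raises UnboundLocalError
```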
12 changes: 11 additions & 1 deletion python/ray/data/dataset.py
@@ -34,6 +34,7 @@
RandomShuffle,
RandomizeBlocks,
Repartition,
Sort,
)
from ray.data._internal.logical.optimizers import LogicalPlan
from ray.data._internal.logical.operators.map_operator import (
@@ -2006,7 +2007,16 @@ def sort(
"""

plan = self._plan.with_stage(SortStage(self, key, descending))
return Dataset(plan, self._epoch, self._lazy)

logical_plan = self._logical_plan
if logical_plan is not None:
op = Sort(
logical_plan.dag,
key=key,
descending=descending,
)
logical_plan = LogicalPlan(op)
return Dataset(plan, self._epoch, self._lazy, logical_plan)

def zip(self, other: "Dataset[U]") -> "Dataset[(T, U)]":
"""Zip this dataset with the elements of another.
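
Back in the `sort()` change above: when the new optimizer is enabled, the returned `Dataset` now carries a logical plan rooted at the new operator. A hedged sanity check (assumes the optimizer path is on, as with the tests' `enable_optimizer` fixture, and pokes at private attributes):

```python
import ray
from ray.data._internal.logical.operators.all_to_all_operator import Sort

ds = ray.data.range(10).sort()
# With the optimizer enabled, the logical DAG is rooted at Sort.
assert isinstance(ds._logical_plan.dag, Sort)
```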
45 changes: 45 additions & 0 deletions python/ray/data/tests/test_execution_optimizer.py
@@ -1,4 +1,6 @@
import os
import pytest
import pandas as pd

import ray
from ray.data._internal.execution.operators.map_operator import MapOperator
@@ -10,6 +12,7 @@
RandomShuffle,
RandomizeBlocks,
Repartition,
Sort,
)
from ray.data._internal.logical.operators.read_operator import Read
from ray.data._internal.logical.operators.map_operator import (
@@ -21,6 +24,7 @@
from ray.data._internal.planner.planner import Planner
from ray.data.datasource.parquet_datasource import ParquetDatasource

from ray.data.tests.conftest import * # noqa
from ray.tests.conftest import * # noqa


@@ -516,6 +520,47 @@ def test_read_map_chain_operator_fusion_e2e(ray_start_regular_shared, enable_opt
assert name in ds.stats()


def test_sort_operator(ray_start_regular_shared, enable_optimizer):
planner = Planner()
read_op = Read(ParquetDatasource())
op = Sort(
read_op,
key="col1",
descending=False,
)
plan = LogicalPlan(op)
physical_op = planner.plan(plan).dag

assert op.name == "Sort"
assert isinstance(physical_op, AllToAllOperator)
assert len(physical_op.input_dependencies) == 1
assert isinstance(physical_op.input_dependencies[0], MapOperator)


def test_sort_e2e(
ray_start_regular_shared, enable_optimizer, use_push_based_shuffle, local_path
):
ds = ray.data.range(100, parallelism=4)
ds = ds.random_shuffle()
ds = ds.sort()
assert ds.take_all() == list(range(100))

df = pd.DataFrame({"one": list(range(100)), "two": ["a"] * 100})
ds = ray.data.from_pandas([df])
path = os.path.join(local_path, "test_parquet_dir")
os.mkdir(path)
ds.write_parquet(path)

ds = ray.data.read_parquet(path)
ds = ds.random_shuffle()
ds1 = ds.sort("one")
ds2 = ds.sort("one", descending=True)
r1 = ds1.select_columns(["one"]).take_all()
r2 = ds2.select_columns(["one"]).take_all()
assert [d["one"] for d in r1] == list(range(100))
assert [d["one"] for d in r2] == list(reversed(range(100)))


if __name__ == "__main__":
import sys
