[Datasets] Refactor map-like functions into planner package #32021

Merged · 1 commit · Jan 28, 2023
53 changes: 21 additions & 32 deletions python/ray/data/_internal/logical/operators/map_operator.py
@@ -1,13 +1,13 @@
import sys
from typing import Any, Dict, Iterable, Optional, Union

from ray.data._internal.compute import BlockTransform
from ray.data._internal.logical.interfaces import LogicalOperator
from ray.data._internal.compute import (
    UDF,
    ComputeStrategy,
)
from ray.data.block import BatchUDF, RowUDF
from ray.data.context import DEFAULT_BATCH_SIZE


if sys.version_info >= (3, 8):
@@ -21,21 +21,17 @@ class AbstractMap(LogicalOperator):
    MapOperator.
    """

    # TODO: Remove `fn`, `fn_args`, `fn_kwargs`, `fn_constructor_args`, and
    # `fn_constructor_kwargs` from this API, in favor of `block_fn_args` and
    # `block_fn_kwargs`. Operators should only be concerned with `block_fn`.
    def __init__(
        self,
        name: str,
        input_op: LogicalOperator,
        block_fn: BlockTransform,
        compute: Optional[Union[str, ComputeStrategy]] = None,
        target_block_size: Optional[int] = None,
        fn: Optional[UDF] = None,
        fn: UDF,
        fn_args: Optional[Iterable[Any]] = None,
        fn_kwargs: Optional[Dict[str, Any]] = None,
        fn_constructor_args: Optional[Iterable[Any]] = None,
        fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
        target_block_size: Optional[int] = None,

Contributor Author: This just reorders the arguments so that all fn-related arguments are grouped together and read more coherently.

        compute: Optional[Union[str, ComputeStrategy]] = None,
        ray_remote_args: Optional[Dict[str, Any]] = None,
    ):
"""
Expand All @@ -44,27 +40,26 @@ def __init__(
inspecting the logical plan of a Dataset.
input_op: The operator preceding this operator in the plan DAG. The outputs
of `input_op` will be the inputs to this operator.
block_fn: The transform function to apply to each input block to produce
output blocks.
target_block_size: The target size for blocks outputted by this operator.
fn: User provided UDF to be called in `block_fn`.
fn: User-defined function to be called.
fn_args: Arguments to `fn`.
fn_kwargs: Keyword arguments to `fn`.
fn_constructor_args: Arguments to provide to the initializor of `fn` if
`fn` is a callable class.
fn_constructor_kwargs: Keyword Arguments to provide to the initializor of
`fn` if `fn` is a callable class.
target_block_size: The target size for blocks outputted by this operator.
compute: The compute strategy, either ``"tasks"`` (default) to use Ray
tasks, or ``"actors"`` to use an autoscaling actor pool.
ray_remote_args: Args to provide to ray.remote.
"""
super().__init__(name, [input_op])
self._block_fn = block_fn
self._compute = compute or "tasks"
self._target_block_size = target_block_size
self._fn = fn
self._fn_args = fn_args
self._fn_kwargs = fn_kwargs
self._fn_constructor_args = fn_constructor_args
self._fn_constructor_kwargs = fn_constructor_kwargs
self._target_block_size = target_block_size
self._compute = compute or "tasks"
self._ray_remote_args = ray_remote_args or {}


@@ -74,34 +69,34 @@ class MapBatches(AbstractMap):
    def __init__(
        self,
        input_op: LogicalOperator,
        block_fn: BlockTransform,
        fn: BatchUDF,
        batch_size: Optional[Union[int, Literal["default"]]] = "default",
        compute: Optional[Union[str, ComputeStrategy]] = None,
        batch_size: Optional[int] = DEFAULT_BATCH_SIZE,
        batch_format: Literal["default", "pandas", "pyarrow", "numpy"] = "default",
        prefetch_batches: int = 0,
        zero_copy_batch: bool = False,
        target_block_size: Optional[int] = None,
        fn_args: Optional[Iterable[Any]] = None,
        fn_kwargs: Optional[Dict[str, Any]] = None,
        fn_constructor_args: Optional[Iterable[Any]] = None,
        fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
        target_block_size: Optional[int] = None,
        compute: Optional[Union[str, ComputeStrategy]] = None,
        ray_remote_args: Optional[Dict[str, Any]] = None,
    ):
        super().__init__(
            "MapBatches",
            input_op,
            block_fn,
            compute=compute,
            target_block_size=target_block_size,
            fn=fn,
            fn,
            fn_args=fn_args,
            fn_kwargs=fn_kwargs,
            fn_constructor_args=fn_constructor_args,
            fn_constructor_kwargs=fn_constructor_kwargs,
            target_block_size=target_block_size,
            compute=compute,
            ray_remote_args=ray_remote_args,
        )
        self._batch_size = batch_size
        self._batch_format = batch_format
        self._prefetch_batches = prefetch_batches
        self._zero_copy_batch = zero_copy_batch


@@ -111,17 +106,15 @@ class MapRows(AbstractMap):
    def __init__(
        self,
        input_op: LogicalOperator,
        block_fn: BlockTransform,
        fn: RowUDF,
        compute: Optional[Union[str, ComputeStrategy]] = None,
        ray_remote_args: Optional[Dict[str, Any]] = None,
    ):
        super().__init__(
            "MapRows",
            input_op,
            block_fn,
            fn,
            compute=compute,
            fn=fn,
            ray_remote_args=ray_remote_args,
        )

@@ -132,17 +125,15 @@ class Filter(AbstractMap):
    def __init__(
        self,
        input_op: LogicalOperator,
        block_fn: BlockTransform,
        fn: RowUDF,
        compute: Optional[Union[str, ComputeStrategy]] = None,
        ray_remote_args: Optional[Dict[str, Any]] = None,
    ):
        super().__init__(
            "Filter",
            input_op,
            block_fn,
            fn,
            compute=compute,
            fn=fn,
            ray_remote_args=ray_remote_args,
        )

@@ -153,16 +144,14 @@ class FlatMap(AbstractMap):
    def __init__(
        self,
        input_op: LogicalOperator,
        block_fn: BlockTransform,
        fn: RowUDF,
        compute: Optional[Union[str, ComputeStrategy]] = None,
        ray_remote_args: Optional[Dict[str, Any]] = None,
    ):
        super().__init__(
            "FlatMap",
            input_op,
            block_fn,
            fn,
            compute=compute,
            fn=fn,
            ray_remote_args=ray_remote_args,
        )
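
Read as a whole, this file's change means callers now construct logical operators with the user's UDF alone, and the block-level transform is generated later by the planner package. Below is a minimal sketch of the new construction pattern; the `read_op` stand-in and the `LogicalOperator(name, input_deps)` constructor call are assumptions for illustration, not part of this PR:

from ray.data._internal.logical.interfaces import LogicalOperator
from ray.data._internal.logical.operators.map_operator import MapRows

# Hypothetical stand-in for an upstream operator (e.g. a read); the real
# plan would supply one.
read_op = LogicalOperator("Read", [])

map_op = MapRows(
    input_op=read_op,
    fn=lambda row: row * 2,  # the row UDF; no block_fn is passed anymore
    compute="tasks",
)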
23 changes: 23 additions & 0 deletions python/ray/data/_internal/planner/filter.py
@@ -0,0 +1,23 @@
from typing import Callable, Iterator

from ray.data.block import Block, BlockAccessor, RowUDF
from ray.data.context import DatasetContext


Contributor Author: This is copied from Dataset.filter().

def generate_filter_fn() -> Callable[[Iterator[Block]], Iterator[Block]]:
    """Generate function to apply the UDF to each record of blocks,
    and filter out records that do not satisfy the given predicate.
    """
    context = DatasetContext.get_current()

    def fn(blocks: Iterator[Block], row_fn: RowUDF) -> Iterator[Block]:
        DatasetContext._set_current(context)
        for block in blocks:
            block = BlockAccessor.for_block(block)
            builder = block.builder()
            for row in block.iter_rows():
                if row_fn(row):
                    builder.add(row)
            return [builder.build()]

    return fn
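
A hedged usage sketch of the generated transform; the single simple (list) block and the predicate below are made up for illustration and do not appear in this PR:

from ray.data._internal.planner.filter import generate_filter_fn

# Hypothetical driver: one simple block of rows, filtered by a predicate.
filter_fn = generate_filter_fn()
blocks = iter([[1, 2, 3, 4]])
filtered = filter_fn(blocks, lambda row: row % 2 == 1)
print(list(filtered))  # e.g. [[1, 3]]: one output block of the odd rows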
28 changes: 28 additions & 0 deletions python/ray/data/_internal/planner/flat_map.py
@@ -0,0 +1,28 @@
from typing import Callable, Iterator

from ray.data._internal.output_buffer import BlockOutputBuffer
from ray.data.block import Block, BlockAccessor, RowUDF
from ray.data.context import DatasetContext


Contributor Author: This is copied from Dataset.flat_map().

def generate_flat_map_fn() -> Callable[[Iterator[Block]], Iterator[Block]]:
    """Generate function to apply the UDF to each record of blocks,
    and then flatten results.
    """
    context = DatasetContext.get_current()

    def fn(blocks: Iterator[Block], row_fn: RowUDF) -> Iterator[Block]:
        DatasetContext._set_current(context)
        for block in blocks:
            output_buffer = BlockOutputBuffer(None, context.target_max_block_size)
            block = BlockAccessor.for_block(block)
            for row in block.iter_rows():
                for r2 in row_fn(row):
                    output_buffer.add(r2)
                    if output_buffer.has_next():
                        yield output_buffer.next()
            output_buffer.finalize()
            if output_buffer.has_next():
                yield output_buffer.next()

    return fn
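
The `BlockOutputBuffer` above re-chunks the flattened rows into output blocks capped near `target_max_block_size`. A hedged sketch of driving the generated transform; the simple block and fan-out UDF are made up for illustration:

from ray.data._internal.planner.flat_map import generate_flat_map_fn

# Hypothetical driver: each input row fans out to two output rows, which
# the output buffer re-chunks into blocks.
flat_map_fn = generate_flat_map_fn()
blocks = iter([["a", "b"]])
out = flat_map_fn(blocks, lambda row: [row, row.upper()])
print([row for block in out for row in block])  # e.g. ['a', 'A', 'b', 'B']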
104 changes: 104 additions & 0 deletions python/ray/data/_internal/planner/map_batches.py
@@ -0,0 +1,104 @@
import sys
from typing import Callable, Iterator, Optional

from ray.data._internal.block_batching import batch_blocks
from ray.data._internal.output_buffer import BlockOutputBuffer
from ray.data.block import BatchUDF, Block, DataBatch
from ray.data.context import DEFAULT_BATCH_SIZE, DatasetContext


if sys.version_info >= (3, 8):
    from typing import Literal
else:
    from typing_extensions import Literal


Contributor Author: This is copied from Dataset.map_batches().

def generate_map_batches_fn(
    batch_size: Optional[int] = DEFAULT_BATCH_SIZE,
    batch_format: Literal["default", "pandas", "pyarrow", "numpy"] = "default",
    prefetch_batches: int = 0,
    zero_copy_batch: bool = False,
) -> Callable[[Iterator[Block]], Iterator[Block]]:
    """Generate function to apply the batch UDF to blocks."""
    import numpy as np
    import pandas as pd
    import pyarrow as pa

    context = DatasetContext.get_current()

    def fn(
        blocks: Iterator[Block], batch_fn: BatchUDF, *fn_args, **fn_kwargs
    ) -> Iterator[Block]:
        DatasetContext._set_current(context)
        output_buffer = BlockOutputBuffer(None, context.target_max_block_size)

        def validate_batch(batch: Block) -> None:
            if not isinstance(
                batch, (list, pa.Table, np.ndarray, dict, pd.core.frame.DataFrame)
            ):
                raise ValueError(
                    "The `fn` you passed to `map_batches` returned a value of type "
                    f"{type(batch)}. This isn't allowed -- `map_batches` expects "
                    "`fn` to return a `pandas.DataFrame`, `pyarrow.Table`, "
                    "`numpy.ndarray`, `list`, or `dict[str, numpy.ndarray]`."
                )

            if isinstance(batch, dict):
                for key, value in batch.items():
                    if not isinstance(value, np.ndarray):
                        raise ValueError(
                            "The `fn` you passed to `map_batches` returned a "
                            f"`dict`. `map_batches` expects all `dict` values "
                            f"to be of type `numpy.ndarray`, but the value "
                            f"corresponding to key {key!r} is of type "
                            f"{type(value)}. To fix this issue, convert "
                            f"the {type(value)} to a `numpy.ndarray`."
                        )

        def process_next_batch(batch: DataBatch) -> Iterator[Block]:
            # Apply UDF.
            try:
                batch = batch_fn(batch, *fn_args, **fn_kwargs)
            except ValueError as e:
                read_only_msgs = [
                    "assignment destination is read-only",
                    "buffer source array is read-only",
                ]
                err_msg = str(e)
                if any(msg in err_msg for msg in read_only_msgs):
                    raise ValueError(
                        f"Batch mapper function {fn.__name__} tried to mutate a "
                        "zero-copy read-only batch. To be able to mutate the "
                        "batch, pass zero_copy_batch=False to map_batches(); "
                        "this will create a writable copy of the batch before "
                        "giving it to fn. To elide this copy, modify your mapper "
                        "function so it doesn't try to mutate its input."
                    ) from e
                else:
                    raise e from None

            validate_batch(batch)
            # Add output batch to output buffer.
            output_buffer.add_batch(batch)
            if output_buffer.has_next():
                yield output_buffer.next()

        # Ensure that zero-copy batch views are copied so mutating UDFs don't error.
        formatted_batch_iter = batch_blocks(
            blocks=blocks,
            stats=None,
            batch_size=batch_size,
            batch_format=batch_format,
            ensure_copy=not zero_copy_batch and batch_size is not None,
            prefetch_batches=prefetch_batches,
        )

        for batch in formatted_batch_iter:
            yield from process_next_batch(batch)

        # Yield remainder block from output buffer.
        output_buffer.finalize()
        if output_buffer.has_next():
            yield output_buffer.next()

    return fn
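
A hedged end-to-end sketch of the generated batch transform; the pandas block, batch size, and UDF are made up, and it assumes `batch_blocks` accepts pandas DataFrame blocks as this file suggests:

import pandas as pd

from ray.data._internal.planner.map_batches import generate_map_batches_fn

# Hypothetical driver: pandas-formatted batches of size 2 over made-up data.
map_batches_fn = generate_map_batches_fn(batch_size=2, batch_format="pandas")
blocks = iter([pd.DataFrame({"x": [1, 2, 3, 4]})])
out_blocks = map_batches_fn(blocks, lambda df: df.assign(y=df["x"] * 2))
for block in out_blocks:
    print(block)  # DataFrames carrying both the x and y columns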
25 changes: 25 additions & 0 deletions python/ray/data/_internal/planner/map_rows.py
@@ -0,0 +1,25 @@
from typing import Callable, Iterator

from ray.data._internal.output_buffer import BlockOutputBuffer
from ray.data.block import Block, BlockAccessor, RowUDF
from ray.data.context import DatasetContext


Contributor Author: This is copied from Dataset.map().

def generate_map_rows_fn() -> Callable[[Iterator[Block]], Iterator[Block]]:
    """Generate function to apply the UDF to each record of blocks."""
    context = DatasetContext.get_current()

    def fn(blocks: Iterator[Block], row_fn: RowUDF) -> Iterator[Block]:
        DatasetContext._set_current(context)
        for block in blocks:
            output_buffer = BlockOutputBuffer(None, context.target_max_block_size)
            block = BlockAccessor.for_block(block)
            for row in block.iter_rows():
                output_buffer.add(row_fn(row))
                if output_buffer.has_next():
                    yield output_buffer.next()
            output_buffer.finalize()
            if output_buffer.has_next():
                yield output_buffer.next()

    return fn
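
Finally, a hedged sketch of how the planner might wire a logical `MapRows` operator to this generated transform; `plan_map_rows` is a made-up name, and the use of the operator's `_fn` attribute follows the logical-operator change earlier in this PR:

from ray.data._internal.planner.map_rows import generate_map_rows_fn

# Hypothetical planner glue: bind the logical operator's stored UDF to the
# generated physical block transform.
def plan_map_rows(map_rows_op, input_blocks):
    map_rows_fn = generate_map_rows_fn()
    return map_rows_fn(input_blocks, map_rows_op._fn)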