diff --git a/python/ray/data/_internal/logical/operators/map_operator.py b/python/ray/data/_internal/logical/operators/map_operator.py
index a1a1635acc38..42bfb3bb1010 100644
--- a/python/ray/data/_internal/logical/operators/map_operator.py
+++ b/python/ray/data/_internal/logical/operators/map_operator.py
@@ -1,13 +1,13 @@
 import sys
 from typing import Any, Dict, Iterable, Optional, Union

-from ray.data._internal.compute import BlockTransform
 from ray.data._internal.logical.interfaces import LogicalOperator
 from ray.data._internal.compute import (
     UDF,
     ComputeStrategy,
 )
 from ray.data.block import BatchUDF, RowUDF
+from ray.data.context import DEFAULT_BATCH_SIZE


 if sys.version_info >= (3, 8):
@@ -21,21 +21,17 @@ class AbstractMap(LogicalOperator):
     MapOperator.
     """

-    # TODO: Replace `fn`, `fn_args`, `fn_kwargs`, `fn_constructor_args`, and
-    # `fn_constructor_kwargs` from this API, in favor of `block_fn_args` and
-    # `block_fn_kwargs`. Operators should only be concerned with `block_fn`.
     def __init__(
         self,
         name: str,
         input_op: LogicalOperator,
-        block_fn: BlockTransform,
-        compute: Optional[Union[str, ComputeStrategy]] = None,
-        target_block_size: Optional[int] = None,
-        fn: Optional[UDF] = None,
+        fn: UDF,
         fn_args: Optional[Iterable[Any]] = None,
         fn_kwargs: Optional[Dict[str, Any]] = None,
         fn_constructor_args: Optional[Iterable[Any]] = None,
         fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
+        target_block_size: Optional[int] = None,
+        compute: Optional[Union[str, ComputeStrategy]] = None,
         ray_remote_args: Optional[Dict[str, Any]] = None,
     ):
         """
@@ -44,27 +40,26 @@ def __init__(
             inspecting the logical plan of a Dataset.
             input_op: The operator preceding this operator in the plan DAG. The
                 outputs of `input_op` will be the inputs to this operator.
-            block_fn: The transform function to apply to each input block to produce
-                output blocks.
-            target_block_size: The target size for blocks outputted by this operator.
-            fn: User provided UDF to be called in `block_fn`.
+            fn: User-defined function to be called.
             fn_args: Arguments to `fn`.
             fn_kwargs: Keyword arguments to `fn`.
             fn_constructor_args: Arguments to provide to the initializor of `fn` if
                 `fn` is a callable class.
             fn_constructor_kwargs: Keyword Arguments to provide to the initializor of
                 `fn` if `fn` is a callable class.
+            target_block_size: The target size for blocks outputted by this operator.
+            compute: The compute strategy, either ``"tasks"`` (default) to use Ray
+                tasks, or ``"actors"`` to use an autoscaling actor pool.
             ray_remote_args: Args to provide to ray.remote.
         """
         super().__init__(name, [input_op])
-        self._block_fn = block_fn
-        self._compute = compute or "tasks"
-        self._target_block_size = target_block_size
         self._fn = fn
         self._fn_args = fn_args
         self._fn_kwargs = fn_kwargs
         self._fn_constructor_args = fn_constructor_args
         self._fn_constructor_kwargs = fn_constructor_kwargs
+        self._target_block_size = target_block_size
+        self._compute = compute or "tasks"
         self._ray_remote_args = ray_remote_args or {}


@@ -74,34 +69,34 @@ class MapBatches(AbstractMap):
     def __init__(
         self,
         input_op: LogicalOperator,
-        block_fn: BlockTransform,
         fn: BatchUDF,
-        batch_size: Optional[Union[int, Literal["default"]]] = "default",
-        compute: Optional[Union[str, ComputeStrategy]] = None,
+        batch_size: Optional[int] = DEFAULT_BATCH_SIZE,
         batch_format: Literal["default", "pandas", "pyarrow", "numpy"] = "default",
+        prefetch_batches: int = 0,
         zero_copy_batch: bool = False,
-        target_block_size: Optional[int] = None,
         fn_args: Optional[Iterable[Any]] = None,
         fn_kwargs: Optional[Dict[str, Any]] = None,
         fn_constructor_args: Optional[Iterable[Any]] = None,
         fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
+        target_block_size: Optional[int] = None,
+        compute: Optional[Union[str, ComputeStrategy]] = None,
         ray_remote_args: Optional[Dict[str, Any]] = None,
     ):
         super().__init__(
             "MapBatches",
             input_op,
-            block_fn,
-            compute=compute,
-            target_block_size=target_block_size,
-            fn=fn,
+            fn,
             fn_args=fn_args,
             fn_kwargs=fn_kwargs,
             fn_constructor_args=fn_constructor_args,
             fn_constructor_kwargs=fn_constructor_kwargs,
+            target_block_size=target_block_size,
+            compute=compute,
             ray_remote_args=ray_remote_args,
         )
         self._batch_size = batch_size
         self._batch_format = batch_format
+        self._prefetch_batches = prefetch_batches
         self._zero_copy_batch = zero_copy_batch


@@ -111,7 +106,6 @@ class MapRows(AbstractMap):
     def __init__(
         self,
         input_op: LogicalOperator,
-        block_fn: BlockTransform,
         fn: RowUDF,
         compute: Optional[Union[str, ComputeStrategy]] = None,
         ray_remote_args: Optional[Dict[str, Any]] = None,
@@ -119,9 +113,8 @@ def __init__(
         super().__init__(
             "MapRows",
             input_op,
-            block_fn,
+            fn,
             compute=compute,
-            fn=fn,
             ray_remote_args=ray_remote_args,
         )

@@ -132,7 +125,6 @@ class Filter(AbstractMap):
     def __init__(
         self,
         input_op: LogicalOperator,
-        block_fn: BlockTransform,
         fn: RowUDF,
         compute: Optional[Union[str, ComputeStrategy]] = None,
         ray_remote_args: Optional[Dict[str, Any]] = None,
@@ -140,9 +132,8 @@ def __init__(
         super().__init__(
             "Filter",
             input_op,
-            block_fn,
+            fn,
             compute=compute,
-            fn=fn,
             ray_remote_args=ray_remote_args,
         )

@@ -153,7 +144,6 @@ class FlatMap(AbstractMap):
     def __init__(
         self,
         input_op: LogicalOperator,
-        block_fn: BlockTransform,
         fn: RowUDF,
         compute: Optional[Union[str, ComputeStrategy]] = None,
         ray_remote_args: Optional[Dict[str, Any]] = None,
@@ -161,8 +151,7 @@ def __init__(
         super().__init__(
             "FlatMap",
             input_op,
-            block_fn,
+            fn,
             compute=compute,
-            fn=fn,
             ray_remote_args=ray_remote_args,
         )
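# Illustrative sketch (not part of the diff): with `block_fn` removed, a
# logical operator is constructed from the user UDF alone. Import paths are
# assumed from this branch of Ray and may differ elsewhere.
from ray.data._internal.logical.operators.map_operator import MapBatches
from ray.data._internal.logical.operators.read_operator import Read
from ray.data.datasource.parquet_datasource import ParquetDatasource

read_op = Read(ParquetDatasource())
map_op = MapBatches(
    read_op,
    lambda batch: batch,  # the batch UDF is now the only required callable
    batch_size=4096,
    compute="tasks",  # now passed by keyword, matching the reordered signature
)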
""" super().__init__(name, [input_op]) - self._block_fn = block_fn - self._compute = compute or "tasks" - self._target_block_size = target_block_size self._fn = fn self._fn_args = fn_args self._fn_kwargs = fn_kwargs self._fn_constructor_args = fn_constructor_args self._fn_constructor_kwargs = fn_constructor_kwargs + self._target_block_size = target_block_size + self._compute = compute or "tasks" self._ray_remote_args = ray_remote_args or {} @@ -74,34 +69,34 @@ class MapBatches(AbstractMap): def __init__( self, input_op: LogicalOperator, - block_fn: BlockTransform, fn: BatchUDF, - batch_size: Optional[Union[int, Literal["default"]]] = "default", - compute: Optional[Union[str, ComputeStrategy]] = None, + batch_size: Optional[int] = DEFAULT_BATCH_SIZE, batch_format: Literal["default", "pandas", "pyarrow", "numpy"] = "default", + prefetch_batches: int = 0, zero_copy_batch: bool = False, - target_block_size: Optional[int] = None, fn_args: Optional[Iterable[Any]] = None, fn_kwargs: Optional[Dict[str, Any]] = None, fn_constructor_args: Optional[Iterable[Any]] = None, fn_constructor_kwargs: Optional[Dict[str, Any]] = None, + target_block_size: Optional[int] = None, + compute: Optional[Union[str, ComputeStrategy]] = None, ray_remote_args: Optional[Dict[str, Any]] = None, ): super().__init__( "MapBatches", input_op, - block_fn, - compute=compute, - target_block_size=target_block_size, - fn=fn, + fn, fn_args=fn_args, fn_kwargs=fn_kwargs, fn_constructor_args=fn_constructor_args, fn_constructor_kwargs=fn_constructor_kwargs, + target_block_size=target_block_size, + compute=compute, ray_remote_args=ray_remote_args, ) self._batch_size = batch_size self._batch_format = batch_format + self._prefetch_batches = prefetch_batches self._zero_copy_batch = zero_copy_batch @@ -111,7 +106,6 @@ class MapRows(AbstractMap): def __init__( self, input_op: LogicalOperator, - block_fn: BlockTransform, fn: RowUDF, compute: Optional[Union[str, ComputeStrategy]] = None, ray_remote_args: Optional[Dict[str, Any]] = None, @@ -119,9 +113,8 @@ def __init__( super().__init__( "MapRows", input_op, - block_fn, + fn, compute=compute, - fn=fn, ray_remote_args=ray_remote_args, ) @@ -132,7 +125,6 @@ class Filter(AbstractMap): def __init__( self, input_op: LogicalOperator, - block_fn: BlockTransform, fn: RowUDF, compute: Optional[Union[str, ComputeStrategy]] = None, ray_remote_args: Optional[Dict[str, Any]] = None, @@ -140,9 +132,8 @@ def __init__( super().__init__( "Filter", input_op, - block_fn, + fn, compute=compute, - fn=fn, ray_remote_args=ray_remote_args, ) @@ -153,7 +144,6 @@ class FlatMap(AbstractMap): def __init__( self, input_op: LogicalOperator, - block_fn: BlockTransform, fn: RowUDF, compute: Optional[Union[str, ComputeStrategy]] = None, ray_remote_args: Optional[Dict[str, Any]] = None, @@ -161,8 +151,7 @@ def __init__( super().__init__( "FlatMap", input_op, - block_fn, + fn, compute=compute, - fn=fn, ray_remote_args=ray_remote_args, ) diff --git a/python/ray/data/_internal/planner/filter.py b/python/ray/data/_internal/planner/filter.py new file mode 100644 index 000000000000..c64beef202d6 --- /dev/null +++ b/python/ray/data/_internal/planner/filter.py @@ -0,0 +1,23 @@ +from typing import Callable, Iterator + +from ray.data.block import Block, BlockAccessor, RowUDF +from ray.data.context import DatasetContext + + +def generate_filter_fn() -> Callable[[Iterator[Block]], Iterator[Block]]: + """Generate function to apply the UDF to each record of blocks, + and filter out records that do not satisfy the given predicate. 
+ """ + context = DatasetContext.get_current() + + def fn(blocks: Iterator[Block], row_fn: RowUDF) -> Iterator[Block]: + DatasetContext._set_current(context) + for block in blocks: + block = BlockAccessor.for_block(block) + builder = block.builder() + for row in block.iter_rows(): + if row_fn(row): + builder.add(row) + return [builder.build()] + + return fn diff --git a/python/ray/data/_internal/planner/flat_map.py b/python/ray/data/_internal/planner/flat_map.py new file mode 100644 index 000000000000..4483b9dd6e2e --- /dev/null +++ b/python/ray/data/_internal/planner/flat_map.py @@ -0,0 +1,28 @@ +from typing import Callable, Iterator + +from ray.data._internal.output_buffer import BlockOutputBuffer +from ray.data.block import Block, BlockAccessor, RowUDF +from ray.data.context import DatasetContext + + +def generate_flat_map_fn() -> Callable[[Iterator[Block]], Iterator[Block]]: + """Generate function to apply the UDF to each record of blocks, + and then flatten results. + """ + context = DatasetContext.get_current() + + def fn(blocks: Iterator[Block], row_fn: RowUDF) -> Iterator[Block]: + DatasetContext._set_current(context) + for block in blocks: + output_buffer = BlockOutputBuffer(None, context.target_max_block_size) + block = BlockAccessor.for_block(block) + for row in block.iter_rows(): + for r2 in row_fn(row): + output_buffer.add(r2) + if output_buffer.has_next(): + yield output_buffer.next() + output_buffer.finalize() + if output_buffer.has_next(): + yield output_buffer.next() + + return fn diff --git a/python/ray/data/_internal/planner/map_batches.py b/python/ray/data/_internal/planner/map_batches.py new file mode 100644 index 000000000000..c273334acb62 --- /dev/null +++ b/python/ray/data/_internal/planner/map_batches.py @@ -0,0 +1,104 @@ +import sys +from typing import Callable, Iterator, Optional + +from ray.data._internal.block_batching import batch_blocks +from ray.data._internal.output_buffer import BlockOutputBuffer +from ray.data.block import BatchUDF, Block, DataBatch +from ray.data.context import DEFAULT_BATCH_SIZE, DatasetContext + + +if sys.version_info >= (3, 8): + from typing import Literal +else: + from typing_extensions import Literal + + +def generate_map_batches_fn( + batch_size: Optional[int] = DEFAULT_BATCH_SIZE, + batch_format: Literal["default", "pandas", "pyarrow", "numpy"] = "default", + prefetch_batches: int = 0, + zero_copy_batch: bool = False, +) -> Callable[[Iterator[Block]], Iterator[Block]]: + """Generate function to apply the batch UDF to blocks.""" + import numpy as np + import pandas as pd + import pyarrow as pa + + context = DatasetContext.get_current() + + def fn( + blocks: Iterator[Block], batch_fn: BatchUDF, *fn_args, **fn_kwargs + ) -> Iterator[Block]: + DatasetContext._set_current(context) + output_buffer = BlockOutputBuffer(None, context.target_max_block_size) + + def validate_batch(batch: Block) -> None: + if not isinstance( + batch, (list, pa.Table, np.ndarray, dict, pd.core.frame.DataFrame) + ): + raise ValueError( + "The `fn` you passed to `map_batches` returned a value of type " + f"{type(batch)}. This isn't allowed -- `map_batches` expects " + "`fn` to return a `pandas.DataFrame`, `pyarrow.Table`, " + "`numpy.ndarray`, `list`, or `dict[str, numpy.ndarray]`." + ) + + if isinstance(batch, dict): + for key, value in batch.items(): + if not isinstance(value, np.ndarray): + raise ValueError( + "The `fn` you passed to `map_batches` returned a " + f"`dict`. 
diff --git a/python/ray/data/_internal/planner/map_batches.py b/python/ray/data/_internal/planner/map_batches.py
new file mode 100644
index 000000000000..c273334acb62
--- /dev/null
+++ b/python/ray/data/_internal/planner/map_batches.py
@@ -0,0 +1,104 @@
+import sys
+from typing import Callable, Iterator, Optional
+
+from ray.data._internal.block_batching import batch_blocks
+from ray.data._internal.output_buffer import BlockOutputBuffer
+from ray.data.block import BatchUDF, Block, DataBatch
+from ray.data.context import DEFAULT_BATCH_SIZE, DatasetContext
+
+
+if sys.version_info >= (3, 8):
+    from typing import Literal
+else:
+    from typing_extensions import Literal
+
+
+def generate_map_batches_fn(
+    batch_size: Optional[int] = DEFAULT_BATCH_SIZE,
+    batch_format: Literal["default", "pandas", "pyarrow", "numpy"] = "default",
+    prefetch_batches: int = 0,
+    zero_copy_batch: bool = False,
+) -> Callable[[Iterator[Block]], Iterator[Block]]:
+    """Generate a function that applies the batch UDF to blocks."""
+    import numpy as np
+    import pandas as pd
+    import pyarrow as pa
+
+    context = DatasetContext.get_current()
+
+    def fn(
+        blocks: Iterator[Block], batch_fn: BatchUDF, *fn_args, **fn_kwargs
+    ) -> Iterator[Block]:
+        DatasetContext._set_current(context)
+        output_buffer = BlockOutputBuffer(None, context.target_max_block_size)
+
+        def validate_batch(batch: Block) -> None:
+            if not isinstance(
+                batch, (list, pa.Table, np.ndarray, dict, pd.core.frame.DataFrame)
+            ):
+                raise ValueError(
+                    "The `fn` you passed to `map_batches` returned a value of type "
+                    f"{type(batch)}. This isn't allowed -- `map_batches` expects "
+                    "`fn` to return a `pandas.DataFrame`, `pyarrow.Table`, "
+                    "`numpy.ndarray`, `list`, or `dict[str, numpy.ndarray]`."
+                )
+
+            if isinstance(batch, dict):
+                for key, value in batch.items():
+                    if not isinstance(value, np.ndarray):
+                        raise ValueError(
+                            "The `fn` you passed to `map_batches` returned a "
+                            f"`dict`. `map_batches` expects all `dict` values "
+                            f"to be of type `numpy.ndarray`, but the value "
+                            f"corresponding to key {key!r} is of type "
+                            f"{type(value)}. To fix this issue, convert "
+                            f"the {type(value)} to a `numpy.ndarray`."
+                        )
+
+        def process_next_batch(batch: DataBatch) -> Iterator[Block]:
+            # Apply UDF.
+            try:
+                batch = batch_fn(batch, *fn_args, **fn_kwargs)
+            except ValueError as e:
+                read_only_msgs = [
+                    "assignment destination is read-only",
+                    "buffer source array is read-only",
+                ]
+                err_msg = str(e)
+                if any(msg in err_msg for msg in read_only_msgs):
+                    raise ValueError(
+                        f"Batch mapper function {batch_fn} tried to mutate a "
+                        "zero-copy read-only batch. To be able to mutate the "
+                        "batch, pass zero_copy_batch=False to map_batches(); "
+                        "this will create a writable copy of the batch before "
+                        "giving it to fn. To elide this copy, modify your mapper "
+                        "function so it doesn't try to mutate its input."
+                    ) from e
+                else:
+                    raise e from None
+
+            validate_batch(batch)
+            # Add output batch to output buffer.
+            output_buffer.add_batch(batch)
+            if output_buffer.has_next():
+                yield output_buffer.next()
+
+        # Ensure that zero-copy batch views are copied so mutating UDFs don't error.
+        formatted_batch_iter = batch_blocks(
+            blocks=blocks,
+            stats=None,
+            batch_size=batch_size,
+            batch_format=batch_format,
+            ensure_copy=not zero_copy_batch and batch_size is not None,
+            prefetch_batches=prefetch_batches,
+        )
+
+        for batch in formatted_batch_iter:
+            yield from process_next_batch(batch)
+
+        # Yield remainder block from output buffer.
+        output_buffer.finalize()
+        if output_buffer.has_next():
+            yield output_buffer.next()
+
+    return fn
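# Why the read-only handling above exists (pure-NumPy illustration, no Ray
# required): a zero-copy batch can be backed by a read-only buffer, and a
# mutating UDF then fails with exactly the messages matched in
# process_next_batch above.
import numpy as np

batch = np.arange(4)
batch.setflags(write=False)  # simulate a zero-copy, read-only batch view
try:
    batch[0] = 42  # what a mutating UDF would attempt
except ValueError as e:
    assert "read-only" in str(e)  # the substring the handler looks for
writable = batch.copy()  # what zero_copy_batch=False (ensure_copy=True) arranges
writable[0] = 42  # succeeds on the writable copy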
diff --git a/python/ray/data/_internal/planner/map_rows.py b/python/ray/data/_internal/planner/map_rows.py
new file mode 100644
index 000000000000..0c8b6c01f57c
--- /dev/null
+++ b/python/ray/data/_internal/planner/map_rows.py
@@ -0,0 +1,25 @@
+from typing import Callable, Iterator
+
+from ray.data._internal.output_buffer import BlockOutputBuffer
+from ray.data.block import Block, BlockAccessor, RowUDF
+from ray.data.context import DatasetContext
+
+
+def generate_map_rows_fn() -> Callable[[Iterator[Block]], Iterator[Block]]:
+    """Generate a function that applies the UDF to each record of the blocks."""
+    context = DatasetContext.get_current()
+
+    def fn(blocks: Iterator[Block], row_fn: RowUDF) -> Iterator[Block]:
+        DatasetContext._set_current(context)
+        for block in blocks:
+            output_buffer = BlockOutputBuffer(None, context.target_max_block_size)
+            block = BlockAccessor.for_block(block)
+            for row in block.iter_rows():
+                output_buffer.add(row_fn(row))
+                if output_buffer.has_next():
+                    yield output_buffer.next()
+            output_buffer.finalize()
+            if output_buffer.has_next():
+                yield output_buffer.next()
+
+    return fn
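# Usage sketch for generate_map_rows_fn (assumes this branch), mirroring the
# filter and flat_map sketches above: one output row per input row.
from ray.data._internal.planner.map_rows import generate_map_rows_fn

map_rows_fn = generate_map_rows_fn()
out = list(map_rows_fn(iter([[1, 2], [3]]), lambda row: row + 1))
# out == [[2, 3], [4]]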
diff --git a/python/ray/data/_internal/planner/plan_map_op.py b/python/ray/data/_internal/planner/plan_map_op.py
index 2c80c0d01514..c2157879a690 100644
--- a/python/ray/data/_internal/planner/plan_map_op.py
+++ b/python/ray/data/_internal/planner/plan_map_op.py
@@ -4,7 +4,17 @@
 from ray.data._internal.compute import ActorPoolStrategy, TaskPoolStrategy, get_compute
 from ray.data._internal.execution.interfaces import PhysicalOperator
 from ray.data._internal.execution.operators.map_operator import MapOperator
-from ray.data._internal.logical.operators.map_operator import AbstractMap
+from ray.data._internal.logical.operators.map_operator import (
+    AbstractMap,
+    Filter,
+    FlatMap,
+    MapBatches,
+    MapRows,
+)
+from ray.data._internal.planner.filter import generate_filter_fn
+from ray.data._internal.planner.flat_map import generate_flat_map_fn
+from ray.data._internal.planner.map_batches import generate_map_batches_fn
+from ray.data._internal.planner.map_rows import generate_map_rows_fn
 from ray.data.block import Block, CallableClass


@@ -14,8 +24,23 @@ def _plan_map_op(op: AbstractMap, input_physical_dag: PhysicalOperator) -> MapOp
     Note this method only converts the given `op`, but not its input dependencies.
     See Planner.plan() for more details.
     """
+    if isinstance(op, MapBatches):
+        transform_fn = generate_map_batches_fn(
+            batch_size=op._batch_size,
+            batch_format=op._batch_format,
+            prefetch_batches=op._prefetch_batches,
+            zero_copy_batch=op._zero_copy_batch,
+        )
+    elif isinstance(op, MapRows):
+        transform_fn = generate_map_rows_fn()
+    elif isinstance(op, FlatMap):
+        transform_fn = generate_flat_map_fn()
+    elif isinstance(op, Filter):
+        transform_fn = generate_filter_fn()
+    else:
+        raise ValueError(f"Found unknown logical operator during planning: {op}")
+
     compute = get_compute(op._compute)
-    block_fn = op._block_fn

     if isinstance(op._fn, CallableClass):
         if isinstance(compute, TaskPoolStrategy):
@@ -52,7 +77,7 @@ def fn(item: Any) -> Any:
     fn_kwargs = op._fn_kwargs or {}

     def do_map(blocks: Iterator[Block]) -> Iterator[Block]:
-        yield from block_fn(blocks, *fn_args, **fn_kwargs)
+        yield from transform_fn(blocks, *fn_args, **fn_kwargs)

     return MapOperator.create(
         do_map,
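# Sketch of the dispatch above (assumes this branch): planning a logical
# MapRows now pairs the UDF with generate_map_rows_fn() instead of a
# caller-supplied block_fn. The Planner import path is assumed from the
# accompanying tests.
from ray.data._internal.logical.operators.map_operator import MapRows
from ray.data._internal.logical.operators.read_operator import Read
from ray.data._internal.planner.planner import Planner
from ray.data.datasource.parquet_datasource import ParquetDatasource

op = MapRows(Read(ParquetDatasource()), lambda x: x)
physical_op = Planner().plan(op)  # a MapOperator wrapping the generated fn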
diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py
index cdd60b40f7c0..84fca0f5b90d 100644
--- a/python/ray/data/dataset.py
+++ b/python/ray/data/dataset.py
@@ -38,8 +38,12 @@
     MapRows,
     MapBatches,
 )
+from ray.data._internal.planner.filter import generate_filter_fn
+from ray.data._internal.planner.flat_map import generate_flat_map_fn
+from ray.data._internal.planner.map_batches import generate_map_batches_fn
+from ray.data._internal.planner.map_rows import generate_map_rows_fn
 from ray.data.dataset_iterator import DatasetIterator
-from ray.data._internal.block_batching import batch_block_refs, batch_blocks
+from ray.data._internal.block_batching import batch_block_refs
 from ray.data._internal.block_list import BlockList
 from ray.data._internal.bulk_dataset_iterator import BulkDatasetIterator
 from ray.data._internal.compute import (
@@ -51,7 +55,6 @@
 from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
 from ray.data._internal.equalize import _equalize
 from ray.data._internal.lazy_block_list import LazyBlockList
-from ray.data._internal.output_buffer import BlockOutputBuffer
 from ray.data._internal.util import (
     _estimate_available_parallelism,
     _is_local_scheme,
@@ -61,7 +64,6 @@
 from ray.data._internal.plan import (
     ExecutionPlan,
     OneToOneStage,
-    _adapt_for_multiple_blocks,
 )
 from ray.data._internal.stage_impl import (
     RandomizeBlocksStage,
@@ -316,25 +318,13 @@ def map(
         )
         self._warn_slow()
-        context = DatasetContext.get_current()
-
-        @_adapt_for_multiple_blocks
-        def transform(block: Block, fn: RowUDF[T, U]) -> Iterable[Block]:
-            DatasetContext._set_current(context)
-            output_buffer = BlockOutputBuffer(None, context.target_max_block_size)
-            block = BlockAccessor.for_block(block)
-            for row in block.iter_rows():
-                output_buffer.add(fn(row))
-                if output_buffer.has_next():
-                    yield output_buffer.next()
-            output_buffer.finalize()
-            if output_buffer.has_next():
-                yield output_buffer.next()
+
+        transform_fn = generate_map_rows_fn()

         plan = self._plan.with_stage(
             OneToOneStage(
                 "map",
-                transform,
+                transform_fn,
                 compute,
                 ray_remote_args,
                 fn=fn,
             )
@@ -345,7 +335,6 @@ def transform(block: Block, fn: RowUDF[T, U]) -> Iterable[Block]:
         if logical_plan is not None:
             map_op = MapRows(
                 logical_plan.dag,
-                transform,
                 fn,
                 compute=compute,
                 ray_remote_args=ray_remote_args,
@@ -540,8 +529,6 @@ def map_batches(
             :meth:`~Dataset.default_batch_format`
                 Call this function to determine the default batch type.
         """  # noqa: E501
-        import pandas as pd
-        import pyarrow as pa

         if batch_format == "native":
             warnings.warn(
@@ -592,85 +579,11 @@ def map_batches(
                 f"{fn}"
             )

-        context = DatasetContext.get_current()
-
-        def transform(
-            blocks: Iterable[Block],
-            batch_fn: BatchUDF,
-            *fn_args,
-            **fn_kwargs,
-        ) -> Iterable[Block]:
-            DatasetContext._set_current(context)
-            output_buffer = BlockOutputBuffer(None, context.target_max_block_size)
-
-            def validate_batch(batch: Block) -> None:
-                if not isinstance(
-                    batch, (list, pa.Table, np.ndarray, dict, pd.core.frame.DataFrame)
-                ):
-                    raise ValueError(
-                        "The `fn` you passed to `map_batches` returned a value of type "
-                        f"{type(batch)}. This isn't allowed -- `map_batches` expects "
-                        "`fn` to return a `pandas.DataFrame`, `pyarrow.Table`, "
-                        "`numpy.ndarray`, `list`, or `dict[str, numpy.ndarray]`."
-                    )
-
-                if isinstance(batch, dict):
-                    for key, value in batch.items():
-                        if not isinstance(value, np.ndarray):
-                            raise ValueError(
-                                "The `fn` you passed to `map_batches` returned a "
-                                f"`dict`. `map_batches` expects all `dict` values "
-                                f"to be of type `numpy.ndarray`, but the value "
-                                f"corresponding to key {key!r} is of type "
-                                f"{type(value)}. To fix this issue, convert "
-                                f"the {type(value)} to a `numpy.ndarray`."
-                            )
-
-            def process_next_batch(batch: DataBatch) -> Iterator[Block]:
-                # Apply UDF.
-                try:
-                    batch = batch_fn(batch, *fn_args, **fn_kwargs)
-                except ValueError as e:
-                    read_only_msgs = [
-                        "assignment destination is read-only",
-                        "buffer source array is read-only",
-                    ]
-                    err_msg = str(e)
-                    if any(msg in err_msg for msg in read_only_msgs):
-                        raise ValueError(
-                            f"Batch mapper function {fn.__name__} tried to mutate a "
-                            "zero-copy read-only batch. To be able to mutate the "
-                            "batch, pass zero_copy_batch=False to map_batches(); "
-                            "this will create a writable copy of the batch before "
-                            "giving it to fn. To elide this copy, modify your mapper "
-                            "function so it doesn't try to mutate its input."
-                        ) from e
-                    else:
-                        raise e from None
-
-                validate_batch(batch)
-                # Add output batch to output buffer.
-                output_buffer.add_batch(batch)
-                if output_buffer.has_next():
-                    yield output_buffer.next()
-
-            # Ensure that zero-copy batch views are copied so mutating UDFs don't error.
-            formatted_batch_iter = batch_blocks(
-                blocks=blocks,
-                stats=None,
-                batch_size=batch_size,
-                batch_format=batch_format,
-                ensure_copy=not zero_copy_batch and batch_size is not None,
-                prefetch_batches=prefetch_batches,
-            )
-
-            for batch in formatted_batch_iter:
-                yield from process_next_batch(batch)
-
-            # Yield remainder block from output buffer.
-            output_buffer.finalize()
-            if output_buffer.has_next():
-                yield output_buffer.next()
+        transform_fn = generate_map_batches_fn(
+            batch_size=batch_size,
+            batch_format=batch_format,
+            prefetch_batches=prefetch_batches,
+            zero_copy_batch=zero_copy_batch,
+        )

         if hasattr(fn, "__self__") and isinstance(
@@ -682,7 +595,7 @@ def process_next_batch(batch: DataBatch) -> Iterator[Block]:

         stage = OneToOneStage(
             stage_name,
-            transform,
+            transform_fn,
             compute,
             ray_remote_args,
             # TODO(Clark): Add a strict cap here.
@@ -699,10 +612,8 @@ def process_next_batch(batch: DataBatch) -> Iterator[Block]:
         if logical_plan is not None:
             map_batches_op = MapBatches(
                 logical_plan.dag,
-                transform,
                 fn,
                 batch_size=batch_size,
-                compute=compute,
                 batch_format=batch_format,
                 zero_copy_batch=zero_copy_batch,
                 target_block_size=target_block_size,
@@ -710,6 +621,7 @@ def process_next_batch(batch: DataBatch) -> Iterator[Block]:
                 fn_kwargs=fn_kwargs,
                 fn_constructor_args=fn_constructor_args,
                 fn_constructor_kwargs=fn_constructor_kwargs,
+                compute=compute,
                 ray_remote_args=ray_remote_args,
             )
             logical_plan = LogicalPlan(map_batches_op)
@@ -894,31 +806,17 @@ def flat_map(
         )
         self._warn_slow()
-        context = DatasetContext.get_current()
-
-        @_adapt_for_multiple_blocks
-        def transform(block: Block, fn: RowUDF[T, U]) -> Iterable[Block]:
-            DatasetContext._set_current(context)
-            output_buffer = BlockOutputBuffer(None, context.target_max_block_size)
-            block = BlockAccessor.for_block(block)
-            for row in block.iter_rows():
-                for r2 in fn(row):
-                    output_buffer.add(r2)
-                if output_buffer.has_next():
-                    yield output_buffer.next()
-            output_buffer.finalize()
-            if output_buffer.has_next():
-                yield output_buffer.next()
+
+        transform_fn = generate_flat_map_fn()

         plan = self._plan.with_stage(
-            OneToOneStage("flat_map", transform, compute, ray_remote_args, fn=fn)
+            OneToOneStage("flat_map", transform_fn, compute, ray_remote_args, fn=fn)
         )

         logical_plan = self._logical_plan
         if logical_plan is not None:
             op = FlatMap(
                 input_op=logical_plan.dag,
-                block_fn=transform,
                 fn=fn,
                 compute=compute,
                 ray_remote_args=ray_remote_args,
             )
@@ -974,27 +872,17 @@ def filter(
         )
         self._warn_slow()
-        context = DatasetContext.get_current()
-
-        @_adapt_for_multiple_blocks
-        def transform(block: Block, fn: RowUDF[T, U]) -> Iterable[Block]:
-            DatasetContext._set_current(context)
-            block = BlockAccessor.for_block(block)
-            builder = block.builder()
-            for row in block.iter_rows():
-                if fn(row):
-                    builder.add(row)
-            return [builder.build()]
+
+        transform_fn = generate_filter_fn()

         plan = self._plan.with_stage(
-            OneToOneStage("filter", transform, compute, ray_remote_args, fn=fn)
+            OneToOneStage("filter", transform_fn, compute, ray_remote_args, fn=fn)
         )

         logical_plan = self._logical_plan
         if logical_plan is not None:
             op = Filter(
                 input_op=logical_plan.dag,
-                block_fn=transform,
                 fn=fn,
                 compute=compute,
                 ray_remote_args=ray_remote_args,
diff --git a/python/ray/data/tests/test_execution_optimizer.py b/python/ray/data/tests/test_execution_optimizer.py
index 6814b796ac36..c6468f4ea164 100644
--- a/python/ray/data/tests/test_execution_optimizer.py
+++ b/python/ray/data/tests/test_execution_optimizer.py
@@ -56,7 +56,6 @@ def test_map_rows_operator(ray_start_cluster_enabled, enable_optimizer):
     read_op = Read(ParquetDatasource())
     op = MapRows(
         read_op,
-        lambda it: (x for x in it),
         lambda x: x,
     )
     physical_op = planner.plan(op)
@@ -78,7 +77,6 @@ def test_filter_operator(ray_start_cluster_enabled, enable_optimizer):
     read_op = Read(ParquetDatasource())
     op = Filter(
         read_op,
-        lambda it: (x for x in it),
         lambda x: x,
     )
     physical_op = planner.plan(op)
@@ -100,7 +98,6 @@ def test_flat_map(ray_start_cluster_enabled, enable_optimizer):
     read_op = Read(ParquetDatasource())
     op = FlatMap(
         read_op,
-        lambda it: ([x, x] for x in it),
         lambda x: x,
    )
     physical_op = planner.plan(op)
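# End-to-end smoke test of the unchanged public API (assumes a local Ray
# installation with this branch; results shown for the row-based API).
import ray

ds = ray.data.range(8)
assert ds.map(lambda x: x + 1).take() == [1, 2, 3, 4, 5, 6, 7, 8]
assert ds.filter(lambda x: x % 2 == 0).take() == [0, 2, 4, 6]
assert ds.flat_map(lambda x: [x, x]).count() == 16
assert ds.map_batches(lambda batch: [x * 2 for x in batch]).take(4) == [0, 2, 4, 6]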