Skip to content

Commit e909850

Browse files
srinathk10400Ping
authored and committed
[Data] Add disable Block Shaping option to BlockOutputBuffer (ray-project#58757)
## Description In addition to Block shaping by Block Size and Num Rows, add an option to skip Block Shaping altogether in BlockOutputBuffer. Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
1 parent 0c20400 commit e909850

File tree

3 files changed

+163
-48
lines changed

3 files changed

+163
-48
lines changed

python/ray/data/_internal/execution/operators/map_transformer.py

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,6 @@ def __call__(
101101
results = self._apply_transform(ctx, batches)
102102
yield from self._post_process(results)
103103

104-
@abstractmethod
105-
def _can_skip_block_sizing(self):
106-
pass
107-
108104
@property
109105
def output_block_size_option(self):
110106
return self._output_block_size_option
@@ -310,9 +306,6 @@ def _apply_transform(
310306
def _post_process(self, results: Iterable[MapTransformFnData]) -> Iterable[Block]:
311307
return self._shape_blocks(results)
312308

313-
def _can_skip_block_sizing(self):
314-
return False
315-
316309
def __repr__(self) -> str:
317310
return f"RowMapTransformFn({self._row_fn})"
318311

@@ -363,12 +356,6 @@ def _apply_transform(
363356
def _post_process(self, results: Iterable[MapTransformFnData]) -> Iterable[Block]:
364357
return self._shape_blocks(results)
365358

366-
def _can_skip_block_sizing(self):
367-
return self._output_block_size_option is None and self._batch_format in (
368-
BatchFormat.ARROW,
369-
BatchFormat.PANDAS,
370-
)
371-
372359
def __repr__(self) -> str:
373360
return f"BatchMapTransformFn({self._batch_fn=}, {self._batch_format=}, {self._batch_size=}, {self._zero_copy_batch=})"
374361

@@ -419,9 +406,6 @@ def _post_process(self, results: Iterable[MapTransformFnData]) -> Iterable[Block
419406

420407
return self._shape_blocks(results)
421408

422-
def _can_skip_block_sizing(self):
423-
return self._output_block_size_option is None
424-
425409
def __repr__(self) -> str:
426410
return (
427411
f"BlockMapTransformFn({self._block_fn=}, {self._output_block_size_option=})"

python/ray/data/_internal/output_buffer.py

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@
1111
class OutputBlockSizeOption:
1212
target_max_block_size: Optional[int] = None
1313
target_num_rows_per_block: Optional[int] = None
14+
disable_block_shaping: bool = False
1415

1516
def __post_init__(self):
1617
if (
1718
self.target_max_block_size is None
1819
and self.target_num_rows_per_block is None
20+
and not self.disable_block_shaping
1921
):
2022
raise ValueError(
2123
"Either `target_max_block_size` or `target_num_rows_per_block` "
@@ -27,13 +29,24 @@ def of(
2729
cls,
2830
target_max_block_size: Optional[int] = None,
2931
target_num_rows_per_block: Optional[int] = None,
32+
disable_block_shaping: bool = False,
3033
) -> Optional["OutputBlockSizeOption"]:
31-
if target_max_block_size is None and target_num_rows_per_block is None:
34+
if (
35+
target_max_block_size is None
36+
and target_num_rows_per_block is None
37+
and not disable_block_shaping
38+
):
39+
# In case
40+
# - Both target_max_block_size and target_num_rows_per_block are None and
41+
# - disable_block_shaping is False
42+
#
43+
# Buffer won't be yielding incrementally, instead producing just a single block.
3244
return None
3345
else:
3446
return OutputBlockSizeOption(
3547
target_max_block_size=target_max_block_size,
3648
target_num_rows_per_block=target_num_rows_per_block,
49+
disable_block_shaping=disable_block_shaping,
3750
)
3851

3952

@@ -93,30 +106,40 @@ def finalize(self) -> None:
93106
self._finalized = True
94107

95108
def _exceeded_buffer_row_limit(self) -> bool:
109+
if self._output_block_size_option.disable_block_shaping:
110+
return False
111+
96112
return (
97113
self._max_num_rows_per_block() is not None
98114
and self._buffer.num_rows() > self._max_num_rows_per_block()
99115
)
100116

101117
def _exceeded_buffer_size_limit(self) -> bool:
118+
if self._output_block_size_option.disable_block_shaping:
119+
return False
120+
102121
return (
103122
self._max_bytes_per_block() is not None
104123
and self._buffer.get_estimated_memory_usage() > self._max_bytes_per_block()
105124
)
106125

107126
def _max_num_rows_per_block(self) -> Optional[int]:
108-
return (
109-
self._output_block_size_option.target_num_rows_per_block
110-
if self._output_block_size_option is not None
111-
else None
112-
)
127+
if self._output_block_size_option is None:
128+
return None
129+
130+
if self._output_block_size_option.disable_block_shaping:
131+
return None
132+
133+
return self._output_block_size_option.target_num_rows_per_block
113134

114135
def _max_bytes_per_block(self) -> Optional[int]:
115-
return (
116-
self._output_block_size_option.target_max_block_size
117-
if self._output_block_size_option is not None
118-
else None
119-
)
136+
if self._output_block_size_option is None:
137+
return None
138+
139+
if self._output_block_size_option.disable_block_shaping:
140+
return None
141+
142+
return self._output_block_size_option.target_max_block_size
120143

121144
def has_next(self) -> bool:
122145
"""Returns true when a complete output block is produced."""
@@ -130,6 +153,9 @@ def has_next(self) -> bool:
130153
# is required to align it with semantic of producing 1 block
131154
# from 1 block of the input
132155
return False
156+
elif self._output_block_size_option.disable_block_shaping:
157+
# When block shaping is disabled, produce blocks immediately
158+
return self._buffer.num_rows() > 0
133159

134160
return self._exceeded_buffer_row_limit() or self._exceeded_buffer_size_limit()
135161

python/ray/data/tests/test_operators.py

Lines changed: 126 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -613,44 +613,34 @@ def _check_batch(block_iter: Iterable[Block], ctx) -> Iterable[Block]:
613613
assert op.completed()
614614

615615

616-
@pytest.mark.parametrize("use_actors", [False, True])
617-
@pytest.mark.parametrize("preserve_order", [False, True])
618-
@pytest.mark.parametrize(
619-
"target_max_block_size,num_expected_blocks", [(1, 10), (2**20, 1), (None, 1)]
620-
)
621-
def test_map_operator_output_unbundling(
616+
def _run_map_operator_test(
622617
ray_start_regular_shared,
623618
use_actors,
624619
preserve_order,
625-
target_max_block_size,
626-
num_expected_blocks,
620+
transform_fn,
621+
output_block_size_option,
622+
expected_blocks,
623+
test_name="TestMapper",
627624
):
628-
# Tests that the MapOperator's output queue unbundles the bundles returned from
629-
# tasks; this facilitates features such as dynamic block splitting.
630-
def noop(block_iter: Iterable[Block], ctx) -> Iterable[Block]:
631-
for block in block_iter:
632-
yield block
633-
625+
"""Shared test function for MapOperator output unbundling tests."""
634626
# Create with inputs.
635627
input_op = InputDataBuffer(
636628
DataContext.get_current(), make_ref_bundles([[i] for i in range(10)])
637629
)
638630
compute_strategy = ActorPoolStrategy() if use_actors else TaskPoolStrategy()
639631

640632
transformer = create_map_transformer_from_block_fn(
641-
noop,
642-
output_block_size_option=OutputBlockSizeOption.of(
643-
target_max_block_size=target_max_block_size,
644-
),
633+
transform_fn,
634+
output_block_size_option=output_block_size_option,
645635
)
646636

647637
op = MapOperator.create(
648638
transformer,
649639
input_op=input_op,
650640
data_context=DataContext.get_current(),
651-
name="TestMapper",
641+
name=test_name,
652642
compute_strategy=compute_strategy,
653-
# Send the everything in a single bundle of 10 blocks.
643+
# Send everything in a single bundle of 10 blocks.
654644
min_rows_per_bundle=10,
655645
)
656646

@@ -670,10 +660,125 @@ def noop(block_iter: Iterable[Block], ctx) -> Iterable[Block]:
670660
outputs = []
671661
while op.has_next():
672662
outputs.append(op.get_next())
673-
assert len(outputs) == num_expected_blocks
663+
assert len(outputs) == expected_blocks
674664
assert op.completed()
675665

676666

667+
@pytest.mark.parametrize("use_actors", [False, True])
668+
@pytest.mark.parametrize("preserve_order", [False, True])
669+
@pytest.mark.parametrize(
670+
"target_max_block_size,num_expected_blocks", [(1, 10), (2**20, 1), (None, 1)]
671+
)
672+
def test_map_operator_output_unbundling(
673+
ray_start_regular_shared,
674+
use_actors,
675+
preserve_order,
676+
target_max_block_size,
677+
num_expected_blocks,
678+
):
679+
"""Test that MapOperator's output queue unbundles bundles from tasks."""
680+
681+
def noop(block_iter: Iterable[Block], ctx) -> Iterable[Block]:
682+
for block in block_iter:
683+
yield block
684+
685+
_run_map_operator_test(
686+
ray_start_regular_shared,
687+
use_actors,
688+
preserve_order,
689+
noop,
690+
OutputBlockSizeOption.of(target_max_block_size=target_max_block_size),
691+
num_expected_blocks,
692+
)
693+
694+
695+
@pytest.mark.parametrize("preserve_order", [False, True])
696+
@pytest.mark.parametrize(
697+
"output_block_size_option,expected_blocks",
698+
[
699+
# Test target_max_block_size
700+
(OutputBlockSizeOption.of(target_max_block_size=1), 10),
701+
(OutputBlockSizeOption.of(target_max_block_size=2**20), 1),
702+
(OutputBlockSizeOption.of(target_max_block_size=None), 1),
703+
# Test target_num_rows_per_block
704+
(OutputBlockSizeOption.of(target_num_rows_per_block=1), 10),
705+
(OutputBlockSizeOption.of(target_num_rows_per_block=5), 2),
706+
(OutputBlockSizeOption.of(target_num_rows_per_block=10), 1),
707+
(OutputBlockSizeOption.of(target_num_rows_per_block=None), 1),
708+
# Test disable_block_shaping
709+
(OutputBlockSizeOption.of(disable_block_shaping=True), 10),
710+
(OutputBlockSizeOption.of(disable_block_shaping=False), 1),
711+
# Test combinations
712+
(
713+
OutputBlockSizeOption.of(
714+
target_max_block_size=1, target_num_rows_per_block=5
715+
),
716+
10,
717+
),
718+
(
719+
OutputBlockSizeOption.of(
720+
target_max_block_size=2**20, disable_block_shaping=True
721+
),
722+
10,
723+
),
724+
(
725+
OutputBlockSizeOption.of(
726+
target_num_rows_per_block=5, disable_block_shaping=True
727+
),
728+
10,
729+
),
730+
],
731+
)
732+
def test_map_operator_output_block_size_options(
733+
ray_start_regular_shared,
734+
preserve_order,
735+
output_block_size_option,
736+
expected_blocks,
737+
):
738+
"""Test MapOperator with various OutputBlockSizeOption configurations."""
739+
740+
def noop(block_iter: Iterable[Block], ctx) -> Iterable[Block]:
741+
for block in block_iter:
742+
yield block
743+
744+
_run_map_operator_test(
745+
ray_start_regular_shared,
746+
use_actors=False,
747+
preserve_order=preserve_order,
748+
transform_fn=noop,
749+
output_block_size_option=output_block_size_option,
750+
expected_blocks=expected_blocks,
751+
)
752+
753+
754+
@pytest.mark.parametrize("preserve_order", [False, True])
755+
def test_map_operator_disable_block_shaping_with_batches(
756+
ray_start_regular_shared,
757+
preserve_order,
758+
):
759+
"""Test MapOperator with disable_block_shaping=True using batch operations."""
760+
761+
def batch_transform(batch_iter, ctx):
762+
for batch in batch_iter:
763+
# Simple transformation: add 1 to each value
764+
if hasattr(batch, "to_pandas"):
765+
df = batch.to_pandas()
766+
df = df + 1
767+
yield df
768+
else:
769+
yield batch
770+
771+
_run_map_operator_test(
772+
ray_start_regular_shared,
773+
use_actors=False,
774+
preserve_order=preserve_order,
775+
transform_fn=batch_transform,
776+
output_block_size_option=OutputBlockSizeOption.of(disable_block_shaping=True),
777+
expected_blocks=10, # With disable_block_shaping=True, we expect 10 blocks
778+
test_name="TestBatchMapper",
779+
)
780+
781+
677782
@pytest.mark.parametrize("use_actors", [False, True])
678783
def test_map_operator_ray_args(shutdown_only, use_actors):
679784
ray.shutdown()

0 commit comments

Comments (0)