
[Datasets] Only enable blocks bundling when batch_size is set #29971

Merged (6 commits) on Nov 10, 2022
11 changes: 6 additions & 5 deletions python/ray/data/_internal/compute.py
@@ -515,11 +515,12 @@ def _bundle_blocks_up_to_size(
         block_bundles.append(curr_bundle)
     if len(blocks) / len(block_bundles) >= 10:
         logger.warning(
-            f"Having to send 10 or more blocks to a single {name} task to create a "
-            f"batch of size {target_size}, which may result in less transformation "
-            "parallelism than expected. This may indicate that your blocks are too "
-            "small and/or your batch size is too large, and you may want to decrease "
-            "your read parallelism and/or decrease your batch size."
+            f"`batch_size` is set to {target_size}, which reduces parallelism from "
+            f"{len(blocks)} to {len(block_bundles)}. If the performance is worse than "
+            "expected, this may indicate that the batch size is too large or the "
+            "input block size is too small. To reduce batch size, consider decreasing "
+            "`batch_size` or use the default in `map_batches`. To increase input "
+            "block size, consider decreasing `parallelism` in read."
         )
     return [tuple(zip(*block_bundle)) for block_bundle in block_bundles]

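For context on when this warning fires: blocks are greedily grouped into bundles of up to (but not exceeding) the target size, and the warning triggers once ten or more blocks land in a bundle on average. Below is a minimal sketch of that heuristic, using plain row counts as an illustrative stand-in for the `(block, metadata)` pairs the real `_bundle_blocks_up_to_size` operates on:

```python
from typing import List


def bundle_up_to_size(block_sizes: List[int], target_size: int) -> List[List[int]]:
    """Greedily group small blocks into bundles of at most target_size rows."""
    bundles: List[List[int]] = []
    curr: List[int] = []
    curr_rows = 0
    for num_rows in block_sizes:
        # Start a new bundle rather than exceed the target size.
        if curr and curr_rows + num_rows > target_size:
            bundles.append(curr)
            curr, curr_rows = [], 0
        curr.append(num_rows)
        curr_rows += num_rows
    if curr:
        bundles.append(curr)
    return bundles


blocks = [100] * 200  # 200 blocks of 100 rows each
bundles = bundle_up_to_size(blocks, target_size=4096)
print(len(bundles))  # 5 bundles of 40 blocks: parallelism drops from 200 to 5
assert len(blocks) / len(bundles) >= 10  # the condition that triggers the warning
```

With 200 small blocks and a 4096-row target, bundling cuts parallelism by 40x, which is exactly the situation the reworded message now spells out for the user.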
30 changes: 18 additions & 12 deletions python/ray/data/dataset.py
@@ -323,7 +323,7 @@ def map_batches(
         self,
         fn: BatchUDF,
         *,
-        batch_size: Optional[int] = DEFAULT_BATCH_SIZE,
+        batch_size: Optional[Union[int, Literal["default"]]] = "default",
Review thread on this line:

Contributor: Can you update the documentation of `batch_size` below as well?

Contributor Author: @jianoaix - could you suggest the specific documentation to be updated? It already mentions "Defaults to 4096", which feels quite clear for end users.

Contributor: So this introduced a new default value "default", right? We need to document what that does to the semantics.

Contributor Author: I thought "Defaults to 4096" was clear, but updated it anyway. Could you help double-check?
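To make the semantics under discussion concrete, here is a minimal sketch of the sentinel pattern the new signature uses; `resolve_batch_size` and the hard-coded 4096 are illustrative, not the actual Ray helper:

```python
from typing import Optional, Union

from typing_extensions import Literal  # `typing.Literal` on Python >= 3.8


# "default" distinguishes "caller didn't pass batch_size" (use the documented
# 4096 default, no block bundling) from an explicit int (use it and enable
# bundling) and from None (treat entire blocks as batches).
def resolve_batch_size(
    batch_size: Optional[Union[int, Literal["default"]]] = "default",
) -> Optional[int]:
    if batch_size == "default":
        return 4096  # the documented default batch size
    return batch_size  # None or an explicit positive int passes through
```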

         compute: Optional[Union[str, ComputeStrategy]] = None,
         batch_format: Literal["default", "pandas", "pyarrow", "numpy"] = "default",
         fn_args: Optional[Iterable[Any]] = None,
@@ -335,9 +335,9 @@ def map_batches(
         """Apply the given function to batches of data.

         This applies the ``fn`` in parallel with map tasks, with each task handling
-        a block or a bundle of blocks (if ``batch_size`` larger than block size) of
-        the dataset. Each batch is executed serially at Ray level (at lower level,
-        the processing of the batch is usually vectorized).
+        a block or a bundle of blocks of the dataset. Each batch is executed serially
+        at Ray level (at lower level, the processing of the batch is usually
+        vectorized).

         Batches are represented as dataframes, ndarrays, or lists. The default batch
         type is determined by your dataset's schema. To determine the default batch
@@ -366,10 +366,10 @@
         .. note::
             The size of the batches provided to ``fn`` may be smaller than the provided
             ``batch_size`` if ``batch_size`` doesn't evenly divide the block(s) sent to
-            a given map task. Each map task will be sent a single block if the block is
-            equal to or larger than ``batch_size``, and will be sent a bundle of blocks
-            up to (but not exceeding) ``batch_size`` if blocks are smaller than
-            ``batch_size``.
+            a given map task. When ``batch_size`` is specified, each map task will be
+            sent a single block if the block is equal to or larger than ``batch_size``,
+            and will be sent a bundle of blocks up to (but not exceeding)
+            ``batch_size`` if blocks are smaller than ``batch_size``.

         Examples:

@@ -448,7 +448,7 @@ def map_batches(
                 blocks as batches (blocks may contain different number of rows).
                 The actual size of the batch provided to ``fn`` may be smaller than
                 ``batch_size`` if ``batch_size`` doesn't evenly divide the block(s) sent
-                to a given map task. Defaults to 4096.
+                to a given map task. Default batch_size is 4096 with "default".
             compute: The compute strategy, either ``"tasks"`` (default) to use Ray
                 tasks, or ``"actors"`` to use an autoscaling actor pool. If you want to
                 configure the size of the autoscaling actor pool, provide an
@@ -490,8 +490,14 @@
                 DeprecationWarning,
             )

-        if batch_size is not None and batch_size < 1:
-            raise ValueError("Batch size cannot be negative or 0")
+        target_block_size = None
+        if batch_size == "default":
+            batch_size = DEFAULT_BATCH_SIZE
+        elif batch_size is not None:
+            if batch_size < 1:
+                raise ValueError("Batch size cannot be negative or 0")
+            # Enable blocks bundling when batch_size is specified by caller.
+            target_block_size = batch_size

         if batch_format not in VALID_BATCH_FORMATS:
             raise ValueError(
@@ -599,7 +605,7 @@ def process_next_batch(batch: Block) -> Iterator[Block]:
             compute,
             ray_remote_args,
             # TODO(Clark): Add a strict cap here.
-            target_block_size=batch_size,
+            target_block_size=target_block_size,
             fn=fn,
             fn_args=fn_args,
             fn_kwargs=fn_kwargs,
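A hypothetical before/after of the user-visible change, using the same block and batch arithmetic the updated test below asserts (assumes `ray.data.range` creates one block per unit of `parallelism`):

```python
import ray

ds = ray.data.range(1000, parallelism=100)  # 100 blocks of 10 rows each
assert ds.num_blocks() == 100

# Default batch_size (the "default" sentinel): resolved to 4096 for batching,
# but block bundling stays off, so map parallelism is preserved.
assert ds.map_batches(lambda batch: batch).num_blocks() == 100

# Explicit batch_size: bundling is enabled, so 5 blocks (50 rows) are sent
# to each map task and the output has ceil(100 / 5) == 20 blocks.
assert ds.map_batches(lambda batch: batch, batch_size=50).num_blocks() == 20
```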
9 changes: 7 additions & 2 deletions python/ray/data/preprocessors/batch_mapper.py
@@ -1,12 +1,17 @@
+import sys
 from typing import Dict, Callable, Optional, Union, Any, TYPE_CHECKING
 import warnings

 import numpy as np

 from ray.data.preprocessor import Preprocessor
 from ray.data.context import DEFAULT_BATCH_SIZE
 from ray.util.annotations import PublicAPI

+if sys.version_info >= (3, 8):
+    from typing import Literal
+else:
+    from typing_extensions import Literal
+
 if TYPE_CHECKING:
     import pandas

@@ -76,7 +81,7 @@ def __init__(
             ],
         ],
         batch_format: Optional[str] = None,
-        batch_size: Optional[int] = DEFAULT_BATCH_SIZE,
+        batch_size: Optional[Union[int, Literal["default"]]] = "default",
         # TODO: Make batch_format required from user
         # TODO: Introduce a "zero_copy" format
         # TODO: We should reach consistency of args between BatchMapper and map_batches.
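Hypothetical usage of the updated constructor, assuming `BatchMapper` forwards `batch_size` unchanged to `Dataset.map_batches`, so the `"default"` sentinel gets the same no-bundling treatment as above:

```python
import pandas as pd

from ray.data.preprocessors import BatchMapper


def add_one(df: pd.DataFrame) -> pd.DataFrame:
    return df + 1


# batch_size defaults to "default": 4096-row batches, no block bundling.
mapper = BatchMapper(add_one, batch_format="pandas")

# An explicit batch_size opts in to block bundling during the transform.
bundling_mapper = BatchMapper(add_one, batch_format="pandas", batch_size=2048)
```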
9 changes: 7 additions & 2 deletions python/ray/data/tests/test_dataset.py
@@ -2612,9 +2612,14 @@ def test_map_batches_block_bundling_auto(
     ds = ray.data.range(num_blocks * block_size, parallelism=num_blocks)
     # Confirm that we have the expected number of initial blocks.
     assert ds.num_blocks() == num_blocks
-    ds = ds.map_batches(lambda x: x, batch_size=batch_size)

     # Blocks should be bundled up to the batch size.
-    assert ds.num_blocks() == math.ceil(num_blocks / max(batch_size // block_size, 1))
+    ds1 = ds.map_batches(lambda x: x, batch_size=batch_size)
+    assert ds1.num_blocks() == math.ceil(num_blocks / max(batch_size // block_size, 1))
+
+    # Blocks should not be bundled up when batch_size is not specified.
+    ds2 = ds.map_batches(lambda x: x)
+    assert ds2.num_blocks() == num_blocks


 @pytest.mark.parametrize(
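A worked check of the bundling formula the test asserts, under its assumption of equal-sized blocks:

```python
import math


def expected_blocks(num_blocks: int, block_size: int, batch_size: int) -> int:
    # Each map task receives max(batch_size // block_size, 1) blocks.
    return math.ceil(num_blocks / max(batch_size // block_size, 1))


# Small blocks get bundled: 5 blocks of 10 rows per 50-row batch.
assert expected_blocks(num_blocks=100, block_size=10, batch_size=50) == 20

# Blocks at or above batch_size are never bundled (the max(..., 1) clamp).
assert expected_blocks(num_blocks=100, block_size=10, batch_size=5) == 100
```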