[Datasets] Add feature guide section for configuring batch_size in ``.map_batches()`` (ray-project#29117)

Users may need guidance on configuring batch_size for ds.map_batches(), since the hardcoded default of 4096 will be suboptimal for wide tables and datasets containing large images, and the semantics of parallelism vs. batch size got more complicated with the block bundling PR. This PR adds a section for configuring batch_size to the "Transforming Datasets" feature guide.

Signed-off-by: tmynn <hovhannes.tamoyan@gmail.com>
clarkzinzow authored and tamohannes committed Jan 25, 2023
1 parent d9ab277 commit e10b26f
Showing 2 changed files with 80 additions and 0 deletions.
29 changes: 29 additions & 0 deletions doc/source/data/doc_code/transforming_datasets.py
@@ -569,6 +569,35 @@ def map_row(row: TableRow) -> tuple:
# __writing_simple_out_row_udfs_end__
# fmt: on

# fmt: off
# __configuring_batch_size_begin__
import ray
import pandas as pd

# Load dataset.
ds = ray.data.read_csv("example://iris.csv")

# UDF as a function on Pandas DataFrame batches.
def pandas_transform(df: pd.DataFrame) -> pd.DataFrame:
    # Filter rows.
    df = df[df["variety"] == "Versicolor"]
    # Add derived column.
    df.loc[:, "normalized.sepal.length"] = df["sepal.length"] / df["sepal.length"].max()
    # Drop column.
    df = df.drop(columns=["sepal.length"])
    return df

# Have each batch that pandas_transform receives contain 10 rows.
ds = ds.map_batches(pandas_transform, batch_size=10)

ds.show(2)
# -> {'sepal.width': 3.2, 'petal.length': 4.7, 'petal.width': 1.4,
#     'variety': 'Versicolor', 'normalized.sepal.length': 1.0}
# -> {'sepal.width': 3.2, 'petal.length': 4.5, 'petal.width': 1.5,
#     'variety': 'Versicolor', 'normalized.sepal.length': 0.9142857142857144}
# __configuring_batch_size_end__
# fmt: on

# fmt: off
# __dataset_compute_strategy_begin__
import ray
51 changes: 51 additions & 0 deletions doc/source/data/transforming-datasets.rst
@@ -346,6 +346,57 @@ The following output types are allowed for per-row UDFs (e.g.,
  :start-after: __writing_simple_out_row_udfs_begin__
  :end-before: __writing_simple_out_row_udfs_end__

.. _transform_datasets_configuring_batch_size:

----------------------
Configuring Batch Size
----------------------

:meth:`ds.map_batches() <ray.data.Dataset.map_batches>` is the canonical parallel
transformation API for Datasets: it launches parallel tasks over the underlying
``Dataset`` blocks and maps UDFs over data batches within those tasks, allowing the
UDF to implement vectorized operations on batches. An important parameter to set is
``batch_size``, which controls the size of the batches provided to the UDF.

.. literalinclude:: ./doc_code/transforming_datasets.py
  :language: python
  :start-after: __configuring_batch_size_begin__
  :end-before: __configuring_batch_size_end__

Increasing ``batch_size`` can result in faster execution by better leveraging vectorized
operations and hardware, reducing batch slicing and concatenation overhead, and better
saturating CPUs/GPUs; however, it also results in higher memory utilization, which can
lead to out-of-memory (OOM) failures. If you encounter OOM failures, decreasing your
``batch_size`` may help.
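
For example, a UDF doing vectorized NumPy arithmetic may benefit from a larger
``batch_size``. A minimal sketch, assuming the ``numpy`` batch format yields a single
``ndarray`` for this tensor dataset (the ``8192`` value is illustrative, not a tuned
recommendation):

.. code-block:: python

  import numpy as np
  import ray

  # Vectorized UDF: standardizes the whole batch in a few NumPy operations.
  def standardize(batch: np.ndarray) -> np.ndarray:
      return (batch - batch.mean()) / batch.std()

  ds = ray.data.range_tensor(100_000, shape=(8,))
  # A larger batch_size amortizes per-batch slicing/concatenation overhead
  # over more rows; if this causes OOM failures, lower batch_size.
  ds = ds.map_batches(standardize, batch_format="numpy", batch_size=8192)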

.. note::
  The default ``batch_size`` of ``4096`` may be too large for datasets with large rows
  (e.g., tables with many columns or a collection of large images).

If you specify a ``batch_size`` that's larger than your ``Dataset`` blocks, Datasets
will bundle multiple blocks together for a single task in order to better satisfy
``batch_size``. If ``batch_size`` is much larger than your ``Dataset`` blocks (e.g., if
your dataset was created with too high a ``parallelism`` and/or your ``batch_size`` is
too large for your dataset), the number of parallel tasks may be smaller than expected.
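
As an illustrative sketch of how bundling can reduce parallelism (the block counts and
sizes here are assumptions for the example, not guarantees about the actual read
layout):

.. code-block:: python

  import ray

  # Request ~200 read tasks, so blocks hold roughly 1,000 rows each.
  ds = ray.data.range(200_000, parallelism=200)

  # batch_size=10_000 makes Datasets bundle ~10 of these blocks per task to
  # satisfy the batch size, so at most ~20 map tasks run in parallel, not 200.
  ds = ds.map_batches(lambda batch: batch, batch_size=10_000)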

If your ``Dataset`` blocks are smaller than your ``batch_size`` and you want to increase
:meth:`ds.map_batches() <ray.data.Dataset.map_batches>` parallelism, decrease your
``batch_size`` to prevent this block bundling. If you think that your ``Dataset`` blocks
are too small, try decreasing ``parallelism`` during the read to create larger blocks,
as sketched below.
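
For example, a sketch of both remedies (the values are illustrative, and
``pandas_transform`` is the UDF from the example above):

.. code-block:: python

  import ray

  # Remedy 1: decrease batch_size so batches fit within single blocks,
  # avoiding block bundling and preserving map_batches() parallelism.
  ds = ray.data.read_csv("example://iris.csv")
  ds = ds.map_batches(pandas_transform, batch_size=10)

  # Remedy 2: decrease the read parallelism to create larger blocks that can
  # satisfy a larger batch_size without bundling.
  ds = ray.data.read_csv("example://iris.csv", parallelism=1)
  ds = ds.map_batches(pandas_transform, batch_size=64)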

.. note::
  The size of the batches provided to the UDF may be smaller than the given
  ``batch_size`` if ``batch_size`` doesn't evenly divide the block(s) sent to a given
  task.
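
For instance, under the (assumed) layout below, a single 100-row block with
``batch_size=32`` yields batches of 32, 32, 32, and 4 rows:

.. code-block:: python

  import ray

  # One read task -> one ~100-row block; 100 doesn't divide evenly by 32.
  ds = ray.data.range(100, parallelism=1)
  ds = ds.map_batches(lambda batch: batch, batch_size=32)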

.. note::
  Block bundling (processing multiple blocks in a single task) will not occur if
  ``batch_size`` is not set; instead, each task will receive a single block. If a block
  is smaller than the default ``batch_size`` (4096), then the batch provided to the UDF
  in that task will be the same size as the block, and will therefore be smaller than
  the default ``batch_size``.
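
A sketch of this behavior, assuming the read below produces a single 150-row block:

.. code-block:: python

  import ray

  # batch_size not set: each task receives exactly one block, so each batch
  # here has 150 rows, i.e. smaller than the 4096 default.
  ds = ray.data.read_csv("example://iris.csv", parallelism=1)
  ds = ds.map_batches(pandas_transform)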

.. _transform_datasets_compute_strategy:

----------------
