From e10b26fa6d491af2cb255e7c683369f21f435a00 Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Thu, 22 Dec 2022 12:13:40 -0800 Subject: [PATCH] [Datasets] Add feature guide section for configuring ``batch_size`` in ``.map_batches()`` (#29117) Users may need guidance on configuring batch_size for ds.map_batches(), since the hardcoded default of 4096 will be suboptimal for wide tables and datasets containing large images, and the semantics of parallelism vs. batch size got more complicated with the block bundling PR. This PR adds a section for configuring batch_size to the "Transforming Datasets" feature guide. Signed-off-by: tmynn --- .../data/doc_code/transforming_datasets.py | 29 +++++++++++ doc/source/data/transforming-datasets.rst | 51 +++++++++++++++++++ 2 files changed, 80 insertions(+) diff --git a/doc/source/data/doc_code/transforming_datasets.py b/doc/source/data/doc_code/transforming_datasets.py index 946ba5300874..876526c07f4c 100644 --- a/doc/source/data/doc_code/transforming_datasets.py +++ b/doc/source/data/doc_code/transforming_datasets.py @@ -569,6 +569,35 @@ def map_row(row: TableRow) -> tuple: # __writing_simple_out_row_udfs_end__ # fmt: on +# fmt: off +# __configuring_batch_size_begin__ +import ray +import pandas as pd + +# Load dataset. +ds = ray.data.read_csv("example://iris.csv") + +# UDF as a function on Pandas DataFrame batches. +def pandas_transform(df: pd.DataFrame) -> pd.DataFrame: + # Filter rows. + df = df[df["variety"] == "Versicolor"] + # Add derived column. + df.loc[:, "normalized.sepal.length"] = df["sepal.length"] / df["sepal.length"].max() + # Drop column. + df = df.drop(columns=["sepal.length"]) + return df + +# Have each batch that pandas_transform receives contain 10 rows. +ds = ds.map_batches(pandas_transform, batch_size=10) + +ds.show(2) +# -> {'sepal.width': 3.2, 'petal.length': 4.7, 'petal.width': 1.4, +# 'variety': 'Versicolor', 'normalized.sepal.length': 1.0} +# -> {'sepal.width': 3.2, 'petal.length': 4.5, 'petal.width': 1.5, +# 'variety': 'Versicolor', 'normalized.sepal.length': 0.9142857142857144} +# __configuring_batch_size_end__ +# fmt: on + # fmt: off # __dataset_compute_strategy_begin__ import ray diff --git a/doc/source/data/transforming-datasets.rst b/doc/source/data/transforming-datasets.rst index bfd131ba5307..a1a4cfaf7402 100644 --- a/doc/source/data/transforming-datasets.rst +++ b/doc/source/data/transforming-datasets.rst @@ -346,6 +346,57 @@ The following output types are allowed for per-row UDFs (e.g., :start-after: __writing_simple_out_row_udfs_begin__ :end-before: __writing_simple_out_row_udfs_end__ +.. _transform_datasets_configuring_batch_size: + +---------------------- +Configuring Batch Size +---------------------- + +:meth:`ds.map_batches() ` is the canonical parallel +transformation API for Datasets: it launches parallel tasks over the underlying Datasets +blocks and maps UDFs over data batches within those tasks, allowing the UDF to +implement vectorized operations on batches. An important parameter to +set is ``batch_size``, which controls the size of the batches provided to the UDF. + +.. literalinclude:: ./doc_code/transforming_datasets.py + :language: python + :start-after: __configuring_batch_size_begin__ + :end-before: __configuring_batch_size_end__ + +Increasing ``batch_size`` can result in faster execution by better leveraging vectorized +operations and hardware, reducing batch slicing and concatenation overhead, and overall +saturation of CPUs/GPUs, but will also result in higher memory utilization, which can +lead to out-of-memory failures. If encountering OOMs, decreasing your ``batch_size`` may +help. + +.. note:: + The default ``batch_size`` of ``4096`` may be too large for datasets with large rows + (e.g. tables with many columns or a collection of large images). + +If you specify a ``batch_size`` that's larger than your ``Dataset`` blocks, Datasets +will bundle multiple blocks together for a single task in order to better satisfy +``batch_size``. If ``batch_size`` is a lot larger than your ``Dataset`` blocks (e.g. if +your dataset was created with too large of a ``parallelism`` and/or the ``batch_size`` +is set to too large of a value for your dataset), the number of parallel tasks +may be less than expected. + +If your ``Dataset`` blocks are smaller than your ``batch_size`` and you want to increase +`:meth:`ds.map_batches() ` parallelism, decrease your +``batch_size`` to prevent this block bundling. If you think that your ``Dataset`` blocks +are too small, try decreasing ``parallelism`` during the read to create larger blocks. + +.. note:: + The size of the batches provided to the UDF may be smaller than the provided + ``batch_size`` if ``batch_size`` doesn't evenly divide the block(s) sent to a given + task. + +.. note:: + Block bundling (processing multiple blocks in a single task) will not occur if + ``batch_size`` is not set; instead, each task will receive a single block. If a block + is smaller than the default ``batch_size`` (4096), then the batch provided to the UDF + in that task will the same size as the block, and will therefore be smaller than the + default ``batch_size``. + .. _transform_datasets_compute_strategy: ----------------