diff --git a/doc/source/data/doc_code/transforming_datasets.py b/doc/source/data/doc_code/transforming_datasets.py
index 946ba5300874..876526c07f4c 100644
--- a/doc/source/data/doc_code/transforming_datasets.py
+++ b/doc/source/data/doc_code/transforming_datasets.py
@@ -569,6 +569,35 @@ def map_row(row: TableRow) -> tuple:
 # __writing_simple_out_row_udfs_end__
 # fmt: on
 
+# fmt: off
+# __configuring_batch_size_begin__
+import ray
+import pandas as pd
+
+# Load dataset.
+ds = ray.data.read_csv("example://iris.csv")
+
+# UDF as a function on Pandas DataFrame batches.
+def pandas_transform(df: pd.DataFrame) -> pd.DataFrame:
+    # Filter rows.
+    df = df[df["variety"] == "Versicolor"]
+    # Add derived column.
+    df.loc[:, "normalized.sepal.length"] = df["sepal.length"] / df["sepal.length"].max()
+    # Drop column.
+    df = df.drop(columns=["sepal.length"])
+    return df
+
+# Have each batch that pandas_transform receives contain 10 rows.
+ds = ds.map_batches(pandas_transform, batch_size=10)
+
+ds.show(2)
+# -> {'sepal.width': 3.2, 'petal.length': 4.7, 'petal.width': 1.4,
+#     'variety': 'Versicolor', 'normalized.sepal.length': 1.0}
+# -> {'sepal.width': 3.2, 'petal.length': 4.5, 'petal.width': 1.5,
+#     'variety': 'Versicolor', 'normalized.sepal.length': 0.9142857142857144}
+# __configuring_batch_size_end__
+# fmt: on
+
 # fmt: off
 # __dataset_compute_strategy_begin__
 import ray
diff --git a/doc/source/data/transforming-datasets.rst b/doc/source/data/transforming-datasets.rst
index bfd131ba5307..a1a4cfaf7402 100644
--- a/doc/source/data/transforming-datasets.rst
+++ b/doc/source/data/transforming-datasets.rst
@@ -346,6 +346,57 @@ The following output types are allowed for per-row UDFs (e.g.,
     :start-after: __writing_simple_out_row_udfs_begin__
     :end-before: __writing_simple_out_row_udfs_end__
 
+.. _transform_datasets_configuring_batch_size:
+
+----------------------
+Configuring Batch Size
+----------------------
+
+:meth:`ds.map_batches() <ray.data.Dataset.map_batches>` is the canonical parallel
+transformation API for Datasets: it launches parallel tasks over the underlying Datasets
+blocks and maps UDFs over data batches within those tasks, allowing the UDF to implement
+vectorized operations on batches. An important parameter to set is ``batch_size``, which
+controls the size of the batches provided to the UDF.
+
+.. literalinclude:: ./doc_code/transforming_datasets.py
+    :language: python
+    :start-after: __configuring_batch_size_begin__
+    :end-before: __configuring_batch_size_end__
+
+Increasing ``batch_size`` can result in faster execution by better leveraging vectorized
+operations and hardware, reducing batch slicing and concatenation overhead, and better
+saturating CPUs/GPUs, but it also results in higher memory utilization, which can lead
+to out-of-memory failures. If you encounter OOMs, decreasing your ``batch_size`` may
+help.
+
+.. note::
+    The default ``batch_size`` of ``4096`` may be too large for datasets with large rows
+    (e.g. tables with many columns or a collection of large images).
+
+If you specify a ``batch_size`` that's larger than your ``Dataset`` blocks, Datasets
+will bundle multiple blocks together for a single task in order to better satisfy
+``batch_size``. If ``batch_size`` is much larger than your ``Dataset`` blocks (e.g. if
+your dataset was created with too high a ``parallelism`` and/or ``batch_size`` is set
+to too large a value for your dataset), the number of parallel tasks may be less than
+expected.
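+
+For example, here is a rough sketch of how an oversized ``batch_size`` can reduce
+parallelism (it uses a synthetic ``ray.data.range()`` dataset; the exact bundling
+depends on your block sizes):
+
+.. code-block:: python
+
+    import ray
+
+    # A small synthetic dataset split into 8 blocks (~16 rows each).
+    ds = ray.data.range(128, parallelism=8)
+
+    # batch_size=64 is larger than any single block, so Datasets may bundle
+    # roughly 4 blocks into each task, leaving ~2 parallel tasks instead of 8.
+    ds = ds.map_batches(lambda batch: batch, batch_size=64)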
+
+If your ``Dataset`` blocks are smaller than your ``batch_size`` and you want to increase
+:meth:`ds.map_batches() <ray.data.Dataset.map_batches>` parallelism, decrease your
+``batch_size`` to prevent this block bundling. If you think that your ``Dataset`` blocks
+are too small, try decreasing ``parallelism`` during the read to create larger blocks.
+
+.. note::
+    The size of the batches provided to the UDF may be smaller than the provided
+    ``batch_size`` if ``batch_size`` doesn't evenly divide the block(s) sent to a given
+    task.
+
+.. note::
+    Block bundling (processing multiple blocks in a single task) will not occur if
+    ``batch_size`` is not set; instead, each task will receive a single block. If a
+    block is smaller than the default ``batch_size`` (4096), then the batch provided to
+    the UDF in that task will be the same size as the block, and will therefore be
+    smaller than the default ``batch_size``.
+
 .. _transform_datasets_compute_strategy:
 
 ----------------