[Data] Add configurable batching for resolve_block_refs to speed up iter_batches #58467
Conversation
Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com>
Code Review
This pull request introduces batching for resolve_block_refs to improve the performance of iter_batches by reducing the number of ray.get() calls. The batch size is made configurable through a new DataContext setting. The implementation is sound and includes a good test case to verify the batching behavior. I have one suggestion to improve code conciseness by using yield from.
```python
for block_ref in block_ref_iter:
    pending.append(block_ref)
    if len(pending) >= batch_size:
        for block in _resolve_pending():
            yield block

for block in _resolve_pending():
    yield block
```
The logic for yielding blocks from _resolve_pending is duplicated. You can simplify this by using yield from to make the code more concise and avoid repetition.
Suggested change:

```diff
 for block_ref in block_ref_iter:
     pending.append(block_ref)
     if len(pending) >= batch_size:
-        for block in _resolve_pending():
-            yield block
-for block in _resolve_pending():
-    yield block
+        yield from _resolve_pending()
+yield from _resolve_pending()
```
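The two variants are behaviorally identical. A minimal standalone sketch of the batched-resolution pattern, with a stub in place of the real `_resolve_pending` (which would issue a single `ray.get()` per batch):

```python
from typing import Iterator, List


def resolve_in_batches(refs: Iterator[int], batch_size: int) -> Iterator[str]:
    """Illustrative stand-in: buffers refs and resolves each batch at once.

    In the real code, the buffered batch would be resolved with a single
    ray.get() call; here we just format the values so the sketch runs
    without Ray installed.
    """
    pending: List[int] = []

    def _resolve_pending() -> Iterator[str]:
        # One "bulk fetch" per batch, then drain the buffer.
        resolved = [f"block-{r}" for r in pending]
        pending.clear()
        yield from resolved

    for ref in refs:
        pending.append(ref)
        if len(pending) >= batch_size:
            yield from _resolve_pending()
    # Flush any leftover refs smaller than a full batch.
    yield from _resolve_pending()


print(list(resolve_in_batches(iter(range(5)), batch_size=2)))
# -> ['block-0', 'block-1', 'block-2', 'block-3', 'block-4']
```

Note the trailing `yield from` after the loop: without it, any refs left in a partial final batch would be silently dropped.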
```diff
 if batch_size is None or current_window_size < num_rows_to_prefetch:
     try:
-        next_ref_bundle = get_next_ref_bundle()
+        next_ref_bundle = next(ref_bundles)
```
Bug: RefBundle Retrieval Observability Gap
The removal of the get_next_ref_bundle() helper function eliminates tracking of stats.iter_get_ref_bundles_s timing metrics. The direct calls to next(ref_bundles) at lines 371 and 384 no longer wrap the operation with the stats timer, causing loss of observability for RefBundle retrieval time which was previously tracked and reported in iteration statistics.
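One way to restore the metric is to keep a thin wrapper that times the `next()` call. The sketch below uses a hypothetical stats object; the real Ray Data iteration-stats structure differs, but the timing pattern is the same:

```python
import time
from contextlib import contextmanager
from typing import Iterator


class IterStats:
    """Hypothetical stand-in for the iteration stats object."""

    def __init__(self) -> None:
        self.iter_get_ref_bundles_s = 0.0

    @contextmanager
    def ref_bundle_timer(self):
        # Accumulate wall time spent retrieving RefBundles.
        start = time.perf_counter()
        try:
            yield
        finally:
            self.iter_get_ref_bundles_s += time.perf_counter() - start


def get_next_ref_bundle(ref_bundles: Iterator[str], stats: IterStats) -> str:
    # Re-wraps the raw next() so retrieval time stays observable.
    with stats.ref_bundle_timer():
        return next(ref_bundles)


stats = IterStats()
bundles = iter(["bundle-a", "bundle-b"])
print(get_next_ref_bundle(bundles, stats))  # bundle-a
print(stats.iter_get_ref_bundles_s >= 0.0)  # True
```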
Bug: Phantom Constant Breaks Imports
The constant DEFAULT_ACTOR_MAX_TASKS_IN_FLIGHT_TO_MAX_CONCURRENCY_FACTOR is removed but it's still imported and used in actor_pool_map_operator.py and test_operators.py. This will cause an ImportError when those modules try to import this constant from ray.data.context.
python/ray/data/context.py, lines 217 to 221 (commit 17d88de):

```python
)
# Enable per node metrics reporting for Ray Data, disabled by default.
DEFAULT_ENABLE_PER_NODE_METRICS = bool(
    int(os.environ.get("RAY_DATA_PER_NODE_METRICS", "0"))
)
```
srinathk10 left a comment:
@YoussefEssDS The motivation for the changes looks good. Please address the review comments.
Also, w.r.t. your micro-benchmark, please add the results as a comment here describing your test setup.
```python
self._eager_free = clear_block_after_read and ctx.eager_free
max_get_blocks_batch_size = max(1, (prefetch_batches or 0) + 1)
self._block_get_batch_size = min(
    ctx.iter_get_block_batch_size, max_get_blocks_batch_size
)
```
Bug: Overly Conservative Batching Limits Performance
The calculation of _block_get_batch_size overly restricts batching by limiting it to prefetch_batches + 1 blocks. With default settings (prefetch_batches=1, iter_get_block_batch_size=32), this results in batching only 2 blocks at a time instead of the configured 32, significantly reducing the performance benefit. The formula max(1, (prefetch_batches or 0) + 1) creates a cap that's too conservative since prefetch_batches measures batches (not blocks), and their relationship varies with block size. This causes the configured iter_get_block_batch_size to be silently overridden in most cases.
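To make the concern concrete, here is the quoted formula in isolation (a sketch of the arithmetic only, not the actual code path):

```python
from typing import Optional


def effective_batch_size(
    iter_get_block_batch_size: int, prefetch_batches: Optional[int]
) -> int:
    # Mirrors the formula quoted above: the cap is prefetch_batches + 1 blocks,
    # so the configured batch size is silently clamped.
    cap = max(1, (prefetch_batches or 0) + 1)
    return min(iter_get_block_batch_size, cap)


# Defaults (prefetch_batches=1, iter_get_block_batch_size=32):
# only 2 blocks end up in each ray.get() call.
print(effective_batch_size(32, 1))     # 2
print(effective_batch_size(32, 31))    # 32
print(effective_batch_size(32, None))  # 1
```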
Relaxing that cap breaks the backpressure tests, as it forces the materialization of more blocks than the configured prefetch size.
Hi @srinathk10, thanks for the review. I ran the micro-benchmark on a Ryzen 9 7950X / 64 GB RAM machine (Ubuntu 22.04, Python 3.12). Before the batching change: mean 3.82 s (p50 3.83 s, min 3.78 s, max 3.86 s) = 1.31 M rows/s over 4,883 batches. The net improvement is ~5% in end-to-end batch iteration throughput with prefetch set to 32. Both runs used the same script and dataset parameters on the same machine.
Train release tests: https://buildkite.com/ray-project/release/builds/67245
srinathk10 left a comment:
LGTM
```diff
-    clear_block_after_read and DataContext.get_current().eager_free
+ctx = DataContext.get_current()
+self._eager_free = clear_block_after_read and ctx.eager_free
+max_get_blocks_batch_size = max(1, (prefetch_batches or 0) + 1)
```
prefetch_batches is the number of batches to prefetch, not blocks.
The actual number of blocks to prefetch is calculated in BlockPrefetcher.
We can add a method to let it report the number of blocks being prefetched.
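A sketch of the suggested reporting hook. The class and method names here are hypothetical illustrations, not Ray's actual `BlockPrefetcher` API:

```python
from typing import List, Sequence


class SketchPrefetcher:
    """Toy prefetcher that tracks how many block refs are in flight."""

    def __init__(self) -> None:
        self._in_flight: List[str] = []

    def prefetch_blocks(self, block_refs: Sequence[str]) -> None:
        # In the real prefetcher this would kick off async fetches.
        self._in_flight = list(block_refs)

    def num_blocks_prefetched(self) -> int:
        # The reporting method proposed in the review: lets callers size
        # their ray.get() batches from the actual number of prefetched
        # blocks, rather than inferring it from a batch count.
        return len(self._in_flight)


p = SketchPrefetcher()
p.prefetch_blocks(["ref-1", "ref-2", "ref-3"])
print(p.num_blocks_prefetched())  # 3
```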
```python
hits += current_hit
misses += current_miss
unknowns += current_unknown
ctx = ray.data.context.DataContext.get_current()
```
Pass in the correct context object; avoid using the global one.
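The distinction, sketched with a toy context class (the real `DataContext` carries many more fields and a process-wide singleton):

```python
class ToyContext:
    """Minimal stand-in for a per-dataset configuration object."""

    def __init__(self, batch_size: int) -> None:
        self.iter_get_block_batch_size = batch_size


# Process-wide default, analogous to DataContext.get_current().
_GLOBAL_CTX = ToyContext(32)


def batch_size_from_global() -> int:
    # Anti-pattern flagged above: silently reads global state, so a
    # dataset-level override is ignored.
    return _GLOBAL_CTX.iter_get_block_batch_size


def batch_size_from_ctx(ctx: ToyContext) -> int:
    # Preferred: the caller passes the dataset's own context explicitly.
    return ctx.iter_get_block_batch_size


per_dataset_ctx = ToyContext(8)
print(batch_size_from_global())            # 32 (override lost)
print(batch_size_from_ctx(per_dataset_ctx))  # 8
```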
Hi @raulchen, is this what you had in mind? Any further suggestions?
Hi @raulchen, PTAL. Thanks!
```python
block_ref_iter: Iterator[ObjectRef[Block]],
stats: Optional[DatasetStats] = None,
max_get_batch_size: Optional[Union[int, Callable[[], int]]] = None,
ctx: Optional["DataContext"] = None,
```
can we make this mandatory?
```python
self._eager_free = (
    clear_block_after_read and DataContext.get_current().eager_free
)
self._ctx = DataContext.get_current()
```
ideally this ctx should be passed from Dataset._context.
but since it's an existing issue, you can leave a TODO here if it requires a massive change.
@raulchen PTAL. Thanks!
Hi @raulchen, just bumping this. Can you check whether any further changes are needed? Thanks!
Hi @bveeramani, can we get this over the line? It's approved by the reviewers. Thanks!
@YoussefEssDS merged. Thank you for the contribution!
Description

This PR will:

- Batch block resolution in `resolve_block_refs()` so `iter_batches()` issues one `ray.get()` per chunk of block refs instead of per ref. The chunk size is configurable using the new `DataContext.iter_get_block_batch_size` knob.
- Add a test that proves that `resolve_block_refs()` actually batches the `ray.get()` calls.

Related issues

Raised by @amogkam in `python/ray/data/_internal/block_batching/util.py`

Additional information

Simple benchmark available: https://gist.github.com/YoussefEssDS/40de959a42a19334b8dac8bd217c319b

Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com>