[data] fix map groups don't break down blocks #58988
Conversation
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Code Review
The PR introduces a new context flag `_preserve_hash_shuffle_finalize_blocks` to prevent block breakdown during hash shuffle finalization, which is necessary for `map_groups`. The overall approach is sound, but I've found a critical logic issue in the implementation and some inconsistencies in the documentation and tests that need to be addressed.

- There's a logical error in `hash_shuffle.py` where an `or` is used instead of an `and`, which could cause the fix to not work as intended when `target_max_block_size` is set.
- The documentation and comments for the new flag in `context.py` contradict the implementation's behavior for `map_groups`.
- The new test for `map_groups` has a misleading name and comments.
I've left specific comments with suggestions for these points. Once these are addressed, the PR should be in good shape.
```python
aggregator_ray_remote_args_override: Optional[Dict[str, Any]] = None,
shuffle_progress_bar_name: Optional[str] = None,
finalize_progress_bar_name: Optional[str] = None,
preserve_finalize_blocks: bool = False,
```
Suggestion: rename `preserve_finalize_blocks` to `disallow_block_splitting`.
srinathk10 left a comment:
LGTM
```python
ctx = DataContext.get_current()
# Very small to force splitting if enabled
ctx.target_max_block_size = 1
yield
```
Bug: Test fixture doesn't restore modified DataContext value

The setup fixture modifies `ctx.target_max_block_size = 1` but never restores the original value after the test. The established pattern in `conftest.py` for such fixtures is to save the original value before modifying, then restore it after `yield`. Without restoration, the modified value of 1 persists and could cause test pollution, affecting subsequent tests that depend on the default `target_max_block_size` value.
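A minimal sketch of the save-and-restore pattern described above; the fixture name is illustrative, not the one in the PR:

```python
import pytest

from ray.data.context import DataContext


@pytest.fixture
def tiny_target_max_block_size():
    ctx = DataContext.get_current()
    original = ctx.target_max_block_size
    # Very small to force splitting if enabled.
    ctx.target_max_block_size = 1
    yield
    # Restore so later tests see the original value again.
    ctx.target_max_block_size = original
```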
…/preserve-block-size
```diff
 target_max_block_size = self._data_context.target_max_block_size
 # None means the user wants to preserve the block distribution,
 # so we do not break the block down further.
-if target_max_block_size is not None:
+# Also check _disallow_block_splitting parameter.
+if target_max_block_size is not None and not self._disallow_block_splitting:
```
Instead, pass `target_max_block_size` as `None` when block splitting isn't allowed.
```python
# NOTE: This is set to True because num_partitions (aka, # of output blocks)
# must be preserved.
disallow_block_splitting=True,
```
Suggested change:

```diff
-# NOTE: This is set to True because num_partitions (aka, # of output blocks)
-# must be preserved.
-disallow_block_splitting=True,
+# NOTE: In cases like ``groupby`` blocks can't be split as this might violate an invariant that all rows
+# with the same key are in the same group (block)
+disallow_block_splitting=True,
```
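To illustrate the invariant the suggested comment describes, here is a hedged sketch (the column names and aggregation are made up): `map_groups` hands the UDF one complete group per call, which only holds if no group is split across blocks.

```python
import pandas as pd

import ray

ds = ray.data.from_items([{"key": i % 4, "value": i} for i in range(100)])

def summarize(group: pd.DataFrame) -> pd.DataFrame:
    # Relies on every row with the same key arriving in this single block;
    # if the block were split, each fragment would produce a partial sum.
    return pd.DataFrame({"key": [group["key"].iloc[0]], "sum": [group["value"].sum()]})

print(ds.groupby("key").map_groups(summarize, batch_format="pandas").take_all())
```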
…/preserve-block-size
```python
self._data_context = data_context.copy()
if disallow_block_splitting:
    self._data_context.target_max_block_size = None
```
Sorry, I should have been more clear:
- Don't patch the `DataContext`.
- Instead, pass `target_max_block_size` directly into the aggregator (to avoid two overlapping configs: the `DataContext` and the disallowing flag).
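A rough sketch of that suggestion; `Aggregator`, `_finalize`, and `make_aggregator` are placeholder names, not the actual classes in `hash_shuffle.py`:

```python
from typing import Optional


class Aggregator:
    def __init__(self, target_max_block_size: Optional[int]):
        # The caller resolves the effective cap up front: it passes None
        # when block splitting is disallowed, so there is no second
        # "disallow" flag competing with the DataContext setting.
        self._target_max_block_size = target_max_block_size

    def _finalize(self, block):
        if self._target_max_block_size is not None:
            ...  # split `block` into chunks of at most the target size
        return block


# Call-site sketch: resolve once, then pass a single source of truth down.
def make_aggregator(data_context, disallow_block_splitting: bool) -> Aggregator:
    target = None if disallow_block_splitting else data_context.target_max_block_size
    return Aggregator(target_max_block_size=target)
```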
…llow_block_splitting=True
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
## Description

`test_preserve_hash_shuffle_blocks` has been flaking consistently. To mitigate the flakiness, this PR bumps the test size from "small" to "medium".

```
[2025-12-06T07:06:32Z] //python/ray/data:test_preserve_hash_shuffle_blocks TIMEOUT in 3 out of 3 in 63.4s
--
[2025-12-06T07:06:32Z] Stats over 3 runs: max = 63.4s, min = 60.1s, avg = 62.3s, dev = 1.6s
[2025-12-06T07:06:32Z] /root/.cache/bazel/_bazel_root/1df605deb6d24fc8068f6e25793ec703/execroot/io_ray/bazel-out/k8-opt/testlogs/python/ray/data/test_preserve_hash_shuffle_blocks/test.log
[2025-12-06T07:06:32Z] /root/.cache/bazel/_bazel_root/1df605deb6d24fc8068f6e25793ec703/execroot/io_ray/bazel-out/k8-opt/testlogs/python/ray/data/test_preserve_hash_shuffle_blocks/test_attempts/attempt_1.log
[2025-12-06T07:06:32Z] /root/.cache/bazel/_bazel_root/1df605deb6d24fc8068f6e25793ec703/execroot/io_ray/bazel-out/k8-opt/testlogs/python/ray/data/test_preserve_hash_shuffle_blocks/test_attempts/attempt_2.log
[2025-12-06T07:06:32Z]
```

See also #58988

Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
## Description

`test_preserve_hash_shuffle_blocks` has been flaking consistently. To mitigate the flakiness, this PR bumps the test size from "small" to "medium". This is a follow-up to #59256, which accidentally bumped the wrong test.

```
[2025-12-06T07:06:32Z] //python/ray/data:test_preserve_hash_shuffle_blocks TIMEOUT in 3 out of 3 in 63.4s
--
[2025-12-06T07:06:32Z] Stats over 3 runs: max = 63.4s, min = 60.1s, avg = 62.3s, dev = 1.6s
[2025-12-06T07:06:32Z] /root/.cache/bazel/_bazel_root/1df605deb6d24fc8068f6e25793ec703/execroot/io_ray/bazel-out/k8-opt/testlogs/python/ray/data/test_preserve_hash_shuffle_blocks/test.log
[2025-12-06T07:06:32Z] /root/.cache/bazel/_bazel_root/1df605deb6d24fc8068f6e25793ec703/execroot/io_ray/bazel-out/k8-opt/testlogs/python/ray/data/test_preserve_hash_shuffle_blocks/test_attempts/attempt_1.log
[2025-12-06T07:06:32Z] /root/.cache/bazel/_bazel_root/1df605deb6d24fc8068f6e25793ec703/execroot/io_ray/bazel-out/k8-opt/testlogs/python/ray/data/test_preserve_hash_shuffle_blocks/test_attempts/attempt_2.log
[2025-12-06T07:06:32Z]
```

See also #58988

Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
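For context, a hypothetical Bazel excerpt of this change; bumping `size` raises the default per-test timeout from 60s ("small") to 300s ("medium"), which the 63.4s runs above were exceeding. The `srcs` path is illustrative:

```
py_test(
    name = "test_preserve_hash_shuffle_blocks",
    size = "medium",  # was "small"; runs were timing out at the 60s cap
    srcs = ["tests/test_preserve_hash_shuffle_blocks.py"],  # illustrative path
)
```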
Description
Fixes #58603.
`map_groups` assumes that all partitions fit in one block. However, my PR broke this behavior by breaking down blocks. To address this, I added a data context variable that is implicitly set to `False` to break down blocks. If `True`, the block size will be preserved.
Related issues
Additional information
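As additional context, a minimal sketch of the flag's semantics, assuming the name `_preserve_hash_shuffle_finalize_blocks` from the review above; the leading underscore marks it as internal, so setting it directly here is purely illustrative:

```python
import ray
from ray.data.context import DataContext

ctx = DataContext.get_current()
# Default False: finalized hash-shuffle blocks may be broken down further.
# True: keep one block per hash partition, as map_groups requires.
ctx._preserve_hash_shuffle_finalize_blocks = True  # illustrative only

ds = ray.data.range(1000).groupby("id").map_groups(lambda g: g, batch_format="pandas")
```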