Conversation

Contributor

@iamjustinhsu iamjustinhsu commented Nov 13, 2025

Description

HashShuffleAggregator currently doesn't break big blocks into smaller blocks (or combine smaller blocks into bigger ones). For large blocks this is very problematic, because the block being returned will spill to disk. Consider the following scenario:

  • A node with 200 GiB memory, 100 GiB disk
  • Ray core allocates 50% of memory to the object store (so 100 GiB heap memory, 100 GiB object store)
  • You return a 150 GiB block, which fits in memory (doesn't OOM)
  • The object spills to disk, but since the disk is far smaller than the block, the node runs out of disk (OODs)

Why this is better

  • Practically speaking, this happens a lot on AWS and GCP nodes, which typically have more memory than disk space. Yielding smaller blocks lets streaming_gen backpressure kick in and avoids materializing the entire object at once.
  • Even if that weren't the case (suppose we had nodes with large disks and low memory), we still shouldn't store huge objects/blocks like that, because it hurts task/block-based parallelism. You can work around it with a StreamingRepartition, but that is extra work for the user.
  • In some cases you may have enough combined object store + disk space for the block, but not contiguously. For example, with 50 GiB of object store and 25 GiB of disk remaining, a 70 GiB block fits in neither: Ray core can't split the object. But if you yield smaller blocks, they can fit on that node.

This PR addresses this by using BlockOutputBuffer to reshape the blocks back to data_context.target_max_block_size.
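For illustration, a minimal sketch of the re-chunking idea, assuming BlockOutputBuffer's internal API (add_block / has_next / next / finalize) takes a plain integer target size; the actual constructor signature and the wiring inside the aggregator may differ:

# Sketch only: slice one oversized block into ~target_max_block_size chunks.
# Assumes BlockOutputBuffer's add_block/has_next/next/finalize interface from
# Ray Data's internal output buffer; the real aggregator wiring may differ.
from ray.data._internal.output_buffer import BlockOutputBuffer
from ray.data.context import DataContext


def iter_resized_blocks(block):
    ctx = DataContext.get_current()
    buffer = BlockOutputBuffer(ctx.target_max_block_size)

    buffer.add_block(block)
    # Yield full-sized chunks as soon as they are ready, so downstream
    # backpressure applies per chunk instead of per giant block.
    while buffer.has_next():
        yield buffer.next()

    # Flush the final, possibly smaller, chunk.
    buffer.finalize()
    while buffer.has_next():
        yield buffer.next()

Yielding chunk by chunk like this is what lets the streaming_gen backpressure described above take effect.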

Related issues

None

Additional information

Encountered this personally with a 180 GiB block, which would OOD.

@iamjustinhsu iamjustinhsu requested a review from a team as a code owner November 13, 2025 19:49
Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request correctly identifies and aims to solve an important issue with HashShuffleAggregator handling very large blocks, which can lead to out-of-memory errors. The approach of using BlockOutputBuffer to break down large blocks is sound. However, the current implementation of the finalize method introduces several critical issues, including a risk of deadlocks, potential data loss, and incorrect metrics reporting. My review provides a detailed comment with a suggested replacement for the finalize method that addresses these problems while preserving the original intent of the change.

@iamjustinhsu iamjustinhsu force-pushed the jhsu/aggregator-yield-block-size branch from 26404f8 to ec0e610 Compare November 13, 2025 22:25
if partition_id in self._finalizing_tasks:
    self._finalizing_tasks.pop(partition_id)

# Update Finalize Metrics on task completion
Contributor Author


drive-by

@ray-gardener ray-gardener bot added the data Ray Data-related issues label Nov 14, 2025
@richardliaw
Contributor

Who is reviewing this PR?

# so we do not break the block down further.
if target_max_block_size is not None:
    # Creating a block output buffer per partition finalize task because:
    # 1. Need to keep track of which tasks have already been finalized
Member


I couldn't understand what (1) means. Could you elaborate/revise?

Contributor Author


Updated, I don't think (1) made sense either lol. My intent was that I could keep track of re-finalizing tasks, but that would mean additional stats + maybe additional locks = more complexity, so I kept it simple.
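To make that concrete, a hypothetical sketch of the per-partition bookkeeping (the class and field names below are illustrative, not the PR's actual code):

# Hypothetical sketch; _AggregatorSketch and _output_buffers are illustrative
# names, not the fields used in the actual HashShuffleAggregator change.
from ray.data._internal.output_buffer import BlockOutputBuffer


class _AggregatorSketch:
    def __init__(self, target_max_block_size):
        self._target_max_block_size = target_max_block_size
        # One buffer per partition finalize task: each partition re-chunks
        # its own output independently, so no shared stats or extra locks
        # are needed to track re-finalized tasks.
        self._output_buffers = {}

    def _buffer_for(self, partition_id):
        if partition_id not in self._output_buffers:
            self._output_buffers[partition_id] = BlockOutputBuffer(
                self._target_max_block_size
            )
        return self._output_buffers[partition_id]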


def finalize(
    self, partition_id: int
) -> AsyncGenerator[Union[Block, "BlockMetadataWithSchema"], None]:
Member


Nit: Out-of-scope for this PR, but I think this is a regular generator, not async

Suggested change
) -> AsyncGenerator[Union[Block, "BlockMetadataWithSchema"], None]:
) -> Generator[Union[Block, "BlockMetadataWithSchema"], None, None]:
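For reference, typing.Generator takes three type parameters (yield, send, and return), which is why the replacement above ends in two Nones. A minimal, self-contained example (generic names, not from the PR):

from typing import Generator


def count_up_to(limit: int) -> Generator[int, None, None]:
    # Generator[YieldType, SendType, ReturnType]: yields ints, accepts no
    # sent values, and returns None when exhausted.
    for i in range(limit):
        yield i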

@iamjustinhsu iamjustinhsu added the go add ONLY when ready to merge, run all tests label Nov 20, 2025
@bveeramani bveeramani merged commit 06fd709 into ray-project:master Nov 21, 2025
7 checks passed
@iamjustinhsu iamjustinhsu deleted the jhsu/aggregator-yield-block-size branch November 21, 2025 20:08
400Ping pushed a commit to 400Ping/ray that referenced this pull request Nov 21, 2025
ykdojo pushed a commit to ykdojo/ray that referenced this pull request Nov 27, 2025
SheldonTsen pushed a commit to SheldonTsen/ray that referenced this pull request Dec 1, 2025