[Data] Fix OutputBlockBuffer to avoid repeatedly copying remainder block (ray-project#48266)

## Why are these changes needed?

Currently, `OutputBlockBuffer`:

1. Repeatedly copies the remainder of the original block, bringing the total
number of bytes copied to O(N^2) in the worst case, where N is the size of the
original block (more precisely O(M * N), where M is the number of blocks
produced).
2. Can create very large blocks (as in ray-project#48236) that could overflow
the underlying Arrow data types.
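
To make the quadratic cost in item 1 concrete, here is a rough counting model, not Ray code. It assumes uniformly sized rows, so rows stand in for bytes, and `bytes_copied` is a hypothetical helper introduced only for this illustration.

```python
def bytes_copied(total_rows: int, rows_per_block: int, copy_remainder: bool) -> int:
    """Count rows copied while slicing one large block into target-sized blocks.

    Simplified model (an assumption, not Ray's actual loop): every row has the
    same size, and slicing stops once the remainder fits in a single block.
    """
    copied = 0
    remaining = total_rows
    while remaining > rows_per_block:
        # The finalized target block is copied in both variants.
        copied += rows_per_block
        remaining -= rows_per_block
        if copy_remainder:
            # Copying the remainder re-materializes everything left over
            # on each step, which is where the quadratic term comes from.
            copied += remaining
    return copied


with_copy = bytes_copied(1000, 100, copy_remainder=True)
without_copy = bytes_copied(1000, 100, copy_remainder=False)
print(with_copy, without_copy)
```

Under this model, doubling `total_rows` roughly quadruples the count when the remainder is copied, and only doubles it when it is not.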

This change addresses both issues by establishing the following protocol:

1. Finalized target blocks *are* copied, while
2. The remainder block is *not* copied (so it continues to reference the
original block)
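
The protocol can be sketched with plain Python buffers as an analogy. This is not Ray's implementation: a `memoryview` slice stands in for `block.slice(..., copy=False)`, `bytes(...)` stands in for `copy=True`, and `split_into_blocks` is a hypothetical helper.

```python
from typing import Iterator


def split_into_blocks(data: bytes, target_size: int) -> Iterator[bytes]:
    """Yield target-sized chunks of `data` using the copy/no-copy protocol."""
    # Zero-copy view over the original buffer.
    remainder = memoryview(data)
    while len(remainder) > target_size:
        # Finalized block: copied, so it does not reference `data`.
        yield bytes(remainder[:target_size])
        # Remainder: re-sliced without copying; it still references `data`.
        remainder = remainder[target_size:]
    # Whatever is left is copied exactly once, when it is emitted.
    yield bytes(remainder)


blocks = list(split_into_blocks(b"abcdefghij", 4))
print(blocks)
```

Each byte is copied once in total, at the cost of keeping the original buffer alive until the remainder has been consumed.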

Addresses ray-project#48236

## Checks

- [ ] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [ ] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

---------

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
alexeykudinkin authored and JP-sDEV committed Nov 14, 2024
1 parent fb30b01 commit cd5a388
Showing 1 changed file with 13 additions and 7 deletions.
20 changes: 13 additions & 7 deletions python/ray/data/_internal/output_buffer.py
@@ -82,17 +82,23 @@ def next(self) -> Block:
             # this ensures that the last block produced will be at least half
             # the block size.
             num_bytes_per_row = block.size_bytes() // block.num_rows()
-            target_num_rows = self._target_max_block_size // num_bytes_per_row
-            target_num_rows = max(1, target_num_rows)
-
-            # TODO(swang): If the buffer is finalized, try to create even
-            # blocks?
+            target_num_rows = max(1, self._target_max_block_size // num_bytes_per_row)

             if target_num_rows < block.num_rows():
-                # Use copy=True to avoid holding the entire block in memory.
+                # NOTE: We're maintaining following protocol of slicing underlying block
+                # into appropriately sized ones:
+                #
+                # - (Finalized) Target blocks sliced from the original one
+                # and are *copied* to avoid referencing original blocks
+                # - Temporary remainder of the block should *NOT* be copied
+                # such as to avoid repeatedly copying the remainder bytes
+                # of the block, resulting in O(M * N) total bytes being
+                # copied, where N is the total number of bytes in the original
+                # block and M is the number of blocks that will be produced by
+                # this iterator
                 block_to_yield = block.slice(0, target_num_rows, copy=True)
                 block_remainder = block.slice(
-                    target_num_rows, block.num_rows(), copy=True
+                    target_num_rows, block.num_rows(), copy=False
                 )

         self._buffer = DelegatingBlockBuilder()