diff --git a/python/ray/data/_internal/output_buffer.py b/python/ray/data/_internal/output_buffer.py index bfefd8bb18aa..4355b6e0a233 100644 --- a/python/ray/data/_internal/output_buffer.py +++ b/python/ray/data/_internal/output_buffer.py @@ -82,17 +82,23 @@ def next(self) -> Block: # this ensures that the last block produced will be at least half # the block size. num_bytes_per_row = block.size_bytes() // block.num_rows() - target_num_rows = self._target_max_block_size // num_bytes_per_row - target_num_rows = max(1, target_num_rows) - - # TODO(swang): If the buffer is finalized, try to create even - # blocks? + target_num_rows = max(1, self._target_max_block_size // num_bytes_per_row) if target_num_rows < block.num_rows(): - # Use copy=True to avoid holding the entire block in memory. + # NOTE: We're maintaining following protocol of slicing underlying block + # into appropriately sized ones: + # + # - (Finalized) Target blocks sliced from the original one + # and are *copied* to avoid referencing original blocks + # - Temporary remainder of the block should *NOT* be copied + # such as to avoid repeatedly copying the remainder bytes + # of the block, resulting in O(M * N) total bytes being + # copied, where N is the total number of bytes in the original + # block and M is the number of blocks that will be produced by + # this iterator block_to_yield = block.slice(0, target_num_rows, copy=True) block_remainder = block.slice( - target_num_rows, block.num_rows(), copy=True + target_num_rows, block.num_rows(), copy=False ) self._buffer = DelegatingBlockBuilder()