Allowed reusing compression buffer #60

Merged: 4 commits into main from reuse_compress on Oct 17, 2021

Conversation

jorgecarleitao (Owner)

This is a major refactor of the page API to make it easier to use and operate on. Main changes:

  • changed from streaming-iterator to fallible-streaming-iterator, so that we keep ownership of the Result, making the API easier to use (see the sketch after this list)
  • added struct CompressedDataPage and struct DataPage to differentiate when a page is compressed
  • allowed parquet writers to reuse the compression buffer
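
To make the first change concrete, here is a minimal sketch (hypothetical consumer code, not from this PR) of how a FallibleStreamingIterator is driven: next returns Result<Option<&Item>, Error>, so errors surface at each step while the iterator keeps ownership of the item and its buffers. The consume function and the Page type are illustrative stand-ins.

use fallible_streaming_iterator::FallibleStreamingIterator;

// illustrative stand-in for this crate's CompressedDataPage
struct Page {
    buffer: Vec<u8>,
}

// drive the iterator: each `next` both advances and can fail; `page` is only
// borrowed, so the iterator keeps its internal buffers for the next iteration
fn consume<I, E>(mut pages: I) -> Result<usize, E>
where
    I: FallibleStreamingIterator<Item = Page, Error = E>,
{
    let mut total = 0;
    while let Some(page) = pages.next()? {
        total += page.buffer.len();
    }
    Ok(total)
}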

jorgecarleitao changed the title from "Reuse compress" to "Reuse compress buffer" on Oct 13, 2021
codecov-commenter commented Oct 13, 2021

Codecov Report

Merging #60 (75fd09f) into main (d73f166) will decrease coverage by 0.86%.
The diff coverage is 38.93%.

@@            Coverage Diff             @@
##             main      #60      +/-   ##
==========================================
- Coverage   67.38%   66.52%   -0.86%     
==========================================
  Files          64       65       +1     
  Lines        3523     3594      +71     
==========================================
+ Hits         2374     2391      +17     
- Misses       1149     1203      +54     
Impacted Files                             Coverage   Δ
integration-tests/src/read/binary.rs       100.00%    <ø> (ø)
integration-tests/src/read/primitive.rs     98.00%    <ø> (ø)
src/error.rs                                21.42%    <ø> (+2.67%) ⬆️
src/lib.rs                                  76.73%    <ø> (ø)
src/page/mod.rs                             13.79%    <0.00%> (+1.44%) ⬆️
src/page/page_dict/binary.rs                 0.00%    <0.00%> (ø)
src/page/page_dict/fixed_len_binary.rs       0.00%    <0.00%> (ø)
src/write/column_chunk.rs                    0.00%    <ø> (ø)
src/write/compression.rs                     0.00%    <0.00%> (ø)
src/write/dyn_iter.rs                        0.00%    <0.00%> (ø)
... and 19 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data

jorgecarleitao (Owner, Author)

cc @ives9638. I think I found the root cause: we were incorrectly swapping the buffers, causing re-allocations of the compression buffer (related to jorgecarleitao/arrow2#529).

let compressed_buffer = &compressed_page.buffer;

// prepare the compression buffer
buffer.clear();
houqp

Does this clear really help with avoiding the memcopy you mentioned in jorgecarleitao/arrow2#529 (comment)?

clear sets len to 0, so it looks like when the resize size is larger than the current length (0), the value-copy loop will always be executed? https://doc.rust-lang.org/src/alloc/vec/mod.rs.html#2236

jorgecarleitao (Owner, Author)

What I am trying to avoid with .clear is the reserve piece (of extend_with): if len != 0 and reserve reallocates, all existing bytes must be copied to the new memory location.
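
To illustrate the intent (a minimal sketch, not code from this PR):

let mut buffer: Vec<u8> = vec![1, 2, 3];

// without the clear, a resize past capacity reallocates and memcopies the 3 live bytes;
// after the clear, len == 0, so a reallocation has no live bytes to copy
buffer.clear();
buffer.resize(1024, 0); // note: this still writes 1024 zeroes, as discussed below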

houqp

extend_with's n: usize argument is calculated as new_len - len in resize; if len is 0, then n is always > 0, right?

jorgecarleitao (Owner, Author)

Very well spotted: I think that you are right. 🙇

Let me try to re-phrase it: there are 3 main cases when we need a larger region:

let len = vec.len();

if len == 0 {
    if value == 0 {  // case 1
        // dealloc old
        // alloc_zeroed (the sys call)
    } else {  // case 2
        // alloc_uninitialized
        // set `[0, new_len[` to `value`
    }
} else {  // case 3
    // realloc
    // memcopy `[0, len[`
    // set `[len, new_len[` to `value`
}

I was hoping to hit case 1, but you are right that .clear followed by .resize does not do this (imo std here is suboptimal: they could specialize to hit case 1). What I was trying to hit here was this SpecFromElem::from_elem specialization.

It seems to me that the way to handle this is to use vec![0; decompressed_len] when the required length is larger.

jorgecarleitao (Owner, Author), Oct 16, 2021

Specifically, what do you think about using

        // prepare the compression buffer
        if buffer.capacity() < compressed_page.uncompressed_size() {
            *buffer = vec![0; compressed_page.uncompressed_size()]
        } else {
            buffer.truncate(compressed_page.uncompressed_size())
        };

houqp, Oct 16, 2021

ha, very cool, I didn't know 0 was handled as a special case in SpecFromElem.

imo std here is suboptimal: they could specialize to hit case 1

I was thinking about the same thing as well. The abstraction here is quite leaky and does not adhere to Rust's zero-cost philosophy.

Specifically, what do you think about using

This is very close to what I have in mind. However, I think we can further improve performance if we do the following:

  • Reallocate larger memory without initialization. Zeroing the memory (AllocInit::Zeroed) still has more overhead than AllocInit::Uninitialized. I believe heap management, i.e. malloc, is all managed in user space; the only syscalls we use should be brk or sbrk. So to guarantee zeroed memory, the memset or memcpy overhead is unavoidable. And even if we could let the kernel do this for us, the kernel would still need to burn CPU instructions to clear out the memory. Since we always overwrite the full buffer in the subsequent decompress, requesting zeroed memory from the allocator results in unnecessary overhead.
  • I think *buffer = vec![0; compressed_page.uncompressed_size()] is on the right path. What I am not 100% sure about is whether it is always optimal. When there are large enough gaps between allocated buffers in the heap, the allocator could simply extend the current buffer instead of always allocating an entirely new one. Combined with my previous point, I think this will still lead to unnecessary memcopy/memset on every resize, due to the need to zero out the newly allocated buffer.

Here is what I think we really want:

        // prepare the compression buffer
        match buffer.capacity().cmp(&compressed_page.uncompressed_size()) {
            Ordering::Equal => {} // avoid the unnecessary truncate
            Ordering::Greater => buffer.truncate(compressed_page.uncompressed_size()),
            Ordering::Less => {
                // realloc and extend the memory buffer:
                //     if there is not enough free space after the current buffer,
                //     allocate an entirely new buffer and free the current one;
                //     do not zero out the new buffer nor the old one.
                //     if there is enough free space after the current buffer, simply
                //     extend the buffer's length in the allocator's tracking metadata;
                //     do not zero out the newly extended space.
            }
        }

For the Less branch, I believe most allocators' grow methods (https://doc.rust-lang.org/src/alloc/raw_vec.rs.html#507) should be smart enough to support it; we just need a way to avoid the zero-initialization. I think Vec::reserve is probably closest to what we want here.

jorgecarleitao (Owner, Author)

Ahh, I see. Yes, that would be ideal.

However, I think that is not possible to do safely atm, unfortunately :( most decompressors' APIs are based on the trait std::io::Read, whose read_exact expects an initialized memory region.

Basically, we would write something like

let mut codec = Decoder::new(compressed_buffer);

decompressed_buffer.reserve(capacity);
// assumption: the codec does not read from the uninitialized region
unsafe { decompressed_buffer.set_len(capacity) };

codec.read_exact(decompressed_buffer.as_mut_slice())?; // assumption violated

The assumption is violated because either:

  1. read_exact may read from the buffer it receives, or
  2. if read_exact errors and we recover from it, we may end up reading an uninitialized memory region.

This is mentioned in https://doc.rust-lang.org/std/io/trait.Read.html#tymethod.read and has received a number of RustSec advisories.

imo atm a new allocation (i.e. vec![0; ...]) is the best we can do under the safety constraints (but I would be very happy to be proven wrong ^_^)
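
A minimal sketch of the safe pattern this constraint forces (assuming a codec implementing std::io::Read and a reusable buffer: &mut Vec<u8>; not necessarily the exact code merged here):

// grow through the `vec![0; n]` specialization (alloc_zeroed) only when needed;
// otherwise reuse the existing allocation
if buffer.capacity() < uncompressed_size {
    *buffer = vec![0; uncompressed_size];
} else {
    buffer.clear();
    buffer.resize(uncompressed_size, 0); // no realloc: capacity suffices
}
codec.read_exact(buffer)?; // sound: every byte handed to the codec is initialized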

houqp

I see what I was missing now, thanks a lot for pointing out that the read method doesn't guarantee it won't read from the output buf :( Coming from a heavy C background, I totally didn't expect this behavior. This is very unfortunate. They should have added a separate read method to support the write-only use case.

In this case I agree with you that we need to initialize the output buffer on every iteration, although I think it might be better to let the allocator decide whether an entirely new allocation is needed or not. So I actually think your original code had it right: clear + resize.

houqp

I noticed there is a new API brewing that we could leverage in the future to get rid of the initialization overhead: https://doc.rust-lang.org/nightly/std/io/trait.Read.html#method.initializer

houqp, Oct 16, 2021

Another possible route would be rolling our own read_exact implementation, i.e. using codec.bytes().take(new_len) to get an iterator of decompressed bytes and then writing into the uninitialized buffer manually. But I haven't looked into whether the Rust compiler can optimize the iterator away into a simple memcopy call.
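
A sketch of that route (read_exact_unzeroed is a hypothetical helper; codec is anything implementing std::io::Read). Pushing into a reserved Vec sidesteps set_len entirely, at the cost of a per-byte Result check:

use std::io::{Error, ErrorKind, Read, Result};

// hypothetical helper: fill `buffer` from `codec` without zero-initializing it first
fn read_exact_unzeroed(codec: impl Read, buffer: &mut Vec<u8>, new_len: usize) -> Result<()> {
    buffer.clear();
    buffer.reserve(new_len);
    for byte in codec.bytes().take(new_len) {
        buffer.push(byte?); // safe: `push` never reads uninitialized memory
    }
    if buffer.len() < new_len {
        // mirror read_exact semantics when the source is exhausted early
        return Err(Error::new(ErrorKind::UnexpectedEof, "failed to fill whole buffer"));
    }
    Ok(())
}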

jorgecarleitao merged commit 7fbf8e4 into main on Oct 17, 2021
jorgecarleitao deleted the reuse_compress branch on Oct 17, 2021 at 04:29
jorgecarleitao changed the title from "Reuse compress buffer" to "Allowed reusing compression buffer" on Oct 18, 2021