Skip to content

Commit

Permalink
fix: fix native decompress binary offsets out of bounds (#16085)
Browse files Browse the repository at this point in the history
* fix: fix native decompress binary offsets out of bounds

* fix

* fix
  • Loading branch information
b41sh authored Jul 22, 2024
1 parent 3246945 commit 65ccf8d
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 13 deletions.
13 changes: 8 additions & 5 deletions src/common/arrow/src/native/compression/binary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,24 +138,27 @@ pub fn decompress_binary<O: Offset, R: NativeReadBuf>(
)
};
c.decompress(&input[..compressed_size], out_slice)?;
unsafe { offsets.set_len(offsets.len() + length + 1) };

if use_inner {
reader.consume(compressed_size);
}
let old_length = offsets.len();
let new_length = offsets.len() + length;
unsafe { offsets.set_len(new_length + 1) };

if let Some(last) = last {
// fix offset
for i in offsets.len() - length - 1..offsets.len() {
// fix offset:
// because the offsets in the current page are appended to the original offsets,
// each new offset value must have the last value of the original offsets added to it.
for i in old_length..new_length {
let next_val = unsafe { *offsets.get_unchecked(i + 1) };
let val = unsafe { offsets.get_unchecked_mut(i) };
*val = last + next_val;
}
unsafe { offsets.set_len(offsets.len() - 1) };
unsafe { offsets.set_len(new_length) };
}

// values

let (_, compressed_size, uncompressed_size) = read_compress_header(reader)?;
use_inner = false;
reader.fill_buf()?;
Expand Down
20 changes: 12 additions & 8 deletions src/common/arrow/tests/it/native/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ use rand::Rng;
use rand::SeedableRng;

pub const WRITE_PAGE: usize = 2048;
pub const SMALL_WRITE_PAGE: usize = 2;

pub fn new_test_chunk() -> Chunk<Box<dyn Array>> {
Chunk::new(vec![
Expand All @@ -75,10 +76,10 @@ pub fn new_test_chunk() -> Chunk<Box<dyn Array>> {
Box::new(Float32Array::from_vec(vec![1.1, 2.2, 3.3, 4.4, 5.5, 6.6])) as _,
Box::new(Float64Array::from_vec(vec![1.1, 2.2, 3.3, 4.4, 5.5, 6.6])) as _,
Box::new(Utf8Array::<i32>::from_iter_values(
["1.1", "2.2", "3.3", "4.4", "5.5", "6.6"].iter(),
["abcdefg", "mn", "11", "", "3456", "xyz"].iter(),
)) as _,
Box::new(BinaryArray::<i64>::from_iter_values(
["1.1", "2.2", "3.3", "4.4", "5.5", "6.6"].iter(),
["abcdefg", "mn", "11", "", "3456", "xyz"].iter(),
)) as _,
])
}
Expand Down Expand Up @@ -437,14 +438,17 @@ fn test_write_read(chunk: Chunk<Box<dyn Array>>) {
CommonCompression::Snappy,
CommonCompression::None,
];
let page_sizes = vec![WRITE_PAGE, SMALL_WRITE_PAGE];

for compression in compressions {
test_write_read_with_options(chunk.clone(), WriteOptions {
default_compression: compression,
max_page_size: Some(WRITE_PAGE),
default_compress_ratio: Some(2.0f64),
forbidden_compressions: vec![],
});
for page_size in &page_sizes {
test_write_read_with_options(chunk.clone(), WriteOptions {
default_compression: compression,
max_page_size: Some(*page_size),
default_compress_ratio: Some(2.0f64),
forbidden_compressions: vec![],
});
}
}
}

Expand Down

0 comments on commit 65ccf8d

Please sign in to comment.