75 changes: 50 additions & 25 deletions parquet/src/file/serialized_reader.rs
@@ -389,31 +389,15 @@ pub(crate) fn decode_page(

// TODO: page header could be huge because of statistics. We should set a
// maximum page header size and abort if that is exceeded.
Comment on lines 390 to 391 (Contributor):

Not relevant to this PR, but I think this TODO has largely been addressed by #8376 which enabled skipping the decoding of the page statistics.

let buffer = match decompressor {
Some(decompressor) if can_decompress => {
let uncompressed_page_size = usize::try_from(page_header.uncompressed_page_size)?;
if offset > buffer.len() || offset > uncompressed_page_size {
return Err(general_err!("Invalid page header"));
}
let decompressed_size = uncompressed_page_size - offset;
let mut decompressed = Vec::with_capacity(uncompressed_page_size);
decompressed.extend_from_slice(&buffer.as_ref()[..offset]);
if decompressed_size > 0 {
Contributor Author:
You can see here that if decompressed_size is 0, it returns decompressed unmodified and decompressed was just the first offset bytes of the input buffer

I changed the code to return buffer.slice(..offset) in this case

I can add it to this location directly to make the diff smaller, but the logic was already hard to follow and highly indented, so I moved it into its own function while I was at it.

let compressed = &buffer.as_ref()[offset..];
decompressor.decompress(compressed, &mut decompressed, Some(decompressed_size))?;
}

if decompressed.len() != uncompressed_page_size {
return Err(general_err!(
"Actual decompressed size doesn't match the expected one ({} vs {})",
decompressed.len(),
uncompressed_page_size
));
}

Bytes::from(decompressed)
}
_ => buffer,
let buffer = if can_decompress {
decompress_buffer(
buffer,
offset,
decompressor,
page_header.uncompressed_page_size,
)?
} else {
buffer
};

let result = match page_header.r#type {
@@ -471,6 +455,47 @@ pub(crate) fn decode_page(
Ok(result)
}

/// Decompresses the specified buffer, starting from the specified offset, using
/// the provided decompressor if available and applicable. If the buffer is not
/// compressed, it will be returned as is.
Comment on lines +459 to +460 (Member @mapleFU, Oct 31, 2025):
The code looks good to me, but I don't know if the comment "not compressed" should be replaced: if decompress_buffer is called and decompressed_size == 0, it seems that generally means something like "this page only has levels, but no non-null values"? (Point me out if I'm wrong.)

Contributor Author:

I actually also do not know what decompressed_size really means -- you are probably right. I will research it more carefully.

Contributor Author:

I read https://github.com/apache/parquet-format/blob/master/README.md#data-pages and I agree with your assessment: if decompressed_size == 0, it corresponds to no non-null values.
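
To make that conclusion concrete, here is a minimal standalone sketch (an illustration added for this write-up, not code from the PR): slice_if_empty_body is a hypothetical stand-in for the early-return path of decompress_buffer, and the all-null page is simulated as level bytes followed by zero value bytes.

use bytes::Bytes;

// Hypothetical stand-in for the early-return path of `decompress_buffer`:
// when `uncompressed_page_size == offset`, the page body after the level
// data is empty (e.g. every value is null), so there is nothing to
// decompress and only the first `offset` bytes are kept.
fn slice_if_empty_body(buffer: Bytes, offset: usize, uncompressed_page_size: usize) -> Option<Bytes> {
    let decompressed_size = uncompressed_page_size - offset;
    (decompressed_size == 0).then(|| buffer.slice(..offset))
}

fn main() {
    // Four bytes of repetition/definition levels, zero value bytes.
    let page = Bytes::from_static(&[1, 2, 3, 4]);
    let out = slice_if_empty_body(page, 4, 4).expect("empty page body");
    assert_eq!(out.as_ref(), &[1, 2, 3, 4]);
}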

fn decompress_buffer(
buffer: Bytes,
offset: usize,
mut decompressor: Option<&mut Box<dyn Codec>>,
uncompressed_page_size: i32,
) -> Result<Bytes> {
let Some(decompressor) = decompressor.as_mut() else {
return Ok(buffer);
};

let uncompressed_page_size = usize::try_from(uncompressed_page_size)?;
if offset > buffer.len() || offset > uncompressed_page_size {
return Err(general_err!("Invalid page header"));
}

let decompressed_size = uncompressed_page_size - offset;

if decompressed_size == 0 {
Contributor: Makes sense to me 👍

// Not compressed, return buffer as is
return Ok(buffer.slice(..offset));
}

let mut decompressed = Vec::with_capacity(uncompressed_page_size);
decompressed.extend_from_slice(&buffer[..offset]);
let compressed = &buffer.as_ref()[offset..];
decompressor.decompress(compressed, &mut decompressed, Some(decompressed_size))?;

if decompressed.len() != uncompressed_page_size {
return Err(general_err!(
"Actual decompressed size doesn't match the expected one ({} vs {})",
decompressed.len(),
uncompressed_page_size
));
}

Ok(Bytes::from(decompressed))
}
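
For completeness, a hedged unit-test sketch of the new helper (again, not part of the PR diff): it assumes the test sits in this module so the private decompress_buffer is reachable, that crate::compression::create_codec and CodecOptionsBuilder keep their current shapes, and that the snap feature is enabled for the Snappy codec.

// A unit-test sketch, under the assumptions stated above.
#[cfg(test)]
mod decompress_buffer_sketch {
    use super::*;
    use crate::basic::Compression;
    use crate::compression::{create_codec, CodecOptionsBuilder};

    #[test]
    fn round_trips_page_body_after_offset() {
        let options = CodecOptionsBuilder::default().build();
        let mut codec = create_codec(Compression::SNAPPY, &options)
            .unwrap()
            .expect("snappy codec available");

        // Page layout: 2 uncompressed level bytes, then a compressed body.
        let levels = [0u8, 1];
        let body = vec![42u8; 64];
        let mut page = levels.to_vec();
        codec.compress(&body, &mut page).unwrap();

        let out = decompress_buffer(
            Bytes::from(page),
            levels.len(),
            Some(&mut codec),
            (levels.len() + body.len()) as i32,
        )
        .unwrap();

        // Levels pass through untouched; the body decompresses in place after them.
        assert_eq!(&out[..2], &levels);
        assert_eq!(&out[2..], body.as_slice());
    }
}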

enum SerializedPageReaderState {
Values {
/// The current byte offset in the reader