perf: Lazy decompress Parquet pages (#18326)
coastalwhite authored and ritchie46 committed Aug 23, 2024
1 parent 9828c41 commit e8cbe81
Showing 5 changed files with 53 additions and 24 deletions.
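This commit changes `BasicDecompressor` to yield `DataPageItem` — a handle to the still-compressed page that exposes its metadata — so callers can consult `num_values()` and the row filter first, and pay for decompression only when rows are actually needed from a page. A minimal, self-contained sketch of the idea (`LazyPage` and `inflate` are illustrative stand-ins, not the actual polars-parquet types):

// Minimal sketch of lazy page decompression; names are illustrative.
struct LazyPage {
    n_values: usize,     // known from the page header, no decompression required
    compressed: Vec<u8>, // raw page bytes
}

impl LazyPage {
    fn num_values(&self) -> usize {
        self.n_values
    }

    // The decompression cost is only paid when this is called.
    fn decompress(self) -> Vec<u8> {
        inflate(&self.compressed)
    }
}

// Placeholder for a real codec (snappy, zstd, ...).
fn inflate(bytes: &[u8]) -> Vec<u8> {
    bytes.to_vec()
}

fn main() {
    let pages = vec![
        LazyPage { n_values: 100, compressed: vec![0; 8] },
        LazyPage { n_values: 100, compressed: vec![0; 8] },
    ];
    // Suppose a slice only wants rows from the second page onward.
    let mut rows_to_skip = 100;
    for page in pages {
        if rows_to_skip >= page.num_values() {
            rows_to_skip -= page.num_values();
            continue; // whole page skipped without inflating it
        }
        rows_to_skip = 0;
        let values = page.decompress();
        println!("decoding {} bytes", values.len());
    }
}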
10 changes: 8 additions & 2 deletions crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs
@@ -705,6 +705,7 @@ impl<D: utils::NestedDecoder> PageNestedDecoder<D> {
                 break;
             };
             let page = page?;
+            let page = page.decompress(&mut self.iter)?;
 
             let mut state =
                 utils::State::new_nested(&self.decoder, &page, self.dict.as_ref())?;
@@ -743,9 +744,11 @@ impl<D: utils::NestedDecoder> PageNestedDecoder<D> {
                 break;
             };
             let page = page?;
+            // We cannot lazily decompress because we don't have the number of leaf values
+            // at this point. This is encoded within the `definition level` values. *sigh*.
+            // In general, lazy decompression is quite difficult with nested values.
+            let page = page.decompress(&mut self.iter)?;
 
-            let mut state =
-                utils::State::new_nested(&self.decoder, &page, self.dict.as_ref())?;
             let (def_iter, rep_iter) = level_iters(&page)?;
 
             let mut count = ZeroCount::default();
@@ -762,6 +765,9 @@ impl<D: utils::NestedDecoder> PageNestedDecoder<D> {
                 None
             };
 
+            let mut state =
+                utils::State::new_nested(&self.decoder, &page, self.dict.as_ref())?;
+
             let start_length = nested_state.len();
 
             // @TODO: move this to outside the loop.
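The comment in the second hunk above is the crux of why the nested path still decompresses eagerly: the number of physical leaf values can only be recovered by decoding the definition levels, which live inside the compressed page body. A toy illustration of that dependency (the level values follow Parquet's Dremel encoding; the function is illustrative, not crate code):

// Why nested pages resist lazy decompression: the leaf-value count must be
// computed from the definition levels themselves.
fn count_leaf_values(def_levels: &[u16], max_def: u16) -> usize {
    // A physical leaf value is stored only where the definition level
    // reaches the maximum, i.e. neither the value nor any ancestor is
    // null or an empty list.
    def_levels.iter().filter(|&&d| d == max_def).count()
}

fn main() {
    // e.g. an optional list<optional int>: 3 = present value,
    // 2 = null element, 1 = empty list, 0 = null list.
    let def_levels = [3, 3, 2, 1, 0, 3];
    assert_eq!(count_leaf_values(&def_levels, 3), 3);
}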
16 changes: 9 additions & 7 deletions crates/polars-parquet/src/arrow/read/deserialize/null.rs
@@ -136,18 +136,20 @@ pub fn iter_to_arrays(
         };
         let page = page?;
 
-        let rows = page.num_values();
-        let page_filter;
-        (page_filter, filter) = Filter::opt_split_at(&filter, rows);
+        let state_filter;
+        (state_filter, filter) = Filter::opt_split_at(&filter, page.num_values());
 
-        let num_rows = match page_filter {
-            None => rows,
+        // Skip the whole page if we don't need any rows from it
+        if state_filter.as_ref().is_some_and(|f| f.num_rows() == 0) {
+            continue;
+        }
+
+        let num_rows = match state_filter {
+            None => page.num_values(),
             Some(filter) => filter.num_rows(),
         };
 
         len = (len + num_rows).min(num_rows);
-
-        iter.reuse_page_buffer(page);
    }
 
    Ok(Box::new(NullArray::new(data_type, len)))
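Both this null decoder and the flat decoder below work by splitting the row filter at each page boundary and skipping pages whose slice needs zero rows. A self-contained toy model of that splitting, in the spirit of `Filter::opt_split_at` (the range-based filter type and the `split_at` helper are assumptions, not the actual implementation):

// Toy model: a row filter as a half-open row range over the column chunk.
type RowFilter = std::ops::Range<usize>;

// Split `filter` into the part addressing the next `page_len` rows and the
// remainder addressing the rows after this page.
fn split_at(filter: &RowFilter, page_len: usize) -> (RowFilter, RowFilter) {
    let head = filter.start.min(page_len)..filter.end.min(page_len);
    let tail = filter.start.saturating_sub(page_len)..filter.end.saturating_sub(page_len);
    (head, tail)
}

fn main() {
    // Keep rows 150..250 of a chunk laid out as three 100-row pages.
    let mut filter = 150..250;
    for page_len in [100, 100, 100] {
        let head;
        (head, filter) = split_at(&filter, page_len);
        if head.is_empty() {
            println!("page skipped without decompression");
            continue;
        }
        println!("page contributes {} rows", head.len());
    }
}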
11 changes: 6 additions & 5 deletions crates/polars-parquet/src/arrow/read/deserialize/utils/mod.rs
@@ -653,21 +653,22 @@ impl<D: Decoder> PageDecoder<D> {
 
         while num_rows_remaining > 0 {
             let Some(page) = self.iter.next() else {
-                return self.decoder.finalize(self.data_type, self.dict, target);
+                break;
             };
             let page = page?;
 
-            let mut state = State::new(&self.decoder, &page, self.dict.as_ref())?;
-            let state_len = state.len();
-
             let state_filter;
-            (state_filter, filter) = Filter::opt_split_at(&filter, state_len);
+            (state_filter, filter) = Filter::opt_split_at(&filter, page.num_values());
 
             // Skip the whole page if we don't need any rows from it
             if state_filter.as_ref().is_some_and(|f| f.num_rows() == 0) {
                 continue;
             }
 
+            let page = page.decompress(&mut self.iter)?;
+
+            let mut state = State::new(&self.decoder, &page, self.dict.as_ref())?;
+
             let start_length = target.len();
             state.extend_from_state(&mut self.decoder, &mut target, state_filter)?;
             let end_length = target.len();
39 changes: 29 additions & 10 deletions crates/polars-parquet/src/parquet/read/compression.rs
@@ -3,7 +3,9 @@ use parquet_format_safe::DataPageHeaderV2;
 use super::PageReader;
 use crate::parquet::compression::{self, Compression};
 use crate::parquet::error::{ParquetError, ParquetResult};
-use crate::parquet::page::{CompressedPage, DataPage, DataPageHeader, DictPage, Page};
+use crate::parquet::page::{
+    CompressedDataPage, CompressedPage, DataPage, DataPageHeader, DictPage, Page,
+};
 use crate::parquet::CowBuffer;
 
 fn decompress_v1(
@@ -205,8 +207,27 @@ impl BasicDecompressor {
     }
 }
 
+pub struct DataPageItem {
+    page: CompressedDataPage,
+}
+
+impl DataPageItem {
+    pub fn num_values(&self) -> usize {
+        self.page.num_values()
+    }
+
+    pub fn decompress(self, decompressor: &mut BasicDecompressor) -> ParquetResult<DataPage> {
+        let p = decompress(CompressedPage::Data(self.page), &mut decompressor.buffer)?;
+        let Page::Data(p) = p else {
+            panic!("Decompressing a data page should result in a data page");
+        };
+
+        Ok(p)
+    }
+}
+
 impl Iterator for BasicDecompressor {
-    type Item = ParquetResult<DataPage>;
+    type Item = ParquetResult<DataPageItem>;
 
     fn next(&mut self) -> Option<Self::Item> {
         let page = match self.reader.next() {
@@ -215,15 +236,13 @@ impl Iterator for BasicDecompressor {
             Some(Ok(p)) => p,
         };
 
-        Some(decompress(page, &mut self.buffer).and_then(|p| {
-            let Page::Data(p) = p else {
-                return Err(ParquetError::oos(
-                    "Found dictionary page beyond the first page of a column chunk",
-                ));
-            };
+        let CompressedPage::Data(page) = page else {
+            return Some(Err(ParquetError::oos(
+                "Found dictionary page beyond the first page of a column chunk",
+            )));
+        };
 
-            Ok(p)
-        }))
+        Some(Ok(DataPageItem { page }))
     }
 
     fn size_hint(&self) -> (usize, Option<usize>) {
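Taken together, the new contract reads as sketched below: the iterator hands out cheap `DataPageItem`s, and a page is only inflated once the caller commits to it (the `consume` wrapper is illustrative, and the zero-check stands in for the real filter logic in the decoders above):

// Sketch of a consumer of the new BasicDecompressor contract; not crate code.
fn consume(mut iterator: BasicDecompressor) -> ParquetResult<()> {
    while let Some(page) = iterator.next().transpose()? {
        // `page` is a DataPageItem: header metadata plus still-compressed bytes.
        if page.num_values() == 0 {
            continue; // skip: no decompression cost paid for this page
        }
        // Only now is the page inflated, into the decompressor's reused buffer.
        let page = page.decompress(&mut iterator)?;
        // ... decode values from `page` ...
        let _ = page;
    }
    Ok(())
}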
1 change: 1 addition & 0 deletions crates/polars/tests/it/io/parquet/read/mod.rs
@@ -159,6 +159,7 @@ where
             .map(|dict| dictionary::deserialize(&dict, column.physical_type()))
             .transpose()?;
         while let Some(page) = iterator.next().transpose()? {
+            let page = page.decompress(&mut iterator)?;
             if !has_filled {
                 struct_::extend_validity(&mut validity, &page)?;
             }
