Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use ParquetMetaDataReader to load page indexes in SerializedFileReader::new_with_options #6506

Merged
merged 1 commit into from
Oct 7, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 154 additions & 20 deletions parquet/src/file/serialized_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use crate::bloom_filter::Sbbf;
use crate::column::page::{Page, PageMetadata, PageReader};
use crate::compression::{create_codec, Codec};
use crate::errors::{ParquetError, Result};
use crate::file::page_index::index_reader;
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::{
metadata::*,
Expand Down Expand Up @@ -210,24 +209,19 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
}
}

let mut metadata = metadata_builder.build();

// If page indexes are desired, build them with the filtered set of row groups
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The key here is that we're only pulling page indexes for row groups that survived the predicate filtering above.

if options.enable_page_index {
let mut columns_indexes = vec![];
let mut offset_indexes = vec![];

for rg in metadata_builder.row_groups().iter() {
let column_index = index_reader::read_columns_indexes(&chunk_reader, rg.columns())?;
let offset_index = index_reader::read_offset_indexes(&chunk_reader, rg.columns())?;
columns_indexes.push(column_index);
offset_indexes.push(offset_index);
}
metadata_builder = metadata_builder
.set_column_index(Some(columns_indexes))
.set_offset_index(Some(offset_indexes));
let mut reader =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense — rebuilding the metadata through the reader is the right way to switch it over to include the page indexes.

ParquetMetaDataReader::new_with_metadata(metadata).with_page_indexes(true);
reader.read_page_indexes(&chunk_reader)?;
metadata = reader.finish()?;
}

Ok(Self {
chunk_reader: Arc::new(chunk_reader),
metadata: Arc::new(metadata_builder.build()),
metadata: Arc::new(metadata),
props: Arc::new(options.props),
})
}
Expand Down Expand Up @@ -769,12 +763,15 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {

#[cfg(test)]
mod tests {
use bytes::Buf;

use crate::file::properties::{EnabledStatistics, WriterProperties};
use crate::format::BoundaryOrder;

use crate::basic::{self, ColumnOrder};
use crate::column::reader::ColumnReader;
use crate::data_type::private::ParquetValueType;
use crate::data_type::{AsBytes, FixedLenByteArrayType};
use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type};
use crate::file::page_index::index::{Index, NativeIndex};
use crate::file::page_index::index_reader::{read_columns_indexes, read_offset_indexes};
use crate::file::writer::SerializedFileWriter;
Expand Down Expand Up @@ -1198,50 +1195,62 @@ mod tests {

#[test]
fn test_file_reader_filter_row_groups_and_range() -> Result<()> {
let test_file = get_test_file("alltypes_plain.parquet");
let test_file = get_test_file("alltypes_tiny_pages.parquet");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switch to a test file that contains page indexes.

let origin_reader = SerializedFileReader::new(test_file)?;
let metadata = origin_reader.metadata();
let mid = get_midpoint_offset(metadata.row_group(0));

// true, true predicate
let test_file = get_test_file("alltypes_plain.parquet");
let test_file = get_test_file("alltypes_tiny_pages.parquet");
let read_options = ReadOptionsBuilder::new()
.with_page_index()
.with_predicate(Box::new(|_, _| true))
.with_range(mid, mid + 1)
.build();
let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 1);
assert_eq!(metadata.column_index().unwrap().len(), 1);
assert_eq!(metadata.offset_index().unwrap().len(), 1);

// true, false predicate
let test_file = get_test_file("alltypes_plain.parquet");
let test_file = get_test_file("alltypes_tiny_pages.parquet");
let read_options = ReadOptionsBuilder::new()
.with_page_index()
.with_predicate(Box::new(|_, _| true))
.with_range(0, mid)
.build();
let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 0);
assert_eq!(metadata.column_index().unwrap().len(), 0);
assert_eq!(metadata.offset_index().unwrap().len(), 0);

// false, true predicate
let test_file = get_test_file("alltypes_plain.parquet");
let test_file = get_test_file("alltypes_tiny_pages.parquet");
let read_options = ReadOptionsBuilder::new()
.with_page_index()
.with_predicate(Box::new(|_, _| false))
.with_range(mid, mid + 1)
.build();
let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 0);
assert_eq!(metadata.column_index().unwrap().len(), 0);
assert_eq!(metadata.offset_index().unwrap().len(), 0);

// false, false predicate
let test_file = get_test_file("alltypes_plain.parquet");
let test_file = get_test_file("alltypes_tiny_pages.parquet");
let read_options = ReadOptionsBuilder::new()
.with_page_index()
.with_predicate(Box::new(|_, _| false))
.with_range(0, mid)
.build();
let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 0);
assert_eq!(metadata.column_index().unwrap().len(), 0);
assert_eq!(metadata.offset_index().unwrap().len(), 0);
Ok(())
}

Expand Down Expand Up @@ -1804,4 +1813,129 @@ mod tests {
start += 1;
}
}

#[test]
fn test_filtered_rowgroup_metadata() {
let message_type = "
message test_schema {
REQUIRED INT32 a;
}
";
let schema = Arc::new(parse_message_type(message_type).unwrap());
let props = Arc::new(
WriterProperties::builder()
.set_statistics_enabled(EnabledStatistics::Page)
.build(),
);
let mut file: File = tempfile::tempfile().unwrap();
let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
let data = [1, 2, 3, 4, 5];

// write 5 row groups
for idx in 0..5 {
let data_i: Vec<i32> = data.iter().map(|x| x * (idx + 1)).collect();
let mut row_group_writer = file_writer.next_row_group().unwrap();
if let Some(mut writer) = row_group_writer.next_column().unwrap() {
writer
.typed::<Int32Type>()
.write_batch(data_i.as_slice(), None, None)
.unwrap();
writer.close().unwrap();
}
row_group_writer.close().unwrap();
file_writer.flushed_row_groups();
}
let file_metadata = file_writer.close().unwrap();

assert_eq!(file_metadata.num_rows, 25);
assert_eq!(file_metadata.row_groups.len(), 5);

// read only the 3rd row group
let read_options = ReadOptionsBuilder::new()
.with_page_index()
.with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) == 2))
.build();
let reader =
SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
.unwrap();
let metadata = reader.metadata();

// check we got the expected row group
assert_eq!(metadata.num_row_groups(), 1);
assert_eq!(metadata.row_group(0).ordinal(), Some(2));

// check we only got the relevant page indexes
assert!(metadata.column_index().is_some());
assert!(metadata.offset_index().is_some());
assert_eq!(metadata.column_index().unwrap().len(), 1);
assert_eq!(metadata.offset_index().unwrap().len(), 1);
let col_idx = metadata.column_index().unwrap();
let off_idx = metadata.offset_index().unwrap();
let col_stats = metadata.row_group(0).column(0).statistics().unwrap();
let pg_idx = &col_idx[0][0];
let off_idx_i = &off_idx[0][0];

// test that we got the index matching the row group
match pg_idx {
Index::INT32(int_idx) => {
let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref());
assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref());
}
_ => panic!("wrong stats type"),
}

// check offset index matches too
assert_eq!(
off_idx_i.page_locations[0].offset,
metadata.row_group(0).column(0).data_page_offset()
);

// read non-contiguous row groups
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is great -- than you for adding these tests for predicate

let read_options = ReadOptionsBuilder::new()
.with_page_index()
.with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) % 2 == 1))
.build();
let reader =
SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
.unwrap();
let metadata = reader.metadata();

// check we got the expected row groups
assert_eq!(metadata.num_row_groups(), 2);
assert_eq!(metadata.row_group(0).ordinal(), Some(1));
assert_eq!(metadata.row_group(1).ordinal(), Some(3));

// check we only got the relevant page indexes
assert!(metadata.column_index().is_some());
assert!(metadata.offset_index().is_some());
assert_eq!(metadata.column_index().unwrap().len(), 2);
assert_eq!(metadata.offset_index().unwrap().len(), 2);
let col_idx = metadata.column_index().unwrap();
let off_idx = metadata.offset_index().unwrap();

for (i, col_idx_i) in col_idx.iter().enumerate().take(metadata.num_row_groups()) {
let col_stats = metadata.row_group(i).column(0).statistics().unwrap();
let pg_idx = &col_idx_i[0];
let off_idx_i = &off_idx[i][0];

// test that we got the index matching the row group
match pg_idx {
Index::INT32(int_idx) => {
let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref());
assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref());
}
_ => panic!("wrong stats type"),
}

// check offset index matches too
assert_eq!(
off_idx_i.page_locations[0].offset,
metadata.row_group(i).column(0).data_page_offset()
);
}
}
}
Loading