Skip to content

Commit

Permalink
Introduce ReadOptions with builder API, for parquet filter row grou…
Browse files Browse the repository at this point in the history
…ps that satisfy all filters, and enable filter row groups by range. (#1389)

* Filter row groups by comparing midpoint with offset range

* lint

* ReadOptions with builder API

* fix comments

* precise range doc

* tab to space
  • Loading branch information
yjshen authored Mar 6, 2022
1 parent 1efd81d commit 2bca71e
Showing 1 changed file with 168 additions and 17 deletions.
185 changes: 168 additions & 17 deletions parquet/src/file/serialized_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,57 @@ pub struct SerializedFileReader<R: ChunkReader> {
metadata: ParquetMetaData,
}

/// A builder for [`ReadOptions`].
/// For the predicates that are added to the builder,
/// they will be chained using 'AND' to filter the row groups.
pub struct ReadOptionsBuilder {
predicates: Vec<Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>>,
}

impl ReadOptionsBuilder {
/// New builder
pub fn new() -> Self {
ReadOptionsBuilder { predicates: vec![] }
}

/// Add a predicate on row group metadata to the reading option,
/// Filter only row groups that match the predicate criteria
pub fn with_predicate(
mut self,
predicate: Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>,
) -> Self {
self.predicates.push(predicate);
self
}

/// Add a range predicate on filtering row groups if their midpoints are within
/// the Closed-Open range `[start..end) {x | start <= x < end}`
pub fn with_range(mut self, start: i64, end: i64) -> Self {
assert!(start < end);
let predicate = move |rg: &RowGroupMetaData, _: usize| {
let mid = get_midpoint_offset(rg);
mid >= start && mid < end
};
self.predicates.push(Box::new(predicate));
self
}

/// Seal the builder and return the read options
pub fn build(self) -> ReadOptions {
ReadOptions {
predicates: self.predicates,
}
}
}

/// A collection of options for reading a Parquet file.
///
/// Currently, only predicates on row group metadata are supported.
/// All predicates will be chained using 'AND' to filter the row groups.
pub struct ReadOptions {
predicates: Vec<Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>>,
}

impl<R: 'static + ChunkReader> SerializedFileReader<R> {
/// Creates file reader from a Parquet file.
/// Returns error if Parquet file does not exist or is corrupt.
Expand All @@ -138,25 +189,48 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
})
}

/// Filters row group metadata to only those row groups,
/// for which the predicate function returns true
pub fn filter_row_groups(
&mut self,
predicate: &dyn Fn(&RowGroupMetaData, usize) -> bool,
) {
/// Creates file reader from a Parquet file with read options.
/// Returns error if Parquet file does not exist or is corrupt.
pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
let metadata = footer::parse_metadata(&chunk_reader)?;
let mut predicates = options.predicates;
let row_groups = metadata.row_groups().to_vec();
let mut filtered_row_groups = Vec::<RowGroupMetaData>::new();
for (i, row_group_metadata) in self.metadata.row_groups().iter().enumerate() {
if predicate(row_group_metadata, i) {
filtered_row_groups.push(row_group_metadata.clone());
for (i, rg_meta) in row_groups.into_iter().enumerate() {
let mut keep = true;
for predicate in &mut predicates {
if !predicate(&rg_meta, i) {
keep = false;
break;
}
}
if keep {
filtered_row_groups.push(rg_meta);
}
}
self.metadata = ParquetMetaData::new(
self.metadata.file_metadata().clone(),
filtered_row_groups,
);

Ok(Self {
chunk_reader: Arc::new(chunk_reader),
metadata: ParquetMetaData::new(
metadata.file_metadata().clone(),
filtered_row_groups,
),
})
}
}

/// Get midpoint offset for a row group
fn get_midpoint_offset(meta: &RowGroupMetaData) -> i64 {
let col = meta.column(0);
let mut offset = col.data_page_offset();
if let Some(dic_offset) = col.dictionary_page_offset() {
if offset > dic_offset {
offset = dic_offset
}
};
offset + meta.compressed_size() / 2
}

impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
fn metadata(&self) -> &ParquetMetaData {
&self.metadata
Expand Down Expand Up @@ -790,19 +864,96 @@ mod tests {
}

#[test]
fn test_file_reader_filter_row_groups() -> Result<()> {
fn test_file_reader_with_no_filter() -> Result<()> {
let test_file = get_test_file("alltypes_plain.parquet");
let origin_reader = SerializedFileReader::new(test_file)?;
// test initial number of row groups
let metadata = origin_reader.metadata();
assert_eq!(metadata.num_row_groups(), 1);
Ok(())
}

#[test]
fn test_file_reader_filter_row_groups_with_predicate() -> Result<()> {
let test_file = get_test_file("alltypes_plain.parquet");
let mut reader = SerializedFileReader::new(test_file)?;
let read_options = ReadOptionsBuilder::new()
.with_predicate(Box::new(|_, _| false))
.build();
let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 0);
Ok(())
}

#[test]
fn test_file_reader_filter_row_groups_with_range() -> Result<()> {
let test_file = get_test_file("alltypes_plain.parquet");
let origin_reader = SerializedFileReader::new(test_file)?;
// test initial number of row groups
let metadata = origin_reader.metadata();
assert_eq!(metadata.num_row_groups(), 1);
let mid = get_midpoint_offset(metadata.row_group(0));

let test_file = get_test_file("alltypes_plain.parquet");
let read_options = ReadOptionsBuilder::new().with_range(0, mid + 1).build();
let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 1);

let test_file = get_test_file("alltypes_plain.parquet");
let read_options = ReadOptionsBuilder::new().with_range(0, mid).build();
let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 0);
Ok(())
}

#[test]
fn test_file_reader_filter_row_groups_and_range() -> Result<()> {
let test_file = get_test_file("alltypes_plain.parquet");
let origin_reader = SerializedFileReader::new(test_file)?;
let metadata = origin_reader.metadata();
let mid = get_midpoint_offset(metadata.row_group(0));

// true, true predicate
let test_file = get_test_file("alltypes_plain.parquet");
let read_options = ReadOptionsBuilder::new()
.with_predicate(Box::new(|_, _| true))
.with_range(mid, mid + 1)
.build();
let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 1);

// test filtering out all row groups
reader.filter_row_groups(&|_, _| false);
// true, false predicate
let test_file = get_test_file("alltypes_plain.parquet");
let read_options = ReadOptionsBuilder::new()
.with_predicate(Box::new(|_, _| true))
.with_range(0, mid)
.build();
let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 0);

// false, true predicate
let test_file = get_test_file("alltypes_plain.parquet");
let read_options = ReadOptionsBuilder::new()
.with_predicate(Box::new(|_, _| false))
.with_range(mid, mid + 1)
.build();
let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 0);

// false, false predicate
let test_file = get_test_file("alltypes_plain.parquet");
let read_options = ReadOptionsBuilder::new()
.with_predicate(Box::new(|_, _| false))
.with_range(0, mid)
.build();
let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 0);
Ok(())
}
}

0 comments on commit 2bca71e

Please sign in to comment.