Skip to content

Commit

Permalink
fix: apply missing filtering on parquet reader
Browse files Browse the repository at this point in the history
  • Loading branch information
ShiKaiWi committed Nov 2, 2022
1 parent 4e6a728 commit bc60d9f
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 4 deletions.
8 changes: 7 additions & 1 deletion analytic_engine/src/sst/parquet/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,10 +289,16 @@ impl ProjectAndFilterReader {

Ok(Box::new(reverse_reader))
} else {
let filtered_row_groups = file_reader
.filtered_row_group_indexes()
.expect("filtered row groups must exist after filtering")
.to_vec();

let builder = ParquetRecordBatchReaderBuilder::try_new(file_reader)
.map_err(|e| Box::new(e) as _)
.context(DecodeRecordBatch)?
.with_batch_size(self.batch_size);
.with_batch_size(self.batch_size)
.with_row_groups(filtered_row_groups);

let builder = if self.projected_schema.is_all_projection() {
builder
Expand Down
21 changes: 18 additions & 3 deletions components/parquet_ext/src/serialized_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ pub struct CacheableSerializedFileReader<R: ChunkReader> {
name: String,
chunk_reader: Arc<R>,
metadata: Arc<ParquetMetaData>,
filtered_row_group_indexes: Option<Vec<usize>>,
data_cache: Option<DataCacheRef>,
}

Expand Down Expand Up @@ -95,23 +96,37 @@ impl<R: 'static + ChunkReader> CacheableSerializedFileReader<R> {
name,
chunk_reader: Arc::new(chunk_reader),
metadata,
filtered_row_group_indexes: None,
data_cache,
})
}

/// Indexes (into the original file metadata) of the row groups that
/// survived the most recent call to `filter_row_groups`.
///
/// Returns [`None`] if `filter_row_groups` has never been invoked on
/// this reader, i.e. no filtering has taken place yet.
pub fn filtered_row_group_indexes(&self) -> Option<&[usize]> {
    // Borrow the stored Vec (if any) and expose it as a slice so the
    // caller cannot mutate or take ownership of the cached indexes.
    self.filtered_row_group_indexes
        .as_ref()
        .map(Vec::as_slice)
}

/// Filters row group metadata to only those row groups,
/// for which the predicate function returns true
/// for which the predicate function returns true.
pub fn filter_row_groups(&mut self, predicate: &dyn Fn(&RowGroupMetaData, usize) -> bool) {
let mut filtered_row_groups = Vec::<RowGroupMetaData>::new();
for (i, row_group_metadata) in self.metadata.row_groups().iter().enumerate() {
let row_groups = self.metadata.row_groups();
let mut filtered_row_group_indexes = Vec::with_capacity(row_groups.len());
let mut filtered_row_groups = Vec::with_capacity(row_groups.len());
for (i, row_group_metadata) in row_groups.iter().enumerate() {
if predicate(row_group_metadata, i) {
filtered_row_groups.push(row_group_metadata.clone());
filtered_row_group_indexes.push(i);
}
}
self.metadata = Arc::new(ParquetMetaData::new(
self.metadata.file_metadata().clone(),
filtered_row_groups,
));

self.filtered_row_group_indexes = Some(filtered_row_group_indexes);
}
}

Expand Down

0 comments on commit bc60d9f

Please sign in to comment.