[feat] Support using offset index in ParquetRecordBatchStream when pu… #3616

Merged · 2 commits · Sep 28, 2022
73 changes: 73 additions & 0 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -577,9 +577,15 @@ mod tests {
use datafusion_common::ScalarValue;
use futures::stream::BoxStream;
use futures::StreamExt;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
use object_store::{GetResult, ListResult, MultipartId};
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex};
use parquet::file::page_index::index::Index;
use tokio::fs::File;
use tokio::io::AsyncWrite;

#[tokio::test]
@@ -1126,6 +1132,73 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_read_parquet_page_index() -> Result<()> {
    let testdata = crate::test_util::parquet_test_data();
    let path = format!("{}/alltypes_tiny_pages.parquet", testdata);
    let file = File::open(path).await.unwrap();
    let options = ArrowReaderOptions::new().with_page_index(true);
    // The builder is only used to load the metadata; the stream itself
    // is not consumed in this test.
    let metadata =
        ParquetRecordBatchStreamBuilder::new_with_options(file, options.clone())
            .await
            .unwrap()
            .metadata()
            .clone();
    check_page_index_validation(metadata.page_indexes(), metadata.offset_indexes());

    let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata);
    let file = File::open(path).await.unwrap();

    let metadata = ParquetRecordBatchStreamBuilder::new_with_options(file, options)
        .await
        .unwrap()
        .metadata()
        .clone();
    check_page_index_validation(metadata.page_indexes(), metadata.offset_indexes());

    Ok(())
}

fn check_page_index_validation(
    page_index: Option<&ParquetColumnIndex>,
    offset_index: Option<&ParquetOffsetIndex>,
) {
    assert!(page_index.is_some());
    assert!(offset_index.is_some());

    let page_index = page_index.unwrap();
    let offset_index = offset_index.unwrap();

    // there is only one row group per file
    assert_eq!(page_index.len(), 1);
    assert_eq!(offset_index.len(), 1);
    let page_index = page_index.get(0).unwrap();
    let offset_index = offset_index.get(0).unwrap();

    // 13 columns in the row group
    assert_eq!(page_index.len(), 13);
    assert_eq!(offset_index.len(), 13);

    // check the indexes of int_col (column 4)
    let int_col_index = page_index.get(4).unwrap();
    let int_col_offset = offset_index.get(4).unwrap();

    // int_col has 325 pages
    assert_eq!(int_col_offset.len(), 325);
    match int_col_index {
        Index::INT32(index) => {
            assert_eq!(index.indexes.len(), 325);
            for min_max in &index.indexes {
                assert!(min_max.min.is_some());
                assert!(min_max.max.is_some());
                assert!(min_max.null_count.is_some());
            }
        }
        // Panicking (rather than merely logging) makes the test fail if the
        // index has an unexpected type.
        _ => panic!("expected an INT32 page index for int_col"),
    }
}

fn assert_bytes_scanned(exec: Arc<dyn ExecutionPlan>, expected: usize) {
    let actual = exec
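For orientation, both structures validated above are nested `Vec`s from `parquet::file::metadata`: indexed first by row group, then by column, then (for the offset index) one entry per data page. Below is a minimal sketch of a hypothetical helper, not part of this PR, that counts the data pages of one column chunk:

use parquet::file::metadata::ParquetOffsetIndex;

/// Hypothetical helper: the number of data pages recorded in the offset
/// index for one column chunk. `ParquetOffsetIndex` nests as
/// row group -> column -> one `PageLocation` per page.
fn page_count(offset_index: &ParquetOffsetIndex, row_group: usize, column: usize) -> usize {
    offset_index[row_group][column].len()
}

With the metadata loaded in the test above, `page_count(&offset_index, 0, 4)` would return the 325 pages asserted for `int_col`.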
18 changes: 17 additions & 1 deletion datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -57,6 +57,7 @@ use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use log::debug;
use object_store::{ObjectMeta, ObjectStore};
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::basic::{ConvertedType, LogicalType};
@@ -78,6 +79,11 @@ pub struct ParquetScanOptions {
    /// If true, the generated `RowFilter` may reorder the predicate `Expr`s to try and optimize
    /// the cost of filter evaluation.
    reorder_predicates: bool,
    /// If enabled, the reader will read the page index.
    /// This is used to optimize filter pushdown
    /// via `RowSelector` and `RowFilter` by
    /// eliminating unnecessary IO and decoding.
    enable_page_index: bool,
}

impl ParquetScanOptions {
@@ -92,6 +98,12 @@ impl ParquetScanOptions {
        self.reorder_predicates = reorder_predicates;
        self
    }

    /// Set whether to read the page index when reading Parquet files
    pub fn with_page_index(mut self, page_index: bool) -> Self {
        self.enable_page_index = page_index;
        self
    }
}

/// Execution plan for scanning one or more Parquet partitions
@@ -393,9 +405,13 @@ impl FileOpener for ParquetOpener {
        let table_schema = self.table_schema.clone();
        let reorder_predicates = self.scan_options.reorder_predicates;
        let pushdown_filters = self.scan_options.pushdown_filters;
        let enable_page_index = self.scan_options.enable_page_index;

        Ok(Box::pin(async move {
            // `new_with_options` replaces the previous plain
            // `ParquetRecordBatchStreamBuilder::new(reader)` call so that the
            // page index can be requested via `ArrowReaderOptions`.
            let options = ArrowReaderOptions::new().with_page_index(enable_page_index);
            let mut builder =
                ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
                    .await?;
            let adapted_projections =
                schema_adapter.map_projections(builder.schema(), &projection)?;
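For context, wiring the new option up from user code would look roughly like the sketch below. This is a hedged illustration, not code from this PR: it assumes `ParquetScanOptions` implements `Default` and that builder-style setters analogous to `with_page_index` exist for the other two fields (only the tail of `with_reorder_predicates` is visible in the hunk above).

// Hypothetical usage sketch; `Default` and the first two setters are assumed.
let scan_options = ParquetScanOptions::default()
    .with_pushdown_filters(true)    // assumed setter for `pushdown_filters`
    .with_reorder_predicates(true)  // setter whose body appears above
    .with_page_index(true);         // the setter added in this PR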
2 changes: 1 addition & 1 deletion testing
Submodule testing updated 52 files
+ data/arrow-ipc-file/clusterfuzz-testcase-arrow-ipc-file-fuzz-5873085270589440
+ data/arrow-ipc-file/clusterfuzz-testcase-minimized-arrow-ipc-file-fuzz-5480145071243264
+ data/arrow-ipc-file/clusterfuzz-testcase-minimized-arrow-ipc-file-fuzz-5577412021190656
+ data/arrow-ipc-file/clusterfuzz-testcase-minimized-arrow-ipc-file-fuzz-5749190446153728
+ data/arrow-ipc-file/clusterfuzz-testcase-minimized-arrow-ipc-file-fuzz-5864855240835072.fuzz
+ data/arrow-ipc-file/clusterfuzz-testcase-minimized-arrow-ipc-file-fuzz-6023524637081600
+ data/arrow-ipc-file/clusterfuzz-testcase-minimized-arrow-ipc-file-fuzz-6177196536889344
+ data/arrow-ipc-file/clusterfuzz-testcase-minimized-arrow-ipc-file-fuzz-6318558565498880.fuzz
+ data/arrow-ipc-stream/clusterfuzz-testcase-arrow-ipc-file-fuzz-5298734406172672
+ data/arrow-ipc-stream/clusterfuzz-testcase-arrow-ipc-file-fuzz-5502930036326400
+ data/arrow-ipc-stream/clusterfuzz-testcase-arrow-ipc-file-fuzz-6065820480962560.fuzz
+ data/arrow-ipc-stream/clusterfuzz-testcase-arrow-ipc-file-fuzz-6537416932982784
+ data/arrow-ipc-stream/clusterfuzz-testcase-arrow-ipc-file-fuzz-6598997234548736
+ data/arrow-ipc-stream/clusterfuzz-testcase-arrow-ipc-stream-fuzz-4895056843112448
+ data/arrow-ipc-stream/clusterfuzz-testcase-minimized-arrow-ipc-file-fuzz-6674891504484352
+ data/arrow-ipc-stream/clusterfuzz-testcase-minimized-arrow-ipc-stream-fuzz-4757582821064704
+ data/arrow-ipc-stream/clusterfuzz-testcase-minimized-arrow-ipc-stream-fuzz-4961281405222912
+ data/arrow-ipc-stream/clusterfuzz-testcase-minimized-arrow-ipc-stream-fuzz-5281967462023168
+ data/arrow-ipc-stream/clusterfuzz-testcase-minimized-arrow-ipc-stream-fuzz-6589380504977408.fuzz
+37 −0 data/avro/README.md
+ data/parquet/ARROW-17100.parquet
+24 −0 data/parquet/README.md
+ data/parquet/fuzzing/clusterfuzz-testcase-5913005913407488
+ data/parquet/fuzzing/clusterfuzz-testcase-6606237035003904
+ data/parquet/fuzzing/clusterfuzz-testcase-dictbitwidth-4680774947569664
+ data/parquet/fuzzing/clusterfuzz-testcase-dictbitwidth-5882232959270912
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-4738122420715520
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-4866999088447488
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-4938338763669504
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5004902418481152
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5103039558582272
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5106889906585600
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5152654819459072.fuzz
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5251250357141504
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5385788188131328
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5798108001337344
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5841507574743040
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-5915095763386368
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-6122962147737600.fuzz
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-6125206807642112.fuzz
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-6289584196026368
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-6358005443592192
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-6539993600884736
+ data/parquet/fuzzing/clusterfuzz-testcase-minimized-parquet-arrow-fuzz-6696667471020032
+ data/parquet/fuzzing/clusterfuzz-testcase-parquet-arrow-fuzz-5004902418481152
+ data/parquet/fuzzing/clusterfuzz-testcase-parquet-arrow-fuzz-5415048864989184
+ data/parquet/fuzzing/clusterfuzz-testcase-parquet-arrow-fuzz-5973249794637824
+ data/parquet/fuzzing/clusterfuzz-testcase-parquet-arrow-fuzz-6196357887557632.fuzz
+ data/parquet/fuzzing/clusterfuzz-testcase-parquet-arrow-fuzz-6702965604876288
+ data/parquet/fuzzing/crash-61d6204d481340860da54e30f1937b67234ad0f7
+ data/parquet/fuzzing/crash-649c71a618ae2fd80cec177a9676eb3e280fc1fa
+ data/parquet/fuzzing/crash-9840a7b1a0d24996069f6ee0779bbe9875e8aca3