diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 07819bdf52cb..3de58d456a89 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -532,25 +532,43 @@ pub(crate) mod test_util {
     pub async fn store_parquet(
         batches: Vec<RecordBatch>,
+        multi_page: bool,
     ) -> Result<(Vec<ObjectMeta>, Vec<NamedTempFile>)> {
-        let files: Vec<_> = batches
-            .into_iter()
-            .map(|batch| {
-                let mut output = NamedTempFile::new().expect("creating temp file");
-
-                let props = WriterProperties::builder().build();
-                let mut writer =
-                    ArrowWriter::try_new(&mut output, batch.schema(), Some(props))
-                        .expect("creating writer");
-
-                writer.write(&batch).expect("Writing batch");
-                writer.close().unwrap();
-                output
-            })
-            .collect();
-
-        let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect();
-        Ok((meta, files))
+        if multi_page {
+            // All batches are written to a single file; each batch must have the same schema.
+            let mut output = NamedTempFile::new().expect("creating temp file");
+            let mut builder = WriterProperties::builder();
+            builder = builder.set_data_page_row_count_limit(2);
+            let proper = builder.build();
+            let mut writer =
+                ArrowWriter::try_new(&mut output, batches[0].schema(), Some(proper))
+                    .expect("creating writer");
+            for b in batches {
+                writer.write(&b).expect("Writing batch");
+            }
+            writer.close().unwrap();
+            Ok((vec![local_unpartitioned_file(&output)], vec![output]))
+        } else {
+            // Each batch is written to its own file
+            let files: Vec<_> = batches
+                .into_iter()
+                .map(|batch| {
+                    let mut output = NamedTempFile::new().expect("creating temp file");
+
+                    let props = WriterProperties::builder().build();
+                    let mut writer =
+                        ArrowWriter::try_new(&mut output, batch.schema(), Some(props))
+                            .expect("creating writer");
+
+                    writer.write(&batch).expect("Writing batch");
+                    writer.close().unwrap();
+                    output
+                })
+                .collect();
+
+            let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect();
+            Ok((meta, files))
+        }
     }
 }
 
@@ -599,7 +617,7 @@ mod tests {
         let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap();
 
         let store = Arc::new(LocalFileSystem::new()) as _;
-        let (meta, _files) = store_parquet(vec![batch1, batch2]).await?;
+        let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;
 
         let format = ParquetFormat::default();
         let schema = format.infer_schema(&store, &meta).await.unwrap();
@@ -738,7 +756,7 @@ mod tests {
         let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
            LocalFileSystem::new(),
        )));
-        let (meta, _files) = store_parquet(vec![batch1, batch2]).await?;
+        let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;
 
         // Use a size hint larger than the parquet footer but smaller than the actual metadata, requiring a second fetch
         // for the remaining metadata
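For context, here is a minimal, self-contained sketch (not part of the patch) of what the new `multi_page: true` branch produces: `set_data_page_row_count_limit(2)` asks the writer to start a new data page roughly every two rows, so a single file ends up with several small pages per column chunk for the page index to prune. The schema, values, and the `write_multi_page_file` name below are made up for illustration and mirror the new test rather than any code in this diff.

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use tempfile::NamedTempFile;

fn write_multi_page_file() -> NamedTempFile {
    // Two small batches sharing one schema, as in the new test below.
    let c1: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(2)]));
    let c2: ArrayRef = Arc::new(Int32Array::from(vec![Some(3), Some(4), Some(5)]));
    let batch1 = RecordBatch::try_from_iter(vec![("int", c1)]).unwrap();
    let batch2 = RecordBatch::try_from_iter(vec![("int", c2)]).unwrap();

    let mut output = NamedTempFile::new().expect("creating temp file");

    // Limit each data page to ~2 rows so every batch spans multiple pages,
    // giving the page index per-page statistics to prune against.
    let props = WriterProperties::builder()
        .set_data_page_row_count_limit(2)
        .build();

    let mut writer = ArrowWriter::try_new(&mut output, batch1.schema(), Some(props))
        .expect("creating writer");
    for batch in [batch1, batch2] {
        writer.write(&batch).expect("writing batch");
    }
    writer.close().unwrap();
    output
}
```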
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 1f98dd88ca6e..61d2e5badb6b 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -445,7 +445,7 @@ impl FileOpener for ParquetOpener {
             // page index pruning: if all data on individual pages can
             // be ruled out using page metadata, rows from other columns
             // with that range can be skipped as well
-            if let Some(row_selection) = enable_page_index
+            if let Some(row_selection) = (enable_page_index && !row_groups.is_empty())
                 .then(|| {
                     page_filter::build_page_filter(
                         pruning_predicate.as_ref(),
@@ -914,7 +914,7 @@ mod tests {
         datasource::file_format::{parquet::ParquetFormat, FileFormat},
         physical_plan::collect,
     };
-    use arrow::array::Float32Array;
+    use arrow::array::{Float32Array, Int32Array};
     use arrow::datatypes::DataType::Decimal128;
     use arrow::record_batch::RecordBatch;
     use arrow::{
@@ -955,9 +955,16 @@ mod tests {
         predicate: Option<Expr>,
         pushdown_predicate: bool,
     ) -> Result<Vec<RecordBatch>> {
-        round_trip(batches, projection, schema, predicate, pushdown_predicate)
-            .await
-            .batches
+        round_trip(
+            batches,
+            projection,
+            schema,
+            predicate,
+            pushdown_predicate,
+            false,
+        )
+        .await
+        .batches
     }
 
     /// Writes each RecordBatch as an individual parquet file and then
@@ -969,6 +976,7 @@ mod tests {
         schema: Option<SchemaRef>,
         predicate: Option<Expr>,
         pushdown_predicate: bool,
+        page_index_predicate: bool,
     ) -> RoundTripResult {
         let file_schema = match schema {
             Some(schema) => schema,
@@ -978,7 +986,7 @@ mod tests {
             ),
         };
 
-        let (meta, _files) = store_parquet(batches).await.unwrap();
+        let (meta, _files) = store_parquet(batches, page_index_predicate).await.unwrap();
         let file_groups = meta.into_iter().map(Into::into).collect();
 
         // prepare the scan
@@ -1003,6 +1011,10 @@ mod tests {
                 .with_reorder_filters(true);
         }
 
+        if page_index_predicate {
+            parquet_exec = parquet_exec.with_enable_page_index(true);
+        }
+
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let parquet_exec = Arc::new(parquet_exec);
@@ -1220,7 +1232,8 @@ mod tests {
         let filter = col("c2").eq(lit(2_i64));
 
         // read/write them files:
-        let rt = round_trip(vec![batch1, batch2], None, None, Some(filter), true).await;
+        let rt =
+            round_trip(vec![batch1, batch2], None, None, Some(filter), true, false).await;
         let expected = vec![
             "+----+----+----+",
             "| c1 | c3 | c2 |",
@@ -1369,7 +1382,8 @@ mod tests {
         let filter = col("c2").eq(lit(1_i64));
 
         // read/write them files:
-        let rt = round_trip(vec![batch1, batch2], None, None, Some(filter), true).await;
+        let rt =
+            round_trip(vec![batch1, batch2], None, None, Some(filter), true, false).await;
 
         let expected = vec![
             "+----+----+",
@@ -1690,6 +1704,33 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn parquet_page_index_exec_metrics() {
+        let c1: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(2)]));
+        let c2: ArrayRef = Arc::new(Int32Array::from(vec![Some(3), Some(4), Some(5)]));
+        let batch1 = create_batch(vec![("int", c1.clone())]);
+        let batch2 = create_batch(vec![("int", c2.clone())]);
+
+        let filter = col("int").eq(lit(4_i32));
+
+        let rt =
+            round_trip(vec![batch1, batch2], None, None, Some(filter), false, true).await;
+
+        let metrics = rt.parquet_exec.metrics().unwrap();
+
+        // assert the batches and some metrics
+        let expected = vec![
+            "+-----+", "| int |", "+-----+", "| 3   |", "| 4   |", "| 5   |", "+-----+",
+        ];
+        assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
+        assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 3);
+        assert!(
+            get_value(&metrics, "page_index_eval_time") > 0,
+            "no eval time in metrics: {:#?}",
+            metrics
+        );
+    }
+
     #[tokio::test]
     async fn parquet_exec_metrics() {
         let c1: ArrayRef = Arc::new(StringArray::from(vec![
@@ -1709,7 +1750,7 @@ mod tests {
         let filter = col("c1").not_eq(lit("bar"));
 
         // read/write them files:
-        let rt = round_trip(vec![batch1], None, None, Some(filter), true).await;
+        let rt = round_trip(vec![batch1], None, None, Some(filter), true, false).await;
 
         let metrics = rt.parquet_exec.metrics().unwrap();
 
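A note on the numbers asserted in `parquet_page_index_exec_metrics`: both batches land in one multi-page file, and the predicate `int = 4` rules out every page holding the first batch (values 1, NULL, 2) from its page-level min/max statistics, while the pages holding 3, 4 and 5 still have to be decoded. That is why the output contains 3, 4 and 5 yet `page_index_rows_filtered` is 3. The sketch below (not part of the patch) shows the same arithmetic on an illustrative `RowSelection` built from parquet's `RowSelector`; the exact page boundaries the writer chooses may differ, only the skipped total matters.

```rust
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

fn main() {
    // Illustrative selection for the file in the new test: the pages holding
    // (1, NULL, 2) are skipped, the pages holding (3, 4, 5) are still read.
    let selectors = vec![RowSelector::skip(3), RowSelector::select(3)];

    // Same computation as the fold added to build_page_filter below.
    let skipped: usize = selectors
        .iter()
        .filter(|s| s.skip)
        .map(|s| s.row_count)
        .sum();
    assert_eq!(skipped, 3); // matches the page_index_rows_filtered assertion

    let _selection: RowSelection = selectors.into();
}
```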
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/metrics.rs b/datafusion/core/src/physical_plan/file_format/parquet/metrics.rs
index 58e340e62417..64e08753abe2 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/metrics.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/metrics.rs
@@ -35,6 +35,10 @@ pub struct ParquetFileMetrics {
     pub pushdown_rows_filtered: Count,
     /// Total time spent evaluating pushdown filters
     pub pushdown_eval_time: Time,
+    /// Total rows filtered out by parquet page index
+    pub page_index_rows_filtered: Count,
+    /// Total time spent evaluating parquet page index filters
+    pub page_index_eval_time: Time,
 }
 
 impl ParquetFileMetrics {
@@ -63,6 +67,13 @@ impl ParquetFileMetrics {
         let pushdown_eval_time = MetricBuilder::new(metrics)
             .with_new_label("filename", filename.to_string())
             .subset_time("pushdown_eval_time", partition);
+        let page_index_rows_filtered = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .counter("page_index_rows_filtered", partition);
+
+        let page_index_eval_time = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .subset_time("page_index_eval_time", partition);
 
         Self {
             predicate_evaluation_errors,
@@ -70,6 +81,8 @@ impl ParquetFileMetrics {
             bytes_scanned,
             pushdown_rows_filtered,
             pushdown_eval_time,
+            page_index_rows_filtered,
+            page_index_eval_time,
         }
     }
 }
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
index 37002af87608..828d213758fd 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
@@ -100,6 +100,8 @@ pub(crate) fn build_page_filter(
     file_metadata: &ParquetMetaData,
     file_metrics: &ParquetFileMetrics,
 ) -> Result<Option<RowSelection>> {
+    // scoped timer updates on drop
+    let _timer_guard = file_metrics.page_index_eval_time.timer();
     let page_index_predicates =
         extract_page_index_push_down_predicates(pruning_predicate, schema)?;
 
@@ -154,6 +156,18 @@ pub(crate) fn build_page_filter(
             row_selections.push_back(selectors.into_iter().flatten().collect::<Vec<RowSelector>>());
         }
         let final_selection = combine_multi_col_selection(row_selections);
+        let total_skip =
+            final_selection.iter().fold(
+                0,
+                |acc, x| {
+                    if x.skip {
+                        acc + x.row_count
+                    } else {
+                        acc
+                    }
+                },
+            );
+        file_metrics.page_index_rows_filtered.add(total_skip);
         Ok(Some(final_selection.into()))
     } else {
         Ok(None)
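One more note on the `_timer_guard` added to `build_page_filter`: `Time::timer()` hands back a scoped guard that adds the elapsed wall time to the metric when it is dropped, so every exit path (including the early `Ok(None)` returns and `?` errors) is still counted. Below is a small sketch of that pattern using only the metric APIs this patch already exercises; the filename label and partition index are placeholders.

```rust
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder};

fn main() {
    let metrics = ExecutionPlanMetricsSet::new();
    // Same builder calls as ParquetFileMetrics::new above.
    let eval_time = MetricBuilder::new(&metrics)
        .with_new_label("filename", "example.parquet".to_string())
        .subset_time("page_index_eval_time", 0);

    {
        // The guard starts timing here and adds the elapsed time to
        // `eval_time` when it goes out of scope, even on early returns.
        let _timer = eval_time.timer();
        let _work: u64 = (0..10_000u64).sum();
    }
}
```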