Skip to content

Commit

Permalink
Add statistics for parquet page level skipping
Browse files Browse the repository at this point in the history
Signed-off-by: yangjiang <yangjiang@ebay.com>
  • Loading branch information
Ted-Jiang committed Nov 4, 2022
1 parent f61b43a commit 1c97dc4
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 29 deletions.
60 changes: 40 additions & 20 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,25 +532,45 @@ pub(crate) mod test_util {

pub async fn store_parquet(
batches: Vec<RecordBatch>,
multi_page: bool,
) -> Result<(Vec<ObjectMeta>, Vec<NamedTempFile>)> {
let files: Vec<_> = batches
.into_iter()
.map(|batch| {
let mut output = NamedTempFile::new().expect("creating temp file");

let props = WriterProperties::builder().build();
let mut writer =
ArrowWriter::try_new(&mut output, batch.schema(), Some(props))
.expect("creating writer");

writer.write(&batch).expect("Writing batch");
writer.close().unwrap();
output
})
.collect();

let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect();
Ok((meta, files))
if multi_page {
// All batches write in to one file, each batch must have same schema.
let mut output = NamedTempFile::new().expect("creating temp file");
let mut builder = WriterProperties::builder();
// todo https://github.com/apache/arrow-rs/issues/2941 release change to row limit.
builder = builder.set_data_pagesize_limit(1);
builder = builder.set_write_batch_size(1);
let proper = builder.build();
let mut writer =
ArrowWriter::try_new(&mut output, batches[0].schema(), Some(proper))
.expect("creating writer");
for b in batches {
writer.write(&b).expect("Writing batch");
}
writer.close().unwrap();
Ok((vec![local_unpartitioned_file(&output)], vec![output]))
} else {
// Each batch writes to their own file
let files: Vec<_> = batches
.into_iter()
.map(|batch| {
let mut output = NamedTempFile::new().expect("creating temp file");

let props = WriterProperties::builder().build();
let mut writer =
ArrowWriter::try_new(&mut output, batch.schema(), Some(props))
.expect("creating writer");

writer.write(&batch).expect("Writing batch");
writer.close().unwrap();
output
})
.collect();

let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect();
Ok((meta, files))
}
}
}

Expand Down Expand Up @@ -599,7 +619,7 @@ mod tests {
let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap();

let store = Arc::new(LocalFileSystem::new()) as _;
let (meta, _files) = store_parquet(vec![batch1, batch2]).await?;
let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;

let format = ParquetFormat::default();
let schema = format.infer_schema(&store, &meta).await.unwrap();
Expand Down Expand Up @@ -738,7 +758,7 @@ mod tests {
let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
LocalFileSystem::new(),
)));
let (meta, _files) = store_parquet(vec![batch1, batch2]).await?;
let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;

// Use a size hint larger than the parquet footer but smaller than the actual metadata, requiring a second fetch
// for the remaining metadata
Expand Down
61 changes: 52 additions & 9 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ impl FileOpener for ParquetOpener {
// page index pruning: if all data on individual pages can
// be ruled using page metadata, rows from other columns
// with that range can be skipped as well
if let Some(row_selection) = enable_page_index
if let Some(row_selection) = (enable_page_index && !row_groups.is_empty())
.then(|| {
page_filter::build_page_filter(
pruning_predicate.as_ref(),
Expand Down Expand Up @@ -919,7 +919,7 @@ mod tests {
datasource::file_format::{parquet::ParquetFormat, FileFormat},
physical_plan::collect,
};
use arrow::array::Float32Array;
use arrow::array::{Float32Array, Int32Array};
use arrow::datatypes::DataType::Decimal128;
use arrow::record_batch::RecordBatch;
use arrow::{
Expand Down Expand Up @@ -960,9 +960,16 @@ mod tests {
predicate: Option<Expr>,
pushdown_predicate: bool,
) -> Result<Vec<RecordBatch>> {
round_trip(batches, projection, schema, predicate, pushdown_predicate)
.await
.batches
round_trip(
batches,
projection,
schema,
predicate,
pushdown_predicate,
false,
)
.await
.batches
}

/// Writes each RecordBatch as an individual parquet file and then
Expand All @@ -974,6 +981,7 @@ mod tests {
schema: Option<SchemaRef>,
predicate: Option<Expr>,
pushdown_predicate: bool,
page_index_predicate: bool,
) -> RoundTripResult {
let file_schema = match schema {
Some(schema) => schema,
Expand All @@ -983,7 +991,7 @@ mod tests {
),
};

let (meta, _files) = store_parquet(batches).await.unwrap();
let (meta, _files) = store_parquet(batches, page_index_predicate).await.unwrap();
let file_groups = meta.into_iter().map(Into::into).collect();

// prepare the scan
Expand All @@ -1008,6 +1016,10 @@ mod tests {
.with_reorder_filters(true);
}

if page_index_predicate {
parquet_exec = parquet_exec.with_enable_page_index(true);
}

let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let parquet_exec = Arc::new(parquet_exec);
Expand Down Expand Up @@ -1225,7 +1237,8 @@ mod tests {
let filter = col("c2").eq(lit(2_i64));

// read/write them files:
let rt = round_trip(vec![batch1, batch2], None, None, Some(filter), true).await;
let rt =
round_trip(vec![batch1, batch2], None, None, Some(filter), true, false).await;
let expected = vec![
"+----+----+----+",
"| c1 | c3 | c2 |",
Expand Down Expand Up @@ -1374,7 +1387,8 @@ mod tests {
let filter = col("c2").eq(lit(1_i64));

// read/write them files:
let rt = round_trip(vec![batch1, batch2], None, None, Some(filter), true).await;
let rt =
round_trip(vec![batch1, batch2], None, None, Some(filter), true, false).await;

let expected = vec![
"+----+----+",
Expand Down Expand Up @@ -1695,6 +1709,35 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn parquet_page_index_exec_metrics() {
let c1: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(2)]));
let c2: ArrayRef = Arc::new(Int32Array::from(vec![Some(3), Some(4), Some(5)]));
let batch1 = create_batch(vec![("int", c1.clone())]);
let batch2 = create_batch(vec![("int", c2.clone())]);

let filter = col("int").eq(lit(4_i32));

let rt =
round_trip(vec![batch1, batch2], None, None, Some(filter), false, true).await;

let metrics = rt.parquet_exec.metrics().unwrap();

// todo fix this https://github.com/apache/arrow-rs/issues/2941 release change to row limit.
// assert the batches and some metrics
let expected = vec![
"+-----+", "| int |", "+-----+", "| |", "| 1 |", "| 2 |", "| 3 |",
"| 4 |", "| 5 |", "+-----+",
];
assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 0);
assert!(
get_value(&metrics, "page_index_eval_time") > 0,
"no eval time in metrics: {:#?}",
metrics
);
}

#[tokio::test]
async fn parquet_exec_metrics() {
let c1: ArrayRef = Arc::new(StringArray::from(vec![
Expand All @@ -1714,7 +1757,7 @@ mod tests {
let filter = col("c1").not_eq(lit("bar"));

// read/write them files:
let rt = round_trip(vec![batch1], None, None, Some(filter), true).await;
let rt = round_trip(vec![batch1], None, None, Some(filter), true, false).await;

let metrics = rt.parquet_exec.metrics().unwrap();

Expand Down
13 changes: 13 additions & 0 deletions datafusion/core/src/physical_plan/file_format/parquet/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ pub struct ParquetFileMetrics {
pub pushdown_rows_filtered: Count,
/// Total time spent evaluating pushdown filters
pub pushdown_eval_time: Time,
/// Total rows filtered out by parquet page index
pub page_index_rows_filtered: Count,
/// Total time spent evaluating parquet page index filters
pub page_index_eval_time: Time,
}

impl ParquetFileMetrics {
Expand Down Expand Up @@ -63,13 +67,22 @@ impl ParquetFileMetrics {
let pushdown_eval_time = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.subset_time("pushdown_eval_time", partition);
let page_index_rows_filtered = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.counter("page_index_rows_filtered", partition);

let page_index_eval_time = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.subset_time("page_index_eval_time", partition);

Self {
predicate_evaluation_errors,
row_groups_pruned,
bytes_scanned,
pushdown_rows_filtered,
pushdown_eval_time,
page_index_rows_filtered,
page_index_eval_time,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ pub(crate) fn build_page_filter(
file_metadata: &ParquetMetaData,
file_metrics: &ParquetFileMetrics,
) -> Result<Option<RowSelection>> {
// scoped timer updates on drop
let _timer_guard = file_metrics.page_index_eval_time.timer();
let page_index_predicates =
extract_page_index_push_down_predicates(pruning_predicate, schema)?;

Expand Down Expand Up @@ -154,6 +156,18 @@ pub(crate) fn build_page_filter(
row_selections.push_back(selectors.into_iter().flatten().collect::<Vec<_>>());
}
let final_selection = combine_multi_col_selection(row_selections);
let total_skip =
final_selection.iter().fold(
0,
|acc, x| {
if x.skip {
acc + x.row_count
} else {
acc
}
},
);
file_metrics.page_index_rows_filtered.add(total_skip);
Ok(Some(final_selection.into()))
} else {
Ok(None)
Expand Down

0 comments on commit 1c97dc4

Please sign in to comment.