Skip to content

Commit

Permalink
Skip useless pruning predicates in ParquetExec (#4280)
Browse files Browse the repository at this point in the history
* Skip useless pruning predicates

* Update datafusion/core/src/physical_plan/file_format/parquet.rs

Co-authored-by: Daniël Heres <danielheres@gmail.com>

Co-authored-by: Daniël Heres <danielheres@gmail.com>
  • Loading branch information
alamb and Dandandan authored Nov 18, 2022
1 parent 949d5af commit a584ff5
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 12 deletions.
10 changes: 10 additions & 0 deletions datafusion/core/src/physical_optimizer/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use datafusion_expr::expr::{BinaryExpr, Cast};
use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter};
use datafusion_expr::{binary_expr, cast, try_cast, ExprSchemable};
use datafusion_physical_expr::create_physical_expr;
use datafusion_physical_expr::expressions::Literal;
use log::trace;

/// Interface to pass statistics information to [`PruningPredicate`]
Expand Down Expand Up @@ -224,6 +225,15 @@ impl PruningPredicate {
&self.predicate_expr
}

/// Returns true if this pruning predicate is "always true" (aka will not prune anything)
pub fn allways_true(&self) -> bool {
self.predicate_expr
.as_any()
.downcast_ref::<Literal>()
.map(|l| matches!(l.value(), ScalarValue::Boolean(Some(true))))
.unwrap_or_default()
}

/// Returns all need column indexes to evaluate this pruning predicate
pub(crate) fn need_input_columns_ids(&self) -> HashSet<usize> {
let mut set = HashSet::new();
Expand Down
68 changes: 56 additions & 12 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,19 +97,28 @@ impl ParquetExec {
let predicate_creation_errors =
MetricBuilder::new(&metrics).global_counter("num_predicate_creation_errors");

let pruning_predicate = predicate.and_then(|predicate_expr| {
match PruningPredicate::try_new(
predicate_expr,
base_config.file_schema.clone(),
) {
Ok(pruning_predicate) => Some(pruning_predicate),
Err(e) => {
debug!("Could not create pruning predicate for: {}", e);
predicate_creation_errors.add(1);
let pruning_predicate = predicate
.and_then(|predicate_expr| {
match PruningPredicate::try_new(
predicate_expr,
base_config.file_schema.clone(),
) {
Ok(pruning_predicate) => Some(pruning_predicate),
Err(e) => {
debug!("Could not create pruning predicate for: {}", e);
predicate_creation_errors.add(1);
None
}
}
})
.and_then(|pruning_predicate| {
// If the pruning predicate can't prune anything, don't try
if pruning_predicate.allways_true() {
None
} else {
Some(pruning_predicate)
}
}
});
});

let (projected_schema, projected_statistics) = base_config.project();

Expand Down Expand Up @@ -680,7 +689,7 @@ mod tests {
use chrono::{TimeZone, Utc};
use datafusion_common::assert_contains;
use datafusion_common::ScalarValue;
use datafusion_expr::{col, lit};
use datafusion_expr::{col, lit, when};
use futures::StreamExt;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
Expand Down Expand Up @@ -1544,6 +1553,10 @@ mod tests {

let rt = round_trip(vec![batch1], None, None, Some(filter), true, false).await;

// should have a pruning predicate
let pruning_predicate = &rt.parquet_exec.pruning_predicate;
assert!(pruning_predicate.is_some());

// convert to explain plan form
let display = displayable(rt.parquet_exec.as_ref()).indent().to_string();

Expand All @@ -1554,6 +1567,37 @@ mod tests {
assert_contains!(&display, "projection=[c1]");
}

#[tokio::test]
async fn parquet_exec_skip_empty_pruning() {
let c1: ArrayRef = Arc::new(StringArray::from(vec![
Some("Foo"),
None,
Some("bar"),
Some("bar"),
Some("bar"),
Some("bar"),
Some("zzz"),
]));

// batch1: c1(string)
let batch1 = create_batch(vec![("c1", c1.clone())]);

// filter is too complicated for pruning
let filter = when(col("c1").not_eq(lit("bar")), lit(true))
.otherwise(lit(false))
.unwrap();

let rt = round_trip(vec![batch1], None, None, Some(filter), true, false).await;

// Should not contain a pruning predicate
let pruning_predicate = &rt.parquet_exec.pruning_predicate;
assert!(
pruning_predicate.is_none(),
"Still had pruning predicate: {:?}",
pruning_predicate
)
}

/// returns the sum of all the metrics with the specified name
/// the returned set.
///
Expand Down

0 comments on commit a584ff5

Please sign in to comment.