From fc5081d48ef59e39c1b353dd45fcd13af6186676 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 15 Oct 2022 07:57:01 -0400 Subject: [PATCH] Make expression manipulation consistent and easier to use: `combine/split filter` `conjunction`, etc (#3810) * Improve split_conjunction API, combine split_disjunction_owned, rename conjunction/disjunction and reduce clone * Add tests for conjunction/disjunction, make API uniform --- benchmarks/src/bin/parquet_filter_pushdown.rs | 4 +- benchmarks/src/bin/tpch.rs | 5 +- .../src/datasource/file_format/parquet.rs | 4 +- .../physical_plan/file_format/row_filter.rs | 4 +- .../optimizer/src/decorrelate_where_exists.rs | 12 +- .../optimizer/src/decorrelate_where_in.rs | 12 +- datafusion/optimizer/src/filter_push_down.rs | 6 +- .../optimizer/src/scalar_subquery_to_join.rs | 18 +- .../optimizer/src/subquery_filter_to_join.rs | 3 +- datafusion/optimizer/src/utils.rs | 267 +++++++++++------- 10 files changed, 203 insertions(+), 132 deletions(-) diff --git a/benchmarks/src/bin/parquet_filter_pushdown.rs b/benchmarks/src/bin/parquet_filter_pushdown.rs index 46a65587f194..e4bc9295e5ab 100644 --- a/benchmarks/src/bin/parquet_filter_pushdown.rs +++ b/benchmarks/src/bin/parquet_filter_pushdown.rs @@ -27,7 +27,7 @@ use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile}; use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::execution::context::ExecutionProps; use datafusion::logical_expr::{lit, or, Expr}; -use datafusion::optimizer::utils::combine_filters_disjunctive; +use datafusion::optimizer::utils::disjunction; use datafusion::physical_expr::create_physical_expr; use datafusion::physical_plan::collect; use datafusion::physical_plan::file_format::{ @@ -143,7 +143,7 @@ async fn run_benchmarks( col("response_status").eq(lit(403_u16)), )), // Many filters - combine_filters_disjunctive(&[ + disjunction([ col("request_method").not_eq(lit("GET")), col("response_status").eq(lit(400_u16)), // TODO this fails in the FilterExec with Error: Internal("The type of Dictionary(Int32, Utf8) = Utf8 of binary physical should be same") diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index cf6346fd0ec4..7930cb73c6b7 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -779,7 +779,8 @@ mod tests { if !actual.is_empty() { actual += "\n"; } - actual += &format!("{}", plan.display_indent()); + use std::fmt::Write as _; + write!(actual, "{}", plan.display_indent()).unwrap(); } let possibilities = vec![ @@ -1247,7 +1248,7 @@ mod tests { DataType::Decimal128(_,_) => { // if decimal, then round it to 2 decimal places like the answers // round() doesn't support the second argument for decimal places to round to - // this can be simplified to remove the mul and div when + // this can be simplified to remove the mul and div when // https://github.com/apache/arrow-datafusion/issues/2420 is completed // cast it back to an over-sized Decimal with 2 precision when done rounding let round = Box::new(ScalarFunction { diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 23bb3557ac09..07819bdf52cb 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -25,7 +25,7 @@ use arrow::datatypes::SchemaRef; use async_trait::async_trait; use bytes::{BufMut, BytesMut}; use datafusion_common::DataFusionError; -use datafusion_optimizer::utils::combine_filters; +use datafusion_optimizer::utils::conjunction; use hashbrown::HashMap; use object_store::{ObjectMeta, ObjectStore}; use parquet::arrow::parquet_to_arrow_schema; @@ -177,7 +177,7 @@ impl FileFormat for ParquetFormat { // If disable pruning then set the predicate to None, thus readers // will not prune data based on the statistics. let predicate = if self.enable_pruning { - combine_filters(filters) + conjunction(filters.to_vec()) } else { None }; diff --git a/datafusion/core/src/physical_plan/file_format/row_filter.rs b/datafusion/core/src/physical_plan/file_format/row_filter.rs index 574e2ad44bff..dd9c8fb650fd 100644 --- a/datafusion/core/src/physical_plan/file_format/row_filter.rs +++ b/datafusion/core/src/physical_plan/file_format/row_filter.rs @@ -23,7 +23,7 @@ use datafusion_common::{Column, DataFusionError, Result, ScalarValue, ToDFSchema use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter, RewriteRecursion}; use datafusion_expr::Expr; -use datafusion_optimizer::utils::uncombine_filter; +use datafusion_optimizer::utils::split_conjunction_owned; use datafusion_physical_expr::execution_props::ExecutionProps; use datafusion_physical_expr::{create_physical_expr, PhysicalExpr}; use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter}; @@ -253,7 +253,7 @@ pub fn build_row_filter( metadata: &ParquetMetaData, reorder_predicates: bool, ) -> Result> { - let predicates = uncombine_filter(expr); + let predicates = split_conjunction_owned(expr); let mut candidates: Vec = predicates .into_iter() diff --git a/datafusion/optimizer/src/decorrelate_where_exists.rs b/datafusion/optimizer/src/decorrelate_where_exists.rs index 434c57bf1999..78e8fc32d474 100644 --- a/datafusion/optimizer/src/decorrelate_where_exists.rs +++ b/datafusion/optimizer/src/decorrelate_where_exists.rs @@ -16,7 +16,7 @@ // under the License. use crate::utils::{ - combine_filters, exprs_to_join_cols, find_join_exprs, split_conjunction, + conjunction, exprs_to_join_cols, find_join_exprs, split_conjunction, verify_not_disjunction, }; use crate::{utils, OptimizerConfig, OptimizerRule}; @@ -48,8 +48,7 @@ impl DecorrelateWhereExists { predicate: &Expr, optimizer_config: &mut OptimizerConfig, ) -> datafusion_common::Result<(Vec, Vec)> { - let mut filters = vec![]; - split_conjunction(predicate, &mut filters); + let filters = split_conjunction(predicate); let mut subqueries = vec![]; let mut others = vec![]; @@ -153,8 +152,7 @@ fn optimize_exists( .map_err(|e| context!("cannot optimize non-correlated subquery", e))?; // split into filters - let mut subqry_filter_exprs = vec![]; - split_conjunction(subqry_filter.predicate(), &mut subqry_filter_exprs); + let subqry_filter_exprs = split_conjunction(subqry_filter.predicate()); verify_not_disjunction(&subqry_filter_exprs)?; // Grab column names to join on @@ -169,7 +167,7 @@ fn optimize_exists( // build subquery side of join - the thing the subquery was querying let mut subqry_plan = LogicalPlanBuilder::from(subqry_filter.input().as_ref().clone()); - if let Some(expr) = combine_filters(&other_subqry_exprs) { + if let Some(expr) = conjunction(other_subqry_exprs) { subqry_plan = subqry_plan.filter(expr)? // if the subquery had additional expressions, restore them } let subqry_plan = subqry_plan.build()?; @@ -187,7 +185,7 @@ fn optimize_exists( join_keys, join_filters, )?; - if let Some(expr) = combine_filters(outer_other_exprs) { + if let Some(expr) = conjunction(outer_other_exprs.to_vec()) { new_plan = new_plan.filter(expr)? // if the main query had additional expressions, restore them } diff --git a/datafusion/optimizer/src/decorrelate_where_in.rs b/datafusion/optimizer/src/decorrelate_where_in.rs index c7a072dac880..052ed796a407 100644 --- a/datafusion/optimizer/src/decorrelate_where_in.rs +++ b/datafusion/optimizer/src/decorrelate_where_in.rs @@ -16,7 +16,7 @@ // under the License. use crate::utils::{ - alias_cols, combine_filters, exprs_to_join_cols, find_join_exprs, merge_cols, + alias_cols, conjunction, exprs_to_join_cols, find_join_exprs, merge_cols, only_or_err, split_conjunction, swap_table, verify_not_disjunction, }; use crate::{utils, OptimizerConfig, OptimizerRule}; @@ -48,8 +48,7 @@ impl DecorrelateWhereIn { predicate: &Expr, optimizer_config: &mut OptimizerConfig, ) -> datafusion_common::Result<(Vec, Vec)> { - let mut filters = vec![]; - split_conjunction(predicate, &mut filters); // TODO: disjunctions + let filters = split_conjunction(predicate); // TODO: disjunctions let mut subqueries = vec![]; let mut others = vec![]; @@ -151,8 +150,7 @@ fn optimize_where_in( let mut other_subqry_exprs = vec![]; if let LogicalPlan::Filter(subqry_filter) = (*subqry_input).clone() { // split into filters - let mut subqry_filter_exprs = vec![]; - split_conjunction(subqry_filter.predicate(), &mut subqry_filter_exprs); + let subqry_filter_exprs = split_conjunction(subqry_filter.predicate()); verify_not_disjunction(&subqry_filter_exprs)?; // Grab column names to join on @@ -175,7 +173,7 @@ fn optimize_where_in( // build subquery side of join - the thing the subquery was querying let subqry_alias = format!("__sq_{}", optimizer_config.next_id()); let mut subqry_plan = LogicalPlanBuilder::from((*subqry_input).clone()); - if let Some(expr) = combine_filters(&other_subqry_exprs) { + if let Some(expr) = conjunction(other_subqry_exprs) { // if the subquery had additional expressions, restore them subqry_plan = subqry_plan.filter(expr)? } @@ -200,7 +198,7 @@ fn optimize_where_in( join_keys, join_filters, )?; - if let Some(expr) = combine_filters(outer_other_exprs) { + if let Some(expr) = conjunction(outer_other_exprs.to_vec()) { new_plan = new_plan.filter(expr)? // if the main query had additional expressions, restore them } let new_plan = new_plan.build()?; diff --git a/datafusion/optimizer/src/filter_push_down.rs b/datafusion/optimizer/src/filter_push_down.rs index 22b6aa5a1ba3..4d720eb22b3e 100644 --- a/datafusion/optimizer/src/filter_push_down.rs +++ b/datafusion/optimizer/src/filter_push_down.rs @@ -341,8 +341,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { } LogicalPlan::Analyze { .. } => push_down(&state, plan), LogicalPlan::Filter(filter) => { - let mut predicates = vec![]; - utils::split_conjunction(filter.predicate(), &mut predicates); + let predicates = utils::split_conjunction(filter.predicate()); predicates .into_iter() @@ -466,8 +465,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { let on_filters = filter .as_ref() .map(|e| { - let mut predicates = vec![]; - utils::split_conjunction(e, &mut predicates); + let predicates = utils::split_conjunction(e); predicates .into_iter() diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs index 3399e5576ef1..149b2913a474 100644 --- a/datafusion/optimizer/src/scalar_subquery_to_join.rs +++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs @@ -16,7 +16,7 @@ // under the License. use crate::utils::{ - combine_filters, exprs_to_join_cols, find_join_exprs, only_or_err, split_conjunction, + conjunction, exprs_to_join_cols, find_join_exprs, only_or_err, split_conjunction, verify_not_disjunction, }; use crate::{utils, OptimizerConfig, OptimizerRule}; @@ -48,8 +48,7 @@ impl ScalarSubqueryToJoin { predicate: &Expr, optimizer_config: &mut OptimizerConfig, ) -> Result<(Vec, Vec)> { - let mut filters = vec![]; - split_conjunction(predicate, &mut filters); // TODO: disjunctions + let filters = split_conjunction(predicate); // TODO: disjunctions let mut subqueries = vec![]; let mut others = vec![]; @@ -234,10 +233,11 @@ fn optimize_scalar( }; // if there were filters, split and capture them - let mut subqry_filter_exprs = vec![]; - if let Some(filter) = filter { - split_conjunction(filter.predicate(), &mut subqry_filter_exprs); - } + let subqry_filter_exprs = if let Some(filter) = filter { + split_conjunction(filter.predicate()) + } else { + vec![] + }; verify_not_disjunction(&subqry_filter_exprs)?; // Grab column names to join on @@ -258,7 +258,7 @@ fn optimize_scalar( // build subquery side of join - the thing the subquery was querying let mut subqry_plan = LogicalPlanBuilder::from((**input).clone()); - if let Some(expr) = combine_filters(&other_subqry_exprs) { + if let Some(expr) = conjunction(other_subqry_exprs) { subqry_plan = subqry_plan.filter(expr)? // if the subquery had additional expressions, restore them } @@ -314,7 +314,7 @@ fn optimize_scalar( new_plan = new_plan.filter(filter_expr)?; // if the main query had additional expressions, restore them - if let Some(expr) = combine_filters(outer_others) { + if let Some(expr) = conjunction(outer_others.to_vec()) { new_plan = new_plan.filter(expr)? } let new_plan = new_plan.build()?; diff --git a/datafusion/optimizer/src/subquery_filter_to_join.rs b/datafusion/optimizer/src/subquery_filter_to_join.rs index bd07eeab2f82..8ec6f3892890 100644 --- a/datafusion/optimizer/src/subquery_filter_to_join.rs +++ b/datafusion/optimizer/src/subquery_filter_to_join.rs @@ -60,8 +60,7 @@ impl OptimizerRule for SubqueryFilterToJoin { let optimized_input = self.optimize(filter.input(), optimizer_config)?; // Splitting filter expression into components by AND - let mut filters = vec![]; - utils::split_conjunction(filter.predicate(), &mut filters); + let filters = utils::split_conjunction(filter.predicate()); // Searching for subquery-based filters let (subquery_filters, regular_filters): (Vec<&Expr>, Vec<&Expr>) = diff --git a/datafusion/optimizer/src/utils.rs b/datafusion/optimizer/src/utils.rs index bf1968587f72..57702a71f8d8 100644 --- a/datafusion/optimizer/src/utils.rs +++ b/datafusion/optimizer/src/utils.rs @@ -51,67 +51,107 @@ pub fn optimize_children( from_plan(plan, &new_exprs, &new_inputs) } -/// converts "A AND B AND C" => [A, B, C] -pub fn split_conjunction<'a>(predicate: &'a Expr, predicates: &mut Vec<&'a Expr>) { - match predicate { +/// Splits a conjunctive [`Expr`] such as `A AND B AND C` => `[A, B, C]` +/// +/// See [`split_conjunction_owned`] for more details and an example. +pub fn split_conjunction(expr: &Expr) -> Vec<&Expr> { + split_conjunction_impl(expr, vec![]) +} + +fn split_conjunction_impl<'a>(expr: &'a Expr, mut exprs: Vec<&'a Expr>) -> Vec<&'a Expr> { + match expr { Expr::BinaryExpr { right, op: Operator::And, left, } => { - split_conjunction(left, predicates); - split_conjunction(right, predicates); + let exprs = split_conjunction_impl(left, exprs); + split_conjunction_impl(right, exprs) } - Expr::Alias(expr, _) => { - split_conjunction(expr, predicates); + Expr::Alias(expr, _) => split_conjunction_impl(expr, exprs), + other => { + exprs.push(other); + exprs } - other => predicates.push(other), } } -/// Combines an array of filter expressions into a single filter expression -/// consisting of the input filter expressions joined with logical AND. -/// Returns None if the filters array is empty. -pub fn combine_filters(filters: &[Expr]) -> Option { - if filters.is_empty() { - return None; - } - let combined_filter = filters - .iter() - .skip(1) - .fold(filters[0].clone(), |acc, filter| and(acc, filter.clone())); - Some(combined_filter) +/// Splits an owned conjunctive [`Expr`] such as `A AND B AND C` => `[A, B, C]` +/// +/// This is often used to "split" filter expressions such as `col1 = 5 +/// AND col2 = 10` into [`col1 = 5`, `col2 = 10`]; +/// +/// # Example +/// ``` +/// # use datafusion_expr::{col, lit}; +/// # use datafusion_optimizer::utils::split_conjunction_owned; +/// // a=1 AND b=2 +/// let expr = col("a").eq(lit(1)).and(col("b").eq(lit(2))); +/// +/// // [a=1, b=2] +/// let split = vec![ +/// col("a").eq(lit(1)), +/// col("b").eq(lit(2)), +/// ]; +/// +/// // use split_conjunction_owned to split them +/// assert_eq!(split_conjunction_owned(expr), split); +/// ``` +pub fn split_conjunction_owned(expr: Expr) -> Vec { + split_conjunction_owned_impl(expr, vec![]) } -/// Take combined filter (multiple boolean expressions ANDed together) -/// and break down into distinct filters. This should be the inverse of -/// `combine_filters` -pub fn uncombine_filter(filter: Expr) -> Vec { - match filter { +fn split_conjunction_owned_impl(expr: Expr, mut exprs: Vec) -> Vec { + match expr { Expr::BinaryExpr { - left, - op: Operator::And, right, + op: Operator::And, + left, } => { - let mut exprs = uncombine_filter(*left); - exprs.extend(uncombine_filter(*right)); - exprs + let exprs = split_conjunction_owned_impl(*left, exprs); + split_conjunction_owned_impl(*right, exprs) } - expr => { - vec![expr] + Expr::Alias(expr, _) => split_conjunction_owned_impl(*expr, exprs), + other => { + exprs.push(other); + exprs } } } -/// Combines an array of filter expressions into a single filter expression -/// consisting of the input filter expressions joined with logical OR. +/// Combines an array of filter expressions into a single filter +/// expression consisting of the input filter expressions joined with +/// logical AND. +/// /// Returns None if the filters array is empty. -pub fn combine_filters_disjunctive(filters: &[Expr]) -> Option { - if filters.is_empty() { - return None; - } +/// +/// # Example +/// ``` +/// # use datafusion_expr::{col, lit}; +/// # use datafusion_optimizer::utils::conjunction; +/// // a=1 AND b=2 +/// let expr = col("a").eq(lit(1)).and(col("b").eq(lit(2))); +/// +/// // [a=1, b=2] +/// let split = vec![ +/// col("a").eq(lit(1)), +/// col("b").eq(lit(2)), +/// ]; +/// +/// // use conjunction to join them together with `AND` +/// assert_eq!(conjunction(split), Some(expr)); +/// ``` +pub fn conjunction(filters: impl IntoIterator) -> Option { + filters.into_iter().reduce(|accum, expr| accum.and(expr)) +} - filters.iter().cloned().reduce(datafusion_expr::or) +/// Combines an array of filter expressions into a single filter +/// expression consisting of the input filter expressions joined with +/// logical OR. +/// +/// Returns None if the filters array is empty. +pub fn disjunction(filters: impl IntoIterator) -> Option { + filters.into_iter().reduce(|accum, expr| accum.or(expr)) } /// Recursively un-alias an expressions @@ -294,7 +334,7 @@ pub fn exprs_to_join_cols( .into_iter() .map(|(l, r)| (Column::from(l.as_str()), Column::from(r.as_str()))) .unzip(); - let pred = combine_filters(&others); + let pred = conjunction(others); Ok((left_cols, right_cols, pred)) } @@ -424,31 +464,108 @@ mod tests { use super::*; use arrow::datatypes::DataType; use datafusion_common::Column; - use datafusion_expr::{binary_expr, col, lit, utils::expr_to_columns}; + use datafusion_expr::{col, lit, utils::expr_to_columns}; use std::collections::HashSet; use std::ops::Add; #[test] - fn combine_zero_filters() { - let result = combine_filters(&[]); - assert_eq!(result, None); + fn test_split_conjunction() { + let expr = col("a"); + let result = split_conjunction(&expr); + assert_eq!(result, vec![&expr]); + } + + #[test] + fn test_split_conjunction_two() { + let expr = col("a").eq(lit(5)).and(col("b")); + let expr1 = col("a").eq(lit(5)); + let expr2 = col("b"); + + let result = split_conjunction(&expr); + assert_eq!(result, vec![&expr1, &expr2]); + } + + #[test] + fn test_split_conjunction_alias() { + let expr = col("a").eq(lit(5)).and(col("b").alias("the_alias")); + let expr1 = col("a").eq(lit(5)); + let expr2 = col("b"); // has no alias + + let result = split_conjunction(&expr); + assert_eq!(result, vec![&expr1, &expr2]); } #[test] - fn combine_one_filter() { - let filter = binary_expr(col("c1"), Operator::Lt, lit(1)); - let result = combine_filters(&[filter.clone()]); - assert_eq!(result, Some(filter)); + fn test_split_conjunction_or() { + let expr = col("a").eq(lit(5)).or(col("b")); + let result = split_conjunction(&expr); + assert_eq!(result, vec![&expr]); } #[test] - fn combine_multiple_filters() { - let filter1 = binary_expr(col("c1"), Operator::Lt, lit(1)); - let filter2 = binary_expr(col("c2"), Operator::Lt, lit(2)); - let filter3 = binary_expr(col("c3"), Operator::Lt, lit(3)); - let result = - combine_filters(&[filter1.clone(), filter2.clone(), filter3.clone()]); - assert_eq!(result, Some(and(and(filter1, filter2), filter3))); + fn test_split_conjunction_owned() { + let expr = col("a"); + assert_eq!(split_conjunction_owned(expr.clone()), vec![expr]); + } + + #[test] + fn test_split_conjunction_owned_two() { + assert_eq!( + split_conjunction_owned(col("a").eq(lit(5)).and(col("b"))), + vec![col("a").eq(lit(5)), col("b")] + ); + } + + #[test] + fn test_split_conjunction_owned_alias() { + assert_eq!( + split_conjunction_owned(col("a").eq(lit(5)).and(col("b").alias("the_alias"))), + vec![ + col("a").eq(lit(5)), + // no alias on b + col("b") + ] + ); + } + + #[test] + fn test_conjunction_empty() { + assert_eq!(conjunction(vec![]), None); + } + + #[test] + fn test_conjunction() { + // `[A, B, C]` + let expr = conjunction(vec![col("a"), col("b"), col("c")]); + + // --> `(A AND B) AND C` + assert_eq!(expr, Some(col("a").and(col("b")).and(col("c")))); + + // which is different than `A AND (B AND C)` + assert_ne!(expr, Some(col("a").and(col("b").and(col("c"))))); + } + + #[test] + fn test_disjunction_empty() { + assert_eq!(disjunction(vec![]), None); + } + + #[test] + fn test_disjunction() { + // `[A, B, C]` + let expr = disjunction(vec![col("a"), col("b"), col("c")]); + + // --> `(A OR B) OR C` + assert_eq!(expr, Some(col("a").or(col("b")).or(col("c")))); + + // which is different than `A OR (B OR C)` + assert_ne!(expr, Some(col("a").or(col("b").or(col("c"))))); + } + + #[test] + fn test_split_conjunction_owned_or() { + let expr = col("a").eq(lit(5)).or(col("b")); + assert_eq!(split_conjunction_owned(expr.clone()), vec![expr]); } #[test] @@ -473,46 +590,6 @@ mod tests { Ok(()) } - #[test] - fn test_uncombine_filter() { - let expr = col("a").eq(lit("s")); - let actual = uncombine_filter(expr); - - assert_predicates(actual, vec![col("a").eq(lit("s"))]); - } - - #[test] - fn test_uncombine_filter_recursively() { - let expr = and(col("a"), col("b")); - let actual = uncombine_filter(expr); - - assert_predicates(actual, vec![col("a"), col("b")]); - - let expr = col("a").and(col("b")).or(col("c")); - let actual = uncombine_filter(expr.clone()); - - assert_predicates(actual, vec![expr]); - } - - fn assert_predicates(actual: Vec, expected: Vec) { - assert_eq!( - actual.len(), - expected.len(), - "Predicates are not equal, found {} predicates but expected {}", - actual.len(), - expected.len() - ); - - for expr in expected.into_iter() { - assert!( - actual.contains(&expr), - "Predicates are not equal, predicate {:?} not found in {:?}", - expr, - actual - ); - } - } - #[test] fn test_rewrite_preserving_name() { test_rewrite(col("a"), col("a"));