Skip to content

Commit

Permalink
Make expression manipulation consistent and easier to use: `combine/s…
Browse files Browse the repository at this point in the history
…plit filter` `conjunction`, etc (#3810)

* Improve split_conjunction API, combine split_disjunction_owned, rename  conjunction/disjunction and reduce clone

* Add tests for conjunction/disjunction, make API uniform
  • Loading branch information
alamb authored Oct 15, 2022
1 parent 0b90a8a commit fc5081d
Show file tree
Hide file tree
Showing 10 changed files with 203 additions and 132 deletions.
4 changes: 2 additions & 2 deletions benchmarks/src/bin/parquet_filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_expr::{lit, or, Expr};
use datafusion::optimizer::utils::combine_filters_disjunctive;
use datafusion::optimizer::utils::disjunction;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_plan::collect;
use datafusion::physical_plan::file_format::{
Expand Down Expand Up @@ -143,7 +143,7 @@ async fn run_benchmarks(
col("response_status").eq(lit(403_u16)),
)),
// Many filters
combine_filters_disjunctive(&[
disjunction([
col("request_method").not_eq(lit("GET")),
col("response_status").eq(lit(400_u16)),
// TODO this fails in the FilterExec with Error: Internal("The type of Dictionary(Int32, Utf8) = Utf8 of binary physical should be same")
Expand Down
5 changes: 3 additions & 2 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,8 @@ mod tests {
if !actual.is_empty() {
actual += "\n";
}
actual += &format!("{}", plan.display_indent());
use std::fmt::Write as _;
write!(actual, "{}", plan.display_indent()).unwrap();
}

let possibilities = vec![
Expand Down Expand Up @@ -1247,7 +1248,7 @@ mod tests {
DataType::Decimal128(_,_) => {
// if decimal, then round it to 2 decimal places like the answers
// round() doesn't support the second argument for decimal places to round to
// this can be simplified to remove the mul and div when
// this can be simplified to remove the mul and div when
// https://github.com/apache/arrow-datafusion/issues/2420 is completed
// cast it back to an over-sized Decimal with 2 precision when done rounding
let round = Box::new(ScalarFunction {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
use datafusion_common::DataFusionError;
use datafusion_optimizer::utils::combine_filters;
use datafusion_optimizer::utils::conjunction;
use hashbrown::HashMap;
use object_store::{ObjectMeta, ObjectStore};
use parquet::arrow::parquet_to_arrow_schema;
Expand Down Expand Up @@ -177,7 +177,7 @@ impl FileFormat for ParquetFormat {
// If disable pruning then set the predicate to None, thus readers
// will not prune data based on the statistics.
let predicate = if self.enable_pruning {
combine_filters(filters)
conjunction(filters.to_vec())
} else {
None
};
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_plan/file_format/row_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use datafusion_common::{Column, DataFusionError, Result, ScalarValue, ToDFSchema
use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter, RewriteRecursion};

use datafusion_expr::Expr;
use datafusion_optimizer::utils::uncombine_filter;
use datafusion_optimizer::utils::split_conjunction_owned;
use datafusion_physical_expr::execution_props::ExecutionProps;
use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
Expand Down Expand Up @@ -253,7 +253,7 @@ pub fn build_row_filter(
metadata: &ParquetMetaData,
reorder_predicates: bool,
) -> Result<Option<RowFilter>> {
let predicates = uncombine_filter(expr);
let predicates = split_conjunction_owned(expr);

let mut candidates: Vec<FilterCandidate> = predicates
.into_iter()
Expand Down
12 changes: 5 additions & 7 deletions datafusion/optimizer/src/decorrelate_where_exists.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use crate::utils::{
combine_filters, exprs_to_join_cols, find_join_exprs, split_conjunction,
conjunction, exprs_to_join_cols, find_join_exprs, split_conjunction,
verify_not_disjunction,
};
use crate::{utils, OptimizerConfig, OptimizerRule};
Expand Down Expand Up @@ -48,8 +48,7 @@ impl DecorrelateWhereExists {
predicate: &Expr,
optimizer_config: &mut OptimizerConfig,
) -> datafusion_common::Result<(Vec<SubqueryInfo>, Vec<Expr>)> {
let mut filters = vec![];
split_conjunction(predicate, &mut filters);
let filters = split_conjunction(predicate);

let mut subqueries = vec![];
let mut others = vec![];
Expand Down Expand Up @@ -153,8 +152,7 @@ fn optimize_exists(
.map_err(|e| context!("cannot optimize non-correlated subquery", e))?;

// split into filters
let mut subqry_filter_exprs = vec![];
split_conjunction(subqry_filter.predicate(), &mut subqry_filter_exprs);
let subqry_filter_exprs = split_conjunction(subqry_filter.predicate());
verify_not_disjunction(&subqry_filter_exprs)?;

// Grab column names to join on
Expand All @@ -169,7 +167,7 @@ fn optimize_exists(
// build subquery side of join - the thing the subquery was querying
let mut subqry_plan =
LogicalPlanBuilder::from(subqry_filter.input().as_ref().clone());
if let Some(expr) = combine_filters(&other_subqry_exprs) {
if let Some(expr) = conjunction(other_subqry_exprs) {
subqry_plan = subqry_plan.filter(expr)? // if the subquery had additional expressions, restore them
}
let subqry_plan = subqry_plan.build()?;
Expand All @@ -187,7 +185,7 @@ fn optimize_exists(
join_keys,
join_filters,
)?;
if let Some(expr) = combine_filters(outer_other_exprs) {
if let Some(expr) = conjunction(outer_other_exprs.to_vec()) {
new_plan = new_plan.filter(expr)? // if the main query had additional expressions, restore them
}

Expand Down
12 changes: 5 additions & 7 deletions datafusion/optimizer/src/decorrelate_where_in.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use crate::utils::{
alias_cols, combine_filters, exprs_to_join_cols, find_join_exprs, merge_cols,
alias_cols, conjunction, exprs_to_join_cols, find_join_exprs, merge_cols,
only_or_err, split_conjunction, swap_table, verify_not_disjunction,
};
use crate::{utils, OptimizerConfig, OptimizerRule};
Expand Down Expand Up @@ -48,8 +48,7 @@ impl DecorrelateWhereIn {
predicate: &Expr,
optimizer_config: &mut OptimizerConfig,
) -> datafusion_common::Result<(Vec<SubqueryInfo>, Vec<Expr>)> {
let mut filters = vec![];
split_conjunction(predicate, &mut filters); // TODO: disjunctions
let filters = split_conjunction(predicate); // TODO: disjunctions

let mut subqueries = vec![];
let mut others = vec![];
Expand Down Expand Up @@ -151,8 +150,7 @@ fn optimize_where_in(
let mut other_subqry_exprs = vec![];
if let LogicalPlan::Filter(subqry_filter) = (*subqry_input).clone() {
// split into filters
let mut subqry_filter_exprs = vec![];
split_conjunction(subqry_filter.predicate(), &mut subqry_filter_exprs);
let subqry_filter_exprs = split_conjunction(subqry_filter.predicate());
verify_not_disjunction(&subqry_filter_exprs)?;

// Grab column names to join on
Expand All @@ -175,7 +173,7 @@ fn optimize_where_in(
// build subquery side of join - the thing the subquery was querying
let subqry_alias = format!("__sq_{}", optimizer_config.next_id());
let mut subqry_plan = LogicalPlanBuilder::from((*subqry_input).clone());
if let Some(expr) = combine_filters(&other_subqry_exprs) {
if let Some(expr) = conjunction(other_subqry_exprs) {
// if the subquery had additional expressions, restore them
subqry_plan = subqry_plan.filter(expr)?
}
Expand All @@ -200,7 +198,7 @@ fn optimize_where_in(
join_keys,
join_filters,
)?;
if let Some(expr) = combine_filters(outer_other_exprs) {
if let Some(expr) = conjunction(outer_other_exprs.to_vec()) {
new_plan = new_plan.filter(expr)? // if the main query had additional expressions, restore them
}
let new_plan = new_plan.build()?;
Expand Down
6 changes: 2 additions & 4 deletions datafusion/optimizer/src/filter_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
}
LogicalPlan::Analyze { .. } => push_down(&state, plan),
LogicalPlan::Filter(filter) => {
let mut predicates = vec![];
utils::split_conjunction(filter.predicate(), &mut predicates);
let predicates = utils::split_conjunction(filter.predicate());

predicates
.into_iter()
Expand Down Expand Up @@ -466,8 +465,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
let on_filters = filter
.as_ref()
.map(|e| {
let mut predicates = vec![];
utils::split_conjunction(e, &mut predicates);
let predicates = utils::split_conjunction(e);

predicates
.into_iter()
Expand Down
18 changes: 9 additions & 9 deletions datafusion/optimizer/src/scalar_subquery_to_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use crate::utils::{
combine_filters, exprs_to_join_cols, find_join_exprs, only_or_err, split_conjunction,
conjunction, exprs_to_join_cols, find_join_exprs, only_or_err, split_conjunction,
verify_not_disjunction,
};
use crate::{utils, OptimizerConfig, OptimizerRule};
Expand Down Expand Up @@ -48,8 +48,7 @@ impl ScalarSubqueryToJoin {
predicate: &Expr,
optimizer_config: &mut OptimizerConfig,
) -> Result<(Vec<SubqueryInfo>, Vec<Expr>)> {
let mut filters = vec![];
split_conjunction(predicate, &mut filters); // TODO: disjunctions
let filters = split_conjunction(predicate); // TODO: disjunctions

let mut subqueries = vec![];
let mut others = vec![];
Expand Down Expand Up @@ -234,10 +233,11 @@ fn optimize_scalar(
};

// if there were filters, split and capture them
let mut subqry_filter_exprs = vec![];
if let Some(filter) = filter {
split_conjunction(filter.predicate(), &mut subqry_filter_exprs);
}
let subqry_filter_exprs = if let Some(filter) = filter {
split_conjunction(filter.predicate())
} else {
vec![]
};
verify_not_disjunction(&subqry_filter_exprs)?;

// Grab column names to join on
Expand All @@ -258,7 +258,7 @@ fn optimize_scalar(

// build subquery side of join - the thing the subquery was querying
let mut subqry_plan = LogicalPlanBuilder::from((**input).clone());
if let Some(expr) = combine_filters(&other_subqry_exprs) {
if let Some(expr) = conjunction(other_subqry_exprs) {
subqry_plan = subqry_plan.filter(expr)? // if the subquery had additional expressions, restore them
}

Expand Down Expand Up @@ -314,7 +314,7 @@ fn optimize_scalar(
new_plan = new_plan.filter(filter_expr)?;

// if the main query had additional expressions, restore them
if let Some(expr) = combine_filters(outer_others) {
if let Some(expr) = conjunction(outer_others.to_vec()) {
new_plan = new_plan.filter(expr)?
}
let new_plan = new_plan.build()?;
Expand Down
3 changes: 1 addition & 2 deletions datafusion/optimizer/src/subquery_filter_to_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ impl OptimizerRule for SubqueryFilterToJoin {
let optimized_input = self.optimize(filter.input(), optimizer_config)?;

// Splitting filter expression into components by AND
let mut filters = vec![];
utils::split_conjunction(filter.predicate(), &mut filters);
let filters = utils::split_conjunction(filter.predicate());

// Searching for subquery-based filters
let (subquery_filters, regular_filters): (Vec<&Expr>, Vec<&Expr>) =
Expand Down
Loading

0 comments on commit fc5081d

Please sign in to comment.