Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make expression manipulation consistent and easier to use: combine/split filter conjunction, etc #3810

Merged
merged 3 commits into from
Oct 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions benchmarks/src/bin/parquet_filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_expr::{lit, or, Expr};
use datafusion::optimizer::utils::combine_filters_disjunctive;
use datafusion::optimizer::utils::disjunction;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_plan::collect;
use datafusion::physical_plan::file_format::{
Expand Down Expand Up @@ -143,7 +143,7 @@ async fn run_benchmarks(
col("response_status").eq(lit(403_u16)),
)),
// Many filters
combine_filters_disjunctive(&[
disjunction([
col("request_method").not_eq(lit("GET")),
col("response_status").eq(lit(400_u16)),
// TODO this fails in the FilterExec with Error: Internal("The type of Dictionary(Int32, Utf8) = Utf8 of binary physical should be same")
Expand Down
5 changes: 3 additions & 2 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,8 @@ mod tests {
if !actual.is_empty() {
actual += "\n";
}
actual += &format!("{}", plan.display_indent());
use std::fmt::Write as _;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clippy told me to do this

write!(actual, "{}", plan.display_indent()).unwrap();
}

let possibilities = vec![
Expand Down Expand Up @@ -1247,7 +1248,7 @@ mod tests {
DataType::Decimal128(_,_) => {
// if decimal, then round it to 2 decimal places like the answers
// round() doesn't support the second argument for decimal places to round to
// this can be simplified to remove the mul and div when
// this can be simplified to remove the mul and div when
// https://github.com/apache/arrow-datafusion/issues/2420 is completed
// cast it back to an over-sized Decimal with 2 precision when done rounding
let round = Box::new(ScalarFunction {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
use datafusion_common::DataFusionError;
use datafusion_optimizer::utils::combine_filters;
use datafusion_optimizer::utils::conjunction;
use hashbrown::HashMap;
use object_store::{ObjectMeta, ObjectStore};
use parquet::arrow::parquet_to_arrow_schema;
Expand Down Expand Up @@ -177,7 +177,7 @@ impl FileFormat for ParquetFormat {
// If disable pruning then set the predicate to None, thus readers
// will not prune data based on the statistics.
let predicate = if self.enable_pruning {
combine_filters(filters)
conjunction(filters.to_vec())
} else {
None
};
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_plan/file_format/row_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use datafusion_common::{Column, DataFusionError, Result, ScalarValue, ToDFSchema
use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter, RewriteRecursion};

use datafusion_expr::Expr;
use datafusion_optimizer::utils::uncombine_filter;
use datafusion_optimizer::utils::split_conjunction_owned;
use datafusion_physical_expr::execution_props::ExecutionProps;
use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
Expand Down Expand Up @@ -253,7 +253,7 @@ pub fn build_row_filter(
metadata: &ParquetMetaData,
reorder_predicates: bool,
) -> Result<Option<RowFilter>> {
let predicates = uncombine_filter(expr);
let predicates = split_conjunction_owned(expr);

let mut candidates: Vec<FilterCandidate> = predicates
.into_iter()
Expand Down
12 changes: 5 additions & 7 deletions datafusion/optimizer/src/decorrelate_where_exists.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use crate::utils::{
combine_filters, exprs_to_join_cols, find_join_exprs, split_conjunction,
conjunction, exprs_to_join_cols, find_join_exprs, split_conjunction,
verify_not_disjunction,
};
use crate::{utils, OptimizerConfig, OptimizerRule};
Expand Down Expand Up @@ -48,8 +48,7 @@ impl DecorrelateWhereExists {
predicate: &Expr,
optimizer_config: &mut OptimizerConfig,
) -> datafusion_common::Result<(Vec<SubqueryInfo>, Vec<Expr>)> {
let mut filters = vec![];
split_conjunction(predicate, &mut filters);
let filters = split_conjunction(predicate);

let mut subqueries = vec![];
let mut others = vec![];
Expand Down Expand Up @@ -153,8 +152,7 @@ fn optimize_exists(
.map_err(|e| context!("cannot optimize non-correlated subquery", e))?;

// split into filters
let mut subqry_filter_exprs = vec![];
split_conjunction(subqry_filter.predicate(), &mut subqry_filter_exprs);
let subqry_filter_exprs = split_conjunction(subqry_filter.predicate());
verify_not_disjunction(&subqry_filter_exprs)?;

// Grab column names to join on
Expand All @@ -169,7 +167,7 @@ fn optimize_exists(
// build subquery side of join - the thing the subquery was querying
let mut subqry_plan =
LogicalPlanBuilder::from(subqry_filter.input().as_ref().clone());
if let Some(expr) = combine_filters(&other_subqry_exprs) {
if let Some(expr) = conjunction(other_subqry_exprs) {
subqry_plan = subqry_plan.filter(expr)? // if the subquery had additional expressions, restore them
}
let subqry_plan = subqry_plan.build()?;
Expand All @@ -187,7 +185,7 @@ fn optimize_exists(
join_keys,
join_filters,
)?;
if let Some(expr) = combine_filters(outer_other_exprs) {
if let Some(expr) = conjunction(outer_other_exprs.to_vec()) {
new_plan = new_plan.filter(expr)? // if the main query had additional expressions, restore them
}

Expand Down
12 changes: 5 additions & 7 deletions datafusion/optimizer/src/decorrelate_where_in.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use crate::utils::{
alias_cols, combine_filters, exprs_to_join_cols, find_join_exprs, merge_cols,
alias_cols, conjunction, exprs_to_join_cols, find_join_exprs, merge_cols,
only_or_err, split_conjunction, swap_table, verify_not_disjunction,
};
use crate::{utils, OptimizerConfig, OptimizerRule};
Expand Down Expand Up @@ -48,8 +48,7 @@ impl DecorrelateWhereIn {
predicate: &Expr,
optimizer_config: &mut OptimizerConfig,
) -> datafusion_common::Result<(Vec<SubqueryInfo>, Vec<Expr>)> {
let mut filters = vec![];
split_conjunction(predicate, &mut filters); // TODO: disjunctions
let filters = split_conjunction(predicate); // TODO: disjunctions

let mut subqueries = vec![];
let mut others = vec![];
Expand Down Expand Up @@ -151,8 +150,7 @@ fn optimize_where_in(
let mut other_subqry_exprs = vec![];
if let LogicalPlan::Filter(subqry_filter) = (*subqry_input).clone() {
// split into filters
let mut subqry_filter_exprs = vec![];
split_conjunction(subqry_filter.predicate(), &mut subqry_filter_exprs);
let subqry_filter_exprs = split_conjunction(subqry_filter.predicate());
verify_not_disjunction(&subqry_filter_exprs)?;

// Grab column names to join on
Expand All @@ -175,7 +173,7 @@ fn optimize_where_in(
// build subquery side of join - the thing the subquery was querying
let subqry_alias = format!("__sq_{}", optimizer_config.next_id());
let mut subqry_plan = LogicalPlanBuilder::from((*subqry_input).clone());
if let Some(expr) = combine_filters(&other_subqry_exprs) {
if let Some(expr) = conjunction(other_subqry_exprs) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here is an example of less copying (can use other_subqry_exprsdirectly)

// if the subquery had additional expressions, restore them
subqry_plan = subqry_plan.filter(expr)?
}
Expand All @@ -200,7 +198,7 @@ fn optimize_where_in(
join_keys,
join_filters,
)?;
if let Some(expr) = combine_filters(outer_other_exprs) {
if let Some(expr) = conjunction(outer_other_exprs.to_vec()) {
new_plan = new_plan.filter(expr)? // if the main query had additional expressions, restore them
}
let new_plan = new_plan.build()?;
Expand Down
6 changes: 2 additions & 4 deletions datafusion/optimizer/src/filter_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
}
LogicalPlan::Analyze { .. } => push_down(&state, plan),
LogicalPlan::Filter(filter) => {
let mut predicates = vec![];
utils::split_conjunction(filter.predicate(), &mut predicates);
let predicates = utils::split_conjunction(filter.predicate());

predicates
.into_iter()
Expand Down Expand Up @@ -466,8 +465,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
let on_filters = filter
.as_ref()
.map(|e| {
let mut predicates = vec![];
utils::split_conjunction(e, &mut predicates);
let predicates = utils::split_conjunction(e);

predicates
.into_iter()
Expand Down
18 changes: 9 additions & 9 deletions datafusion/optimizer/src/scalar_subquery_to_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use crate::utils::{
combine_filters, exprs_to_join_cols, find_join_exprs, only_or_err, split_conjunction,
conjunction, exprs_to_join_cols, find_join_exprs, only_or_err, split_conjunction,
verify_not_disjunction,
};
use crate::{utils, OptimizerConfig, OptimizerRule};
Expand Down Expand Up @@ -48,8 +48,7 @@ impl ScalarSubqueryToJoin {
predicate: &Expr,
optimizer_config: &mut OptimizerConfig,
) -> Result<(Vec<SubqueryInfo>, Vec<Expr>)> {
let mut filters = vec![];
split_conjunction(predicate, &mut filters); // TODO: disjunctions
let filters = split_conjunction(predicate); // TODO: disjunctions

let mut subqueries = vec![];
let mut others = vec![];
Expand Down Expand Up @@ -234,10 +233,11 @@ fn optimize_scalar(
};

// if there were filters, split and capture them
let mut subqry_filter_exprs = vec![];
if let Some(filter) = filter {
split_conjunction(filter.predicate(), &mut subqry_filter_exprs);
}
let subqry_filter_exprs = if let Some(filter) = filter {
split_conjunction(filter.predicate())
} else {
vec![]
};
verify_not_disjunction(&subqry_filter_exprs)?;

// Grab column names to join on
Expand All @@ -258,7 +258,7 @@ fn optimize_scalar(

// build subquery side of join - the thing the subquery was querying
let mut subqry_plan = LogicalPlanBuilder::from((**input).clone());
if let Some(expr) = combine_filters(&other_subqry_exprs) {
if let Some(expr) = conjunction(other_subqry_exprs) {
subqry_plan = subqry_plan.filter(expr)? // if the subquery had additional expressions, restore them
}

Expand Down Expand Up @@ -314,7 +314,7 @@ fn optimize_scalar(
new_plan = new_plan.filter(filter_expr)?;

// if the main query had additional expressions, restore them
if let Some(expr) = combine_filters(outer_others) {
if let Some(expr) = conjunction(outer_others.to_vec()) {
new_plan = new_plan.filter(expr)?
}
let new_plan = new_plan.build()?;
Expand Down
3 changes: 1 addition & 2 deletions datafusion/optimizer/src/subquery_filter_to_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ impl OptimizerRule for SubqueryFilterToJoin {
let optimized_input = self.optimize(filter.input(), optimizer_config)?;

// Splitting filter expression into components by AND
let mut filters = vec![];
utils::split_conjunction(filter.predicate(), &mut filters);
let filters = utils::split_conjunction(filter.predicate());

// Searching for subquery-based filters
let (subquery_filters, regular_filters): (Vec<&Expr>, Vec<&Expr>) =
Expand Down
Loading