Skip to content

Commit

Permalink
MINOR: Add comments about rewrite_disjunctive_predicate (#3351)
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb authored Sep 4, 2022
1 parent 0bbf127 commit d7fa064
Showing 1 changed file with 146 additions and 54 deletions.
200 changes: 146 additions & 54 deletions datafusion/optimizer/src/rewrite_disjunctive_predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,152 @@ use datafusion_expr::Expr::BinaryExpr;
use datafusion_expr::{Expr, LogicalPlan, Operator};
use std::sync::Arc;

/// Optimizer pass that rewrites predicates of the form
///
/// ```text
/// (A = B AND <expr1>) OR (A = B AND <expr2>) OR ... (A = B AND <exprN>)
/// ```
///
/// Into
/// ```text
/// (A = B) AND (<expr1> OR <expr2> OR ... <exprN> )
/// ```
///
/// Predicates connected by `OR` typically not able to be broken down
/// and distributed as well as those connected by `AND`.
///
/// The idea is to rewrite predicates into `good_predicate1 AND
/// good_predicate2 AND ...` where `good_predicate` means the
/// predicate has special support in the execution engine.
///
/// Equality join predicates (e.g. `col1 = col2`), or single column
/// expressions (e.g. `col = 5`) are examples of predicates with
/// special support.
///
/// # TPCH Q19
///
/// This optimization is admittedly somewhat of a niche usecase. It's
/// main use is that it appears in TPCH Q19 and is required to avoid a
/// CROSS JOIN.
///
/// Specificially, Q19 has a WHERE clause that looks like
///
/// ```sql
/// where
/// p_partkey = l_partkey
/// and l_shipmode in (‘AIR’, ‘AIR REG’)
/// and l_shipinstruct = ‘DELIVER IN PERSON’
/// and (
/// (
/// and p_brand = ‘[BRAND1]’
/// and p_container in ( ‘SM CASE’, ‘SM BOX’, ‘SM PACK’, ‘SM PKG’)
/// and l_quantity >= [QUANTITY1] and l_quantity <= [QUANTITY1] + 10
/// and p_size between 1 and 5
/// )
/// or
/// (
/// and p_brand = ‘[BRAND2]’
/// and p_container in (‘MED BAG’, ‘MED BOX’, ‘MED PKG’, ‘MED PACK’)
/// and l_quantity >= [QUANTITY2] and l_quantity <= [QUANTITY2] + 10
/// and p_size between 1 and 10
/// )
/// or
/// (
/// and p_brand = ‘[BRAND3]’
/// and p_container in ( ‘LG CASE’, ‘LG BOX’, ‘LG PACK’, ‘LG PKG’)
/// and l_quantity >= [QUANTITY3] and l_quantity <= [QUANTITY3] + 10
/// and p_size between 1 and 15
/// )
/// )
/// ```
///
/// Niavely planning this query will result in a CROSS join with that
/// single large OR filter. However, rewriting it using the rewrite in
/// this pass results in a proper join predicate, `p_partkey = l_partkey`:
///
/// ```sql
/// where
/// p_partkey = l_partkey
/// and l_shipmode in (‘AIR’, ‘AIR REG’)
/// and l_shipinstruct = ‘DELIVER IN PERSON’
/// and (
/// (
/// and p_brand = ‘[BRAND1]’
/// and p_container in ( ‘SM CASE’, ‘SM BOX’, ‘SM PACK’, ‘SM PKG’)
/// and l_quantity >= [QUANTITY1] and l_quantity <= [QUANTITY1] + 10
/// and p_size between 1 and 5
/// )
/// or
/// (
/// and p_brand = ‘[BRAND2]’
/// and p_container in (‘MED BAG’, ‘MED BOX’, ‘MED PKG’, ‘MED PACK’)
/// and l_quantity >= [QUANTITY2] and l_quantity <= [QUANTITY2] + 10
/// and p_size between 1 and 10
/// )
/// or
/// (
/// and p_brand = ‘[BRAND3]’
/// and p_container in ( ‘LG CASE’, ‘LG BOX’, ‘LG PACK’, ‘LG PKG’)
/// and l_quantity >= [QUANTITY3] and l_quantity <= [QUANTITY3] + 10
/// and p_size between 1 and 15
/// )
/// )
/// ```
///
#[derive(Default)]
pub struct RewriteDisjunctivePredicate;

impl RewriteDisjunctivePredicate {
pub fn new() -> Self {
Self::default()
}
fn rewrite_disjunctive_predicate(
&self,
plan: &LogicalPlan,
_optimizer_config: &OptimizerConfig,
) -> Result<LogicalPlan> {
match plan {
LogicalPlan::Filter(filter) => {
let predicate = predicate(&filter.predicate)?;
let rewritten_predicate = rewrite_predicate(predicate);
let rewritten_expr = normalize_predicate(rewritten_predicate);
Ok(LogicalPlan::Filter(Filter {
predicate: rewritten_expr,
input: Arc::new(self.rewrite_disjunctive_predicate(
&filter.input,
_optimizer_config,
)?),
}))
}
_ => {
let expr = plan.expressions();
let inputs = plan.inputs();
let new_inputs = inputs
.iter()
.map(|input| {
self.rewrite_disjunctive_predicate(input, _optimizer_config)
})
.collect::<Result<Vec<_>>>()?;
from_plan(plan, &expr, &new_inputs)
}
}
}
}

impl OptimizerRule for RewriteDisjunctivePredicate {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
self.rewrite_disjunctive_predicate(plan, optimizer_config)
}

fn name(&self) -> &str {
"rewrite_disjunctive_predicate"
}
}

#[derive(Clone, PartialEq, Debug)]
enum Predicate {
And { args: Vec<Predicate> },
Expand Down Expand Up @@ -224,60 +370,6 @@ fn delete_duplicate_predicates(or_predicates: &[Predicate]) -> Predicate {
}
}

#[derive(Default)]
pub struct RewriteDisjunctivePredicate;

impl RewriteDisjunctivePredicate {
pub fn new() -> Self {
Self::default()
}
fn rewrite_disjunctive_predicate(
&self,
plan: &LogicalPlan,
_optimizer_config: &OptimizerConfig,
) -> Result<LogicalPlan> {
match plan {
LogicalPlan::Filter(filter) => {
let predicate = predicate(&filter.predicate)?;
let rewritten_predicate = rewrite_predicate(predicate);
let rewritten_expr = normalize_predicate(rewritten_predicate);
Ok(LogicalPlan::Filter(Filter {
predicate: rewritten_expr,
input: Arc::new(self.rewrite_disjunctive_predicate(
&filter.input,
_optimizer_config,
)?),
}))
}
_ => {
let expr = plan.expressions();
let inputs = plan.inputs();
let new_inputs = inputs
.iter()
.map(|input| {
self.rewrite_disjunctive_predicate(input, _optimizer_config)
})
.collect::<Result<Vec<_>>>()?;
from_plan(plan, &expr, &new_inputs)
}
}
}
}

impl OptimizerRule for RewriteDisjunctivePredicate {
fn optimize(
&self,
plan: &LogicalPlan,
optimizer_config: &mut OptimizerConfig,
) -> Result<LogicalPlan> {
self.rewrite_disjunctive_predicate(plan, optimizer_config)
}

fn name(&self) -> &str {
"rewrite_disjunctive_predicate"
}
}

#[cfg(test)]

mod tests {
Expand Down

0 comments on commit d7fa064

Please sign in to comment.