-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Factorize common AND factors out of OR predicates to support filterPu… #3903
Conversation
…shDown as possible Signed-off-by: yangjiang <yangjiang@ebay.com>
@@ -797,7 +817,7 @@ impl OptimizerRule for FilterPushDown { | |||
plan: &LogicalPlan, | |||
_: &mut OptimizerConfig, | |||
) -> Result<LogicalPlan> { | |||
optimize(plan, State::default()) | |||
optimize(plan, State::default().with_cnf_rewrite()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @Ted-Jiang -- I have one set of proposed changes here: Ted-Jiang#27
@@ -80,6 +83,11 @@ impl State { | |||
.zip(predicates.1) | |||
.for_each(|(expr, cols)| self.filters.push((expr.clone(), cols.clone()))) | |||
} | |||
|
|||
fn with_cnf_rewrite(mut self) -> Self { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should have some doccomments about what it is and when one it should/should not be set
datafusion/optimizer/src/utils.rs
Outdated
} | ||
} | ||
|
||
impl ExprRewriter for CnfHelper { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like this code is a little confusing -- it isn't really doing a rewrite the same way other ExprRewriters
do. Instead it is doing an analysis in one pass through (via pre_visit
) and then replacing it in one go
I also feel like number of clone
s is significant -- it both copies the original expr (in case the rewrite doesn't work) but also copies the exprs in the BinaryExprs
I feel like there must be a way to avoid at least one of those copies 🤔 But I am not sure without trying it myself
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Ted-Jiang did you try to simplify this further -- if not, would you like me to try and find time to give it a try?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@alamb you can try this 😂, maybe i will learn something from you, i am not very familiar with this rewrite framework
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can continue to work on page index
add split_binary_owned rather than change signature of split_conjuction_owned
Signed-off-by: yangjiang <yangjiang@ebay.com>
@alamb Sorry for the late respond.
Yes😂, i think it because not mutate in the same place, this rewrite build an expr tress in another shape. I try to add |
Thanks a lot !😄 |
benchmarks/expected-plans/q7.txt
Outdated
@@ -3,7 +3,7 @@ Sort: shipping.supp_nation ASC NULLS LAST, shipping.cust_nation ASC NULLS LAST, | |||
Aggregate: groupBy=[[shipping.supp_nation, shipping.cust_nation, shipping.l_year]], aggr=[[SUM(shipping.volume)]] | |||
Projection: shipping.supp_nation, shipping.cust_nation, shipping.l_year, shipping.volume, alias=shipping | |||
Projection: n1.n_name AS supp_nation, n2.n_name AS cust_nation, datepart(Utf8("YEAR"), lineitem.l_shipdate) AS l_year, CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4)) AS volume, alias=shipping | |||
Filter: n1.n_name = Utf8("FRANCE") AND n2.n_name = Utf8("GERMANY") OR n1.n_name = Utf8("GERMANY") AND n2.n_name = Utf8("FRANCE") | |||
Filter: n1.n_name = Utf8("FRANCE") OR n2.n_name = Utf8("FRANCE") AND n2.n_name = Utf8("GERMANY") OR n1.n_name = Utf8("GERMANY") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is so clever
Sounds good ! I will try and find time tomorrow |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Ted-Jiang I changed how this worked a little -- any chance you can give it a look?
pub fn cnf_rewrite(expr: Expr) -> Expr { | ||
// Find all exprs joined by OR | ||
let disjuncts = split_binary(&expr, Operator::Or); | ||
|
||
// For each expr, split now on AND | ||
// A OR B OR C --> split each A, B and C | ||
let disjunct_conjuncts: VecDeque<Vec<&Expr>> = disjuncts | ||
.into_iter() | ||
.map(|e| split_binary(e, Operator::And)) | ||
.collect::<VecDeque<_>>(); | ||
|
||
// Decide if we want to distribute the clauses. Heuristic is | ||
// chosen to avoid creating huge predicates | ||
let num_conjuncts = disjunct_conjuncts | ||
.iter() | ||
.fold(1usize, |sz, exprs| sz.saturating_mul(exprs.len())); | ||
|
||
if disjunct_conjuncts.iter().any(|exprs| exprs.len() > 1) | ||
&& num_conjuncts < MAX_CNF_REWRITE_CONJUNCTS | ||
{ | ||
let or_clauses = permutations(disjunct_conjuncts) | ||
.into_iter() | ||
// form the OR clauses( A OR B OR C ..) | ||
.map(|exprs| disjunction(exprs.into_iter().cloned()).unwrap()); | ||
conjunction(or_clauses).unwrap() | ||
} | ||
// otherwise return the original expression | ||
else { | ||
expr | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Ted-Jiang I got nerdsniped working on this PR -- I hope you don't mind I rewrote your logic into something that avoided cloning unless it is actually doing the rewrite
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, without using the rewrite trait seems more clearly 👍
} | ||
|
||
#[test] | ||
fn test_rewrite_cnf() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These tests are all the same
} | ||
} | ||
|
||
const MAX_CNF_REWRITE_CONJUNCTS: usize = 10; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By setting limiting the number of exprs to be created, I also avoided having to explicitly disable the cnf rewrite
|
||
// Decide if we want to distribute the clauses. Heuristic is | ||
// chosen to avoid creating huge predicates | ||
let num_conjuncts = disjunct_conjuncts |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice check!
pub fn cnf_rewrite(expr: Expr) -> Expr { | ||
// Find all exprs joined by OR | ||
let disjuncts = split_binary(&expr, Operator::Or); | ||
|
||
// For each expr, split now on AND | ||
// A OR B OR C --> split each A, B and C | ||
let disjunct_conjuncts: VecDeque<Vec<&Expr>> = disjuncts | ||
.into_iter() | ||
.map(|e| split_binary(e, Operator::And)) | ||
.collect::<VecDeque<_>>(); | ||
|
||
// Decide if we want to distribute the clauses. Heuristic is | ||
// chosen to avoid creating huge predicates | ||
let num_conjuncts = disjunct_conjuncts | ||
.iter() | ||
.fold(1usize, |sz, exprs| sz.saturating_mul(exprs.len())); | ||
|
||
if disjunct_conjuncts.iter().any(|exprs| exprs.len() > 1) | ||
&& num_conjuncts < MAX_CNF_REWRITE_CONJUNCTS | ||
{ | ||
let or_clauses = permutations(disjunct_conjuncts) | ||
.into_iter() | ||
// form the OR clauses( A OR B OR C ..) | ||
.map(|exprs| disjunction(exprs.into_iter().cloned()).unwrap()); | ||
conjunction(or_clauses).unwrap() | ||
} | ||
// otherwise return the original expression | ||
else { | ||
expr | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, without using the rewrite trait seems more clearly 👍
} | ||
Expr::Alias(expr, _) => split_conjunction_owned_impl(*expr, exprs), | ||
Expr::Alias(expr, _) => split_binary_owned_impl(*expr, operator, exprs), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
my bad 😂
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that was actually my bad when I refactored it back to conjunction
🤦
let or_clauses = permutations(disjunct_conjuncts) | ||
.into_iter() | ||
// form the OR clauses( A OR B OR C ..) | ||
.map(|exprs| disjunction(exprs.into_iter().cloned()).unwrap()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems we only clone the exprs once here🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes that is my analysis too -- this code will clone exprs only if it rewrites the expression -- if not, it will not clone anything
@alamb Thanks for your help. This change looks better than mine 😄 |
Thanks for all your work on Page Index ❤️ ! |
Perhaps @isidentical or @mingmwang would have time to review this logic as well |
I'll take a look at it tonight, thanks for the ping @alamb! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Amazing work people, love it!
|
||
#[test] | ||
fn test_permutations_two_and_one_and_two() { | ||
// [[a, b], [c], [d, e]] --> [[a, c, d], [a, c, e], [b, c, d], [b, c, e]] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are super useful, thanks 💯
@@ -3,7 +3,7 @@ Sort: shipping.supp_nation ASC NULLS LAST, shipping.cust_nation ASC NULLS LAST, | |||
Aggregate: groupBy=[[shipping.supp_nation, shipping.cust_nation, shipping.l_year]], aggr=[[SUM(shipping.volume)]] | |||
Projection: shipping.supp_nation, shipping.cust_nation, shipping.l_year, shipping.volume, alias=shipping | |||
Projection: n1.n_name AS supp_nation, n2.n_name AS cust_nation, datepart(Utf8("YEAR"), lineitem.l_shipdate) AS l_year, CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4)) AS volume, alias=shipping | |||
Filter: n1.n_name = Utf8("FRANCE") AND n2.n_name = Utf8("GERMANY") OR n1.n_name = Utf8("GERMANY") AND n2.n_name = Utf8("FRANCE") | |||
Filter: (n1.n_name = Utf8("FRANCE") OR n2.n_name = Utf8("FRANCE")) AND (n2.n_name = Utf8("GERMANY") OR n1.n_name = Utf8("GERMANY")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've also wanted to check TPC-H (it shouldn't affect, but just to see if there is an unexpected regression). It seems like there aren't any regressions (873.27 ms
vs 878.54 ms
, only noise) 🚀
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@isidentical thanks for testing👍! I think this rewrite will make row_filter
work, I think it will boost up query when row_filter
is stable! 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for checking @isidentical -- I think there are only 25 rows in the NATION
table in TPCH, so the ordering of predicates doesn't really matter for performance in that case 😆
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
think there are only 25 rows in the NATION table in TPCH,
😂
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I plan to merge this tomorrow unless anyone has any final thoughts
I think for the Physical FilterExec, we should still keep the original form or a simplified form for runtime evaluation. |
@mingmwang I didn't quite understand this comment -- do you mean you think this PR is a good idea, or that it needs more work (to keep the original form)? |
@alamb |
I double checked @mingmwang This code is called as part of And then simplification is then called so I think the simplifications should happen before physical planning |
I merged this with master locally and all the tests pass -- so I am merging this one in |
Thanks again @Ted-Jiang |
Benchmark runs are scheduled for baseline = 6fae0bd and contender = 3452345. 3452345 is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
apache#3903) * Factorize common AND factors out of OR predicates to support filterPushDown as possible Signed-off-by: yangjiang <yangjiang@ebay.com> * add split_binary_owned rather than change signature of split_conjuction_owned * add `split_binary` and avoid some clone calls Signed-off-by: yangjiang <yangjiang@ebay.com> * Update plans * Simplify tests * Update tests * Rewrite CNF without recursion * Change heuristic * Keep cleaning up * tests pass * cleanup * cleanups * Clean up docstrings * Clippy, restore missing test Signed-off-by: yangjiang <yangjiang@ebay.com> Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
apache#3903) * Factorize common AND factors out of OR predicates to support filterPushDown as possible Signed-off-by: yangjiang <yangjiang@ebay.com> * add split_binary_owned rather than change signature of split_conjuction_owned * add `split_binary` and avoid some clone calls Signed-off-by: yangjiang <yangjiang@ebay.com> * Update plans * Simplify tests * Update tests * Rewrite CNF without recursion * Change heuristic * Keep cleaning up * tests pass * cleanup * cleanups * Clean up docstrings * Clippy, restore missing test Signed-off-by: yangjiang <yangjiang@ebay.com> Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
…shDown as possible
Signed-off-by: yangjiang yangjiang@ebay.com
Which issue does this PR close?
Closes #3858 .
Rationale for this change
What changes are included in this PR?
Are there any user-facing changes?