-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pushdown single column predicates from ON join clauses #3578
Conversation
Codecov Report
@@ Coverage Diff @@
## master #3578 +/- ##
=======================================
Coverage 85.92% 85.92%
=======================================
Files 301 300 -1
Lines 56249 56309 +60
=======================================
+ Hits 48330 48383 +53
- Misses 7919 7926 +7
📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @AssHero -- I need to think about this transformation carefully and will review the code in the next day or two
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this transformation is correct, though I may be mistaken
@@ -248,6 +249,128 @@ fn get_pushable_join_predicates<'a>( | |||
.unzip() | |||
} | |||
|
|||
// examine OR clause to see if any useful clauses can be extracted and push down. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this transformation is correct. In particular, I don't think the results will always be the same
Schematically, we have this type of predicate (that is being evaluated during the join)
(A AND B) OR (C AND D)
This transformation proposes adding another (A OR B)
clause (evaluated before the join), so effectively
((A AND B) OR (C AND D)) AND (A OR B)
In order to do this transformation, the boolean statements must be equivalent for all inputs.
However, a counter example is
A: false, B: false, C: true, D: true
In this case, the original predicate would be true, but the rewrite would be false
Here is the program I wrote to generate the entire truth table: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=334938478775ba3cd55e7c400ea89b06
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This transformation should extract at least one quals from each sub-clauses of OR, else do nothing.
(A AND B) OR (C AND D)
will be transformed to
((A AND B) OR (C AND D)) AND (A OR C)
OR
((A AND B) OR (C AND D)) AND ((A AND B) OR C)
OR
do nothing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see -- thanks -- I checked those rewrites and https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=3b41b0409c8ecf4df0027f323668e0db they do look good to me
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @AssHero -- this makes more sense now. I'll try and find some time to review it more carefully over the next day or two
// TableScan: projection=[c, d] | ||
fn extract_or_clauses_for_join( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// TableScan: projection=[c, d] | |
fn extract_or_clauses_for_join( | |
// TableScan: projection=[c, d] | |
// | |
// In general, predicates of this form: | |
// | |
// (A AND B) OR (C AND D) | |
// | |
// will be transformed to | |
// | |
// ((A AND B) OR (C AND D)) AND (A OR C) | |
// | |
// OR | |
// | |
// ((A AND B) OR (C AND D)) AND ((A AND B) OR C) | |
// | |
// OR | |
// | |
// do nothing. | |
// | |
fn extract_or_clauses_for_join( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll add this comments later. Thanks!
@@ -248,6 +249,128 @@ fn get_pushable_join_predicates<'a>( | |||
.unzip() | |||
} | |||
|
|||
// examine OR clause to see if any useful clauses can be extracted and push down. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see -- thanks -- I checked those rewrites and https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=3b41b0409c8ecf4df0027f323668e0db they do look good to me
I am sorry for the late review here -- it is on my list. Basically I am struggling to find time enough to sit down and convince myself that this is a correct transformation in all cases (esp with outer joins and nullability), and then also that we want to do this kind of pushdown. I just haven't had the time yet. |
Thanks! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for this PR again @AssHero and I apologize for the late review
I spent a while this afternoon thinking about the transformation it proposes and I have basically convinced myself that it is correct (and quite clever, I want to point out)
I think this optimization might actually slow some plans down (if the filters that are pushed down don't actually filter many rows, they will just consume CPU). I don't have a great suggestion about this
Given the potential for very subtle wrong results with this kind of optimization I am trying to be cautious. I would like to add some additional comments and tests (will point out where inline below) prior to merging this. Maybe we can even add a config option to disable it as a way to further derisk the process.
cc @Dandandan / @andygrove / @avantgardnerio / @xudong963 / @thinkharderdev in case you have some other thoughts.
" TableScan: lineitem projection=[l_partkey, l_quantity] [l_partkey:Int64, l_quantity:Decimal128(15, 2)]", | ||
" Filter: #part.p_size >= Int32(1) [p_partkey:Int64, p_brand:Utf8, p_size:Int32]", | ||
" TableScan: part projection=[p_partkey, p_brand, p_size], partial_filters=[#part.p_size >= Int32(1)] [p_partkey:Int64, p_brand:Utf8, p_size:Int32]", | ||
" Filter: #lineitem.l_quantity >= Decimal128(Some(100),15,2) AND #lineitem.l_quantity <= Decimal128(Some(1100),15,2) OR #lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND #lineitem.l_quantity <= Decimal128(Some(2000),15,2) OR #lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND #lineitem.l_quantity <= Decimal128(Some(3000),15,2) [l_partkey:Int64, l_quantity:Decimal128(15, 2)]", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I went through this plan and I agree it seems correct (as in the pushed down filters don't filter out anything that would have passed the original filter)
datafusion/core/tests/sql/joins.rs
Outdated
@@ -1484,7 +1484,8 @@ async fn reduce_left_join_2() -> Result<()> { | |||
" Filter: CAST(#t2.t2_int AS Int64) < Int64(10) OR CAST(#t1.t1_int AS Int64) > Int64(2) AND #t2.t2_name != Utf8(\"w\") [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", | |||
" Inner Join: #t1.t1_id = #t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", | |||
" TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]", | |||
" TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", | |||
" Filter: CAST(#t2.t2_int AS Int64) < Int64(10) OR #t2.t2_name != Utf8(\"w\") [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 nice
// to new OR clause as predicate. | ||
// |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need to explain the conditions under which a qual can be extracted as it may not be obvious to someone when they initially look at this.
// to new OR clause as predicate. | |
// | |
// to new OR clause as predicate. | |
// | |
// A qual is extracted if it it contains (only) common set of column references with the other quals. |
I am not sure that is correct
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would also like to see the return type documented here (as in what does the (Vec<Expr>, Vec<HashSet<Column>>)
represent? I think it is the extracted quals and their column references but I am not sure
(exprs, expr_columns) | ||
} | ||
|
||
// extract qual from OR sub-clause. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please add some additional comments under what conditions the OR
clause is extracted? I tried to explain above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure, I can add more comments for this.
I would be willing to help write some more tests / comments for this PR if others think it is a good idea. |
@AssHero do you have some numbers, e.g. how much does it help q19? @alamb one thing we might consider is to apply the optimization whenever there is an expensive parent node (joins/aggregate) that benefits from more filtering? Anyway, in most cases, filtering should be cheap compared to other operations as long as the expression is not very expensive (for example, no expensive UDF). |
Transformation is OK after I thought about it more
Before this commit 'Convert more cross joins to inner joins'(https://github.com/apache/arrow-datafusion/pull/3482), this optimization helps much in q19, and I intruduce this optimization to solve q19's performance problem. |
I have made some tests on q19 with this optimization on current version, it does not slow down the queries even through it helps little(may consume more CPU). |
add some comments about extract_or_clause. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @AssHero
I think this is good enough and we can adjust / fix this this transformation if we find in the future the predicates are hurting more than helping
Another nice thing about pushing single column predicates down is that they can be applied during the scan as well (e.g. #3463 )
I took the liberty of merging this branch from master and resolving the merge conflict in fe582a7 |
Thanks again for sticking with this one @AssHero |
The newly added TPCH plan benchmarks needed to be updated as well |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @avantgardnerio and @andygrove who I know run the tpch benchmarks regularly
I think we should probably run https://github.com/apache/arrow-datafusion/tree/master/benchmarks#benchmark-derived-from-tpc-h before merging this in
(Kudos to @andygrove for adding the tpch plan benchmarks -- they are super helpful)
@@ -14,7 +14,9 @@ Sort: shipping.supp_nation ASC NULLS LAST, shipping.cust_nation ASC NULLS LAST, | |||
TableScan: lineitem projection=[l_orderkey, l_suppkey, l_extendedprice, l_discount, l_shipdate] | |||
TableScan: orders projection=[o_orderkey, o_custkey] | |||
TableScan: customer projection=[c_custkey, c_nationkey] | |||
SubqueryAlias: n1 | |||
Filter: n1.n_name = Utf8("FRANCE") OR n1.n_name = Utf8("GERMANY") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 these should be fine, though I expect the performance to be negligible as the number of rows in nation
are very small
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Think so too - although if it stays on the probe side (e.g. the HashBuildProbeOrder rule doesn't apply) - the joins might still quite a bit slower, and the input and output of the joins will be bigger as well making them more expensive.
FYI @andygrove this might change the perf in Ballista too.
@@ -3,7 +3,7 @@ Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS re | |||
Projection: lineitem.l_shipinstruct = Utf8("DELIVER IN PERSON") AS lineitem.l_shipinstruct = Utf8("DELIVER IN PERSON")Utf8("DELIVER IN PERSON")lineitem.l_shipinstruct, lineitem.l_shipmode IN ([Utf8("AIR"), Utf8("AIR REG")]) AS lineitem.l_shipmode IN ([Utf8("AIR"), Utf8("AIR REG")])Utf8("AIR REG")Utf8("AIR")lineitem.l_shipmode, part.p_size >= Int32(1) AS part.p_size >= Int32(1)Int32(1)part.p_size, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, part.p_brand, part.p_size, part.p_container | |||
Filter: part.p_brand = Utf8("Brand#12") AND part.p_container IN ([Utf8("SM CASE"), Utf8("SM BOX"), Utf8("SM PACK"), Utf8("SM PKG")]) AND lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= Decimal128(Some(1100),15,2) AND part.p_size <= Int32(5) OR part.p_brand = Utf8("Brand#23") AND part.p_container IN ([Utf8("MED BAG"), Utf8("MED BOX"), Utf8("MED PKG"), Utf8("MED PACK")]) AND lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= Decimal128(Some(2000),15,2) AND part.p_size <= Int32(10) OR part.p_brand = Utf8("Brand#34") AND part.p_container IN ([Utf8("LG CASE"), Utf8("LG BOX"), Utf8("LG PACK"), Utf8("LG PKG")]) AND lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= Decimal128(Some(3000),15,2) AND part.p_size <= Int32(15) | |||
Inner Join: lineitem.l_partkey = part.p_partkey | |||
Filter: lineitem.l_shipmode IN ([Utf8("AIR"), Utf8("AIR REG")]) AND lineitem.l_shipinstruct = Utf8("DELIVER IN PERSON") | |||
Filter: lineitem.l_shipmode IN ([Utf8("AIR"), Utf8("AIR REG")]) AND lineitem.l_shipinstruct = Utf8("DELIVER IN PERSON") AND lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= Decimal128(Some(1100),15,2) OR lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= Decimal128(Some(2000),15,2) OR lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= Decimal128(Some(3000),15,2) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are probably good filters to have added as long as they are selective, though they may be expensive to evaluate because they will be evaluated against all rows in lineitem rather than only those rows which were not filtered out by the join with part.
To make this super fast, maybe could use "sideways information passing" to push down the part
filtering into the lineitem scan as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am sometimes surprised when profiling how little CPU the filters consume - it is really cheap compared to parquet reads / joins / etc. so almost any filtering we can do probably helps as long as it filters out some data and a subsequent operators can benefit from it.
@alamb running the benchmarks now for q7 and q19 posting them here when done |
Looks like roughly a 1.35x speedup on q7 and a smaller 1.16x speedup on q19 🚀 Running with (on partitioned parquet data - 16 partitions, 16 vCPU):
Master:
This branch:
|
Hm getting different results now - may have been something running in the background on my machine - will update the results accordingly |
Ok - updated the benchmark results - looks good to go now! |
Thanks @AssHero @alamb |
Benchmark runs are scheduled for baseline = fc5081d and contender = e02376d. e02376d is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
Nice! Thanks everyone |
Which issue does this PR close?
Closes #3577
Rationale for this change
optimize join's performance when join has OR clauses in filter or on clause.
examine any OR clauses of join to see if any useful clauses can be extracted and push down to join's rel to filter more rows before join.
for TPCH q19
the logical plan before optimized
the logical plan after optimized
we extract new predicate and push down to join's rel, this predicate filters more rows before join , makes join more effective.
What changes are included in this PR?
add extract OR clasue in datafusion/optimizer/src/filter_push_down.rs.