-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support non-tuple expression for exists-subquery to join #5264
Conversation
\n Projection: orders.o_custkey [o_custkey:Int64]\ | ||
\n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]\ | ||
\n Projection: orders.o_custkey [o_custkey:Int64]\ | ||
\n TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before this pr, this rule will not keep the projection.
I think keeping projection make it easy to read, and spark also keeps it.
#[test] | ||
fn exists_subquery_no_cols() -> Result<()> { | ||
let sq = Arc::new( | ||
LogicalPlanBuilder::from(scan_tpch_table("orders")) | ||
.filter(col("customer.c_custkey").eq(col("customer.c_custkey")))? | ||
.filter(col("customer.c_custkey").eq(lit(1u32)))? | ||
.project(vec![col("orders.o_custkey")])? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
customer.c_custkey = customer.c_custkey
is too specific here, so modify it to customer.c_custkey = 1
.
SubqueryAlias: l3 | ||
Filter: lineitem.l_receiptdate > lineitem.l_commitdate | ||
TableScan: lineitem projection=[l_orderkey, l_suppkey, l_commitdate, l_receiptdate] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The difference is we keep the projection now.
datafusion/core/tests/sql/joins.rs
Outdated
@@ -2187,7 +2187,6 @@ async fn left_anti_join() -> Result<()> { | |||
} | |||
|
|||
#[tokio::test] | |||
#[ignore = "Test ignored, will be enabled after fixing the anti join plan bug"] | |||
// https://github.com/apache/arrow-datafusion/issues/4366 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test passes now
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Linked issue #4366 to this pr.
Please take a look @alamb @jackwener. |
I will try and find time to review this PR over the next day or two |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @ygf11
I went over this code and the plan changes carefully and they look great to me.
The only thing I think needs to be done prior is double check handling of distincts. Otherwise really nice work. Thank you 🏆
\n TableScan: test projection=[col_int32, col_uint32, col_utf8]"; | ||
\n Projection: t2.col_int32, t2.col_uint32\ | ||
\n SubqueryAlias: t2\ | ||
\n TableScan: test projection=[col_int32, col_uint32]"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a nice improvement too -- the unused column col_utf8
is filtered out
for expr in subquery_filter_exprs { | ||
let cols = expr.to_columns()?; | ||
if check_all_column_from_schema(&cols, input_schema.clone()) { | ||
subquery_filters.push(expr.clone()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if you need to clone the expr here. It seems like it is owned already it could be used directly here and below
Though I see this code was just refactored into a different module
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need clone here.
subquery_filter_exprs
is the result of split_conjunction
function, and the type is Vec<&Expr>
.
|
||
// join our sub query into the main plan | ||
let join_type = match query_info.negated { | ||
true => JoinType::LeftAnti, | ||
false => JoinType::LeftSemi, | ||
}; | ||
|
||
// TODO: add Distinct if the original plan is a Distinct. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this still a todo? I see the code above looking into Distinct children, but the distinct is not added back
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it is still a todo, some distinct cases need more checks. like:
// SELECT t1.t1_id,
// t1.t1_name,
// t1.t1_int
// FROM t1
// WHERE EXISTS(SELECT DISTINCT t2_int
// FROM t2
// WHERE t2.t2_id > t1.t1_id);
// if we just add back the `DISTINCT`, the result:
Projection: t1.t1_id, t1.t1_name, t1.t1_int
LeftSemi Join: Filter: t2.t2_int > t1.t1_int
TableScan: t1
Distinct:
Projection: t2.t2_int
TableScan: t2
// expected result:
Projection: t1.t1_id, t1.t1_name, t1.t1_int
LeftSemi Join: Filter: t2.t2_int > t1.t1_int
TableScan: t1
Distinct:
Projection: t2.t2_int, t2.t2_id
TableScan: t2
t2_id
will not be in the projection.
The reason is we just consider the columns from join filter as the project items, we should also consider columns from original projection when there is an outer distinct.
I will do this in the following pr if it is ok 🤣.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will do this in the following pr if it is ok 🤣.
Absolutely it is ok! Thank you for all your work on this PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks again @ygf11 !
There appears to be a conflict in this PR -- I think once that is fixed we can merge it in.
Thanks for reviewing @alamb. I fixed the conflict. |
Benchmark runs are scheduled for baseline = 5d5b1a0 and contender = 27b15fd. 27b15fd is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
@ygf11 |
@ygf11 @jackwener |
Agree, most logics are same, we should combine them. I can take this task if others do not work on 😄. |
One more thing, looks like the SubqueryAlias logic is not consistent between the two rules I think you can make them consistent in the refactoring work. let sql = "SELECT t1_id, t1_name FROM t1 WHERE t1_id IN (SELECT t2_id FROM t2) ORDER BY t1_id";
let msg = format!("Creating logical plan for '{sql}'");
let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
let plan = dataframe.into_optimized_plan()?;
let expected = vec![
"Explain [plan_type:Utf8, plan:Utf8]",
" Sort: t1.t1_id ASC NULLS LAST [t1_id:UInt32;N, t1_name:Utf8;N]",
" Projection: t1.t1_id, t1.t1_name [t1_id:UInt32;N, t1_name:Utf8;N]",
" LeftSemi Join: t1.t1_id = __correlated_sq_1.t2_id [t1_id:UInt32;N, t1_name:Utf8;N]",
" TableScan: t1 projection=[t1_id, t1_name] [t1_id:UInt32;N, t1_name:Utf8;N]",
" SubqueryAlias: __correlated_sq_1 [t2_id:UInt32;N]",
" Projection: t2.t2_id AS t2_id [t2_id:UInt32;N]",
" TableScan: t2 projection=[t2_id] [t2_id:UInt32;N]",
]; let sql = "SELECT t1_id, t1_name FROM t1 WHERE NOT EXISTS (SELECT 1 FROM t2 WHERE t1_id = t2_id and t1_id > 11) ORDER BY t1_id";
let msg = format!("Creating logical plan for '{sql}'");
let dataframe = ctx.sql(&("explain ".to_owned() + sql)).await.expect(&msg);
let plan = dataframe.into_optimized_plan()?;
let expected = vec![
"Explain [plan_type:Utf8, plan:Utf8]",
" Sort: t1.t1_id ASC NULLS LAST [t1_id:UInt32;N, t1_name:Utf8;N]",
" Projection: t1.t1_id, t1.t1_name [t1_id:UInt32;N, t1_name:Utf8;N]",
" LeftAnti Join: t1.t1_id = t2.t2_id Filter: t1.t1_id > UInt32(11) [t1_id:UInt32;N, t1_name:Utf8;N]",
" TableScan: t1 projection=[t1_id, t1_name] [t1_id:UInt32;N, t1_name:Utf8;N]",
" Projection: t2.t2_id [t2_id:UInt32;N]",
" TableScan: t2 projection=[t2_id] [t2_id:UInt32;N]",
]; |
BTW, this PR looks really nice and fixed couple of issues related to the Exist subquery. |
Agree it. After unification, the changes of the two rules are synchronized, instead of differences |
* Support non-tuple expression for exists-subquery to join * fix tests * add tests * add comments * fix tests * fix test comment
Which issue does this PR close?
Closes #4934.
Closes #4366.
Rationale for this change
This sql works correctly in datafusion, and will be rewrited to
LeftSemi Join
.But the following sql will not:
These sql should also be rewrited to
LeftSemi Join
.What changes are included in this PR?
Are these changes tested?
Yes.
Are there any user-facing changes?