Move subquery alias assignment onto rules #4767

Merged: 1 commit, Dec 30, 2022

tustvold
Contributor

Which issue does this PR close?

Closes #3437
Closes #3791

(although these were actually fixed by #4038)

Rationale for this change

Having mutable state associated with OptimizerConfig complicates implementing it for SessionState. I originally hoped to implement OptimizerConfig for DataFrame, but this would have required churning all the planning machinery and so I decided against this.

There is one implication worth highlighting: because the state is now associated with the OptimizerRule rather than with an OptimizerContext created for each call to SessionState::optimize, state will now bleed across calls, whereas previously it would not. In some ways this is better, as previously calling optimize twice could theoretically result in an alias collision, but I thought it worth mentioning.
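To make the difference concrete, here is a minimal sketch contrasting the two designs. The types `PerCallContext` and `RuleWithState`, their method names, and the starting counter value are hypothetical stand-ins chosen for illustration; they are not the actual DataFusion types.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical stand-in for the old design: a fresh counter is created
/// for every call to `optimize`.
struct PerCallContext {
    next_id: AtomicUsize,
}

impl PerCallContext {
    fn new() -> Self {
        // Assumption: numbering starts at 1.
        Self { next_id: AtomicUsize::new(1) }
    }

    fn next_alias(&self) -> String {
        format!("__sq_{}", self.next_id.fetch_add(1, Ordering::Relaxed))
    }
}

/// Hypothetical stand-in for the new design: the rule owns the counter,
/// so the counter outlives any single call to `optimize`.
struct RuleWithState {
    next_id: AtomicUsize,
}

impl RuleWithState {
    fn new() -> Self {
        Self { next_id: AtomicUsize::new(1) }
    }

    fn next_alias(&self) -> String {
        format!("__scalar_sq_{}", self.next_id.fetch_add(1, Ordering::Relaxed))
    }
}

/// Old behaviour: two calls build two fresh contexts and hand out the same alias.
fn aliases_with_per_call_context() -> (String, String) {
    let first = PerCallContext::new().next_alias();
    let second = PerCallContext::new().next_alias();
    (first, second)
}

/// New behaviour: one long-lived rule keeps advancing its counter across calls.
fn aliases_with_rule_state(rule: &RuleWithState) -> (String, String) {
    (rule.next_alias(), rule.next_alias())
}
```

In this sketch the per-call version repeats its first alias on every call, while the rule-owned version never repeats one for as long as the rule instance lives.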

What changes are included in this PR?

Move alias state from OptimizerConfig onto the individual rules

Are these changes tested?

Are there any user-facing changes?

Yes, queries will get potentially different aliases assigned to them by the optimizer.

@tustvold tustvold added the api change Changes the API exposed to users of the crate label Dec 29, 2022
@github-actions github-actions bot added core Core DataFusion crate optimizer Optimizer rules labels Dec 29, 2022
/// See https://github.com/apache/arrow-datafusion/issues/3791
#[tokio::test]
#[ignore]
#[tokio::test]
Contributor Author

This was fixed by #4038

Contributor

Makes sense to me.

@@ -242,7 +245,7 @@ fn optimize_scalar(
}

// Only operate if one column is present and the other closed upon from outside scope
-let subqry_alias = format!("__sq_{}", config.next_id());
+let subqry_alias = alias.next("__scalar_sq");
Contributor Author

I debated passing an Arc<AliasGenerator> into each of the rules, but this created a fair amount of churn, and it is easier for the rules to just assign a unique prefix. As an added bonus, the prefix highlights where the alias came from, which is nice I guess.
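As a rough illustration of the per-rule prefix approach, the sketch below gives each rule its own generator and its own prefix. The struct layout, the `subquery_alias` helpers, and the `<prefix>_<n>` format are assumptions made for the example; the real `AliasGenerator` may differ in detail.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Illustrative generator (assumed shape, not the actual DataFusion type).
#[derive(Default)]
struct AliasGenerator {
    next_id: AtomicUsize,
}

impl AliasGenerator {
    /// Returns `<prefix>_<n>`, with `n` incrementing on every call.
    fn next(&self, prefix: &str) -> String {
        let id = self.next_id.fetch_add(1, Ordering::Relaxed) + 1;
        format!("{prefix}_{id}")
    }
}

/// Hypothetical rule that rewrites scalar subqueries.
struct ScalarSubqueryRule {
    alias: AliasGenerator,
}

/// Hypothetical rule that rewrites `WHERE ... IN (subquery)`.
struct WhereInRule {
    alias: AliasGenerator,
}

impl ScalarSubqueryRule {
    fn subquery_alias(&self) -> String {
        self.alias.next("__scalar_sq")
    }
}

impl WhereInRule {
    fn subquery_alias(&self) -> String {
        self.alias.next("__correlated_sq")
    }
}
```

Because the prefixes differ, the two rules can count independently from the same starting value without ever producing the same alias, and the prefix shows which rule introduced it.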

@@ -230,7 +230,6 @@ async fn tpch_q4_correlated() -> Result<()> {
Ok(())
}

-#[ignore] // https://github.com/apache/arrow-datafusion/issues/3437
Contributor Author

This query was also fixed by #4038

@@ -255,17 +254,18 @@ async fn tpch_q17_correlated() -> Result<()> {
let dataframe = ctx.sql(sql).await.unwrap();
let plan = dataframe.into_optimized_plan().unwrap();
let actual = format!("{}", plan.display_indent());
-let expected = r#"Projection: CAST(SUM(lineitem.l_extendedprice) AS Decimal128(38, 33)) / CAST(Float64(7) AS Decimal128(38, 33)) AS avg_yearly
+let expected = r#"Projection: CAST(SUM(lineitem.l_extendedprice) AS Float64) / Float64(7) AS avg_yearly
Contributor Author

This is the fix from #4038

@@ -275,7 +275,7 @@ async fn tpch_q17_correlated() -> Result<()> {
"+--------------------+",
"| avg_yearly |",
"+--------------------+",
-"| 1901.3714285714286 |",
+"| 190.13714285714286 |",
Contributor Author

I've not independently verified this result, but I believe @andygrove added CI to check the TPCH queries are returning the correct results, so I think this was just out of date

Contributor

@alamb alamb left a comment

I like the basic idea in this PR

One thing I wonder about is what happens if the same rule is run multiple times? Won't the subsequent runs potentially reuse the same aliases?

Member

@xudong963 xudong963 left a comment

The change makes sense to me.

@tustvold tustvold marked this pull request as draft December 29, 2022 13:57
@tustvold
Contributor Author

tustvold commented Dec 30, 2022

> One thing I wonder about is what happens if the same rule is run multiple times?

This will be fine. There is a potential collision if the same rule appears multiple times in the same list of optimizers, i.e. something like:

let rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
    Arc::new(DecorrelateWhereIn::new()),
    Arc::new(ScalarSubqueryToJoin::new()),
    ...,
    Arc::new(ScalarSubqueryToJoin::new()),
    ...
];

However, this doesn't occur in practice and could easily be worked around by using

let subquery = Arc::new(ScalarSubqueryToJoin::new());
let rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
    Arc::new(DecorrelateWhereIn::new()),
    subquery.clone(),
    ...,
    subquery,
    ...
];
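The collision and the workaround can be reproduced with a small self-contained sketch. `ScalarRule`, `run_rules`, and `all_unique` are hypothetical names used only for this illustration; the sketch assumes each rule instance owns a counter that starts from the same value.

```rust
use std::collections::HashSet;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for `ScalarSubqueryToJoin`: each instance owns a counter.
#[derive(Default)]
struct ScalarRule {
    next_id: AtomicUsize,
}

impl ScalarRule {
    fn next_alias(&self) -> String {
        let id = self.next_id.fetch_add(1, Ordering::Relaxed) + 1;
        format!("__scalar_sq_{id}")
    }
}

/// Runs every rule in the list once and collects the alias each one hands out.
fn run_rules(rules: &[Arc<ScalarRule>]) -> Vec<String> {
    rules.iter().map(|rule| rule.next_alias()).collect()
}

/// Returns true if no alias appears twice.
fn all_unique(aliases: &[String]) -> bool {
    let mut seen = HashSet::new();
    aliases.iter().all(|alias| seen.insert(alias))
}
```

With two separately constructed instances in the list, both hand out `__scalar_sq_1`; with one instance cloned into both positions, the shared counter yields `__scalar_sq_1` followed by `__scalar_sq_2`.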

@tustvold tustvold marked this pull request as ready for review December 30, 2022 14:23
@tustvold tustvold merged commit cf45eb9 into apache:master Dec 30, 2022
@ursabot

ursabot commented Dec 30, 2022

Benchmark runs are scheduled for baseline = f7477dc and contender = cf45eb9. cf45eb9 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

@mingmwang
Contributor

One question regarding the Subquery Alias generation logic:

Why does the InSubquery generate a Subquery Alias, but the Exists Subquery does not?

@mingmwang
Contributor

@jackwener Do you have any ideas?

@jackwener
Member

#[test]
fn exists_subquery() {
    let sql = "SELECT id FROM person p WHERE EXISTS \
            (SELECT first_name FROM person \
            WHERE last_name = p.last_name \
            AND state = p.state)";

    let expected = "Projection: p.id\
        \n  Filter: EXISTS (<subquery>)\
        \n    Subquery:\
        \n      Projection: person.first_name\
        \n        Filter: person.last_name = p.last_name AND person.state = p.state\
        \n          TableScan: person\
        \n    SubqueryAlias: p\
        \n      TableScan: person";
    quick_test(sql, expected);
}

It looks like an exists subquery can also generate a Subquery Alias. @mingmwang

@mingmwang
Contributor

Yes, but after the logical optimization, the SubqueryAlias is gone.

@jackwener
Member

> Yes, but after the logical optimization, the SubqueryAlias is gone.

Because optimize_where_in has an AliasGenerator parameter but optimize_exists doesn't.

@ygf11
Contributor

ygf11 commented Feb 21, 2023

> Why does the InSubquery generate a Subquery Alias, but the Exists Subquery does not?

I think where-exists-to-join doesn't need a subquery alias.

The main purpose of the alias is to make some queries work (those where the subquery and the outer query refer to the same table), but where-exists-to-join should not optimize these queries.

For example:

select * from t1 where exists (select t1_id from t1 where t1_int > t1_id)

In this case, although it looks as if t1_int > t1_id refers to both the outer table and the subquery, we should actually treat it as a subquery filter. The SQL is then not rewritten to a join; the plan in Spark is also not a join:

spark-sql> explain extended  select * from t1 where exists (select t1_id from t1 where t1_int > t1_id);
...
== Optimized Logical Plan ==
Filter isnotnull(scalar-subquery#123 [])
:  +- GlobalLimit 1
:     +- LocalLimit 1
:        +- Project [1 AS col#135]
:           +- Filter ((isnotnull(t1_int#134) AND isnotnull(t1_id#132)) AND (t1_int#134 > t1_id#132))
:              +- HiveTableRelation [`test`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [t1_id#132, t1_name#133, t1_int#134], Partition Cols: []]
+- HiveTableRelation [`test`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [t1_id#129, t1_name#130, t1_int#131], Partition Cols: []]
...

InSubquery is different: it has an extra predicate, and a similar query will be rewritten to a join.

explain select * from t1 where t1_id in (select t1_int from t1 where t1.t1_int > t1.t1_id);

The join predicate is t1.t1_id = t1.t1_int; without a subquery alias, it is not clear which side of the join each column refers to.

Spark result:

spark-sql> explain extended  select * from t1 where t1_id in (select t1_int from t1 where t1.t1_int > t1.t1_id);
...
== Optimized Logical Plan ==
Join LeftSemi, (t1_id#203 = t1_int#208)
:- HiveTableRelation [`test`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [t1_id#203, t1_name#204, t1_int#205], Partition Cols: []]
+- Project [t1_int#208]
   +- Filter ((isnotnull(t1_int#208) AND isnotnull(t1_id#206)) AND (t1_int#208 > t1_id#206))
      +- HiveTableRelation [`test`.`t1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [t1_id#206, t1_name#207, t1_int#208], Partition Cols: []]
...
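The ambiguity can be sketched with a toy model of column resolution. Everything here is illustrative: `Column`, `matches`, and `join_schema` are hypothetical helpers, and the schema is simply the list of columns visible to the join predicate, not DataFusion's actual schema type.

```rust
/// A column as seen by the join predicate: (relation, name).
type Column = (&'static str, &'static str);

/// Counts how many columns in `schema` match the qualified reference.
fn matches(schema: &[Column], relation: &str, name: &str) -> usize {
    schema
        .iter()
        .filter(|col| col.0 == relation && col.1 == name)
        .count()
}

/// Columns visible to a join of `t1` with a subquery that also scans `t1`.
/// When `right_alias` is given, the subquery side is exposed under that name.
fn join_schema(right_alias: Option<&'static str>) -> Vec<Column> {
    let right = right_alias.unwrap_or("t1");
    vec![
        ("t1", "t1_id"),
        ("t1", "t1_name"),
        ("t1", "t1_int"),
        (right, "t1_id"),
        (right, "t1_name"),
        (right, "t1_int"),
    ]
}
```

Without an alias, a reference such as `t1.t1_int` matches two columns, one on each side of the join. With an alias such as `__correlated_sq_1` on the subquery side, every qualified reference matches exactly one column.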

@jackwener
Member

jackwener commented Feb 21, 2023

> I think where-exists to join doesn't need subquery alias.

@ygf11 In fact, subquery alias doesn't actually work; we can eliminate it, and I'm getting ready to do so.

The subquery alias only exists in the Bound Plan/Analyzed Plan; it will be eliminated.

Also, the SubqueryAlias in both queries is eliminated by the EliminateSubqueryAlias rule, not by Unnest-Subquery.

You may have mistakenly thought that Unnest-Subquery eliminates the subquery alias.

@ygf11
Contributor

ygf11 commented Feb 22, 2023

> In fact, subquery alias doesn't actually work, we can eliminate it, and I'm getting ready to do it.

Thanks for explaining @jackwener. It helps a lot.

I find that the behavior of the subquery alias differs from its behavior in Spark:

  • DataFusion doesn't add a subquery alias in the Bind/Analyze stage.
  • If the in-subquery can be optimized to a join, it adds a subquery alias to the plan.

The following are examples:

> explain select * from t1 where t1_id in (select t2_id from t2);
# optimized plan
Projection: t1.t1_id, t1.t1_name, t1.t1_int
  LeftSemi Join: t1.t1_id = __correlated_sq_3.t2_id
    TableScan: t1 projection=[t1_id, t1_name, t1_int]
    SubqueryAlias: __correlated_sq_3 # Added in DecorrelateWhereIn rule
      Projection: t2.t2_id
        TableScan: t2 projection=[t2_id]

> explain select * from t1 where t1_id in (select t2_id from t2 as x);
# optimized plan
Projection: t1.t1_id, t1.t1_name, t1.t1_int
  LeftSemi Join: t1.t1_id = __correlated_sq_3.t2_id
    TableScan: t1 projection=[t1_id, t1_name, t1_int]
    SubqueryAlias: __correlated_sq_3 # Added in DecorrelateWhereIn rule
      Projection: x.t2_id AS t2_id
        SubqueryAlias: x # Introduced by `AS` keyword
          TableScan: t2 projection=[t2_id]

Since you plan to eliminate the subquery alias, I think maybe we should add it in the Bind/Analyze stage rather than in the optimization stage.

BTW, DecorrelateWhereExists doesn't add a subquery alias, so the alias seems to be gone after optimization.

What do you think? @jackwener @mingmwang

@jackwener
Member

jackwener commented Feb 22, 2023

> As you plan to eliminate subquery alias, I think maybe we should add subquery alias in Bind/Analyze stage not in optimization stage.

Currently, there isn't a binder/analyzer in DataFusion. I tried to implement one in the past, but it is a difficult and large piece of work.

I think adding the subquery alias during optimization is also reasonable.

> DecorrelateWhereExists doesn't add subquery alias

I think it's an oversight; if anyone wants to unify DecorrelateWhereExists and DecorrelateWhereIn, I think this problem will be fixed.

I also suspect that this problem may be caused by out-of-sync modifications (DecorrelateWhereIn was fixed but DecorrelateWhereExists was not).
