-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-27986][SQL] Support ANSI SQL filter clause for aggregate expression #26656
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
maropu
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your hard work and I have no comment now.
|
Test build #115675 has finished for PR 26656 at commit
|
|
retest this please |
|
Test build #115681 has finished for PR 26656 at commit
|
|
retest this please |
|
cuz the commit related to the failure has bee reverted. |
|
Test build #115695 has finished for PR 26656 at commit
|
|
Test build #115701 has finished for PR 26656 at commit
|
|
retest this please |
|
Test build #115708 has finished for PR 26656 at commit
|
|
thanks, merging to master! |
|
@cloud-fan @maropu @viirya @dongjoon-hyun Thanks! |
…ateExpression ### What changes were proposed in this pull request? This pr intends to add filter information in the explain output of an aggregate (This is a follow-up of #26656). Without this pr: ``` scala> sql("select k, SUM(v) filter (where v > 3) from t group by k").explain(true) == Parsed Logical Plan == 'Aggregate ['k], ['k, unresolvedalias('SUM('v, ('v > 3)), None)] +- 'UnresolvedRelation [t] == Analyzed Logical Plan == k: int, sum(v): bigint Aggregate [k#0], [k#0, sum(cast(v#1 as bigint)) AS sum(v)#3L] +- SubqueryAlias `default`.`t` +- Relation[k#0,v#1] parquet == Optimized Logical Plan == Aggregate [k#0], [k#0, sum(cast(v#1 as bigint)) AS sum(v)#3L] +- Relation[k#0,v#1] parquet == Physical Plan == HashAggregate(keys=[k#0], functions=[sum(cast(v#1 as bigint))], output=[k#0, sum(v)#3L]) +- Exchange hashpartitioning(k#0, 200), true, [id=#20] +- HashAggregate(keys=[k#0], functions=[partial_sum(cast(v#1 as bigint))], output=[k#0, sum#7L]) +- *(1) ColumnarToRow +- FileScan parquet default.t[k#0,v#1] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-master/spark-warehouse/t], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<k:int,v:int> scala> sql("select k, SUM(v) filter (where v > 3) from t group by k").show() +---+------+ | k|sum(v)| +---+------+ +---+------+ ``` With this pr: ``` scala> sql("select k, SUM(v) filter (where v > 3) from t group by k").explain(true) == Parsed Logical Plan == 'Aggregate ['k], ['k, unresolvedalias('SUM('v, ('v > 3)), None)] +- 'UnresolvedRelation [t] == Analyzed Logical Plan == k: int, sum(v) FILTER (v > 3): bigint Aggregate [k#0], [k#0, sum(cast(v#1 as bigint)) filter (v#1 > 3) AS sum(v) FILTER (v > 3)#5L] +- SubqueryAlias `default`.`t` +- Relation[k#0,v#1] parquet == Optimized Logical Plan == Aggregate [k#0], [k#0, sum(cast(v#1 as bigint)) filter (v#1 > 3) AS sum(v) FILTER (v > 3)#5L] +- Relation[k#0,v#1] parquet == Physical Plan == HashAggregate(keys=[k#0], functions=[sum(cast(v#1 as bigint))], output=[k#0, sum(v) FILTER (v > 3)#5L]) +- Exchange hashpartitioning(k#0, 200), true, [id=#20] +- HashAggregate(keys=[k#0], functions=[partial_sum(cast(v#1 as bigint)) filter (v#1 > 3)], output=[k#0, sum#9L]) +- *(1) ColumnarToRow +- FileScan parquet default.t[k#0,v#1] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-master/spark-warehouse/t], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<k:int,v:int> scala> sql("select k, SUM(v) filter (where v > 3) from t group by k").show() +---+---------------------+ | k|sum(v) FILTER (v > 3)| +---+---------------------+ +---+---------------------+ ``` ### Why are the changes needed? For better usability. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Manually. Closes #27198 from maropu/SPARK-27986-FOLLOWUP. Authored-by: Takeshi Yamamuro <yamamuro@apache.org> Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
…edicate is not supported ### What changes were proposed in this pull request? This is a followup of #26656. We don't support window aggregate function with filter predicate yet and we should fail explicitly. Observable metrics has the same issue. This PR fixes it as well. ### Why are the changes needed? If we simply ignore filter predicate when we don't support it, the result is wrong. ### Does this PR introduce any user-facing change? yea, fix the query result. ### How was this patch tested? new tests Closes #27476 from cloud-fan/filter. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…edicate is not supported ### What changes were proposed in this pull request? This is a followup of #26656. We don't support window aggregate function with filter predicate yet and we should fail explicitly. Observable metrics has the same issue. This PR fixes it as well. ### Why are the changes needed? If we simply ignore filter predicate when we don't support it, the result is wrong. ### Does this PR introduce any user-facing change? yea, fix the query result. ### How was this patch tested? new tests Closes #27476 from cloud-fan/filter. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com> (cherry picked from commit 5a4c70b) Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…ssion ### What changes were proposed in this pull request? The filter predicate for aggregate expression is an `ANSI SQL`. ``` <aggregate function> ::= COUNT <left paren> <asterisk> <right paren> [ <filter clause> ] | <general set function> [ <filter clause> ] | <binary set function> [ <filter clause> ] | <ordered set function> [ <filter clause> ] | <array aggregate function> [ <filter clause> ] | <row pattern count function> [ <filter clause> ] ``` There are some mainstream database support this syntax. **PostgreSQL:** https://www.postgresql.org/docs/current/sql-expressions.html#SYNTAX-AGGREGATES For example: ``` SELECT year, count(*) FILTER (WHERE gdp_per_capita >= 40000) FROM countries GROUP BY year ``` ``` SELECT year, code, gdp_per_capita, count(*) FILTER (WHERE gdp_per_capita >= 40000) OVER (PARTITION BY year) FROM countries ``` **jOOQ:** https://blog.jooq.org/2014/12/30/the-awesome-postgresql-9-4-sql2003-filter-clause-for-aggregate-functions/ **Notice:** 1.This PR only supports FILTER predicate without codegen. maropu will create another PR is related to SPARK-30027 to support codegen. 2.This PR only supports FILTER predicate without DISTINCT. I will create another PR is related to SPARK-30276 to support this. 3.This PR only supports FILTER predicate that can't reference the outer query. I created ticket SPARK-30219 to support it. 4.This PR only supports FILTER predicate that can't use IN/EXISTS predicate sub-queries. I created ticket SPARK-30220 to support it. 5.Spark SQL cannot supports a SQL with nested aggregate. I created ticket SPARK-30182 to support it. There are some show of the PR on my production environment. ``` spark-sql> desc gja_test_partition; key string NULL value string NULL other string NULL col2 int NULL # Partition Information # col_name data_type comment col2 int NULL Time taken: 0.79 s ``` ``` spark-sql> select * from gja_test_partition; a A ao 1 b B bo 1 c C co 1 d D do 1 e E eo 2 g G go 2 h H ho 2 j J jo 2 f F fo 3 k K ko 3 l L lo 4 i I io 4 Time taken: 1.75 s ``` ``` spark-sql> select count(key), sum(col2) from gja_test_partition; 12 26 Time taken: 1.848 s ``` ``` spark-sql> select count(key) filter (where col2 > 1) from gja_test_partition; 8 Time taken: 2.926 s ``` ``` spark-sql> select sum(col2) filter (where col2 > 2) from gja_test_partition; 14 Time taken: 2.087 s ``` ``` spark-sql> select count(key) filter (where col2 > 1), sum(col2) filter (where col2 > 2) from gja_test_partition; 8 14 Time taken: 2.847 s ``` ``` spark-sql> select count(key), count(key) filter (where col2 > 1), sum(col2), sum(col2) filter (where col2 > 2) from gja_test_partition; 12 8 26 14 Time taken: 1.787 s ``` ``` spark-sql> desc student; id int NULL name string NULL sex string NULL class_id int NULL Time taken: 0.206 s ``` ``` spark-sql> select * from student; 1 张三 man 1 2 李四 man 1 3 王五 man 2 4 赵六 man 2 5 钱小花 woman 1 6 赵九红 woman 2 7 郭丽丽 woman 2 Time taken: 0.786 s ``` ``` spark-sql> select class_id, count(id), sum(id) from student group by class_id; 1 3 8 2 4 20 Time taken: 18.783 s ``` ``` spark-sql> select class_id, count(id) filter (where sex = 'man'), sum(id) filter (where sex = 'woman') from student group by class_id; 1 2 5 2 2 13 Time taken: 3.887 s ``` ### Why are the changes needed? Add new SQL feature. ### Does this PR introduce any user-facing change? 'No'. ### How was this patch tested? Exists UT and new UT. Closes apache#26656 from beliefer/support-aggregate-clause. Lead-authored-by: gengjiaan <gengjiaan@360.cn> Co-authored-by: Jiaan Geng <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…edicate is not supported ### What changes were proposed in this pull request? This is a followup of apache#26656. We don't support window aggregate function with filter predicate yet and we should fail explicitly. Observable metrics has the same issue. This PR fixes it as well. ### Why are the changes needed? If we simply ignore filter predicate when we don't support it, the result is wrong. ### Does this PR introduce any user-facing change? yea, fix the query result. ### How was this patch tested? new tests Closes apache#27476 from cloud-fan/filter. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…of DISTINCT ### What changes were proposed in this pull request? This PR is related to #26656. #26656 only support use FILTER clause on aggregate expression without DISTINCT. This PR will enhance this feature when one or more DISTINCT aggregate expressions which allows the use of the FILTER clause. Such as: ``` select sum(distinct id) filter (where sex = 'man') from student; select class_id, sum(distinct id) filter (where sex = 'man') from student group by class_id; select count(id) filter (where class_id = 1), sum(distinct id) filter (where sex = 'man') from student; select class_id, count(id) filter (where class_id = 1), sum(distinct id) filter (where sex = 'man') from student group by class_id; select sum(distinct id), sum(distinct id) filter (where sex = 'man') from student; select class_id, sum(distinct id), sum(distinct id) filter (where sex = 'man') from student group by class_id; select class_id, count(id), count(id) filter (where class_id = 1), sum(distinct id), sum(distinct id) filter (where sex = 'man') from student group by class_id; ``` ### Why are the changes needed? Spark SQL only support use FILTER clause on aggregate expression without DISTINCT. This PR support Filter expression allows simultaneous use of DISTINCT ### Does this PR introduce _any_ user-facing change? Yes ### How was this patch tested? Exists and new UT Closes #29291 from beliefer/support-distinct-with-filter. Lead-authored-by: gengjiaan <gengjiaan@360.cn> Co-authored-by: beliefer <beliefer@163.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
The filter predicate for aggregate expression is an
ANSI SQL.There are some mainstream database support this syntax.
PostgreSQL:
https://www.postgresql.org/docs/current/sql-expressions.html#SYNTAX-AGGREGATES
For example:
jOOQ:
https://blog.jooq.org/2014/12/30/the-awesome-postgresql-9-4-sql2003-filter-clause-for-aggregate-functions/
Notice:
1.This PR only supports FILTER predicate without codegen. maropu will create another PR is related to SPARK-30027 to support codegen.
2.This PR only supports FILTER predicate without DISTINCT. I will create another PR is related to SPARK-30276 to support this.
3.This PR only supports FILTER predicate that can't reference the outer query. I created ticket SPARK-30219 to support it.
4.This PR only supports FILTER predicate that can't use IN/EXISTS predicate sub-queries. I created ticket SPARK-30220 to support it.
5.Spark SQL cannot supports a SQL with nested aggregate. I created ticket SPARK-30182 to support it.
There are some show of the PR on my production environment.
Why are the changes needed?
Add new SQL feature.
Does this PR introduce any user-facing change?
'No'.
How was this patch tested?
Exists UT and new UT.