-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[refactor](nereids) agg strategy refactor #54079
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
|
run buildall |
| } | ||
| this.hboPlanStatisticsProvider = Objects.requireNonNull(Env.getCurrentEnv().getHboPlanStatisticsManager() | ||
| .getHboPlanStatisticsProvider(), "HboPlanStatisticsProvider is null"); | ||
| this.requestChildrenProperties = childrenProperties; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why need this?
| // 2. collect agg expressions and generate agg function to slot reference map | ||
| List<Slot> aggFunctionOutput = Lists.newArrayList(); | ||
| ArrayList<FunctionCallExpr> execAggregateFunctions = Lists.newArrayListWithCapacity(outputExpressions.size()); | ||
| boolean isPartial = aggregate.getAggregateParam().aggMode.productAggregateBuffer; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this a bug in base code? maybe we should change the logic that generate aggMode
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@feiniaofeiafei why aggMode isPartial but contains partial aggregation function
| if (curChildIndex == groupExpression.arity()) { | ||
| if (!calculateEnforce(requestChildrenProperties, outputChildrenProperties)) { | ||
| return; // if error exists, return | ||
| clear(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is a bug in base code too? we should fix it in a seperate PR and add some UT and regression case for it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
| // rewriteJobs.addAll(jobs(topic("split multi distinct", | ||
| // custom(RuleType.SPLIT_MULTI_DISTINCT, () -> SplitMultiDistinct.INSTANCE)))); | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
redundant code?
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
Show resolved
Hide resolved
fe/fe-core/src/main/java/org/apache/doris/nereids/util/AggregateUtils.java
Show resolved
Hide resolved
| && children.get(0).getPlan() instanceof PhysicalDistribute) { | ||
| return ImmutableList.of(); | ||
| // 如果origin property 满足group by key, 但是不满足required, 那么禁用这个计划 | ||
| PhysicalProperties originChildProperty = originChildrenProperties.get(0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add ut
| public static final List<Class<? extends AggregateFunction>> finalMultiDistinctSupportOtherFunc = | ||
| ImmutableList.of(Count.class, Sum.class, Min.class, Max.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sum0?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added
| * - Available memory resources | ||
| * - Query complexity | ||
| */ | ||
| public class DistinctAggStrategySelector extends DefaultPlanRewriter<DistinctSelectorContext> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add ut
| /** | ||
| * Split multi distinct strategy | ||
| * */ | ||
| public class SplitMultiDistinctStrategy { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add ut
|
run buildall |
cd9c2fc to
f9fee5b
Compare
|
run buildall |
TPC-H: Total hot run time: 34723 ms |
TPC-DS: Total hot run time: 175102 ms |
|
run buildall |
TPC-H: Total hot run time: 34390 ms |
TPC-DS: Total hot run time: 173148 ms |
68d260d to
a6255fe
Compare
|
run buildall |
TPC-H: Total hot run time: 34130 ms |
TPC-DS: Total hot run time: 172100 ms |
FE UT Coverage ReportIncrement line coverage |
|
run buildall |
TPC-H: Total hot run time: 34388 ms |
TPC-DS: Total hot run time: 186091 ms |
|
run buildall |
TPC-H: Total hot run time: 34455 ms |
TPC-DS: Total hot run time: 186143 ms |
ClickBench: Total hot run time: 32.94 s |
FE UT Coverage ReportIncrement line coverage |
|
run buildall |
TPC-H: Total hot run time: 34269 ms |
TPC-DS: Total hot run time: 186447 ms |
ClickBench: Total hot run time: 32.57 s |
FE UT Coverage ReportIncrement line coverage |
FE Regression Coverage ReportIncrement line coverage |
|
PR approved by at least one committer and no changes requested. |
|
PR approved by anyone and no changes requested. |
6b77923 to
4e330cc
Compare
4e330cc to
df8951b
Compare
|
run buildall |
TPC-H: Total hot run time: 34763 ms |
TPC-DS: Total hot run time: 190027 ms |
ClickBench: Total hot run time: 32.95 s |
FE UT Coverage ReportIncrement line coverage |
FE Regression Coverage ReportIncrement line coverage |
|
PR approved by at least one committer and no changes requested. |
| default Statistics getStats() { | ||
| throw new IllegalStateException("Not support getStats for " + getClass().getName()); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why add a default impl?
### What problem does this PR solve?
#### Current Issues:
1. When the children of an agg satisfy a distribution, redundant
PhysicalAggregates are generated.
2. count(distinct a, b) generates a three-stage gather: the first two
stages deduplicate a and b, the third stage gathers the data, and then
counts. (This modification avoids a direct gather by adding a local agg
before the gather.)
3. The original code did not use statistical information to determine
the different Aggregate strategies. This refactor incorporates
statistical information as a basis for selection:
3.1 Utilizes statistical information in the logic for determining
whether to use multi_distinct or CTE splitting: When the ndv of the
group by key is low and the ndv of the distinct key is high,
multi_distinct is advantageous.
3.2 In the single distinct scenario, statistical information is also
used to determine whether to use a three-stage or four-stage Aggregate.
When the ndv of the group by key is low, a four-stage Aggregate is used.
#### REWRITE PHASE
##### DistinctAggStrategySelector
Add the DistinctAggStrategySelector rule to handle scenarios with
multiple distincts. Use statistical information to determine when to use
CTE splitting or multidistinct.
After this rule is implemented, only single distinct scenarios need to
be considered.
##### DistinctAggregateRewriter
Add the DistinctAggregateSplitter rule to handle scenarios with a single
distinct. Determine whether to split or use multi-distinct. Distinguish
between scenarios with and without group by.
No group by: Not processed here. With group by: If splitting is possible
and statistical information indicates that multi-stage splitting is more
optimal, split.
So, after this is complete, cases with distinct without group by:
No processing, move to the cascades phase.
Cases with distinct with group by:
1. distinct is rewritten to multi-distinct,
2. it is split, leaving distinct.
3. if it is neither rewritten to multi-distinct nor split, then
splitting is performed directly into two, three, or four stages in the
cascades phase.
#### CASCADES STAGE
##### SplitAgg
Add a SplitAgg rule to handle scenarios without distinct.
1. Split the Agg into two layers: local and global.
This allows you to avoid splitting the Agg into two stages, for example,
in group_concat scenarios. (If not, shuffle first, then agg.)
2. This also implements a single-stage Agg.
##### SplitAggMultiPhaseWithoutGbyKey
ADD implementation rule: SplitAggMultiPhaseWithoutGbyKey
only process agg without group by key, and with one distinct function
provide there three rewrite:
```sql
select count(distinct a) from t
splitToThreePhase:
agg(count(a); distinct global)
+--gather
+--agg(count(a); distinct local)
+--agg(group by a; global)
+--hashShuffle(a)
splitToFourPhase:
agg(count(a); distinct global)
+--gather
+--agg(count(a); distinct local)
+--agg(group by a; global)
+--hashShuffle(a)
+--agg(group by a; local)
twoPhaseAggregateWithFinalMultiDistinct:
agg(sum0(c1))
+--gather
+--agg(multi_distinct(a) as c1)
+--hashShuffle(a)
```
splitToThreePhase and twoPhaseAggregateWithFinalMultiDistinct provide
plan when scan(or other plan) satisfy distributed expr
##### SplitAggMultiPhase
ADD implementation rule: SplitAggMultiPhase
only process agg with group by key, and with one distinct function
select count(distinct a) group by b (deduplicated agg hashShuffle by
group by key b)
```sql
splitToTwoPlusOnePhase:
agg(group by b, count(a); distinct global)
+--agg(group by a,b; global)
+--hashShuffle(b)
+--agg(group by a,b; local)
agg(group by b, count(a); distinct global)
+--agg(group by a,b; global)
+--hashShuffle(b)
splitToTwoPlusTwoPhase:
agg(group by b, count(a); distinct global)
+--hashShuffle(b)
+--agg(group by b, count(a); distinct local)
+--agg(group by a,b; global)
+--hashShuffle(a,b)
+--agg(group by a,b; local)
agg(group by b, count(a); distinct global)
+--hashShuffle(b)
+--agg(group by b, count(a); distinct local)
+--agg(group by a,b; global)
+--hashShuffle(a,b)
splitToOnePlusTwoPhase: (deduplicated agg hashShuffle by distinct key a)
agg(group by b, count(a); distinct global)
+--hashShuffle(b)
+--agg(group by b, count(a); distinct local)
+--agg(group by a,b; global)
+--hashShuffle(a)
```
The second plan of splitToTwoPlusOnePhase and splitToTwoPlusTwoPhase
provide plan when scan(or other plan) satisfy group by distributed expr;
splitToOnePlusTwoPhase provide plan when scan(or other plan) satisfy
distinct key distribution.
#### RequestPropertyDeriver
Implement visitPhysicalHashAggregate. The main logic is:
If the request received by the AGG is hash-distributed with the
distribution column set S1, and the request sent by the AGG is also
hash-distributed with the distribution column set S2,
and S = S2.intersect(S1), and the NDV of S1 is calculated, and if the
NDV of S1 is found to be no less than a threshold, then the AGG sends
the child a hash distribution based on S1.
Otherwise, the AGG sends the child a hash distribution based on S.
When directly generating a three-stage/four-stage AGG, the partitionby
columns are specified. If the partitionby columns are not empty, the
child is hash-distributed based on the partitionby columns.
#### ChildrenPropertiesRegulator
Disabled one-stage AGG: AGG-Distribute
Special handling for one-stage AGG with CTE:
AGG-Distribute-CTE.
If the Distribute option is Gather, the option is disabled.
If the Distribute option is HashShuffle, the option is determined based
on statistics.
If no statistics are available, the option is enabled.
If statistics are available but the shuffle column is skewed, the option
is disabled.
… have statistics (#55855) Related PR: #54079 Problem Summary: Using the multi-distinct strategy instead of multi-stage/CTE splitting strategy for AGG in scenarios without statistical information, consistent with the default strategy before refactor(#54079), can achieve better performance in scenarios without statistical information.
…epeat (#55907) Related PR: #54079 Problem Summary: There may be some issues in scenarios where the grouping() function is present. When merging a project, ```sql -- Upper level project output: projections : "GROUPING_PREFIX_publish_date#36 originExpression=Grouping(publish_date#6) AS `dim_207`#32" --Lower level project output: projections : "GROUP PING_PREVIX_publish_date # 79 AS ` GROUP PING_PREVIX_publish_date ` # 36" ``` In theory, it should be rewritten as `GROUPPING_PREVIX_publish_date# 79 AS dim_207# 32`, but there is a problem here: The map generated by the lower level project is: key : `GROUPING_PREFIX_publish_date#36 , value: GROUPING_PREFIX_publish_date#79 ` Dealing with upper level projects `GROUPING_PREFIX_publish_date#36 originExpression=Grouping(publish_date#6) AS dim_207#32` In the upper level project, the 'virtual slot' is used, and the 'key' in the map is' slot '. Although the ExprId is the same, it cannot be hit, resulting in an error. So temporarily disable the use of CTE splitting to handle grouping+multiple counts (distinctions) scenarios.
…d count distinct multi expr exists same time (#56271) ### What problem does this PR solve? Related PR: #54079 Problem Summary: 1. Added a check for multi_distinct_count(a,b) in the MultiDistinctCount constructor to prevent the use of multiple columns. Because BE doesn't report an error in this case, it only uses the first argument of the multi_distinct function, resulting in incorrect results. 2. In scenarios without a group by key, when multi_distinct_func and count(distinct a,b) appear together, the original code converts count(distinct a,b) to multi_distinct_count(a), resulting in incorrect results. The correct approach is to use multi-stage splitting when count distinct multi_expr appears. 3. Removed the mustUseMultiDistinct flag. This flag is useless.
…d count distinct multi expr exists same time (#56271) ### What problem does this PR solve? Related PR: #54079 Problem Summary: 1. Added a check for multi_distinct_count(a,b) in the MultiDistinctCount constructor to prevent the use of multiple columns. Because BE doesn't report an error in this case, it only uses the first argument of the multi_distinct function, resulting in incorrect results. 2. In scenarios without a group by key, when multi_distinct_func and count(distinct a,b) appear together, the original code converts count(distinct a,b) to multi_distinct_count(a), resulting in incorrect results. The correct approach is to use multi-stage splitting when count distinct multi_expr appears. 3. Removed the mustUseMultiDistinct flag. This flag is useless.
What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
Current Issues:
3.1 Utilizes statistical information in the logic for determining whether to use multi_distinct or CTE splitting: When the ndv of the group by key is low and the ndv of the distinct key is high, multi_distinct is advantageous.
3.2 In the single distinct scenario, statistical information is also used to determine whether to use a three-stage or four-stage Aggregate. When the ndv of the group by key is low, a four-stage Aggregate is used.
REWRITE PHASE
DistinctAggStrategySelector
Add the DistinctAggStrategySelector rule to handle scenarios with multiple distincts. Use statistical information to determine when to use CTE splitting or multidistinct.
After this rule is implemented, only single distinct scenarios need to be considered.
DistinctAggregateRewriter
Add the DistinctAggregateSplitter rule to handle scenarios with a single distinct. Determine whether to split or use multi-distinct. Distinguish between scenarios with and without group by.
No group by: Not processed here. With group by: If splitting is possible and statistical information indicates that multi-stage splitting is more optimal, split.
So, after this is complete, cases with distinct without group by:
No processing, move to the cascades phase.
Cases with distinct with group by:
CASCADES STAGE
SplitAgg
Add a SplitAgg rule to handle scenarios without distinct.
This allows you to avoid splitting the Agg into two stages, for example, in group_concat scenarios. (If not, shuffle first, then agg.)
SplitAggMultiPhaseWithoutGbyKey
ADD implementation rule: SplitAggMultiPhaseWithoutGbyKey
only process agg without group by key, and with one distinct function
provide there three rewrite:
splitToThreePhase and twoPhaseAggregateWithFinalMultiDistinct provide plan when scan(or other plan) satisfy distributed expr
SplitAggMultiPhase
ADD implementation rule: SplitAggMultiPhase
only process agg with group by key, and with one distinct function
select count(distinct a) group by b (deduplicated agg hashShuffle by group by key b)
The second plan of splitToTwoPlusOnePhase and splitToTwoPlusTwoPhase provide plan when scan(or other plan) satisfy group by distributed expr;
splitToOnePlusTwoPhase provide plan when scan(or other plan) satisfy distinct key distribution.
RequestPropertyDeriver
Implement visitPhysicalHashAggregate. The main logic is:
If the request received by the AGG is hash-distributed with the distribution column set S1, and the request sent by the AGG is also hash-distributed with the distribution column set S2,
and S = S2.intersect(S1), and the NDV of S1 is calculated, and if the NDV of S1 is found to be no less than a threshold, then the AGG sends the child a hash distribution based on S1.
Otherwise, the AGG sends the child a hash distribution based on S.
When directly generating a three-stage/four-stage AGG, the partitionby columns are specified. If the partitionby columns are not empty, the child is hash-distributed based on the partitionby columns.
ChildrenPropertiesRegulator
Disabled one-stage AGG: AGG-Distribute
Special handling for one-stage AGG with CTE:
AGG-Distribute-CTE.
If the Distribute option is Gather, the option is disabled.
If the Distribute option is HashShuffle, the option is determined based on statistics.
If no statistics are available, the option is enabled.
If statistics are available but the shuffle column is skewed, the option is disabled.
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)