*: Support stream aggregation in new plan #4481
@@ -380,7 +380,7 @@ func (s *testPlanSuite) TestDAGPlanBuilderSubquery(c *C) {
// Test Nested sub query.
{
Add tests for stream agg combined with index join, merge join, limit, and sort.
Done.
plan/logical_plans.go
Outdated
// groupByCols stores the columns that are group-by items.
groupByCols []*expression.Column

possibleProperties [][]*expression.Column
childCount float64 // childCount is the child plan's count.
cardinality float64
cardinality is always equal to LogicalAggregation's profile's count.
plan/logical_plans.go
Outdated
// groupByCols stores the columns that are group-by items.
groupByCols []*expression.Column

possibleProperties [][]*expression.Column
childCount float64 // childCount is the child plan's count.
Maybe inputCount is a better name. childCount seems to indicate the number of children.
plan/new_physical_plan_builder.go
Outdated
@@ -1090,26 +1090,80 @@ func (p *TopN) getChildrenPossibleProps(prop *requiredProp) [][]*requiredProp {
return props
}

func (p *LogicalAggregation) getStreamAggs() []PhysicalPlan {
for _, aggFunc := range p.AggFuncs {
if aggFunc.GetMode() == expression.FinalMode {
This code never seems to be reached?
When we do decompose, there will be a FinalMode aggregation.
return nil
}
}
// group by a + b is not interested in any order.
Add test for this case.
Done.
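For readers following the "group by a + b" case: a stream aggregation needs its input sorted on the group-by keys, so a computed group-by item leaves no column ordering to request from the child. A minimal sketch of that check, assuming a simplified `groupByItem` type and a `streamAggKeys` helper that are hypothetical and not part of this PR:

```go
package main

import "fmt"

// groupByItem is a hypothetical, simplified group-by item.
// column is set only when the item is a bare column reference.
type groupByItem struct {
	column string
	expr   string // textual form, e.g. "a + b"
}

// streamAggKeys returns the columns a stream aggregation would need its
// input sorted on. It reports false when any item is a computed
// expression, because no column order can satisfy it.
func streamAggKeys(items []groupByItem) ([]string, bool) {
	keys := make([]string, 0, len(items))
	for _, it := range items {
		if it.column == "" {
			return nil, false
		}
		keys = append(keys, it.column)
	}
	return keys, true
}

func main() {
	_, ok := streamAggKeys([]groupByItem{{expr: "a + b"}})
	fmt.Println(ok) // false: only a hash aggregation applies

	keys, ok := streamAggKeys([]groupByItem{{column: "a", expr: "a"}})
	fmt.Println(keys, ok)
}
```

In this sketch the caller would fall back to a hash aggregation whenever the second return value is false.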
plan/task.go
Outdated
}
}
task = finishCopTask(cop, p.ctx, p.allocator)
task.addCost(task.count()*cpuFactor + p.cardinality*hashAggMemFactor)
Use p.statsProfile().count instead of p.cardinality.
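A rough sketch of the suggestion, assuming simplified `statsProfile` and `aggregation` types (both hypothetical stand-ins for the planner's types): the cost keeps the shape from the hunk, a CPU term per input row plus a memory term per output group, but reads the group count from the stats profile rather than from a separately cached field. The factor values below are placeholders, not TiDB's constants.

```go
package main

import "fmt"

// statsProfile is a hypothetical stand-in holding the estimated row count.
type statsProfile struct {
	count float64
}

// aggregation is a hypothetical stand-in for the aggregation plan node.
type aggregation struct {
	profile statsProfile
}

// cost charges cpuFactor per input row and memFactor per output group.
// The output group count comes from the profile, so there is no second
// copy of the estimate to keep in sync.
func (a aggregation) cost(inputRows, cpuFactor, memFactor float64) float64 {
	return inputRows*cpuFactor + a.profile.count*memFactor
}

func main() {
	agg := aggregation{profile: statsProfile{count: 100}}
	fmt.Println(agg.cost(1000, 0.5, 2))
}
```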
plan/stats.go
Outdated
@@ -81,7 +81,6 @@ func (p *DataSource) getStatsProfileByFilter(conds expression.CNFExprs) *statsPr
}
selectivity, err := p.statisticTable.Selectivity(p.ctx, conds)
if err != nil {
log.Warnf("An error happened: %v, we have to use the default selectivity", err.Error())
Why remove this log?
plan/new_physical_plan_builder.go
Outdated

reqProp := &requiredProp{taskTp: rootTaskType, cols: p.propKeys, expectedCnt: prop.expectedCnt * p.childCount / p.profile.count}
if !prop.isEmpty() {
if prop.desc {
A descending order (desc) can also be passed down.
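On the expectedCnt expression in the hunk above: it scales the parent's expected row count by the ratio of the aggregation's input rows to its output rows. A minimal sketch of that arithmetic, assuming output groups are spread evenly over the input; the `childExpectedCount` name and the zero guard are assumptions of this sketch and do not appear in the diff:

```go
package main

import "fmt"

// childExpectedCount estimates how many input rows the aggregation must
// read to emit `expected` output rows.
// inputCount corresponds to p.childCount and outputCount to p.profile.count.
func childExpectedCount(expected, inputCount, outputCount float64) float64 {
	if outputCount <= 0 {
		// Assumed fallback: with no output estimate, read the whole input.
		return inputCount
	}
	return expected * inputCount / outputCount
}

func main() {
	// The parent wants 10 groups; 1000 input rows collapse into 100 groups,
	// so the child is asked for about 100 rows.
	fmt.Println(childExpectedCount(10, 1000, 100))
}
```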
plan/new_physical_plan_builder.go
Outdated
agg.SetSchema(p.schema.Clone())
agg.profile = p.profile
aggs = append(aggs, agg)
if len(p.possibleProperties) == 0 {
We can remove this check.
plan/new_physical_plan_builder.go
Outdated
func (p *LogicalAggregation) generatePhysicalPlans() []PhysicalPlan {
ha := PhysicalAggregation{
aggs := make([]PhysicalPlan, 0, 2)
A capacity of 2 is not reasonable; len(p.possibleProperties) + 1 is the upper bound.
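The capacity suggestion can be sketched as follows, assuming one stream aggregation candidate per possible property plus a single hash aggregation. The `physicalPlan` type and `candidateAggs` helper are hypothetical simplifications, not the code in this PR:

```go
package main

import "fmt"

// physicalPlan is a hypothetical stand-in for the PhysicalPlan interface.
type physicalPlan string

// candidateAggs builds at most one stream aggregation per possible
// ordering property and exactly one hash aggregation.
func candidateAggs(possibleProperties [][]string) []physicalPlan {
	// Upper bound on candidates, so append never has to reallocate.
	aggs := make([]physicalPlan, 0, len(possibleProperties)+1)
	for _, cols := range possibleProperties {
		aggs = append(aggs, physicalPlan(fmt.Sprintf("StreamAgg%v", cols)))
	}
	aggs = append(aggs, "HashAgg")
	return aggs
}

func main() {
	aggs := candidateAggs([][]string{{"a"}, {"a", "b"}})
	fmt.Println(len(aggs), cap(aggs))
}
```

With an empty property list the same expression yields a capacity of 1, which also covers the hash-aggregation-only case.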
@hanfei1991 PTAL
},
{
sql: "select count(*) from t where e > 1 group by b",
best: "TableReader(Table(t)->Sel([gt(test.t.e, 1)])->HashAgg)->HashAgg",
Why not use index on e for filtering or use index on b for streaming aggregation?
Using the index will cause a double read.
LGTM
Rest LGTM.
best: "IndexLookUp(Index(t.b_c)[[-inf <nil>,20 +inf]], Table(t)->HashAgg)->HashAgg",
},
{
sql: "select count(e) from t where t.b <= 30",
Are the cases t.b <= 30 and t.b <= 40 not necessary?
These are old tests. @hanfei1991
oh
/run-all-test
PTAL @winoros
LGTM
support stream aggregation in the new plan and clean the tests.