Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: implement aggregation eliminate optimize trace #30114

Merged
merged 20 commits into from
Nov 29, 2021
15 changes: 10 additions & 5 deletions expression/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ func (ki KeyInfo) Clone() KeyInfo {
return result
}

// String implements fmt.Stringer interface.
func (ki KeyInfo) String() string {
time-and-fate marked this conversation as resolved.
Show resolved Hide resolved
ukColStrs := make([]string, 0, len(ki))
for _, col := range ki {
ukColStrs = append(ukColStrs, col.String())
}
return "[" + strings.Join(ukColStrs, ",") + "]"
}

// Schema stands for the row schema and unique key information get from input.
type Schema struct {
Columns []*Column
Expand All @@ -47,11 +56,7 @@ func (s *Schema) String() string {
}
ukStrs := make([]string, 0, len(s.Keys))
for _, key := range s.Keys {
ukColStrs := make([]string, 0, len(key))
for _, col := range key {
ukColStrs = append(ukColStrs, col.String())
}
ukStrs = append(ukStrs, "["+strings.Join(ukColStrs, ",")+"]")
ukStrs = append(ukStrs, key.String())
}
return "Column: [" + strings.Join(colStrs, ",") + "] Unique key: [" + strings.Join(ukStrs, ",") + "]"
}
Expand Down
19 changes: 19 additions & 0 deletions planner/core/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/plancodec"
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tidb/util/tracing"
"github.com/pingcap/tipb/go-tipb"
)

Expand Down Expand Up @@ -890,11 +891,23 @@ func (p *LogicalAggregation) ExplainInfo() string {
return buffer.String()
}

func (p *LogicalAggregation) buildLogicalPlanTrace() *tracing.LogicalPlanTrace {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we going to have different implementations for different logical plans? If not, can we align them to one func?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to attach explain info in trace, thus it should be implemented by each logical operator.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ExplainInfo is also defined as an interface, what about "func buildLogicalPlanTrace(p *baseLogicalPlan) {...}"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated.

lpt := p.baseLogicalPlan.buildLogicalPlanTrace()
lpt.ExplainInfo = p.ExplainInfo()
return lpt
}

// ExplainInfo implements Plan interface.
func (p *LogicalProjection) ExplainInfo() string {
return expression.ExplainExpressionList(p.Exprs, p.schema)
}

func (p *LogicalProjection) buildLogicalPlanTrace() *tracing.LogicalPlanTrace {
lpt := p.baseLogicalPlan.buildLogicalPlanTrace()
lpt.ExplainInfo = p.ExplainInfo()
return lpt
}

// ExplainInfo implements Plan interface.
func (p *LogicalSelection) ExplainInfo() string {
return string(expression.SortedExplainExpressionList(p.Conditions))
Expand Down Expand Up @@ -930,6 +943,12 @@ func (ds *DataSource) ExplainInfo() string {
return buffer.String()
}

func (ds *DataSource) buildLogicalPlanTrace() *tracing.LogicalPlanTrace {
lpt := ds.baseLogicalPlan.buildLogicalPlanTrace()
lpt.ExplainInfo = ds.ExplainInfo()
return lpt
}

// ExplainInfo implements Plan interface.
func (p *PhysicalExchangeSender) ExplainInfo() string {
buffer := bytes.NewBufferString("ExchangeType: ")
Expand Down
55 changes: 0 additions & 55 deletions planner/core/logical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2071,58 +2071,3 @@ func (s *testPlanSuite) TestWindowLogicalPlanAmbiguous(c *C) {
}
}
}

func (s *testPlanSuite) TestLogicalOptimizeWithTraceEnabled(c *C) {
sql := "select * from t where a in (1,2)"
defer testleak.AfterTest(c)()
tt := []struct {
flags []uint64
steps int
}{
{
flags: []uint64{
flagEliminateAgg,
flagPushDownAgg},
steps: 2,
},
{
flags: []uint64{
flagEliminateAgg,
flagPushDownAgg,
flagPrunColumns,
flagBuildKeyInfo,
},
steps: 4,
},
{
flags: []uint64{},
steps: 0,
},
}

for i, tc := range tt {
comment := Commentf("case:%v sql:%s", i, sql)
stmt, err := s.ParseOneStmt(sql, "", "")
c.Assert(err, IsNil, comment)
err = Preprocess(s.ctx, stmt, WithPreprocessorReturn(&PreprocessorReturn{InfoSchema: s.is}))
c.Assert(err, IsNil, comment)
sctx := MockContext()
sctx.GetSessionVars().EnableStmtOptimizeTrace = true
builder, _ := NewPlanBuilder().Init(sctx, s.is, &hint.BlockHintProcessor{})
domain.GetDomain(sctx).MockInfoCacheAndLoadInfoSchema(s.is)
ctx := context.TODO()
p, err := builder.Build(ctx, stmt)
c.Assert(err, IsNil)
flag := uint64(0)
for _, f := range tc.flags {
flag = flag | f
}
p, err = logicalOptimize(ctx, flag, p.(LogicalPlan))
c.Assert(err, IsNil)
_, ok := p.(*LogicalProjection)
c.Assert(ok, IsTrue)
otrace := sctx.GetSessionVars().StmtCtx.LogicalOptimizeTrace
c.Assert(otrace, NotNil)
c.Assert(len(otrace.Steps), Equals, tc.steps)
}
}
147 changes: 147 additions & 0 deletions planner/core/logical_plan_trace_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Copyright 2021 PingCAP, Inc.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tend to move all the optimize trace test in a dedicated file.

//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package core

import (
"context"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/util/hint"
"github.com/pingcap/tidb/util/testleak"
)

func (s *testPlanSuite) TestLogicalOptimizeWithTraceEnabled(c *C) {
sql := "select * from t where a in (1,2)"
defer testleak.AfterTest(c)()
tt := []struct {
flags []uint64
steps int
}{
{
flags: []uint64{
flagEliminateAgg,
flagPushDownAgg},
steps: 2,
},
{
flags: []uint64{
flagEliminateAgg,
flagPushDownAgg,
flagPrunColumns,
flagBuildKeyInfo,
},
steps: 4,
},
{
flags: []uint64{},
steps: 0,
},
}

for i, tc := range tt {
comment := Commentf("case:%v sql:%s", i, sql)
stmt, err := s.ParseOneStmt(sql, "", "")
c.Assert(err, IsNil, comment)
err = Preprocess(s.ctx, stmt, WithPreprocessorReturn(&PreprocessorReturn{InfoSchema: s.is}))
c.Assert(err, IsNil, comment)
sctx := MockContext()
sctx.GetSessionVars().EnableStmtOptimizeTrace = true
builder, _ := NewPlanBuilder().Init(sctx, s.is, &hint.BlockHintProcessor{})
domain.GetDomain(sctx).MockInfoCacheAndLoadInfoSchema(s.is)
ctx := context.TODO()
p, err := builder.Build(ctx, stmt)
c.Assert(err, IsNil)
flag := uint64(0)
for _, f := range tc.flags {
flag = flag | f
}
p, err = logicalOptimize(ctx, flag, p.(LogicalPlan))
c.Assert(err, IsNil)
_, ok := p.(*LogicalProjection)
c.Assert(ok, IsTrue)
otrace := sctx.GetSessionVars().StmtCtx.LogicalOptimizeTrace
c.Assert(otrace, NotNil)
c.Assert(len(otrace.Steps), Equals, tc.steps)
}
}

func (s *testPlanSuite) TestSingleRuleTraceStep(c *C) {
defer testleak.AfterTest(c)()
tt := []struct {
sql string
flags []uint64
assertRuleName string
assertRuleSteps []assertTraceStep
}{
{
sql: "select min(distinct a) from t group by a",
flags: []uint64{flagBuildKeyInfo, flagEliminateAgg},
assertRuleName: "aggregation_eliminate",
assertRuleSteps: []assertTraceStep{
{
assertReason: "[test.t.a] is a unique key",
assertAction: "min(distinct ...) is simplified to min(...)",
},
{
assertReason: "[test.t.a] is a unique key",
assertAction: "aggregation is simplified to a projection",
},
},
},
}

for i, tc := range tt {
sql := tc.sql
comment := Commentf("case:%v sql:%s", i, sql)
stmt, err := s.ParseOneStmt(sql, "", "")
c.Assert(err, IsNil, comment)
err = Preprocess(s.ctx, stmt, WithPreprocessorReturn(&PreprocessorReturn{InfoSchema: s.is}))
c.Assert(err, IsNil, comment)
sctx := MockContext()
sctx.GetSessionVars().EnableStmtOptimizeTrace = true
builder, _ := NewPlanBuilder().Init(sctx, s.is, &hint.BlockHintProcessor{})
domain.GetDomain(sctx).MockInfoCacheAndLoadInfoSchema(s.is)
ctx := context.TODO()
p, err := builder.Build(ctx, stmt)
c.Assert(err, IsNil)
flag := uint64(0)
for _, f := range tc.flags {
flag = flag | f
}
p, err = logicalOptimize(ctx, flag, p.(LogicalPlan))
c.Assert(err, IsNil)
_, ok := p.(*LogicalProjection)
c.Assert(ok, IsTrue)
otrace := sctx.GetSessionVars().StmtCtx.LogicalOptimizeTrace
c.Assert(otrace, NotNil)
assert := false
for _, step := range otrace.Steps {
if step.RuleName == tc.assertRuleName {
assert = true
for i, ruleStep := range step.Steps {
c.Assert(ruleStep.Action, Equals, tc.assertRuleSteps[i].assertAction)
c.Assert(ruleStep.Reason, Equals, tc.assertRuleSteps[i].assertReason)
}
}
}
c.Assert(assert, IsTrue)
}
}

type assertTraceStep struct {
assertReason string
assertAction string
}
29 changes: 25 additions & 4 deletions planner/core/rule_aggregation_elimination.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package core

import (
"context"
"fmt"
"math"

"github.com/pingcap/tidb/expression"
Expand All @@ -37,7 +38,7 @@ type aggregationEliminateChecker struct {
// e.g. select min(b) from t group by a. If a is a unique key, then this sql is equal to `select b from t group by a`.
// For count(expr), sum(expr), avg(expr), count(distinct expr, [expr...]) we may need to rewrite the expr. Details are shown below.
// If we can eliminate agg successful, we return a projection. Else we return a nil pointer.
func (a *aggregationEliminateChecker) tryToEliminateAggregation(agg *LogicalAggregation) *LogicalProjection {
func (a *aggregationEliminateChecker) tryToEliminateAggregation(agg *LogicalAggregation, opt *logicalOptimizeOp) *LogicalProjection {
for _, af := range agg.AggFuncs {
// TODO(issue #9968): Actually, we can rewrite GROUP_CONCAT when all the
// arguments it accepts are promised to be NOT-NULL.
Expand All @@ -54,22 +55,27 @@ func (a *aggregationEliminateChecker) tryToEliminateAggregation(agg *LogicalAggr
}
schemaByGroupby := expression.NewSchema(agg.GetGroupByCols()...)
coveredByUniqueKey := false
var uniqueKey expression.KeyInfo
for _, key := range agg.children[0].Schema().Keys {
if schemaByGroupby.ColumnsIndices(key) != nil {
coveredByUniqueKey = true
uniqueKey = key
break
}
}
if coveredByUniqueKey {
// GroupByCols has unique key, so this aggregation can be removed.
if ok, proj := ConvertAggToProj(agg, agg.schema); ok {
proj.SetChildren(agg.children[0])
appendAggregationEliminateTraceStep(agg, uniqueKey, opt)
return proj
}
}
return nil
}

// tryToEliminateDistinct will eliminate distinct in the aggregation function if the aggregation args
// have unique key column. see detail example in https://github.com/pingcap/tidb/issues/23436
func (a *aggregationEliminateChecker) tryToEliminateDistinct(agg *LogicalAggregation, opt *logicalOptimizeOp) {
for _, af := range agg.AggFuncs {
if af.HasDistinct {
Expand All @@ -86,28 +92,43 @@ func (a *aggregationEliminateChecker) tryToEliminateDistinct(agg *LogicalAggrega
if canEliminate {
distinctByUniqueKey := false
schemaByDistinct := expression.NewSchema(cols...)
var uniqueKey expression.KeyInfo
for _, key := range agg.children[0].Schema().Keys {
if schemaByDistinct.ColumnsIndices(key) != nil {
distinctByUniqueKey = true
uniqueKey = key
break
}
}
for _, key := range agg.children[0].Schema().UniqueKeys {
if schemaByDistinct.ColumnsIndices(key) != nil {
distinctByUniqueKey = true
uniqueKey = key
break
}
}
if distinctByUniqueKey {
af.HasDistinct = false
// TODO: fulfill in future pr
opt.appendStepToCurrent(agg.ID(), agg.TP(), "", "")
appendDistinctEliminateTraceStep(agg, uniqueKey, af, opt)
}
}
}
}
}

func appendAggregationEliminateTraceStep(agg *LogicalAggregation, uniqueKey expression.KeyInfo, opt *logicalOptimizeOp) {
opt.appendStepToCurrent(agg.ID(), agg.TP(),
fmt.Sprintf("%s is a unique key", uniqueKey.String()),
"aggregation is simplified to a projection")
}

func appendDistinctEliminateTraceStep(agg *LogicalAggregation, uniqueKey expression.KeyInfo, af *aggregation.AggFuncDesc,
opt *logicalOptimizeOp) {
opt.appendStepToCurrent(agg.ID(), agg.TP(),
fmt.Sprintf("%s is a unique key", uniqueKey.String()),
fmt.Sprintf("%s(distinct ...) is simplified to %s(...)", af.Name, af.Name))
}

// ConvertAggToProj convert aggregation to projection.
func ConvertAggToProj(agg *LogicalAggregation, schema *expression.Schema) (bool, *LogicalProjection) {
proj := LogicalProjection{
Expand Down Expand Up @@ -196,7 +217,7 @@ func (a *aggregationEliminator) optimize(ctx context.Context, p LogicalPlan, opt
return p, nil
}
a.tryToEliminateDistinct(agg, opt)
if proj := a.tryToEliminateAggregation(agg); proj != nil {
if proj := a.tryToEliminateAggregation(agg, opt); proj != nil {
return proj, nil
}
return p, nil
Expand Down
Loading