Skip to content

Commit

Permalink
planner, expression: pushdown AggFuncMode to coprocessor (pingcap#31392
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Jul 13, 2022
1 parent 8c3a06d commit 124887c
Show file tree
Hide file tree
Showing 7 changed files with 230 additions and 29 deletions.
45 changes: 42 additions & 3 deletions expression/aggregation/agg_to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,48 @@ func AggFuncToPBExpr(sctx sessionctx.Context, client kv.Client, aggFunc *AggFunc
sc.AppendWarning(errors.Errorf("Error happened when buildGroupConcat: %s", err.Error()))
return nil
}
return &tipb.Expr{Tp: tp, Val: codec.EncodeUint(nil, maxLen), Children: children, FieldType: expression.ToPBFieldType(aggFunc.RetTp), HasDistinct: aggFunc.HasDistinct, OrderBy: orderBy}
return &tipb.Expr{Tp: tp, Val: codec.EncodeUint(nil, maxLen), Children: children, FieldType: expression.ToPBFieldType(aggFunc.RetTp), HasDistinct: aggFunc.HasDistinct, OrderBy: orderBy, AggFuncMode: AggFunctionModeToPB(aggFunc.Mode)}
}
return &tipb.Expr{Tp: tp, Children: children, FieldType: expression.ToPBFieldType(aggFunc.RetTp), HasDistinct: aggFunc.HasDistinct}
return &tipb.Expr{Tp: tp, Children: children, FieldType: expression.ToPBFieldType(aggFunc.RetTp), HasDistinct: aggFunc.HasDistinct, AggFuncMode: AggFunctionModeToPB(aggFunc.Mode)}
}

// AggFunctionModeToPB converts aggregate function mode to PB.
func AggFunctionModeToPB(mode AggFunctionMode) (pbMode *tipb.AggFunctionMode) {
pbMode = new(tipb.AggFunctionMode)
switch mode {
case CompleteMode:
*pbMode = tipb.AggFunctionMode_CompleteMode
case FinalMode:
*pbMode = tipb.AggFunctionMode_FinalMode
case Partial1Mode:
*pbMode = tipb.AggFunctionMode_Partial1Mode
case Partial2Mode:
*pbMode = tipb.AggFunctionMode_Partial2Mode
case DedupMode:
*pbMode = tipb.AggFunctionMode_DedupMode
}
return pbMode
}

// PBAggFuncModeToAggFuncMode converts pb to aggregate function mode.
func PBAggFuncModeToAggFuncMode(pbMode *tipb.AggFunctionMode) (mode AggFunctionMode) {
// Default mode of the aggregate function is PartialMode.
mode = Partial1Mode
if pbMode != nil {
switch *pbMode {
case tipb.AggFunctionMode_CompleteMode:
mode = CompleteMode
case tipb.AggFunctionMode_FinalMode:
mode = FinalMode
case tipb.AggFunctionMode_Partial1Mode:
mode = Partial1Mode
case tipb.AggFunctionMode_Partial2Mode:
mode = Partial2Mode
case tipb.AggFunctionMode_DedupMode:
mode = DedupMode
}
}
return mode
}

// PBExprToAggFuncDesc converts pb to aggregate function.
Expand Down Expand Up @@ -149,7 +188,7 @@ func PBExprToAggFuncDesc(ctx sessionctx.Context, aggFunc *tipb.Expr, fieldTps []
base.WrapCastForAggArgs(ctx)
return &AggFuncDesc{
baseFuncDesc: base,
Mode: Partial1Mode,
Mode: PBAggFuncModeToAggFuncMode(aggFunc.AggFuncMode),
HasDistinct: false,
}, nil
}
14 changes: 7 additions & 7 deletions expression/aggregation/agg_to_pb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@ func TestAggFunc2Pb(t *testing.T) {
}

jsons := []string{
`{"tp":3002,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":"binary"},"has_distinct":false}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":"binary"},"has_distinct":%v}`,
`{"tp":3001,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":"binary"},"has_distinct":false}],"sig":0,"field_type":{"tp":8,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":"binary"},"has_distinct":%v}`,
`{"tp":3003,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":"binary"},"has_distinct":false}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":"binary"},"has_distinct":%v}`,
`{"tp":3007,"val":"AAAAAAAABAA=","children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":"binary"},"has_distinct":false}],"sig":0,"field_type":{"tp":15,"flag":0,"flen":-1,"decimal":-1,"collate":46,"charset":"utf8mb4"},"has_distinct":%v}`,
`{"tp":3005,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":"binary"},"has_distinct":false}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":"binary"},"has_distinct":%v}`,
`{"tp":3004,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":"binary"},"has_distinct":false}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":"binary"},"has_distinct":%v}`,
`{"tp":3006,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":"binary"},"has_distinct":false}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":"binary"},"has_distinct":%v}`,
`{"tp":3002,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":"binary"},"has_distinct":false}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":"binary"},"has_distinct":%v,"aggFuncMode":0}`,
`{"tp":3001,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":"binary"},"has_distinct":false}],"sig":0,"field_type":{"tp":8,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":"binary"},"has_distinct":%v,"aggFuncMode":0}`,
`{"tp":3003,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":"binary"},"has_distinct":false}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":"binary"},"has_distinct":%v,"aggFuncMode":0}`,
`{"tp":3007,"val":"AAAAAAAABAA=","children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":"binary"},"has_distinct":false}],"sig":0,"field_type":{"tp":15,"flag":0,"flen":-1,"decimal":-1,"collate":46,"charset":"utf8mb4"},"has_distinct":%v,"aggFuncMode":0}`,
`{"tp":3005,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":"binary"},"has_distinct":false}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":"binary"},"has_distinct":%v,"aggFuncMode":0}`,
`{"tp":3004,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":"binary"},"has_distinct":false}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":"binary"},"has_distinct":%v,"aggFuncMode":0}`,
`{"tp":3006,"children":[{"tp":201,"val":"gAAAAAAAAAE=","sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":"binary"},"has_distinct":false}],"sig":0,"field_type":{"tp":5,"flag":0,"flen":-1,"decimal":-1,"collate":63,"charset":"binary"},"has_distinct":%v,"aggFuncMode":0}`,
}
for i, funcName := range funcNames {
for _, hasDistinct := range []bool{true, false} {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ require (
github.com/pingcap/sysutil v0.0.0-20211208032423-041a72e5860d
github.com/pingcap/tidb-tools v5.2.2-0.20211019062242-37a8bef2fa17+incompatible
github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e
github.com/pingcap/tipb v0.0.0-20220107024056-3b91949a18a7
github.com/pingcap/tipb v0.0.0-20220704075611-01674e683e38
github.com/prometheus/client_golang v1.11.1
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.26.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -660,8 +660,8 @@ github.com/pingcap/tidb-dashboard v0.0.0-20211008050453-a25c25809529/go.mod h1:O
github.com/pingcap/tidb-dashboard v0.0.0-20211107164327-80363dfbe884/go.mod h1:OCXbZTBTIMRcIt0jFsuCakZP+goYRv6IjawKbwLS2TQ=
github.com/pingcap/tidb-tools v5.2.2-0.20211019062242-37a8bef2fa17+incompatible h1:c7+izmker91NkjkZ6FgTlmD4k1A5FLOAq+li6Ki2/GY=
github.com/pingcap/tidb-tools v5.2.2-0.20211019062242-37a8bef2fa17+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tipb v0.0.0-20220107024056-3b91949a18a7 h1:DHU4vw0o15qdKsf7d/Pyhun4YtX8FwoDQxG0plPByUg=
github.com/pingcap/tipb v0.0.0-20220107024056-3b91949a18a7/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pingcap/tipb v0.0.0-20220704075611-01674e683e38 h1:3/gxQYPaiDNRZ0pgfZtpQSBk400aLz6MnX9RaG1gaxE=
github.com/pingcap/tipb v0.0.0-20220704075611-01674e683e38/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4 h1:49lOXmGaUpV9Fz3gd7TFZY106KVlPVa5jcYD1gaQf98=
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
4 changes: 2 additions & 2 deletions planner/cascades/transformation_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ func (r *PushAggDownGather) OnTransform(old *memo.ExprIter) (newExprs []*memo.Gr
gbyItems := make([]expression.Expression, len(agg.GroupByItems))
copy(gbyItems, agg.GroupByItems)

partialPref, finalPref, funcMap := plannercore.BuildFinalModeAggregation(agg.SCtx(),
partialPref, finalPref, firstRowFuncMap := plannercore.BuildFinalModeAggregation(agg.SCtx(),
&plannercore.AggInfo{
AggFuncs: aggFuncs,
GroupByItems: gbyItems,
Expand All @@ -453,7 +453,7 @@ func (r *PushAggDownGather) OnTransform(old *memo.ExprIter) (newExprs []*memo.Gr
}
// Remove unnecessary FirstRow.
partialPref.AggFuncs =
plannercore.RemoveUnnecessaryFirstRow(agg.SCtx(), finalPref.AggFuncs, finalPref.GroupByItems, partialPref.AggFuncs, partialPref.GroupByItems, partialPref.Schema, funcMap)
plannercore.RemoveUnnecessaryFirstRow(agg.SCtx(), finalPref.GroupByItems, partialPref.AggFuncs, partialPref.GroupByItems, partialPref.Schema, firstRowFuncMap)

partialAgg := plannercore.LogicalAggregation{
AggFuncs: partialPref.AggFuncs,
Expand Down
154 changes: 154 additions & 0 deletions planner/core/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,24 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/planner/util"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
kit "github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/israce"
"github.com/pingcap/tidb/util/plancodec"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tidb/util/testutil"
"github.com/stretchr/testify/require"
)

var _ = Suite(&testPlanNormalize{})
Expand Down Expand Up @@ -703,3 +711,149 @@ func TestCopPaging(t *testing.T) {
" └─TableRowIDScan 819.20 cop[tikv] table:t keep order:false"))
}
}

func TestBuildFinalModeAggregation(t *testing.T) {
aggSchemaBuilder := func(sctx sessionctx.Context, aggFuncs []*aggregation.AggFuncDesc) *expression.Schema {
schema := expression.NewSchema(make([]*expression.Column, 0, len(aggFuncs))...)
for _, agg := range aggFuncs {
newCol := &expression.Column{
UniqueID: sctx.GetSessionVars().AllocPlanColumnID(),
RetType: agg.RetTp,
}
schema.Append(newCol)
}
return schema
}
isFinalAggMode := func(mode aggregation.AggFunctionMode) bool {
return mode == aggregation.FinalMode || mode == aggregation.CompleteMode
}
checkResult := func(sctx sessionctx.Context, aggFuncs []*aggregation.AggFuncDesc, groubyItems []expression.Expression) {
for partialIsCop := 0; partialIsCop < 2; partialIsCop++ {
for isMPPTask := 0; isMPPTask < 2; isMPPTask++ {
partial, final, _ := core.BuildFinalModeAggregation(sctx, &core.AggInfo{
AggFuncs: aggFuncs,
GroupByItems: groubyItems,
Schema: aggSchemaBuilder(sctx, aggFuncs),
}, partialIsCop == 0, isMPPTask == 0)
if partial != nil {
for _, aggFunc := range partial.AggFuncs {
if partialIsCop == 0 {
require.True(t, !isFinalAggMode(aggFunc.Mode))
} else {
require.True(t, isFinalAggMode(aggFunc.Mode))
}
}
}
if final != nil {
for _, aggFunc := range final.AggFuncs {
require.True(t, isFinalAggMode(aggFunc.Mode))
}
}
}
}
}

ctx := core.MockContext()

aggCol := &expression.Column{
Index: 0,
RetType: types.NewFieldType(mysql.TypeLonglong),
}
gbyCol := &expression.Column{
Index: 1,
RetType: types.NewFieldType(mysql.TypeLonglong),
}
orderCol := &expression.Column{
Index: 2,
RetType: types.NewFieldType(mysql.TypeLonglong),
}

emptyGroupByItems := make([]expression.Expression, 0, 1)
groupByItems := make([]expression.Expression, 0, 1)
groupByItems = append(groupByItems, gbyCol)

orderByItems := make([]*util.ByItems, 0, 1)
orderByItems = append(orderByItems, &util.ByItems{
Expr: orderCol,
Desc: true,
})

aggFuncs := make([]*aggregation.AggFuncDesc, 0, 5)
desc, err := aggregation.NewAggFuncDesc(ctx, ast.AggFuncMax, []expression.Expression{aggCol}, false)
require.NoError(t, err)
aggFuncs = append(aggFuncs, desc)
desc, err = aggregation.NewAggFuncDesc(ctx, ast.AggFuncFirstRow, []expression.Expression{aggCol}, false)
require.NoError(t, err)
aggFuncs = append(aggFuncs, desc)
desc, err = aggregation.NewAggFuncDesc(ctx, ast.AggFuncCount, []expression.Expression{aggCol}, false)
require.NoError(t, err)
aggFuncs = append(aggFuncs, desc)
desc, err = aggregation.NewAggFuncDesc(ctx, ast.AggFuncSum, []expression.Expression{aggCol}, false)
require.NoError(t, err)
aggFuncs = append(aggFuncs, desc)
desc, err = aggregation.NewAggFuncDesc(ctx, ast.AggFuncAvg, []expression.Expression{aggCol}, false)
require.NoError(t, err)
aggFuncs = append(aggFuncs, desc)

aggFuncsWithDistinct := make([]*aggregation.AggFuncDesc, 0, 2)
desc, err = aggregation.NewAggFuncDesc(ctx, ast.AggFuncAvg, []expression.Expression{aggCol}, true)
require.NoError(t, err)
aggFuncsWithDistinct = append(aggFuncsWithDistinct, desc)
desc, err = aggregation.NewAggFuncDesc(ctx, ast.AggFuncCount, []expression.Expression{aggCol}, true)
require.NoError(t, err)
aggFuncsWithDistinct = append(aggFuncsWithDistinct, desc)

groupConcatAggFuncs := make([]*aggregation.AggFuncDesc, 0, 4)
groupConcatWithoutDistinctWithoutOrderBy, err := aggregation.NewAggFuncDesc(ctx, ast.AggFuncGroupConcat, []expression.Expression{aggCol, aggCol}, false)
require.NoError(t, err)
groupConcatAggFuncs = append(groupConcatAggFuncs, groupConcatWithoutDistinctWithoutOrderBy)
groupConcatWithoutDistinctWithOrderBy, err := aggregation.NewAggFuncDesc(ctx, ast.AggFuncGroupConcat, []expression.Expression{aggCol, aggCol}, false)
require.NoError(t, err)
groupConcatWithoutDistinctWithOrderBy.OrderByItems = orderByItems
groupConcatAggFuncs = append(groupConcatAggFuncs, groupConcatWithoutDistinctWithOrderBy)
groupConcatWithDistinctWithoutOrderBy, err := aggregation.NewAggFuncDesc(ctx, ast.AggFuncGroupConcat, []expression.Expression{aggCol, aggCol}, true)
require.NoError(t, err)
groupConcatAggFuncs = append(groupConcatAggFuncs, groupConcatWithDistinctWithoutOrderBy)
groupConcatWithDistinctWithOrderBy, err := aggregation.NewAggFuncDesc(ctx, ast.AggFuncGroupConcat, []expression.Expression{aggCol, aggCol}, true)
require.NoError(t, err)
groupConcatWithDistinctWithOrderBy.OrderByItems = orderByItems
groupConcatAggFuncs = append(groupConcatAggFuncs, groupConcatWithDistinctWithOrderBy)

// case 1 agg without distinct
checkResult(ctx, aggFuncs, emptyGroupByItems)
checkResult(ctx, aggFuncs, groupByItems)

// case 2 agg with distinct
checkResult(ctx, aggFuncsWithDistinct, emptyGroupByItems)
checkResult(ctx, aggFuncsWithDistinct, groupByItems)

// case 3 mixed with distinct and without distinct
mixedAggFuncs := make([]*aggregation.AggFuncDesc, 0, 10)
mixedAggFuncs = append(mixedAggFuncs, aggFuncs...)
mixedAggFuncs = append(mixedAggFuncs, aggFuncsWithDistinct...)
checkResult(ctx, mixedAggFuncs, emptyGroupByItems)
checkResult(ctx, mixedAggFuncs, groupByItems)

// case 4 group concat
for _, groupConcatAggFunc := range groupConcatAggFuncs {
checkResult(ctx, []*aggregation.AggFuncDesc{groupConcatAggFunc}, emptyGroupByItems)
checkResult(ctx, []*aggregation.AggFuncDesc{groupConcatAggFunc}, groupByItems)
}
checkResult(ctx, groupConcatAggFuncs, emptyGroupByItems)
checkResult(ctx, groupConcatAggFuncs, groupByItems)

// case 5 mixed group concat and other agg funcs
for _, groupConcatAggFunc := range groupConcatAggFuncs {
funcs := make([]*aggregation.AggFuncDesc, 0, 10)
funcs = append(funcs, groupConcatAggFunc)
funcs = append(funcs, aggFuncs...)
checkResult(ctx, funcs, emptyGroupByItems)
checkResult(ctx, funcs, groupByItems)
funcs = append(funcs, aggFuncsWithDistinct...)
checkResult(ctx, funcs, emptyGroupByItems)
checkResult(ctx, funcs, groupByItems)
}
mixedAggFuncs = append(mixedAggFuncs, groupConcatAggFuncs...)
checkResult(ctx, mixedAggFuncs, emptyGroupByItems)
checkResult(ctx, mixedAggFuncs, groupByItems)
}
Loading

0 comments on commit 124887c

Please sign in to comment.