Skip to content

Commit

Permalink
*: add tiflash_hashagg_preaggregation_mode sys var (#54186)
Browse files Browse the repository at this point in the history
close #54867
  • Loading branch information
guo-shaoge authored Aug 8, 2024
1 parent 99857e3 commit 0cfa66f
Show file tree
Hide file tree
Showing 13 changed files with 114 additions and 14 deletions.
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -6023,13 +6023,13 @@ def go_deps():
name = "com_github_pingcap_tipb",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/tipb",
sha256 = "42c365f3f99d2577fe29c89d433894b2d0d69b054248bb3bafbced33332933e3",
strip_prefix = "github.com/pingcap/tipb@v0.0.0-20240318032315-55a7867ddd50",
sha256 = "6e910c9689f1a81bad2ae55be1746d456c317d696ff2687390d3fb30f7d05c6d",
strip_prefix = "github.com/pingcap/tipb@v0.0.0-20240703084358-e46e4632bd2b",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20240318032315-55a7867ddd50.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20240318032315-55a7867ddd50.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20240318032315-55a7867ddd50.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20240318032315-55a7867ddd50.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20240703084358-e46e4632bd2b.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20240703084358-e46e4632bd2b.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20240703084358-e46e4632bd2b.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20240703084358-e46e4632bd2b.zip",
],
)
go_repository(
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ require (
github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5
github.com/pingcap/tidb/pkg/parser v0.0.0-20211011031125-9b13dc409c5e
github.com/pingcap/tipb v0.0.0-20240318032315-55a7867ddd50
github.com/pingcap/tipb v0.0.0-20240703084358-e46e4632bd2b
github.com/prometheus/client_golang v1.19.1
github.com/prometheus/client_model v0.6.1
github.com/prometheus/common v0.55.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -704,8 +704,8 @@ github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d h1:y3EueKVfVykdpTyfU
github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d/go.mod h1:ORfBOFp1eteu2odzsyaxI+b8TzJwgjwyQcGhI+9SfEA=
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5 h1:T4pXRhBflzDeAhmOQHNPRRogMYxP13V7BkYw3ZsoSfE=
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5/go.mod h1:rlimy0GcTvjiJqvD5mXTRr8O2eNZPBrcUgiWVYp9530=
github.com/pingcap/tipb v0.0.0-20240318032315-55a7867ddd50 h1:fVNBE06Rjec+EIHaYAKAHa/bIt5lnu3Zh9O6kV7ZAdg=
github.com/pingcap/tipb v0.0.0-20240318032315-55a7867ddd50/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pingcap/tipb v0.0.0-20240703084358-e46e4632bd2b h1:tySAGYw21A3Xa8CcA9jBTfrgAB3+KQWyqyW7fUyokzk=
github.com/pingcap/tipb v0.0.0-20240703084358-e46e4632bd2b/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ=
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/core/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (base basePhysicalAgg) Init(ctx base.PlanContext, stats *property.StatsInfo
}

func (base basePhysicalAgg) initForHash(ctx base.PlanContext, stats *property.StatsInfo, offset int, props ...*property.PhysicalProperty) *PhysicalHashAgg {
p := &PhysicalHashAgg{base}
p := &PhysicalHashAgg{base, ""}
p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeHashAgg, p, offset)
p.childrenReqProps = props
p.SetStats(stats)
Expand Down
25 changes: 25 additions & 0 deletions pkg/planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2276,3 +2276,28 @@ func TestIssue52472(t *testing.T) {
require.Equal(t, mysql.TypeNewDecimal, rs.Fields()[0].Column.FieldType.GetType())
require.NoError(t, rs.Close())
}

func TestTiFlashHashAggPreAggMode(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("set @@tiflash_hashagg_preaggregation_mode = default;")
tk.MustQuery("select @@tiflash_hashagg_preaggregation_mode;").Check(testkit.Rows("force_preagg"))

tk.MustExec("set @@tiflash_hashagg_preaggregation_mode = 'auto';")
tk.MustQuery("select @@tiflash_hashagg_preaggregation_mode;").Check(testkit.Rows("auto"))
tk.MustExec("set @@tiflash_hashagg_preaggregation_mode = 'force_streaming';")
tk.MustQuery("select @@tiflash_hashagg_preaggregation_mode;").Check(testkit.Rows("force_streaming"))
tk.MustExec("set @@tiflash_hashagg_preaggregation_mode = 'force_preagg';")
tk.MustQuery("select @@tiflash_hashagg_preaggregation_mode;").Check(testkit.Rows("force_preagg"))

tk.MustExec("set global tiflash_hashagg_preaggregation_mode = 'auto';")
tk.MustQuery("select @@global.tiflash_hashagg_preaggregation_mode;").Check(testkit.Rows("auto"))
tk.MustExec("set global tiflash_hashagg_preaggregation_mode = 'force_streaming';")
tk.MustQuery("select @@global.tiflash_hashagg_preaggregation_mode;").Check(testkit.Rows("force_streaming"))
tk.MustExec("set global tiflash_hashagg_preaggregation_mode = 'force_preagg';")
tk.MustQuery("select @@global.tiflash_hashagg_preaggregation_mode;").Check(testkit.Rows("force_preagg"))

err := tk.ExecToErr("set @@tiflash_hashagg_preaggregation_mode = 'test';")
require.ErrorContains(t, err, "incorrect value: `test`. tiflash_hashagg_preaggregation_mode options: force_preagg, auto, force_streaming")
}
2 changes: 2 additions & 0 deletions pkg/planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -2100,6 +2100,7 @@ func (p *basePhysicalAgg) MemoryUsage() (sum int64) {
// PhysicalHashAgg is hash operator of aggregate.
type PhysicalHashAgg struct {
basePhysicalAgg
tiflashPreAggMode string
}

func (p *PhysicalHashAgg) getPointer() *basePhysicalAgg {
Expand All @@ -2115,6 +2116,7 @@ func (p *PhysicalHashAgg) Clone(newCtx base.PlanContext) (base.PhysicalPlan, err
return nil, err
}
cloned.basePhysicalAgg = *base
cloned.tiflashPreAggMode = p.tiflashPreAggMode
return cloned, nil
}

Expand Down
15 changes: 15 additions & 0 deletions pkg/planner/core/plan_to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
package core

import (
"fmt"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/expression/aggregation"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/ranger"
Expand Down Expand Up @@ -110,6 +113,18 @@ func (p *PhysicalHashAgg) ToPB(ctx *base.BuildPBContext, storeType kv.StoreType)
return nil, errors.Trace(err)
}
executorID = p.ExplainID().String()
// If p.tiflashPreAggMode is empty, means no need to consider preagg mode.
// For example it's the the second stage of hashagg.
if len(p.tiflashPreAggMode) != 0 {
if preAggModeVal, ok := variable.ToTiPBTiFlashPreAggMode(p.tiflashPreAggMode); !ok {
err = fmt.Errorf("unexpected tiflash pre agg mode: %v", p.tiflashPreAggMode)
} else {
aggExec.PreAggMode = &preAggModeVal
}
if err != nil {
return nil, err
}
}
}
return &tipb.Executor{
Tp: tipb.ExecType_TypeAggregation,
Expand Down
10 changes: 6 additions & 4 deletions pkg/planner/core/planbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,10 +367,12 @@ func TestPhysicalPlanClone(t *testing.T) {
require.NoError(t, checkPhysicalPlanClone(streamAgg))

// hash agg
hashAgg := &PhysicalHashAgg{basePhysicalAgg{
AggFuncs: aggDescs,
GroupByItems: []expression.Expression{col, cst},
}}
hashAgg := &PhysicalHashAgg{
basePhysicalAgg: basePhysicalAgg{
AggFuncs: aggDescs,
GroupByItems: []expression.Expression{col, cst},
},
}
hashAgg = hashAgg.initForHash(ctx, stats, 0)
hashAgg.SetSchema(schema)
require.NoError(t, checkPhysicalPlanClone(hashAgg))
Expand Down
6 changes: 6 additions & 0 deletions pkg/planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2168,6 +2168,9 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...base.Task) base.Task {
})
}
}
if partialHashAgg, ok := partialAgg.(*PhysicalHashAgg); ok && len(partitionCols) != 0 {
partialHashAgg.tiflashPreAggMode = p.SCtx().GetSessionVars().TiFlashPreAggMode
}
prop := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, MPPPartitionTp: property.HashType, MPPPartitionCols: partitionCols}
newMpp := mpp.enforceExchangerImpl(prop)
if newMpp.Invalid() {
Expand Down Expand Up @@ -2235,6 +2238,9 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...base.Task) base.Task {
newMpp := mpp.enforceExchanger(exProp)
attachPlan2Task(middle, newMpp)
mpp = newMpp
if partialHashAgg, ok := partial.(*PhysicalHashAgg); ok && len(partitionCols) != 0 {
partialHashAgg.tiflashPreAggMode = p.SCtx().GetSessionVars().TiFlashPreAggMode
}
}

// prop here still be the first generated single-partition requirement.
Expand Down
1 change: 1 addition & 0 deletions pkg/sessionctx/variable/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ go_library(
"@com_github_docker_go_units//:go-units",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_pingcap_tipb//go-tipb",
"@com_github_tikv_client_go_v2//config",
"@com_github_tikv_client_go_v2//kv",
"@com_github_tikv_client_go_v2//oracle",
Expand Down
32 changes: 32 additions & 0 deletions pkg/sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ import (
"github.com/pingcap/tidb/pkg/util/tiflash"
"github.com/pingcap/tidb/pkg/util/tiflashcompute"
"github.com/pingcap/tidb/pkg/util/timeutil"
"github.com/pingcap/tipb/go-tipb"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/tikv"
"github.com/twmb/murmur3"
Expand Down Expand Up @@ -1651,6 +1652,9 @@ type SessionVars struct {
// GroupConcatMaxLen represents the maximum length of the result of GROUP_CONCAT.
GroupConcatMaxLen uint64

// TiFlashPreAggMode indicates the policy of pre aggregation.
TiFlashPreAggMode string

// EnableLazyCursorFetch defines whether to enable the lazy cursor fetch.
EnableLazyCursorFetch bool

Expand Down Expand Up @@ -3906,6 +3910,34 @@ func (s *SessionVars) GetOptObjective() string {
return s.OptObjective
}

// ForcePreAggStr means 1st hashagg will be pre aggregated.
// AutoStr means TiFlash will decide which policy for 1st hashagg.
// ForceStreamingStr means 1st hashagg will for pass through all blocks.
const (
ForcePreAggStr = "force_preagg"
AutoStr = "auto"
ForceStreamingStr = "force_streaming"
)

// ValidTiFlashPreAggMode returns all valid modes.
func ValidTiFlashPreAggMode() string {
return ForcePreAggStr + ", " + AutoStr + ", " + ForceStreamingStr
}

// ToTiPBTiFlashPreAggMode return the corresponding tipb value of preaggregation mode.
func ToTiPBTiFlashPreAggMode(mode string) (tipb.TiFlashPreAggMode, bool) {
switch mode {
case ForcePreAggStr:
return tipb.TiFlashPreAggMode_ForcePreAgg, true
case ForceStreamingStr:
return tipb.TiFlashPreAggMode_ForceStreaming, true
case AutoStr:
return tipb.TiFlashPreAggMode_Auto, true
default:
return tipb.TiFlashPreAggMode_ForcePreAgg, false
}
}

// UseLowResolutionTSO indicates whether low resolution tso could be used for execution.
func (s *SessionVars) UseLowResolutionTSO() bool {
return !s.InRestrictedSQL && s.lowResolutionTSO
Expand Down
14 changes: 14 additions & 0 deletions pkg/sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -3285,6 +3285,20 @@ var defaultSysVars = []*SysVar{
},
IsHintUpdatableVerified: true,
},
{Scope: ScopeGlobal | ScopeSession, Name: TiFlashHashAggPreAggMode, Value: DefTiFlashPreAggMode, Type: TypeStr,
Validation: func(_ *SessionVars, normalizedValue string, originalValue string, _ ScopeFlag) (string, error) {
if _, ok := ToTiPBTiFlashPreAggMode(normalizedValue); ok {
return normalizedValue, nil
}
errMsg := fmt.Sprintf("incorrect value: `%s`. %s options: %s",
originalValue, TiFlashHashAggPreAggMode, ValidTiFlashPreAggMode())
return normalizedValue, errors.New(errMsg)
},
SetSession: func(s *SessionVars, val string) error {
s.TiFlashPreAggMode = val
return nil
},
},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableLazyCursorFetch, Value: BoolToOnOff(DefTiDBEnableLazyCursorFetch), Type: TypeBool, SetSession: func(s *SessionVars, val string) error {
s.EnableLazyCursorFetch = TiDBOptOn(val)
return nil
Expand Down
3 changes: 3 additions & 0 deletions pkg/sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -1190,6 +1190,8 @@ const (
// The value can be STANDARD, BULK.
// Currently, the BULK mode only affects auto-committed DML.
TiDBDMLType = "tidb_dml_type"
// TiFlashHashAggPreAggMode indicates the policy of 1st hashagg.
TiFlashHashAggPreAggMode = "tiflash_hashagg_preaggregation_mode"
// TiDBEnableLazyCursorFetch defines whether to enable the lazy cursor fetch. If it's `OFF`, all results of
// of a cursor will be stored in the tidb node in `EXECUTE` command.
TiDBEnableLazyCursorFetch = "tidb_enable_lazy_cursor_fetch"
Expand Down Expand Up @@ -1527,6 +1529,7 @@ const (
DefTiDBDMLType = "STANDARD"
DefGroupConcatMaxLen = uint64(1024)
DefDefaultWeekFormat = "0"
DefTiFlashPreAggMode = ForcePreAggStr
DefTiDBEnableLazyCursorFetch = false
DefOptEnableProjectionPushDown = true
DefTiDBEnableSharedLockPromotion = false
Expand Down

0 comments on commit 0cfa66f

Please sign in to comment.