Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: sync wait stats loading for stable plan #30026

Merged
merged 64 commits into from
Dec 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
3bd7e45
init commit for lazy load
chrysan Nov 22, 2021
b175ee9
Merge remote-tracking branch 'pingcap/master' into lazy-load-fix
chrysan Nov 22, 2021
594df44
Merge remote-tracking branch 'pingcap/master' into lazy-load-fix
chrysan Nov 23, 2021
cdf4e2a
Merge remote-tracking branch 'pingcap/master' into lazy-load-fix
chrysan Nov 25, 2021
f016416
configs/vars for concurrency,queue-size,syncWait,fallbackPseudo
chrysan Nov 25, 2021
e884305
fix
chrysan Dec 1, 2021
0e8344a
fix
chrysan Dec 2, 2021
3914128
remove sysvars
chrysan Dec 3, 2021
8086b91
impl missing columns
chrysan Dec 3, 2021
44ce671
fix to workable
chrysan Dec 7, 2021
1783a26
add metrics
chrysan Dec 7, 2021
f360d21
refactor sync-load
chrysan Dec 8, 2021
a815170
fix concurrent issue
chrysan Dec 8, 2021
952c36d
Merge remote-tracking branch 'pingcap/master' into lazy-load-fix
chrysan Dec 8, 2021
f10fffa
add partial uts
chrysan Dec 10, 2021
78c6fa4
refactor
chrysan Dec 13, 2021
9dd4148
refactor and simplify
chrysan Dec 13, 2021
b0b7ca3
refactor
chrysan Dec 13, 2021
0900149
Merge remote-tracking branch 'pingcap/master' into lazy-load-fix
chrysan Dec 13, 2021
adb224f
add ut for plan stats
chrysan Dec 14, 2021
6273b92
fix deadlock
chrysan Dec 15, 2021
abc2efd
refactor
chrysan Dec 15, 2021
ddb4023
Merge remote-tracking branch 'pingcap/master' into lazy-load-fix
chrysan Dec 15, 2021
fc369a0
fix
chrysan Dec 15, 2021
69c6ce0
fix format
chrysan Dec 15, 2021
75c4317
fix check
chrysan Dec 15, 2021
c3efa75
fix
chrysan Dec 15, 2021
5a5b0dc
separate test suite
chrysan Dec 15, 2021
1ccb9bc
fix for partition table
chrysan Dec 15, 2021
34ed21b
Merge remote-tracking branch 'pingcap/master' into lazy-load-fix
chrysan Dec 16, 2021
938ac41
Merge remote-tracking branch 'pingcap/master' into lazy-load-fix
chrysan Dec 16, 2021
f6dba58
move configs to sysvars
chrysan Dec 18, 2021
fff27b9
Merge remote-tracking branch 'pingcap/master' into lazy-load-fix
chrysan Dec 18, 2021
d1565f4
Merge remote-tracking branch 'pingcap/master' into lazy-load-fix
chrysan Dec 22, 2021
583bfa5
Merge remote-tracking branch 'pingcap/master' into lazy-load-fix
chrysan Dec 22, 2021
2b7c3fc
Merge remote-tracking branch 'pingcap/master' into lazy-load-fix
chrysan Dec 23, 2021
ec4882b
Merge remote-tracking branch 'pingcap/master' into lazy-load-fix
chrysan Dec 23, 2021
00fb138
disable sync-load by default and make waitTime configurable both in s…
chrysan Dec 23, 2021
f0473a9
add debug log
chrysan Dec 23, 2021
84e307c
Merge remote-tracking branch 'pingcap/master' into lazy-load-fix
chrysan Dec 28, 2021
807faed
integration and comments
chrysan Dec 28, 2021
4bb34b0
fix test
chrysan Dec 28, 2021
1664a2b
Merge remote-tracking branch 'pingcap/master' into lazy-load-fix
chrysan Dec 28, 2021
7a6fa70
Merge remote-tracking branch 'pingcap/master' into lazy-load-fix
chrysan Dec 28, 2021
a4ea19b
Merge remote-tracking branch 'pingcap/master' into lazy-load-fix
chrysan Dec 28, 2021
843f67c
Merge remote-tracking branch 'pingcap/master' into lazy-load-fix
chrysan Dec 29, 2021
76d832f
fix conflict & enable sync-load by default
chrysan Dec 29, 2021
5918132
fix ut
chrysan Dec 29, 2021
da39ca2
fix write channel timeout and add ut
chrysan Dec 29, 2021
1eee1e5
Merge remote-tracking branch 'pingcap/master' into lazy-load-fix
chrysan Dec 29, 2021
49a8e59
fix gofmt
chrysan Dec 29, 2021
f7e07cd
fix gofmt
chrysan Dec 29, 2021
393ce5a
Merge remote-tracking branch 'pingcap/master' into lazy-load-fix
chrysan Dec 29, 2021
a9b902c
fix gofmt
chrysan Dec 29, 2021
9947f29
Merge remote-tracking branch 'pingcap/master' into lazy-load-fix
chrysan Dec 30, 2021
a26719a
Merge remote-tracking branch 'pingcap/master' into lazy-load-fix
chrysan Dec 30, 2021
1273760
disable syncload by default
chrysan Dec 30, 2021
53e759c
integration with columnTracking
chrysan Dec 30, 2021
0dc2a7c
Merge remote-tracking branch 'pingcap/master' into lazy-load-fix
chrysan Dec 30, 2021
b8c99b4
add config range validation
chrysan Dec 30, 2021
477284e
Merge remote-tracking branch 'pingcap/master' into lazy-load-fix
chrysan Dec 30, 2021
d2e748b
Merge remote-tracking branch 'pingcap/master' into lazy-load-fix
chrysan Dec 30, 2021
8abdb06
fix for recursive CTE and add more uts
chrysan Dec 31, 2021
8e0e3d7
Merge branch 'master' into lazy-load-fix
ti-chi-bot Dec 31, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 29 additions & 9 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ const (
DefTableColumnCountLimit = 1017
// DefMaxOfTableColumnCountLimit is maximum limitation of the number of columns in a table
DefMaxOfTableColumnCountLimit = 4096
// DefStatsLoadConcurrencyLimit is limit of the concurrency of stats-load
DefStatsLoadConcurrencyLimit = 1
chrysan marked this conversation as resolved.
Show resolved Hide resolved
// DefMaxOfStatsLoadConcurrencyLimit is maximum limitation of the concurrency of stats-load
DefMaxOfStatsLoadConcurrencyLimit = 128
// DefStatsLoadQueueSizeLimit is limit of the size of stats-load request queue
DefStatsLoadQueueSizeLimit = 1
// DefMaxOfStatsLoadQueueSizeLimit is maximum limitation of the size of stats-load request queue
DefMaxOfStatsLoadQueueSizeLimit = 100000
)

// Valid config maps
Expand Down Expand Up @@ -483,11 +491,13 @@ type Performance struct {
CommitterConcurrency int `toml:"committer-concurrency" json:"committer-concurrency"`
MaxTxnTTL uint64 `toml:"max-txn-ttl" json:"max-txn-ttl"`
// Deprecated
MemProfileInterval string `toml:"-" json:"-"`
IndexUsageSyncLease string `toml:"index-usage-sync-lease" json:"index-usage-sync-lease"`
PlanReplayerGCLease string `toml:"plan-replayer-gc-lease" json:"plan-replayer-gc-lease"`
GOGC int `toml:"gogc" json:"gogc"`
EnforceMPP bool `toml:"enforce-mpp" json:"enforce-mpp"`
MemProfileInterval string `toml:"-" json:"-"`
IndexUsageSyncLease string `toml:"index-usage-sync-lease" json:"index-usage-sync-lease"`
PlanReplayerGCLease string `toml:"plan-replayer-gc-lease" json:"plan-replayer-gc-lease"`
GOGC int `toml:"gogc" json:"gogc"`
EnforceMPP bool `toml:"enforce-mpp" json:"enforce-mpp"`
StatsLoadConcurrency uint `toml:"stats-load-concurrency" json:"stats-load-concurrency"`
StatsLoadQueueSize uint `toml:"stats-load-queue-size" json:"stats-load-queue-size"`
}

// PlanCache is the PlanCache section of the config.
Expand Down Expand Up @@ -702,10 +712,12 @@ var defaultConf = Config{
CommitterConcurrency: defTiKVCfg.CommitterConcurrency,
MaxTxnTTL: defTiKVCfg.MaxTxnTTL, // 1hour
// TODO: set indexUsageSyncLease to 60s.
IndexUsageSyncLease: "0s",
GOGC: 100,
EnforceMPP: false,
PlanReplayerGCLease: "10m",
IndexUsageSyncLease: "0s",
GOGC: 100,
EnforceMPP: false,
PlanReplayerGCLease: "10m",
StatsLoadConcurrency: 5,
StatsLoadQueueSize: 1000,
},
ProxyProtocol: ProxyProtocol{
Networks: "",
Expand Down Expand Up @@ -1001,6 +1013,14 @@ func (c *Config) Valid() error {
c.Security.SpilledFileEncryptionMethod, SpilledFileEncryptionMethodPlaintext, SpilledFileEncryptionMethodAES128CTR)
}

// check stats load config
if c.Performance.StatsLoadConcurrency < DefStatsLoadConcurrencyLimit || c.Performance.StatsLoadConcurrency > DefMaxOfStatsLoadConcurrencyLimit {
return fmt.Errorf("stats-load-concurrency should be [%d, %d]", DefStatsLoadConcurrencyLimit, DefMaxOfStatsLoadConcurrencyLimit)
}
if c.Performance.StatsLoadQueueSize < DefStatsLoadQueueSizeLimit || c.Performance.StatsLoadQueueSize > DefMaxOfStatsLoadQueueSizeLimit {
return fmt.Errorf("stats-load-queue-size should be [%d, %d]", DefStatsLoadQueueSizeLimit, DefMaxOfStatsLoadQueueSizeLimit)
}

// test log level
l := zap.NewAtomicLevel()
return l.UnmarshalText([]byte(c.Log.Level))
Expand Down
21 changes: 21 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,3 +674,24 @@ func TestConfigExample(t *testing.T) {
}
}
}

func TestStatsLoadLimit(t *testing.T) {
conf := NewConfig()
checkConcurrencyValid := func(concurrency int, shouldBeValid bool) {
conf.Performance.StatsLoadConcurrency = uint(concurrency)
require.Equal(t, shouldBeValid, conf.Valid() == nil)
}
checkConcurrencyValid(DefStatsLoadConcurrencyLimit, true)
checkConcurrencyValid(DefStatsLoadConcurrencyLimit-1, false)
checkConcurrencyValid(DefMaxOfStatsLoadConcurrencyLimit, true)
checkConcurrencyValid(DefMaxOfStatsLoadConcurrencyLimit+1, false)
conf = NewConfig()
checkQueueSizeValid := func(queueSize int, shouldBeValid bool) {
conf.Performance.StatsLoadQueueSize = uint(queueSize)
require.Equal(t, shouldBeValid, conf.Valid() == nil)
}
checkQueueSizeValid(DefStatsLoadQueueSizeLimit, true)
checkQueueSizeValid(DefStatsLoadQueueSizeLimit-1, false)
checkQueueSizeValid(DefMaxOfStatsLoadQueueSizeLimit, true)
checkQueueSizeValid(DefMaxOfStatsLoadQueueSizeLimit+1, false)
}
10 changes: 10 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1296,6 +1296,16 @@ func (do *Domain) UpdateTableStatsLoop(ctx sessionctx.Context) error {
return nil
}

// StartLoadStatsSubWorkers starts sub workers with new sessions to load stats concurrently
func (do *Domain) StartLoadStatsSubWorkers(ctxList []sessionctx.Context) {
statsHandle := do.StatsHandle()
for i, ctx := range ctxList {
statsHandle.StatsLoad.SubCtxs[i] = ctx
do.wg.Add(1)
go statsHandle.SubLoadWorker(ctx, do.exit, &do.wg)
}
}

func (do *Domain) newOwnerManager(prompt, ownerKey string) owner.Manager {
id := do.ddl.OwnerManager().ID()
var statsOwner owner.Manager
Expand Down
9 changes: 9 additions & 0 deletions domain/sysvar_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,15 @@ func (do *Domain) checkEnableServerGlobalVar(name, sVal string) {
variable.PersistAnalyzeOptions.Store(variable.TiDBOptOn(sVal))
case variable.TiDBEnableColumnTracking:
variable.EnableColumnTracking.Store(variable.TiDBOptOn(sVal))
case variable.TiDBStatsLoadSyncWait:
var val int64
val, err = strconv.ParseInt(sVal, 10, 64)
if err != nil {
break
}
variable.StatsLoadSyncWait.Store(val)
case variable.TiDBStatsLoadPseudoTimeout:
variable.StatsLoadPseudoTimeout.Store(variable.TiDBOptOn(sVal))
}
if err != nil {
logutil.BgLogger().Error(fmt.Sprintf("load global variable %s error", name), zap.Error(err))
Expand Down
4 changes: 4 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ func RegisterMetrics() {
prometheus.MustRegister(HandleJobHistogram)
prometheus.MustRegister(SignificantFeedbackCounter)
prometheus.MustRegister(FastAnalyzeHistogram)
prometheus.MustRegister(SyncLoadCounter)
prometheus.MustRegister(SyncLoadTimeoutCounter)
prometheus.MustRegister(SyncLoadHistogram)
prometheus.MustRegister(ReadStatsHistogram)
prometheus.MustRegister(JobsGauge)
prometheus.MustRegister(KeepAliveCounter)
prometheus.MustRegister(LoadPrivilegeCounter)
Expand Down
34 changes: 34 additions & 0 deletions metrics/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,38 @@ var (
Help: "Bucketed histogram of some stats in fast analyze.",
Buckets: prometheus.ExponentialBuckets(1, 2, 16),
}, []string{LblSQLType, LblType})

SyncLoadCounter = prometheus.NewCounter(
chrysan marked this conversation as resolved.
Show resolved Hide resolved
prometheus.CounterOpts{
Namespace: "tidb",
Subsystem: "statistics",
Name: "sync_load_total",
Help: "Counter of sync load.",
})

SyncLoadTimeoutCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "tidb",
Subsystem: "statistics",
Name: "sync_load_timeout_total",
Help: "Counter of sync load timeout.",
})

SyncLoadHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "statistics",
Name: "sync_load_latency_millis",
Help: "Bucketed histogram of latency time (ms) of sync load.",
Buckets: prometheus.ExponentialBuckets(1, 2, 22), // 1ms ~ 1h
})

ReadStatsHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "statistics",
Name: "read_stats_latency_millis",
Help: "Bucketed histogram of latency time (ms) of stats read during sync-load.",
Buckets: prometheus.ExponentialBuckets(1, 2, 22), // 1ms ~ 1h
})
)
11 changes: 6 additions & 5 deletions planner/core/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,10 @@ const (
flagPredicatePushDown
flagEliminateOuterJoin
flagPartitionProcessor
flagCollectPredicateColumnsPoint
flagPushDownAgg
flagPushDownTopN
flagSyncWaitStatsLoadPoint
flagJoinReOrder
flagPrunColumnsAgain
)
Expand All @@ -80,8 +82,10 @@ var optRuleList = []logicalOptRule{
&ppdSolver{},
&outerJoinEliminator{},
&partitionProcessor{},
&collectPredicateColumnsPoint{},
&aggregationPushDownSolver{},
&pushDownTopNOptimizer{},
&syncWaitStatsLoadPoint{},
&joinReOrderSolver{},
&columnPruner{}, // column pruning again at last, note it will mess up the results of buildKeySolver
}
Expand Down Expand Up @@ -257,18 +261,15 @@ func checkStableResultMode(sctx sessionctx.Context) bool {

// DoOptimize optimizes a logical plan to a physical plan.
func DoOptimize(ctx context.Context, sctx sessionctx.Context, flag uint64, logic LogicalPlan) (PhysicalPlan, float64, error) {
// TODO: move it to the logic of sync load hist-needed columns.
if variable.EnableColumnTracking.Load() {
predicateColumns, _ := CollectColumnStatsUsage(logic, true, false)
sctx.UpdateColStatsUsage(predicateColumns)
}
// if there is something after flagPrunColumns, do flagPrunColumnsAgain
if flag&flagPrunColumns > 0 && flag-flagPrunColumns > flagPrunColumns {
flag |= flagPrunColumnsAgain
}
if checkStableResultMode(sctx) {
flag |= flagStabilizeResults
}
flag |= flagCollectPredicateColumnsPoint
flag |= flagSyncWaitStatsLoadPoint
logic, err := logicalOptimize(ctx, flag, logic)
if err != nil {
return nil, 0, err
Expand Down
112 changes: 112 additions & 0 deletions planner/core/plan_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package core

import (
"context"
"time"

"github.com/cznic/mathutil"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
)

type collectPredicateColumnsPoint struct{}

func (c collectPredicateColumnsPoint) optimize(ctx context.Context, plan LogicalPlan, op *logicalOptimizeOp) (LogicalPlan, error) {
if plan.SCtx().GetSessionVars().InRestrictedSQL {
return plan, nil
}
predicateNeeded := variable.EnableColumnTracking.Load()
syncWait := plan.SCtx().GetSessionVars().StatsLoadSyncWait * time.Millisecond.Nanoseconds()
histNeeded := syncWait > 0
predicateColumns, histNeededColumns := CollectColumnStatsUsage(plan, predicateNeeded, histNeeded)
if len(predicateColumns) > 0 {
plan.SCtx().UpdateColStatsUsage(predicateColumns)
}
if histNeeded && len(histNeededColumns) > 0 {
err := RequestLoadColumnStats(plan.SCtx(), histNeededColumns, syncWait)
return plan, err
}
return plan, nil
}

func (c collectPredicateColumnsPoint) name() string {
return "collect_predicate_columns_point"
}

type syncWaitStatsLoadPoint struct{}
chrysan marked this conversation as resolved.
Show resolved Hide resolved

func (s syncWaitStatsLoadPoint) optimize(ctx context.Context, plan LogicalPlan, op *logicalOptimizeOp) (LogicalPlan, error) {
if plan.SCtx().GetSessionVars().InRestrictedSQL {
return plan, nil
}
_, err := SyncWaitStatsLoad(plan)
return plan, err
}

func (s syncWaitStatsLoadPoint) name() string {
return "sync_wait_stats_load_point"
}

const maxDuration = 1<<63 - 1

// RequestLoadColumnStats send requests to stats handle
func RequestLoadColumnStats(ctx sessionctx.Context, neededColumns []model.TableColumnID, syncWait int64) error {
stmtCtx := ctx.GetSessionVars().StmtCtx
hintMaxExecutionTime := int64(stmtCtx.MaxExecutionTime)
if hintMaxExecutionTime <= 0 {
hintMaxExecutionTime = maxDuration
}
sessMaxExecutionTime := int64(ctx.GetSessionVars().MaxExecutionTime)
if sessMaxExecutionTime <= 0 {
sessMaxExecutionTime = maxDuration
}
waitTime := mathutil.MinInt64(syncWait, mathutil.MinInt64(hintMaxExecutionTime, sessMaxExecutionTime))
var timeout = time.Duration(waitTime)
err := domain.GetDomain(ctx).StatsHandle().SendLoadRequests(stmtCtx, neededColumns, timeout)
if err != nil {
return handleTimeout(stmtCtx)
}
return nil
}

// SyncWaitStatsLoad sync-wait for stats load until timeout
func SyncWaitStatsLoad(plan LogicalPlan) (bool, error) {
stmtCtx := plan.SCtx().GetSessionVars().StmtCtx
if stmtCtx.StatsLoad.Fallback {
return false, nil
}
success := domain.GetDomain(plan.SCtx()).StatsHandle().SyncWaitStatsLoad(stmtCtx)
if success {
return true, nil
}
err := handleTimeout(stmtCtx)
return false, err
}

func handleTimeout(stmtCtx *stmtctx.StatementContext) error {
err := errors.New("Timeout when sync-load full stats for needed columns")
if variable.StatsLoadPseudoTimeout.Load() {
stmtCtx.AppendWarning(err)
stmtCtx.StatsLoad.Fallback = true
return nil
}
return err
}
Loading