variable: mark analyze-partition-concurrency-quota as deprecated #55409

Merged
19 changes: 10 additions & 9 deletions pkg/config/config.go
@@ -717,15 +717,16 @@ type Performance struct {
MemProfileInterval string `toml:"-" json:"-"`

// Deprecated: this config will not have any effect
IndexUsageSyncLease string `toml:"index-usage-sync-lease" json:"index-usage-sync-lease"`
PlanReplayerGCLease string `toml:"plan-replayer-gc-lease" json:"plan-replayer-gc-lease"`
GOGC int `toml:"gogc" json:"gogc"`
EnforceMPP bool `toml:"enforce-mpp" json:"enforce-mpp"`
StatsLoadConcurrency int `toml:"stats-load-concurrency" json:"stats-load-concurrency"`
StatsLoadQueueSize uint `toml:"stats-load-queue-size" json:"stats-load-queue-size"`
AnalyzePartitionConcurrencyQuota uint `toml:"analyze-partition-concurrency-quota" json:"analyze-partition-concurrency-quota"`
PlanReplayerDumpWorkerConcurrency uint `toml:"plan-replayer-dump-worker-concurrency" json:"plan-replayer-dump-worker-concurrency"`
EnableStatsCacheMemQuota bool `toml:"enable-stats-cache-mem-quota" json:"enable-stats-cache-mem-quota"`
IndexUsageSyncLease string `toml:"index-usage-sync-lease" json:"index-usage-sync-lease"`
PlanReplayerGCLease string `toml:"plan-replayer-gc-lease" json:"plan-replayer-gc-lease"`
GOGC int `toml:"gogc" json:"gogc"`
EnforceMPP bool `toml:"enforce-mpp" json:"enforce-mpp"`
StatsLoadConcurrency int `toml:"stats-load-concurrency" json:"stats-load-concurrency"`
StatsLoadQueueSize uint `toml:"stats-load-queue-size" json:"stats-load-queue-size"`
// Deprecated: this config has been deprecated. It has no effect.
AnalyzePartitionConcurrencyQuota uint `toml:"analyze-partition-concurrency-quota" json:"analyze-partition-concurrency-quota"`
PlanReplayerDumpWorkerConcurrency uint `toml:"plan-replayer-dump-worker-concurrency" json:"plan-replayer-dump-worker-concurrency"`
EnableStatsCacheMemQuota bool `toml:"enable-stats-cache-mem-quota" json:"enable-stats-cache-mem-quota"`
// The following items are deprecated. We need to keep them here temporarily
// to support the upgrade process. They can be removed in future.

45 changes: 0 additions & 45 deletions pkg/domain/domain.go
@@ -209,11 +209,6 @@ type Domain struct {

mdlCheckTableInfo *mdlCheckTableInfo

analyzeMu struct {
sync.Mutex
sctxs map[sessionctx.Context]bool
}

mdlCheckCh chan struct{}
stopAutoAnalyze atomicutil.Bool
minJobIDRefresher *systable.MinJobIDRefresher
@@ -2273,46 +2268,6 @@ func (do *Domain) SetStatsUpdating(val bool) {
}
}

// ReleaseAnalyzeExec returned extra exec for Analyze
func (do *Domain) ReleaseAnalyzeExec(sctxs []sessionctx.Context) {
do.analyzeMu.Lock()
defer do.analyzeMu.Unlock()
for _, ctx := range sctxs {
do.analyzeMu.sctxs[ctx] = false
}
}

// FetchAnalyzeExec get needed exec for analyze
func (do *Domain) FetchAnalyzeExec(need int) []sessionctx.Context {
if need < 1 {
return nil
}
count := 0
r := make([]sessionctx.Context, 0)
do.analyzeMu.Lock()
defer do.analyzeMu.Unlock()
for sctx, used := range do.analyzeMu.sctxs {
if used {
continue
}
r = append(r, sctx)
do.analyzeMu.sctxs[sctx] = true
count++
if count >= need {
break
}
}
return r
}

// SetupAnalyzeExec setups exec for Analyze Executor
func (do *Domain) SetupAnalyzeExec(ctxs []sessionctx.Context) {
do.analyzeMu.sctxs = make(map[sessionctx.Context]bool)
for _, ctx := range ctxs {
do.analyzeMu.sctxs[ctx] = false
}
}

// LoadAndUpdateStatsLoop loads and updates stats info.
func (do *Domain) LoadAndUpdateStatsLoop(ctxs []sessionctx.Context, initStatsCtx sessionctx.Context) error {
if err := do.UpdateTableStatsLoop(ctxs[0], initStatsCtx); err != nil {
77 changes: 29 additions & 48 deletions pkg/executor/analyze.go
@@ -100,16 +100,16 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
}

// Get the min number of goroutines for parallel execution.
concurrency, err := getBuildStatsConcurrency(e.Ctx())
buildStatsConcurrency, err := getBuildStatsConcurrency(e.Ctx())
if err != nil {
return err
}
concurrency = min(len(tasks), concurrency)
buildStatsConcurrency = min(len(tasks), buildStatsConcurrency)

// Start workers with channel to collect results.
taskCh := make(chan *analyzeTask, concurrency)
taskCh := make(chan *analyzeTask, buildStatsConcurrency)
resultsCh := make(chan *statistics.AnalyzeResults, 1)
for i := 0; i < concurrency; i++ {
for i := 0; i < buildStatsConcurrency; i++ {
e.wg.Run(func() { e.analyzeWorker(taskCh, resultsCh) })
}
pruneMode := variable.PartitionPruneMode(sessionVars.PartitionPruneMode.Load())
@@ -118,7 +118,7 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
globalStatsMap := make(map[globalStatsKey]statstypes.GlobalStatsInfo)
g, gctx := errgroup.WithContext(ctx)
g.Go(func() error {
return e.handleResultsError(ctx, concurrency, needGlobalStats, globalStatsMap, resultsCh, len(tasks))
return e.handleResultsError(buildStatsConcurrency, needGlobalStats, globalStatsMap, resultsCh, len(tasks))
})
for _, task := range tasks {
prepareV2AnalyzeJobInfo(task.colExec)
@@ -368,8 +368,7 @@ func recordHistoricalStats(sctx sessionctx.Context, tableID int64) error {

// handleResultsError will handle the error fetch from resultsCh and record it in log
func (e *AnalyzeExec) handleResultsError(
ctx context.Context,
concurrency int,
buildStatsConcurrency int,
needGlobalStats bool,
globalStatsMap globalStatsMap,
resultsCh <-chan *statistics.AnalyzeResults,
@@ -385,56 +384,37 @@ func (e *AnalyzeExec) handleResultsError(
}
}
}()
partitionStatsConcurrency := e.Ctx().GetSessionVars().AnalyzePartitionConcurrency
// the concurrency of handleResultsError cannot be more than partitionStatsConcurrency
partitionStatsConcurrency = min(taskNum, partitionStatsConcurrency)
// If partitionStatsConcurrency > 1, we will try to demand extra session from Domain to save Analyze results in concurrency.
// If there is no extra session we can use, we will save analyze results in single-thread.
dom := domain.GetDomain(e.Ctx())
internalCtx := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
if partitionStatsConcurrency > 1 {
// FIXME: Since we don't use it either to save analysis results or to store job history, it has no effect. Please remove this :(
subSctxs := dom.FetchAnalyzeExec(partitionStatsConcurrency)
warningMessage := "Insufficient sessions to save analyze results. Consider increasing the 'analyze-partition-concurrency-quota' configuration to improve analyze performance. " +
"This value should typically be greater than or equal to the 'tidb_analyze_partition_concurrency' variable."
if len(subSctxs) < partitionStatsConcurrency {
e.Ctx().GetSessionVars().StmtCtx.AppendWarning(errors.NewNoStackError(warningMessage))
logutil.BgLogger().Warn(
warningMessage,
zap.Int("sessionCount", len(subSctxs)),
zap.Int("needSessionCount", partitionStatsConcurrency),
)
}
if len(subSctxs) > 0 {
sessionCount := len(subSctxs)
logutil.BgLogger().Info("use multiple sessions to save analyze results", zap.Int("sessionCount", sessionCount))
defer func() {
dom.ReleaseAnalyzeExec(subSctxs)
}()
return e.handleResultsErrorWithConcurrency(internalCtx, concurrency, needGlobalStats, subSctxs, globalStatsMap, resultsCh)
}
saveStatsConcurrency := e.Ctx().GetSessionVars().AnalyzePartitionConcurrency
// The buildStatsConcurrency of saving partition-level stats should not exceed the total number of tasks.
saveStatsConcurrency = min(taskNum, saveStatsConcurrency)
if saveStatsConcurrency > 1 {
logutil.BgLogger().Info("save analyze results concurrently",
zap.Int("buildStatsConcurrency", buildStatsConcurrency),
zap.Int("saveStatsConcurrency", saveStatsConcurrency),
)
return e.handleResultsErrorWithConcurrency(buildStatsConcurrency, saveStatsConcurrency, needGlobalStats, globalStatsMap, resultsCh)
}
logutil.BgLogger().Info("use single session to save analyze results")
logutil.BgLogger().Info("save analyze results in single-thread",
zap.Int("buildStatsConcurrency", buildStatsConcurrency),
zap.Int("saveStatsConcurrency", saveStatsConcurrency),
)
failpoint.Inject("handleResultsErrorSingleThreadPanic", nil)
subSctxs := []sessionctx.Context{e.Ctx()}
return e.handleResultsErrorWithConcurrency(internalCtx, concurrency, needGlobalStats, subSctxs, globalStatsMap, resultsCh)
return e.handleResultsErrorWithConcurrency(buildStatsConcurrency, saveStatsConcurrency, needGlobalStats, globalStatsMap, resultsCh)
}

func (e *AnalyzeExec) handleResultsErrorWithConcurrency(
ctx context.Context,
statsConcurrency int,
buildStatsConcurrency int,
saveStatsConcurrency int,
needGlobalStats bool,
subSctxs []sessionctx.Context,
globalStatsMap globalStatsMap,
resultsCh <-chan *statistics.AnalyzeResults,
) error {
partitionStatsConcurrency := len(subSctxs)
statsHandle := domain.GetDomain(e.Ctx()).StatsHandle()
wg := util.NewWaitGroupPool(e.gp)
saveResultsCh := make(chan *statistics.AnalyzeResults, partitionStatsConcurrency)
errCh := make(chan error, partitionStatsConcurrency)
for i := 0; i < partitionStatsConcurrency; i++ {
worker := newAnalyzeSaveStatsWorker(saveResultsCh, subSctxs[i], errCh, &e.Ctx().GetSessionVars().SQLKiller)
saveResultsCh := make(chan *statistics.AnalyzeResults, saveStatsConcurrency)
errCh := make(chan error, saveStatsConcurrency)
for i := 0; i < saveStatsConcurrency; i++ {
worker := newAnalyzeSaveStatsWorker(saveResultsCh, errCh, &e.Ctx().GetSessionVars().SQLKiller)
ctx1 := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
wg.Run(func() {
worker.run(ctx1, statsHandle, e.Ctx().GetSessionVars().EnableAnalyzeSnapshot)
@@ -443,7 +423,8 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(
tableIDs := map[int64]struct{}{}
panicCnt := 0
var err error
for panicCnt < statsConcurrency {
// Only if all the analyze workers exit can we close the saveResultsCh.
for panicCnt < buildStatsConcurrency {
if err := e.Ctx().GetSessionVars().SQLKiller.HandleSignal(); err != nil {
close(saveResultsCh)
return err
@@ -457,7 +438,7 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(
if isAnalyzeWorkerPanic(err) {
panicCnt++
} else {
logutil.Logger(ctx).Error("analyze failed", zap.Error(err))
logutil.BgLogger().Error("receive error when saving analyze results", zap.Error(err))
Review thread on this line:

Contributor: What's the difference between Logger and BgLogger?

Member (author): Logger will try to get the current contextual logger and log some common fields from the context. In this case, I think there is no difference.

logutil.Logger(internalCtx).Info("use multiple sessions to save analyze results", zap.Int("sessionCount", sessionCount))
[2024/08/15 15:10:14.671 +08:00] [INFO] [analyze.go:410] ["use multiple sessions to save analyze results"] [sessionCount=2]
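To make the distinction in the reply concrete, here is a minimal, self-contained Go sketch of the contextual-logger pattern. It is only an illustration under assumptions: the ctxLogKey type and the withLogger/loggerFromCtx helpers are hypothetical stand-ins, not TiDB's actual logutil implementation; the real logutil.Logger(ctx) and logutil.BgLogger() behave analogously.

package main

import (
	"context"

	"go.uber.org/zap"
)

// ctxLogKey is a hypothetical context key for the contextual logger.
type ctxLogKey struct{}

// bgLogger plays the role of a process-wide background logger
// (analogous to what BgLogger returns): no request-scoped fields attached.
var bgLogger, _ = zap.NewDevelopment()

// withLogger stores a logger (possibly carrying common fields) in the context.
func withLogger(ctx context.Context, l *zap.Logger) context.Context {
	return context.WithValue(ctx, ctxLogKey{}, l)
}

// loggerFromCtx mimics a contextual lookup: prefer the logger stored in the
// context, and fall back to the background logger when none is present.
func loggerFromCtx(ctx context.Context) *zap.Logger {
	if l, ok := ctx.Value(ctxLogKey{}).(*zap.Logger); ok {
		return l
	}
	return bgLogger
}

func main() {
	// Attach a common field to a contextual logger.
	ctx := withLogger(context.Background(), bgLogger.With(zap.String("category", "stats")))

	// Emits the "category" field inherited from the context.
	loggerFromCtx(ctx).Info("save analyze results concurrently", zap.Int("saveStatsConcurrency", 2))

	// Same message through the background logger: no contextual fields.
	bgLogger.Info("save analyze results concurrently", zap.Int("saveStatsConcurrency", 2))
}

When the context carries no contextual logger, both calls resolve to the same backing logger, which is why the reply notes there is no practical difference in this code path.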

}
finishJobWithLog(statsHandle, results.Job, err)
continue
4 changes: 0 additions & 4 deletions pkg/executor/analyze_worker.go
@@ -17,7 +17,6 @@ package executor
import (
"context"

"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
@@ -28,19 +27,16 @@

type analyzeSaveStatsWorker struct {
resultsCh <-chan *statistics.AnalyzeResults
sctx sessionctx.Context
errCh chan<- error
killer *sqlkiller.SQLKiller
}

func newAnalyzeSaveStatsWorker(
resultsCh <-chan *statistics.AnalyzeResults,
sctx sessionctx.Context,
errCh chan<- error,
killer *sqlkiller.SQLKiller) *analyzeSaveStatsWorker {
worker := &analyzeSaveStatsWorker{
resultsCh: resultsCh,
sctx: sctx,
errCh: errCh,
killer: killer,
}
10 changes: 0 additions & 10 deletions pkg/session/session.go
@@ -3460,7 +3460,6 @@ func bootstrapSessionImpl(store kv.Storage, createSessionsImpl func(store kv.Sto
},
)

analyzeConcurrencyQuota := int(config.GetGlobalConfig().Performance.AnalyzePartitionConcurrencyQuota)
concurrency := config.GetGlobalConfig().Performance.StatsLoadConcurrency
if concurrency == 0 {
// if concurrency is 0, we will set the concurrency of sync load by CPU.
@@ -3628,15 +3627,6 @@ func bootstrapSessionImpl(store kv.Storage, createSessionsImpl func(store kv.Sto
}
dom.StartTTLJobManager()

analyzeCtxs, err := createSessions(store, analyzeConcurrencyQuota)
if err != nil {
return nil, err
}
subCtxs2 := make([]sessionctx.Context, analyzeConcurrencyQuota)
for i := 0; i < analyzeConcurrencyQuota; i++ {
subCtxs2[i] = analyzeCtxs[i]
}
dom.SetupAnalyzeExec(subCtxs2)
dom.LoadSigningCertLoop(cfg.Security.SessionTokenSigningCert, cfg.Security.SessionTokenSigningKey)

if raw, ok := store.(kv.EtcdBackend); ok {
12 changes: 9 additions & 3 deletions pkg/sessionctx/variable/sysvar.go
@@ -2628,11 +2628,17 @@ var defaultSysVars = []*SysVar{
s.OptimizerUseInvisibleIndexes = TiDBOptOn(val)
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBAnalyzePartitionConcurrency, Value: strconv.FormatInt(DefTiDBAnalyzePartitionConcurrency, 10),
MinValue: 1, MaxValue: uint64(config.GetGlobalConfig().Performance.AnalyzePartitionConcurrencyQuota), SetSession: func(s *SessionVars, val string) error {
{Scope: ScopeGlobal | ScopeSession,
Name: TiDBAnalyzePartitionConcurrency,
Value: strconv.FormatInt(DefTiDBAnalyzePartitionConcurrency, 10),
Type: TypeInt,
MinValue: 1,
MaxValue: 128,
SetSession: func(s *SessionVars, val string) error {
s.AnalyzePartitionConcurrency = int(TidbOptInt64(val, DefTiDBAnalyzePartitionConcurrency))
return nil
}},
},
},
{
Scope: ScopeGlobal | ScopeSession, Name: TiDBMergePartitionStatsConcurrency, Value: strconv.FormatInt(DefTiDBMergePartitionStatsConcurrency, 10), Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency,
SetSession: func(s *SessionVars, val string) error {