diff --git a/pkg/config/config.go b/pkg/config/config.go index 1fbec8af7b27d..8c1a59c201e09 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -552,9 +552,10 @@ type Instance struct { PluginDir string `toml:"plugin_dir" json:"plugin_dir"` PluginLoad string `toml:"plugin_load" json:"plugin_load"` // MaxConnections is the maximum permitted number of simultaneous client connections. - MaxConnections uint32 `toml:"max_connections" json:"max_connections"` - TiDBEnableDDL AtomicBool `toml:"tidb_enable_ddl" json:"tidb_enable_ddl"` - TiDBRCReadCheckTS bool `toml:"tidb_rc_read_check_ts" json:"tidb_rc_read_check_ts"` + MaxConnections uint32 `toml:"max_connections" json:"max_connections"` + TiDBEnableDDL AtomicBool `toml:"tidb_enable_ddl" json:"tidb_enable_ddl"` + TiDBEnableStatsOwner AtomicBool `toml:"tidb_enable_stats_owner" json:"tidb_enable_stats_owner"` + TiDBRCReadCheckTS bool `toml:"tidb_rc_read_check_ts" json:"tidb_rc_read_check_ts"` // TiDBServiceScope indicates the role for tidb for distributed task framework. 
TiDBServiceScope string `toml:"tidb_service_scope" json:"tidb_service_scope"` } @@ -978,6 +979,7 @@ var defaultConf = Config{ PluginLoad: "", MaxConnections: 0, TiDBEnableDDL: *NewAtomicBool(true), + TiDBEnableStatsOwner: *NewAtomicBool(true), TiDBRCReadCheckTS: false, TiDBServiceScope: "", }, diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 48fbd26a23146..2609b9e7b7df1 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -1054,7 +1054,7 @@ func TestConflictInstanceConfig(t *testing.T) { _, err = f.WriteString("check-mb4-value-in-utf8 = true \nrun-ddl = true \n" + "[log] \nenable-slow-log = true \n" + "[performance] \nforce-priority = \"NO_PRIORITY\"\n" + - "[instance] \ntidb_check_mb4_value_in_utf8 = false \ntidb_enable_slow_log = false \ntidb_force_priority = \"LOW_PRIORITY\"\ntidb_enable_ddl = false") + "[instance] \ntidb_check_mb4_value_in_utf8 = false \ntidb_enable_slow_log = false \ntidb_force_priority = \"LOW_PRIORITY\"\ntidb_enable_ddl = false\ntidb_enable_stats_owner = false") require.NoError(t, err) require.NoError(t, f.Sync()) err = conf.Load(configFile) @@ -1068,6 +1068,7 @@ func TestConflictInstanceConfig(t *testing.T) { require.Equal(t, "LOW_PRIORITY", conf.Instance.ForcePriority) require.Equal(t, true, conf.RunDDL) require.Equal(t, false, conf.Instance.TiDBEnableDDL.Load()) + require.Equal(t, false, conf.Instance.TiDBEnableStatsOwner.Load()) require.Equal(t, 0, len(DeprecatedOptions)) for _, conflictOption := range ConflictOptions { expectedConflictOption, ok := expectedConflictOptions[conflictOption.SectionName] diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index d059b7c734968..4dde502fc4df5 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -215,6 +215,7 @@ type Domain struct { instancePlanCache sessionctx.InstancePlanCache // the instance level plan cache + statsOwner owner.Manager // deferFn is used to release infoschema object lazily during v1 and v2 switch deferFn } @@ 
-2300,20 +2301,22 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err if do.statsLease >= 0 { do.wg.Run(do.loadStatsWorker, "loadStatsWorker") } - owner := do.newOwnerManager(handle.StatsPrompt, handle.StatsOwnerKey) + variable.EnableStatsOwner = do.enableStatsOwner + variable.DisableStatsOwner = do.disableStatsOwner + do.statsOwner = do.newOwnerManager(handle.StatsPrompt, handle.StatsOwnerKey) do.wg.Run(func() { do.indexUsageWorker() }, "indexUsageWorker") if do.statsLease <= 0 { // For statsLease > 0, `updateStatsWorker` handles the quit of stats owner. - do.wg.Run(func() { quitStatsOwner(do, owner) }, "quitStatsOwner") + do.wg.Run(func() { quitStatsOwner(do, do.statsOwner) }, "quitStatsOwner") return nil } do.SetStatsUpdating(true) // The stats updated worker doesn't require the stats initialization to be completed. // This is because the updated worker's primary responsibilities are to update the change delta and handle DDL operations. // These tasks do not interfere with or depend on the initialization process. 
- do.wg.Run(func() { do.updateStatsWorker(ctx, owner) }, "updateStatsWorker") + do.wg.Run(func() { do.updateStatsWorker(ctx) }, "updateStatsWorker") do.wg.Run(func() { do.handleDDLEvent() }, "handleDDLEvent") @@ -2326,7 +2329,7 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err case <-do.exit: // It may happen that before initStatsDone, tidb receive Ctrl+C return } - do.autoAnalyzeWorker(owner) + do.autoAnalyzeWorker() }, "autoAnalyzeWorker", ) @@ -2337,7 +2340,7 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err case <-do.exit: // It may happen that before initStatsDone, tidb receive Ctrl+C return } - do.analyzeJobsCleanupWorker(owner) + do.analyzeJobsCleanupWorker() }, "analyzeJobsCleanupWorker", ) @@ -2367,6 +2370,25 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err return nil } +// enableStatsOwner enables this node to execute stats owner jobs. +// Since ownerManager.CampaignOwner will start a new goroutine to run ownerManager.campaignLoop, +// we should make sure that the stats owner is disabled before invoking enableStatsOwner(). +func (do *Domain) enableStatsOwner() error { + if !do.statsOwner.IsOwner() { + err := do.statsOwner.CampaignOwner() + return errors.Trace(err) + } + return nil +} + +// disableStatsOwner disables this node from executing stats owner jobs. +// We should make sure that the stats owner is enabled before invoking disableStatsOwner(). +func (do *Domain) disableStatsOwner() error { + // disable campaign by interrupting campaignLoop + do.statsOwner.CampaignCancel() + return nil +} + func quitStatsOwner(do *Domain, mgr owner.Manager) { <-do.exit mgr.Cancel() @@ -2391,9 +2413,11 @@ func (do *Domain) newOwnerManager(prompt, ownerKey string) owner.Manager { statsOwner = owner.NewOwnerManager(context.Background(), do.etcdClient, prompt, id, ownerKey) } // TODO: Need to do something when err is not nil. 
-	err := statsOwner.CampaignOwner()
-	if err != nil {
-		logutil.BgLogger().Warn("campaign owner failed", zap.Error(err))
+	// Only the stats owner is gated by tidb_enable_stats_owner; any other owner key still campaigns as before.
+	if ownerKey != handle.StatsOwnerKey || config.GetGlobalConfig().Instance.TiDBEnableStatsOwner.Load() {
+		if err := statsOwner.CampaignOwner(); err != nil {
+			logutil.BgLogger().Warn("campaign owner failed", zap.Error(err))
+		}
 	}
 	return statsOwner
 }
@@ -2484,7 +2508,7 @@ func (do *Domain) indexUsageWorker() {
 	}
 }
 
-func (*Domain) updateStatsWorkerExitPreprocessing(statsHandle *handle.Handle, owner owner.Manager) {
+func (do *Domain) updateStatsWorkerExitPreprocessing(statsHandle *handle.Handle) {
 	ch := make(chan struct{}, 1)
 	timeout, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 	defer cancel()
@@ -2492,7 +2516,7 @@ func (*Domain) updateStatsWorkerExitPreprocessing(statsHandle *handle.Handle, ow
 		logutil.BgLogger().Info("updateStatsWorker is going to exit, start to flush stats")
 		statsHandle.FlushStats()
 		logutil.BgLogger().Info("updateStatsWorker ready to release owner")
-		owner.Cancel()
+		do.statsOwner.Cancel()
 		ch <- struct{}{}
 	}()
 	select {
@@ -2523,7 +2547,7 @@ func (do *Domain) handleDDLEvent() {
 	}
 }
 
-func (do *Domain) updateStatsWorker(_ sessionctx.Context, owner owner.Manager) {
+func (do *Domain) updateStatsWorker(_ sessionctx.Context) {
 	defer util.Recover(metrics.LabelDomain, "updateStatsWorker", nil, false)
 	logutil.BgLogger().Info("updateStatsWorker started.")
 	lease := do.statsLease
@@ -2547,7 +2571,7 @@ func (do *Domain) updateStatsWorker(_ sessionctx.Context, owner owner.Manager) {
 	for {
 		select {
 		case <-do.exit:
-			do.updateStatsWorkerExitPreprocessing(statsHandle, owner)
+			do.updateStatsWorkerExitPreprocessing(statsHandle)
 			return
 		case <-deltaUpdateTicker.C:
 			err := statsHandle.DumpStatsDeltaToKV(false)
@@ -2555,7 +2579,7 @@ func (do *Domain) updateStatsWorker(_ sessionctx.Context, owner owner.Manager) {
 				logutil.BgLogger().Debug("dump stats delta failed", zap.Error(err))
 			}
 		case <-gcStatsTicker.C:
-			if !owner.IsOwner() 
{ + if !do.statsOwner.IsOwner() { continue } err := statsHandle.GCStats(do.InfoSchema(), do.GetSchemaLease()) @@ -2575,7 +2599,7 @@ func (do *Domain) updateStatsWorker(_ sessionctx.Context, owner owner.Manager) { } } -func (do *Domain) autoAnalyzeWorker(owner owner.Manager) { +func (do *Domain) autoAnalyzeWorker() { defer util.Recover(metrics.LabelDomain, "autoAnalyzeWorker", nil, false) statsHandle := do.StatsHandle() analyzeTicker := time.NewTicker(do.statsLease) @@ -2586,7 +2610,7 @@ func (do *Domain) autoAnalyzeWorker(owner owner.Manager) { for { select { case <-analyzeTicker.C: - if variable.RunAutoAnalyze.Load() && !do.stopAutoAnalyze.Load() && owner.IsOwner() { + if variable.RunAutoAnalyze.Load() && !do.stopAutoAnalyze.Load() && do.statsOwner.IsOwner() { statsHandle.HandleAutoAnalyze() } case <-do.exit: @@ -2609,7 +2633,7 @@ func (do *Domain) autoAnalyzeWorker(owner owner.Manager) { // It first retrieves the list of current analyze processes, then removes any analyze job // that is not associated with a current process. Additionally, if the current instance is the owner, // it also cleans up corrupted analyze jobs on dead instances. -func (do *Domain) analyzeJobsCleanupWorker(owner owner.Manager) { +func (do *Domain) analyzeJobsCleanupWorker() { defer util.Recover(metrics.LabelDomain, "analyzeJobsCleanupWorker", nil, false) // For GC. const gcInterval = time.Hour @@ -2630,7 +2654,7 @@ func (do *Domain) analyzeJobsCleanupWorker(owner owner.Manager) { select { case <-gcTicker.C: // Only the owner should perform this operation. 
- if owner.IsOwner() { + if do.statsOwner.IsOwner() { updateTime := time.Now().AddDate(0, 0, -daysToKeep) err := statsHandle.DeleteAnalyzeJobs(updateTime) if err != nil { @@ -2654,7 +2678,7 @@ func (do *Domain) analyzeJobsCleanupWorker(owner owner.Manager) { logutil.BgLogger().Warn("cleanup analyze jobs on current instance failed", zap.Error(err)) } - if owner.IsOwner() { + if do.statsOwner.IsOwner() { err = statsHandle.CleanupCorruptedAnalyzeJobsOnDeadInstances() if err != nil { logutil.BgLogger().Warn("cleanup analyze jobs on dead instances failed", zap.Error(err)) diff --git a/pkg/domain/domain_test.go b/pkg/domain/domain_test.go index db521e24a7951..a6ac1da87955d 100644 --- a/pkg/domain/domain_test.go +++ b/pkg/domain/domain_test.go @@ -181,8 +181,8 @@ func TestStatWorkRecoverFromPanic(t *testing.T) { metrics.PanicCounter.Reset() // Since the stats lease is 0 now, so create a new ticker will panic. // Test that they can recover from panic correctly. - dom.updateStatsWorker(mock.NewContext(), nil) - dom.autoAnalyzeWorker(nil) + dom.updateStatsWorker(mock.NewContext()) + dom.autoAnalyzeWorker() counter := metrics.PanicCounter.WithLabelValues(metrics.LabelDomain) pb := &dto.Metric{} err = counter.Write(pb) diff --git a/pkg/sessionctx/variable/sysvar.go b/pkg/sessionctx/variable/sysvar.go index a2b623206579a..6ddb1b2b9972a 100644 --- a/pkg/sessionctx/variable/sysvar.go +++ b/pkg/sessionctx/variable/sysvar.go @@ -558,6 +558,22 @@ var defaultSysVars = []*SysVar{ return BoolToOnOff(config.GetGlobalConfig().Instance.TiDBEnableDDL.Load()), nil }, }, + {Scope: ScopeInstance, Name: TiDBEnableStatsOwner, Value: BoolToOnOff(config.GetGlobalConfig().Instance.TiDBEnableStatsOwner.Load()), Type: TypeBool, + SetGlobal: func(_ context.Context, s *SessionVars, val string) error { + oldVal, newVal := config.GetGlobalConfig().Instance.TiDBEnableStatsOwner.Load(), TiDBOptOn(val) + if oldVal != newVal { + err := switchStats(newVal) + if err != nil { + return err + } + 
config.GetGlobalConfig().Instance.TiDBEnableStatsOwner.Store(newVal) + } + return nil + }, + GetGlobal: func(_ context.Context, s *SessionVars) (string, error) { + return BoolToOnOff(config.GetGlobalConfig().Instance.TiDBEnableStatsOwner.Load()), nil + }, + }, {Scope: ScopeInstance, Name: TiDBRCReadCheckTS, Value: BoolToOnOff(DefRCReadCheckTS), Type: TypeBool, SetGlobal: func(_ context.Context, s *SessionVars, val string) error { EnableRCReadCheckTS.Store(TiDBOptOn(val)) return nil @@ -3535,6 +3551,8 @@ const ( PluginLoad = "plugin_load" // TiDBEnableDDL indicates whether the tidb-server campaigns the DDL owner, TiDBEnableDDL = "tidb_enable_ddl" + // TiDBEnableStatsOwner indicates whether the tidb-server campaigns the Stats owner, + TiDBEnableStatsOwner = "tidb_enable_stats_owner" // Port is the name for 'port' system variable. Port = "port" // DataDir is the name for 'datadir' system variable. diff --git a/pkg/sessionctx/variable/tidb_vars.go b/pkg/sessionctx/variable/tidb_vars.go index dfb6fe272d4de..528623d038b59 100644 --- a/pkg/sessionctx/variable/tidb_vars.go +++ b/pkg/sessionctx/variable/tidb_vars.go @@ -1689,6 +1689,10 @@ var ( SetLowResolutionTSOUpdateInterval func(interval time.Duration) error = nil // ChangeSchemaCacheSize is called when tidb_schema_cache_size is changed. ChangeSchemaCacheSize func(ctx context.Context, size uint64) error + // EnableStatsOwner is the func registered by stats to enable running stats in this instance. + EnableStatsOwner func() error = nil + // DisableStatsOwner is the func registered by stats to disable running stats in this instance. + DisableStatsOwner func() error = nil ) // Hooks functions for Cluster Resource Control. 
diff --git a/pkg/sessionctx/variable/varsutil.go b/pkg/sessionctx/variable/varsutil.go
index 0e3ae63162417..d79147d87914a 100644
--- a/pkg/sessionctx/variable/varsutil.go
+++ b/pkg/sessionctx/variable/varsutil.go
@@ -511,6 +511,19 @@ func switchDDL(on bool) error {
 	return nil
 }
 
+// switchStats turns the stats owner on or off for this instance by invoking
+// the EnableStatsOwner or DisableStatsOwner hook. If the hook for the
+// requested direction has not been registered, it does nothing and returns nil.
+func switchStats(on bool) error {
+	switch {
+	case on && EnableStatsOwner != nil:
+		return EnableStatsOwner()
+	case !on && DisableStatsOwner != nil:
+		return DisableStatsOwner()
+	}
+	return nil
+}
+
 func collectAllowFuncName4ExpressionIndex() string {
 	str := make([]string, 0, len(GAFunction4ExpressionIndex))
 	for funcName := range GAFunction4ExpressionIndex {