Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

config: make tidb_enable_stats_owner controlling the stats owner #55592

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,9 +552,10 @@ type Instance struct {
PluginDir string `toml:"plugin_dir" json:"plugin_dir"`
PluginLoad string `toml:"plugin_load" json:"plugin_load"`
// MaxConnections is the maximum permitted number of simultaneous client connections.
MaxConnections uint32 `toml:"max_connections" json:"max_connections"`
TiDBEnableDDL AtomicBool `toml:"tidb_enable_ddl" json:"tidb_enable_ddl"`
TiDBRCReadCheckTS bool `toml:"tidb_rc_read_check_ts" json:"tidb_rc_read_check_ts"`
MaxConnections uint32 `toml:"max_connections" json:"max_connections"`
TiDBEnableDDL AtomicBool `toml:"tidb_enable_ddl" json:"tidb_enable_ddl"`
TiDBEnableStatsOwner AtomicBool `toml:"tidb_enable_stats_owner" json:"tidb_enable_stats_owner"`
TiDBRCReadCheckTS bool `toml:"tidb_rc_read_check_ts" json:"tidb_rc_read_check_ts"`
// TiDBServiceScope indicates the role for tidb for distributed task framework.
TiDBServiceScope string `toml:"tidb_service_scope" json:"tidb_service_scope"`
}
Expand Down Expand Up @@ -978,6 +979,7 @@ var defaultConf = Config{
PluginLoad: "",
MaxConnections: 0,
TiDBEnableDDL: *NewAtomicBool(true),
TiDBEnableStatsOwner: *NewAtomicBool(true),
TiDBRCReadCheckTS: false,
TiDBServiceScope: "",
},
Expand Down
3 changes: 2 additions & 1 deletion pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1054,7 +1054,7 @@ func TestConflictInstanceConfig(t *testing.T) {
_, err = f.WriteString("check-mb4-value-in-utf8 = true \nrun-ddl = true \n" +
"[log] \nenable-slow-log = true \n" +
"[performance] \nforce-priority = \"NO_PRIORITY\"\n" +
"[instance] \ntidb_check_mb4_value_in_utf8 = false \ntidb_enable_slow_log = false \ntidb_force_priority = \"LOW_PRIORITY\"\ntidb_enable_ddl = false")
"[instance] \ntidb_check_mb4_value_in_utf8 = false \ntidb_enable_slow_log = false \ntidb_force_priority = \"LOW_PRIORITY\"\ntidb_enable_ddl = false\ntidb_enable_stats_owner = false")
require.NoError(t, err)
require.NoError(t, f.Sync())
err = conf.Load(configFile)
Expand All @@ -1068,6 +1068,7 @@ func TestConflictInstanceConfig(t *testing.T) {
require.Equal(t, "LOW_PRIORITY", conf.Instance.ForcePriority)
require.Equal(t, true, conf.RunDDL)
require.Equal(t, false, conf.Instance.TiDBEnableDDL.Load())
require.Equal(t, false, conf.Instance.TiDBEnableStatsOwner.Load())
require.Equal(t, 0, len(DeprecatedOptions))
for _, conflictOption := range ConflictOptions {
expectedConflictOption, ok := expectedConflictOptions[conflictOption.SectionName]
Expand Down
60 changes: 42 additions & 18 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ type Domain struct {

instancePlanCache sessionctx.InstancePlanCache // the instance level plan cache

statsOwner owner.Manager
// deferFn is used to release infoschema object lazily during v1 and v2 switch
deferFn
}
Expand Down Expand Up @@ -2300,20 +2301,22 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err
if do.statsLease >= 0 {
do.wg.Run(do.loadStatsWorker, "loadStatsWorker")
}
owner := do.newOwnerManager(handle.StatsPrompt, handle.StatsOwnerKey)
variable.EnableStatsOwner = do.enableStatsOwner
variable.DisableStatsOwner = do.disableStatsOwner
do.statsOwner = do.newOwnerManager(handle.StatsPrompt, handle.StatsOwnerKey)
do.wg.Run(func() {
do.indexUsageWorker()
}, "indexUsageWorker")
if do.statsLease <= 0 {
// For statsLease > 0, `updateStatsWorker` handles the quit of stats owner.
do.wg.Run(func() { quitStatsOwner(do, owner) }, "quitStatsOwner")
do.wg.Run(func() { quitStatsOwner(do, do.statsOwner) }, "quitStatsOwner")
return nil
}
do.SetStatsUpdating(true)
// The stats updated worker doesn't require the stats initialization to be completed.
// This is because the updated worker's primary responsibilities are to update the change delta and handle DDL operations.
// These tasks do not interfere with or depend on the initialization process.
do.wg.Run(func() { do.updateStatsWorker(ctx, owner) }, "updateStatsWorker")
do.wg.Run(func() { do.updateStatsWorker(ctx) }, "updateStatsWorker")
do.wg.Run(func() {
do.handleDDLEvent()
}, "handleDDLEvent")
Expand All @@ -2326,7 +2329,7 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err
case <-do.exit: // It may happen that before initStatsDone, tidb receive Ctrl+C
return
}
do.autoAnalyzeWorker(owner)
do.autoAnalyzeWorker()
},
"autoAnalyzeWorker",
)
Expand All @@ -2337,7 +2340,7 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err
case <-do.exit: // It may happen that before initStatsDone, tidb receive Ctrl+C
return
}
do.analyzeJobsCleanupWorker(owner)
do.analyzeJobsCleanupWorker()
},
"analyzeJobsCleanupWorker",
)
Expand Down Expand Up @@ -2367,6 +2370,25 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err
return nil
}

// enableStatsOwner enables this node to execute stats owner jobs.
// Since ownerManager.CampaignOwner will start a new goroutine to run ownerManager.campaignLoop,
// we should make sure that before invoking enableStatsOwner(), stats owner is DISABLE.
func (do *Domain) enableStatsOwner() error {
if !do.statsOwner.IsOwner() {
err := do.statsOwner.CampaignOwner()
return errors.Trace(err)
}
return nil
}

// disableStatsOwner disable this node to execute stats owner.
// We should make sure that before invoking disableStatsOwner(), stats owner is ENABLE.
func (do *Domain) disableStatsOwner() error {
// disable campaign by interrupting campaignLoop
do.statsOwner.CampaignCancel()
return nil
}

func quitStatsOwner(do *Domain, mgr owner.Manager) {
<-do.exit
mgr.Cancel()
Expand All @@ -2391,9 +2413,11 @@ func (do *Domain) newOwnerManager(prompt, ownerKey string) owner.Manager {
statsOwner = owner.NewOwnerManager(context.Background(), do.etcdClient, prompt, id, ownerKey)
}
// TODO: Need to do something when err is not nil.
err := statsOwner.CampaignOwner()
if err != nil {
logutil.BgLogger().Warn("campaign owner failed", zap.Error(err))
if ownerKey == handle.StatsOwnerKey && config.GetGlobalConfig().Instance.TiDBEnableStatsOwner.Load() {
err := statsOwner.CampaignOwner()
if err != nil {
logutil.BgLogger().Warn("campaign owner failed", zap.Error(err))
}
}
return statsOwner
}
Expand Down Expand Up @@ -2484,15 +2508,15 @@ func (do *Domain) indexUsageWorker() {
}
}

func (*Domain) updateStatsWorkerExitPreprocessing(statsHandle *handle.Handle, owner owner.Manager) {
func (do *Domain) updateStatsWorkerExitPreprocessing(statsHandle *handle.Handle) {
ch := make(chan struct{}, 1)
timeout, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
go func() {
logutil.BgLogger().Info("updateStatsWorker is going to exit, start to flush stats")
statsHandle.FlushStats()
logutil.BgLogger().Info("updateStatsWorker ready to release owner")
owner.Cancel()
do.statsOwner.Cancel()
ch <- struct{}{}
}()
select {
Expand Down Expand Up @@ -2523,7 +2547,7 @@ func (do *Domain) handleDDLEvent() {
}
}

func (do *Domain) updateStatsWorker(_ sessionctx.Context, owner owner.Manager) {
func (do *Domain) updateStatsWorker(_ sessionctx.Context) {
defer util.Recover(metrics.LabelDomain, "updateStatsWorker", nil, false)
logutil.BgLogger().Info("updateStatsWorker started.")
lease := do.statsLease
Expand All @@ -2547,15 +2571,15 @@ func (do *Domain) updateStatsWorker(_ sessionctx.Context, owner owner.Manager) {
for {
select {
case <-do.exit:
do.updateStatsWorkerExitPreprocessing(statsHandle, owner)
do.updateStatsWorkerExitPreprocessing(statsHandle)
return
case <-deltaUpdateTicker.C:
err := statsHandle.DumpStatsDeltaToKV(false)
if err != nil {
logutil.BgLogger().Debug("dump stats delta failed", zap.Error(err))
}
case <-gcStatsTicker.C:
if !owner.IsOwner() {
if !do.statsOwner.IsOwner() {
continue
}
err := statsHandle.GCStats(do.InfoSchema(), do.GetSchemaLease())
Expand All @@ -2575,7 +2599,7 @@ func (do *Domain) updateStatsWorker(_ sessionctx.Context, owner owner.Manager) {
}
}

func (do *Domain) autoAnalyzeWorker(owner owner.Manager) {
func (do *Domain) autoAnalyzeWorker() {
defer util.Recover(metrics.LabelDomain, "autoAnalyzeWorker", nil, false)
statsHandle := do.StatsHandle()
analyzeTicker := time.NewTicker(do.statsLease)
Expand All @@ -2586,7 +2610,7 @@ func (do *Domain) autoAnalyzeWorker(owner owner.Manager) {
for {
select {
case <-analyzeTicker.C:
if variable.RunAutoAnalyze.Load() && !do.stopAutoAnalyze.Load() && owner.IsOwner() {
if variable.RunAutoAnalyze.Load() && !do.stopAutoAnalyze.Load() && do.statsOwner.IsOwner() {
statsHandle.HandleAutoAnalyze()
}
case <-do.exit:
Expand All @@ -2609,7 +2633,7 @@ func (do *Domain) autoAnalyzeWorker(owner owner.Manager) {
// It first retrieves the list of current analyze processes, then removes any analyze job
// that is not associated with a current process. Additionally, if the current instance is the owner,
// it also cleans up corrupted analyze jobs on dead instances.
func (do *Domain) analyzeJobsCleanupWorker(owner owner.Manager) {
func (do *Domain) analyzeJobsCleanupWorker() {
defer util.Recover(metrics.LabelDomain, "analyzeJobsCleanupWorker", nil, false)
// For GC.
const gcInterval = time.Hour
Expand All @@ -2630,7 +2654,7 @@ func (do *Domain) analyzeJobsCleanupWorker(owner owner.Manager) {
select {
case <-gcTicker.C:
// Only the owner should perform this operation.
if owner.IsOwner() {
if do.statsOwner.IsOwner() {
updateTime := time.Now().AddDate(0, 0, -daysToKeep)
err := statsHandle.DeleteAnalyzeJobs(updateTime)
if err != nil {
Expand All @@ -2654,7 +2678,7 @@ func (do *Domain) analyzeJobsCleanupWorker(owner owner.Manager) {
logutil.BgLogger().Warn("cleanup analyze jobs on current instance failed", zap.Error(err))
}

if owner.IsOwner() {
if do.statsOwner.IsOwner() {
err = statsHandle.CleanupCorruptedAnalyzeJobsOnDeadInstances()
if err != nil {
logutil.BgLogger().Warn("cleanup analyze jobs on dead instances failed", zap.Error(err))
Expand Down
4 changes: 2 additions & 2 deletions pkg/domain/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@ func TestStatWorkRecoverFromPanic(t *testing.T) {
metrics.PanicCounter.Reset()
// Since the stats lease is 0 now, so create a new ticker will panic.
// Test that they can recover from panic correctly.
dom.updateStatsWorker(mock.NewContext(), nil)
dom.autoAnalyzeWorker(nil)
dom.updateStatsWorker(mock.NewContext())
dom.autoAnalyzeWorker()
counter := metrics.PanicCounter.WithLabelValues(metrics.LabelDomain)
pb := &dto.Metric{}
err = counter.Write(pb)
Expand Down
18 changes: 18 additions & 0 deletions pkg/sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,22 @@ var defaultSysVars = []*SysVar{
return BoolToOnOff(config.GetGlobalConfig().Instance.TiDBEnableDDL.Load()), nil
},
},
{Scope: ScopeInstance, Name: TiDBEnableStatsOwner, Value: BoolToOnOff(config.GetGlobalConfig().Instance.TiDBEnableStatsOwner.Load()), Type: TypeBool,
SetGlobal: func(_ context.Context, s *SessionVars, val string) error {
oldVal, newVal := config.GetGlobalConfig().Instance.TiDBEnableStatsOwner.Load(), TiDBOptOn(val)
if oldVal != newVal {
err := switchStats(newVal)
if err != nil {
return err
}
config.GetGlobalConfig().Instance.TiDBEnableStatsOwner.Store(newVal)
}
return nil
},
GetGlobal: func(_ context.Context, s *SessionVars) (string, error) {
return BoolToOnOff(config.GetGlobalConfig().Instance.TiDBEnableStatsOwner.Load()), nil
},
},
{Scope: ScopeInstance, Name: TiDBRCReadCheckTS, Value: BoolToOnOff(DefRCReadCheckTS), Type: TypeBool, SetGlobal: func(_ context.Context, s *SessionVars, val string) error {
EnableRCReadCheckTS.Store(TiDBOptOn(val))
return nil
Expand Down Expand Up @@ -3535,6 +3551,8 @@ const (
PluginLoad = "plugin_load"
// TiDBEnableDDL indicates whether the tidb-server campaigns the DDL owner,
TiDBEnableDDL = "tidb_enable_ddl"
// TiDBEnableStatsOwner indicates whether the tidb-server campaigns the Stats owner,
TiDBEnableStatsOwner = "tidb_enable_stats_owner"
// Port is the name for 'port' system variable.
Port = "port"
// DataDir is the name for 'datadir' system variable.
Expand Down
4 changes: 4 additions & 0 deletions pkg/sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -1689,6 +1689,10 @@ var (
SetLowResolutionTSOUpdateInterval func(interval time.Duration) error = nil
// ChangeSchemaCacheSize is called when tidb_schema_cache_size is changed.
ChangeSchemaCacheSize func(ctx context.Context, size uint64) error
// EnableStatsOwner is the func registered by stats to enable running stats in this instance.
EnableStatsOwner func() error = nil
// DisableStatsOwner is the func registered by stats to disable running stats in this instance.
DisableStatsOwner func() error = nil
)

// Hooks functions for Cluster Resource Control.
Expand Down
10 changes: 10 additions & 0 deletions pkg/sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,16 @@ func switchDDL(on bool) error {
return nil
}

// switchStats turns on/off stats owner in an instance
func switchStats(on bool) error {
if on && EnableStatsOwner != nil {
return EnableStatsOwner()
} else if !on && DisableStatsOwner != nil {
return DisableStatsOwner()
}
return nil
}

func collectAllowFuncName4ExpressionIndex() string {
str := make([]string, 0, len(GAFunction4ExpressionIndex))
for funcName := range GAFunction4ExpressionIndex {
Expand Down