diff --git a/pkg/statistics/handle/autoanalyze/autoanalyze.go b/pkg/statistics/handle/autoanalyze/autoanalyze.go index 26deaeab3b5a6..26e4282c0f5a6 100644 --- a/pkg/statistics/handle/autoanalyze/autoanalyze.go +++ b/pkg/statistics/handle/autoanalyze/autoanalyze.go @@ -44,44 +44,24 @@ import ( // statsAnalyze implements util.StatsAnalyze. // statsAnalyze is used to handle auto-analyze and manage analyze jobs. type statsAnalyze struct { - pool statsutil.SessionPool - - // TODO: use interfaces instead of raw function pointers below - sysProcTracker sessionctx.SysProcTracker - getLockedTables func(tableIDs ...int64) (map[int64]struct{}, error) - getTableStats func(tblInfo *model.TableInfo) *statistics.Table - getPartitionStats func(tblInfo *model.TableInfo, pid int64) *statistics.Table - autoAnalyzeProcIDGetter func() uint64 - statsLease time.Duration + statsHandle statsutil.StatsHandle } // NewStatsAnalyze creates a new StatsAnalyze. -func NewStatsAnalyze(pool statsutil.SessionPool, - sysProcTracker sessionctx.SysProcTracker, - getLockedTables func(tableIDs ...int64) (map[int64]struct{}, error), - getTableStats func(tblInfo *model.TableInfo) *statistics.Table, - getPartitionStats func(tblInfo *model.TableInfo, pid int64) *statistics.Table, - autoAnalyzeProcIDGetter func() uint64, - statsLease time.Duration) statsutil.StatsAnalyze { - return &statsAnalyze{pool: pool, - sysProcTracker: sysProcTracker, - getLockedTables: getLockedTables, - getTableStats: getTableStats, - getPartitionStats: getPartitionStats, - autoAnalyzeProcIDGetter: autoAnalyzeProcIDGetter, - statsLease: statsLease} +func NewStatsAnalyze(statsHandle statsutil.StatsHandle) statsutil.StatsAnalyze { + return &statsAnalyze{statsHandle: statsHandle} } // InsertAnalyzeJob inserts the analyze job to the storage. func (sa *statsAnalyze) InsertAnalyzeJob(job *statistics.AnalyzeJob, instance string, procID uint64) error { - return statsutil.CallWithSCtx(sa.pool, func(sctx sessionctx.Context) error { + return statsutil.CallWithSCtx(sa.statsHandle.SPool(), func(sctx sessionctx.Context) error { return insertAnalyzeJob(sctx, job, instance, procID) }) } // DeleteAnalyzeJobs deletes the analyze jobs whose update time is earlier than updateTime. func (sa *statsAnalyze) DeleteAnalyzeJobs(updateTime time.Time) error { - return statsutil.CallWithSCtx(sa.pool, func(sctx sessionctx.Context) error { + return statsutil.CallWithSCtx(sa.statsHandle.SPool(), func(sctx sessionctx.Context) error { _, _, err := statsutil.ExecRows(sctx, "DELETE FROM mysql.analyze_jobs WHERE update_time < CONVERT_TZ(%?, '+00:00', @@TIME_ZONE)", updateTime.UTC().Format(types.TimeFormat)) return err }) @@ -89,15 +69,8 @@ func (sa *statsAnalyze) DeleteAnalyzeJobs(updateTime time.Time) error { // HandleAutoAnalyze analyzes the newly created table or index. func (sa *statsAnalyze) HandleAutoAnalyze(is infoschema.InfoSchema) (analyzed bool) { - _ = statsutil.CallWithSCtx(sa.pool, func(sctx sessionctx.Context) error { - analyzed = HandleAutoAnalyze(sctx, &Opt{ - StatsLease: sa.statsLease, - GetLockedTables: sa.getLockedTables, - GetTableStats: sa.getTableStats, - GetPartitionStats: sa.getPartitionStats, - SysProcTracker: sa.sysProcTracker, - AutoAnalyzeProcIDGetter: sa.autoAnalyzeProcIDGetter, - }, is) + _ = statsutil.CallWithSCtx(sa.statsHandle.SPool(), func(sctx sessionctx.Context) error { + analyzed = HandleAutoAnalyze(sctx, sa.statsHandle, is) return nil }) return @@ -139,25 +112,9 @@ func getAutoAnalyzeParameters(sctx sessionctx.Context) map[string]string { return parameters } -// Opt is used to hold parameters for auto analyze. -type Opt struct { - // SysProcTracker is used to track analyze resource consumption. - SysProcTracker sessionctx.SysProcTracker - // GetLockedTables is used to look up locked tables which will be skipped in auto analyze. - GetLockedTables func(tableIDs ...int64) (map[int64]struct{}, error) - // GetTableStats is used to look up table stats to decide whether to analyze the table. - GetTableStats func(tblInfo *model.TableInfo) *statistics.Table - // GetPartitionStats is used to look up partition stats to decide whether to analyze the partition. - GetPartitionStats func(tblInfo *model.TableInfo, pid int64) *statistics.Table - // AutoAnalyzeProcIDGetter is used to assign job ID for analyze jobs. - AutoAnalyzeProcIDGetter func() uint64 - // StatsLease is the current stats lease. - StatsLease time.Duration -} - // HandleAutoAnalyze analyzes the newly created table or index. func HandleAutoAnalyze(sctx sessionctx.Context, - opt *Opt, + statsHandle statsutil.StatsHandle, is infoschema.InfoSchema) (analyzed bool) { defer func() { if r := recover(); r != nil { @@ -207,7 +164,7 @@ func HandleAutoAnalyze(sctx sessionctx.Context, } } - lockedTables, err := opt.GetLockedTables(tidsAndPids...) + lockedTables, err := statsHandle.GetLockedTables(tidsAndPids...) if err != nil { logutil.BgLogger().Error("check table lock failed", zap.String("category", "stats"), zap.Error(err)) @@ -226,9 +183,9 @@ func HandleAutoAnalyze(sctx sessionctx.Context, } pi := tblInfo.GetPartitionInfo() if pi == nil { - statsTbl := opt.GetTableStats(tblInfo) + statsTbl := statsHandle.GetTableStats(tblInfo) sql := "analyze table %n.%n" - analyzed := autoAnalyzeTable(sctx, opt, tblInfo, statsTbl, autoAnalyzeRatio, sql, db, tblInfo.Name.O) + analyzed := autoAnalyzeTable(sctx, statsHandle, tblInfo, statsTbl, autoAnalyzeRatio, sql, db, tblInfo.Name.O) if analyzed { // analyze one table at a time to let it get the freshest parameters. // others will be analyzed next round which is just 3s later. @@ -244,7 +201,7 @@ func HandleAutoAnalyze(sctx sessionctx.Context, } } if pruneMode == variable.Dynamic { - analyzed := autoAnalyzePartitionTableInDynamicMode(sctx, opt, tblInfo, partitionDefs, db, autoAnalyzeRatio) + analyzed := autoAnalyzePartitionTableInDynamicMode(sctx, statsHandle, tblInfo, partitionDefs, db, autoAnalyzeRatio) if analyzed { return true } @@ -252,8 +209,8 @@ func HandleAutoAnalyze(sctx sessionctx.Context, } for _, def := range partitionDefs { sql := "analyze table %n.%n partition %n" - statsTbl := opt.GetPartitionStats(tblInfo, def.ID) - analyzed := autoAnalyzeTable(sctx, opt, tblInfo, statsTbl, autoAnalyzeRatio, sql, db, tblInfo.Name.O, def.Name.O) + statsTbl := statsHandle.GetPartitionStats(tblInfo, def.ID) + analyzed := autoAnalyzeTable(sctx, statsHandle, tblInfo, statsTbl, autoAnalyzeRatio, sql, db, tblInfo.Name.O, def.Name.O) if analyzed { return true } @@ -267,13 +224,13 @@ func HandleAutoAnalyze(sctx sessionctx.Context, var AutoAnalyzeMinCnt int64 = 1000 func autoAnalyzeTable(sctx sessionctx.Context, - opt *Opt, + statsHandle statsutil.StatsHandle, tblInfo *model.TableInfo, statsTbl *statistics.Table, ratio float64, sql string, params ...interface{}) bool { if statsTbl.Pseudo || statsTbl.RealtimeCount < AutoAnalyzeMinCnt { return false } - if needAnalyze, reason := NeedAnalyzeTable(statsTbl, 20*opt.StatsLease, ratio); needAnalyze { + if needAnalyze, reason := NeedAnalyzeTable(statsTbl, 20*statsHandle.Lease(), ratio); needAnalyze { escaped, err := sqlexec.EscapeSQL(sql, params...) if err != nil { return false @@ -281,7 +238,7 @@ func autoAnalyzeTable(sctx sessionctx.Context, logutil.BgLogger().Info("auto analyze triggered", zap.String("category", "stats"), zap.String("sql", escaped), zap.String("reason", reason)) tableStatsVer := sctx.GetSessionVars().AnalyzeVersion statistics.CheckAnalyzeVerOnTable(statsTbl, &tableStatsVer) - execAutoAnalyze(sctx, opt, tableStatsVer, sql, params...) + execAutoAnalyze(sctx, statsHandle, tableStatsVer, sql, params...) return true } for _, idx := range tblInfo.Indices { @@ -295,7 +252,7 @@ func autoAnalyzeTable(sctx sessionctx.Context, logutil.BgLogger().Info("auto analyze for unanalyzed", zap.String("category", "stats"), zap.String("sql", escaped)) tableStatsVer := sctx.GetSessionVars().AnalyzeVersion statistics.CheckAnalyzeVerOnTable(statsTbl, &tableStatsVer) - execAutoAnalyze(sctx, opt, tableStatsVer, sqlWithIdx, paramsWithIdx...) + execAutoAnalyze(sctx, statsHandle, tableStatsVer, sqlWithIdx, paramsWithIdx...) return true } } @@ -347,18 +304,18 @@ func TableAnalyzed(tbl *statistics.Table) bool { } func autoAnalyzePartitionTableInDynamicMode(sctx sessionctx.Context, - opt *Opt, + statsHandle statsutil.StatsHandle, tblInfo *model.TableInfo, partitionDefs []model.PartitionDefinition, db string, ratio float64) bool { tableStatsVer := sctx.GetSessionVars().AnalyzeVersion analyzePartitionBatchSize := int(variable.AutoAnalyzePartitionBatchSize.Load()) partitionNames := make([]interface{}, 0, len(partitionDefs)) for _, def := range partitionDefs { - partitionStatsTbl := opt.GetPartitionStats(tblInfo, def.ID) + partitionStatsTbl := statsHandle.GetPartitionStats(tblInfo, def.ID) if partitionStatsTbl.Pseudo || partitionStatsTbl.RealtimeCount < AutoAnalyzeMinCnt { continue } - if needAnalyze, _ := NeedAnalyzeTable(partitionStatsTbl, 20*opt.StatsLease, ratio); needAnalyze { + if needAnalyze, _ := NeedAnalyzeTable(partitionStatsTbl, 20*statsHandle.Lease(), ratio); needAnalyze { partitionNames = append(partitionNames, def.Name.O) statistics.CheckAnalyzeVerOnTable(partitionStatsTbl, &tableStatsVer) } @@ -380,7 +337,7 @@ func autoAnalyzePartitionTableInDynamicMode(sctx sessionctx.Context, zap.String("table", tblInfo.Name.String()), zap.Any("partitions", partitionNames), zap.Int("analyze partition batch size", analyzePartitionBatchSize)) - statsTbl := opt.GetTableStats(tblInfo) + statsTbl := statsHandle.GetTableStats(tblInfo) statistics.CheckAnalyzeVerOnTable(statsTbl, &tableStatsVer) for i := 0; i < len(partitionNames); i += analyzePartitionBatchSize { start := i @@ -393,7 +350,7 @@ func autoAnalyzePartitionTableInDynamicMode(sctx sessionctx.Context, logutil.BgLogger().Info("auto analyze triggered", zap.String("category", "stats"), zap.String("table", tblInfo.Name.String()), zap.Any("partitions", partitionNames[start:end])) - execAutoAnalyze(sctx, opt, tableStatsVer, sql, params...) + execAutoAnalyze(sctx, statsHandle, tableStatsVer, sql, params...) } return true } @@ -402,14 +359,14 @@ func autoAnalyzePartitionTableInDynamicMode(sctx sessionctx.Context, continue } for _, def := range partitionDefs { - partitionStatsTbl := opt.GetPartitionStats(tblInfo, def.ID) + partitionStatsTbl := statsHandle.GetPartitionStats(tblInfo, def.ID) if _, ok := partitionStatsTbl.Indices[idx.ID]; !ok { partitionNames = append(partitionNames, def.Name.O) statistics.CheckAnalyzeVerOnTable(partitionStatsTbl, &tableStatsVer) } } if len(partitionNames) > 0 { - statsTbl := opt.GetTableStats(tblInfo) + statsTbl := statsHandle.GetTableStats(tblInfo) statistics.CheckAnalyzeVerOnTable(statsTbl, &tableStatsVer) for i := 0; i < len(partitionNames); i += analyzePartitionBatchSize { start := i @@ -424,7 +381,7 @@ func autoAnalyzePartitionTableInDynamicMode(sctx sessionctx.Context, zap.String("table", tblInfo.Name.String()), zap.String("index", idx.Name.String()), zap.Any("partitions", partitionNames[start:end])) - execAutoAnalyze(sctx, opt, tableStatsVer, sql, params...) + execAutoAnalyze(sctx, statsHandle, tableStatsVer, sql, params...) } return true } @@ -439,11 +396,11 @@ var execOptionForAnalyze = map[int]sqlexec.OptionFuncAlias{ } func execAutoAnalyze(sctx sessionctx.Context, - opt *Opt, + statsHandle statsutil.StatsHandle, statsVer int, sql string, params ...interface{}) { startTime := time.Now() - _, _, err := execAnalyzeStmt(sctx, opt, statsVer, sql, params...) + _, _, err := execAnalyzeStmt(sctx, statsHandle, statsVer, sql, params...) dur := time.Since(startTime) metrics.AutoAnalyzeHistogram.Observe(dur.Seconds()) if err != nil { @@ -459,7 +416,7 @@ func execAutoAnalyze(sctx sessionctx.Context, } func execAnalyzeStmt(sctx sessionctx.Context, - opt *Opt, + statsHandle statsutil.StatsHandle, statsVer int, sql string, params ...interface{}) ([]chunk.Row, []*ast.ResultField, error) { pruneMode := sctx.GetSessionVars().PartitionPruneMode.Load() @@ -469,7 +426,7 @@ func execAnalyzeStmt(sctx sessionctx.Context, sqlexec.GetAnalyzeSnapshotOption(analyzeSnapshot), sqlexec.GetPartitionPruneModeOption(pruneMode), sqlexec.ExecOptionUseCurSession, - sqlexec.ExecOptionWithSysProcTrack(opt.AutoAnalyzeProcIDGetter(), opt.SysProcTracker.Track, opt.SysProcTracker.UnTrack), + sqlexec.ExecOptionWithSysProcTrack(statsHandle.AutoAnalyzeProcID(), statsHandle.SysProcTracker().Track, statsHandle.SysProcTracker().UnTrack), } return statsutil.ExecWithOpts(sctx, optFuncs, sql, params...) } diff --git a/pkg/statistics/handle/bootstrap.go b/pkg/statistics/handle/bootstrap.go index a6bdec461538d..bc42f866f7a23 100644 --- a/pkg/statistics/handle/bootstrap.go +++ b/pkg/statistics/handle/bootstrap.go @@ -70,7 +70,7 @@ func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (util.StatsCache, error return nil, errors.Trace(err) } defer terror.Call(rc.Close) - tables, err := cache.NewStatsCacheImpl(h.pool, h.TableInfoGetter, h.Lease(), h.TableStatsFromStorage) + tables, err := cache.NewStatsCacheImpl(h, h.TableStatsFromStorage) if err != nil { return nil, err } diff --git a/pkg/statistics/handle/cache/statscache.go b/pkg/statistics/handle/cache/statscache.go index 3770d1cbbbc29..4f9c6a283bafe 100644 --- a/pkg/statistics/handle/cache/statscache.go +++ b/pkg/statistics/handle/cache/statscache.go @@ -16,7 +16,6 @@ package cache import ( "sync/atomic" - "time" "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/config" @@ -33,16 +32,14 @@ import ( // StatsCacheImpl implements util.StatsCache. type StatsCacheImpl struct { - pool util.SessionPool - tblInfo util.TableInfoGetter atomic.Pointer[StatsCache] + statsHandle util.StatsHandle tableStatsFromStorage func(tableInfo *model.TableInfo, physicalID int64, loadAll bool, snapshot uint64) (statsTbl *statistics.Table, err error) - statsLease time.Duration } // NewStatsCacheImpl creates a new StatsCache. -func NewStatsCacheImpl(pool util.SessionPool, tblInfo util.TableInfoGetter, statsLease time.Duration, +func NewStatsCacheImpl(statsHandle util.StatsHandle, tableStatsFromStorage func(tableInfo *model.TableInfo, physicalID int64, loadAll bool, snapshot uint64) (statsTbl *statistics.Table, err error), ) (util.StatsCache, error) { newCache, err := NewStatsCache() @@ -50,9 +47,7 @@ func NewStatsCacheImpl(pool util.SessionPool, tblInfo util.TableInfoGetter, stat return nil, err } result := &StatsCacheImpl{ - pool: pool, - tblInfo: tblInfo, - statsLease: statsLease, + statsHandle: statsHandle, tableStatsFromStorage: tableStatsFromStorage, } result.Store(newCache) @@ -61,7 +56,7 @@ func NewStatsCacheImpl(pool util.SessionPool, tblInfo util.TableInfoGetter, stat // NewStatsCacheImplForTest creates a new StatsCache for test. func NewStatsCacheImplForTest() (util.StatsCache, error) { - return NewStatsCacheImpl(nil, nil, 0, nil) + return NewStatsCacheImpl(nil, nil) } // Update reads stats meta from store and updates the stats map. @@ -72,7 +67,7 @@ func (s *StatsCacheImpl) Update(is infoschema.InfoSchema) error { // and A0 < B0 < B1 < A1. We will first read the stats of B, and update the lastVersion to B0, but we cannot read // the table stats of A0 if we read stats that greater than lastVersion which is B0. // We can read the stats if the diff between commit time and version is less than three lease. - offset := util.DurationToTS(3 * s.statsLease) + offset := util.DurationToTS(3 * s.statsHandle.Lease()) if s.MaxTableStatsVersion() >= offset { lastVersion = lastVersion - offset } else { @@ -81,7 +76,7 @@ func (s *StatsCacheImpl) Update(is infoschema.InfoSchema) error { var rows []chunk.Row var err error - err = util.CallWithSCtx(s.pool, func(sctx sessionctx.Context) error { + err = util.CallWithSCtx(s.statsHandle.SPool(), func(sctx sessionctx.Context) error { rows, _, err = util.ExecRows(sctx, "SELECT version, table_id, modify_count, count from mysql.stats_meta where version > %? order by version", lastVersion) return err }) @@ -95,7 +90,7 @@ func (s *StatsCacheImpl) Update(is infoschema.InfoSchema) error { physicalID := row.GetInt64(1) modifyCount := row.GetInt64(2) count := row.GetInt64(3) - table, ok := s.tblInfo.TableInfoByID(is, physicalID) + table, ok := s.statsHandle.TableInfoByID(is, physicalID) if !ok { logutil.BgLogger().Debug("unknown physical ID in stats meta table, maybe it has been dropped", zap.Int64("ID", physicalID)) deletedTableIDs = append(deletedTableIDs, physicalID) diff --git a/pkg/statistics/handle/handle.go b/pkg/statistics/handle/handle.go index b98270ec7770e..66b3f8e72b61a 100644 --- a/pkg/statistics/handle/handle.go +++ b/pkg/statistics/handle/handle.go @@ -131,24 +131,18 @@ func NewHandle(_, initStatsCtx sessionctx.Context, lease time.Duration, pool uti TableInfoGetter: util.NewTableInfoGetter(), StatsLock: lockstats.NewStatsLock(pool), } - handle.StatsGC = storage.NewStatsGC(pool, lease, handle.TableInfoGetter, handle.MarkExtendedStatsDeleted) + handle.StatsGC = storage.NewStatsGC(handle, handle.MarkExtendedStatsDeleted) handle.initStatsCtx = initStatsCtx handle.lease.Store(lease) - statsCache, err := cache.NewStatsCacheImpl(handle.pool, handle.TableInfoGetter, handle.Lease(), handle.TableStatsFromStorage) + statsCache, err := cache.NewStatsCacheImpl(handle, handle.TableStatsFromStorage) if err != nil { return nil, err } handle.StatsCache = statsCache - handle.StatsHistory = history.NewStatsHistory(pool, handle.StatsCache, handle.tableStatsToJSON, handle.DumpStatsToJSON) - handle.StatsUsage = usage.NewStatsUsageImpl(pool, handle.TableInfoGetter, handle.StatsCache, - handle.StatsHistory, handle.GetLockedTables, handle.GetPartitionStats) - handle.StatsAnalyze = autoanalyze.NewStatsAnalyze(pool, handle.sysProcTracker, - handle.GetLockedTables, - handle.GetTableStats, - handle.GetPartitionStats, - handle.autoAnalyzeProcIDGetter, - handle.Lease()) + handle.StatsHistory = history.NewStatsHistory(handle, handle.tableStatsToJSON, handle.DumpStatsToJSON) + handle.StatsUsage = usage.NewStatsUsageImpl(handle) + handle.StatsAnalyze = autoanalyze.NewStatsAnalyze(handle) handle.StatsLoad.SubCtxs = make([]sessionctx.Context, cfg.Performance.StatsLoadConcurrency) handle.StatsLoad.NeededItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize) handle.StatsLoad.TimeoutItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize) @@ -458,3 +452,23 @@ func (h *Handle) GetCurrentPruneMode() (mode string, err error) { }) return } + +// GPool returns the goroutine pool of handle. +func (h *Handle) GPool() *gp.Pool { + return h.gpool +} + +// SPool returns the session pool. +func (h *Handle) SPool() util.SessionPool { + return h.pool +} + +// SysProcTracker is used to track sys process like analyze +func (h *Handle) SysProcTracker() sessionctx.SysProcTracker { + return h.sysProcTracker +} + +// AutoAnalyzeProcID generates an analyze ID. +func (h *Handle) AutoAnalyzeProcID() uint64 { + return h.autoAnalyzeProcIDGetter() +} diff --git a/pkg/statistics/handle/history/history_stats.go b/pkg/statistics/handle/history/history_stats.go index 7d854e7d3f86c..58397dd042209 100644 --- a/pkg/statistics/handle/history/history_stats.go +++ b/pkg/statistics/handle/history/history_stats.go @@ -30,8 +30,7 @@ import ( // statsHistoryImpl implements util.StatsHistory. type statsHistoryImpl struct { - pool util.SessionPool - statsCache util.StatsCache + statsHandle util.StatsHandle // TODO: use interfaces instead of raw function pointers tableStatsToJSON func(dbName string, tableInfo *model.TableInfo, physicalID int64, snapshot uint64) (*storage.JSONTable, error) @@ -39,13 +38,12 @@ type statsHistoryImpl struct { } // NewStatsHistory creates a new StatsHistory. -func NewStatsHistory(pool util.SessionPool, statsCache util.StatsCache, +func NewStatsHistory(statsHandle util.StatsHandle, tableStatsToJSON func(dbName string, tableInfo *model.TableInfo, physicalID int64, snapshot uint64) (*storage.JSONTable, error), dumpStatsToJSON func(dbName string, tableInfo *model.TableInfo, historyStatsExec sqlexec.RestrictedSQLExecutor, dumpPartitionStats bool) (*storage.JSONTable, error), ) util.StatsHistory { return &statsHistoryImpl{ - pool: pool, - statsCache: statsCache, + statsHandle: statsHandle, tableStatsToJSON: tableStatsToJSON, dumpStatsToJSON: dumpStatsToJSON, } @@ -65,7 +63,7 @@ func (sh *statsHistoryImpl) RecordHistoricalStatsToStorage(dbName string, tableI } var version uint64 - err = util.CallWithSCtx(sh.pool, func(sctx sessionctx.Context) error { + err = util.CallWithSCtx(sh.statsHandle.SPool(), func(sctx sessionctx.Context) error { version, err = RecordHistoricalStatsToStorage(sctx, physicalID, js) return err }, util.FlagWrapTxn) @@ -77,14 +75,14 @@ func (sh *statsHistoryImpl) RecordHistoricalStatsMeta(tableID int64, version uin if version == 0 { return } - tbl, ok := sh.statsCache.Get(tableID) + tbl, ok := sh.statsHandle.Get(tableID) if !ok { return } if !tbl.IsInitialized() { return } - err := util.CallWithSCtx(sh.pool, func(sctx sessionctx.Context) error { + err := util.CallWithSCtx(sh.statsHandle.SPool(), func(sctx sessionctx.Context) error { return RecordHistoricalStatsMeta(sctx, tableID, version, source) }) if err != nil { // just log the error, hide the error from the outside caller. @@ -98,7 +96,7 @@ func (sh *statsHistoryImpl) RecordHistoricalStatsMeta(tableID int64, version uin // CheckHistoricalStatsEnable checks whether historical stats is enabled. func (sh *statsHistoryImpl) CheckHistoricalStatsEnable() (enable bool, err error) { - err = util.CallWithSCtx(sh.pool, func(sctx sessionctx.Context) error { + err = util.CallWithSCtx(sh.statsHandle.SPool(), func(sctx sessionctx.Context) error { enable = sctx.GetSessionVars().EnableHistoricalStats return nil }) diff --git a/pkg/statistics/handle/storage/gc.go b/pkg/statistics/handle/storage/gc.go index a407b3db0de6e..b93fe2fb27549 100644 --- a/pkg/statistics/handle/storage/gc.go +++ b/pkg/statistics/handle/storage/gc.go @@ -39,21 +39,16 @@ import ( // statsGCImpl implements StatsGC interface. type statsGCImpl struct { - pool util.SessionPool // used to recycle sessionctx. - tblInfoGetter util.TableInfoGetter - + statsHandle util.StatsHandle // TODO: it's ugly to use a raw function, solve it later on. markExtendedStatsDeleted func(statsName string, tableID int64, ifExists bool) (err error) - statsLease time.Duration // statistics lease } // NewStatsGC creates a new StatsGC. -func NewStatsGC(pool util.SessionPool, statsLease time.Duration, tblInfo util.TableInfoGetter, +func NewStatsGC(statsHandle util.StatsHandle, markExtendedStatsDeleted func(statsName string, tableID int64, ifExists bool) (err error)) util.StatsGC { return &statsGCImpl{ - pool: pool, - statsLease: statsLease, - tblInfoGetter: tblInfo, + statsHandle: statsHandle, markExtendedStatsDeleted: markExtendedStatsDeleted, } } @@ -62,21 +57,21 @@ func NewStatsGC(pool util.SessionPool, statsLease time.Duration, tblInfo util.Ta // For dropped tables, we will first update their version // so that other tidb could know that table is deleted. func (gc *statsGCImpl) GCStats(is infoschema.InfoSchema, ddlLease time.Duration) (err error) { - return util.CallWithSCtx(gc.pool, func(sctx sessionctx.Context) error { - return GCStats(sctx, gc.tblInfoGetter, gc.markExtendedStatsDeleted, is, gc.statsLease, ddlLease) + return util.CallWithSCtx(gc.statsHandle.SPool(), func(sctx sessionctx.Context) error { + return GCStats(sctx, gc.statsHandle, gc.markExtendedStatsDeleted, is, ddlLease) }) } // ClearOutdatedHistoryStats clear outdated historical stats. // Only for test. func (gc *statsGCImpl) ClearOutdatedHistoryStats() error { - return util.CallWithSCtx(gc.pool, ClearOutdatedHistoryStats) + return util.CallWithSCtx(gc.statsHandle.SPool(), ClearOutdatedHistoryStats) } // DeleteTableStatsFromKV deletes table statistics from kv. // A statsID refers to statistic of a table or a partition. func (gc *statsGCImpl) DeleteTableStatsFromKV(statsIDs []int64) (err error) { - return util.CallWithSCtx(gc.pool, func(sctx sessionctx.Context) error { + return util.CallWithSCtx(gc.statsHandle.SPool(), func(sctx sessionctx.Context) error { return DeleteTableStatsFromKV(sctx, statsIDs) }, util.FlagWrapTxn) } @@ -85,12 +80,12 @@ func (gc *statsGCImpl) DeleteTableStatsFromKV(statsIDs []int64) (err error) { // For dropped tables, we will first update their version // so that other tidb could know that table is deleted. func GCStats(sctx sessionctx.Context, - tableInfoGetter util.TableInfoGetter, + statsHandle util.StatsHandle, markExtendedStatsDeleted func(statsName string, tableID int64, ifExists bool) (err error), - is infoschema.InfoSchema, statsLease, ddlLease time.Duration) (err error) { + is infoschema.InfoSchema, ddlLease time.Duration) (err error) { // To make sure that all the deleted tables' schema and stats info have been acknowledged to all tidb, // we only garbage collect version before 10 lease. - lease := max(statsLease, ddlLease) + lease := max(statsHandle.Lease(), ddlLease) offset := util.DurationToTS(10 * lease) now := oracle.GoTimeToTS(time.Now()) if now < offset { @@ -115,7 +110,7 @@ func GCStats(sctx sessionctx.Context, return errors.Trace(err) } for _, row := range rows { - if err := gcTableStats(sctx, tableInfoGetter, markExtendedStatsDeleted, is, row.GetInt64(0)); err != nil { + if err := gcTableStats(sctx, statsHandle, markExtendedStatsDeleted, is, row.GetInt64(0)); err != nil { return errors.Trace(err) } _, existed := is.TableByID(row.GetInt64(0)) diff --git a/pkg/statistics/handle/usage/index_usage.go b/pkg/statistics/handle/usage/index_usage.go index f7bb5f7259fe8..a84a19d09016e 100644 --- a/pkg/statistics/handle/usage/index_usage.go +++ b/pkg/statistics/handle/usage/index_usage.go @@ -35,14 +35,14 @@ func (u *statsUsageImpl) NewSessionIndexUsageCollector() interface{} { // DumpIndexUsageToKV dumps all collected index usage info to storage. func (u *statsUsageImpl) DumpIndexUsageToKV() error { - return util.CallWithSCtx(u.pool, func(sctx sessionctx.Context) error { + return util.CallWithSCtx(u.statsHandle.SPool(), func(sctx sessionctx.Context) error { return dumpIndexUsageToKV(sctx, u.idxUsageListHead) }) } // GCIndexUsage removes unnecessary index usage data. func (u *statsUsageImpl) GCIndexUsage() error { - return util.CallWithSCtx(u.pool, gcIndexUsageOnKV) + return util.CallWithSCtx(u.statsHandle.SPool(), gcIndexUsageOnKV) } // IndexUsageInformation is the data struct to store index usage information. diff --git a/pkg/statistics/handle/usage/predicate_column.go b/pkg/statistics/handle/usage/predicate_column.go index b280ebbace1ae..cd60c53b3afef 100644 --- a/pkg/statistics/handle/usage/predicate_column.go +++ b/pkg/statistics/handle/usage/predicate_column.go @@ -32,12 +32,7 @@ import ( // statsUsageImpl implements utilstats.StatsUsage. type statsUsageImpl struct { - pool utilstats.SessionPool - tblInfo utilstats.TableInfoGetter - statsCache utilstats.StatsCache - statsHis utilstats.StatsHistory - getLockedTables func(tableIDs ...int64) (map[int64]struct{}, error) // TODO: use an interface instead of a method pointer - getPartitionStats func(tblInfo *model.TableInfo, pid int64) *statistics.Table + statsHandle utilstats.StatsHandle // idxUsageListHead contains all the index usage collectors required by session. idxUsageListHead *SessionIndexUsageCollector @@ -47,28 +42,16 @@ type statsUsageImpl struct { } // NewStatsUsageImpl creates a utilstats.StatsUsage. -func NewStatsUsageImpl(pool utilstats.SessionPool, - tblInfo utilstats.TableInfoGetter, - statsCache utilstats.StatsCache, - statsHis utilstats.StatsHistory, - getLockedTables func(tableIDs ...int64) (map[int64]struct{}, error), - getPartitionStats func(tblInfo *model.TableInfo, pid int64) *statistics.Table, -) utilstats.StatsUsage { +func NewStatsUsageImpl(statsHandle utilstats.StatsHandle) utilstats.StatsUsage { return &statsUsageImpl{ - pool: pool, - tblInfo: tblInfo, - statsCache: statsCache, - statsHis: statsHis, - getLockedTables: getLockedTables, - getPartitionStats: getPartitionStats, - idxUsageListHead: newSessionIndexUsageCollector(nil), - SessionStatsList: NewSessionStatsList(), - } + statsHandle: statsHandle, + idxUsageListHead: newSessionIndexUsageCollector(nil), + SessionStatsList: NewSessionStatsList()} } // LoadColumnStatsUsage returns all columns' usage information. func (u *statsUsageImpl) LoadColumnStatsUsage(loc *time.Location) (colStatsMap map[model.TableItemID]utilstats.ColStatsTimeInfo, err error) { - err = utilstats.CallWithSCtx(u.pool, func(sctx sessionctx.Context) error { + err = utilstats.CallWithSCtx(u.statsHandle.SPool(), func(sctx sessionctx.Context) error { colStatsMap, err = LoadColumnStatsUsage(sctx, loc) return err }) @@ -77,7 +60,7 @@ func (u *statsUsageImpl) LoadColumnStatsUsage(loc *time.Location) (colStatsMap m // GetPredicateColumns returns IDs of predicate columns, which are the columns whose stats are used(needed) when generating query plans. func (u *statsUsageImpl) GetPredicateColumns(tableID int64) (columnIDs []int64, err error) { - err = utilstats.CallWithSCtx(u.pool, func(sctx sessionctx.Context) error { + err = utilstats.CallWithSCtx(u.statsHandle.SPool(), func(sctx sessionctx.Context) error { columnIDs, err = GetPredicateColumns(sctx, tableID) return err }) @@ -86,7 +69,7 @@ func (u *statsUsageImpl) GetPredicateColumns(tableID int64) (columnIDs []int64, // CollectColumnsInExtendedStats returns IDs of the columns involved in extended stats. func (u *statsUsageImpl) CollectColumnsInExtendedStats(tableID int64) (columnIDs []int64, err error) { - err = utilstats.CallWithSCtx(u.pool, func(sctx sessionctx.Context) error { + err = utilstats.CallWithSCtx(u.statsHandle.SPool(), func(sctx sessionctx.Context) error { columnIDs, err = CollectColumnsInExtendedStats(sctx, tableID) return err }) diff --git a/pkg/statistics/handle/usage/session_stats_collect.go b/pkg/statistics/handle/usage/session_stats_collect.go index 341073900b529..d677ccc8f39e5 100644 --- a/pkg/statistics/handle/usage/session_stats_collect.go +++ b/pkg/statistics/handle/usage/session_stats_collect.go @@ -46,7 +46,7 @@ var ( // 3. If the stats delta haven't been dumped in the past hour, then return true. // 4. If the table stats is pseudo or empty or `Modify Count / Table Count` exceeds the threshold. func (s *statsUsageImpl) needDumpStatsDelta(is infoschema.InfoSchema, dumpAll bool, id int64, item variable.TableDelta, currentTime time.Time) bool { - tbl, ok := s.tblInfo.TableInfoByID(is, id) + tbl, ok := s.statsHandle.TableInfoByID(is, id) if !ok { return false } @@ -67,7 +67,7 @@ func (s *statsUsageImpl) needDumpStatsDelta(is infoschema.InfoSchema, dumpAll bo // Dump the stats to kv at least once an hour. return true } - statsTbl := s.getPartitionStats(tbl.Meta(), id) + statsTbl := s.statsHandle.GetPartitionStats(tbl.Meta(), id) if statsTbl.Pseudo || statsTbl.RealtimeCount == 0 || float64(item.Count)/float64(statsTbl.RealtimeCount) > DumpStatsDeltaRatio { // Dump the stats when there are many modifications. return true @@ -84,7 +84,7 @@ func (s *statsUsageImpl) DumpStatsDeltaToKV(dumpAll bool) error { s.SessionTableDelta().Merge(deltaMap) }() - return utilstats.CallWithSCtx(s.pool, func(sctx sessionctx.Context) error { + return utilstats.CallWithSCtx(s.statsHandle.SPool(), func(sctx sessionctx.Context) error { is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema) currentTime := time.Now() for id, item := range deltaMap { @@ -120,14 +120,14 @@ func (s *statsUsageImpl) dumpTableStatCountToKV(is infoschema.InfoSchema, physic statsVersion := uint64(0) defer func() { if err == nil && statsVersion != 0 { - s.statsHis.RecordHistoricalStatsMeta(physicalTableID, statsVersion, "flush stats") + s.statsHandle.RecordHistoricalStatsMeta(physicalTableID, statsVersion, "flush stats") } }() if delta.Count == 0 { return true, nil } - err = utilstats.CallWithSCtx(s.pool, func(sctx sessionctx.Context) error { + err = utilstats.CallWithSCtx(s.statsHandle.SPool(), func(sctx sessionctx.Context) error { statsVersion, err = utilstats.GetStartTS(sctx) if err != nil { return errors.Trace(err) @@ -140,7 +140,7 @@ func (s *statsUsageImpl) dumpTableStatCountToKV(is infoschema.InfoSchema, physic tidAndPid = append(tidAndPid, tbl.Meta().ID) } tidAndPid = append(tidAndPid, physicalTableID) - lockedTables, err := s.getLockedTables(tidAndPid...) + lockedTables, err := s.statsHandle.GetLockedTables(tidAndPid...) if err != nil { return err } @@ -244,7 +244,7 @@ func (s *statsUsageImpl) DumpColStatsUsageToKV() error { } } sqlexec.MustFormatSQL(sql, " ON DUPLICATE KEY UPDATE last_used_at = CASE WHEN last_used_at IS NULL THEN VALUES(last_used_at) ELSE GREATEST(last_used_at, VALUES(last_used_at)) END") - if err := utilstats.CallWithSCtx(s.pool, func(sctx sessionctx.Context) error { + if err := utilstats.CallWithSCtx(s.statsHandle.SPool(), func(sctx sessionctx.Context) error { _, _, err := utilstats.ExecRows(sctx, sql.String()) return err }); err != nil { diff --git a/pkg/statistics/handle/util/BUILD.bazel b/pkg/statistics/handle/util/BUILD.bazel index 34746004b306e..7eb6a5192bf52 100644 --- a/pkg/statistics/handle/util/BUILD.bazel +++ b/pkg/statistics/handle/util/BUILD.bazel @@ -26,6 +26,7 @@ go_library( "//pkg/util/sqlexec/mock", "@com_github_ngaut_pools//:pools", "@com_github_pingcap_errors//:errors", + "@com_github_tiancaiamao_gp//:gp", "@com_github_tikv_client_go_v2//oracle", ], ) diff --git a/pkg/statistics/handle/util/interfaces.go b/pkg/statistics/handle/util/interfaces.go index e78a212616a37..eaf2581d25909 100644 --- a/pkg/statistics/handle/util/interfaces.go +++ b/pkg/statistics/handle/util/interfaces.go @@ -19,8 +19,10 @@ import ( "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/statistics" "github.com/pingcap/tidb/pkg/types" + "github.com/tiancaiamao/gp" ) // StatsGC is used to GC unnecessary stats. @@ -201,3 +203,48 @@ type StatsLock interface { // GetTableLockedAndClearForTest for unit test only. GetTableLockedAndClearForTest() (map[int64]struct{}, error) } + +// StatsHandle is used to manage TiDB Statistics. +type StatsHandle interface { + // GPool returns the goroutine pool. + GPool() *gp.Pool + + // SPool returns the session pool. + SPool() SessionPool + + // Lease returns the stats lease. + Lease() time.Duration + + // SysProcTracker is used to track sys process like analyze + SysProcTracker() sessionctx.SysProcTracker + + // AutoAnalyzeProcID generates an analyze ID. + AutoAnalyzeProcID() uint64 + + // GetTableStats retrieves the statistics table from cache, and the cache will be updated by a goroutine. + GetTableStats(tblInfo *model.TableInfo) *statistics.Table + + // GetPartitionStats retrieves the partition stats from cache. + GetPartitionStats(tblInfo *model.TableInfo, pid int64) *statistics.Table + + // TableInfoGetter is used to get table meta info. + TableInfoGetter + + // StatsGC is used to do the GC job. + StatsGC + + // StatsUsage is used to handle table delta and stats usage. + StatsUsage + + // StatsHistory is used to manage historical stats. + StatsHistory + + // StatsAnalyze is used to handle auto-analyze and manage analyze jobs. + StatsAnalyze + + // StatsCache is used to manage all table statistics in memory. + StatsCache + + // StatsLock is used to manage locked stats. + StatsLock +}