planner: move more methods from StatsHandle to its sub-packages (#47679) (#47725)

ref #46905
ti-chi-bot authored Oct 17, 2023
1 parent 8d585ac commit 4803895
Showing 9 changed files with 296 additions and 212 deletions.
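
The commit continues the refactor tracked in #46905: behaviour moves off the central Handle struct into sub-packages (autoanalyze, cache, storage) that depend only on the narrow util.StatsHandle interface rather than on the concrete *Handle. The standalone Go sketch below illustrates that dependency-inversion pattern under simplified, assumed names; it is not the real TiDB API, only the shape of the wiring.

// sketch.go — a simplified, self-contained illustration of the wiring pattern;
// every identifier here is a placeholder, not a TiDB identifier.
package main

import "fmt"

// Table stands in for *statistics.Table.
type Table struct{ Pseudo bool }

// StatsHandle stands in for the util.StatsHandle interface: the narrow surface
// a sub-package is allowed to depend on.
type StatsHandle interface {
	GetPartitionStats(pid int64) *Table
}

// statsAnalyze plays the role of a sub-package worker; it only sees the interface.
type statsAnalyze struct{ statsHandle StatsHandle }

// firstNonPseudo walks the partitions through the injected interface.
func (sa *statsAnalyze) firstNonPseudo(pids []int64) *Table {
	for _, pid := range pids {
		if tbl := sa.statsHandle.GetPartitionStats(pid); !tbl.Pseudo {
			return tbl
		}
	}
	return nil
}

// handle plays the role of handle.Handle: it implements StatsHandle and embeds
// the worker, so callers keep the old method set.
type handle struct{ *statsAnalyze }

func (h *handle) GetPartitionStats(pid int64) *Table { return &Table{Pseudo: pid%2 == 0} }

func newHandle() *handle {
	h := &handle{}
	h.statsAnalyze = &statsAnalyze{statsHandle: h} // wire the worker back to the handle
	return h
}

func main() {
	h := newHandle()
	fmt.Println(h.firstNonPseudo([]int64{2, 3}) != nil) // true: partition 3 has non-pseudo stats
}

In the real code the interface lives in pkg/statistics/handle/util, and Handle embeds the sub-package implementations through interfaces such as util.StatsAnalyze, util.StatsCache and util.StatsReadWriter, as the hunks below show.
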
16 changes: 16 additions & 0 deletions pkg/statistics/handle/autoanalyze/autoanalyze.go
@@ -76,6 +76,22 @@ func (sa *statsAnalyze) HandleAutoAnalyze(is infoschema.InfoSchema) (analyzed bo
return
}

// CheckAnalyzeVersion checks whether all the statistics versions of this table's columns and indexes are the same.
func (sa *statsAnalyze) CheckAnalyzeVersion(tblInfo *model.TableInfo, physicalIDs []int64, version *int) bool {
// We simply choose one physical id to get its stats.
var tbl *statistics.Table
for _, pid := range physicalIDs {
tbl = sa.statsHandle.GetPartitionStats(tblInfo, pid)
if !tbl.Pseudo {
break
}
}
if tbl == nil || tbl.Pseudo {
return true
}
return statistics.CheckAnalyzeVerOnTable(tbl, version)
}

func parseAutoAnalyzeRatio(ratio string) float64 {
autoAnalyzeRatio, err := strconv.ParseFloat(ratio, 64)
if err != nil {
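CheckAnalyzeVersion is added here essentially verbatim; the matching deletion from Handle appears in the handle.go hunk further down. Because Handle embeds the util.StatsAnalyze interface (see the Handle struct hunk below), existing call sites such as h.CheckAnalyzeVersion(...) presumably keep compiling through Go's method promotion — assuming util.StatsAnalyze declares the method, which this diff does not show. A minimal standalone sketch of that mechanism, with the signature simplified:

package main

import "fmt"

// StatsAnalyze stands in for util.StatsAnalyze (assumed method set, simplified signature).
type StatsAnalyze interface {
	CheckAnalyzeVersion(physicalIDs []int64, version *int) bool
}

// statsAnalyze stands in for the autoanalyze implementation added above.
type statsAnalyze struct{}

func (statsAnalyze) CheckAnalyzeVersion(physicalIDs []int64, version *int) bool { return true }

// Handle embeds the interface, so the moved method stays reachable as h.CheckAnalyzeVersion.
type Handle struct {
	StatsAnalyze
}

func main() {
	h := &Handle{StatsAnalyze: statsAnalyze{}}
	v := 2
	fmt.Println(h.CheckAnalyzeVersion([]int64{1}, &v)) // true
}
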
2 changes: 1 addition & 1 deletion pkg/statistics/handle/bootstrap.go
@@ -70,7 +70,7 @@ func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (util.StatsCache, error
return nil, errors.Trace(err)
}
defer terror.Call(rc.Close)
-	tables, err := cache.NewStatsCacheImpl(h, h.TableStatsFromStorage)
+	tables, err := cache.NewStatsCacheImpl(h)
if err != nil {
return nil, err
}
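The second constructor argument disappears because, as the statscache.go hunk below shows, the cache now reaches the storage loader through statsHandle.TableStatsFromStorage on the util.StatsHandle interface instead of holding an injected function value. A simplified, compile-only sketch of the two wiring styles (assumed names, not the TiDB signatures):

package main

// before: the loader is injected as a function value that the cache must store.
type statsCacheOld struct {
	tableStatsFromStorage func(physicalID int64, loadAll bool) (int, error)
}

func newStatsCacheOld(load func(physicalID int64, loadAll bool) (int, error)) *statsCacheOld {
	return &statsCacheOld{tableStatsFromStorage: load}
}

// after: the loader is just one method on the handle interface the cache already holds,
// so the constructor shrinks to a single parameter.
type StatsHandle interface {
	TableStatsFromStorage(physicalID int64, loadAll bool) (int, error)
}

type statsCacheNew struct {
	statsHandle StatsHandle
}

func newStatsCacheNew(statsHandle StatsHandle) *statsCacheNew {
	return &statsCacheNew{statsHandle: statsHandle}
}

func main() {} // compile-only sketch

The interface form also keeps the constructor stable as further storage methods move behind the handle interface.
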
1 change: 1 addition & 0 deletions pkg/statistics/handle/cache/BUILD.bazel
@@ -19,6 +19,7 @@ go_library(
"//pkg/statistics/handle/cache/internal/lfu",
"//pkg/statistics/handle/cache/internal/mapcache",
"//pkg/statistics/handle/cache/internal/metrics",
"//pkg/statistics/handle/metrics",
"//pkg/statistics/handle/util",
"//pkg/types",
"//pkg/util/chunk",
40 changes: 30 additions & 10 deletions pkg/statistics/handle/cache/statscache.go
@@ -20,10 +20,10 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/cache/internal/metrics"
handle_metrics "github.com/pingcap/tidb/pkg/statistics/handle/metrics"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/logutil"
@@ -34,29 +34,25 @@ import (
type StatsCacheImpl struct {
atomic.Pointer[StatsCache]

-	statsHandle util.StatsHandle
-	tableStatsFromStorage func(tableInfo *model.TableInfo, physicalID int64, loadAll bool, snapshot uint64) (statsTbl *statistics.Table, err error)
+	statsHandle util.StatsHandle
}

// NewStatsCacheImpl creates a new StatsCache.
-func NewStatsCacheImpl(statsHandle util.StatsHandle,
-	tableStatsFromStorage func(tableInfo *model.TableInfo, physicalID int64, loadAll bool, snapshot uint64) (statsTbl *statistics.Table, err error),
-) (util.StatsCache, error) {
+func NewStatsCacheImpl(statsHandle util.StatsHandle) (util.StatsCache, error) {
newCache, err := NewStatsCache()
if err != nil {
return nil, err
}
result := &StatsCacheImpl{
-		statsHandle: statsHandle,
-		tableStatsFromStorage: tableStatsFromStorage,
+		statsHandle: statsHandle,
}
result.Store(newCache)
return result, nil
}

// NewStatsCacheImplForTest creates a new StatsCache for test.
func NewStatsCacheImplForTest() (util.StatsCache, error) {
-	return NewStatsCacheImpl(nil, nil)
+	return NewStatsCacheImpl(nil)
}

// Update reads stats meta from store and updates the stats map.
@@ -100,7 +96,7 @@ func (s *StatsCacheImpl) Update(is infoschema.InfoSchema) error {
if oldTbl, ok := s.Get(physicalID); ok && oldTbl.Version >= version && tableInfo.UpdateTS == oldTbl.TblInfoUpdateTS {
continue
}
-		tbl, err := s.tableStatsFromStorage(tableInfo, physicalID, false, 0)
+		tbl, err := s.statsHandle.TableStatsFromStorage(tableInfo, physicalID, false, 0)
// Error is not nil may mean that there are some ddl changes on this table, we will not update it.
if err != nil {
logutil.BgLogger().Error("error occurred when read table stats", zap.String("category", "stats"), zap.String("table", tableInfo.Name.O), zap.Error(err))
@@ -196,3 +192,27 @@ func (s *StatsCacheImpl) Len() int {
func (s *StatsCacheImpl) SetStatsCacheCapacity(c int64) {
s.Load().SetCapacity(c)
}

// UpdateStatsHealthyMetrics updates stats healthy distribution metrics according to stats cache.
func (s *StatsCacheImpl) UpdateStatsHealthyMetrics() {
distribution := make([]int64, 5)
for _, tbl := range s.Values() {
healthy, ok := tbl.GetStatsHealthy()
if !ok {
continue
}
if healthy < 50 {
distribution[0]++
} else if healthy < 80 {
distribution[1]++
} else if healthy < 100 {
distribution[2]++
} else {
distribution[3]++
}
distribution[4]++
}
for i, val := range distribution {
handle_metrics.StatsHealthyGauges[i].Set(float64(val))
}
}
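
UpdateStatsHealthyMetrics moves here from handle.go unchanged (the deletion is visible in the next file). It buckets each table's healthiness into <50, [50,80), [80,100) and exactly 100, while distribution[4] counts every table that reports a healthiness value at all. A standalone restatement of that bucketing, for a quick sanity check (this is not TiDB code):

package main

import "fmt"

// bucketize mirrors the distribution logic of UpdateStatsHealthyMetrics above.
func bucketize(healthies []int64) []int64 {
	distribution := make([]int64, 5)
	for _, healthy := range healthies {
		switch {
		case healthy < 50:
			distribution[0]++
		case healthy < 80:
			distribution[1]++
		case healthy < 100:
			distribution[2]++
		default:
			distribution[3]++
		}
		distribution[4]++ // total number of tables with a known healthiness
	}
	return distribution
}

func main() {
	// 30, 75, 99 and 100 land in one bucket each; the fifth slot counts all four.
	fmt.Println(bucketize([]int64{30, 75, 99, 100})) // [1 1 1 1 4]
}
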
194 changes: 6 additions & 188 deletions pkg/statistics/handle/handle.go
@@ -15,7 +15,6 @@
package handle

import (
"fmt"
"math"
"time"

@@ -33,12 +32,9 @@ import (
"github.com/pingcap/tidb/pkg/statistics/handle/globalstats"
"github.com/pingcap/tidb/pkg/statistics/handle/history"
"github.com/pingcap/tidb/pkg/statistics/handle/lockstats"
handle_metrics "github.com/pingcap/tidb/pkg/statistics/handle/metrics"
"github.com/pingcap/tidb/pkg/statistics/handle/storage"
"github.com/pingcap/tidb/pkg/statistics/handle/usage"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/tiancaiamao/gp"
atomic2 "go.uber.org/atomic"
@@ -77,6 +73,9 @@ type Handle struct {
// StatsAnalyze is used to handle auto-analyze and manage analyze jobs.
util.StatsAnalyze

// StatsReadWriter is used to read/write stats from/to storage.
util.StatsReadWriter

// StatsLock is used to manage locked stats.
util.StatsLock

@@ -101,14 +100,6 @@ type Handle struct {
lease atomic2.Duration
}

func (h *Handle) execRows(sql string, args ...interface{}) (rows []chunk.Row, fields []*ast.ResultField, rerr error) {
_ = h.callWithSCtx(func(sctx sessionctx.Context) error {
rows, fields, rerr = util.ExecRows(sctx, sql, args...)
return nil
})
return
}

// Clear the statsCache, only for test.
func (h *Handle) Clear() {
h.StatsCache.Clear()
@@ -131,11 +122,12 @@ func NewHandle(_, initStatsCtx sessionctx.Context, lease time.Duration, pool uti
TableInfoGetter: util.NewTableInfoGetter(),
StatsLock: lockstats.NewStatsLock(pool),
}
-	handle.StatsGC = storage.NewStatsGC(handle, handle.MarkExtendedStatsDeleted)
+	handle.StatsGC = storage.NewStatsGC(handle)
+	handle.StatsReadWriter = storage.NewStatsReadWriter(handle)

handle.initStatsCtx = initStatsCtx
handle.lease.Store(lease)
-	statsCache, err := cache.NewStatsCacheImpl(handle, handle.TableStatsFromStorage)
+	statsCache, err := cache.NewStatsCacheImpl(handle)
if err != nil {
return nil, err
}
@@ -160,30 +152,6 @@ func (h *Handle) SetLease(lease time.Duration) {
h.lease.Store(lease)
}

// UpdateStatsHealthyMetrics updates stats healthy distribution metrics according to stats cache.
func (h *Handle) UpdateStatsHealthyMetrics() {
distribution := make([]int64, 5)
for _, tbl := range h.Values() {
healthy, ok := tbl.GetStatsHealthy()
if !ok {
continue
}
if healthy < 50 {
distribution[0]++
} else if healthy < 80 {
distribution[1]++
} else if healthy < 100 {
distribution[2]++
} else {
distribution[3]++
}
distribution[4]++
}
for i, val := range distribution {
handle_metrics.StatsHealthyGauges[i].Set(float64(val))
}
}

// MergePartitionStats2GlobalStatsByTableID merge the partition-level stats to global-level stats based on the tableID.
func (h *Handle) MergePartitionStats2GlobalStatsByTableID(sc sessionctx.Context,
opts map[ast.AnalyzeOptionType]uint64, is infoschema.InfoSchema,
@@ -195,21 +163,6 @@ func (h *Handle) MergePartitionStats2GlobalStatsByTableID(sc sessionctx.Context,
return globalstats.MergePartitionStats2GlobalStatsByTableID(sc, h.gpool, opts, is, physicalID, isIndex, histIDs, h.TableInfoByID, h.callWithSCtx)
}

func (h *Handle) loadTablePartitionStats(tableInfo *model.TableInfo, partitionDef *model.PartitionDefinition) (*statistics.Table, error) {
var partitionStats *statistics.Table
partitionStats, err := h.TableStatsFromStorage(tableInfo, partitionDef.ID, true, 0)
if err != nil {
return nil, err
}
// if the err == nil && partitionStats == nil, it means we lack the partition-level stats which the physicalID is equal to partitionID.
if partitionStats == nil {
errMsg := fmt.Sprintf("table `%s` partition `%s`", tableInfo.Name.L, partitionDef.Name.L)
err = types.ErrPartitionStatsMissing.GenWithStackByArgs(errMsg)
return nil, err
}
return partitionStats, nil
}

// MergePartitionStats2GlobalStatsByTableID merge the partition-level stats to global-level stats based on the tableInfo.
func (h *Handle) mergePartitionStats2GlobalStats(
opts map[ast.AnalyzeOptionType]uint64,
@@ -254,15 +207,6 @@ func (h *Handle) GetPartitionStats(tblInfo *model.TableInfo, pid int64) *statist
return tbl
}

// LoadNeededHistograms will load histograms for those needed columns/indices.
func (h *Handle) LoadNeededHistograms() (err error) {
err = h.callWithSCtx(func(sctx sessionctx.Context) error {
loadFMSketch := config.GetGlobalConfig().Performance.EnableLoadFMSketch
return storage.LoadNeededHistograms(sctx, h.StatsCache, loadFMSketch)
}, util.FlagWrapTxn)
return err
}

// FlushStats flushes the cached stats update into store.
func (h *Handle) FlushStats() {
for len(h.ddlEventCh) > 0 {
@@ -276,29 +220,6 @@ func (h *Handle) FlushStats() {
}
}

// TableStatsFromStorage loads table stats info from storage.
func (h *Handle) TableStatsFromStorage(tableInfo *model.TableInfo, physicalID int64, loadAll bool, snapshot uint64) (statsTbl *statistics.Table, err error) {
err = h.callWithSCtx(func(sctx sessionctx.Context) error {
var ok bool
statsTbl, ok = h.Get(physicalID)
if !ok {
statsTbl = nil
}
statsTbl, err = storage.TableStatsFromStorage(sctx, snapshot, tableInfo, physicalID, loadAll, h.Lease(), statsTbl)
return err
}, util.FlagWrapTxn)
return
}

// StatsMetaCountAndModifyCount reads count and modify_count for the given table from mysql.stats_meta.
func (h *Handle) StatsMetaCountAndModifyCount(tableID int64) (count, modifyCount int64, err error) {
err = h.callWithSCtx(func(sctx sessionctx.Context) error {
count, modifyCount, _, err = storage.StatsMetaCountAndModifyCount(sctx, tableID)
return err
}, util.FlagWrapTxn)
return
}

// SaveTableStatsToStorage saves the stats of a table to storage.
func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, analyzeSnapshot bool, source string) (err error) {
return h.callWithSCtx(func(sctx sessionctx.Context) error {
@@ -322,80 +243,6 @@ func SaveTableStatsToStorage(sctx sessionctx.Context, results *statistics.Analyz
return err
}

// SaveStatsToStorage saves the stats to storage.
// If count is negative, both count and modify count would not be used and not be written to the table. Unless, corresponding
// fields in the stats_meta table will be updated.
// TODO: refactor to reduce the number of parameters
func (h *Handle) SaveStatsToStorage(tableID int64, count, modifyCount int64, isIndex int, hg *statistics.Histogram,
cms *statistics.CMSketch, topN *statistics.TopN, statsVersion int, isAnalyzed int64, updateAnalyzeTime bool, source string) (err error) {
var statsVer uint64
err = h.callWithSCtx(func(sctx sessionctx.Context) error {
statsVer, err = storage.SaveStatsToStorage(sctx, tableID,
count, modifyCount, isIndex, hg, cms, topN, statsVersion, isAnalyzed, updateAnalyzeTime)
return err
})
if err == nil && statsVer != 0 {
h.RecordHistoricalStatsMeta(tableID, statsVer, source)
}
return
}

// SaveMetaToStorage will save stats_meta to storage.
func (h *Handle) SaveMetaToStorage(tableID, count, modifyCount int64, source string) (err error) {
var statsVer uint64
err = h.callWithSCtx(func(sctx sessionctx.Context) error {
statsVer, err = storage.SaveMetaToStorage(sctx, tableID, count, modifyCount)
return err
})
if err == nil && statsVer != 0 {
h.RecordHistoricalStatsMeta(tableID, statsVer, source)
}
return
}

// InsertExtendedStats inserts a record into mysql.stats_extended and update version in mysql.stats_meta.
func (h *Handle) InsertExtendedStats(statsName string, colIDs []int64, tp int, tableID int64, ifNotExists bool) (err error) {
var statsVer uint64
err = h.callWithSCtx(func(sctx sessionctx.Context) error {
statsVer, err = storage.InsertExtendedStats(sctx, h.StatsCache, statsName, colIDs, tp, tableID, ifNotExists)
return err
})
if err == nil && statsVer != 0 {
h.RecordHistoricalStatsMeta(tableID, statsVer, util.StatsMetaHistorySourceExtendedStats)
}
return
}

// MarkExtendedStatsDeleted update the status of mysql.stats_extended to be `deleted` and the version of mysql.stats_meta.
func (h *Handle) MarkExtendedStatsDeleted(statsName string, tableID int64, ifExists bool) (err error) {
var statsVer uint64
err = h.callWithSCtx(func(sctx sessionctx.Context) error {
statsVer, err = storage.MarkExtendedStatsDeleted(sctx, h.StatsCache, statsName, tableID, ifExists)
return err
})
if err == nil && statsVer != 0 {
h.RecordHistoricalStatsMeta(tableID, statsVer, util.StatsMetaHistorySourceExtendedStats)
}
return
}

// ReloadExtendedStatistics drops the cache for extended statistics and reload data from mysql.stats_extended.
// TODO: move this method to the `extstats` package.
func (h *Handle) ReloadExtendedStatistics() error {
return h.callWithSCtx(func(sctx sessionctx.Context) error {
tables := make([]*statistics.Table, 0, h.Len())
for _, tbl := range h.Values() {
t, err := storage.ExtendedStatsFromStorage(sctx, tbl.Copy(), tbl.PhysicalID, true)
if err != nil {
return err
}
tables = append(tables, t)
}
h.UpdateStatsCache(tables, nil)
return nil
}, util.FlagWrapTxn)
}

// BuildExtendedStats build extended stats for column groups if needed based on the column samples.
func (h *Handle) BuildExtendedStats(tableID int64, cols []*model.ColumnInfo, collectors []*statistics.SampleCollector) (es *statistics.ExtendedStatsColl, err error) {
err = h.callWithSCtx(func(sctx sessionctx.Context) error {
@@ -405,35 +252,6 @@ func (h *Handle) BuildExtendedStats(tableID int64, cols []*model.ColumnInfo, col
return es, err
}

// SaveExtendedStatsToStorage writes extended stats of a table into mysql.stats_extended.
func (h *Handle) SaveExtendedStatsToStorage(tableID int64, extStats *statistics.ExtendedStatsColl, isLoad bool) (err error) {
var statsVer uint64
err = h.callWithSCtx(func(sctx sessionctx.Context) error {
statsVer, err = storage.SaveExtendedStatsToStorage(sctx, tableID, extStats, isLoad)
return err
})
if err == nil && statsVer != 0 {
h.RecordHistoricalStatsMeta(tableID, statsVer, util.StatsMetaHistorySourceExtendedStats)
}
return
}

// CheckAnalyzeVersion checks whether all the statistics versions of this table's columns and indexes are the same.
func (h *Handle) CheckAnalyzeVersion(tblInfo *model.TableInfo, physicalIDs []int64, version *int) bool {
// We simply choose one physical id to get its stats.
var tbl *statistics.Table
for _, pid := range physicalIDs {
tbl = h.GetPartitionStats(tblInfo, pid)
if !tbl.Pseudo {
break
}
}
if tbl == nil || tbl.Pseudo {
return true
}
return statistics.CheckAnalyzeVerOnTable(tbl, version)
}

// Close stops the background
func (h *Handle) Close() {
h.gpool.Close()
(The remaining 4 changed files are not shown.)

