From ed799248d667eff1a6f3173caa2801eda5ca6253 Mon Sep 17 00:00:00 2001 From: Yuanjia Zhang Date: Tue, 17 Oct 2023 11:00:28 +0800 Subject: [PATCH] planner: move more methods from StatsHandle to its sub-packages (#47679) ref pingcap/tidb#46905 --- .../handle/autoanalyze/autoanalyze.go | 16 ++ pkg/statistics/handle/bootstrap.go | 2 +- pkg/statistics/handle/cache/BUILD.bazel | 1 + pkg/statistics/handle/cache/statscache.go | 40 +++- pkg/statistics/handle/handle.go | 194 +----------------- pkg/statistics/handle/storage/BUILD.bazel | 1 + pkg/statistics/handle/storage/gc.go | 20 +- .../handle/storage/stats_read_writer.go | 185 +++++++++++++++++ pkg/statistics/handle/util/interfaces.go | 49 +++++ 9 files changed, 296 insertions(+), 212 deletions(-) create mode 100644 pkg/statistics/handle/storage/stats_read_writer.go diff --git a/pkg/statistics/handle/autoanalyze/autoanalyze.go b/pkg/statistics/handle/autoanalyze/autoanalyze.go index 26e4282c0f5a61..d1c20b9e67aaa3 100644 --- a/pkg/statistics/handle/autoanalyze/autoanalyze.go +++ b/pkg/statistics/handle/autoanalyze/autoanalyze.go @@ -76,6 +76,22 @@ func (sa *statsAnalyze) HandleAutoAnalyze(is infoschema.InfoSchema) (analyzed bo return } +// CheckAnalyzeVersion checks whether all the statistics versions of this table's columns and indexes are the same. +func (sa *statsAnalyze) CheckAnalyzeVersion(tblInfo *model.TableInfo, physicalIDs []int64, version *int) bool { + // We simply choose one physical id to get its stats. 
+ var tbl *statistics.Table + for _, pid := range physicalIDs { + tbl = sa.statsHandle.GetPartitionStats(tblInfo, pid) + if !tbl.Pseudo { + break + } + } + if tbl == nil || tbl.Pseudo { + return true + } + return statistics.CheckAnalyzeVerOnTable(tbl, version) +} + func parseAutoAnalyzeRatio(ratio string) float64 { autoAnalyzeRatio, err := strconv.ParseFloat(ratio, 64) if err != nil { diff --git a/pkg/statistics/handle/bootstrap.go b/pkg/statistics/handle/bootstrap.go index bc42f866f7a237..1e49bec642bef1 100644 --- a/pkg/statistics/handle/bootstrap.go +++ b/pkg/statistics/handle/bootstrap.go @@ -70,7 +70,7 @@ func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (util.StatsCache, error return nil, errors.Trace(err) } defer terror.Call(rc.Close) - tables, err := cache.NewStatsCacheImpl(h, h.TableStatsFromStorage) + tables, err := cache.NewStatsCacheImpl(h) if err != nil { return nil, err } diff --git a/pkg/statistics/handle/cache/BUILD.bazel b/pkg/statistics/handle/cache/BUILD.bazel index 9b299a7cb86d4a..162ea53d55e992 100644 --- a/pkg/statistics/handle/cache/BUILD.bazel +++ b/pkg/statistics/handle/cache/BUILD.bazel @@ -19,6 +19,7 @@ go_library( "//pkg/statistics/handle/cache/internal/lfu", "//pkg/statistics/handle/cache/internal/mapcache", "//pkg/statistics/handle/cache/internal/metrics", + "//pkg/statistics/handle/metrics", "//pkg/statistics/handle/util", "//pkg/types", "//pkg/util/chunk", diff --git a/pkg/statistics/handle/cache/statscache.go b/pkg/statistics/handle/cache/statscache.go index 4f9c6a283bafef..e7f6c05f146cd5 100644 --- a/pkg/statistics/handle/cache/statscache.go +++ b/pkg/statistics/handle/cache/statscache.go @@ -20,10 +20,10 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/infoschema" - "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/statistics" "github.com/pingcap/tidb/pkg/statistics/handle/cache/internal/metrics" + 
handle_metrics "github.com/pingcap/tidb/pkg/statistics/handle/metrics" "github.com/pingcap/tidb/pkg/statistics/handle/util" "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/pkg/util/logutil" @@ -34,21 +34,17 @@ import ( type StatsCacheImpl struct { atomic.Pointer[StatsCache] - statsHandle util.StatsHandle - tableStatsFromStorage func(tableInfo *model.TableInfo, physicalID int64, loadAll bool, snapshot uint64) (statsTbl *statistics.Table, err error) + statsHandle util.StatsHandle } // NewStatsCacheImpl creates a new StatsCache. -func NewStatsCacheImpl(statsHandle util.StatsHandle, - tableStatsFromStorage func(tableInfo *model.TableInfo, physicalID int64, loadAll bool, snapshot uint64) (statsTbl *statistics.Table, err error), -) (util.StatsCache, error) { +func NewStatsCacheImpl(statsHandle util.StatsHandle) (util.StatsCache, error) { newCache, err := NewStatsCache() if err != nil { return nil, err } result := &StatsCacheImpl{ - statsHandle: statsHandle, - tableStatsFromStorage: tableStatsFromStorage, + statsHandle: statsHandle, } result.Store(newCache) return result, nil @@ -56,7 +52,7 @@ func NewStatsCacheImpl(statsHandle util.StatsHandle, // NewStatsCacheImplForTest creates a new StatsCache for test. func NewStatsCacheImplForTest() (util.StatsCache, error) { - return NewStatsCacheImpl(nil, nil) + return NewStatsCacheImpl(nil) } // Update reads stats meta from store and updates the stats map. @@ -100,7 +96,7 @@ func (s *StatsCacheImpl) Update(is infoschema.InfoSchema) error { if oldTbl, ok := s.Get(physicalID); ok && oldTbl.Version >= version && tableInfo.UpdateTS == oldTbl.TblInfoUpdateTS { continue } - tbl, err := s.tableStatsFromStorage(tableInfo, physicalID, false, 0) + tbl, err := s.statsHandle.TableStatsFromStorage(tableInfo, physicalID, false, 0) // Error is not nil may mean that there are some ddl changes on this table, we will not update it. 
if err != nil { logutil.BgLogger().Error("error occurred when read table stats", zap.String("category", "stats"), zap.String("table", tableInfo.Name.O), zap.Error(err)) @@ -196,3 +192,27 @@ func (s *StatsCacheImpl) Len() int { func (s *StatsCacheImpl) SetStatsCacheCapacity(c int64) { s.Load().SetCapacity(c) } + +// UpdateStatsHealthyMetrics updates stats healthy distribution metrics according to stats cache. +func (s *StatsCacheImpl) UpdateStatsHealthyMetrics() { + distribution := make([]int64, 5) + for _, tbl := range s.Values() { + healthy, ok := tbl.GetStatsHealthy() + if !ok { + continue + } + if healthy < 50 { + distribution[0]++ + } else if healthy < 80 { + distribution[1]++ + } else if healthy < 100 { + distribution[2]++ + } else { + distribution[3]++ + } + distribution[4]++ + } + for i, val := range distribution { + handle_metrics.StatsHealthyGauges[i].Set(float64(val)) + } +} diff --git a/pkg/statistics/handle/handle.go b/pkg/statistics/handle/handle.go index 66b3f8e72b61a2..09ae94b10e55a0 100644 --- a/pkg/statistics/handle/handle.go +++ b/pkg/statistics/handle/handle.go @@ -15,7 +15,6 @@ package handle import ( - "fmt" "math" "time" @@ -33,12 +32,9 @@ import ( "github.com/pingcap/tidb/pkg/statistics/handle/globalstats" "github.com/pingcap/tidb/pkg/statistics/handle/history" "github.com/pingcap/tidb/pkg/statistics/handle/lockstats" - handle_metrics "github.com/pingcap/tidb/pkg/statistics/handle/metrics" "github.com/pingcap/tidb/pkg/statistics/handle/storage" "github.com/pingcap/tidb/pkg/statistics/handle/usage" "github.com/pingcap/tidb/pkg/statistics/handle/util" - "github.com/pingcap/tidb/pkg/types" - "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/tiancaiamao/gp" atomic2 "go.uber.org/atomic" @@ -77,6 +73,9 @@ type Handle struct { // StatsAnalyze is used to handle auto-analyze and manage analyze jobs. util.StatsAnalyze + // StatsReadWriter is used to read/write stats from/to storage. 
+ util.StatsReadWriter + // StatsLock is used to manage locked stats. util.StatsLock @@ -101,14 +100,6 @@ type Handle struct { lease atomic2.Duration } -func (h *Handle) execRows(sql string, args ...interface{}) (rows []chunk.Row, fields []*ast.ResultField, rerr error) { - _ = h.callWithSCtx(func(sctx sessionctx.Context) error { - rows, fields, rerr = util.ExecRows(sctx, sql, args...) - return nil - }) - return -} - // Clear the statsCache, only for test. func (h *Handle) Clear() { h.StatsCache.Clear() @@ -131,11 +122,12 @@ func NewHandle(_, initStatsCtx sessionctx.Context, lease time.Duration, pool uti TableInfoGetter: util.NewTableInfoGetter(), StatsLock: lockstats.NewStatsLock(pool), } - handle.StatsGC = storage.NewStatsGC(handle, handle.MarkExtendedStatsDeleted) + handle.StatsGC = storage.NewStatsGC(handle) + handle.StatsReadWriter = storage.NewStatsReadWriter(handle) handle.initStatsCtx = initStatsCtx handle.lease.Store(lease) - statsCache, err := cache.NewStatsCacheImpl(handle, handle.TableStatsFromStorage) + statsCache, err := cache.NewStatsCacheImpl(handle) if err != nil { return nil, err } @@ -160,30 +152,6 @@ func (h *Handle) SetLease(lease time.Duration) { h.lease.Store(lease) } -// UpdateStatsHealthyMetrics updates stats healthy distribution metrics according to stats cache. -func (h *Handle) UpdateStatsHealthyMetrics() { - distribution := make([]int64, 5) - for _, tbl := range h.Values() { - healthy, ok := tbl.GetStatsHealthy() - if !ok { - continue - } - if healthy < 50 { - distribution[0]++ - } else if healthy < 80 { - distribution[1]++ - } else if healthy < 100 { - distribution[2]++ - } else { - distribution[3]++ - } - distribution[4]++ - } - for i, val := range distribution { - handle_metrics.StatsHealthyGauges[i].Set(float64(val)) - } -} - // MergePartitionStats2GlobalStatsByTableID merge the partition-level stats to global-level stats based on the tableID. 
func (h *Handle) MergePartitionStats2GlobalStatsByTableID(sc sessionctx.Context, opts map[ast.AnalyzeOptionType]uint64, is infoschema.InfoSchema, @@ -195,21 +163,6 @@ func (h *Handle) MergePartitionStats2GlobalStatsByTableID(sc sessionctx.Context, return globalstats.MergePartitionStats2GlobalStatsByTableID(sc, h.gpool, opts, is, physicalID, isIndex, histIDs, h.TableInfoByID, h.callWithSCtx) } -func (h *Handle) loadTablePartitionStats(tableInfo *model.TableInfo, partitionDef *model.PartitionDefinition) (*statistics.Table, error) { - var partitionStats *statistics.Table - partitionStats, err := h.TableStatsFromStorage(tableInfo, partitionDef.ID, true, 0) - if err != nil { - return nil, err - } - // if the err == nil && partitionStats == nil, it means we lack the partition-level stats which the physicalID is equal to partitionID. - if partitionStats == nil { - errMsg := fmt.Sprintf("table `%s` partition `%s`", tableInfo.Name.L, partitionDef.Name.L) - err = types.ErrPartitionStatsMissing.GenWithStackByArgs(errMsg) - return nil, err - } - return partitionStats, nil -} - // MergePartitionStats2GlobalStatsByTableID merge the partition-level stats to global-level stats based on the tableInfo. func (h *Handle) mergePartitionStats2GlobalStats( opts map[ast.AnalyzeOptionType]uint64, @@ -254,15 +207,6 @@ func (h *Handle) GetPartitionStats(tblInfo *model.TableInfo, pid int64) *statist return tbl } -// LoadNeededHistograms will load histograms for those needed columns/indices. -func (h *Handle) LoadNeededHistograms() (err error) { - err = h.callWithSCtx(func(sctx sessionctx.Context) error { - loadFMSketch := config.GetGlobalConfig().Performance.EnableLoadFMSketch - return storage.LoadNeededHistograms(sctx, h.StatsCache, loadFMSketch) - }, util.FlagWrapTxn) - return err -} - // FlushStats flushes the cached stats update into store. 
func (h *Handle) FlushStats() { for len(h.ddlEventCh) > 0 { @@ -276,29 +220,6 @@ func (h *Handle) FlushStats() { } } -// TableStatsFromStorage loads table stats info from storage. -func (h *Handle) TableStatsFromStorage(tableInfo *model.TableInfo, physicalID int64, loadAll bool, snapshot uint64) (statsTbl *statistics.Table, err error) { - err = h.callWithSCtx(func(sctx sessionctx.Context) error { - var ok bool - statsTbl, ok = h.Get(physicalID) - if !ok { - statsTbl = nil - } - statsTbl, err = storage.TableStatsFromStorage(sctx, snapshot, tableInfo, physicalID, loadAll, h.Lease(), statsTbl) - return err - }, util.FlagWrapTxn) - return -} - -// StatsMetaCountAndModifyCount reads count and modify_count for the given table from mysql.stats_meta. -func (h *Handle) StatsMetaCountAndModifyCount(tableID int64) (count, modifyCount int64, err error) { - err = h.callWithSCtx(func(sctx sessionctx.Context) error { - count, modifyCount, _, err = storage.StatsMetaCountAndModifyCount(sctx, tableID) - return err - }, util.FlagWrapTxn) - return -} - // SaveTableStatsToStorage saves the stats of a table to storage. func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, analyzeSnapshot bool, source string) (err error) { return h.callWithSCtx(func(sctx sessionctx.Context) error { @@ -322,80 +243,6 @@ func SaveTableStatsToStorage(sctx sessionctx.Context, results *statistics.Analyz return err } -// SaveStatsToStorage saves the stats to storage. -// If count is negative, both count and modify count would not be used and not be written to the table. Unless, corresponding -// fields in the stats_meta table will be updated. 
-// TODO: refactor to reduce the number of parameters -func (h *Handle) SaveStatsToStorage(tableID int64, count, modifyCount int64, isIndex int, hg *statistics.Histogram, - cms *statistics.CMSketch, topN *statistics.TopN, statsVersion int, isAnalyzed int64, updateAnalyzeTime bool, source string) (err error) { - var statsVer uint64 - err = h.callWithSCtx(func(sctx sessionctx.Context) error { - statsVer, err = storage.SaveStatsToStorage(sctx, tableID, - count, modifyCount, isIndex, hg, cms, topN, statsVersion, isAnalyzed, updateAnalyzeTime) - return err - }) - if err == nil && statsVer != 0 { - h.RecordHistoricalStatsMeta(tableID, statsVer, source) - } - return -} - -// SaveMetaToStorage will save stats_meta to storage. -func (h *Handle) SaveMetaToStorage(tableID, count, modifyCount int64, source string) (err error) { - var statsVer uint64 - err = h.callWithSCtx(func(sctx sessionctx.Context) error { - statsVer, err = storage.SaveMetaToStorage(sctx, tableID, count, modifyCount) - return err - }) - if err == nil && statsVer != 0 { - h.RecordHistoricalStatsMeta(tableID, statsVer, source) - } - return -} - -// InsertExtendedStats inserts a record into mysql.stats_extended and update version in mysql.stats_meta. -func (h *Handle) InsertExtendedStats(statsName string, colIDs []int64, tp int, tableID int64, ifNotExists bool) (err error) { - var statsVer uint64 - err = h.callWithSCtx(func(sctx sessionctx.Context) error { - statsVer, err = storage.InsertExtendedStats(sctx, h.StatsCache, statsName, colIDs, tp, tableID, ifNotExists) - return err - }) - if err == nil && statsVer != 0 { - h.RecordHistoricalStatsMeta(tableID, statsVer, util.StatsMetaHistorySourceExtendedStats) - } - return -} - -// MarkExtendedStatsDeleted update the status of mysql.stats_extended to be `deleted` and the version of mysql.stats_meta. 
-func (h *Handle) MarkExtendedStatsDeleted(statsName string, tableID int64, ifExists bool) (err error) { - var statsVer uint64 - err = h.callWithSCtx(func(sctx sessionctx.Context) error { - statsVer, err = storage.MarkExtendedStatsDeleted(sctx, h.StatsCache, statsName, tableID, ifExists) - return err - }) - if err == nil && statsVer != 0 { - h.RecordHistoricalStatsMeta(tableID, statsVer, util.StatsMetaHistorySourceExtendedStats) - } - return -} - -// ReloadExtendedStatistics drops the cache for extended statistics and reload data from mysql.stats_extended. -// TODO: move this method to the `extstats` package. -func (h *Handle) ReloadExtendedStatistics() error { - return h.callWithSCtx(func(sctx sessionctx.Context) error { - tables := make([]*statistics.Table, 0, h.Len()) - for _, tbl := range h.Values() { - t, err := storage.ExtendedStatsFromStorage(sctx, tbl.Copy(), tbl.PhysicalID, true) - if err != nil { - return err - } - tables = append(tables, t) - } - h.UpdateStatsCache(tables, nil) - return nil - }, util.FlagWrapTxn) -} - // BuildExtendedStats build extended stats for column groups if needed based on the column samples. func (h *Handle) BuildExtendedStats(tableID int64, cols []*model.ColumnInfo, collectors []*statistics.SampleCollector) (es *statistics.ExtendedStatsColl, err error) { err = h.callWithSCtx(func(sctx sessionctx.Context) error { @@ -405,35 +252,6 @@ func (h *Handle) BuildExtendedStats(tableID int64, cols []*model.ColumnInfo, col return es, err } -// SaveExtendedStatsToStorage writes extended stats of a table into mysql.stats_extended. 
-func (h *Handle) SaveExtendedStatsToStorage(tableID int64, extStats *statistics.ExtendedStatsColl, isLoad bool) (err error) { - var statsVer uint64 - err = h.callWithSCtx(func(sctx sessionctx.Context) error { - statsVer, err = storage.SaveExtendedStatsToStorage(sctx, tableID, extStats, isLoad) - return err - }) - if err == nil && statsVer != 0 { - h.RecordHistoricalStatsMeta(tableID, statsVer, util.StatsMetaHistorySourceExtendedStats) - } - return -} - -// CheckAnalyzeVersion checks whether all the statistics versions of this table's columns and indexes are the same. -func (h *Handle) CheckAnalyzeVersion(tblInfo *model.TableInfo, physicalIDs []int64, version *int) bool { - // We simply choose one physical id to get its stats. - var tbl *statistics.Table - for _, pid := range physicalIDs { - tbl = h.GetPartitionStats(tblInfo, pid) - if !tbl.Pseudo { - break - } - } - if tbl == nil || tbl.Pseudo { - return true - } - return statistics.CheckAnalyzeVerOnTable(tbl, version) -} - // Close stops the background func (h *Handle) Close() { h.gpool.Close() diff --git a/pkg/statistics/handle/storage/BUILD.bazel b/pkg/statistics/handle/storage/BUILD.bazel index 5ac1cb1ecadd5c..0ed9cf56db5ffb 100644 --- a/pkg/statistics/handle/storage/BUILD.bazel +++ b/pkg/statistics/handle/storage/BUILD.bazel @@ -7,6 +7,7 @@ go_library( "json.go", "read.go", "save.go", + "stats_read_writer.go", "update.go", ], importpath = "github.com/pingcap/tidb/pkg/statistics/handle/storage", diff --git a/pkg/statistics/handle/storage/gc.go b/pkg/statistics/handle/storage/gc.go index b93fe2fb27549d..de6a788de687e3 100644 --- a/pkg/statistics/handle/storage/gc.go +++ b/pkg/statistics/handle/storage/gc.go @@ -40,16 +40,12 @@ import ( // statsGCImpl implements StatsGC interface. type statsGCImpl struct { statsHandle util.StatsHandle - // TODO: it's ugly to use a raw function, solve it later on. 
- markExtendedStatsDeleted func(statsName string, tableID int64, ifExists bool) (err error) } // NewStatsGC creates a new StatsGC. -func NewStatsGC(statsHandle util.StatsHandle, - markExtendedStatsDeleted func(statsName string, tableID int64, ifExists bool) (err error)) util.StatsGC { +func NewStatsGC(statsHandle util.StatsHandle) util.StatsGC { return &statsGCImpl{ - statsHandle: statsHandle, - markExtendedStatsDeleted: markExtendedStatsDeleted, + statsHandle: statsHandle, } } @@ -58,7 +54,7 @@ func NewStatsGC(statsHandle util.StatsHandle, // so that other tidb could know that table is deleted. func (gc *statsGCImpl) GCStats(is infoschema.InfoSchema, ddlLease time.Duration) (err error) { return util.CallWithSCtx(gc.statsHandle.SPool(), func(sctx sessionctx.Context) error { - return GCStats(sctx, gc.statsHandle, gc.markExtendedStatsDeleted, is, ddlLease) + return GCStats(sctx, gc.statsHandle, is, ddlLease) }) } @@ -81,7 +77,6 @@ func (gc *statsGCImpl) DeleteTableStatsFromKV(statsIDs []int64) (err error) { // so that other tidb could know that table is deleted. func GCStats(sctx sessionctx.Context, statsHandle util.StatsHandle, - markExtendedStatsDeleted func(statsName string, tableID int64, ifExists bool) (err error), is infoschema.InfoSchema, ddlLease time.Duration) (err error) { // To make sure that all the deleted tables' schema and stats info have been acknowledged to all tidb, // we only garbage collect version before 10 lease. @@ -110,7 +105,7 @@ func GCStats(sctx sessionctx.Context, return errors.Trace(err) } for _, row := range rows { - if err := gcTableStats(sctx, statsHandle, markExtendedStatsDeleted, is, row.GetInt64(0)); err != nil { + if err := gcTableStats(sctx, statsHandle, is, row.GetInt64(0)); err != nil { return errors.Trace(err) } _, existed := is.TableByID(row.GetInt64(0)) @@ -256,8 +251,7 @@ func removeDeletedExtendedStats(sctx sessionctx.Context, version uint64) (err er // gcTableStats GC this table's stats. 
func gcTableStats(sctx sessionctx.Context, - tableInfoGetter util.TableInfoGetter, - markExtendedStatsDeleted func(statsName string, tableID int64, ifExists bool) (err error), + statsHandler util.StatsHandle, is infoschema.InfoSchema, physicalID int64) error { rows, _, err := util.ExecRows(sctx, "select is_index, hist_id from mysql.stats_histograms where table_id = %?", physicalID) if err != nil { @@ -272,7 +266,7 @@ func gcTableStats(sctx sessionctx.Context, } cache.TableRowStatsCache.Invalidate(physicalID) } - tbl, ok := tableInfoGetter.TableInfoByID(is, physicalID) + tbl, ok := statsHandler.TableInfoByID(is, physicalID) if !ok { logutil.BgLogger().Info("remove stats in GC due to dropped table", zap.Int64("table_id", physicalID)) return util.WrapTxn(sctx, func(sctx sessionctx.Context) error { @@ -334,7 +328,7 @@ func gcTableStats(sctx sessionctx.Context, } if !found { logutil.BgLogger().Info("mark mysql.stats_extended record as 'deleted' in GC due to dropped columns", zap.String("table_name", tblInfo.Name.L), zap.Int64("table_id", physicalID), zap.String("stats_name", statsName), zap.Int64("dropped_column_id", colID)) - err = markExtendedStatsDeleted(statsName, physicalID, true) + err = statsHandler.MarkExtendedStatsDeleted(statsName, physicalID, true) if err != nil { logutil.BgLogger().Debug("update stats_extended status failed", zap.String("stats_name", statsName), zap.Error(err)) return errors.Trace(err) diff --git a/pkg/statistics/handle/storage/stats_read_writer.go b/pkg/statistics/handle/storage/stats_read_writer.go new file mode 100644 index 00000000000000..8b5b6f60f81824 --- /dev/null +++ b/pkg/statistics/handle/storage/stats_read_writer.go @@ -0,0 +1,185 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/statistics" + "github.com/pingcap/tidb/pkg/statistics/handle/util" +) + +// statsReadWriter implements the util.StatsReadWriter interface. +type statsReadWriter struct { + statsHandler util.StatsHandle +} + +// NewStatsReadWriter creates a new StatsReadWriter. +func NewStatsReadWriter(statsHandler util.StatsHandle) util.StatsReadWriter { + return &statsReadWriter{statsHandler: statsHandler} +} + +// StatsMetaCountAndModifyCount reads count and modify_count for the given table from mysql.stats_meta. +func (s *statsReadWriter) StatsMetaCountAndModifyCount(tableID int64) (count, modifyCount int64, err error) { + err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error { + count, modifyCount, _, err = StatsMetaCountAndModifyCount(sctx, tableID) + return err + }, util.FlagWrapTxn) + return +} + +// TableStatsFromStorage loads table stats info from storage. 
+func (s *statsReadWriter) TableStatsFromStorage(tableInfo *model.TableInfo, physicalID int64, loadAll bool, snapshot uint64) (statsTbl *statistics.Table, err error) { + err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error { + var ok bool + statsTbl, ok = s.statsHandler.Get(physicalID) + if !ok { + statsTbl = nil + } + statsTbl, err = TableStatsFromStorage(sctx, snapshot, tableInfo, physicalID, loadAll, s.statsHandler.Lease(), statsTbl) + return err + }, util.FlagWrapTxn) + return +} + +// SaveStatsToStorage saves the stats to storage. +// If count is negative, both count and modify count would not be used and not be written to the table. Otherwise, the corresponding +// fields in the stats_meta table will be updated. +// TODO: refactor to reduce the number of parameters +func (s *statsReadWriter) SaveStatsToStorage(tableID int64, count, modifyCount int64, isIndex int, hg *statistics.Histogram, + cms *statistics.CMSketch, topN *statistics.TopN, statsVersion int, isAnalyzed int64, updateAnalyzeTime bool, source string) (err error) { + var statsVer uint64 + err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error { + statsVer, err = SaveStatsToStorage(sctx, tableID, + count, modifyCount, isIndex, hg, cms, topN, statsVersion, isAnalyzed, updateAnalyzeTime) + return err + }) + if err == nil && statsVer != 0 { + s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, source) + } + return +} + +// SaveMetaToStorage saves stats meta to the storage. 
+func (s *statsReadWriter) SaveMetaToStorage(tableID, count, modifyCount int64, source string) (err error) { + var statsVer uint64 + err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error { + statsVer, err = SaveMetaToStorage(sctx, tableID, count, modifyCount) + return err + }) + if err == nil && statsVer != 0 { + s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, source) + } + return +} + +// InsertExtendedStats inserts a record into mysql.stats_extended and update version in mysql.stats_meta. +func (s *statsReadWriter) InsertExtendedStats(statsName string, colIDs []int64, tp int, tableID int64, ifNotExists bool) (err error) { + var statsVer uint64 + err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error { + statsVer, err = InsertExtendedStats(sctx, s.statsHandler, statsName, colIDs, tp, tableID, ifNotExists) + return err + }) + if err == nil && statsVer != 0 { + s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, "extended stats") + } + return +} + +// MarkExtendedStatsDeleted update the status of mysql.stats_extended to be `deleted` and the version of mysql.stats_meta. +func (s *statsReadWriter) MarkExtendedStatsDeleted(statsName string, tableID int64, ifExists bool) (err error) { + var statsVer uint64 + err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error { + statsVer, err = MarkExtendedStatsDeleted(sctx, s.statsHandler, statsName, tableID, ifExists) + return err + }) + if err == nil && statsVer != 0 { + s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, "extended stats") + } + return +} + +// SaveExtendedStatsToStorage writes extended stats of a table into mysql.stats_extended. 
+func (s *statsReadWriter) SaveExtendedStatsToStorage(tableID int64, extStats *statistics.ExtendedStatsColl, isLoad bool) (err error) { + var statsVer uint64 + err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error { + statsVer, err = SaveExtendedStatsToStorage(sctx, tableID, extStats, isLoad) + return err + }) + if err == nil && statsVer != 0 { + s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, "extended stats") + } + return +} + +// SaveStatsFromJSON saves stats from JSON to the storage. +func (s *statsReadWriter) SaveStatsFromJSON(tableInfo *model.TableInfo, physicalID int64, jsonTblI interface{}) error { + jsonTbl := jsonTblI.(*JSONTable) + tbl, err := TableStatsFromJSON(tableInfo, physicalID, jsonTbl) + if err != nil { + return errors.Trace(err) + } + + for _, col := range tbl.Columns { + // loadStatsFromJSON doesn't support partition table now. + // The table level count and modify_count would be overridden by the SaveMetaToStorage below, so we don't need + // to care about them here. + err = s.SaveStatsToStorage(tbl.PhysicalID, tbl.RealtimeCount, 0, 0, &col.Histogram, col.CMSketch, col.TopN, int(col.GetStatsVer()), 1, false, "load stats") + if err != nil { + return errors.Trace(err) + } + } + for _, idx := range tbl.Indices { + // loadStatsFromJSON doesn't support partition table now. + // The table level count and modify_count would be overridden by the SaveMetaToStorage below, so we don't need + // to care about them here. 
+ err = s.SaveStatsToStorage(tbl.PhysicalID, tbl.RealtimeCount, 0, 1, &idx.Histogram, idx.CMSketch, idx.TopN, int(idx.GetStatsVer()), 1, false, "load stats") + if err != nil { + return errors.Trace(err) + } + } + err = s.SaveExtendedStatsToStorage(tbl.PhysicalID, tbl.ExtendedStats, true) + if err != nil { + return errors.Trace(err) + } + return s.SaveMetaToStorage(tbl.PhysicalID, tbl.RealtimeCount, tbl.ModifyCount, "load stats") +} + +// LoadNeededHistograms will load histograms for those needed columns/indices. +func (s *statsReadWriter) LoadNeededHistograms() (err error) { + err = util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error { + loadFMSketch := config.GetGlobalConfig().Performance.EnableLoadFMSketch + return LoadNeededHistograms(sctx, s.statsHandler, loadFMSketch) + }, util.FlagWrapTxn) + return err +} + +// ReloadExtendedStatistics drops the cache for extended statistics and reload data from mysql.stats_extended. +func (s *statsReadWriter) ReloadExtendedStatistics() error { + return util.CallWithSCtx(s.statsHandler.SPool(), func(sctx sessionctx.Context) error { + tables := make([]*statistics.Table, 0, s.statsHandler.Len()) + for _, tbl := range s.statsHandler.Values() { + t, err := ExtendedStatsFromStorage(sctx, tbl.Copy(), tbl.PhysicalID, true) + if err != nil { + return err + } + tables = append(tables, t) + } + s.statsHandler.UpdateStatsCache(tables, nil) + return nil + }, util.FlagWrapTxn) +} diff --git a/pkg/statistics/handle/util/interfaces.go b/pkg/statistics/handle/util/interfaces.go index eaf2581d25909e..955c30b4d58d63 100644 --- a/pkg/statistics/handle/util/interfaces.go +++ b/pkg/statistics/handle/util/interfaces.go @@ -111,6 +111,9 @@ type StatsAnalyze interface { // HandleAutoAnalyze analyzes the newly created table or index. HandleAutoAnalyze(is infoschema.InfoSchema) (analyzed bool) + + // CheckAnalyzeVersion checks whether all the statistics versions of this table's columns and indexes are the same. 
+ CheckAnalyzeVersion(tblInfo *model.TableInfo, physicalIDs []int64, version *int) bool } // StatsCache is used to manage all table statistics in memory. @@ -151,6 +154,9 @@ type StatsCache interface { // Replace replaces this cache. Replace(cache StatsCache) + + // UpdateStatsHealthyMetrics updates stats healthy distribution metrics according to stats cache. + UpdateStatsHealthyMetrics() } // StatsLockTable is the table info of which will be locked. @@ -204,6 +210,46 @@ type StatsLock interface { GetTableLockedAndClearForTest() (map[int64]struct{}, error) } +// StatsReadWriter is used to read and write stats to the storage. +type StatsReadWriter interface { + // TableStatsFromStorage loads table stats info from storage. + TableStatsFromStorage(tableInfo *model.TableInfo, physicalID int64, loadAll bool, snapshot uint64) (statsTbl *statistics.Table, err error) + + // StatsMetaCountAndModifyCount reads count and modify_count for the given table from mysql.stats_meta. + StatsMetaCountAndModifyCount(tableID int64) (count, modifyCount int64, err error) + + // LoadNeededHistograms will load histograms for those needed columns/indices and put them into the cache. + LoadNeededHistograms() (err error) + + // ReloadExtendedStatistics drops the cache for extended statistics and reload data from mysql.stats_extended. + ReloadExtendedStatistics() error + + //// SaveTableStatsToStorage saves the stats of a table to storage. + //SaveTableStatsToStorage(results *statistics.AnalyzeResults, analyzeSnapshot bool, source string) (err error) + + // SaveStatsToStorage save the stats data to the storage. + SaveStatsToStorage(tableID int64, count, modifyCount int64, isIndex int, hg *statistics.Histogram, + cms *statistics.CMSketch, topN *statistics.TopN, statsVersion int, isAnalyzed int64, updateAnalyzeTime bool, source string) (err error) + + // SaveMetaToStorage saves stats meta to the storage. 
+ SaveMetaToStorage(tableID, count, modifyCount int64, source string) (err error) + + // SaveStatsFromJSON saves stats from JSON to the storage. + // TODO: use *storage.JSONTable instead of interface{} (which is used to avoid cycle import). + SaveStatsFromJSON(tableInfo *model.TableInfo, physicalID int64, jsonTbl interface{}) error + + // Methods for extended stats. + + // InsertExtendedStats inserts a record into mysql.stats_extended and update version in mysql.stats_meta. + InsertExtendedStats(statsName string, colIDs []int64, tp int, tableID int64, ifNotExists bool) (err error) + + // MarkExtendedStatsDeleted update the status of mysql.stats_extended to be `deleted` and the version of mysql.stats_meta. + MarkExtendedStatsDeleted(statsName string, tableID int64, ifExists bool) (err error) + + // SaveExtendedStatsToStorage writes extended stats of a table into mysql.stats_extended. + SaveExtendedStatsToStorage(tableID int64, extStats *statistics.ExtendedStatsColl, isLoad bool) (err error) +} + // StatsHandle is used to manage TiDB Statistics. type StatsHandle interface { // GPool returns the goroutine pool. @@ -247,4 +293,7 @@ type StatsHandle interface { // StatsLock is used to manage locked stats. StatsLock + + // StatsReadWriter is used to read and write stats to the storage. + StatsReadWriter }