Skip to content

Commit

Permalink
planner: create dedicated interfaces for statsgc, statsusage (#47503)
Browse files Browse the repository at this point in the history

ref #46905
  • Loading branch information
qw4990 authored Oct 10, 2023
1 parent 41ba7bf commit 9e4922a
Show file tree
Hide file tree
Showing 11 changed files with 251 additions and 141 deletions.
2 changes: 0 additions & 2 deletions statistics/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ go_library(
"bootstrap.go",
"ddl.go",
"dump.go",
"gc.go",
"handle.go",
"handle_hist.go",
"lock_stats_handler.go",
Expand Down Expand Up @@ -39,7 +38,6 @@ go_library(
"//statistics/handle/storage",
"//statistics/handle/usage",
"//statistics/handle/util",
"//table",
"//types",
"//util",
"//util/chunk",
Expand Down
6 changes: 3 additions & 3 deletions statistics/handle/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, cache *cache.Stat
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
physicalID := row.GetInt64(1)
// The table is read-only. Please do not modify it.
table, ok := h.getTableByPhysicalID(is, physicalID)
table, ok := h.TableInfoByID(is, physicalID)
if !ok {
logutil.BgLogger().Debug("unknown physical ID in stats meta table, maybe it has been dropped", zap.Int64("ID", physicalID))
continue
Expand Down Expand Up @@ -105,7 +105,7 @@ func (h *Handle) initStatsHistograms4ChunkLite(is infoschema.InfoSchema, cache *
statsVer := row.GetInt64(7)
flag := row.GetInt64(9)
lastAnalyzePos := row.GetDatum(10, types.NewFieldType(mysql.TypeBlob))
tbl, _ := h.getTableByPhysicalID(is, table.PhysicalID)
tbl, _ := h.TableInfoByID(is, table.PhysicalID)
if isIndex > 0 {
var idxInfo *model.IndexInfo
for _, idx := range tbl.Meta().Indices {
Expand Down Expand Up @@ -171,7 +171,7 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache *cach
}
id, ndv, nullCount, version, totColSize := row.GetInt64(2), row.GetInt64(3), row.GetInt64(5), row.GetUint64(4), row.GetInt64(7)
lastAnalyzePos := row.GetDatum(11, types.NewFieldType(mysql.TypeBlob))
tbl, _ := h.getTableByPhysicalID(is, table.PhysicalID)
tbl, _ := h.TableInfoByID(is, table.PhysicalID)
if row.GetInt64(1) > 0 {
var idxInfo *model.IndexInfo
for _, idx := range tbl.Meta().Indices {
Expand Down
48 changes: 0 additions & 48 deletions statistics/handle/gc.go

This file was deleted.

93 changes: 20 additions & 73 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package handle
import (
"fmt"
"math"
"sync"
"time"

"github.com/pingcap/errors"
Expand All @@ -38,7 +37,6 @@ import (
"github.com/pingcap/tidb/statistics/handle/storage"
"github.com/pingcap/tidb/statistics/handle/usage"
"github.com/pingcap/tidb/statistics/handle/util"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
Expand All @@ -49,16 +47,26 @@ import (

// Handle can update stats info periodically.
type Handle struct {
// This gpool is used to reuse goroutine in the mergeGlobalStatsTopN.
gpool *gp.Pool
pool util.SessionPool
pool util.SessionPool

// initStatsCtx is the ctx only used for initStats
initStatsCtx sessionctx.Context

// sysProcTracker is used to track sys process like analyze
sysProcTracker sessionctx.SysProcTracker

// TableInfoGetter is used to fetch table meta info.
util.TableInfoGetter

// StatsGC is used to GC stats.
util.StatsGC

// StatsUsage is used to track the usage of column / index statistics.
util.StatsUsage

// This gpool is used to reuse goroutine in the mergeGlobalStatsTopN.
gpool *gp.Pool

// autoAnalyzeProcIDGetter is used to generate auto analyze ID.
autoAnalyzeProcIDGetter func() uint64

Expand Down Expand Up @@ -87,14 +95,6 @@ type Handle struct {
// StatsLoad is used to load stats concurrently
StatsLoad StatsLoad

schemaMu struct {
// pid2tid is the map from partition ID to table ID.
pid2tid map[int64]int64
// schemaVersion is the version of information schema when `pid2tid` is built.
schemaVersion int64
sync.RWMutex
}

lease atomic2.Duration
}

Expand Down Expand Up @@ -135,7 +135,11 @@ func NewHandle(_, initStatsCtx sessionctx.Context, lease time.Duration, pool uti
sysProcTracker: tracker,
autoAnalyzeProcIDGetter: autoAnalyzeProcIDGetter,
InitStatsDone: make(chan struct{}),
TableInfoGetter: util.NewTableInfoGetter(),
StatsUsage: usage.NewStatsUsageImpl(pool),
}
handle.StatsGC = storage.NewStatsGC(pool, lease, handle.TableInfoGetter, handle.MarkExtendedStatsDeleted)

handle.initStatsCtx = initStatsCtx
handle.lease.Store(lease)
statsCache, err := cache.NewStatsCachePointer()
Expand Down Expand Up @@ -217,7 +221,7 @@ func (h *Handle) Update(is infoschema.InfoSchema) error {
physicalID := row.GetInt64(1)
modifyCount := row.GetInt64(2)
count := row.GetInt64(3)
table, ok := h.getTableByPhysicalID(is, physicalID)
table, ok := h.TableInfoByID(is, physicalID)
if !ok {
logutil.BgLogger().Debug("unknown physical ID in stats meta table, maybe it has been dropped", zap.Int64("ID", physicalID))
deletedTableIDs = append(deletedTableIDs, physicalID)
Expand Down Expand Up @@ -257,7 +261,7 @@ func (h *Handle) MergePartitionStats2GlobalStatsByTableID(
histIDs []int64,
allPartitionStats map[int64]*statistics.Table,
) (globalStats *globalstats.GlobalStats, err error) {
return globalstats.MergePartitionStats2GlobalStatsByTableID(sc, h.gpool, opts, is, physicalID, isIndex, histIDs, allPartitionStats, h.getTableByPhysicalID, h.loadTablePartitionStats)
return globalstats.MergePartitionStats2GlobalStatsByTableID(sc, h.gpool, opts, is, physicalID, isIndex, histIDs, allPartitionStats, h.TableInfoByID, h.loadTablePartitionStats)
}

func (h *Handle) loadTablePartitionStats(tableInfo *model.TableInfo, partitionDef *model.PartitionDefinition) (*statistics.Table, error) {
Expand Down Expand Up @@ -285,42 +289,12 @@ func (h *Handle) mergePartitionStats2GlobalStats(
allPartitionStats map[int64]*statistics.Table,
) (gstats *globalstats.GlobalStats, err error) {
err = h.callWithSCtx(func(sctx sessionctx.Context) error {
gstats, err = globalstats.MergePartitionStats2GlobalStats(sctx, h.gpool, opts, is, globalTableInfo, isIndex, histIDs, allPartitionStats, h.getTableByPhysicalID, h.loadTablePartitionStats)
gstats, err = globalstats.MergePartitionStats2GlobalStats(sctx, h.gpool, opts, is, globalTableInfo, isIndex, histIDs, allPartitionStats, h.TableInfoByID, h.loadTablePartitionStats)
return err
})
return
}

// getTableByPhysicalID looks up the table that owns physicalID in the given
// information schema. physicalID may be either a table ID or a partition ID:
// a cached partition-ID -> table-ID map is consulted first and, when the ID
// is not a known partition, it is used directly as a table ID.
//
// The cached map is rebuilt whenever the schema meta version of is differs
// from the version the cache was built for. The whole lookup runs under
// h.schemaMu's write lock.
func (h *Handle) getTableByPhysicalID(is infoschema.InfoSchema, physicalID int64) (table.Table, bool) {
	h.schemaMu.Lock()
	defer h.schemaMu.Unlock()

	// Refresh the partition mapping if it was built for another schema version.
	if h.schemaMu.schemaVersion != is.SchemaMetaVersion() {
		h.schemaMu.schemaVersion = is.SchemaMetaVersion()
		h.schemaMu.pid2tid = buildPartitionID2TableID(is)
	}

	tableID, isPartition := h.schemaMu.pid2tid[physicalID]
	if !isPartition {
		// Not a partition: the physical ID is the table ID itself.
		return is.TableByID(physicalID)
	}
	return is.TableByID(tableID)
}

// buildPartitionID2TableID walks every table of every schema in is and
// returns a map from each partition definition ID to the ID of the table
// that declares it. Tables without partition info contribute no entries.
func buildPartitionID2TableID(is infoschema.InfoSchema) map[int64]int64 {
	pid2tid := make(map[int64]int64)
	for _, schema := range is.AllSchemas() {
		for _, tblInfo := range schema.Tables {
			if partInfo := tblInfo.GetPartitionInfo(); partInfo != nil {
				for _, partDef := range partInfo.Definitions {
					pid2tid[partDef.ID] = tblInfo.ID
				}
			}
		}
	}
	return pid2tid
}

// GetMemConsumed returns the mem size of statscache consumed
func (h *Handle) GetMemConsumed() (size int64) {
size = h.statsCache.Load().Cost()
Expand Down Expand Up @@ -562,33 +536,6 @@ func (h *Handle) CheckAnalyzeVersion(tblInfo *model.TableInfo, physicalIDs []int
return statistics.CheckAnalyzeVerOnTable(tbl, version)
}

// LoadColumnStatsUsage loads column stats usage information from disk.
// It runs usage.LoadColumnStatsUsage through h.callWithSCtx, forwarding loc,
// and returns the resulting map together with the error reported by
// callWithSCtx.
func (h *Handle) LoadColumnStatsUsage(loc *time.Location) (colStatsMap map[model.TableItemID]usage.ColStatsTimeInfo, err error) {
	load := func(sctx sessionctx.Context) error {
		var loadErr error
		colStatsMap, loadErr = usage.LoadColumnStatsUsage(sctx, loc)
		return loadErr
	}
	err = h.callWithSCtx(load)
	return colStatsMap, err
}

// CollectColumnsInExtendedStats returns IDs of the columns involved in extended stats.
// It runs usage.CollectColumnsInExtendedStats for tableID through
// h.callWithSCtx and returns the error reported by callWithSCtx.
func (h *Handle) CollectColumnsInExtendedStats(tableID int64) (columnIDs []int64, err error) {
	collect := func(sctx sessionctx.Context) error {
		var collectErr error
		columnIDs, collectErr = usage.CollectColumnsInExtendedStats(sctx, tableID)
		return collectErr
	}
	err = h.callWithSCtx(collect)
	return columnIDs, err
}

// GetPredicateColumns returns IDs of predicate columns, which are the columns whose stats are used(needed) when generating query plans.
// It runs usage.GetPredicateColumns for tableID through h.callWithSCtx and
// returns the error reported by callWithSCtx.
func (h *Handle) GetPredicateColumns(tableID int64) (columnIDs []int64, err error) {
	fetch := func(sctx sessionctx.Context) error {
		var fetchErr error
		columnIDs, fetchErr = usage.GetPredicateColumns(sctx, tableID)
		return fetchErr
	}
	err = h.callWithSCtx(fetch)
	return columnIDs, err
}

// RecordHistoricalStatsToStorage records the given table's stats data to mysql.stats_history
func (h *Handle) RecordHistoricalStatsToStorage(dbName string, tableInfo *model.TableInfo, physicalID int64, isPartition bool) (uint64, error) {
var js *storage.JSONTable
Expand Down
1 change: 0 additions & 1 deletion statistics/handle/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ go_library(
"//statistics/handle/cache",
"//statistics/handle/lockstats",
"//statistics/handle/util",
"//table",
"//types",
"//util/chunk",
"//util/compress",
Expand Down
53 changes: 48 additions & 5 deletions statistics/handle/storage/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,62 @@ import (
"github.com/pingcap/tidb/statistics/handle/cache"
"github.com/pingcap/tidb/statistics/handle/lockstats"
"github.com/pingcap/tidb/statistics/handle/util"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
)

// statsGCImpl implements StatsGC interface.
// It bundles the dependencies that the package-level GC helpers need, so
// its methods only have to obtain a session from pool and delegate.
type statsGCImpl struct {
// pool supplies the session contexts every GC operation runs with.
pool util.SessionPool // used to recycle sessionctx.
// tblInfoGetter is handed to GCStats to look tables up by ID.
tblInfoGetter util.TableInfoGetter

// markExtendedStatsDeleted is a callback injected by the constructor and
// forwarded to GCStats.
// TODO: it's ugly to use a raw function, solve it later on.
markExtendedStatsDeleted func(statsName string, tableID int64, ifExists bool) (err error)
// statsLease is forwarded to GCStats as its statsLease argument.
statsLease time.Duration // statistics lease
}

// NewStatsGC creates a new StatsGC.
// The returned value is backed by a *statsGCImpl that stores the session
// pool, the statistics lease, the table info getter and the
// markExtendedStatsDeleted callback exactly as given.
func NewStatsGC(pool util.SessionPool, statsLease time.Duration, tblInfo util.TableInfoGetter,
	markExtendedStatsDeleted func(statsName string, tableID int64, ifExists bool) (err error)) util.StatsGC {
	gc := new(statsGCImpl)
	gc.pool = pool
	gc.statsLease = statsLease
	gc.tblInfoGetter = tblInfo
	gc.markExtendedStatsDeleted = markExtendedStatsDeleted
	return gc
}

// GCStats will garbage collect the useless stats' info.
// For dropped tables, we will first update their version
// so that other tidb could know that table is deleted.
//
// The method obtains a session via util.CallWithSCtx and delegates to the
// package-level GCStats, passing the receiver's table info getter,
// markExtendedStatsDeleted callback and stats lease along with is and ddlLease.
func (gc *statsGCImpl) GCStats(is infoschema.InfoSchema, ddlLease time.Duration) (err error) {
	runGC := func(sctx sessionctx.Context) error {
		return GCStats(sctx, gc.tblInfoGetter, gc.markExtendedStatsDeleted, is, gc.statsLease, ddlLease)
	}
	err = util.CallWithSCtx(gc.pool, runGC)
	return err
}

// ClearOutdatedHistoryStats clears outdated historical stats.
// Only for test.
//
// It runs the package-level ClearOutdatedHistoryStats with a session
// obtained through util.CallWithSCtx.
func (gc *statsGCImpl) ClearOutdatedHistoryStats() error {
	clearHistory := func(sctx sessionctx.Context) error {
		return ClearOutdatedHistoryStats(sctx)
	}
	return util.CallWithSCtx(gc.pool, clearHistory)
}

// DeleteTableStatsFromKV deletes table statistics from kv.
// A statsID refers to statistic of a table or a partition.
//
// It delegates to the package-level DeleteTableStatsFromKV through
// util.CallWithSCtx with the util.FlagWrapTxn flag (presumably so the
// deletion runs inside a transaction — confirm against util.CallWithSCtx).
func (gc *statsGCImpl) DeleteTableStatsFromKV(statsIDs []int64) (err error) {
	deleteStats := func(sctx sessionctx.Context) error {
		return DeleteTableStatsFromKV(sctx, statsIDs)
	}
	err = util.CallWithSCtx(gc.pool, deleteStats, util.FlagWrapTxn)
	return err
}

// GCStats will garbage collect the useless stats' info.
// For dropped tables, we will first update their version
// so that other tidb could know that table is deleted.
func GCStats(sctx sessionctx.Context,
getTableByPhysicalID func(is infoschema.InfoSchema, physicalID int64) (table.Table, bool),
tableInfoGetter util.TableInfoGetter,
markExtendedStatsDeleted func(statsName string, tableID int64, ifExists bool) (err error),
is infoschema.InfoSchema, statsLease, ddlLease time.Duration) (err error) {
// To make sure that all the deleted tables' schema and stats info have been acknowledged to all tidb,
Expand Down Expand Up @@ -72,7 +115,7 @@ func GCStats(sctx sessionctx.Context,
return errors.Trace(err)
}
for _, row := range rows {
if err := gcTableStats(sctx, getTableByPhysicalID, markExtendedStatsDeleted, is, row.GetInt64(0)); err != nil {
if err := gcTableStats(sctx, tableInfoGetter, markExtendedStatsDeleted, is, row.GetInt64(0)); err != nil {
return errors.Trace(err)
}
_, existed := is.TableByID(row.GetInt64(0))
Expand Down Expand Up @@ -218,7 +261,7 @@ func removeDeletedExtendedStats(sctx sessionctx.Context, version uint64) (err er

// gcTableStats GC this table's stats.
func gcTableStats(sctx sessionctx.Context,
getTableByPhysicalID func(is infoschema.InfoSchema, physicalID int64) (table.Table, bool),
tableInfoGetter util.TableInfoGetter,
markExtendedStatsDeleted func(statsName string, tableID int64, ifExists bool) (err error),
is infoschema.InfoSchema, physicalID int64) error {
rows, _, err := util.ExecRows(sctx, "select is_index, hist_id from mysql.stats_histograms where table_id = %?", physicalID)
Expand All @@ -234,7 +277,7 @@ func gcTableStats(sctx sessionctx.Context,
}
cache.TableRowStatsCache.Invalidate(physicalID)
}
tbl, ok := getTableByPhysicalID(is, physicalID)
tbl, ok := tableInfoGetter.TableInfoByID(is, physicalID)
if !ok {
logutil.BgLogger().Info("remove stats in GC due to dropped table", zap.Int64("table_id", physicalID))
return util.WrapTxn(sctx, func(sctx sessionctx.Context) error {
Expand Down
2 changes: 1 addition & 1 deletion statistics/handle/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ var (
// 3. If the stats delta haven't been dumped in the past hour, then return true.
// 4. If the table stats is pseudo or empty or `Modify Count / Table Count` exceeds the threshold.
func (h *Handle) needDumpStatsDelta(is infoschema.InfoSchema, mode dumpMode, id int64, item variable.TableDelta, currentTime time.Time) bool {
tbl, ok := h.getTableByPhysicalID(is, id)
tbl, ok := h.TableInfoByID(is, id)
if !ok {
return false
}
Expand Down
Loading

0 comments on commit 9e4922a

Please sign in to comment.