statistics: avoid stats meta full load after table analysis (#57756)
close #57631
Rustin170506 authored Dec 3, 2024
1 parent 8fe0618 commit f585f5d
Showing 12 changed files with 134 additions and 32 deletions.
10 changes: 9 additions & 1 deletion pkg/executor/analyze.go
@@ -99,6 +99,14 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
if len(tasks) == 0 {
return nil
}
tableAndPartitionIDs := make([]int64, 0, len(tasks))
for _, task := range tasks {
tableID := getTableIDFromTask(task)
tableAndPartitionIDs = append(tableAndPartitionIDs, tableID.TableID)
if tableID.IsPartitionTable() {
tableAndPartitionIDs = append(tableAndPartitionIDs, tableID.PartitionID)
}
}

// Get the min number of goroutines for parallel execution.
buildStatsConcurrency, err := getBuildStatsConcurrency(e.Ctx())
@@ -186,7 +194,7 @@ TASKLOOP:
if err != nil {
sessionVars.StmtCtx.AppendWarning(err)
}
return statsHandle.Update(ctx, infoSchema)
return statsHandle.Update(ctx, infoSchema, tableAndPartitionIDs...)
}

func (e *AnalyzeExec) waitFinish(ctx context.Context, g *errgroup.Group, resultsCh chan *statistics.AnalyzeResults) error {
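For reference, the ID-gathering step above in isolation: a standalone sketch in which the analyze task is reduced to a stub carrying only the IDs (the real executor goes through getTableIDFromTask, so the type here is purely illustrative). It also shows why Update later sorts and compacts the slice: analyzing several partitions of one table reports the same table ID more than once.

```go
package main

import "fmt"

// analyzeTarget is a trimmed, hypothetical stand-in for the ID information
// carried by an analyze task: the table ID plus, for partitioned tables,
// the physical ID of the partition being analyzed.
type analyzeTarget struct {
	TableID     int64
	PartitionID int64
}

func (t analyzeTarget) IsPartitionTable() bool { return t.PartitionID != 0 }

// collectIDs mirrors the loop above: every task contributes its table ID,
// and partitioned tables additionally contribute the partition ID, so the
// later stats meta query covers both rows.
func collectIDs(tasks []analyzeTarget) []int64 {
	ids := make([]int64, 0, len(tasks))
	for _, t := range tasks {
		ids = append(ids, t.TableID)
		if t.IsPartitionTable() {
			ids = append(ids, t.PartitionID)
		}
	}
	return ids
}

func main() {
	// Two partitions of table 104 plus a non-partitioned table 110.
	tasks := []analyzeTarget{
		{TableID: 104, PartitionID: 106},
		{TableID: 104, PartitionID: 107},
		{TableID: 110},
	}
	fmt.Println(collectIDs(tasks)) // [104 106 104 107 110]
}
```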
12 changes: 9 additions & 3 deletions pkg/statistics/handle/cache/bench_test.go
@@ -35,7 +35,9 @@ func benchCopyAndUpdate(b *testing.B, c types.StatsCache) {
defer wg.Done()
t1 := testutil.NewMockStatisticsTable(1, 1, true, false, false)
t1.PhysicalID = rand.Int63()
c.UpdateStatsCache([]*statistics.Table{t1}, nil)
c.UpdateStatsCache(types.CacheUpdate{
Updated: []*statistics.Table{t1},
})
}()
}
wg.Wait()
@@ -51,7 +53,9 @@ func benchPutGet(b *testing.B, c types.StatsCache) {
defer wg.Done()
t1 := testutil.NewMockStatisticsTable(1, 1, true, false, false)
t1.PhysicalID = rand.Int63()
c.UpdateStatsCache([]*statistics.Table{t1}, nil)
c.UpdateStatsCache(types.CacheUpdate{
Updated: []*statistics.Table{t1},
})
}(i)
}
for i := 0; i < b.N; i++ {
@@ -73,7 +77,9 @@ func benchGet(b *testing.B, c types.StatsCache) {
defer w.Done()
t1 := testutil.NewMockStatisticsTable(1, 1, true, false, false)
t1.PhysicalID = rand.Int63()
c.UpdateStatsCache([]*statistics.Table{t1}, nil)
c.UpdateStatsCache(types.CacheUpdate{
Updated: []*statistics.Table{t1},
})
}(i)
}
w.Wait()
48 changes: 36 additions & 12 deletions pkg/statistics/handle/cache/statscache.go
@@ -16,6 +16,8 @@ package cache

import (
"context"
"slices"
"strconv"
"sync/atomic"
"time"

@@ -64,19 +66,35 @@ func NewStatsCacheImplForTest() (types.StatsCache, error) {
}

// Update reads stats meta from store and updates the stats map.
func (s *StatsCacheImpl) Update(ctx context.Context, is infoschema.InfoSchema) error {
func (s *StatsCacheImpl) Update(ctx context.Context, is infoschema.InfoSchema, tableAndPartitionIDs ...int64) error {
start := time.Now()
lastVersion := s.GetNextCheckVersionWithOffset()
var (
rows []chunk.Row
err error
skipMoveForwardStatsCache bool
rows []chunk.Row
err error
)
if err := util.CallWithSCtx(s.statsHandle.SPool(), func(sctx sessionctx.Context) error {
rows, _, err = util.ExecRows(
sctx,
"SELECT version, table_id, modify_count, count, snapshot from mysql.stats_meta where version > %? order by version",
lastVersion,
)
query := "SELECT version, table_id, modify_count, count, snapshot from mysql.stats_meta where version > %? "
args := []any{lastVersion}

if len(tableAndPartitionIDs) > 0 {
// When updating specific tables, we skip incrementing the max stats version to avoid missing
// delta updates for other tables. The max version only advances when doing a full update.
skipMoveForwardStatsCache = true
// Sort and deduplicate the table IDs to remove duplicates
slices.Sort(tableAndPartitionIDs)
tableAndPartitionIDs = slices.Compact(tableAndPartitionIDs)
// Convert table IDs to strings since the SQL executor only accepts string arrays for IN clauses
tableStringIDs := make([]string, 0, len(tableAndPartitionIDs))
for _, tableID := range tableAndPartitionIDs {
tableStringIDs = append(tableStringIDs, strconv.FormatInt(tableID, 10))
}
query += "and table_id in (%?) "
args = append(args, tableStringIDs)
}
query += "order by version"
rows, _, err = util.ExecRows(sctx, query, args...)
return err
}); err != nil {
return errors.Trace(err)
@@ -150,7 +168,13 @@ func (s *StatsCacheImpl) Update(ctx context.Context, is infoschema.InfoSchema) e
tables = append(tables, tbl)
}

s.UpdateStatsCache(tables, deletedTableIDs)
s.UpdateStatsCache(types.CacheUpdate{
Updated: tables,
Deleted: deletedTableIDs,
Options: types.UpdateOptions{
SkipMoveForward: skipMoveForwardStatsCache,
},
})
dur := time.Since(start)
tidbmetrics.StatsDeltaLoadHistogram.Observe(dur.Seconds())
return nil
@@ -191,12 +215,12 @@ func (s *StatsCacheImpl) replace(newCache *StatsCache) {
}

// UpdateStatsCache updates the cache with the new cache.
func (s *StatsCacheImpl) UpdateStatsCache(tables []*statistics.Table, deletedIDs []int64) {
func (s *StatsCacheImpl) UpdateStatsCache(cacheUpdate types.CacheUpdate) {
if enableQuota := config.GetGlobalConfig().Performance.EnableStatsCacheMemQuota; enableQuota {
s.Load().Update(tables, deletedIDs)
s.Load().Update(cacheUpdate.Updated, cacheUpdate.Deleted, cacheUpdate.Options.SkipMoveForward)
} else {
// TODO: remove this branch because we will always enable quota.
newCache := s.Load().CopyAndUpdate(tables, deletedIDs)
newCache := s.Load().CopyAndUpdate(cacheUpdate.Updated, cacheUpdate.Deleted)
s.replace(newCache)
}
}
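The core of this file's change is the query built above: when Update receives specific table and partition IDs, the mysql.stats_meta scan is restricted to those IDs instead of reading every row newer than the cached version, and the skip flag is set so the targeted refresh does not advance the version watermark. Below is a minimal, standalone sketch of the query-construction pattern, with the TiDB session executor replaced by returning the query text and arguments (the version value is illustrative):

```go
package main

import (
	"fmt"
	"slices"
	"strconv"
)

// buildStatsMetaQuery mirrors the logic above: with no IDs it produces the
// old full delta scan; with IDs it sorts, deduplicates, and stringifies them,
// then appends an IN clause so only the analyzed tables are read.
func buildStatsMetaQuery(lastVersion uint64, ids []int64) (string, []any) {
	query := "SELECT version, table_id, modify_count, count, snapshot from mysql.stats_meta where version > %? "
	args := []any{lastVersion}
	if len(ids) > 0 {
		slices.Sort(ids)
		ids = slices.Compact(ids)
		strIDs := make([]string, 0, len(ids))
		for _, id := range ids {
			strIDs = append(strIDs, strconv.FormatInt(id, 10))
		}
		query += "and table_id in (%?) "
		args = append(args, strIDs)
	}
	query += "order by version"
	return query, args
}

func main() {
	// A table ID repeated alongside one of its partition IDs, as the analyze
	// executor would report them.
	q, args := buildStatsMetaQuery(449025, []int64{104, 106, 104})
	fmt.Println(q)
	fmt.Println(args)
}
```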
12 changes: 7 additions & 5 deletions pkg/statistics/handle/cache/statscacheinner.go
@@ -163,7 +163,7 @@ func (sc *StatsCache) CopyAndUpdate(tables []*statistics.Table, deletedIDs []int
}

// Update updates the new statistics table cache.
func (sc *StatsCache) Update(tables []*statistics.Table, deletedIDs []int64) {
func (sc *StatsCache) Update(tables []*statistics.Table, deletedIDs []int64, skipMoveForwardStatsCache bool) {
for _, tbl := range tables {
id := tbl.PhysicalID
metrics.UpdateCounter.Inc()
@@ -174,10 +174,12 @@ func (sc *StatsCache) Update(tables []*statistics.Table, deletedIDs []int64) {
sc.c.Del(id)
}

// update the maxTblStatsVer
for _, t := range tables {
if oldVersion := sc.maxTblStatsVer.Load(); t.Version > oldVersion {
sc.maxTblStatsVer.CompareAndSwap(oldVersion, t.Version)
if !skipMoveForwardStatsCache {
// update the maxTblStatsVer
for _, t := range tables {
if oldVersion := sc.maxTblStatsVer.Load(); t.Version > oldVersion {
sc.maxTblStatsVer.CompareAndSwap(oldVersion, t.Version)
}
}
}
}
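The skipMoveForwardStatsCache flag threaded through Update above is what keeps a targeted refresh from advancing the cache's high-water version. Here is a self-contained sketch of that guard, assuming an atomic.Uint64 watermark and using a retry loop for clarity where the code above issues a single best-effort CompareAndSwap per table:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// maxTblStatsVer stands in for the cache's max table-stats version watermark.
var maxTblStatsVer atomic.Uint64

// noteVersion advances the watermark to v unless skipMoveForward is set.
// Skipping the advance on targeted updates means the next full Update still
// starts from the old watermark and therefore cannot miss deltas for tables
// that were not part of this batch.
func noteVersion(v uint64, skipMoveForward bool) {
	if skipMoveForward {
		return
	}
	for {
		old := maxTblStatsVer.Load()
		if v <= old || maxTblStatsVer.CompareAndSwap(old, v) {
			return
		}
	}
}

func main() {
	noteVersion(10, false) // full update: watermark moves to 10
	noteVersion(20, true)  // targeted update after analyze: watermark stays at 10
	fmt.Println(maxTblStatsVer.Load())
}
```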
4 changes: 3 additions & 1 deletion pkg/statistics/handle/handle.go
@@ -184,7 +184,9 @@ func (h *Handle) getPartitionStats(tblInfo *model.TableInfo, pid int64, returnPs
tbl = statistics.PseudoTable(tblInfo, false, true)
tbl.PhysicalID = pid
if tblInfo.GetPartitionInfo() == nil || h.Len() < 64 {
h.UpdateStatsCache([]*statistics.Table{tbl}, nil)
h.UpdateStatsCache(types.CacheUpdate{
Updated: []*statistics.Table{tbl},
})
}
return tbl
}
2 changes: 1 addition & 1 deletion pkg/statistics/handle/handletest/statstest/BUILD.bazel
@@ -9,7 +9,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 12,
shard_count = 13,
deps = [
"//pkg/config",
"//pkg/parser/model",
32 changes: 32 additions & 0 deletions pkg/statistics/handle/handletest/statstest/stats_test.go
@@ -31,6 +31,38 @@ import (
"github.com/stretchr/testify/require"
)

func TestStatsCacheProcess(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
testKit := testkit.NewTestKit(t, store)
testKit.MustExec("use test")
testKit.MustExec("create table t (c1 int, c2 int)")
testKit.MustExec("insert into t values(1, 2)")
analyzehelper.TriggerPredicateColumnsCollection(t, testKit, store, "t", "c1", "c2")
do := dom
is := do.InfoSchema()
tbl, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
tableInfo := tbl.Meta()
statsTbl := do.StatsHandle().GetTableStats(tableInfo)
require.True(t, statsTbl.Pseudo)
require.Zero(t, statsTbl.Version)
currentVersion := do.StatsHandle().MaxTableStatsVersion()
testKit.MustExec("analyze table t")
statsTbl = do.StatsHandle().GetTableStats(tableInfo)
require.False(t, statsTbl.Pseudo)
require.NotZero(t, statsTbl.Version)
require.Equal(t, currentVersion, do.StatsHandle().MaxTableStatsVersion())
newVersion := do.StatsHandle().GetNextCheckVersionWithOffset()
require.Equal(t, currentVersion, newVersion, "analyze should not move forward the stats cache version")

// Insert more rows
testKit.MustExec("insert into t values(2, 3)")
require.NoError(t, do.StatsHandle().DumpStatsDeltaToKV(true))
require.NoError(t, do.StatsHandle().Update(context.Background(), is))
newVersion = do.StatsHandle().MaxTableStatsVersion()
require.NotEqual(t, currentVersion, newVersion, "update with no table should move forward the stats cache version")
}

func TestStatsCache(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
testKit := testkit.NewTestKit(t, store)
12 changes: 9 additions & 3 deletions pkg/statistics/handle/storage/read.go
@@ -658,7 +658,9 @@ func loadNeededColumnHistograms(sctx sessionctx.Context, statsHandle statstypes.
if loadNeeded && !analyzed {
fakeCol := statistics.EmptyColumn(tblInfo.ID, tblInfo.PKIsHandle, colInfo)
statsTbl.SetCol(col.ID, fakeCol)
statsHandle.UpdateStatsCache([]*statistics.Table{statsTbl}, nil)
statsHandle.UpdateStatsCache(statstypes.CacheUpdate{
Updated: []*statistics.Table{statsTbl},
})
}
asyncload.AsyncLoadHistogramNeededItems.Delete(col)
return nil
@@ -720,7 +722,9 @@ func loadNeededColumnHistograms(sctx sessionctx.Context, statsHandle statstypes.
}
}
statsTbl.SetCol(col.ID, colHist)
statsHandle.UpdateStatsCache([]*statistics.Table{statsTbl}, nil)
statsHandle.UpdateStatsCache(statstypes.CacheUpdate{
Updated: []*statistics.Table{statsTbl},
})
asyncload.AsyncLoadHistogramNeededItems.Delete(col)
if col.IsSyncLoadFailed {
logutil.BgLogger().Warn("Hist for column should already be loaded as sync but not found.",
@@ -782,7 +786,9 @@ func loadNeededIndexHistograms(sctx sessionctx.Context, is infoschema.InfoSchema
tbl.LastAnalyzeVersion = max(tbl.LastAnalyzeVersion, idxHist.LastUpdateVersion)
}
tbl.SetIdx(idx.ID, idxHist)
statsHandle.UpdateStatsCache([]*statistics.Table{tbl}, nil)
statsHandle.UpdateStatsCache(statstypes.CacheUpdate{
Updated: []*statistics.Table{tbl},
})
if idx.IsSyncLoadFailed {
logutil.BgLogger().Warn("Hist for index should already be loaded as sync but not found.",
zap.Int64("table_id", idx.TableID),
4 changes: 3 additions & 1 deletion pkg/statistics/handle/storage/stats_read_writer.go
@@ -277,7 +277,9 @@ func (s *statsReadWriter) ReloadExtendedStatistics() error {
}
tables = append(tables, t)
}
s.statsHandler.UpdateStatsCache(tables, nil)
s.statsHandler.UpdateStatsCache(statstypes.CacheUpdate{
Updated: tables,
})
return nil
}, util.FlagWrapTxn)
}
4 changes: 3 additions & 1 deletion pkg/statistics/handle/storage/update.go
@@ -211,7 +211,9 @@ func removeExtendedStatsItem(statsCache types.StatsCache,
}
newTbl := tbl.Copy()
delete(newTbl.ExtendedStats.Stats, statsName)
statsCache.UpdateStatsCache([]*statistics.Table{newTbl}, nil)
statsCache.UpdateStatsCache(types.CacheUpdate{
Updated: []*statistics.Table{newTbl},
})
}

var changeGlobalStatsTables = []string{
4 changes: 3 additions & 1 deletion pkg/statistics/handle/syncload/stats_syncload.go
@@ -621,6 +621,8 @@ func (s *statsSyncLoad) updateCachedItem(tblInfo table.Table, item model.TableIt
tbl.StatsVer = statistics.Version0
}
}
s.statsHandle.UpdateStatsCache([]*statistics.Table{tbl}, nil)
s.statsHandle.UpdateStatsCache(statstypes.CacheUpdate{
Updated: []*statistics.Table{tbl},
})
return true
}
22 changes: 19 additions & 3 deletions pkg/statistics/handle/types/interfaces.go
@@ -195,6 +195,22 @@ type StatsAnalyze interface {
Close()
}

// CacheUpdate encapsulates changes to be made to the stats cache
type CacheUpdate struct {
Updated []*statistics.Table
Deleted []int64
Options UpdateOptions
}

// UpdateOptions contains configuration for cache updates
type UpdateOptions struct {
// SkipMoveForward controls whether to skip updating the cache's max version number.
// When true, the cache max version number stays unchanged even after updates.
// This improves performance when analyzing a small number of tables by avoiding
// unnecessary full cache reloads that would normally be triggered by version changes.
SkipMoveForward bool
}

// StatsCache is used to manage all table statistics in memory.
type StatsCache interface {
// Close closes this cache.
@@ -204,7 +220,7 @@ type StatsCache interface {
Clear()

// Update reads stats meta from store and updates the stats map.
Update(ctx context.Context, is infoschema.InfoSchema) error
Update(ctx context.Context, is infoschema.InfoSchema, tableAndPartitionIDs ...int64) error

// MemConsumed returns its memory usage.
MemConsumed() (size int64)
@@ -215,8 +231,8 @@ type StatsCache interface {
// Put puts this table stats into the cache.
Put(tableID int64, t *statistics.Table)

// UpdateStatsCache updates the cache.
UpdateStatsCache(addedTables []*statistics.Table, deletedTableIDs []int64)
// UpdateStatsCache applies a batch of changes to the cache
UpdateStatsCache(update CacheUpdate)

// GetNextCheckVersionWithOffset returns the last version with offset.
// It is used to fetch updated statistics from the stats meta table.
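To see the interface change from a caller's point of view: the two positional arguments of the old UpdateStatsCache become a single CacheUpdate value, with the new option carried alongside. The sketch below uses trimmed stand-ins for statistics.Table and the cache implementation purely to show the shape of the new API:

```go
package main

import "fmt"

// Table is a stub for statistics.Table; only the fields used here are kept.
type Table struct {
	PhysicalID int64
	Version    uint64
}

// UpdateOptions and CacheUpdate mirror the types introduced above.
type UpdateOptions struct {
	SkipMoveForward bool
}

type CacheUpdate struct {
	Updated []*Table
	Deleted []int64
	Options UpdateOptions
}

// applyUpdate is a stand-in for StatsCacheImpl.UpdateStatsCache.
func applyUpdate(u CacheUpdate) {
	fmt.Printf("updating %d tables, deleting %d, skipMoveForward=%v\n",
		len(u.Updated), len(u.Deleted), u.Options.SkipMoveForward)
}

func main() {
	// After analyzing a single table, only that table is refreshed and the
	// cache version watermark is left untouched.
	applyUpdate(CacheUpdate{
		Updated: []*Table{{PhysicalID: 104, Version: 42}},
		Options: UpdateOptions{SkipMoveForward: true},
	})
}
```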
