
statistics: debug the update #57638

Closed
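In short: Update previously read every mysql.stats_meta row newer than the cached version and processed the rows one by one. The diff below caps the query at 2000 rows per round, fans row processing out to a pool of workers, and logs per-step timings (query, processing, cache update) so slow updates can be diagnosed.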
192 changes: 124 additions & 68 deletions pkg/statistics/handle/cache/statscache.go
@@ -16,6 +16,8 @@ package cache

import (
"context"
"runtime"
"sync"
"sync/atomic"
"time"

@@ -65,7 +67,11 @@ func NewStatsCacheImplForTest() (types.StatsCache, error) {

// Update reads stats meta from store and updates the stats map.
func (s *StatsCacheImpl) Update(ctx context.Context, is infoschema.InfoSchema) error {
start := time.Now()
totalStart := time.Now()
logger := logutil.BgLogger()

// Step 1: Get version and execute query
queryStart := time.Now()
lastVersion := s.GetNextCheckVersionWithOffset()
var (
rows []chunk.Row
@@ -74,85 +80,135 @@ func (s *StatsCacheImpl) Update(ctx context.Context, is infoschema.InfoSchema) error {
if err := util.CallWithSCtx(s.statsHandle.SPool(), func(sctx sessionctx.Context) error {
rows, _, err = util.ExecRows(
sctx,
"SELECT version, table_id, modify_count, count, snapshot from mysql.stats_meta where version > %? order by version",
"SELECT version, table_id, modify_count, count, snapshot from mysql.stats_meta where version > %? order by version limit 2000",
lastVersion,
)
return err
}); err != nil {
return errors.Trace(err)
}
logger.Info("stats cache query completed",
zap.Duration("query_time", time.Since(queryStart)),
zap.Int("rows_count", len(rows)))

tables := make([]*statistics.Table, 0, len(rows))
deletedTableIDs := make([]int64, 0, len(rows))
// Step 2: Process rows
processStart := time.Now()
var (
tables = make([]*statistics.Table, 0, len(rows))
deletedTableIDs = make([]int64, 0, len(rows))
mu sync.Mutex
wg sync.WaitGroup
skipCount, errorCount, deletedCount, updatedCount int
)

for _, row := range rows {
version := row.GetUint64(0)
physicalID := row.GetInt64(1)
modifyCount := row.GetInt64(2)
count := row.GetInt64(3)
snapshot := row.GetUint64(4)

// Check for context cancellation, since the loop may take a long time.
// TODO: add context to TableInfoByID and remove this code block?
if ctx.Err() != nil {
return ctx.Err()
}
workers := runtime.GOMAXPROCS(0) / 2
if workers < 1 {
workers = 1 // at least one worker, or the sends on rowCh below would deadlock when GOMAXPROCS is 1
}
rowCh := make(chan chunk.Row, workers)

table, ok := s.statsHandle.TableInfoByID(is, physicalID)
if !ok {
logutil.BgLogger().Debug(
"unknown physical ID in stats meta table, maybe it has been dropped",
zap.Int64("ID", physicalID),
)
deletedTableIDs = append(deletedTableIDs, physicalID)
continue
}
tableInfo := table.Meta()
// If the table is not updated, we can skip it.
if oldTbl, ok := s.Get(physicalID); ok &&
oldTbl.Version >= version &&
tableInfo.UpdateTS == oldTbl.TblInfoUpdateTS {
continue
}
tbl, err := s.statsHandle.TableStatsFromStorage(
tableInfo,
physicalID,
false,
0,
)
// A non-nil error may mean that there were DDL changes on this table, so we do not update it.
if err != nil {
statslogutil.StatsLogger().Error(
"error occurred when read table stats",
zap.String("table", tableInfo.Name.O),
zap.Error(err),
)
continue
}
if tbl == nil {
deletedTableIDs = append(deletedTableIDs, physicalID)
continue
}
tbl.Version = version
tbl.RealtimeCount = count
tbl.ModifyCount = modifyCount
tbl.TblInfoUpdateTS = tableInfo.UpdateTS
// It only occurs in the following situations:
// 1. The table has already been analyzed,
// but because the predicate columns feature is turned on, and it doesn't have any columns or indexes analyzed,
// it only analyzes _row_id and refreshes stats_meta, in which case the snapshot is not zero.
// 2. LastAnalyzeVersion is 0 because it has never been loaded.
// In this case, we can initialize LastAnalyzeVersion to the snapshot,
// otherwise auto-analyze will assume that the table has never been analyzed and try to analyze it again.
if tbl.LastAnalyzeVersion == 0 && snapshot != 0 {
tbl.LastAnalyzeVersion = snapshot
}
tables = append(tables, tbl)
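
// Fan rows out to a bounded pool of workers; each worker drains rowCh until it is closed.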
for i := 0; i < workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for row := range rowCh {
if ctx.Err() != nil {
return
}

version := row.GetUint64(0)
physicalID := row.GetInt64(1)
modifyCount := row.GetInt64(2)
count := row.GetInt64(3)
snapshot := row.GetUint64(4)

table, ok := s.statsHandle.TableInfoByID(is, physicalID)
if !ok {
mu.Lock()
deletedCount++
deletedTableIDs = append(deletedTableIDs, physicalID)
mu.Unlock()
continue
}

tableInfo := table.Meta()
if oldTbl, ok := s.Get(physicalID); ok &&
oldTbl.Version >= version &&
tableInfo.UpdateTS == oldTbl.TblInfoUpdateTS {
mu.Lock()
skipCount++
mu.Unlock()
continue
}

tbl, err := s.statsHandle.TableStatsFromStorage(
tableInfo,
physicalID,
false,
0,
)
if err != nil {
mu.Lock()
errorCount++
mu.Unlock()
statslogutil.StatsLogger().Error(
"error occurred when read table stats",
zap.String("table", tableInfo.Name.O),
zap.Error(err),
)
continue
}
if tbl == nil {
mu.Lock()
deletedCount++
deletedTableIDs = append(deletedTableIDs, physicalID)
mu.Unlock()
continue
}

tbl.Version = version
tbl.RealtimeCount = count
tbl.ModifyCount = modifyCount
tbl.TblInfoUpdateTS = tableInfo.UpdateTS
if tbl.LastAnalyzeVersion == 0 && snapshot != 0 {
tbl.LastAnalyzeVersion = snapshot
}

mu.Lock()
tables = append(tables, tbl)
updatedCount++
currentUpdated := updatedCount
mu.Unlock()

if currentUpdated%100 == 0 {
logger.Info("stats cache processing progress",
zap.Int("processed_tables", currentUpdated))
}
}
}()
}

for _, row := range rows {
select {
case rowCh <- row:
case <-ctx.Done():
// Stop feeding work once the context is canceled; the workers
// also bail out on ctx.Err(), so close the channel and drain them.
close(rowCh)
wg.Wait()
return ctx.Err()
}
}
close(rowCh)
wg.Wait()

logger.Info("stats cache processing completed",
zap.Duration("process_time", time.Since(processStart)),
zap.Int("total_rows", len(rows)),
zap.Int("skipped", skipCount),
zap.Int("errors", errorCount),
zap.Int("deleted", deletedCount),
zap.Int("updated", updatedCount))

// Step 3: Update cache
updateStart := time.Now()
s.UpdateStatsCache(tables, deletedTableIDs)
dur := time.Since(start)
tidbmetrics.StatsDeltaLoadHistogram.Observe(dur.Seconds())

totalDur := time.Since(totalStart)
logger.Info("stats cache update completed",
zap.Duration("total_time", totalDur),
zap.Duration("cache_update_time", time.Since(updateStart)))

tidbmetrics.StatsDeltaLoadHistogram.Observe(totalDur.Seconds())
return nil
}
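
For reference, here is a minimal, self-contained sketch of the bounded worker-pool pattern the new code uses: a producer feeds items into a channel, a fixed number of workers consume them, a mutex guards the shared result slice, and the producer selects on ctx.Done() so cancellation cannot deadlock a send. All names here (processAll, items, the squaring stand-in for per-row work) are hypothetical; this is a distilled illustration, not the PR's exact code.

package main

import (
	"context"
	"fmt"
	"runtime"
	"sync"
)

// processAll fans items out to a bounded pool of workers and collects
// results under a mutex, mirroring the structure of Update above.
func processAll(ctx context.Context, items []int) ([]int, error) {
	workers := max(runtime.GOMAXPROCS(0)/2, 1) // never zero workers (max is a Go 1.21+ builtin)
	ch := make(chan int, workers)

	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		results []int
	)

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for it := range ch {
				if ctx.Err() != nil {
					return // stop early on cancellation
				}
				out := it * it // stand-in for the per-row work
				mu.Lock()
				results = append(results, out)
				mu.Unlock()
			}
		}()
	}

	// Select on ctx.Done() so a canceled context cannot leave the
	// producer blocked on a send with no remaining receivers.
	for _, it := range items {
		select {
		case ch <- it:
		case <-ctx.Done():
			close(ch)
			wg.Wait()
			return nil, ctx.Err()
		}
	}
	close(ch)
	wg.Wait()
	return results, nil
}

func main() {
	res, err := processAll(context.Background(), []int{1, 2, 3, 4})
	fmt.Println(res, err) // order is not guaranteed
}

Note that results arrive in completion order rather than input order; the tables slice built in the diff likewise loses the version ordering the old sequential loop produced.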
