Skip to content

Commit

Permalink
statistics: support historical stats dump partition table (#40310)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yisaer authored Jan 5, 2023
1 parent be112dc commit d027270
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 24 deletions.
18 changes: 14 additions & 4 deletions domain/historical_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package domain
import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics/handle"
)
Expand Down Expand Up @@ -48,18 +49,27 @@ func (w *HistoricalStatsWorker) DumpHistoricalStats(tableID int64, statsHandle *
}
sctx := w.sctx
is := GetDomain(sctx).InfoSchema()
isPartition := false
var tblInfo *model.TableInfo
tbl, existed := is.TableByID(tableID)
if !existed {
return errors.Errorf("cannot get table by id %d", tableID)
tbl, db, p := is.FindTableByPartitionID(tableID)
if tbl != nil && db != nil && p != nil {
isPartition = true
tblInfo = tbl.Meta()
} else {
return errors.Errorf("cannot get table by id %d", tableID)
}
} else {
tblInfo = tbl.Meta()
}
tblInfo := tbl.Meta()
dbInfo, existed := is.SchemaByTable(tblInfo)
if !existed {
return errors.Errorf("cannot get DBInfo by TableID %d", tableID)
}
if _, err := statsHandle.RecordHistoricalStatsToStorage(dbInfo.Name.O, tblInfo); err != nil {
if _, err := statsHandle.RecordHistoricalStatsToStorage(dbInfo.Name.O, tblInfo, tableID, isPartition); err != nil {
generateHistoricalStatsFailedCounter.Inc()
return errors.Errorf("record table %s.%s's historical stats failed", dbInfo.Name.O, tblInfo.Name.O)
return errors.Errorf("record table %s.%s's historical stats failed, err:%v", dbInfo.Name.O, tblInfo.Name.O, err)
}
generateHistoricalStatsSuccessCounter.Inc()
return nil
Expand Down
22 changes: 18 additions & 4 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,8 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n
}
}

tableIDs := map[int64]struct{}{}

// save analyze results in single-thread.
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
panicCnt := 0
Expand All @@ -311,6 +313,7 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n
continue
}
handleGlobalStats(needGlobalStats, globalStatsMap, results)
tableIDs[results.TableID.GetStatisticsID()] = struct{}{}

if err1 := statsHandle.SaveTableStatsToStorage(results, e.ctx.GetSessionVars().EnableAnalyzeSnapshot, handle.StatsMetaHistorySourceAnalyze); err1 != nil {
tableID := results.TableID.TableID
Expand All @@ -319,17 +322,20 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n
finishJobWithLog(e.ctx, results.Job, err)
} else {
finishJobWithLog(e.ctx, results.Job, nil)
// Dump stats to historical storage.
if err := recordHistoricalStats(e.ctx, results.TableID.TableID); err != nil {
logutil.BgLogger().Error("record historical stats failed", zap.Error(err))
}
}
invalidInfoSchemaStatCache(results.TableID.GetStatisticsID())
if atomic.LoadUint32(&e.ctx.GetSessionVars().Killed) == 1 {
finishJobWithLog(e.ctx, results.Job, ErrQueryInterrupted)
return errors.Trace(ErrQueryInterrupted)
}
}
// Dump stats to historical storage.
for tableID := range tableIDs {
if err := recordHistoricalStats(e.ctx, tableID); err != nil {
logutil.BgLogger().Error("record historical stats failed", zap.Error(err))
}
}

return err
}

Expand All @@ -348,6 +354,7 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, sta
worker.run(ctx1, e.ctx.GetSessionVars().EnableAnalyzeSnapshot)
})
}
tableIDs := map[int64]struct{}{}
panicCnt := 0
var err error
for panicCnt < statsConcurrency {
Expand All @@ -370,6 +377,7 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, sta
continue
}
handleGlobalStats(needGlobalStats, globalStatsMap, results)
tableIDs[results.TableID.GetStatisticsID()] = struct{}{}
saveResultsCh <- results
}
close(saveResultsCh)
Expand All @@ -382,6 +390,12 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, sta
}
err = errors.New(strings.Join(errMsg, ","))
}
for tableID := range tableIDs {
// Dump stats to historical storage.
if err := recordHistoricalStats(e.ctx, tableID); err != nil {
logutil.BgLogger().Error("record historical stats failed", zap.Error(err))
}
}
return err
}

Expand Down
12 changes: 8 additions & 4 deletions executor/analyze_global_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, needGlobalStats boo
globalStatsTableIDs[globalStatsID.tableID] = struct{}{}
}
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
tableIDs := map[int64]struct{}{}
for tableID := range globalStatsTableIDs {
tableIDs[tableID] = struct{}{}
tableAllPartitionStats := make(map[int64]*statistics.Table)
for globalStatsID, info := range globalStatsMap {
if globalStatsID.tableID != tableID {
Expand Down Expand Up @@ -101,16 +103,18 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, needGlobalStats boo
logutil.Logger(ctx).Error("save global-level stats to storage failed", zap.String("info", job.JobInfo),
zap.Int64("histID", hg.ID), zap.Error(err), zap.Int64("tableID", tableID))
}
// Dump stats to historical storage.
if err1 := recordHistoricalStats(e.ctx, globalStatsID.tableID); err1 != nil {
logutil.BgLogger().Error("record historical stats failed", zap.String("info", job.JobInfo), zap.Int64("histID", hg.ID), zap.Error(err1))
}
}
return err
}()
FinishAnalyzeMergeJob(e.ctx, job, mergeStatsErr)
}
}
for tableID := range tableIDs {
// Dump stats to historical storage.
if err := recordHistoricalStats(e.ctx, tableID); err != nil {
logutil.BgLogger().Error("record historical stats failed", zap.Error(err))
}
}
return nil
}

Expand Down
4 changes: 0 additions & 4 deletions executor/analyze_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,6 @@ func (worker *analyzeSaveStatsWorker) run(ctx context.Context, analyzeSnapshot b
worker.errCh <- err
} else {
finishJobWithLog(worker.sctx, results.Job, nil)
// Dump stats to historical storage.
if err := recordHistoricalStats(worker.sctx, results.TableID.TableID); err != nil {
logutil.BgLogger().Error("record historical stats failed", zap.Error(err))
}
}
invalidInfoSchemaStatCache(results.TableID.GetStatisticsID())
if err != nil {
Expand Down
27 changes: 27 additions & 0 deletions executor/historical_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,3 +216,30 @@ func TestGCOutdatedHistoryStats(t *testing.T) {
tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_history where table_id = '%d'",
tableInfo.Meta().ID)).Check(testkit.Rows("0"))
}

// TestPartitionTableHistoricalStats analyzes a table with a single range
// partition while historical stats are enabled, then asks the historical
// stats worker to dump two pending table IDs and checks that exactly two rows
// end up in mysql.stats_history.
func TestPartitionTableHistoricalStats(t *testing.T) {
	const createPartitionedTable = `CREATE TABLE t (a int, b int, index idx(b))
PARTITION BY RANGE ( a ) (
PARTITION p0 VALUES LESS THAN (6)
)`

	store, dom := testkit.CreateMockStoreAndDomain(t)
	tk := testkit.NewTestKit(t, store)

	// Set up the partitioned table, start from an empty history table, and
	// run ANALYZE so that there is something for the worker to dump.
	for _, stmt := range []string{
		"set global tidb_enable_historical_stats = 1",
		"use test",
		"drop table if exists t",
		createPartitionedTable,
		"delete from mysql.stats_history",
		"analyze table test.t",
	} {
		tk.MustExec(stmt)
	}

	statsHandle := dom.StatsHandle()
	worker := dom.GetHistoricalStatsWorker()

	// Two dumps are expected to succeed: presumably one for the table-level
	// (global) stats and one for partition p0.
	for round := 0; round < 2; round++ {
		pendingID := worker.GetOneHistoricalStatsTable()
		dumpErr := worker.DumpHistoricalStats(pendingID, statsHandle)
		require.NoError(t, dumpErr)
	}

	historyRows := tk.MustQuery("select count(*) from mysql.stats_history")
	historyRows.Check(testkit.Rows("2"))
}
2 changes: 2 additions & 0 deletions statistics/handle/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type JSONTable struct {
Count int64 `json:"count"`
ModifyCount int64 `json:"modify_count"`
Partitions map[string]*JSONTable `json:"partitions"`
Version uint64 `json:"version"`
}

type jsonExtendedStats struct {
Expand Down Expand Up @@ -228,6 +229,7 @@ func GenJSONTableFromStats(dbName string, tableInfo *model.TableInfo, tbl *stati
Indices: make(map[string]*jsonColumn, len(tbl.Indices)),
Count: tbl.Count,
ModifyCount: tbl.ModifyCount,
Version: tbl.Version,
}
for _, col := range tbl.Columns {
sc := &stmtctx.StatementContext{TimeZone: time.UTC}
Expand Down
24 changes: 17 additions & 7 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -2565,17 +2565,27 @@ func (h *Handle) GetPredicateColumns(tableID int64) ([]int64, error) {
const maxColumnSize = 6 << 20

// RecordHistoricalStatsToStorage records the given table's stats data to mysql.stats_history
func (h *Handle) RecordHistoricalStatsToStorage(dbName string, tableInfo *model.TableInfo) (uint64, error) {
func (h *Handle) RecordHistoricalStatsToStorage(dbName string, tableInfo *model.TableInfo, physicalID int64, isPartition bool) (uint64, error) {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
js, err := h.DumpStatsToJSON(dbName, tableInfo, nil, true)
var js *JSONTable
var err error
if isPartition {
js, err = h.tableStatsToJSON(dbName, tableInfo, physicalID, 0)
} else {
js, err = h.DumpStatsToJSON(dbName, tableInfo, nil, true)
}
if err != nil {
return 0, errors.Trace(err)
}
version := uint64(0)
for _, value := range js.Columns {
version = uint64(*value.StatsVer)
if version != 0 {
break
if len(js.Partitions) == 0 {
version = js.Version
} else {
for _, p := range js.Partitions {
version = p.Version
if version != 0 {
break
}
}
}
blocks, err := JSONTableToBlocks(js, maxColumnSize)
Expand All @@ -2596,7 +2606,7 @@ func (h *Handle) RecordHistoricalStatsToStorage(dbName string, tableInfo *model.

const sql = "INSERT INTO mysql.stats_history(table_id, stats_data, seq_no, version, create_time) VALUES (%?, %?, %?, %?, %?)"
for i := 0; i < len(blocks); i++ {
if _, err := exec.ExecuteInternal(ctx, sql, tableInfo.ID, blocks[i], i, version, ts); err != nil {
if _, err := exec.ExecuteInternal(ctx, sql, physicalID, blocks[i], i, version, ts); err != nil {
return version, errors.Trace(err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion statistics/handle/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3299,7 +3299,7 @@ func TestRecordHistoricalStatsToStorage(t *testing.T) {

tableInfo, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
version, err := dom.StatsHandle().RecordHistoricalStatsToStorage("t", tableInfo.Meta())
version, err := dom.StatsHandle().RecordHistoricalStatsToStorage("t", tableInfo.Meta(), tableInfo.Meta().ID, false)
require.NoError(t, err)

rows := tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_history where version = '%d'", version)).Rows()
Expand Down

0 comments on commit d027270

Please sign in to comment.