From 7bcf4bb59bfaec07e5e08bbf465758e77ba5a238 Mon Sep 17 00:00:00 2001 From: yisaer Date: Wed, 4 Jan 2023 15:44:01 +0800 Subject: [PATCH 1/3] supp historical stats partition table --- domain/historical_stats.go | 18 ++++++++++++++---- executor/analyze.go | 22 ++++++++++++++++++---- executor/analyze_global_stats.go | 12 ++++++++---- executor/analyze_worker.go | 4 ---- executor/historical_stats_test.go | 27 +++++++++++++++++++++++++++ statistics/handle/dump.go | 2 ++ statistics/handle/handle.go | 24 +++++++++++++++++------- 7 files changed, 86 insertions(+), 23 deletions(-) diff --git a/domain/historical_stats.go b/domain/historical_stats.go index 04d50608c58c4..92cdcce86ac4c 100644 --- a/domain/historical_stats.go +++ b/domain/historical_stats.go @@ -16,6 +16,7 @@ package domain import ( "github.com/pingcap/errors" + "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/statistics/handle" ) @@ -42,17 +43,26 @@ func (w *HistoricalStatsWorker) DumpHistoricalStats(tableID int64, statsHandle * } sctx := w.sctx is := GetDomain(sctx).InfoSchema() + isPartition := false + var tblInfo *model.TableInfo tbl, existed := is.TableByID(tableID) if !existed { - return errors.Errorf("cannot get table by id %d", tableID) + tbl, db, p := is.FindTableByPartitionID(tableID) + if tbl != nil && db != nil && p != nil { + isPartition = true + tblInfo = tbl.Meta() + } else { + return errors.Errorf("cannot get table by id %d", tableID) + } + } else { + tblInfo = tbl.Meta() } - tblInfo := tbl.Meta() dbInfo, existed := is.SchemaByTable(tblInfo) if !existed { return errors.Errorf("cannot get DBInfo by TableID %d", tableID) } - if _, err := statsHandle.RecordHistoricalStatsToStorage(dbInfo.Name.O, tblInfo); err != nil { - return errors.Errorf("record table %s.%s's historical stats failed", dbInfo.Name.O, tblInfo.Name.O) + if _, err := statsHandle.RecordHistoricalStatsToStorage(dbInfo.Name.O, tblInfo, tableID, isPartition); err != nil { + return errors.Errorf("record table %s.%s's historical stats failed, err:%v", dbInfo.Name.O, tblInfo.Name.O, err) } return nil } diff --git a/executor/analyze.go b/executor/analyze.go index af223b24dd4a8..705e6eed6c590 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -291,6 +291,8 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n } } + tableIDs := map[int64]struct{}{} + // save analyze results in single-thread. statsHandle := domain.GetDomain(e.ctx).StatsHandle() panicCnt := 0 @@ -311,6 +313,7 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n continue } handleGlobalStats(needGlobalStats, globalStatsMap, results) + tableIDs[results.TableID.GetStatisticsID()] = struct{}{} if err1 := statsHandle.SaveTableStatsToStorage(results, e.ctx.GetSessionVars().EnableAnalyzeSnapshot, handle.StatsMetaHistorySourceAnalyze); err1 != nil { tableID := results.TableID.TableID @@ -319,10 +322,6 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n finishJobWithLog(e.ctx, results.Job, err) } else { finishJobWithLog(e.ctx, results.Job, nil) - // Dump stats to historical storage. - if err := recordHistoricalStats(e.ctx, results.TableID.TableID); err != nil { - logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) - } } invalidInfoSchemaStatCache(results.TableID.GetStatisticsID()) if atomic.LoadUint32(&e.ctx.GetSessionVars().Killed) == 1 { @@ -330,6 +329,13 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n return errors.Trace(ErrQueryInterrupted) } } + // Dump stats to historical storage. + for tableID := range tableIDs { + if err := recordHistoricalStats(e.ctx, tableID); err != nil { + logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) + } + } + return err } @@ -348,6 +354,7 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, sta worker.run(ctx1, e.ctx.GetSessionVars().EnableAnalyzeSnapshot) }) } + tableIDs := map[int64]struct{}{} panicCnt := 0 var err error for panicCnt < statsConcurrency { @@ -370,6 +377,7 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, sta continue } handleGlobalStats(needGlobalStats, globalStatsMap, results) + tableIDs[results.TableID.GetStatisticsID()] = struct{}{} saveResultsCh <- results } close(saveResultsCh) @@ -382,6 +390,12 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, sta } err = errors.New(strings.Join(errMsg, ",")) } + for tableID := range tableIDs { + // Dump stats to historical storage. + if err := recordHistoricalStats(e.ctx, tableID); err != nil { + logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) + } + } return err } diff --git a/executor/analyze_global_stats.go b/executor/analyze_global_stats.go index 6b11e68a3e614..e8f8d53b8adbf 100644 --- a/executor/analyze_global_stats.go +++ b/executor/analyze_global_stats.go @@ -54,7 +54,9 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, needGlobalStats boo globalStatsTableIDs[globalStatsID.tableID] = struct{}{} } statsHandle := domain.GetDomain(e.ctx).StatsHandle() + tableIDs := map[int64]struct{}{} for tableID := range globalStatsTableIDs { + tableIDs[tableID] = struct{}{} tableAllPartitionStats := make(map[int64]*statistics.Table) for globalStatsID, info := range globalStatsMap { if globalStatsID.tableID != tableID { @@ -101,16 +103,18 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, needGlobalStats boo logutil.Logger(ctx).Error("save global-level stats to storage failed", zap.String("info", job.JobInfo), zap.Int64("histID", hg.ID), zap.Error(err), zap.Int64("tableID", tableID)) } - // Dump stats to historical storage. - if err1 := recordHistoricalStats(e.ctx, globalStatsID.tableID); err1 != nil { - logutil.BgLogger().Error("record historical stats failed", zap.String("info", job.JobInfo), zap.Int64("histID", hg.ID), zap.Error(err1)) - } } return err }() FinishAnalyzeMergeJob(e.ctx, job, mergeStatsErr) } } + for tableID := range tableIDs { + // Dump stats to historical storage. + if err := recordHistoricalStats(e.ctx, tableID); err != nil { + logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) + } + } return nil } diff --git a/executor/analyze_worker.go b/executor/analyze_worker.go index 18edc514c5d4d..688f89f5a120d 100644 --- a/executor/analyze_worker.go +++ b/executor/analyze_worker.go @@ -66,10 +66,6 @@ func (worker *analyzeSaveStatsWorker) run(ctx context.Context, analyzeSnapshot b worker.errCh <- err } else { finishJobWithLog(worker.sctx, results.Job, nil) - // Dump stats to historical storage. - if err := recordHistoricalStats(worker.sctx, results.TableID.TableID); err != nil { - logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) - } } invalidInfoSchemaStatCache(results.TableID.GetStatisticsID()) if err != nil { diff --git a/executor/historical_stats_test.go b/executor/historical_stats_test.go index 809c2c862bf43..6ae23dcebb365 100644 --- a/executor/historical_stats_test.go +++ b/executor/historical_stats_test.go @@ -216,3 +216,30 @@ func TestGCOutdatedHistoryStats(t *testing.T) { tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_history where table_id = '%d'", tableInfo.Meta().ID)).Check(testkit.Rows("0")) } + +func TestPartitionTableHistoricalStats(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_historical_stats = 1") + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec(`CREATE TABLE t (a int, b int, index idx(b)) +PARTITION BY RANGE ( a ) ( +PARTITION p0 VALUES LESS THAN (6) +)`) + tk.MustExec("delete from mysql.stats_history") + + tk.MustExec("analyze table test.t") + // dump historical stats + h := dom.StatsHandle() + hsWorker := dom.GetHistoricalStatsWorker() + + // assert global table and partition table be dumped + tblID := hsWorker.GetOneHistoricalStatsTable() + err := hsWorker.DumpHistoricalStats(tblID, h) + require.NoError(t, err) + tblID = hsWorker.GetOneHistoricalStatsTable() + err = hsWorker.DumpHistoricalStats(tblID, h) + require.NoError(t, err) + tk.MustQuery("select count(*) from mysql.stats_history").Check(testkit.Rows("2")) +} diff --git a/statistics/handle/dump.go b/statistics/handle/dump.go index daaf28ead7573..09a3e2b2ee607 100644 --- a/statistics/handle/dump.go +++ b/statistics/handle/dump.go @@ -45,6 +45,7 @@ type JSONTable struct { Count int64 `json:"count"` ModifyCount int64 `json:"modify_count"` Partitions map[string]*JSONTable `json:"partitions"` + Version uint64 `json:"version"` } type jsonExtendedStats struct { @@ -214,6 +215,7 @@ func GenJSONTableFromStats(dbName string, tableInfo *model.TableInfo, tbl *stati Indices: make(map[string]*jsonColumn, len(tbl.Indices)), Count: tbl.Count, ModifyCount: tbl.ModifyCount, + Version: tbl.Version, } for _, col := range tbl.Columns { sc := &stmtctx.StatementContext{TimeZone: time.UTC} diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go index f53f075301acb..b3a9c99298a92 100644 --- a/statistics/handle/handle.go +++ b/statistics/handle/handle.go @@ -2565,17 +2565,27 @@ func (h *Handle) GetPredicateColumns(tableID int64) ([]int64, error) { const maxColumnSize = 6 << 20 // RecordHistoricalStatsToStorage records the given table's stats data to mysql.stats_history -func (h *Handle) RecordHistoricalStatsToStorage(dbName string, tableInfo *model.TableInfo) (uint64, error) { +func (h *Handle) RecordHistoricalStatsToStorage(dbName string, tableInfo *model.TableInfo, physicalID int64, isPartition bool) (uint64, error) { ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) - js, err := h.DumpStatsToJSON(dbName, tableInfo, nil, true) + var js *JSONTable + var err error + if isPartition { + js, err = h.tableStatsToJSON(dbName, tableInfo, physicalID, 0) + } else { + js, err = h.DumpStatsToJSON(dbName, tableInfo, nil, true) + } if err != nil { return 0, errors.Trace(err) } version := uint64(0) - for _, value := range js.Columns { - version = uint64(*value.StatsVer) - if version != 0 { - break + if len(js.Partitions) == 0 { + version = js.Version + } else { + for _, p := range js.Partitions { + version = p.Version + if version != 0 { + break + } } } blocks, err := JSONTableToBlocks(js, maxColumnSize) @@ -2596,7 +2606,7 @@ func (h *Handle) RecordHistoricalStatsToStorage(dbName string, tableInfo *model. const sql = "INSERT INTO mysql.stats_history(table_id, stats_data, seq_no, version, create_time) VALUES (%?, %?, %?, %?, %?)" for i := 0; i < len(blocks); i++ { - if _, err := exec.ExecuteInternal(ctx, sql, tableInfo.ID, blocks[i], i, version, ts); err != nil { + if _, err := exec.ExecuteInternal(ctx, sql, physicalID, blocks[i], i, version, ts); err != nil { return version, errors.Trace(err) } } From 2eeae63468184958ae4096900b0d3622b42d5346 Mon Sep 17 00:00:00 2001 From: yisaer Date: Wed, 4 Jan 2023 16:48:26 +0800 Subject: [PATCH 2/3] fix --- statistics/handle/handle_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/statistics/handle/handle_test.go b/statistics/handle/handle_test.go index 9bb80498bc90f..eded5f771273d 100644 --- a/statistics/handle/handle_test.go +++ b/statistics/handle/handle_test.go @@ -1159,8 +1159,8 @@ func TestGlobalStatsData2(t *testing.T) { tk.MustExec("analyze table tint with 2 topn, 2 buckets") tk.MustQuery("select modify_count, count from mysql.stats_meta order by table_id asc").Check(testkit.Rows( - "0 20", // global: g.count = p0.count + p1.count - "0 9", // p0 + "0 20", // global: g.count = p0.count + p1.count + "0 9", // p0 "0 11")) // p1 tk.MustQuery("show stats_topn where table_name='tint' and is_index=0").Check(testkit.Rows( @@ -1190,7 +1190,7 @@ func TestGlobalStatsData2(t *testing.T) { tk.MustQuery("select distinct_count, null_count, tot_col_size from mysql.stats_histograms where is_index=0 order by table_id asc").Check( testkit.Rows("12 1 19", // global, g = p0 + p1 - "5 1 8", // p0 + "5 1 8", // p0 "7 0 11")) // p1 tk.MustQuery("show stats_buckets where is_index=1").Check(testkit.Rows( @@ -1204,7 +1204,7 @@ func TestGlobalStatsData2(t *testing.T) { tk.MustQuery("select distinct_count, null_count from mysql.stats_histograms where is_index=1 order by table_id asc").Check( testkit.Rows("12 1", // global, g = p0 + p1 - "5 1", // p0 + "5 1", // p0 "7 0")) // p1 // double + (column + index with 1 column) @@ -3299,7 +3299,7 @@ func TestRecordHistoricalStatsToStorage(t *testing.T) { tableInfo, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) require.NoError(t, err) - version, err := dom.StatsHandle().RecordHistoricalStatsToStorage("t", tableInfo.Meta()) + version, err := dom.StatsHandle().RecordHistoricalStatsToStorage("t", tableInfo.Meta(), tableInfo.Meta().ID, false) require.NoError(t, err) rows := tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_history where version = '%d'", version)).Rows() From 7c7a5acc59f8d3f94574556d297a15f5164c0ff2 Mon Sep 17 00:00:00 2001 From: yisaer Date: Wed, 4 Jan 2023 16:54:56 +0800 Subject: [PATCH 3/3] fix --- domain/historical_stats.go | 1 + statistics/handle/handle_test.go | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/domain/historical_stats.go b/domain/historical_stats.go index 599eff13799b9..5d6d90feedef8 100644 --- a/domain/historical_stats.go +++ b/domain/historical_stats.go @@ -68,6 +68,7 @@ func (w *HistoricalStatsWorker) DumpHistoricalStats(tableID int64, statsHandle * return errors.Errorf("cannot get DBInfo by TableID %d", tableID) } if _, err := statsHandle.RecordHistoricalStatsToStorage(dbInfo.Name.O, tblInfo, tableID, isPartition); err != nil { + generateHistoricalStatsFailedCounter.Inc() return errors.Errorf("record table %s.%s's historical stats failed, err:%v", dbInfo.Name.O, tblInfo.Name.O, err) } generateHistoricalStatsSuccessCounter.Inc() diff --git a/statistics/handle/handle_test.go b/statistics/handle/handle_test.go index eded5f771273d..ac9936bed11fe 100644 --- a/statistics/handle/handle_test.go +++ b/statistics/handle/handle_test.go @@ -1159,8 +1159,8 @@ func TestGlobalStatsData2(t *testing.T) { tk.MustExec("analyze table tint with 2 topn, 2 buckets") tk.MustQuery("select modify_count, count from mysql.stats_meta order by table_id asc").Check(testkit.Rows( - "0 20", // global: g.count = p0.count + p1.count - "0 9", // p0 + "0 20", // global: g.count = p0.count + p1.count + "0 9", // p0 "0 11")) // p1 tk.MustQuery("show stats_topn where table_name='tint' and is_index=0").Check(testkit.Rows( @@ -1190,7 +1190,7 @@ func TestGlobalStatsData2(t *testing.T) { tk.MustQuery("select distinct_count, null_count, tot_col_size from mysql.stats_histograms where is_index=0 order by table_id asc").Check( testkit.Rows("12 1 19", // global, g = p0 + p1 - "5 1 8", // p0 + "5 1 8", // p0 "7 0 11")) // p1 tk.MustQuery("show stats_buckets where is_index=1").Check(testkit.Rows( @@ -1204,7 +1204,7 @@ func TestGlobalStatsData2(t *testing.T) { tk.MustQuery("select distinct_count, null_count from mysql.stats_histograms where is_index=1 order by table_id asc").Check( testkit.Rows("12 1", // global, g = p0 + p1 - "5 1", // p0 + "5 1", // p0 "7 0")) // p1 // double + (column + index with 1 column)