planner: move more methods from StatsHandle to its sub-packages #47739

Merged (2 commits) on Oct 18, 2023
5 changes: 3 additions & 2 deletions pkg/executor/analyze_worker.go
@@ -19,9 +19,9 @@ import (
"sync/atomic"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
"github.com/pingcap/tidb/pkg/util/logutil"
@@ -61,7 +61,8 @@ func (worker *analyzeSaveStatsWorker) run(ctx context.Context, analyzeSnapshot b
worker.errCh <- errors.Trace(exeerrors.ErrQueryInterrupted)
return
}
err := handle.SaveTableStatsToStorage(worker.sctx, results, analyzeSnapshot, util.StatsMetaHistorySourceAnalyze)
statsHandle := domain.GetDomain(worker.sctx).StatsHandle()
err := statsHandle.SaveTableStatsToStorage(results, analyzeSnapshot, util.StatsMetaHistorySourceAnalyze)
if err != nil {
logutil.Logger(ctx).Error("save table stats to storage failed", zap.Error(err))
finishJobWithLog(worker.sctx, results.Job, err)
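For context, the hunk above replaces the package-level helper in pkg/statistics/handle with a method on the stats handle obtained from the session's domain. Side by side, the old and new call patterns (a sketch assembled only from the lines shown above):

	// before: package-level helper, session context passed explicitly
	err := handle.SaveTableStatsToStorage(worker.sctx, results, analyzeSnapshot, util.StatsMetaHistorySourceAnalyze)

	// after: method on the handle looked up from the domain
	statsHandle := domain.GetDomain(worker.sctx).StatsHandle()
	err := statsHandle.SaveTableStatsToStorage(results, analyzeSnapshot, util.StatsMetaHistorySourceAnalyze)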
1 change: 0 additions & 1 deletion pkg/statistics/handle/BUILD.bazel
@@ -37,7 +37,6 @@ go_library(
"//pkg/util",
"//pkg/util/chunk",
"//pkg/util/logutil",
"//pkg/util/sqlexec",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_tiancaiamao_gp//:gp",
169 changes: 9 additions & 160 deletions pkg/statistics/handle/ddl.go
@@ -15,20 +15,11 @@
package handle

import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics/handle/globalstats"
"github.com/pingcap/tidb/pkg/statistics/handle/storage"
statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/sqlexec"
)

// HandleDDLEvent begins to process a ddl task.
@@ -40,7 +31,7 @@ func (h *Handle) HandleDDLEvent(t *util.Event) error {
return err
}
for _, id := range ids {
if err := h.insertTableStats2KV(t.TableInfo, id); err != nil {
if err := h.InsertTableStats2KV(t.TableInfo, id); err != nil {
return err
}
}
@@ -50,7 +41,7 @@ func (h *Handle) HandleDDLEvent(t *util.Event) error {
return err
}
for _, id := range ids {
if err := h.resetTableStats2KVForDrop(id); err != nil {
if err := h.ResetTableStats2KVForDrop(id); err != nil {
return err
}
}
@@ -60,13 +51,13 @@ func (h *Handle) HandleDDLEvent(t *util.Event) error {
return err
}
for _, id := range ids {
if err := h.insertColStats2KV(id, t.ColumnInfos); err != nil {
if err := h.InsertColStats2KV(id, t.ColumnInfos); err != nil {
return err
}
}
case model.ActionAddTablePartition, model.ActionTruncateTablePartition:
for _, def := range t.PartInfo.Definitions {
if err := h.insertTableStats2KV(t.TableInfo, def.ID); err != nil {
if err := h.InsertTableStats2KV(t.TableInfo, def.ID); err != nil {
return err
}
}
@@ -81,14 +72,14 @@
}
}
for _, def := range t.PartInfo.Definitions {
if err := h.resetTableStats2KVForDrop(def.ID); err != nil {
if err := h.ResetTableStats2KVForDrop(def.ID); err != nil {
return err
}
}
case model.ActionReorganizePartition:
for _, def := range t.PartInfo.Definitions {
// TODO: Should we trigger analyze instead of adding 0s?
if err := h.insertTableStats2KV(t.TableInfo, def.ID); err != nil {
if err := h.InsertTableStats2KV(t.TableInfo, def.ID); err != nil {
return err
}
// Do not update global stats, since the data have not changed!
@@ -97,7 +88,7 @@ func (h *Handle) HandleDDLEvent(t *util.Event) error {
// Add partitioning
for _, def := range t.PartInfo.Definitions {
// TODO: Should we trigger analyze instead of adding 0s?
if err := h.insertTableStats2KV(t.TableInfo, def.ID); err != nil {
if err := h.InsertTableStats2KV(t.TableInfo, def.ID); err != nil {
return err
}
}
@@ -107,21 +98,13 @@ func (h *Handle) HandleDDLEvent(t *util.Event) error {
// Note that t.TableInfo is the current (new) table info
// and t.PartInfo.NewTableID is actually the old table ID!
// (see onReorganizePartition)
return h.changeGlobalStatsID(t.PartInfo.NewTableID, t.TableInfo.ID)
return h.ChangeGlobalStatsID(t.PartInfo.NewTableID, t.TableInfo.ID)
case model.ActionFlashbackCluster:
return h.updateStatsVersion()
return h.UpdateStatsVersion()
}
return nil
}

// updateStatsVersion will set the statistics version to the newest TS,
// then tidb-server will reload it automatically.
func (h *Handle) updateStatsVersion() error {
return h.callWithSCtx(func(sctx sessionctx.Context) error {
return storage.UpdateStatsVersion(sctx)
}, statsutil.FlagWrapTxn)
}

// updateGlobalStats will trigger the merge of global-stats when we drop table partition
func (h *Handle) updateGlobalStats(tblInfo *model.TableInfo) error {
// We need to merge the partition-level stats to global-stats when we drop table partition in dynamic mode.
@@ -130,18 +113,6 @@ func (h *Handle) updateGlobalStats(tblInfo *model.TableInfo) error {
})
}

func (h *Handle) changeGlobalStatsID(from, to int64) (err error) {
return h.callWithSCtx(func(sctx sessionctx.Context) error {
for _, table := range []string{"stats_meta", "stats_top_n", "stats_fm_sketch", "stats_buckets", "stats_histograms", "column_stats_usage"} {
_, err = statsutil.Exec(sctx, "update mysql."+table+" set table_id = %? where table_id = %?", to, from)
if err != nil {
return err
}
}
return nil
}, statsutil.FlagWrapTxn)
}

func (h *Handle) getInitStateTableIDs(tblInfo *model.TableInfo) (ids []int64, err error) {
pi := tblInfo.GetPartitionInfo()
if pi == nil {
@@ -165,125 +136,3 @@ func (h *Handle) getInitStateTableIDs(tblInfo *model.TableInfo) (ids []int64, er
func (h *Handle) DDLEventCh() chan *util.Event {
return h.ddlEventCh
}

// insertTableStats2KV inserts a record standing for a new table to stats_meta and inserts some records standing for the
// new columns and indices which belong to this table.
func (h *Handle) insertTableStats2KV(info *model.TableInfo, physicalID int64) (err error) {
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
h.RecordHistoricalStatsMeta(physicalID, statsVer, statsutil.StatsMetaHistorySourceSchemaChange)
}
}()

return h.callWithSCtx(func(sctx sessionctx.Context) error {
startTS, err := statsutil.GetStartTS(sctx)
if err != nil {
return errors.Trace(err)
}
if _, err := statsutil.Exec(sctx, "insert into mysql.stats_meta (version, table_id) values(%?, %?)", startTS, physicalID); err != nil {
return err
}
statsVer = startTS
for _, col := range info.Columns {
if _, err := statsutil.Exec(sctx, "insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%?, 0, %?, 0, %?)", physicalID, col.ID, startTS); err != nil {
return err
}
}
for _, idx := range info.Indices {
if _, err := statsutil.Exec(sctx, "insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%?, 1, %?, 0, %?)", physicalID, idx.ID, startTS); err != nil {
return err
}
}
return nil
}, statsutil.FlagWrapTxn)
}

// resetTableStats2KVForDrop resets the count to 0.
func (h *Handle) resetTableStats2KVForDrop(physicalID int64) (err error) {
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
h.RecordHistoricalStatsMeta(physicalID, statsVer, statsutil.StatsMetaHistorySourceSchemaChange)
}
}()

return h.callWithSCtx(func(sctx sessionctx.Context) error {
startTS, err := statsutil.GetStartTS(sctx)
if err != nil {
return errors.Trace(err)
}
if _, err := statsutil.Exec(sctx, "update mysql.stats_meta set version=%? where table_id =%?", startTS, physicalID); err != nil {
return err
}
return nil
}, statsutil.FlagWrapTxn)
}

// insertColStats2KV insert a record to stats_histograms with distinct_count 1 and insert a bucket to stats_buckets with default value.
// This operation also updates version.
func (h *Handle) insertColStats2KV(physicalID int64, colInfos []*model.ColumnInfo) (err error) {
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
h.RecordHistoricalStatsMeta(physicalID, statsVer, statsutil.StatsMetaHistorySourceSchemaChange)
}
}()

return h.callWithSCtx(func(sctx sessionctx.Context) error {
startTS, err := statsutil.GetStartTS(sctx)
if err != nil {
return errors.Trace(err)
}

// First of all, we update the version.
_, err = statsutil.Exec(sctx, "update mysql.stats_meta set version = %? where table_id = %?", startTS, physicalID)
if err != nil {
return err
}
statsVer = startTS
// If we didn't update anything by last SQL, it means the stats of this table does not exist.
if sctx.GetSessionVars().StmtCtx.AffectedRows() > 0 {
// By this step we can get the count of this table, then we can determine the count and repeats of the bucket.
var rs sqlexec.RecordSet
rs, err = statsutil.Exec(sctx, "select count from mysql.stats_meta where table_id = %?", physicalID)
if err != nil {
return err
}
defer terror.Call(rs.Close)
req := rs.NewChunk(nil)
err = rs.Next(context.Background(), req)
if err != nil {
return err
}
count := req.GetRow(0).GetInt64(0)
for _, colInfo := range colInfos {
value := types.NewDatum(colInfo.GetOriginDefaultValue())
value, err = value.ConvertTo(sctx.GetSessionVars().StmtCtx, &colInfo.FieldType)
if err != nil {
return err
}
if value.IsNull() {
// If the column being added has a null default value, all existing rows have a null value in the newly added column.
if _, err := statsutil.Exec(sctx, "insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, null_count) values (%?, %?, 0, %?, 0, %?)", startTS, physicalID, colInfo.ID, count); err != nil {
return err
}
} else {
// If this stats exists, we insert histogram meta first, the distinct_count will always be one.
if _, err := statsutil.Exec(sctx, "insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, tot_col_size) values (%?, %?, 0, %?, 1, %?)", startTS, physicalID, colInfo.ID, int64(len(value.GetBytes()))*count); err != nil {
return err
}
value, err = value.ConvertTo(sctx.GetSessionVars().StmtCtx, types.NewFieldType(mysql.TypeBlob))
if err != nil {
return err
}
// There must be only one bucket for this new column and the value is the default value.
if _, err := statsutil.Exec(sctx, "insert into mysql.stats_buckets (table_id, is_index, hist_id, bucket_id, repeats, count, lower_bound, upper_bound) values (%?, 0, %?, 0, %?, %?, %?, %?)", physicalID, colInfo.ID, count, count, value.GetBytes(), value.GetBytes()); err != nil {
return err
}
}
}
}
return nil
}, statsutil.FlagWrapTxn)
}
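Taken together, the renames in HandleDDLEvent above (insertTableStats2KV to InsertTableStats2KV and so on) export the DDL bookkeeping helpers so their implementations can move out of this file. Read off the call sites and the deleted definitions, the exported surface looks roughly like the sketch below; the interface name and the sub-package that now provides the implementations are assumptions, not shown in this diff:

	// Hypothetical grouping of the exported DDL stats helpers; the signatures are taken
	// from the call sites in HandleDDLEvent and the function bodies deleted above.
	type ddlStatsWriter interface {
		InsertTableStats2KV(info *model.TableInfo, physicalID int64) error
		ResetTableStats2KVForDrop(physicalID int64) error
		InsertColStats2KV(physicalID int64, colInfos []*model.ColumnInfo) error
		ChangeGlobalStatsID(from, to int64) error
		UpdateStatsVersion() error
	}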
23 changes: 0 additions & 23 deletions pkg/statistics/handle/handle.go
@@ -220,29 +220,6 @@ func (h *Handle) FlushStats() {
}
}

// SaveTableStatsToStorage saves the stats of a table to storage.
func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, analyzeSnapshot bool, source string) (err error) {
return h.callWithSCtx(func(sctx sessionctx.Context) error {
return SaveTableStatsToStorage(sctx, results, analyzeSnapshot, source)
})
}

// SaveTableStatsToStorage saves the stats of a table to storage.
func SaveTableStatsToStorage(sctx sessionctx.Context, results *statistics.AnalyzeResults, analyzeSnapshot bool, source string) error {
statsVer, err := storage.SaveTableStatsToStorage(sctx, results, analyzeSnapshot)
if err == nil && statsVer != 0 {
tableID := results.TableID.GetStatisticsID()
if err1 := history.RecordHistoricalStatsMeta(sctx, tableID, statsVer, source); err1 != nil {
logutil.BgLogger().Error("record historical stats meta failed",
zap.Int64("table-id", tableID),
zap.Uint64("version", statsVer),
zap.String("source", source),
zap.Error(err1))
}
}
return err
}

// BuildExtendedStats build extended stats for column groups if needed based on the column samples.
func (h *Handle) BuildExtendedStats(tableID int64, cols []*model.ColumnInfo, collectors []*statistics.SampleCollector) (es *statistics.ExtendedStatsColl, err error) {
err = h.callWithSCtx(func(sctx sessionctx.Context) error {
16 changes: 9 additions & 7 deletions pkg/statistics/handle/history/history_stats.go
@@ -62,16 +62,18 @@ func (sh *statsHistoryImpl) RecordHistoricalStatsToStorage(dbName string, tableI
}

// RecordHistoricalStatsMeta records stats meta of the specified version to stats_meta_history table.
func (sh *statsHistoryImpl) RecordHistoricalStatsMeta(tableID int64, version uint64, source string) {
func (sh *statsHistoryImpl) RecordHistoricalStatsMeta(tableID int64, version uint64, source string, enforce bool) {
if version == 0 {
return
}
tbl, ok := sh.statsHandle.Get(tableID)
if !ok {
return
}
if !tbl.IsInitialized() {
return
if !enforce {
tbl, ok := sh.statsHandle.Get(tableID)
if !ok {
return
}
if !tbl.IsInitialized() {
return
}
}
err := util.CallWithSCtx(sh.statsHandle.SPool(), func(sctx sessionctx.Context) error {
return RecordHistoricalStatsMeta(sctx, tableID, version, source)
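The new enforce parameter lets a caller record the historical stats meta even when the table is missing from, or not yet initialized in, the stats cache; with enforce set to false the method behaves as before. An illustrative call site (not part of this diff; the variable names are placeholders, and the source constants are the ones used elsewhere in this PR):

	// Regular path: skip recording when the table's stats are not cached or not initialized yet.
	sh.RecordHistoricalStatsMeta(tableID, statsVer, util.StatsMetaHistorySourceAnalyze, false)

	// Enforced path: record regardless of cache state, e.g. right after a schema change
	// when the stats cache may not yet contain an entry for this table.
	sh.RecordHistoricalStatsMeta(tableID, statsVer, util.StatsMetaHistorySourceSchemaChange, true)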