From 159ff685aad45015d87db1c773a4e61d91c825e8 Mon Sep 17 00:00:00 2001 From: ti-srebot <66930949+ti-srebot@users.noreply.github.com> Date: Thu, 4 Mar 2021 10:22:55 +0800 Subject: [PATCH 1/2] cherry pick #22961 to release-4.0 Signed-off-by: ti-srebot --- domain/domain.go | 17 + server/statistics_handler.go | 5 +- statistics/handle/bootstrap.go | 44 +-- statistics/handle/ddl.go | 84 +++-- statistics/handle/dump.go | 24 +- statistics/handle/gc.go | 126 ++++++- statistics/handle/handle.go | 551 +++++++++++++++++++++++++++---- statistics/handle/handle_test.go | 13 +- statistics/handle/update.go | 188 +++++++++-- types/datum.go | 5 +- 10 files changed, 878 insertions(+), 179 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index 1742102cc9445..26de37779de43 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1014,8 +1014,18 @@ func (do *Domain) StatsHandle() *handle.Handle { } // CreateStatsHandle is used only for test. +<<<<<<< HEAD func (do *Domain) CreateStatsHandle(ctx sessionctx.Context) { atomic.StorePointer(&do.statsHandle, unsafe.Pointer(handle.NewHandle(ctx, do.statsLease))) +======= +func (do *Domain) CreateStatsHandle(ctx sessionctx.Context) error { + h, err := handle.NewHandle(ctx, do.statsLease, do.sysSessionPool) + if err != nil { + return err + } + atomic.StorePointer(&do.statsHandle, unsafe.Pointer(h)) + return nil +>>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) } // StatsUpdating checks if the stats worker is updating. @@ -1040,7 +1050,14 @@ var RunAutoAnalyze = true // It should be called only once in BootstrapSession. func (do *Domain) UpdateTableStatsLoop(ctx sessionctx.Context) error { ctx.GetSessionVars().InRestrictedSQL = true +<<<<<<< HEAD statsHandle := handle.NewHandle(ctx, do.statsLease) +======= + statsHandle, err := handle.NewHandle(ctx, do.statsLease, do.sysSessionPool) + if err != nil { + return err + } +>>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) atomic.StorePointer(&do.statsHandle, unsafe.Pointer(statsHandle)) do.ddl.RegisterEventCh(statsHandle.DDLEventCh()) // Negative stats lease indicates that it is in test, it does not need update. diff --git a/server/statistics_handler.go b/server/statistics_handler.go index a40e1b19b321f..ecb246e46cf48 100644 --- a/server/statistics_handler.go +++ b/server/statistics_handler.go @@ -25,7 +25,6 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/gcutil" - "github.com/pingcap/tidb/util/sqlexec" ) // StatsHandler is the handler for dumping statistics. 
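Editor's note on the recurring pattern in this patch: SQL built with fmt.Sprintf and run through Execute (which returns a []sqlexec.RecordSet) is replaced by the parameterized ExecuteInternal API, where %? marks a bound argument and a single RecordSet is returned. An illustrative before/after sketch, not part of the diff, with identifiers (exec, ctx, startTS, physicalID) borrowed from the hunks below:

    // Before: the statement text is assembled by hand with fmt.Sprintf.
    sql := fmt.Sprintf("update mysql.stats_meta set version = %d where table_id = %d", startTS, physicalID)
    _, err = exec.Execute(ctx, sql) // returns a slice of RecordSets

    // After: arguments are bound to %? placeholders instead of being
    // concatenated into the SQL text, and ExecuteInternal returns one RecordSet.
    _, err = exec.ExecuteInternal(ctx,
        "update mysql.stats_meta set version = %? where table_id = %?",
        startTS, physicalID)

Besides removing repeated string formatting, binding arguments this way avoids quoting/escaping mistakes in the internal statements.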
@@ -122,9 +121,7 @@ func (sh StatsHistoryHandler) ServeHTTP(w http.ResponseWriter, req *http.Request writeError(w, err) return } - se.GetSessionVars().SnapshotInfoschema, se.GetSessionVars().SnapshotTS = is, snapshot - historyStatsExec := se.(sqlexec.RestrictedSQLExecutor) - js, err := h.DumpStatsToJSON(params[pDBName], tbl.Meta(), historyStatsExec) + js, err := h.DumpStatsToJSONBySnapshot(params[pDBName], tbl.Meta(), snapshot) if err != nil { writeError(w, err) } else { diff --git a/statistics/handle/bootstrap.go b/statistics/handle/bootstrap.go index 4bc8257537eda..bc87750400f2a 100644 --- a/statistics/handle/bootstrap.go +++ b/statistics/handle/bootstrap.go @@ -60,18 +60,16 @@ func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, cache *statsCache func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (statsCache, error) { sql := "select HIGH_PRIORITY version, table_id, modify_count, count from mysql.stats_meta" - rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql) - if len(rc) > 0 { - defer terror.Call(rc[0].Close) - } + rc, err := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), sql) if err != nil { return statsCache{}, errors.Trace(err) } + defer terror.Call(rc.Close) tables := statsCache{tables: make(map[int64]*statistics.Table)} - req := rc[0].NewChunk() + req := rc.NewChunk() iter := chunk.NewIterator4Chunk(req) for { - err := rc[0].Next(context.TODO(), req) + err := rc.Next(context.TODO(), req) if err != nil { return statsCache{}, errors.Trace(err) } @@ -147,17 +145,15 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache *stat func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache *statsCache) error { sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms" - rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql) - if len(rc) > 0 { - defer terror.Call(rc[0].Close) - } + rc, err := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), sql) if err != nil { return errors.Trace(err) } - req := rc[0].NewChunk() + defer terror.Call(rc.Close) + req := rc.NewChunk() iter := chunk.NewIterator4Chunk(req) for { - err := rc[0].Next(context.TODO(), req) + err := rc.Next(context.TODO(), req) if err != nil { return errors.Trace(err) } @@ -187,17 +183,15 @@ func (h *Handle) initStatsTopN4Chunk(cache *statsCache, iter *chunk.Iterator4Chu func (h *Handle) initStatsTopN(cache *statsCache) error { sql := "select HIGH_PRIORITY table_id, hist_id, value, count from mysql.stats_top_n where is_index = 1" - rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql) - if len(rc) > 0 { - defer terror.Call(rc[0].Close) - } + rc, err := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), sql) if err != nil { return errors.Trace(err) } - req := rc[0].NewChunk() + defer terror.Call(rc.Close) + req := rc.NewChunk() iter := chunk.NewIterator4Chunk(req) for { - err := rc[0].Next(context.TODO(), req) + err := rc.Next(context.TODO(), req) if err != nil { return errors.Trace(err) } @@ -257,17 +251,15 @@ func initStatsBuckets4Chunk(ctx sessionctx.Context, cache *statsCache, iter *chu func (h *Handle) initStatsBuckets(cache *statsCache) error { sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound from mysql.stats_buckets order by table_id, is_index, hist_id, bucket_id" - rc, err := 
h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql) - if len(rc) > 0 { - defer terror.Call(rc[0].Close) - } + rc, err := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), sql) if err != nil { return errors.Trace(err) } - req := rc[0].NewChunk() + defer terror.Call(rc.Close) + req := rc.NewChunk() iter := chunk.NewIterator4Chunk(req) for { - err := rc[0].Next(context.TODO(), req) + err := rc.Next(context.TODO(), req) if err != nil { return errors.Trace(err) } @@ -300,13 +292,13 @@ func (h *Handle) initStatsBuckets(cache *statsCache) error { func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) { h.mu.Lock() defer func() { - _, err1 := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), "commit") + _, err1 := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), "commit") if err == nil && err1 != nil { err = err1 } h.mu.Unlock() }() - _, err = h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), "begin") + _, err = h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), "begin") if err != nil { return err } diff --git a/statistics/handle/ddl.go b/statistics/handle/ddl.go index 127608edb7bb8..9eb22ef35425e 100644 --- a/statistics/handle/ddl.go +++ b/statistics/handle/ddl.go @@ -15,7 +15,6 @@ package handle import ( "context" - "fmt" "github.com/pingcap/errors" "github.com/pingcap/parser/model" @@ -75,28 +74,34 @@ func (h *Handle) DDLEventCh() chan *util.Event { func (h *Handle) insertTableStats2KV(info *model.TableInfo, physicalID int64) (err error) { h.mu.Lock() defer h.mu.Unlock() + ctx := context.Background() exec := h.mu.ctx.(sqlexec.SQLExecutor) - _, err = exec.Execute(context.Background(), "begin") + _, err = exec.ExecuteInternal(ctx, "begin") if err != nil { return errors.Trace(err) } defer func() { - err = finishTransaction(context.Background(), exec, err) + err = finishTransaction(ctx, exec, err) }() txn, err := h.mu.ctx.Txn(true) if err != nil { return errors.Trace(err) } startTS := txn.StartTS() - sqls := make([]string, 0, 1+len(info.Columns)+len(info.Indices)) - sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_meta (version, table_id) values(%d, %d)", startTS, physicalID)) + if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_meta (version, table_id) values(%?, %?)", startTS, physicalID); err != nil { + return err + } for _, col := range info.Columns { - sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%d, 0, %d, 0, %d)", physicalID, col.ID, startTS)) + if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%?, 0, %?, 0, %?)", physicalID, col.ID, startTS); err != nil { + return err + } } for _, idx := range info.Indices { - sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%d, 1, %d, 0, %d)", physicalID, idx.ID, startTS)) + if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%?, 1, %?, 0, %?)", physicalID, idx.ID, startTS); err != nil { + return err + } } - return execSQLs(context.Background(), exec, sqls) + return nil } // insertColStats2KV inserts a record to stats_histograms with distinct_count 1 and inserts a bucket to stats_buckets with default value.
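Editor's note: the bootstrap.go hunks above all share the same read loop over the single RecordSet that ExecuteInternal now returns. Condensed, the shape is as follows (an illustrative sketch, not part of the diff; error wrapping shortened and the per-row work elided):

    rc, err := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), sql)
    if err != nil {
        return errors.Trace(err)
    }
    defer terror.Call(rc.Close) // exactly one RecordSet to close, no more rc[0]
    req := rc.NewChunk()
    iter := chunk.NewIterator4Chunk(req)
    for {
        if err := rc.Next(context.TODO(), req); err != nil {
            return errors.Trace(err)
        }
        if req.NumRows() == 0 {
            break // all chunks have been consumed
        }
        for row := iter.Begin(); row != iter.End(); row = iter.Next() {
            _ = row // feed each row to the per-chunk init helper
        }
    }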
@@ -105,13 +110,14 @@ func (h *Handle) insertColStats2KV(physicalID int64, colInfo *model.ColumnInfo) h.mu.Lock() defer h.mu.Unlock() + ctx := context.TODO() exec := h.mu.ctx.(sqlexec.SQLExecutor) - _, err = exec.Execute(context.Background(), "begin") + _, err = exec.ExecuteInternal(ctx, "begin") if err != nil { return errors.Trace(err) } defer func() { - err = finishTransaction(context.Background(), exec, err) + err = finishTransaction(ctx, exec, err) }() txn, err := h.mu.ctx.Txn(true) if err != nil { @@ -119,28 +125,26 @@ func (h *Handle) insertColStats2KV(physicalID int64, colInfo *model.ColumnInfo) } startTS := txn.StartTS() // First of all, we update the version. - _, err = exec.Execute(context.Background(), fmt.Sprintf("update mysql.stats_meta set version = %d where table_id = %d ", startTS, physicalID)) + _, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %? where table_id = %?", startTS, physicalID) if err != nil { return } - ctx := context.TODO() // If we didn't update anything by last SQL, it means the stats of this table do not exist. if h.mu.ctx.GetSessionVars().StmtCtx.AffectedRows() > 0 { // By this step we can get the count of this table, then we can be sure of the count and repeats of the bucket. - var rs []sqlexec.RecordSet - rs, err = exec.Execute(ctx, fmt.Sprintf("select count from mysql.stats_meta where table_id = %d", physicalID)) - if len(rs) > 0 { - defer terror.Call(rs[0].Close) - } + var rs sqlexec.RecordSet + rs, err = exec.ExecuteInternal(ctx, "select count from mysql.stats_meta where table_id = %?", physicalID) if err != nil { return } - req := rs[0].NewChunk() - err = rs[0].Next(ctx, req) + defer terror.Call(rs.Close) + req := rs.NewChunk() + err = rs.Next(ctx, req) if err != nil { return } count := req.GetRow(0).GetInt64(0) +<<<<<<< HEAD value := types.NewDatum(colInfo.GetOriginDefaultValue()) value, err = value.ConvertTo(h.mu.ctx.GetSessionVars().StmtCtx, &colInfo.FieldType) if err != nil { @@ -159,8 +163,34 @@ func (h *Handle) insertColStats2KV(physicalID int64, colInfo *model.ColumnInfo) } // There must be only one bucket for this new column and the value is the default value. sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_buckets (table_id, is_index, hist_id, bucket_id, repeats, count, lower_bound, upper_bound) values (%d, 0, %d, 0, %d, %d, X'%X', X'%X')", physicalID, colInfo.ID, count, count, value.GetBytes(), value.GetBytes())) +======= + for _, colInfo := range colInfos { + value := types.NewDatum(colInfo.GetOriginDefaultValue()) + value, err = value.ConvertTo(h.mu.ctx.GetSessionVars().StmtCtx, &colInfo.FieldType) + if err != nil { + return + } + if value.IsNull() { + // If the added column has a null default value, all the existing rows have a null value on the newly added column. + if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, null_count) values (%?, %?, 0, %?, 0, %?)", startTS, physicalID, colInfo.ID, count); err != nil { + return err + } + } else { + // If the stats exists, we insert the histogram meta first; the distinct_count will always be one.
+ if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, tot_col_size) values (%?, %?, 0, %?, 1, %?)", startTS, physicalID, colInfo.ID, int64(len(value.GetBytes()))*count); err != nil { + return err + } + value, err = value.ConvertTo(h.mu.ctx.GetSessionVars().StmtCtx, types.NewFieldType(mysql.TypeBlob)) + if err != nil { + return + } + // There must be only one bucket for this new column and the value is the default value. + if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_buckets (table_id, is_index, hist_id, bucket_id, repeats, count, lower_bound, upper_bound) values (%?, 0, %?, 0, %?, %?, %?, %?)", physicalID, colInfo.ID, count, count, value.GetBytes(), value.GetBytes()); err != nil { + return err + } + } +>>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) } - return execSQLs(context.Background(), exec, sqls) } return } @@ -168,20 +198,10 @@ func (h *Handle) insertColStats2KV(physicalID int64, colInfo *model.ColumnInfo) // finishTransaction will execute `commit` when error is nil, otherwise `rollback`. func finishTransaction(ctx context.Context, exec sqlexec.SQLExecutor, err error) error { if err == nil { - _, err = exec.Execute(ctx, "commit") + _, err = exec.ExecuteInternal(ctx, "commit") } else { - _, err1 := exec.Execute(ctx, "rollback") + _, err1 := exec.ExecuteInternal(ctx, "rollback") terror.Log(errors.Trace(err1)) } return errors.Trace(err) } - -func execSQLs(ctx context.Context, exec sqlexec.SQLExecutor, sqls []string) error { - for _, sql := range sqls { - _, err := exec.Execute(ctx, sql) - if err != nil { - return err - } - } - return nil -} diff --git a/statistics/handle/dump.go b/statistics/handle/dump.go index 16295569d76c0..487e2faa2c64f 100644 --- a/statistics/handle/dump.go +++ b/statistics/handle/dump.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/types" @@ -63,9 +64,24 @@ func dumpJSONCol(hist *statistics.Histogram, CMSketch *statistics.CMSketch) *jso // DumpStatsToJSON dumps statistic to json. func (h *Handle) DumpStatsToJSON(dbName string, tableInfo *model.TableInfo, historyStatsExec sqlexec.RestrictedSQLExecutor) (*JSONTable, error) { + var snapshot uint64 + if historyStatsExec != nil { + sctx := historyStatsExec.(sessionctx.Context) + snapshot = sctx.GetSessionVars().SnapshotTS + } + return h.DumpStatsToJSONBySnapshot(dbName, tableInfo, snapshot) +} + +// DumpStatsToJSONBySnapshot dumps statistic to json. +func (h *Handle) DumpStatsToJSONBySnapshot(dbName string, tableInfo *model.TableInfo, snapshot uint64) (*JSONTable, error) { pi := tableInfo.GetPartitionInfo() +<<<<<<< HEAD if pi == nil { return h.tableStatsToJSON(dbName, tableInfo, tableInfo.ID, historyStatsExec) +======= + if pi == nil || h.CurrentPruneMode() == variable.DynamicOnly { + return h.tableStatsToJSON(dbName, tableInfo, tableInfo.ID, snapshot) +>>>>>>> 8304d661f... 
statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) } jsonTbl := &JSONTable{ DatabaseName: dbName, @@ -73,7 +89,7 @@ func (h *Handle) DumpStatsToJSON(dbName string, tableInfo *model.TableInfo, hist Partitions: make(map[string]*JSONTable, len(pi.Definitions)), } for _, def := range pi.Definitions { - tbl, err := h.tableStatsToJSON(dbName, tableInfo, def.ID, historyStatsExec) + tbl, err := h.tableStatsToJSON(dbName, tableInfo, def.ID, snapshot) if err != nil { return nil, errors.Trace(err) } @@ -85,12 +101,12 @@ func (h *Handle) DumpStatsToJSON(dbName string, tableInfo *model.TableInfo, hist return jsonTbl, nil } -func (h *Handle) tableStatsToJSON(dbName string, tableInfo *model.TableInfo, physicalID int64, historyStatsExec sqlexec.RestrictedSQLExecutor) (*JSONTable, error) { - tbl, err := h.tableStatsFromStorage(tableInfo, physicalID, true, historyStatsExec) +func (h *Handle) tableStatsToJSON(dbName string, tableInfo *model.TableInfo, physicalID int64, snapshot uint64) (*JSONTable, error) { + tbl, err := h.tableStatsFromStorage(tableInfo, physicalID, true, snapshot) if err != nil || tbl == nil { return nil, err } - tbl.Version, tbl.ModifyCount, tbl.Count, err = h.statsMetaByTableIDFromStorage(physicalID, historyStatsExec) + tbl.Version, tbl.ModifyCount, tbl.Count, err = h.statsMetaByTableIDFromStorage(physicalID, snapshot) if err != nil { return nil, err } diff --git a/statistics/handle/gc.go b/statistics/handle/gc.go index 3264485e6b703..176b76c5976c0 100644 --- a/statistics/handle/gc.go +++ b/statistics/handle/gc.go @@ -15,7 +15,11 @@ package handle import ( "context" +<<<<<<< HEAD "fmt" +======= + "encoding/json" +>>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) "time" "github.com/cznic/mathutil" @@ -27,6 +31,7 @@ import ( // GCStats will garbage collect the useless stats info. For dropped tables, we will first update their version so that // other tidb could know that table is deleted. func (h *Handle) GCStats(is infoschema.InfoSchema, ddlLease time.Duration) error { + ctx := context.Background() // To make sure that all the deleted tables' schema and stats info have been acknowledged to all tidb, // we only garbage collect version before 10 lease. lease := mathutil.MaxInt64(int64(h.Lease()), int64(ddlLease)) @@ -34,8 +39,13 @@ func (h *Handle) GCStats(is infoschema.InfoSchema, ddlLease time.Duration) error if h.LastUpdateVersion() < offset { return nil } +<<<<<<< HEAD sql := fmt.Sprintf("select table_id from mysql.stats_meta where version < %d", h.LastUpdateVersion()-offset) rows, _, err := h.restrictedExec.ExecRestrictedSQL(sql) +======= + gcVer := h.LastUpdateVersion() - offset + rows, _, err := h.execRestrictedSQL(ctx, "select table_id from mysql.stats_meta where version < %?", gcVer) +>>>>>>> 8304d661f... 
statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) if err != nil { return errors.Trace(err) } @@ -48,17 +58,18 @@ func (h *Handle) GCStats(is infoschema.InfoSchema, ddlLease time.Duration) error } func (h *Handle) gcTableStats(is infoschema.InfoSchema, physicalID int64) error { - sql := fmt.Sprintf("select is_index, hist_id from mysql.stats_histograms where table_id = %d", physicalID) - rows, _, err := h.restrictedExec.ExecRestrictedSQL(sql) + ctx := context.Background() + rows, _, err := h.execRestrictedSQL(ctx, "select is_index, hist_id from mysql.stats_histograms where table_id = %?", physicalID) if err != nil { return errors.Trace(err) } // The table has already been deleted in stats and acknowledged to all tidb, // we can safely remove the meta info now. if len(rows) == 0 { - sql := fmt.Sprintf("delete from mysql.stats_meta where table_id = %d", physicalID) - _, _, err := h.restrictedExec.ExecRestrictedSQL(sql) - return errors.Trace(err) + _, _, err = h.execRestrictedSQL(ctx, "delete from mysql.stats_meta where table_id = %?", physicalID) + if err != nil { + return errors.Trace(err) + } } h.mu.Lock() tbl, ok := h.getTableByPhysicalID(is, physicalID) @@ -91,6 +102,43 @@ func (h *Handle) gcTableStats(is infoschema.InfoSchema, physicalID int64) error } } } +<<<<<<< HEAD +======= + // Mark records in mysql.stats_extended as `deleted`. + rows, _, err = h.execRestrictedSQL(ctx, "select stats_name, db, column_ids from mysql.stats_extended where table_id = %? and status in (%?, %?)", physicalID, StatsStatusAnalyzed, StatsStatusInited) + if err != nil { + return errors.Trace(err) + } + if len(rows) == 0 { + return nil + } + for _, row := range rows { + statsName, db, strColIDs := row.GetString(0), row.GetString(1), row.GetString(2) + var colIDs []int64 + err = json.Unmarshal([]byte(strColIDs), &colIDs) + if err != nil { + logutil.BgLogger().Debug("decode column IDs failed", zap.String("column_ids", strColIDs), zap.Error(err)) + return errors.Trace(err) + } + for _, colID := range colIDs { + found := false + for _, col := range tblInfo.Columns { + if colID == col.ID { + found = true + break + } + } + if !found { + err = h.MarkExtendedStatsDeleted(statsName, db, physicalID) + if err != nil { + logutil.BgLogger().Debug("update stats_extended status failed", zap.String("stats_name", statsName), zap.String("db", db), zap.Error(err)) + return errors.Trace(err) + } + break + } + } + } +>>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) return nil } @@ -99,29 +147,37 @@ func (h *Handle) deleteHistStatsFromKV(physicalID int64, histID int64, isIndex i h.mu.Lock() defer h.mu.Unlock() + ctx := context.Background() exec := h.mu.ctx.(sqlexec.SQLExecutor) - _, err = exec.Execute(context.Background(), "begin") + _, err = exec.ExecuteInternal(ctx, "begin") if err != nil { return errors.Trace(err) } defer func() { - err = finishTransaction(context.Background(), exec, err) + err = finishTransaction(ctx, exec, err) }() txn, err := h.mu.ctx.Txn(true) if err != nil { return errors.Trace(err) } startTS := txn.StartTS() - sqls := make([]string, 0, 4) // First of all, we update the version. If this table doesn't exist, it won't have any problem. Because we cannot delete anything. - sqls = append(sqls, fmt.Sprintf("update mysql.stats_meta set version = %d where table_id = %d ", startTS, physicalID)) + if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %? where table_id = %? 
", startTS, physicalID); err != nil { + return err + } // delete histogram meta - sqls = append(sqls, fmt.Sprintf("delete from mysql.stats_histograms where table_id = %d and hist_id = %d and is_index = %d", physicalID, histID, isIndex)) + if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_histograms where table_id = %? and hist_id = %? and is_index = %?", physicalID, histID, isIndex); err != nil { + return err + } // delete top n data - sqls = append(sqls, fmt.Sprintf("delete from mysql.stats_top_n where table_id = %d and hist_id = %d and is_index = %d", physicalID, histID, isIndex)) + if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_top_n where table_id = %? and hist_id = %? and is_index = %?", physicalID, histID, isIndex); err != nil { + return err + } // delete all buckets - sqls = append(sqls, fmt.Sprintf("delete from mysql.stats_buckets where table_id = %d and hist_id = %d and is_index = %d", physicalID, histID, isIndex)) - return execSQLs(context.Background(), exec, sqls) + if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_buckets where table_id = %? and hist_id = %? and is_index = %?", physicalID, histID, isIndex); err != nil { + return err + } + return nil } // DeleteTableStatsFromKV deletes table statistics from kv. @@ -129,7 +185,7 @@ func (h *Handle) DeleteTableStatsFromKV(physicalID int64) (err error) { h.mu.Lock() defer h.mu.Unlock() exec := h.mu.ctx.(sqlexec.SQLExecutor) - _, err = exec.Execute(context.Background(), "begin") + _, err = exec.ExecuteInternal(context.Background(), "begin") if err != nil { return errors.Trace(err) } @@ -140,9 +196,10 @@ func (h *Handle) DeleteTableStatsFromKV(physicalID int64) (err error) { if err != nil { return errors.Trace(err) } + ctx := context.Background() startTS := txn.StartTS() - sqls := make([]string, 0, 5) // We only update the version so that other tidb will know that this table is deleted. +<<<<<<< HEAD sqls = append(sqls, fmt.Sprintf("update mysql.stats_meta set version = %d where table_id = %d ", startTS, physicalID)) sqls = append(sqls, fmt.Sprintf("delete from mysql.stats_histograms where table_id = %d", physicalID)) sqls = append(sqls, fmt.Sprintf("delete from mysql.stats_buckets where table_id = %d", physicalID)) @@ -150,3 +207,42 @@ func (h *Handle) DeleteTableStatsFromKV(physicalID int64) (err error) { sqls = append(sqls, fmt.Sprintf("delete from mysql.stats_feedback where table_id = %d", physicalID)) return execSQLs(context.Background(), exec, sqls) } +======= + if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %? where table_id = %? ", startTS, physicalID); err != nil { + return err + } + if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_histograms where table_id = %?", physicalID); err != nil { + return err + } + if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_buckets where table_id = %?", physicalID); err != nil { + return err + } + if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_top_n where table_id = %?", physicalID); err != nil { + return err + } + if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_feedback where table_id = %?", physicalID); err != nil { + return err + } + if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_extended set version = %?, status = %? where table_id = %? 
and status in (%?, %?)", startTS, StatsStatusDeleted, physicalID, StatsStatusAnalyzed, StatsStatusInited); err != nil { + return err + } + return nil +} + +func (h *Handle) removeDeletedExtendedStats(version uint64) (err error) { + h.mu.Lock() + defer h.mu.Unlock() + exec := h.mu.ctx.(sqlexec.SQLExecutor) + ctx := context.Background() + _, err = exec.ExecuteInternal(ctx, "begin pessimistic") + if err != nil { + return errors.Trace(err) + } + defer func() { + err = finishTransaction(ctx, exec, err) + }() + const sql = "delete from mysql.stats_extended where status = %? and version < %?" + _, err = exec.ExecuteInternal(ctx, sql, StatsStatusDeleted, version) + return +} +>>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go index 3c681aafb3804..069e73eaf251a 100644 --- a/statistics/handle/handle.go +++ b/statistics/handle/handle.go @@ -20,12 +20,16 @@ import ( "sync/atomic" "time" +<<<<<<< HEAD +======= + "github.com/cznic/mathutil" + "github.com/ngaut/pools" +>>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" - "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/sessionctx" @@ -68,7 +72,7 @@ type Handle struct { atomic.Value } - restrictedExec sqlexec.RestrictedSQLExecutor + pool sessionPool // ddlEventCh is a channel to notify a ddl operation has happened. // It is sent only by owner or the drop stats executor, and read by stats handle. @@ -83,6 +87,37 @@ type Handle struct { lease atomic2.Duration } +func (h *Handle) withRestrictedSQLExecutor(ctx context.Context, fn func(context.Context, sqlexec.RestrictedSQLExecutor) ([]chunk.Row, []*ast.ResultField, error)) ([]chunk.Row, []*ast.ResultField, error) { + se, err := h.pool.Get() + if err != nil { + return nil, nil, errors.Trace(err) + } + defer h.pool.Put(se) + + exec := se.(sqlexec.RestrictedSQLExecutor) + return fn(ctx, exec) +} + +func (h *Handle) execRestrictedSQL(ctx context.Context, sql string, params ...interface{}) ([]chunk.Row, []*ast.ResultField, error) { + return h.withRestrictedSQLExecutor(ctx, func(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) ([]chunk.Row, []*ast.ResultField, error) { + stmt, err := exec.ParseWithParams(ctx, sql, params...) + if err != nil { + return nil, nil, errors.Trace(err) + } + return exec.ExecRestrictedStmt(ctx, stmt) + }) +} + +func (h *Handle) execRestrictedSQLWithSnapshot(ctx context.Context, sql string, snapshot uint64, params ...interface{}) ([]chunk.Row, []*ast.ResultField, error) { + return h.withRestrictedSQLExecutor(ctx, func(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) ([]chunk.Row, []*ast.ResultField, error) { + stmt, err := exec.ParseWithParams(ctx, sql, params...) + if err != nil { + return nil, nil, errors.Trace(err) + } + return exec.ExecRestrictedStmt(ctx, stmt, sqlexec.ExecOptionWithSnapshot(snapshot)) + }) +} + // Clear the statsCache, only for test. func (h *Handle) Clear() { h.mu.Lock() @@ -101,7 +136,13 @@ func (h *Handle) Clear() { h.mu.Unlock() } +type sessionPool interface { + Get() (pools.Resource, error) + Put(pools.Resource) +} + // NewHandle creates a Handle for update stats. 
+<<<<<<< HEAD func NewHandle(ctx sessionctx.Context, lease time.Duration) *Handle { handle := &Handle{ ddlEventCh: make(chan *util.Event, 100), @@ -114,6 +155,20 @@ func NewHandle(ctx sessionctx.Context, lease time.Duration) *Handle { if exec, ok := ctx.(sqlexec.RestrictedSQLExecutor); ok { handle.restrictedExec = exec } +======= +func NewHandle(ctx sessionctx.Context, lease time.Duration, pool sessionPool) (*Handle, error) { + handle := &Handle{ + ddlEventCh: make(chan *util.Event, 100), + listHead: &SessionStatsCollector{mapper: make(tableDeltaMap), rateMap: make(errorRateDeltaMap)}, + globalMap: make(tableDeltaMap), + feedback: statistics.NewQueryFeedbackMap(), + idxUsageListHead: &SessionIndexUsageCollector{mapper: make(indexUsageMap)}, + pool: pool, + } + handle.lease.Store(lease) + handle.pool = pool + handle.statsCache.memTracker = memory.NewTracker(memory.LabelForStatsCache, -1) +>>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) handle.mu.ctx = ctx handle.mu.rateMap = make(errorRateDeltaMap) handle.statsCache.Store(statsCache{tables: make(map[int64]*statistics.Table)}) @@ -158,8 +213,8 @@ func (h *Handle) Update(is infoschema.InfoSchema) error { } else { lastVersion = 0 } - sql := fmt.Sprintf("SELECT version, table_id, modify_count, count from mysql.stats_meta where version > %d order by version", lastVersion) - rows, _, err := h.restrictedExec.ExecRestrictedSQL(sql) + ctx := context.Background() + rows, _, err := h.execRestrictedSQL(ctx, "SELECT version, table_id, modify_count, count from mysql.stats_meta where version > %? order by version", lastVersion) if err != nil { return errors.Trace(err) } @@ -181,7 +236,7 @@ func (h *Handle) Update(is infoschema.InfoSchema) error { continue } tableInfo := table.Meta() - tbl, err := h.tableStatsFromStorage(tableInfo, physicalID, false, nil) + tbl, err := h.tableStatsFromStorage(tableInfo, physicalID, false, 0) // Error is not nil may mean that there are some ddl changes on this table, we will not update it. if err != nil { logutil.BgLogger().Error("[stats] error occurred when read table stats", zap.String("table", tableInfo.Name.O), zap.Error(err)) @@ -281,7 +336,7 @@ func (sc statsCache) update(tables []*statistics.Table, deletedIDs []int64, newV // LoadNeededHistograms will load histograms for those needed columns. func (h *Handle) LoadNeededHistograms() (err error) { cols := statistics.HistogramNeededColumns.AllCols() - reader, err := h.getStatsReader(nil) + reader, err := h.getStatsReader(0) if err != nil { return err } @@ -309,7 +364,15 @@ func (h *Handle) LoadNeededHistograms() (err error) { if err != nil { return errors.Trace(err) } +<<<<<<< HEAD cms, err := h.cmSketchFromStorage(reader, col.TableID, 0, col.ColumnID) +======= + cms, topN, err := h.cmSketchAndTopNFromStorage(reader, col.TableID, 0, col.ColumnID) + if err != nil { + return errors.Trace(err) + } + rows, _, err := reader.read("select stats_ver from mysql.stats_histograms where is_index = 0 and table_id = %? and hist_id = %?", col.TableID, col.ColumnID) +>>>>>>> 8304d661f... 
statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) if err != nil { return errors.Trace(err) } @@ -354,14 +417,18 @@ func (h *Handle) FlushStats() { } } +<<<<<<< HEAD func (h *Handle) cmSketchFromStorage(reader *statsReader, tblID int64, isIndex, histID int64) (_ *statistics.CMSketch, err error) { selSQL := fmt.Sprintf("select cm_sketch from mysql.stats_histograms where table_id = %d and is_index = %d and hist_id = %d", tblID, isIndex, histID) rows, _, err := reader.read(selSQL) +======= +func (h *Handle) cmSketchAndTopNFromStorage(reader *statsReader, tblID int64, isIndex, histID int64) (_ *statistics.CMSketch, _ *statistics.TopN, err error) { + rows, _, err := reader.read("select cm_sketch from mysql.stats_histograms where table_id = %? and is_index = %? and hist_id = %?", tblID, isIndex, histID) +>>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) if err != nil || len(rows) == 0 { return nil, err } - selSQL = fmt.Sprintf("select HIGH_PRIORITY value, count from mysql.stats_top_n where table_id = %d and is_index = %d and hist_id = %d", tblID, isIndex, histID) - topNRows, _, err := reader.read(selSQL) + topNRows, _, err := reader.read("select HIGH_PRIORITY value, count from mysql.stats_top_n where table_id = %? and is_index = %? and hist_id = %?", tblID, isIndex, histID) if err != nil { return nil, err } @@ -497,8 +564,8 @@ func (h *Handle) columnStatsFromStorage(reader *statsReader, row chunk.Row, tabl } // tableStatsFromStorage loads table stats info from storage. -func (h *Handle) tableStatsFromStorage(tableInfo *model.TableInfo, physicalID int64, loadAll bool, historyStatsExec sqlexec.RestrictedSQLExecutor) (_ *statistics.Table, err error) { - reader, err := h.getStatsReader(historyStatsExec) +func (h *Handle) tableStatsFromStorage(tableInfo *model.TableInfo, physicalID int64, loadAll bool, snapshot uint64) (_ *statistics.Table, err error) { + reader, err := h.getStatsReader(snapshot) if err != nil { return nil, err } @@ -511,7 +578,7 @@ func (h *Handle) tableStatsFromStorage(tableInfo *model.TableInfo, physicalID in table, ok := h.statsCache.Load().(statsCache).tables[physicalID] // If table stats is pseudo, we also need to copy it, since we will use the column stats when // the average error rate of it is small. - if !ok || historyStatsExec != nil { + if !ok || snapshot > 0 { histColl := statistics.HistColl{ PhysicalID: physicalID, HavePhysicalID: true, @@ -526,8 +593,7 @@ func (h *Handle) tableStatsFromStorage(tableInfo *model.TableInfo, physicalID in table = table.Copy() } table.Pseudo = false - selSQL := fmt.Sprintf("select table_id, is_index, hist_id, distinct_count, version, null_count, tot_col_size, stats_ver, flag, correlation, last_analyze_pos from mysql.stats_histograms where table_id = %d", physicalID) - rows, _, err := reader.read(selSQL) + rows, _, err := reader.read("select table_id, is_index, hist_id, distinct_count, version, null_count, tot_col_size, stats_ver, flag, correlation, last_analyze_pos from mysql.stats_histograms where table_id = %?", physicalID) // Check deleted table. 
if err != nil || len(rows) == 0 { return nil, nil @@ -542,6 +608,48 @@ func (h *Handle) tableStatsFromStorage(tableInfo *model.TableInfo, physicalID in return nil, err } } +<<<<<<< HEAD +======= + return h.extendedStatsFromStorage(reader, table, physicalID, loadAll) +} + +func (h *Handle) extendedStatsFromStorage(reader *statsReader, table *statistics.Table, physicalID int64, loadAll bool) (*statistics.Table, error) { + lastVersion := uint64(0) + if table.ExtendedStats != nil && !loadAll { + lastVersion = table.ExtendedStats.LastUpdateVersion + } else { + table.ExtendedStats = statistics.NewExtendedStatsColl() + } + rows, _, err := reader.read("select stats_name, db, status, type, column_ids, scalar_stats, blob_stats, version from mysql.stats_extended where table_id = %? and status in (%?, %?) and version > %?", physicalID, StatsStatusAnalyzed, StatsStatusDeleted, lastVersion) + if err != nil || len(rows) == 0 { + return table, nil + } + for _, row := range rows { + lastVersion = mathutil.MaxUint64(lastVersion, row.GetUint64(7)) + key := statistics.ExtendedStatsKey{ + StatsName: row.GetString(0), + DB: row.GetString(1), + } + status := uint8(row.GetInt64(2)) + if status == StatsStatusDeleted { + delete(table.ExtendedStats.Stats, key) + } else { + item := &statistics.ExtendedStatsItem{ + Tp: uint8(row.GetInt64(3)), + ScalarVals: row.GetFloat64(5), + StringVals: row.GetString(6), + } + colIDs := row.GetString(4) + err := json.Unmarshal([]byte(colIDs), &item.ColIDs) + if err != nil { + logutil.BgLogger().Error("[stats] decode column IDs failed", zap.String("column_ids", colIDs), zap.Error(err)) + return nil, err + } + table.ExtendedStats.Stats[key] = item + } + } + table.ExtendedStats.LastUpdateVersion = lastVersion +>>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) return table, nil } @@ -551,7 +659,7 @@ func (h *Handle) SaveStatsToStorage(tableID int64, count int64, isIndex int, hg defer h.mu.Unlock() ctx := context.TODO() exec := h.mu.ctx.(sqlexec.SQLExecutor) - _, err = exec.Execute(ctx, "begin") + _, err = exec.ExecuteInternal(ctx, "begin") if err != nil { return errors.Trace(err) } @@ -564,29 +672,53 @@ func (h *Handle) SaveStatsToStorage(tableID int64, count int64, isIndex int, hg } version := txn.StartTS() - sqls := make([]string, 0, 4) // If the count is less than 0, then we do not want to update the modify count and count. if count >= 0 { - sqls = append(sqls, fmt.Sprintf("replace into mysql.stats_meta (version, table_id, count) values (%d, %d, %d)", version, tableID, count)) + _, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_meta (version, table_id, count) values (%?, %?, %?)", version, tableID, count) } else { - sqls = append(sqls, fmt.Sprintf("update mysql.stats_meta set version = %d where table_id = %d", version, tableID)) + _, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %? 
where table_id = %?", version, tableID) + } + if err != nil { + return err } data, err := statistics.EncodeCMSketchWithoutTopN(cms) if err != nil { - return + return err } // Delete outdated data +<<<<<<< HEAD sqls = append(sqls, fmt.Sprintf("delete from mysql.stats_top_n where table_id = %d and is_index = %d and hist_id = %d", tableID, isIndex, hg.ID)) for _, meta := range cms.TopN() { sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_top_n (table_id, is_index, hist_id, value, count) values (%d, %d, %d, X'%X', %d)", tableID, isIndex, hg.ID, meta.Data, meta.Count)) +======= + if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_top_n where table_id = %? and is_index = %? and hist_id = %?", tableID, isIndex, hg.ID); err != nil { + return err + } + if topN != nil { + for _, meta := range topN.TopN { + if _, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_top_n (table_id, is_index, hist_id, value, count) values (%?, %?, %?, %?, %?)", tableID, isIndex, hg.ID, meta.Encoded, meta.Count); err != nil { + return err + } + } +>>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) } flag := 0 if isAnalyzed == 1 { flag = statistics.AnalyzeFlag } +<<<<<<< HEAD sqls = append(sqls, fmt.Sprintf("replace into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, flag, correlation) values (%d, %d, %d, %d, %d, %d, X'%X', %d, %d, %d, %f)", tableID, isIndex, hg.ID, hg.NDV, version, hg.NullCount, data, hg.TotColSize, statistics.CurStatsVersion, flag, hg.Correlation)) sqls = append(sqls, fmt.Sprintf("delete from mysql.stats_buckets where table_id = %d and is_index = %d and hist_id = %d", tableID, isIndex, hg.ID)) +======= + if _, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, flag, correlation) values (%?, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?)", + tableID, isIndex, hg.ID, hg.NDV, version, hg.NullCount, data, hg.TotColSize, statsVersion, flag, hg.Correlation); err != nil { + return err + } + if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_buckets where table_id = %? and is_index = %? and hist_id = %?", tableID, isIndex, hg.ID); err != nil { + return err + } +>>>>>>> 8304d661f... 
statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) sc := h.mu.ctx.GetSessionVars().StmtCtx var lastAnalyzePos []byte for i := range hg.Buckets { @@ -607,12 +739,16 @@ func (h *Handle) SaveStatsToStorage(tableID int64, count int64, isIndex int, hg if err != nil { return } - sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_buckets(table_id, is_index, hist_id, bucket_id, count, repeats, lower_bound, upper_bound) values(%d, %d, %d, %d, %d, %d, X'%X', X'%X')", tableID, isIndex, hg.ID, i, count, hg.Buckets[i].Repeat, lowerBound.GetBytes(), upperBound.GetBytes())) + if _, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_buckets(table_id, is_index, hist_id, bucket_id, count, repeats, lower_bound, upper_bound) values(%?, %?, %?, %?, %?, %?, %?, %?)", tableID, isIndex, hg.ID, i, count, hg.Buckets[i].Repeat, lowerBound.GetBytes(), upperBound.GetBytes()); err != nil { + return err + } } if isAnalyzed == 1 && len(lastAnalyzePos) > 0 { - sqls = append(sqls, fmt.Sprintf("update mysql.stats_histograms set last_analyze_pos = X'%X' where table_id = %d and is_index = %d and hist_id = %d", lastAnalyzePos, tableID, isIndex, hg.ID)) + if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_histograms set last_analyze_pos = %? where table_id = %? and is_index = %? and hist_id = %?", lastAnalyzePos, tableID, isIndex, hg.ID); err != nil { + return err + } } - return execSQLs(context.Background(), exec, sqls) + return } // SaveMetaToStorage will save stats_meta to storage. @@ -621,7 +757,7 @@ func (h *Handle) SaveMetaToStorage(tableID, count, modifyCount int64) (err error defer h.mu.Unlock() ctx := context.TODO() exec := h.mu.ctx.(sqlexec.SQLExecutor) - _, err = exec.Execute(ctx, "begin") + _, err = exec.ExecuteInternal(ctx, "begin") if err != nil { return errors.Trace(err) } @@ -632,16 +768,13 @@ func (h *Handle) SaveMetaToStorage(tableID, count, modifyCount int64) (err error if err != nil { return errors.Trace(err) } - var sql string version := txn.StartTS() - sql = fmt.Sprintf("replace into mysql.stats_meta (version, table_id, count, modify_count) values (%d, %d, %d, %d)", version, tableID, count, modifyCount) - _, err = exec.Execute(ctx, sql) - return + _, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_meta (version, table_id, count, modify_count) values (%?, %?, %?, %?)", version, tableID, count, modifyCount) + return err } func (h *Handle) histogramFromStorage(reader *statsReader, tableID int64, colID int64, tp *types.FieldType, distinct int64, isIndex int, ver uint64, nullCount int64, totColSize int64, corr float64) (_ *statistics.Histogram, err error) { - selSQL := fmt.Sprintf("select count, repeats, lower_bound, upper_bound from mysql.stats_buckets where table_id = %d and is_index = %d and hist_id = %d order by bucket_id", tableID, isIndex, colID) - rows, fields, err := reader.read(selSQL) + rows, fields, err := reader.read("select count, repeats, lower_bound, upper_bound from mysql.stats_buckets where table_id = %? and is_index = %? and hist_id = %? 
order by bucket_id", tableID, isIndex, colID) if err != nil { return nil, errors.Trace(err) } @@ -676,25 +809,57 @@ func (h *Handle) histogramFromStorage(reader *statsReader, tableID int64, colID return hg, nil } +<<<<<<< HEAD func (h *Handle) columnCountFromStorage(reader *statsReader, tableID, colID int64) (int64, error) { selSQL := fmt.Sprintf("select sum(count) from mysql.stats_buckets where table_id = %d and is_index = %d and hist_id = %d", tableID, 0, colID) rows, _, err := reader.read(selSQL) +======= +func (h *Handle) columnCountFromStorage(reader *statsReader, tableID, colID, statsVer int64) (int64, error) { + rows, _, err := reader.read("select sum(count) from mysql.stats_buckets where table_id = %? and is_index = 0 and hist_id = %?", tableID, colID) +>>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) if err != nil { return 0, errors.Trace(err) } if rows[0].IsNull(0) { return 0, nil } +<<<<<<< HEAD return rows[0].GetMyDecimal(0).ToInt() +======= + count, err := rows[0].GetMyDecimal(0).ToInt() + if err != nil { + return 0, errors.Trace(err) + } + if statsVer == statistics.Version2 { + // Before stats ver 2, histogram represents all data in this column. + // In stats ver 2, histogram + TopN represent all data in this column. + // So we need to add TopN total count here. + rows, _, err = reader.read("select sum(count) from mysql.stats_top_n where table_id = %? and is_index = 0 and hist_id = %?", tableID, colID) + if err != nil { + return 0, errors.Trace(err) + } + if !rows[0].IsNull(0) { + topNCount, err := rows[0].GetMyDecimal(0).ToInt() + if err != nil { + return 0, errors.Trace(err) + } + count += topNCount + } + } + return count, err +>>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) } -func (h *Handle) statsMetaByTableIDFromStorage(tableID int64, historyStatsExec sqlexec.RestrictedSQLExecutor) (version uint64, modifyCount, count int64, err error) { - selSQL := fmt.Sprintf("SELECT version, modify_count, count from mysql.stats_meta where table_id = %d order by version", tableID) +func (h *Handle) statsMetaByTableIDFromStorage(tableID int64, snapshot uint64) (version uint64, modifyCount, count int64, err error) { + ctx := context.Background() var rows []chunk.Row - if historyStatsExec == nil { - rows, _, err = h.restrictedExec.ExecRestrictedSQL(selSQL) + if snapshot == 0 { + rows, _, err = h.execRestrictedSQL(ctx, "SELECT version, modify_count, count from mysql.stats_meta where table_id = %? order by version", tableID) } else { - rows, _, err = historyStatsExec.ExecRestrictedSQLWithSnapshot(selSQL) + rows, _, err = h.execRestrictedSQLWithSnapshot(ctx, "SELECT version, modify_count, count from mysql.stats_meta where table_id = %? order by version", snapshot, tableID) + if err != nil { + return 0, 0, 0, err + } } if err != nil || len(rows) == 0 { return @@ -708,49 +873,34 @@ func (h *Handle) statsMetaByTableIDFromStorage(tableID int64, historyStatsExec s // statsReader is used for simplify code that needs to read system tables in different sqls // but requires the same transactions. 
type statsReader struct { - ctx sessionctx.Context - history sqlexec.RestrictedSQLExecutor + ctx sqlexec.RestrictedSQLExecutor + snapshot uint64 } -func (sr *statsReader) read(sql string) (rows []chunk.Row, fields []*ast.ResultField, err error) { - if sr.history != nil { - return sr.history.ExecRestrictedSQLWithSnapshot(sql) - } - rc, err := sr.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql) - if len(rc) > 0 { - defer terror.Call(rc[0].Close) - } +func (sr *statsReader) read(sql string, args ...interface{}) (rows []chunk.Row, fields []*ast.ResultField, err error) { + ctx := context.TODO() + stmt, err := sr.ctx.ParseWithParams(ctx, sql, args...) if err != nil { - return nil, nil, err + return nil, nil, errors.Trace(err) } - for { - req := rc[0].NewChunk() - err := rc[0].Next(context.TODO(), req) - if err != nil { - return nil, nil, err - } - if req.NumRows() == 0 { - break - } - for i := 0; i < req.NumRows(); i++ { - rows = append(rows, req.GetRow(i)) - } + if sr.snapshot > 0 { + return sr.ctx.ExecRestrictedStmt(ctx, stmt, sqlexec.ExecOptionWithSnapshot(sr.snapshot)) } - return rows, rc[0].Fields(), nil + return sr.ctx.ExecRestrictedStmt(ctx, stmt) } func (sr *statsReader) isHistory() bool { - return sr.history != nil + return sr.snapshot > 0 } -func (h *Handle) getStatsReader(history sqlexec.RestrictedSQLExecutor) (reader *statsReader, err error) { +func (h *Handle) getStatsReader(snapshot uint64) (reader *statsReader, err error) { failpoint.Inject("mockGetStatsReaderFail", func(val failpoint.Value) { if val.(bool) { failpoint.Return(nil, errors.New("gofail genStatsReader error")) } }) - if history != nil { - return &statsReader{history: history}, nil + if snapshot > 0 { + return &statsReader{ctx: h.mu.ctx.(sqlexec.RestrictedSQLExecutor), snapshot: snapshot}, nil } h.mu.Lock() defer func() { @@ -762,18 +912,289 @@ func (h *Handle) getStatsReader(history sqlexec.RestrictedSQLExecutor) (reader * } }() failpoint.Inject("mockGetStatsReaderPanic", nil) - _, err = h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), "begin") + _, err = h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), "begin") if err != nil { return nil, err } - return &statsReader{ctx: h.mu.ctx}, nil + return &statsReader{ctx: h.mu.ctx.(sqlexec.RestrictedSQLExecutor)}, nil } func (h *Handle) releaseStatsReader(reader *statsReader) error { - if reader.history != nil { + if reader.snapshot > 0 { return nil } - _, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), "commit") + _, err := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), "commit") h.mu.Unlock() return err } +<<<<<<< HEAD +======= + +const ( + // StatsStatusInited is the status for extended stats which are just registered but have not been analyzed yet. + StatsStatusInited uint8 = iota + // StatsStatusAnalyzed is the status for extended stats which have been collected in analyze. + StatsStatusAnalyzed + // StatsStatusDeleted is the status for extended stats which were dropped. These "deleted" records would be removed from storage by GCStats(). + StatsStatusDeleted +) + +// InsertExtendedStats inserts a record into mysql.stats_extended and updates the version in mysql.stats_meta.
+func (h *Handle) InsertExtendedStats(statsName, db string, colIDs []int64, tp int, tableID int64, ifNotExists bool) (err error) { + bytes, err := json.Marshal(colIDs) + if err != nil { + return errors.Trace(err) + } + strColIDs := string(bytes) + h.mu.Lock() + defer h.mu.Unlock() + ctx := context.TODO() + exec := h.mu.ctx.(sqlexec.SQLExecutor) + _, err = exec.ExecuteInternal(ctx, "begin pessimistic") + if err != nil { + return errors.Trace(err) + } + defer func() { + err = finishTransaction(ctx, exec, err) + }() + txn, err := h.mu.ctx.Txn(true) + if err != nil { + return errors.Trace(err) + } + version := txn.StartTS() + const sql = "INSERT INTO mysql.stats_extended(stats_name, db, type, table_id, column_ids, version, status) VALUES (%?, %?, %?, %?, %?, %?, %?)" + _, err = exec.ExecuteInternal(ctx, sql, statsName, db, tp, tableID, strColIDs, version, StatsStatusInited) + // Key exists, but `if not exists` is specified, so we ignore this error. + if kv.ErrKeyExists.Equal(err) && ifNotExists { + err = nil + } + return +} + +// MarkExtendedStatsDeleted updates the status of mysql.stats_extended to `deleted` and updates the version of mysql.stats_meta. +func (h *Handle) MarkExtendedStatsDeleted(statsName, db string, tableID int64) (err error) { + ctx := context.Background() + if tableID < 0 { + rows, _, err := h.execRestrictedSQL(ctx, "SELECT table_id FROM mysql.stats_extended WHERE stats_name = %? and db = %?", statsName, db) + if err != nil { + return errors.Trace(err) + } + if len(rows) == 0 { + return nil + } + tableID = rows[0].GetInt64(0) + } + h.mu.Lock() + defer h.mu.Unlock() + exec := h.mu.ctx.(sqlexec.SQLExecutor) + _, err = exec.ExecuteInternal(ctx, "begin pessimistic") + if err != nil { + return errors.Trace(err) + } + defer func() { + err = finishTransaction(ctx, exec, err) + }() + txn, err := h.mu.ctx.Txn(true) + if err != nil { + return errors.Trace(err) + } + version := txn.StartTS() + if _, err = exec.ExecuteInternal(ctx, "UPDATE mysql.stats_extended SET version = %?, status = %? WHERE stats_name = %? and db = %?", version, StatsStatusDeleted, statsName, db); err != nil { + return err + } + if _, err = exec.ExecuteInternal(ctx, "UPDATE mysql.stats_meta SET version = %? WHERE table_id = %?", version, tableID); err != nil { + return err + } + return nil +} + +// ReloadExtendedStatistics drops the cache for extended statistics and reloads data from mysql.stats_extended. +func (h *Handle) ReloadExtendedStatistics() error { + reader, err := h.getStatsReader(0) + if err != nil { + return err + } + oldCache := h.statsCache.Load().(statsCache) + tables := make([]*statistics.Table, 0, len(oldCache.tables)) + for physicalID, tbl := range oldCache.tables { + t, err := h.extendedStatsFromStorage(reader, tbl.Copy(), physicalID, true) + if err != nil { + return err + } + tables = append(tables, t) + } + err = h.releaseStatsReader(reader) + if err != nil { + return err + } + // Note that this update may fail when the statsCache.version has been modified by others. + h.updateStatsCache(oldCache.update(tables, nil, oldCache.version)) + return nil +} + +// BuildExtendedStats builds extended stats for column groups if needed based on the column samples. +func (h *Handle) BuildExtendedStats(tableID int64, cols []*model.ColumnInfo, collectors []*statistics.SampleCollector) (*statistics.ExtendedStatsColl, error) { + ctx := context.Background() + const sql = "SELECT stats_name, db, type, column_ids FROM mysql.stats_extended WHERE table_id = %?
and status in (%?, %?)" + rows, _, err := h.execRestrictedSQL(ctx, sql, tableID, StatsStatusAnalyzed, StatsStatusInited) + if err != nil { + return nil, errors.Trace(err) + } + if len(rows) == 0 { + return nil, nil + } + statsColl := statistics.NewExtendedStatsColl() + for _, row := range rows { + key := statistics.ExtendedStatsKey{ + StatsName: row.GetString(0), + DB: row.GetString(1), + } + item := &statistics.ExtendedStatsItem{Tp: uint8(row.GetInt64(2))} + colIDs := row.GetString(3) + err := json.Unmarshal([]byte(colIDs), &item.ColIDs) + if err != nil { + logutil.BgLogger().Error("invalid column_ids in mysql.stats_extended, skip collecting extended stats for this row", zap.String("column_ids", colIDs), zap.Error(err)) + continue + } + item = h.fillExtendedStatsItemVals(item, cols, collectors) + if item != nil { + statsColl.Stats[key] = item + } + } + if len(statsColl.Stats) == 0 { + return nil, nil + } + return statsColl, nil +} + +func (h *Handle) fillExtendedStatsItemVals(item *statistics.ExtendedStatsItem, cols []*model.ColumnInfo, collectors []*statistics.SampleCollector) *statistics.ExtendedStatsItem { + switch item.Tp { + case ast.StatsTypeCardinality, ast.StatsTypeDependency: + return nil + case ast.StatsTypeCorrelation: + return h.fillExtStatsCorrVals(item, cols, collectors) + } + return nil +} + +func (h *Handle) fillExtStatsCorrVals(item *statistics.ExtendedStatsItem, cols []*model.ColumnInfo, collectors []*statistics.SampleCollector) *statistics.ExtendedStatsItem { + colOffsets := make([]int, 0, 2) + for _, id := range item.ColIDs { + for i, col := range cols { + if col.ID == id { + colOffsets = append(colOffsets, i) + break + } + } + } + if len(colOffsets) != 2 { + return nil + } + // samplesX and samplesY are in order of handle, i.e, their SampleItem.Ordinals are in order. + samplesX := collectors[colOffsets[0]].Samples + // We would modify Ordinal of samplesY, so we make a deep copy. + samplesY := statistics.CopySampleItems(collectors[colOffsets[1]].Samples) + sampleNum := len(samplesX) + if sampleNum == 1 { + item.ScalarVals = float64(1) + return item + } + h.mu.Lock() + sc := h.mu.ctx.GetSessionVars().StmtCtx + h.mu.Unlock() + var err error + samplesX, err = statistics.SortSampleItems(sc, samplesX) + if err != nil { + return nil + } + samplesYInXOrder := make([]*statistics.SampleItem, sampleNum) + for i, itemX := range samplesX { + itemY := samplesY[itemX.Ordinal] + itemY.Ordinal = i + samplesYInXOrder[i] = itemY + } + samplesYInYOrder, err := statistics.SortSampleItems(sc, samplesYInXOrder) + if err != nil { + return nil + } + var corrXYSum float64 + for i := 1; i < sampleNum; i++ { + corrXYSum += float64(i) * float64(samplesYInYOrder[i].Ordinal) + } + // X means the ordinal of the item in original sequence, Y means the oridnal of the item in the + // sorted sequence, we know that X and Y value sets are both: + // 0, 1, ..., sampleNum-1 + // we can simply compute sum(X) = sum(Y) = + // (sampleNum-1)*sampleNum / 2 + // and sum(X^2) = sum(Y^2) = + // (sampleNum-1)*sampleNum*(2*sampleNum-1) / 6 + // We use "Pearson correlation coefficient" to compute the order correlation of columns, + // the formula is based on https://en.wikipedia.org/wiki/Pearson_correlation_coefficient. + // Note that (itemsCount*corrX2Sum - corrXSum*corrXSum) would never be zero when sampleNum is larger than 1. 
+	itemsCount := float64(sampleNum)
+	corrXSum := (itemsCount - 1) * itemsCount / 2.0
+	corrX2Sum := (itemsCount - 1) * itemsCount * (2*itemsCount - 1) / 6.0
+	item.ScalarVals = (itemsCount*corrXYSum - corrXSum*corrXSum) / (itemsCount*corrX2Sum - corrXSum*corrXSum)
+	return item
+}
+
+// SaveExtendedStatsToStorage writes extended stats of a table into mysql.stats_extended.
+func (h *Handle) SaveExtendedStatsToStorage(tableID int64, extStats *statistics.ExtendedStatsColl, isLoad bool) (err error) {
+	if extStats == nil || len(extStats.Stats) == 0 {
+		return nil
+	}
+	h.mu.Lock()
+	defer h.mu.Unlock()
+	ctx := context.TODO()
+	exec := h.mu.ctx.(sqlexec.SQLExecutor)
+	_, err = exec.ExecuteInternal(ctx, "begin pessimistic")
+	if err != nil {
+		return errors.Trace(err)
+	}
+	defer func() {
+		err = finishTransaction(ctx, exec, err)
+	}()
+	txn, err := h.mu.ctx.Txn(true)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	version := txn.StartTS()
+	for key, item := range extStats.Stats {
+		bytes, err := json.Marshal(item.ColIDs)
+		if err != nil {
+			return errors.Trace(err)
+		}
+		strColIDs := string(bytes)
+		switch item.Tp {
+		case ast.StatsTypeCardinality, ast.StatsTypeCorrelation:
+			// If isLoad is true, it's INSERT; otherwise, it's UPDATE.
+			_, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_extended values (%?, %?, %?, %?, %?, %?, null, %?, %?)", key.StatsName, key.DB, item.Tp, tableID, strColIDs, item.ScalarVals, version, StatsStatusAnalyzed)
+		case ast.StatsTypeDependency:
+			_, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_extended values (%?, %?, %?, %?, %?, null, %?, %?, %?)", key.StatsName, key.DB, item.Tp, tableID, strColIDs, item.StringVals, version, StatsStatusAnalyzed)
+		}
+		if err != nil {
+			return errors.Trace(err)
+		}
+	}
+	if !isLoad {
+		if _, err := exec.ExecuteInternal(ctx, "UPDATE mysql.stats_meta SET version = %? WHERE table_id = %?", version, tableID); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// CurrentPruneMode returns the current partition prune mode, which indicates whether tables support runtime pruning.
+func (h *Handle) CurrentPruneMode() variable.PartitionPruneMode {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+	return variable.PartitionPruneMode(h.mu.ctx.GetSessionVars().PartitionPruneMode.Load())
+}
+
+// RefreshVars is used to pull PartitionPruneMode vars from kv storage.
+func (h *Handle) RefreshVars() error {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+	return h.mu.ctx.RefreshVars(context.Background())
+}
+>>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961)
diff --git a/statistics/handle/handle_test.go b/statistics/handle/handle_test.go
index eefcc08dee748..81d8af8982895 100644
--- a/statistics/handle/handle_test.go
+++ b/statistics/handle/handle_test.go
@@ -265,7 +265,12 @@ func (s *testStatsSuite) TestVersion(c *C) {
 	tbl1, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t1"))
 	c.Assert(err, IsNil)
 	tableInfo1 := tbl1.Meta()
+<<<<<<< HEAD
 	h := handle.NewHandle(testKit.Se, time.Millisecond)
+=======
+	h, err := handle.NewHandle(testKit.Se, time.Millisecond, do.SysSessionPool())
+	c.Assert(err, IsNil)
+>>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961)
 	unit := oracle.ComposeTS(1, 0)
 	testKit.MustExec("update mysql.stats_meta set version = ? 
where table_id = ?", 2*unit, tableInfo1.ID) @@ -499,7 +504,7 @@ func (s *testStatsSuite) TestCorrelation(c *C) { result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort() c.Assert(len(result.Rows()), Equals, 2) c.Assert(result.Rows()[0][9], Equals, "0") - c.Assert(result.Rows()[1][9], Equals, "0.828571") + c.Assert(result.Rows()[1][9], Equals, "0.8285714285714286") testKit.MustExec("truncate table t") result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort() @@ -515,7 +520,7 @@ func (s *testStatsSuite) TestCorrelation(c *C) { result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort() c.Assert(len(result.Rows()), Equals, 2) c.Assert(result.Rows()[0][9], Equals, "0") - c.Assert(result.Rows()[1][9], Equals, "-0.942857") + c.Assert(result.Rows()[1][9], Equals, "-0.9428571428571428") testKit.MustExec("truncate table t") testKit.MustExec("insert into t values (1,1),(2,1),(3,1),(4,1),(5,1),(6,1),(7,1),(8,1),(9,1),(10,1),(11,1),(12,1),(13,1),(14,1),(15,1),(16,1),(17,1),(18,1),(19,1),(20,2),(21,2),(22,2),(23,2),(24,2),(25,2)") @@ -532,14 +537,14 @@ func (s *testStatsSuite) TestCorrelation(c *C) { result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort() c.Assert(len(result.Rows()), Equals, 2) c.Assert(result.Rows()[0][9], Equals, "1") - c.Assert(result.Rows()[1][9], Equals, "0.828571") + c.Assert(result.Rows()[1][9], Equals, "0.8285714285714286") testKit.MustExec("truncate table t") testKit.MustExec("insert into t values(1,1),(2,7),(3,12),(8,18),(4,20),(5,21)") testKit.MustExec("analyze table t") result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort() c.Assert(len(result.Rows()), Equals, 2) - c.Assert(result.Rows()[0][9], Equals, "0.828571") + c.Assert(result.Rows()[0][9], Equals, "0.8285714285714286") c.Assert(result.Rows()[1][9], Equals, "1") testKit.MustExec("drop table t") diff --git a/statistics/handle/update.go b/statistics/handle/update.go index affd2c0ca2457..5851f6434b1e7 100644 --- a/statistics/handle/update.go +++ b/statistics/handle/update.go @@ -208,6 +208,120 @@ func (h *Handle) NewSessionStatsCollector() *SessionStatsCollector { return newCollector } +<<<<<<< HEAD +======= +// IndexUsageInformation is the data struct to store index usage information. +type IndexUsageInformation struct { + QueryCount int64 + RowsSelected int64 + LastUsedAt string +} + +// GlobalIndexID is the key type for indexUsageMap. +type GlobalIndexID struct { + TableID int64 + IndexID int64 +} + +type indexUsageMap map[GlobalIndexID]IndexUsageInformation + +// SessionIndexUsageCollector is a list item that holds the index usage mapper. If you want to write or read mapper, you must lock it. +type SessionIndexUsageCollector struct { + sync.Mutex + + mapper indexUsageMap + next *SessionIndexUsageCollector + deleted bool +} + +func (m indexUsageMap) updateByKey(id GlobalIndexID, value *IndexUsageInformation) { + item := m[id] + item.QueryCount += value.QueryCount + item.RowsSelected += value.RowsSelected + if item.LastUsedAt < value.LastUsedAt { + item.LastUsedAt = value.LastUsedAt + } + m[id] = item +} + +func (m indexUsageMap) update(tableID int64, indexID int64, value *IndexUsageInformation) { + id := GlobalIndexID{TableID: tableID, IndexID: indexID} + m.updateByKey(id, value) +} + +func (m indexUsageMap) merge(destMap indexUsageMap) { + for id, item := range destMap { + m.updateByKey(id, &item) + } +} + +// Update updates the mapper in SessionIndexUsageCollector. 
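+// A hypothetical call site, for illustration only (the variable names here are
+// invented): after a statement reads n rows through index idx of table tbl, the
+// session-local collector records it via
+//	collector := h.NewSessionIndexUsageCollector()
+//	collector.Update(tbl.ID, idx.ID, &IndexUsageInformation{QueryCount: 1, RowsSelected: n})
+// sweepIdxUsageList later merges these per-session maps, and DumpIndexUsageToKV
+// persists the merged result.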
+func (s *SessionIndexUsageCollector) Update(tableID int64, indexID int64, value *IndexUsageInformation) {
+	value.LastUsedAt = time.Now().Format(types.TimeFSPFormat)
+	s.Lock()
+	defer s.Unlock()
+	s.mapper.update(tableID, indexID, value)
+}
+
+// Delete will set s.deleted to true which means it can be deleted from the linked list.
+func (s *SessionIndexUsageCollector) Delete() {
+	s.Lock()
+	defer s.Unlock()
+	s.deleted = true
+}
+
+// NewSessionIndexUsageCollector will add a new SessionIndexUsageCollector into the linked list headed by idxUsageListHead.
+// idxUsageListHead always points to an empty SessionIndexUsageCollector as a sentinel node. So we let idxUsageListHead.next
+// point to the new item. This is helpful to sweepIdxUsageList.
+func (h *Handle) NewSessionIndexUsageCollector() *SessionIndexUsageCollector {
+	h.idxUsageListHead.Lock()
+	defer h.idxUsageListHead.Unlock()
+	newCollector := &SessionIndexUsageCollector{
+		mapper: make(indexUsageMap),
+		next:   h.idxUsageListHead.next,
+	}
+	h.idxUsageListHead.next = newCollector
+	return newCollector
+}
+
+// sweepIdxUsageList will loop over the list, merge each session's local index usage information into the handle
+// and remove closed sessions' collectors.
+// For convenience, we keep idxUsageListHead always pointing to the sentinel node, so that we don't need to consider corner cases.
+func (h *Handle) sweepIdxUsageList() indexUsageMap {
+	prev := h.idxUsageListHead
+	prev.Lock()
+	mapper := make(indexUsageMap)
+	for curr := prev.next; curr != nil; curr = curr.next {
+		curr.Lock()
+		mapper.merge(curr.mapper)
+		if curr.deleted {
+			prev.next = curr.next
+			curr.Unlock()
+		} else {
+			prev.Unlock()
+			curr.mapper = make(indexUsageMap)
+			prev = curr
+		}
+	}
+	prev.Unlock()
+	return mapper
+}
+
+// DumpIndexUsageToKV will dump in-memory index usage information to KV.
+func (h *Handle) DumpIndexUsageToKV() error {
+	ctx := context.Background()
+	mapper := h.sweepIdxUsageList()
+	for id, value := range mapper {
+		const sql = `insert into mysql.SCHEMA_INDEX_USAGE values (%?, %?, %?, %?, %?) on duplicate key update query_count=query_count+%?, rows_selected=rows_selected+%?, last_used_at=greatest(last_used_at, %?)`
+		_, _, err := h.execRestrictedSQL(ctx, sql, id.TableID, id.IndexID, value.QueryCount, value.RowsSelected, value.LastUsedAt, value.QueryCount, value.RowsSelected, value.LastUsedAt)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+>>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961)
 var (
 	// DumpStatsDeltaRatio is the lower bound of `Modify Count / Table Count` for stats delta to be dumped.
 	DumpStatsDeltaRatio = 1 / 10000.0
@@ -331,7 +445,7 @@ func (h *Handle) dumpTableStatCountToKV(id int64, delta variable.TableDelta) (up
 	defer h.mu.Unlock()
 	ctx := context.TODO()
 	exec := h.mu.ctx.(sqlexec.SQLExecutor)
-	_, err = exec.Execute(ctx, "begin")
+	_, err = exec.ExecuteInternal(ctx, "begin")
 	if err != nil {
 		return false, errors.Trace(err)
 	}
@@ -344,13 +458,14 @@ func (h *Handle) dumpTableStatCountToKV(id int64, delta variable.TableDelta) (up
 		return false, errors.Trace(err)
 	}
 	startTS := txn.StartTS()
-	var sql string
 	if delta.Delta < 0 {
-		sql = fmt.Sprintf("update mysql.stats_meta set version = %d, count = count - %d, modify_count = modify_count + %d where table_id = %d and count >= %d", startTS, -delta.Delta, delta.Count, id, -delta.Delta)
+		_, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %?, count = count - %?, modify_count = modify_count + %? 
where table_id = %? and count >= %?", startTS, -delta.Delta, delta.Count, id, -delta.Delta) } else { - sql = fmt.Sprintf("update mysql.stats_meta set version = %d, count = count + %d, modify_count = modify_count + %d where table_id = %d", startTS, delta.Delta, delta.Count, id) + _, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %?, count = count + %?, modify_count = modify_count + %? where table_id = %?", startTS, delta.Delta, delta.Count, id) + } + if err != nil { + return false, errors.Trace(err) } - err = execSQLs(context.Background(), exec, []string{sql}) updated = h.mu.ctx.GetSessionVars().StmtCtx.AffectedRows() > 0 return } @@ -371,7 +486,7 @@ func (h *Handle) dumpTableStatColSizeToKV(id int64, delta variable.TableDelta) e } sql := fmt.Sprintf("insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, tot_col_size) "+ "values %s on duplicate key update tot_col_size = tot_col_size + values(tot_col_size)", strings.Join(values, ",")) - _, _, err := h.restrictedExec.ExecRestrictedSQL(sql) + _, _, err := h.execRestrictedSQL(context.Background(), sql) return errors.Trace(err) } @@ -409,10 +524,9 @@ func (h *Handle) DumpFeedbackToKV(fb *statistics.QueryFeedback) error { if fb.Tp == statistics.IndexType { isIndex = 1 } - sql := fmt.Sprintf("insert into mysql.stats_feedback (table_id, hist_id, is_index, feedback) values "+ - "(%d, %d, %d, X'%X')", fb.PhysicalID, fb.Hist.ID, isIndex, vals) + const sql = "insert into mysql.stats_feedback (table_id, hist_id, is_index, feedback) values (%?, %?, %?, %?)" h.mu.Lock() - _, err = h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql) + _, err = h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), sql, fb.PhysicalID, fb.Hist.ID, isIndex, vals) h.mu.Unlock() if err != nil { metrics.DumpFeedbackCounter.WithLabelValues(metrics.LblError).Inc() @@ -503,8 +617,8 @@ func (h *Handle) UpdateErrorRate(is infoschema.InfoSchema) { // HandleUpdateStats update the stats using feedback. func (h *Handle) HandleUpdateStats(is infoschema.InfoSchema) error { - sql := "SELECT distinct table_id from mysql.stats_feedback" - tables, _, err := h.restrictedExec.ExecRestrictedSQL(sql) + ctx := context.Background() + tables, _, err := h.execRestrictedSQL(ctx, "SELECT distinct table_id from mysql.stats_feedback") if err != nil { return errors.Trace(err) } @@ -516,20 +630,18 @@ func (h *Handle) HandleUpdateStats(is infoschema.InfoSchema) error { // this func lets `defer` works normally, where `Close()` should be called before any return err = func() error { tbl := ptbl.GetInt64(0) - sql = fmt.Sprintf("select table_id, hist_id, is_index, feedback from mysql.stats_feedback where table_id=%d order by hist_id, is_index", tbl) - rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql) - if len(rc) > 0 { - defer terror.Call(rc[0].Close) - } + const sql = "select table_id, hist_id, is_index, feedback from mysql.stats_feedback where table_id=%? 
order by hist_id, is_index" + rc, err := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), sql, tbl) if err != nil { return errors.Trace(err) } + defer terror.Call(rc.Close) tableID, histID, isIndex := int64(-1), int64(-1), int64(-1) var rows []chunk.Row for { - req := rc[0].NewChunk() + req := rc.NewChunk() iter := chunk.NewIterator4Chunk(req) - err := rc[0].Next(context.TODO(), req) + err := rc.Next(context.TODO(), req) if err != nil { return errors.Trace(err) } @@ -622,8 +734,8 @@ func (h *Handle) deleteOutdatedFeedback(tableID, histID, isIndex int64) error { defer h.mu.Unlock() hasData := true for hasData { - sql := fmt.Sprintf("delete from mysql.stats_feedback where table_id = %d and hist_id = %d and is_index = %d limit 10000", tableID, histID, isIndex) - _, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql) + sql := "delete from mysql.stats_feedback where table_id = %? and hist_id = %? and is_index = %? limit 10000" + _, err := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), sql, tableID, histID, isIndex) if err != nil { return errors.Trace(err) } @@ -690,9 +802,9 @@ func NeedAnalyzeTable(tbl *statistics.Table, limit time.Duration, autoAnalyzeRat } func (h *Handle) getAutoAnalyzeParameters() map[string]string { - sql := fmt.Sprintf("select variable_name, variable_value from mysql.global_variables where variable_name in ('%s', '%s', '%s')", - variable.TiDBAutoAnalyzeRatio, variable.TiDBAutoAnalyzeStartTime, variable.TiDBAutoAnalyzeEndTime) - rows, _, err := h.restrictedExec.ExecRestrictedSQL(sql) + ctx := context.Background() + sql := "select variable_name, variable_value from mysql.global_variables where variable_name in (%?, %?, %?)" + rows, _, err := h.execRestrictedSQL(ctx, sql, variable.TiDBAutoAnalyzeRatio, variable.TiDBAutoAnalyzeStartTime, variable.TiDBAutoAnalyzeEndTime) if err != nil { return map[string]string{} } @@ -744,19 +856,36 @@ func (h *Handle) HandleAutoAnalyze(is infoschema.InfoSchema) { tblName := "`" + db + "`.`" + tblInfo.Name.O + "`" if pi == nil { statsTbl := h.GetTableStats(tblInfo) +<<<<<<< HEAD sql := fmt.Sprintf("analyze table %s", tblName) analyzed := h.autoAnalyzeTable(tblInfo, statsTbl, start, end, autoAnalyzeRatio, sql) +======= + sql := "analyze table %n.%n" + analyzed := h.autoAnalyzeTable(tblInfo, statsTbl, start, end, autoAnalyzeRatio, sql, db, tblInfo.Name.O) +>>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) if analyzed { return } continue } +<<<<<<< HEAD for _, def := range pi.Definitions { sql := fmt.Sprintf("analyze table %s partition `%s`", tblName, def.Name.O) statsTbl := h.GetPartitionStats(tblInfo, def.ID) analyzed := h.autoAnalyzeTable(tblInfo, statsTbl, start, end, autoAnalyzeRatio, sql) if analyzed { return +======= + if pruneMode == variable.StaticOnly || pruneMode == variable.StaticButPrepareDynamic { + for _, def := range pi.Definitions { + sql := "analyze table %n.%n partition %n" + statsTbl := h.GetPartitionStats(tblInfo, def.ID) + analyzed := h.autoAnalyzeTable(tblInfo, statsTbl, start, end, autoAnalyzeRatio, sql, db, tblInfo.Name.O, def.Name.O) + if analyzed { + return + } + continue +>>>>>>> 8304d661f... 
statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) } continue } @@ -764,29 +893,32 @@ func (h *Handle) HandleAutoAnalyze(is infoschema.InfoSchema) { } } -func (h *Handle) autoAnalyzeTable(tblInfo *model.TableInfo, statsTbl *statistics.Table, start, end time.Time, ratio float64, sql string) bool { +func (h *Handle) autoAnalyzeTable(tblInfo *model.TableInfo, statsTbl *statistics.Table, start, end time.Time, ratio float64, sql string, params ...interface{}) bool { if statsTbl.Pseudo || statsTbl.Count < AutoAnalyzeMinCnt { return false } if needAnalyze, reason := NeedAnalyzeTable(statsTbl, 20*h.Lease(), ratio, start, end, time.Now()); needAnalyze { logutil.BgLogger().Info("[stats] auto analyze triggered", zap.String("sql", sql), zap.String("reason", reason)) - h.execAutoAnalyze(sql) + h.execAutoAnalyze(sql, params...) return true } for _, idx := range tblInfo.Indices { if _, ok := statsTbl.Indices[idx.ID]; !ok && idx.State == model.StatePublic { - sql = fmt.Sprintf("%s index `%s`", sql, idx.Name.O) logutil.BgLogger().Info("[stats] auto analyze for unanalyzed", zap.String("sql", sql)) - h.execAutoAnalyze(sql) + h.execAutoAnalyze(sql+" index %n", append(params, idx.Name.O)...) return true } } return false } -func (h *Handle) execAutoAnalyze(sql string) { +func (h *Handle) execAutoAnalyze(sql string, params ...interface{}) { startTime := time.Now() +<<<<<<< HEAD _, _, err := h.restrictedExec.ExecRestrictedSQL(sql) +======= + _, _, err := h.execRestrictedSQL(context.Background(), sql, params...) +>>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) dur := time.Since(startTime) metrics.AutoAnalyzeHistogram.Observe(dur.Seconds()) if err != nil { diff --git a/types/datum.go b/types/datum.go index c48e5637ab460..9adbb0158f9de 100644 --- a/types/datum.go +++ b/types/datum.go @@ -194,7 +194,10 @@ var sink = func(s string) { // GetBytes gets bytes value. func (d *Datum) GetBytes() []byte { - return d.b + if d.b != nil { + return d.b + } + return []byte{} } // SetBytes sets bytes value to datum. From 38282d603f7f751131c1b54cf6c1a490a08cd63f Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Thu, 11 Mar 2021 22:00:40 +0800 Subject: [PATCH 2/2] resolve conflict --- domain/domain.go | 20 +- statistics/handle/ddl.go | 40 +-- statistics/handle/dump.go | 5 - statistics/handle/gc.go | 77 ------ statistics/handle/handle.go | 408 +------------------------------ statistics/handle/handle_test.go | 7 +- statistics/handle/update.go | 140 +---------- 7 files changed, 22 insertions(+), 675 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index 26de37779de43..c581ea037fcc7 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1014,18 +1014,9 @@ func (do *Domain) StatsHandle() *handle.Handle { } // CreateStatsHandle is used only for test. -<<<<<<< HEAD func (do *Domain) CreateStatsHandle(ctx sessionctx.Context) { - atomic.StorePointer(&do.statsHandle, unsafe.Pointer(handle.NewHandle(ctx, do.statsLease))) -======= -func (do *Domain) CreateStatsHandle(ctx sessionctx.Context) error { - h, err := handle.NewHandle(ctx, do.statsLease, do.sysSessionPool) - if err != nil { - return err - } + h := handle.NewHandle(ctx, do.statsLease, do.sysSessionPool) atomic.StorePointer(&do.statsHandle, unsafe.Pointer(h)) - return nil ->>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) } // StatsUpdating checks if the stats worker is updating. 
@@ -1050,14 +1041,7 @@ var RunAutoAnalyze = true // It should be called only once in BootstrapSession. func (do *Domain) UpdateTableStatsLoop(ctx sessionctx.Context) error { ctx.GetSessionVars().InRestrictedSQL = true -<<<<<<< HEAD - statsHandle := handle.NewHandle(ctx, do.statsLease) -======= - statsHandle, err := handle.NewHandle(ctx, do.statsLease, do.sysSessionPool) - if err != nil { - return err - } ->>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) + statsHandle := handle.NewHandle(ctx, do.statsLease, do.sysSessionPool) atomic.StorePointer(&do.statsHandle, unsafe.Pointer(statsHandle)) do.ddl.RegisterEventCh(statsHandle.DDLEventCh()) // Negative stats lease indicates that it is in test, it does not need update. diff --git a/statistics/handle/ddl.go b/statistics/handle/ddl.go index 9eb22ef35425e..7a1770d592be0 100644 --- a/statistics/handle/ddl.go +++ b/statistics/handle/ddl.go @@ -144,52 +144,30 @@ func (h *Handle) insertColStats2KV(physicalID int64, colInfo *model.ColumnInfo) return } count := req.GetRow(0).GetInt64(0) -<<<<<<< HEAD value := types.NewDatum(colInfo.GetOriginDefaultValue()) value, err = value.ConvertTo(h.mu.ctx.GetSessionVars().StmtCtx, &colInfo.FieldType) if err != nil { return } - sqls := make([]string, 0, 1) if value.IsNull() { // If the adding column has default value null, all the existing rows have null value on the newly added column. - sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, null_count) values (%d, %d, 0, %d, 0, %d)", startTS, physicalID, colInfo.ID, count)) + if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, null_count) values (%?, %?, 0, %?, 0, %?)", startTS, physicalID, colInfo.ID, count); err != nil { + return err + } } else { // If this stats exists, we insert histogram meta first, the distinct_count will always be one. - sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, tot_col_size) values (%d, %d, 0, %d, 1, %d)", startTS, physicalID, colInfo.ID, int64(len(value.GetBytes()))*count)) + if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, tot_col_size) values (%?, %?, 0, %?, 1, %?)", startTS, physicalID, colInfo.ID, int64(len(value.GetBytes()))*count); err != nil { + return err + } + value, err = value.ConvertTo(h.mu.ctx.GetSessionVars().StmtCtx, types.NewFieldType(mysql.TypeBlob)) if err != nil { return } // There must be only one bucket for this new column and the value is the default value. - sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_buckets (table_id, is_index, hist_id, bucket_id, repeats, count, lower_bound, upper_bound) values (%d, 0, %d, 0, %d, %d, X'%X', X'%X')", physicalID, colInfo.ID, count, count, value.GetBytes(), value.GetBytes())) -======= - for _, colInfo := range colInfos { - value := types.NewDatum(colInfo.GetOriginDefaultValue()) - value, err = value.ConvertTo(h.mu.ctx.GetSessionVars().StmtCtx, &colInfo.FieldType) - if err != nil { - return - } - if value.IsNull() { - // If the adding column has default value null, all the existing rows have null value on the newly added column. 
- if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, null_count) values (%?, %?, 0, %?, 0, %?)", startTS, physicalID, colInfo.ID, count); err != nil { - return err - } - } else { - // If this stats exists, we insert histogram meta first, the distinct_count will always be one. - if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, tot_col_size) values (%?, %?, 0, %?, 1, %?)", startTS, physicalID, colInfo.ID, int64(len(value.GetBytes()))*count); err != nil { - return err - } - value, err = value.ConvertTo(h.mu.ctx.GetSessionVars().StmtCtx, types.NewFieldType(mysql.TypeBlob)) - if err != nil { - return - } - // There must be only one bucket for this new column and the value is the default value. - if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_buckets (table_id, is_index, hist_id, bucket_id, repeats, count, lower_bound, upper_bound) values (%?, 0, %?, 0, %?, %?, %?, %?)", physicalID, colInfo.ID, count, count, value.GetBytes(), value.GetBytes()); err != nil { - return err - } + if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_buckets (table_id, is_index, hist_id, bucket_id, repeats, count, lower_bound, upper_bound) values (%?, 0, %?, 0, %?, %?, %?, %?)", physicalID, colInfo.ID, count, count, value.GetBytes(), value.GetBytes()); err != nil { + return err } ->>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) } } return diff --git a/statistics/handle/dump.go b/statistics/handle/dump.go index 487e2faa2c64f..33ebfdd92fb94 100644 --- a/statistics/handle/dump.go +++ b/statistics/handle/dump.go @@ -75,13 +75,8 @@ func (h *Handle) DumpStatsToJSON(dbName string, tableInfo *model.TableInfo, hist // DumpStatsToJSONBySnapshot dumps statistic to json. func (h *Handle) DumpStatsToJSONBySnapshot(dbName string, tableInfo *model.TableInfo, snapshot uint64) (*JSONTable, error) { pi := tableInfo.GetPartitionInfo() -<<<<<<< HEAD if pi == nil { - return h.tableStatsToJSON(dbName, tableInfo, tableInfo.ID, historyStatsExec) -======= - if pi == nil || h.CurrentPruneMode() == variable.DynamicOnly { return h.tableStatsToJSON(dbName, tableInfo, tableInfo.ID, snapshot) ->>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) } jsonTbl := &JSONTable{ DatabaseName: dbName, diff --git a/statistics/handle/gc.go b/statistics/handle/gc.go index 176b76c5976c0..7bb4bcd6a426f 100644 --- a/statistics/handle/gc.go +++ b/statistics/handle/gc.go @@ -15,11 +15,6 @@ package handle import ( "context" -<<<<<<< HEAD - "fmt" -======= - "encoding/json" ->>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) "time" "github.com/cznic/mathutil" @@ -39,13 +34,8 @@ func (h *Handle) GCStats(is infoschema.InfoSchema, ddlLease time.Duration) error if h.LastUpdateVersion() < offset { return nil } -<<<<<<< HEAD - sql := fmt.Sprintf("select table_id from mysql.stats_meta where version < %d", h.LastUpdateVersion()-offset) - rows, _, err := h.restrictedExec.ExecRestrictedSQL(sql) -======= gcVer := h.LastUpdateVersion() - offset rows, _, err := h.execRestrictedSQL(ctx, "select table_id from mysql.stats_meta where version < %?", gcVer) ->>>>>>> 8304d661f... 
statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) if err != nil { return errors.Trace(err) } @@ -102,43 +92,6 @@ func (h *Handle) gcTableStats(is infoschema.InfoSchema, physicalID int64) error } } } -<<<<<<< HEAD -======= - // Mark records in mysql.stats_extended as `deleted`. - rows, _, err = h.execRestrictedSQL(ctx, "select stats_name, db, column_ids from mysql.stats_extended where table_id = %? and status in (%?, %?)", physicalID, StatsStatusAnalyzed, StatsStatusInited) - if err != nil { - return errors.Trace(err) - } - if len(rows) == 0 { - return nil - } - for _, row := range rows { - statsName, db, strColIDs := row.GetString(0), row.GetString(1), row.GetString(2) - var colIDs []int64 - err = json.Unmarshal([]byte(strColIDs), &colIDs) - if err != nil { - logutil.BgLogger().Debug("decode column IDs failed", zap.String("column_ids", strColIDs), zap.Error(err)) - return errors.Trace(err) - } - for _, colID := range colIDs { - found := false - for _, col := range tblInfo.Columns { - if colID == col.ID { - found = true - break - } - } - if !found { - err = h.MarkExtendedStatsDeleted(statsName, db, physicalID) - if err != nil { - logutil.BgLogger().Debug("update stats_extended status failed", zap.String("stats_name", statsName), zap.String("db", db), zap.Error(err)) - return errors.Trace(err) - } - break - } - } - } ->>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) return nil } @@ -199,15 +152,6 @@ func (h *Handle) DeleteTableStatsFromKV(physicalID int64) (err error) { ctx := context.Background() startTS := txn.StartTS() // We only update the version so that other tidb will know that this table is deleted. -<<<<<<< HEAD - sqls = append(sqls, fmt.Sprintf("update mysql.stats_meta set version = %d where table_id = %d ", startTS, physicalID)) - sqls = append(sqls, fmt.Sprintf("delete from mysql.stats_histograms where table_id = %d", physicalID)) - sqls = append(sqls, fmt.Sprintf("delete from mysql.stats_buckets where table_id = %d", physicalID)) - sqls = append(sqls, fmt.Sprintf("delete from mysql.stats_top_n where table_id = %d", physicalID)) - sqls = append(sqls, fmt.Sprintf("delete from mysql.stats_feedback where table_id = %d", physicalID)) - return execSQLs(context.Background(), exec, sqls) -} -======= if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %? where table_id = %? ", startTS, physicalID); err != nil { return err } @@ -223,26 +167,5 @@ func (h *Handle) DeleteTableStatsFromKV(physicalID int64) (err error) { if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_feedback where table_id = %?", physicalID); err != nil { return err } - if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_extended set version = %?, status = %? where table_id = %? and status in (%?, %?)", startTS, StatsStatusDeleted, physicalID, StatsStatusAnalyzed, StatsStatusInited); err != nil { - return err - } return nil } - -func (h *Handle) removeDeletedExtendedStats(version uint64) (err error) { - h.mu.Lock() - defer h.mu.Unlock() - exec := h.mu.ctx.(sqlexec.SQLExecutor) - ctx := context.Background() - _, err = exec.ExecuteInternal(ctx, "begin pessimistic") - if err != nil { - return errors.Trace(err) - } - defer func() { - err = finishTransaction(ctx, exec, err) - }() - const sql = "delete from mysql.stats_extended where status = %? and version < %?" - _, err = exec.ExecuteInternal(ctx, sql, StatsStatusDeleted, version) - return -} ->>>>>>> 8304d661f... 
statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go index 069e73eaf251a..2d8915a8e0478 100644 --- a/statistics/handle/handle.go +++ b/statistics/handle/handle.go @@ -20,11 +20,7 @@ import ( "sync/atomic" "time" -<<<<<<< HEAD -======= - "github.com/cznic/mathutil" "github.com/ngaut/pools" ->>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/parser/ast" @@ -142,33 +138,15 @@ type sessionPool interface { } // NewHandle creates a Handle for update stats. -<<<<<<< HEAD -func NewHandle(ctx sessionctx.Context, lease time.Duration) *Handle { +func NewHandle(ctx sessionctx.Context, lease time.Duration, pool sessionPool) *Handle { handle := &Handle{ ddlEventCh: make(chan *util.Event, 100), listHead: &SessionStatsCollector{mapper: make(tableDeltaMap), rateMap: make(errorRateDeltaMap)}, globalMap: make(tableDeltaMap), feedback: statistics.NewQueryFeedbackMap(), + pool: pool, } handle.lease.Store(lease) - // It is safe to use it concurrently because the exec won't touch the ctx. - if exec, ok := ctx.(sqlexec.RestrictedSQLExecutor); ok { - handle.restrictedExec = exec - } -======= -func NewHandle(ctx sessionctx.Context, lease time.Duration, pool sessionPool) (*Handle, error) { - handle := &Handle{ - ddlEventCh: make(chan *util.Event, 100), - listHead: &SessionStatsCollector{mapper: make(tableDeltaMap), rateMap: make(errorRateDeltaMap)}, - globalMap: make(tableDeltaMap), - feedback: statistics.NewQueryFeedbackMap(), - idxUsageListHead: &SessionIndexUsageCollector{mapper: make(indexUsageMap)}, - pool: pool, - } - handle.lease.Store(lease) - handle.pool = pool - handle.statsCache.memTracker = memory.NewTracker(memory.LabelForStatsCache, -1) ->>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) handle.mu.ctx = ctx handle.mu.rateMap = make(errorRateDeltaMap) handle.statsCache.Store(statsCache{tables: make(map[int64]*statistics.Table)}) @@ -364,15 +342,7 @@ func (h *Handle) LoadNeededHistograms() (err error) { if err != nil { return errors.Trace(err) } -<<<<<<< HEAD cms, err := h.cmSketchFromStorage(reader, col.TableID, 0, col.ColumnID) -======= - cms, topN, err := h.cmSketchAndTopNFromStorage(reader, col.TableID, 0, col.ColumnID) - if err != nil { - return errors.Trace(err) - } - rows, _, err := reader.read("select stats_ver from mysql.stats_histograms where is_index = 0 and table_id = %? and hist_id = %?", col.TableID, col.ColumnID) ->>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) if err != nil { return errors.Trace(err) } @@ -417,14 +387,8 @@ func (h *Handle) FlushStats() { } } -<<<<<<< HEAD func (h *Handle) cmSketchFromStorage(reader *statsReader, tblID int64, isIndex, histID int64) (_ *statistics.CMSketch, err error) { - selSQL := fmt.Sprintf("select cm_sketch from mysql.stats_histograms where table_id = %d and is_index = %d and hist_id = %d", tblID, isIndex, histID) - rows, _, err := reader.read(selSQL) -======= -func (h *Handle) cmSketchAndTopNFromStorage(reader *statsReader, tblID int64, isIndex, histID int64) (_ *statistics.CMSketch, _ *statistics.TopN, err error) { rows, _, err := reader.read("select cm_sketch from mysql.stats_histograms where table_id = %? and is_index = %? 
and hist_id = %?", tblID, isIndex, histID) ->>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) if err != nil || len(rows) == 0 { return nil, err } @@ -608,48 +572,6 @@ func (h *Handle) tableStatsFromStorage(tableInfo *model.TableInfo, physicalID in return nil, err } } -<<<<<<< HEAD -======= - return h.extendedStatsFromStorage(reader, table, physicalID, loadAll) -} - -func (h *Handle) extendedStatsFromStorage(reader *statsReader, table *statistics.Table, physicalID int64, loadAll bool) (*statistics.Table, error) { - lastVersion := uint64(0) - if table.ExtendedStats != nil && !loadAll { - lastVersion = table.ExtendedStats.LastUpdateVersion - } else { - table.ExtendedStats = statistics.NewExtendedStatsColl() - } - rows, _, err := reader.read("select stats_name, db, status, type, column_ids, scalar_stats, blob_stats, version from mysql.stats_extended where table_id = %? and status in (%?, %?) and version > %?", physicalID, StatsStatusAnalyzed, StatsStatusDeleted, lastVersion) - if err != nil || len(rows) == 0 { - return table, nil - } - for _, row := range rows { - lastVersion = mathutil.MaxUint64(lastVersion, row.GetUint64(7)) - key := statistics.ExtendedStatsKey{ - StatsName: row.GetString(0), - DB: row.GetString(1), - } - status := uint8(row.GetInt64(2)) - if status == StatsStatusDeleted { - delete(table.ExtendedStats.Stats, key) - } else { - item := &statistics.ExtendedStatsItem{ - Tp: uint8(row.GetInt64(3)), - ScalarVals: row.GetFloat64(5), - StringVals: row.GetString(6), - } - colIDs := row.GetString(4) - err := json.Unmarshal([]byte(colIDs), &item.ColIDs) - if err != nil { - logutil.BgLogger().Error("[stats] decode column IDs failed", zap.String("column_ids", colIDs), zap.Error(err)) - return nil, err - } - table.ExtendedStats.Stats[key] = item - } - } - table.ExtendedStats.LastUpdateVersion = lastVersion ->>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) return table, nil } @@ -686,39 +608,26 @@ func (h *Handle) SaveStatsToStorage(tableID int64, count int64, isIndex int, hg return err } // Delete outdated data -<<<<<<< HEAD - sqls = append(sqls, fmt.Sprintf("delete from mysql.stats_top_n where table_id = %d and is_index = %d and hist_id = %d", tableID, isIndex, hg.ID)) - for _, meta := range cms.TopN() { - sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_top_n (table_id, is_index, hist_id, value, count) values (%d, %d, %d, X'%X', %d)", tableID, isIndex, hg.ID, meta.Data, meta.Count)) -======= if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_top_n where table_id = %? and is_index = %? and hist_id = %?", tableID, isIndex, hg.ID); err != nil { return err } - if topN != nil { - for _, meta := range topN.TopN { - if _, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_top_n (table_id, is_index, hist_id, value, count) values (%?, %?, %?, %?, %?)", tableID, isIndex, hg.ID, meta.Encoded, meta.Count); err != nil { - return err - } + for _, meta := range cms.TopN() { + _, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_top_n (table_id, is_index, hist_id, value, count) values (%?, %?, %?, %?, %?)", tableID, isIndex, hg.ID, meta.Data, meta.Count) + if err != nil { + return err } ->>>>>>> 8304d661f... 
statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) } flag := 0 if isAnalyzed == 1 { flag = statistics.AnalyzeFlag } -<<<<<<< HEAD - sqls = append(sqls, fmt.Sprintf("replace into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, flag, correlation) values (%d, %d, %d, %d, %d, %d, X'%X', %d, %d, %d, %f)", - tableID, isIndex, hg.ID, hg.NDV, version, hg.NullCount, data, hg.TotColSize, statistics.CurStatsVersion, flag, hg.Correlation)) - sqls = append(sqls, fmt.Sprintf("delete from mysql.stats_buckets where table_id = %d and is_index = %d and hist_id = %d", tableID, isIndex, hg.ID)) -======= if _, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, flag, correlation) values (%?, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?)", - tableID, isIndex, hg.ID, hg.NDV, version, hg.NullCount, data, hg.TotColSize, statsVersion, flag, hg.Correlation); err != nil { + tableID, isIndex, hg.ID, hg.NDV, version, hg.NullCount, data, hg.TotColSize, statistics.CurStatsVersion, flag, hg.Correlation); err != nil { return err } if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_buckets where table_id = %? and is_index = %? and hist_id = %?", tableID, isIndex, hg.ID); err != nil { return err } ->>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) sc := h.mu.ctx.GetSessionVars().StmtCtx var lastAnalyzePos []byte for i := range hg.Buckets { @@ -809,45 +718,15 @@ func (h *Handle) histogramFromStorage(reader *statsReader, tableID int64, colID return hg, nil } -<<<<<<< HEAD func (h *Handle) columnCountFromStorage(reader *statsReader, tableID, colID int64) (int64, error) { - selSQL := fmt.Sprintf("select sum(count) from mysql.stats_buckets where table_id = %d and is_index = %d and hist_id = %d", tableID, 0, colID) - rows, _, err := reader.read(selSQL) -======= -func (h *Handle) columnCountFromStorage(reader *statsReader, tableID, colID, statsVer int64) (int64, error) { - rows, _, err := reader.read("select sum(count) from mysql.stats_buckets where table_id = %? and is_index = 0 and hist_id = %?", tableID, colID) ->>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) + rows, _, err := reader.read("select sum(count) from mysql.stats_buckets where table_id = %? and is_index = %? and hist_id = %?", tableID, 0, colID) if err != nil { return 0, errors.Trace(err) } if rows[0].IsNull(0) { return 0, nil } -<<<<<<< HEAD return rows[0].GetMyDecimal(0).ToInt() -======= - count, err := rows[0].GetMyDecimal(0).ToInt() - if err != nil { - return 0, errors.Trace(err) - } - if statsVer == statistics.Version2 { - // Before stats ver 2, histogram represents all data in this column. - // In stats ver 2, histogram + TopN represent all data in this column. - // So we need to add TopN total count here. - rows, _, err = reader.read("select sum(count) from mysql.stats_top_n where table_id = %? and is_index = 0 and hist_id = %?", tableID, colID) - if err != nil { - return 0, errors.Trace(err) - } - if !rows[0].IsNull(0) { - topNCount, err := rows[0].GetMyDecimal(0).ToInt() - if err != nil { - return 0, errors.Trace(err) - } - count += topNCount - } - } - return count, err ->>>>>>> 8304d661f... 
statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961)
 }
 
 func (h *Handle) statsMetaByTableIDFromStorage(tableID int64, snapshot uint64) (version uint64, modifyCount, count int64, err error) {
@@ -927,274 +806,3 @@ func (h *Handle) releaseStatsReader(reader *statsReader) error {
 	h.mu.Unlock()
 	return err
 }
-<<<<<<< HEAD
-=======
-
-const (
-	// StatsStatusInited is the status for extended stats which are just registered but have not been analyzed yet.
-	StatsStatusInited uint8 = iota
-	// StatsStatusAnalyzed is the status for extended stats which have been collected in analyze.
-	StatsStatusAnalyzed
-	// StatsStatusDeleted is the status for extended stats which were dropped. These "deleted" records would be removed from storage by GCStats().
-	StatsStatusDeleted
-)
-
-// InsertExtendedStats inserts a record into mysql.stats_extended and update version in mysql.stats_meta.
-func (h *Handle) InsertExtendedStats(statsName, db string, colIDs []int64, tp int, tableID int64, ifNotExists bool) (err error) {
-	bytes, err := json.Marshal(colIDs)
-	if err != nil {
-		return errors.Trace(err)
-	}
-	strColIDs := string(bytes)
-	h.mu.Lock()
-	defer h.mu.Unlock()
-	ctx := context.TODO()
-	exec := h.mu.ctx.(sqlexec.SQLExecutor)
-	_, err = exec.ExecuteInternal(ctx, "begin pessimistic")
-	if err != nil {
-		return errors.Trace(err)
-	}
-	defer func() {
-		err = finishTransaction(ctx, exec, err)
-	}()
-	txn, err := h.mu.ctx.Txn(true)
-	if err != nil {
-		return errors.Trace(err)
-	}
-	version := txn.StartTS()
-	const sql = "INSERT INTO mysql.stats_extended(stats_name, db, type, table_id, column_ids, version, status) VALUES (%?, %?, %?, %?, %?, %?, %?)"
-	_, err = exec.ExecuteInternal(ctx, sql, statsName, db, tp, tableID, strColIDs, version, StatsStatusInited)
-	// Key exists, but `if not exists` is specified, so we ignore this error.
-	if kv.ErrKeyExists.Equal(err) && ifNotExists {
-		err = nil
-	}
-	return
-}
-
-// MarkExtendedStatsDeleted updates the status of mysql.stats_extended to be `deleted` and the version of mysql.stats_meta.
-func (h *Handle) MarkExtendedStatsDeleted(statsName, db string, tableID int64) (err error) {
-	ctx := context.Background()
-	if tableID < 0 {
-		rows, _, err := h.execRestrictedSQL(ctx, "SELECT table_id FROM mysql.stats_extended WHERE stats_name = %? and db = %?", statsName, db)
-		if err != nil {
-			return errors.Trace(err)
-		}
-		if len(rows) == 0 {
-			return nil
-		}
-		tableID = rows[0].GetInt64(0)
-	}
-	h.mu.Lock()
-	defer h.mu.Unlock()
-	exec := h.mu.ctx.(sqlexec.SQLExecutor)
-	_, err = exec.ExecuteInternal(ctx, "begin pessimistic")
-	if err != nil {
-		return errors.Trace(err)
-	}
-	defer func() {
-		err = finishTransaction(ctx, exec, err)
-	}()
-	txn, err := h.mu.ctx.Txn(true)
-	if err != nil {
-		return errors.Trace(err)
-	}
-	version := txn.StartTS()
-	if _, err = exec.ExecuteInternal(ctx, "UPDATE mysql.stats_extended SET version = %?, status = %? WHERE stats_name = %? and db = %?", version, StatsStatusDeleted, statsName, db); err != nil {
-		return err
-	}
-	if _, err = exec.ExecuteInternal(ctx, "UPDATE mysql.stats_meta SET version = %? WHERE table_id = %?", version, tableID); err != nil {
-		return err
-	}
-	return nil
-}
-
-// ReloadExtendedStatistics drops the cache for extended statistics and reloads data from mysql.stats_extended.
-func (h *Handle) ReloadExtendedStatistics() error {
-	reader, err := h.getStatsReader(0)
-	if err != nil {
-		return err
-	}
-	oldCache := h.statsCache.Load().(statsCache)
-	tables := make([]*statistics.Table, 0, len(oldCache.tables))
-	for physicalID, tbl := range oldCache.tables {
-		t, err := h.extendedStatsFromStorage(reader, tbl.Copy(), physicalID, true)
-		if err != nil {
-			return err
-		}
-		tables = append(tables, t)
-	}
-	err = h.releaseStatsReader(reader)
-	if err != nil {
-		return err
-	}
-	// Note that this update may fail when the statsCache.version has been modified by others.
-	h.updateStatsCache(oldCache.update(tables, nil, oldCache.version))
-	return nil
-}
-
-// BuildExtendedStats builds extended stats for column groups if needed based on the column samples.
-func (h *Handle) BuildExtendedStats(tableID int64, cols []*model.ColumnInfo, collectors []*statistics.SampleCollector) (*statistics.ExtendedStatsColl, error) {
-	ctx := context.Background()
-	const sql = "SELECT stats_name, db, type, column_ids FROM mysql.stats_extended WHERE table_id = %? and status in (%?, %?)"
-	rows, _, err := h.execRestrictedSQL(ctx, sql, tableID, StatsStatusAnalyzed, StatsStatusInited)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
-	if len(rows) == 0 {
-		return nil, nil
-	}
-	statsColl := statistics.NewExtendedStatsColl()
-	for _, row := range rows {
-		key := statistics.ExtendedStatsKey{
-			StatsName: row.GetString(0),
-			DB:        row.GetString(1),
-		}
-		item := &statistics.ExtendedStatsItem{Tp: uint8(row.GetInt64(2))}
-		colIDs := row.GetString(3)
-		err := json.Unmarshal([]byte(colIDs), &item.ColIDs)
-		if err != nil {
-			logutil.BgLogger().Error("invalid column_ids in mysql.stats_extended, skip collecting extended stats for this row", zap.String("column_ids", colIDs), zap.Error(err))
-			continue
-		}
-		item = h.fillExtendedStatsItemVals(item, cols, collectors)
-		if item != nil {
-			statsColl.Stats[key] = item
-		}
-	}
-	if len(statsColl.Stats) == 0 {
-		return nil, nil
-	}
-	return statsColl, nil
-}
-
-func (h *Handle) fillExtendedStatsItemVals(item *statistics.ExtendedStatsItem, cols []*model.ColumnInfo, collectors []*statistics.SampleCollector) *statistics.ExtendedStatsItem {
-	switch item.Tp {
-	case ast.StatsTypeCardinality, ast.StatsTypeDependency:
-		return nil
-	case ast.StatsTypeCorrelation:
-		return h.fillExtStatsCorrVals(item, cols, collectors)
-	}
-	return nil
-}
-
-func (h *Handle) fillExtStatsCorrVals(item *statistics.ExtendedStatsItem, cols []*model.ColumnInfo, collectors []*statistics.SampleCollector) *statistics.ExtendedStatsItem {
-	colOffsets := make([]int, 0, 2)
-	for _, id := range item.ColIDs {
-		for i, col := range cols {
-			if col.ID == id {
-				colOffsets = append(colOffsets, i)
-				break
-			}
-		}
-	}
-	if len(colOffsets) != 2 {
-		return nil
-	}
-	// samplesX and samplesY are in order of handle, i.e., their SampleItem.Ordinals are in order.
-	samplesX := collectors[colOffsets[0]].Samples
-	// We would modify Ordinal of samplesY, so we make a deep copy.
-	samplesY := statistics.CopySampleItems(collectors[colOffsets[1]].Samples)
-	sampleNum := len(samplesX)
-	if sampleNum == 1 {
-		item.ScalarVals = float64(1)
-		return item
-	}
-	h.mu.Lock()
-	sc := h.mu.ctx.GetSessionVars().StmtCtx
-	h.mu.Unlock()
-	var err error
-	samplesX, err = statistics.SortSampleItems(sc, samplesX)
-	if err != nil {
-		return nil
-	}
-	samplesYInXOrder := make([]*statistics.SampleItem, sampleNum)
-	for i, itemX := range samplesX {
-		itemY := samplesY[itemX.Ordinal]
-		itemY.Ordinal = i
-		samplesYInXOrder[i] = itemY
-	}
-	samplesYInYOrder, err := statistics.SortSampleItems(sc, samplesYInXOrder)
-	if err != nil {
-		return nil
-	}
-	var corrXYSum float64
-	for i := 1; i < sampleNum; i++ {
-		corrXYSum += float64(i) * float64(samplesYInYOrder[i].Ordinal)
-	}
-	// X means the ordinal of the item in the original sequence, Y means the ordinal of the item in the
-	// sorted sequence, we know that X and Y value sets are both:
-	// 0, 1, ..., sampleNum-1
-	// we can simply compute sum(X) = sum(Y) =
-	// (sampleNum-1)*sampleNum / 2
-	// and sum(X^2) = sum(Y^2) =
-	// (sampleNum-1)*sampleNum*(2*sampleNum-1) / 6
-	// We use "Pearson correlation coefficient" to compute the order correlation of columns,
-	// the formula is based on https://en.wikipedia.org/wiki/Pearson_correlation_coefficient.
-	// Note that (itemsCount*corrX2Sum - corrXSum*corrXSum) would never be zero when sampleNum is larger than 1.
-	itemsCount := float64(sampleNum)
-	corrXSum := (itemsCount - 1) * itemsCount / 2.0
-	corrX2Sum := (itemsCount - 1) * itemsCount * (2*itemsCount - 1) / 6.0
-	item.ScalarVals = (itemsCount*corrXYSum - corrXSum*corrXSum) / (itemsCount*corrX2Sum - corrXSum*corrXSum)
-	return item
-}
-
-// SaveExtendedStatsToStorage writes extended stats of a table into mysql.stats_extended.
-func (h *Handle) SaveExtendedStatsToStorage(tableID int64, extStats *statistics.ExtendedStatsColl, isLoad bool) (err error) {
-	if extStats == nil || len(extStats.Stats) == 0 {
-		return nil
-	}
-	h.mu.Lock()
-	defer h.mu.Unlock()
-	ctx := context.TODO()
-	exec := h.mu.ctx.(sqlexec.SQLExecutor)
-	_, err = exec.ExecuteInternal(ctx, "begin pessimistic")
-	if err != nil {
-		return errors.Trace(err)
-	}
-	defer func() {
-		err = finishTransaction(ctx, exec, err)
-	}()
-	txn, err := h.mu.ctx.Txn(true)
-	if err != nil {
-		return errors.Trace(err)
-	}
-	version := txn.StartTS()
-	for key, item := range extStats.Stats {
-		bytes, err := json.Marshal(item.ColIDs)
-		if err != nil {
-			return errors.Trace(err)
-		}
-		strColIDs := string(bytes)
-		switch item.Tp {
-		case ast.StatsTypeCardinality, ast.StatsTypeCorrelation:
-			// If isLoad is true, it's INSERT; otherwise, it's UPDATE.
-			_, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_extended values (%?, %?, %?, %?, %?, %?, null, %?, %?)", key.StatsName, key.DB, item.Tp, tableID, strColIDs, item.ScalarVals, version, StatsStatusAnalyzed)
-		case ast.StatsTypeDependency:
-			_, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_extended values (%?, %?, %?, %?, %?, null, %?, %?, %?)", key.StatsName, key.DB, item.Tp, tableID, strColIDs, item.StringVals, version, StatsStatusAnalyzed)
-		}
-		if err != nil {
-			return errors.Trace(err)
-		}
-	}
-	if !isLoad {
-		if _, err := exec.ExecuteInternal(ctx, "UPDATE mysql.stats_meta SET version = %? WHERE table_id = %?", version, tableID); err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
-// CurrentPruneMode returns the current partition prune mode, which indicates whether tables support runtime pruning.
-func (h *Handle) CurrentPruneMode() variable.PartitionPruneMode {
-	h.mu.Lock()
-	defer h.mu.Unlock()
-	return variable.PartitionPruneMode(h.mu.ctx.GetSessionVars().PartitionPruneMode.Load())
-}
-
-// RefreshVars is used to pull PartitionPruneMode vars from kv storage.
-func (h *Handle) RefreshVars() error {
-	h.mu.Lock()
-	defer h.mu.Unlock()
-	return h.mu.ctx.RefreshVars(context.Background())
-}
->>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961)
diff --git a/statistics/handle/handle_test.go b/statistics/handle/handle_test.go
index 81d8af8982895..32b491ce7ef35 100644
--- a/statistics/handle/handle_test.go
+++ b/statistics/handle/handle_test.go
@@ -265,12 +265,7 @@ func (s *testStatsSuite) TestVersion(c *C) {
 	tbl1, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t1"))
 	c.Assert(err, IsNil)
 	tableInfo1 := tbl1.Meta()
-<<<<<<< HEAD
-	h := handle.NewHandle(testKit.Se, time.Millisecond)
-=======
-	h, err := handle.NewHandle(testKit.Se, time.Millisecond, do.SysSessionPool())
-	c.Assert(err, IsNil)
->>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961)
+	h := handle.NewHandle(testKit.Se, time.Millisecond, do.SysSessionPool())
 	unit := oracle.ComposeTS(1, 0)
 	testKit.MustExec("update mysql.stats_meta set version = ? where table_id = ?", 2*unit, tableInfo1.ID)
diff --git a/statistics/handle/update.go b/statistics/handle/update.go
index 5851f6434b1e7..7c4ff9d3ccfe9 100644
--- a/statistics/handle/update.go
+++ b/statistics/handle/update.go
@@ -208,120 +208,6 @@ func (h *Handle) NewSessionStatsCollector() *SessionStatsCollector {
 	return newCollector
 }
 
-<<<<<<< HEAD
-=======
-// IndexUsageInformation is the data struct to store index usage information.
-type IndexUsageInformation struct {
-	QueryCount   int64
-	RowsSelected int64
-	LastUsedAt   string
-}
-
-// GlobalIndexID is the key type for indexUsageMap.
-type GlobalIndexID struct {
-	TableID int64
-	IndexID int64
-}
-
-type indexUsageMap map[GlobalIndexID]IndexUsageInformation
-
-// SessionIndexUsageCollector is a list item that holds the index usage mapper. If you want to write or read mapper, you must lock it.
-type SessionIndexUsageCollector struct {
-	sync.Mutex
-
-	mapper  indexUsageMap
-	next    *SessionIndexUsageCollector
-	deleted bool
-}
-
-func (m indexUsageMap) updateByKey(id GlobalIndexID, value *IndexUsageInformation) {
-	item := m[id]
-	item.QueryCount += value.QueryCount
-	item.RowsSelected += value.RowsSelected
-	if item.LastUsedAt < value.LastUsedAt {
-		item.LastUsedAt = value.LastUsedAt
-	}
-	m[id] = item
-}
-
-func (m indexUsageMap) update(tableID int64, indexID int64, value *IndexUsageInformation) {
-	id := GlobalIndexID{TableID: tableID, IndexID: indexID}
-	m.updateByKey(id, value)
-}
-
-func (m indexUsageMap) merge(destMap indexUsageMap) {
-	for id, item := range destMap {
-		m.updateByKey(id, &item)
-	}
-}
-
-// Update updates the mapper in SessionIndexUsageCollector.
-func (s *SessionIndexUsageCollector) Update(tableID int64, indexID int64, value *IndexUsageInformation) {
-	value.LastUsedAt = time.Now().Format(types.TimeFSPFormat)
-	s.Lock()
-	defer s.Unlock()
-	s.mapper.update(tableID, indexID, value)
-}
-
-// Delete will set s.deleted to true which means it can be deleted from the linked list.
-func (s *SessionIndexUsageCollector) Delete() {
-	s.Lock()
-	defer s.Unlock()
-	s.deleted = true
-}
-
-// NewSessionIndexUsageCollector will add a new SessionIndexUsageCollector into the linked list headed by idxUsageListHead.
-// idxUsageListHead always points to an empty SessionIndexUsageCollector as a sentinel node. So we let idxUsageListHead.next
-// point to the new item. This is helpful to sweepIdxUsageList.
-func (h *Handle) NewSessionIndexUsageCollector() *SessionIndexUsageCollector {
-	h.idxUsageListHead.Lock()
-	defer h.idxUsageListHead.Unlock()
-	newCollector := &SessionIndexUsageCollector{
-		mapper: make(indexUsageMap),
-		next:   h.idxUsageListHead.next,
-	}
-	h.idxUsageListHead.next = newCollector
-	return newCollector
-}
-
-// sweepIdxUsageList will loop over the list, merge each session's local index usage information into the handle
-// and remove closed sessions' collectors.
-// For convenience, we keep idxUsageListHead always pointing to the sentinel node, so that we don't need to consider corner cases.
-func (h *Handle) sweepIdxUsageList() indexUsageMap {
-	prev := h.idxUsageListHead
-	prev.Lock()
-	mapper := make(indexUsageMap)
-	for curr := prev.next; curr != nil; curr = curr.next {
-		curr.Lock()
-		mapper.merge(curr.mapper)
-		if curr.deleted {
-			prev.next = curr.next
-			curr.Unlock()
-		} else {
-			prev.Unlock()
-			curr.mapper = make(indexUsageMap)
-			prev = curr
-		}
-	}
-	prev.Unlock()
-	return mapper
-}
-
-// DumpIndexUsageToKV will dump in-memory index usage information to KV.
-func (h *Handle) DumpIndexUsageToKV() error {
-	ctx := context.Background()
-	mapper := h.sweepIdxUsageList()
-	for id, value := range mapper {
-		const sql = `insert into mysql.SCHEMA_INDEX_USAGE values (%?, %?, %?, %?, %?) on duplicate key update query_count=query_count+%?, rows_selected=rows_selected+%?, last_used_at=greatest(last_used_at, %?)`
-		_, _, err := h.execRestrictedSQL(ctx, sql, id.TableID, id.IndexID, value.QueryCount, value.RowsSelected, value.LastUsedAt, value.QueryCount, value.RowsSelected, value.LastUsedAt)
-		if err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
->>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961)
 var (
 	// DumpStatsDeltaRatio is the lower bound of `Modify Count / Table Count` for stats delta to be dumped.
 	DumpStatsDeltaRatio = 1 / 10000.0
@@ -853,39 +739,21 @@ func (h *Handle) HandleAutoAnalyze(is infoschema.InfoSchema) {
 	for _, tbl := range tbls {
 		tblInfo := tbl.Meta()
 		pi := tblInfo.GetPartitionInfo()
-		tblName := "`" + db + "`.`" + tblInfo.Name.O + "`"
 		if pi == nil {
 			statsTbl := h.GetTableStats(tblInfo)
-<<<<<<< HEAD
-			sql := fmt.Sprintf("analyze table %s", tblName)
-			analyzed := h.autoAnalyzeTable(tblInfo, statsTbl, start, end, autoAnalyzeRatio, sql)
-=======
 			sql := "analyze table %n.%n"
 			analyzed := h.autoAnalyzeTable(tblInfo, statsTbl, start, end, autoAnalyzeRatio, sql, db, tblInfo.Name.O)
->>>>>>> 8304d661f... 
statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) if analyzed { return } continue } -<<<<<<< HEAD for _, def := range pi.Definitions { - sql := fmt.Sprintf("analyze table %s partition `%s`", tblName, def.Name.O) + sql := "analyze table %n.%n partition %n" statsTbl := h.GetPartitionStats(tblInfo, def.ID) - analyzed := h.autoAnalyzeTable(tblInfo, statsTbl, start, end, autoAnalyzeRatio, sql) + analyzed := h.autoAnalyzeTable(tblInfo, statsTbl, start, end, autoAnalyzeRatio, sql, db, tblInfo.Name.O, def.Name.O) if analyzed { return -======= - if pruneMode == variable.StaticOnly || pruneMode == variable.StaticButPrepareDynamic { - for _, def := range pi.Definitions { - sql := "analyze table %n.%n partition %n" - statsTbl := h.GetPartitionStats(tblInfo, def.ID) - analyzed := h.autoAnalyzeTable(tblInfo, statsTbl, start, end, autoAnalyzeRatio, sql, db, tblInfo.Name.O, def.Name.O) - if analyzed { - return - } - continue ->>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) } continue } @@ -914,11 +782,7 @@ func (h *Handle) autoAnalyzeTable(tblInfo *model.TableInfo, statsTbl *statistics func (h *Handle) execAutoAnalyze(sql string, params ...interface{}) { startTime := time.Now() -<<<<<<< HEAD - _, _, err := h.restrictedExec.ExecRestrictedSQL(sql) -======= _, _, err := h.execRestrictedSQL(context.Background(), sql, params...) ->>>>>>> 8304d661f... statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) dur := time.Since(startTime) metrics.AutoAnalyzeHistogram.Observe(dur.Seconds()) if err != nil {