diff --git a/domain/domain.go b/domain/domain.go
index 1742102cc9445..c581ea037fcc7 100644
--- a/domain/domain.go
+++ b/domain/domain.go
@@ -1015,7 +1015,8 @@ func (do *Domain) StatsHandle() *handle.Handle {
 
 // CreateStatsHandle is used only for test.
 func (do *Domain) CreateStatsHandle(ctx sessionctx.Context) {
-	atomic.StorePointer(&do.statsHandle, unsafe.Pointer(handle.NewHandle(ctx, do.statsLease)))
+	h := handle.NewHandle(ctx, do.statsLease, do.sysSessionPool)
+	atomic.StorePointer(&do.statsHandle, unsafe.Pointer(h))
 }
 
 // StatsUpdating checks if the stats worker is updating.
@@ -1040,7 +1041,7 @@ var RunAutoAnalyze = true
 // It should be called only once in BootstrapSession.
 func (do *Domain) UpdateTableStatsLoop(ctx sessionctx.Context) error {
 	ctx.GetSessionVars().InRestrictedSQL = true
-	statsHandle := handle.NewHandle(ctx, do.statsLease)
+	statsHandle := handle.NewHandle(ctx, do.statsLease, do.sysSessionPool)
 	atomic.StorePointer(&do.statsHandle, unsafe.Pointer(statsHandle))
 	do.ddl.RegisterEventCh(statsHandle.DDLEventCh())
 	// Negative stats lease indicates that it is in test, it does not need update.
diff --git a/server/statistics_handler.go b/server/statistics_handler.go
index a40e1b19b321f..ecb246e46cf48 100644
--- a/server/statistics_handler.go
+++ b/server/statistics_handler.go
@@ -25,7 +25,6 @@ import (
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/gcutil"
-	"github.com/pingcap/tidb/util/sqlexec"
 )
 
 // StatsHandler is the handler for dumping statistics.
@@ -122,9 +121,7 @@ func (sh StatsHistoryHandler) ServeHTTP(w http.ResponseWriter, req *http.Request
 		writeError(w, err)
 		return
 	}
-	se.GetSessionVars().SnapshotInfoschema, se.GetSessionVars().SnapshotTS = is, snapshot
-	historyStatsExec := se.(sqlexec.RestrictedSQLExecutor)
-	js, err := h.DumpStatsToJSON(params[pDBName], tbl.Meta(), historyStatsExec)
+	js, err := h.DumpStatsToJSONBySnapshot(params[pDBName], tbl.Meta(), snapshot)
 	if err != nil {
 		writeError(w, err)
 	} else {
diff --git a/statistics/handle/bootstrap.go b/statistics/handle/bootstrap.go
index 4bc8257537eda..bc87750400f2a 100644
--- a/statistics/handle/bootstrap.go
+++ b/statistics/handle/bootstrap.go
@@ -60,18 +60,16 @@ func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, cache *statsCache
 
 func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (statsCache, error) {
 	sql := "select HIGH_PRIORITY version, table_id, modify_count, count from mysql.stats_meta"
-	rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
-	if len(rc) > 0 {
-		defer terror.Call(rc[0].Close)
-	}
+	rc, err := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), sql)
 	if err != nil {
 		return statsCache{}, errors.Trace(err)
 	}
+	defer terror.Call(rc.Close)
 	tables := statsCache{tables: make(map[int64]*statistics.Table)}
-	req := rc[0].NewChunk()
+	req := rc.NewChunk()
 	iter := chunk.NewIterator4Chunk(req)
 	for {
-		err := rc[0].Next(context.TODO(), req)
+		err := rc.Next(context.TODO(), req)
 		if err != nil {
 			return statsCache{}, errors.Trace(err)
 		}
@@ -147,17 +145,15 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache *stat
 
 func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache *statsCache) error {
 	sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms"
-	rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
-	if len(rc) > 0 {
-		defer terror.Call(rc[0].Close)
-	}
+	rc, err := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), sql)
 	if err != nil {
 		return errors.Trace(err)
 	}
-	req := rc[0].NewChunk()
+	defer terror.Call(rc.Close)
+	req := rc.NewChunk()
 	iter := chunk.NewIterator4Chunk(req)
 	for {
-		err := rc[0].Next(context.TODO(), req)
+		err := rc.Next(context.TODO(), req)
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -187,17 +183,15 @@ func (h *Handle) initStatsTopN4Chunk(cache *statsCache, iter *chunk.Iterator4Chu
 
 func (h *Handle) initStatsTopN(cache *statsCache) error {
 	sql := "select HIGH_PRIORITY table_id, hist_id, value, count from mysql.stats_top_n where is_index = 1"
-	rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
-	if len(rc) > 0 {
-		defer terror.Call(rc[0].Close)
-	}
+	rc, err := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), sql)
 	if err != nil {
 		return errors.Trace(err)
 	}
-	req := rc[0].NewChunk()
+	defer terror.Call(rc.Close)
+	req := rc.NewChunk()
 	iter := chunk.NewIterator4Chunk(req)
 	for {
-		err := rc[0].Next(context.TODO(), req)
+		err := rc.Next(context.TODO(), req)
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -257,17 +251,15 @@ func initStatsBuckets4Chunk(ctx sessionctx.Context, cache *statsCache, iter *chu
 
 func (h *Handle) initStatsBuckets(cache *statsCache) error {
 	sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound from mysql.stats_buckets order by table_id, is_index, hist_id, bucket_id"
-	rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
-	if len(rc) > 0 {
-		defer terror.Call(rc[0].Close)
-	}
+	rc, err := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), sql)
 	if err != nil {
 		return errors.Trace(err)
 	}
-	req := rc[0].NewChunk()
+	defer terror.Call(rc.Close)
+	req := rc.NewChunk()
 	iter := chunk.NewIterator4Chunk(req)
 	for {
-		err := rc[0].Next(context.TODO(), req)
+		err := rc.Next(context.TODO(), req)
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -300,13 +292,13 @@ func (h *Handle) initStatsBuckets(cache *statsCache) error {
 func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) {
 	h.mu.Lock()
 	defer func() {
-		_, err1 := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), "commit")
+		_, err1 := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), "commit")
 		if err == nil && err1 != nil {
 			err = err1
 		}
 		h.mu.Unlock()
 	}()
-	_, err = h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), "begin")
+	_, err = h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), "begin")
 	if err != nil {
 		return err
 	}
diff --git a/statistics/handle/ddl.go b/statistics/handle/ddl.go
index 127608edb7bb8..7a1770d592be0 100644
--- a/statistics/handle/ddl.go
+++ b/statistics/handle/ddl.go
@@ -15,7 +15,6 @@ package handle
 
 import (
 	"context"
-	"fmt"
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/parser/model"
@@ -75,28 +74,34 @@ func (h *Handle) DDLEventCh() chan *util.Event {
 func (h *Handle) insertTableStats2KV(info *model.TableInfo, physicalID int64) (err error) {
 	h.mu.Lock()
 	defer h.mu.Unlock()
+	ctx := context.Background()
 	exec := h.mu.ctx.(sqlexec.SQLExecutor)
-	_, err = exec.Execute(context.Background(), "begin")
+	_, err = exec.ExecuteInternal(ctx, "begin")
 	if err != nil {
 		return errors.Trace(err)
 	}
 	defer func() {
-		err = finishTransaction(context.Background(), exec, err)
+		err = finishTransaction(ctx, exec, err)
 	}()
 	txn, err := h.mu.ctx.Txn(true)
 	if err != nil {
 		return errors.Trace(err)
 	}
 	startTS := txn.StartTS()
-	sqls := make([]string, 0, 1+len(info.Columns)+len(info.Indices))
-	sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_meta (version, table_id) values(%d, %d)", startTS, physicalID))
+	if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_meta (version, table_id) values(%?, %?)", startTS, physicalID); err != nil {
+		return err
+	}
 	for _, col := range info.Columns {
-		sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%d, 0, %d, 0, %d)", physicalID, col.ID, startTS))
+		if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%?, 0, %?, 0, %?)", physicalID, col.ID, startTS); err != nil {
+			return err
+		}
 	}
 	for _, idx := range info.Indices {
-		sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%d, 1, %d, 0, %d)", physicalID, idx.ID, startTS))
+		if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%?, 1, %?, 0, %?)", physicalID, idx.ID, startTS); err != nil {
+			return err
+		}
 	}
-	return execSQLs(context.Background(), exec, sqls)
+	return nil
 }
 
 // insertColStats2KV insert a record to stats_histograms with distinct_count 1 and insert a bucket to stats_buckets with default value.
@@ -105,13 +110,14 @@ func (h *Handle) insertColStats2KV(physicalID int64, colInfo *model.ColumnInfo)
 	h.mu.Lock()
 	defer h.mu.Unlock()
 
+	ctx := context.TODO()
 	exec := h.mu.ctx.(sqlexec.SQLExecutor)
-	_, err = exec.Execute(context.Background(), "begin")
+	_, err = exec.ExecuteInternal(ctx, "begin")
 	if err != nil {
 		return errors.Trace(err)
 	}
 	defer func() {
-		err = finishTransaction(context.Background(), exec, err)
+		err = finishTransaction(ctx, exec, err)
 	}()
 	txn, err := h.mu.ctx.Txn(true)
 	if err != nil {
@@ -119,24 +125,21 @@ func (h *Handle) insertColStats2KV(physicalID int64, colInfo *model.ColumnInfo)
 	}
 	startTS := txn.StartTS()
 	// First of all, we update the version.
-	_, err = exec.Execute(context.Background(), fmt.Sprintf("update mysql.stats_meta set version = %d where table_id = %d ", startTS, physicalID))
+	_, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %? where table_id = %?", startTS, physicalID)
 	if err != nil {
 		return
 	}
-	ctx := context.TODO()
 	// If we didn't update anything by last SQL, it means the stats of this table does not exist.
 	if h.mu.ctx.GetSessionVars().StmtCtx.AffectedRows() > 0 {
 		// By this step we can get the count of this table, then we can sure the count and repeats of bucket.
-		var rs []sqlexec.RecordSet
-		rs, err = exec.Execute(ctx, fmt.Sprintf("select count from mysql.stats_meta where table_id = %d", physicalID))
-		if len(rs) > 0 {
-			defer terror.Call(rs[0].Close)
-		}
+		var rs sqlexec.RecordSet
+		rs, err = exec.ExecuteInternal(ctx, "select count from mysql.stats_meta where table_id = %?", physicalID)
 		if err != nil {
 			return
 		}
-		req := rs[0].NewChunk()
-		err = rs[0].Next(ctx, req)
+		defer terror.Call(rs.Close)
+		req := rs.NewChunk()
+		err = rs.Next(ctx, req)
 		if err != nil {
 			return
 		}
@@ -146,21 +149,26 @@ func (h *Handle) insertColStats2KV(physicalID int64, colInfo *model.ColumnInfo)
 		if err != nil {
 			return
 		}
-		sqls := make([]string, 0, 1)
 		if value.IsNull() {
 			// If the adding column has default value null, all the existing rows have null value on the newly added column.
- sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, null_count) values (%d, %d, 0, %d, 0, %d)", startTS, physicalID, colInfo.ID, count)) + if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, null_count) values (%?, %?, 0, %?, 0, %?)", startTS, physicalID, colInfo.ID, count); err != nil { + return err + } } else { // If this stats exists, we insert histogram meta first, the distinct_count will always be one. - sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, tot_col_size) values (%d, %d, 0, %d, 1, %d)", startTS, physicalID, colInfo.ID, int64(len(value.GetBytes()))*count)) + if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, tot_col_size) values (%?, %?, 0, %?, 1, %?)", startTS, physicalID, colInfo.ID, int64(len(value.GetBytes()))*count); err != nil { + return err + } + value, err = value.ConvertTo(h.mu.ctx.GetSessionVars().StmtCtx, types.NewFieldType(mysql.TypeBlob)) if err != nil { return } // There must be only one bucket for this new column and the value is the default value. - sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_buckets (table_id, is_index, hist_id, bucket_id, repeats, count, lower_bound, upper_bound) values (%d, 0, %d, 0, %d, %d, X'%X', X'%X')", physicalID, colInfo.ID, count, count, value.GetBytes(), value.GetBytes())) + if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_buckets (table_id, is_index, hist_id, bucket_id, repeats, count, lower_bound, upper_bound) values (%?, 0, %?, 0, %?, %?, %?, %?)", physicalID, colInfo.ID, count, count, value.GetBytes(), value.GetBytes()); err != nil { + return err + } } - return execSQLs(context.Background(), exec, sqls) } return } @@ -168,20 +176,10 @@ func (h *Handle) insertColStats2KV(physicalID int64, colInfo *model.ColumnInfo) // finishTransaction will execute `commit` when error is nil, otherwise `rollback`. func finishTransaction(ctx context.Context, exec sqlexec.SQLExecutor, err error) error { if err == nil { - _, err = exec.Execute(ctx, "commit") + _, err = exec.ExecuteInternal(ctx, "commit") } else { - _, err1 := exec.Execute(ctx, "rollback") + _, err1 := exec.ExecuteInternal(ctx, "rollback") terror.Log(errors.Trace(err1)) } return errors.Trace(err) } - -func execSQLs(ctx context.Context, exec sqlexec.SQLExecutor, sqls []string) error { - for _, sql := range sqls { - _, err := exec.Execute(ctx, sql) - if err != nil { - return err - } - } - return nil -} diff --git a/statistics/handle/dump.go b/statistics/handle/dump.go index 16295569d76c0..33ebfdd92fb94 100644 --- a/statistics/handle/dump.go +++ b/statistics/handle/dump.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/types" @@ -63,9 +64,19 @@ func dumpJSONCol(hist *statistics.Histogram, CMSketch *statistics.CMSketch) *jso // DumpStatsToJSON dumps statistic to json. 
func (h *Handle) DumpStatsToJSON(dbName string, tableInfo *model.TableInfo, historyStatsExec sqlexec.RestrictedSQLExecutor) (*JSONTable, error) { + var snapshot uint64 + if historyStatsExec != nil { + sctx := historyStatsExec.(sessionctx.Context) + snapshot = sctx.GetSessionVars().SnapshotTS + } + return h.DumpStatsToJSONBySnapshot(dbName, tableInfo, snapshot) +} + +// DumpStatsToJSONBySnapshot dumps statistic to json. +func (h *Handle) DumpStatsToJSONBySnapshot(dbName string, tableInfo *model.TableInfo, snapshot uint64) (*JSONTable, error) { pi := tableInfo.GetPartitionInfo() if pi == nil { - return h.tableStatsToJSON(dbName, tableInfo, tableInfo.ID, historyStatsExec) + return h.tableStatsToJSON(dbName, tableInfo, tableInfo.ID, snapshot) } jsonTbl := &JSONTable{ DatabaseName: dbName, @@ -73,7 +84,7 @@ func (h *Handle) DumpStatsToJSON(dbName string, tableInfo *model.TableInfo, hist Partitions: make(map[string]*JSONTable, len(pi.Definitions)), } for _, def := range pi.Definitions { - tbl, err := h.tableStatsToJSON(dbName, tableInfo, def.ID, historyStatsExec) + tbl, err := h.tableStatsToJSON(dbName, tableInfo, def.ID, snapshot) if err != nil { return nil, errors.Trace(err) } @@ -85,12 +96,12 @@ func (h *Handle) DumpStatsToJSON(dbName string, tableInfo *model.TableInfo, hist return jsonTbl, nil } -func (h *Handle) tableStatsToJSON(dbName string, tableInfo *model.TableInfo, physicalID int64, historyStatsExec sqlexec.RestrictedSQLExecutor) (*JSONTable, error) { - tbl, err := h.tableStatsFromStorage(tableInfo, physicalID, true, historyStatsExec) +func (h *Handle) tableStatsToJSON(dbName string, tableInfo *model.TableInfo, physicalID int64, snapshot uint64) (*JSONTable, error) { + tbl, err := h.tableStatsFromStorage(tableInfo, physicalID, true, snapshot) if err != nil || tbl == nil { return nil, err } - tbl.Version, tbl.ModifyCount, tbl.Count, err = h.statsMetaByTableIDFromStorage(physicalID, historyStatsExec) + tbl.Version, tbl.ModifyCount, tbl.Count, err = h.statsMetaByTableIDFromStorage(physicalID, snapshot) if err != nil { return nil, err } diff --git a/statistics/handle/gc.go b/statistics/handle/gc.go index 3264485e6b703..7bb4bcd6a426f 100644 --- a/statistics/handle/gc.go +++ b/statistics/handle/gc.go @@ -15,7 +15,6 @@ package handle import ( "context" - "fmt" "time" "github.com/cznic/mathutil" @@ -27,6 +26,7 @@ import ( // GCStats will garbage collect the useless stats info. For dropped tables, we will first update their version so that // other tidb could know that table is deleted. func (h *Handle) GCStats(is infoschema.InfoSchema, ddlLease time.Duration) error { + ctx := context.Background() // To make sure that all the deleted tables' schema and stats info have been acknowledged to all tidb, // we only garbage collect version before 10 lease. 
 	lease := mathutil.MaxInt64(int64(h.Lease()), int64(ddlLease))
@@ -34,8 +34,8 @@ func (h *Handle) GCStats(is infoschema.InfoSchema, ddlLease time.Duration) error
 	if h.LastUpdateVersion() < offset {
 		return nil
 	}
-	sql := fmt.Sprintf("select table_id from mysql.stats_meta where version < %d", h.LastUpdateVersion()-offset)
-	rows, _, err := h.restrictedExec.ExecRestrictedSQL(sql)
+	gcVer := h.LastUpdateVersion() - offset
+	rows, _, err := h.execRestrictedSQL(ctx, "select table_id from mysql.stats_meta where version < %?", gcVer)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -48,17 +48,18 @@ func (h *Handle) GCStats(is infoschema.InfoSchema, ddlLease time.Duration) error
 }
 
 func (h *Handle) gcTableStats(is infoschema.InfoSchema, physicalID int64) error {
-	sql := fmt.Sprintf("select is_index, hist_id from mysql.stats_histograms where table_id = %d", physicalID)
-	rows, _, err := h.restrictedExec.ExecRestrictedSQL(sql)
+	ctx := context.Background()
+	rows, _, err := h.execRestrictedSQL(ctx, "select is_index, hist_id from mysql.stats_histograms where table_id = %?", physicalID)
 	if err != nil {
 		return errors.Trace(err)
 	}
 	// The table has already been deleted in stats and acknowledged to all tidb,
 	// we can safely remove the meta info now.
 	if len(rows) == 0 {
-		sql := fmt.Sprintf("delete from mysql.stats_meta where table_id = %d", physicalID)
-		_, _, err := h.restrictedExec.ExecRestrictedSQL(sql)
-		return errors.Trace(err)
+		_, _, err = h.execRestrictedSQL(ctx, "delete from mysql.stats_meta where table_id = %?", physicalID)
+		if err != nil {
+			return errors.Trace(err)
+		}
 	}
 	h.mu.Lock()
 	tbl, ok := h.getTableByPhysicalID(is, physicalID)
@@ -99,29 +100,37 @@ func (h *Handle) deleteHistStatsFromKV(physicalID int64, histID int64, isIndex i
 	h.mu.Lock()
 	defer h.mu.Unlock()
 
+	ctx := context.Background()
 	exec := h.mu.ctx.(sqlexec.SQLExecutor)
-	_, err = exec.Execute(context.Background(), "begin")
+	_, err = exec.ExecuteInternal(ctx, "begin")
 	if err != nil {
 		return errors.Trace(err)
 	}
 	defer func() {
-		err = finishTransaction(context.Background(), exec, err)
+		err = finishTransaction(ctx, exec, err)
 	}()
 	txn, err := h.mu.ctx.Txn(true)
 	if err != nil {
 		return errors.Trace(err)
 	}
 	startTS := txn.StartTS()
-	sqls := make([]string, 0, 4)
 	// First of all, we update the version. If this table doesn't exist, it won't have any problem. Because we cannot delete anything.
-	sqls = append(sqls, fmt.Sprintf("update mysql.stats_meta set version = %d where table_id = %d ", startTS, physicalID))
+	if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %? where table_id = %? ", startTS, physicalID); err != nil {
+		return err
+	}
 	// delete histogram meta
-	sqls = append(sqls, fmt.Sprintf("delete from mysql.stats_histograms where table_id = %d and hist_id = %d and is_index = %d", physicalID, histID, isIndex))
+	if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_histograms where table_id = %? and hist_id = %? and is_index = %?", physicalID, histID, isIndex); err != nil {
+		return err
+	}
 	// delete top n data
-	sqls = append(sqls, fmt.Sprintf("delete from mysql.stats_top_n where table_id = %d and hist_id = %d and is_index = %d", physicalID, histID, isIndex))
+	if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_top_n where table_id = %? and hist_id = %? and is_index = %?", physicalID, histID, isIndex); err != nil {
+		return err
+	}
 	// delete all buckets
-	sqls = append(sqls, fmt.Sprintf("delete from mysql.stats_buckets where table_id = %d and hist_id = %d and is_index = %d", physicalID, histID, isIndex))
-	return execSQLs(context.Background(), exec, sqls)
+	if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_buckets where table_id = %? and hist_id = %? and is_index = %?", physicalID, histID, isIndex); err != nil {
+		return err
+	}
+	return nil
 }
 
 // DeleteTableStatsFromKV deletes table statistics from kv.
@@ -129,7 +138,7 @@ func (h *Handle) DeleteTableStatsFromKV(physicalID int64) (err error) {
 	h.mu.Lock()
 	defer h.mu.Unlock()
 	exec := h.mu.ctx.(sqlexec.SQLExecutor)
-	_, err = exec.Execute(context.Background(), "begin")
+	_, err = exec.ExecuteInternal(context.Background(), "begin")
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -140,13 +149,23 @@ func (h *Handle) DeleteTableStatsFromKV(physicalID int64) (err error) {
 	if err != nil {
 		return errors.Trace(err)
 	}
+	ctx := context.Background()
 	startTS := txn.StartTS()
-	sqls := make([]string, 0, 5)
 	// We only update the version so that other tidb will know that this table is deleted.
-	sqls = append(sqls, fmt.Sprintf("update mysql.stats_meta set version = %d where table_id = %d ", startTS, physicalID))
-	sqls = append(sqls, fmt.Sprintf("delete from mysql.stats_histograms where table_id = %d", physicalID))
-	sqls = append(sqls, fmt.Sprintf("delete from mysql.stats_buckets where table_id = %d", physicalID))
-	sqls = append(sqls, fmt.Sprintf("delete from mysql.stats_top_n where table_id = %d", physicalID))
-	sqls = append(sqls, fmt.Sprintf("delete from mysql.stats_feedback where table_id = %d", physicalID))
-	return execSQLs(context.Background(), exec, sqls)
+	if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %? where table_id = %? ", startTS, physicalID); err != nil {
+		return err
+	}
+	if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_histograms where table_id = %?", physicalID); err != nil {
+		return err
+	}
+	if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_buckets where table_id = %?", physicalID); err != nil {
+		return err
+	}
+	if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_top_n where table_id = %?", physicalID); err != nil {
+		return err
+	}
+	if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_feedback where table_id = %?", physicalID); err != nil {
+		return err
+	}
+	return nil
 }
diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go
index 3c681aafb3804..2d8915a8e0478 100644
--- a/statistics/handle/handle.go
+++ b/statistics/handle/handle.go
@@ -20,12 +20,12 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/ngaut/pools"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/parser/ast"
 	"github.com/pingcap/parser/model"
 	"github.com/pingcap/parser/mysql"
-	"github.com/pingcap/parser/terror"
 	"github.com/pingcap/tidb/ddl/util"
 	"github.com/pingcap/tidb/infoschema"
 	"github.com/pingcap/tidb/sessionctx"
@@ -68,7 +68,7 @@ type Handle struct {
 		atomic.Value
 	}
 
-	restrictedExec sqlexec.RestrictedSQLExecutor
+	pool sessionPool
 
 	// ddlEventCh is a channel to notify a ddl operation has happened.
 	// It is sent only by owner or the drop stats executor, and read by stats handle.
@@ -83,6 +83,37 @@ type Handle struct {
 	lease atomic2.Duration
 }
 
+func (h *Handle) withRestrictedSQLExecutor(ctx context.Context, fn func(context.Context, sqlexec.RestrictedSQLExecutor) ([]chunk.Row, []*ast.ResultField, error)) ([]chunk.Row, []*ast.ResultField, error) {
+	se, err := h.pool.Get()
+	if err != nil {
+		return nil, nil, errors.Trace(err)
+	}
+	defer h.pool.Put(se)
+
+	exec := se.(sqlexec.RestrictedSQLExecutor)
+	return fn(ctx, exec)
+}
+
+func (h *Handle) execRestrictedSQL(ctx context.Context, sql string, params ...interface{}) ([]chunk.Row, []*ast.ResultField, error) {
+	return h.withRestrictedSQLExecutor(ctx, func(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) ([]chunk.Row, []*ast.ResultField, error) {
+		stmt, err := exec.ParseWithParams(ctx, sql, params...)
+		if err != nil {
+			return nil, nil, errors.Trace(err)
+		}
+		return exec.ExecRestrictedStmt(ctx, stmt)
+	})
+}
+
+func (h *Handle) execRestrictedSQLWithSnapshot(ctx context.Context, sql string, snapshot uint64, params ...interface{}) ([]chunk.Row, []*ast.ResultField, error) {
+	return h.withRestrictedSQLExecutor(ctx, func(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) ([]chunk.Row, []*ast.ResultField, error) {
+		stmt, err := exec.ParseWithParams(ctx, sql, params...)
+		if err != nil {
+			return nil, nil, errors.Trace(err)
+		}
+		return exec.ExecRestrictedStmt(ctx, stmt, sqlexec.ExecOptionWithSnapshot(snapshot))
+	})
+}
+
 // Clear the statsCache, only for test.
 func (h *Handle) Clear() {
 	h.mu.Lock()
@@ -101,19 +132,21 @@ func (h *Handle) Clear() {
 	h.mu.Unlock()
 }
 
+type sessionPool interface {
+	Get() (pools.Resource, error)
+	Put(pools.Resource)
+}
+
 // NewHandle creates a Handle for update stats.
-func NewHandle(ctx sessionctx.Context, lease time.Duration) *Handle {
+func NewHandle(ctx sessionctx.Context, lease time.Duration, pool sessionPool) *Handle {
 	handle := &Handle{
 		ddlEventCh: make(chan *util.Event, 100),
 		listHead:   &SessionStatsCollector{mapper: make(tableDeltaMap), rateMap: make(errorRateDeltaMap)},
 		globalMap:  make(tableDeltaMap),
 		feedback:   statistics.NewQueryFeedbackMap(),
+		pool:       pool,
 	}
 	handle.lease.Store(lease)
-	// It is safe to use it concurrently because the exec won't touch the ctx.
-	if exec, ok := ctx.(sqlexec.RestrictedSQLExecutor); ok {
-		handle.restrictedExec = exec
-	}
 	handle.mu.ctx = ctx
 	handle.mu.rateMap = make(errorRateDeltaMap)
 	handle.statsCache.Store(statsCache{tables: make(map[int64]*statistics.Table)})
@@ -158,8 +191,8 @@ func (h *Handle) Update(is infoschema.InfoSchema) error {
 	} else {
 		lastVersion = 0
 	}
-	sql := fmt.Sprintf("SELECT version, table_id, modify_count, count from mysql.stats_meta where version > %d order by version", lastVersion)
-	rows, _, err := h.restrictedExec.ExecRestrictedSQL(sql)
+	ctx := context.Background()
+	rows, _, err := h.execRestrictedSQL(ctx, "SELECT version, table_id, modify_count, count from mysql.stats_meta where version > %? order by version", lastVersion)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -181,7 +214,7 @@ func (h *Handle) Update(is infoschema.InfoSchema) error {
 			continue
 		}
 		tableInfo := table.Meta()
-		tbl, err := h.tableStatsFromStorage(tableInfo, physicalID, false, nil)
+		tbl, err := h.tableStatsFromStorage(tableInfo, physicalID, false, 0)
 		// Error is not nil may mean that there are some ddl changes on this table, we will not update it.
 		if err != nil {
 			logutil.BgLogger().Error("[stats] error occurred when read table stats", zap.String("table", tableInfo.Name.O), zap.Error(err))
 			continue
 		}
@@ -281,7 +314,7 @@ func (sc statsCache) update(tables []*statistics.Table, deletedIDs []int64, newV
 // LoadNeededHistograms will load histograms for those needed columns.
 func (h *Handle) LoadNeededHistograms() (err error) {
 	cols := statistics.HistogramNeededColumns.AllCols()
-	reader, err := h.getStatsReader(nil)
+	reader, err := h.getStatsReader(0)
 	if err != nil {
 		return err
 	}
@@ -355,13 +388,11 @@ func (h *Handle) FlushStats() {
 }
 
 func (h *Handle) cmSketchFromStorage(reader *statsReader, tblID int64, isIndex, histID int64) (_ *statistics.CMSketch, err error) {
-	selSQL := fmt.Sprintf("select cm_sketch from mysql.stats_histograms where table_id = %d and is_index = %d and hist_id = %d", tblID, isIndex, histID)
-	rows, _, err := reader.read(selSQL)
+	rows, _, err := reader.read("select cm_sketch from mysql.stats_histograms where table_id = %? and is_index = %? and hist_id = %?", tblID, isIndex, histID)
 	if err != nil || len(rows) == 0 {
 		return nil, err
 	}
-	selSQL = fmt.Sprintf("select HIGH_PRIORITY value, count from mysql.stats_top_n where table_id = %d and is_index = %d and hist_id = %d", tblID, isIndex, histID)
-	topNRows, _, err := reader.read(selSQL)
+	topNRows, _, err := reader.read("select HIGH_PRIORITY value, count from mysql.stats_top_n where table_id = %? and is_index = %? and hist_id = %?", tblID, isIndex, histID)
 	if err != nil {
 		return nil, err
 	}
@@ -497,8 +528,8 @@ func (h *Handle) columnStatsFromStorage(reader *statsReader, row chunk.Row, tabl
 }
 
 // tableStatsFromStorage loads table stats info from storage.
-func (h *Handle) tableStatsFromStorage(tableInfo *model.TableInfo, physicalID int64, loadAll bool, historyStatsExec sqlexec.RestrictedSQLExecutor) (_ *statistics.Table, err error) {
-	reader, err := h.getStatsReader(historyStatsExec)
+func (h *Handle) tableStatsFromStorage(tableInfo *model.TableInfo, physicalID int64, loadAll bool, snapshot uint64) (_ *statistics.Table, err error) {
+	reader, err := h.getStatsReader(snapshot)
 	if err != nil {
 		return nil, err
 	}
@@ -511,7 +542,7 @@ func (h *Handle) tableStatsFromStorage(tableInfo *model.TableInfo, physicalID in
 	table, ok := h.statsCache.Load().(statsCache).tables[physicalID]
 	// If table stats is pseudo, we also need to copy it, since we will use the column stats when
 	// the average error rate of it is small.
-	if !ok || historyStatsExec != nil {
+	if !ok || snapshot > 0 {
 		histColl := statistics.HistColl{
 			PhysicalID:     physicalID,
 			HavePhysicalID: true,
@@ -526,8 +557,7 @@ func (h *Handle) tableStatsFromStorage(tableInfo *model.TableInfo, physicalID in
 		table = table.Copy()
 	}
 	table.Pseudo = false
-	selSQL := fmt.Sprintf("select table_id, is_index, hist_id, distinct_count, version, null_count, tot_col_size, stats_ver, flag, correlation, last_analyze_pos from mysql.stats_histograms where table_id = %d", physicalID)
-	rows, _, err := reader.read(selSQL)
+	rows, _, err := reader.read("select table_id, is_index, hist_id, distinct_count, version, null_count, tot_col_size, stats_ver, flag, correlation, last_analyze_pos from mysql.stats_histograms where table_id = %?", physicalID)
 	// Check deleted table.
 	if err != nil || len(rows) == 0 {
 		return nil, nil
@@ -551,7 +581,7 @@ func (h *Handle) SaveStatsToStorage(tableID int64, count int64, isIndex int, hg
 	defer h.mu.Unlock()
 	ctx := context.TODO()
 	exec := h.mu.ctx.(sqlexec.SQLExecutor)
-	_, err = exec.Execute(ctx, "begin")
+	_, err = exec.ExecuteInternal(ctx, "begin")
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -564,29 +594,40 @@ func (h *Handle) SaveStatsToStorage(tableID int64, count int64, isIndex int, hg
 	}
 	version := txn.StartTS()
-	sqls := make([]string, 0, 4)
 	// If the count is less than 0, then we do not want to update the modify count and count.
 	if count >= 0 {
-		sqls = append(sqls, fmt.Sprintf("replace into mysql.stats_meta (version, table_id, count) values (%d, %d, %d)", version, tableID, count))
+		_, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_meta (version, table_id, count) values (%?, %?, %?)", version, tableID, count)
 	} else {
-		sqls = append(sqls, fmt.Sprintf("update mysql.stats_meta set version = %d where table_id = %d", version, tableID))
+		_, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %? where table_id = %?", version, tableID)
+	}
+	if err != nil {
+		return err
 	}
 	data, err := statistics.EncodeCMSketchWithoutTopN(cms)
 	if err != nil {
-		return
+		return err
 	}
 	// Delete outdated data
-	sqls = append(sqls, fmt.Sprintf("delete from mysql.stats_top_n where table_id = %d and is_index = %d and hist_id = %d", tableID, isIndex, hg.ID))
+	if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_top_n where table_id = %? and is_index = %? and hist_id = %?", tableID, isIndex, hg.ID); err != nil {
+		return err
+	}
 	for _, meta := range cms.TopN() {
-		sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_top_n (table_id, is_index, hist_id, value, count) values (%d, %d, %d, X'%X', %d)", tableID, isIndex, hg.ID, meta.Data, meta.Count))
+		_, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_top_n (table_id, is_index, hist_id, value, count) values (%?, %?, %?, %?, %?)", tableID, isIndex, hg.ID, meta.Data, meta.Count)
+		if err != nil {
+			return err
+		}
 	}
 	flag := 0
 	if isAnalyzed == 1 {
 		flag = statistics.AnalyzeFlag
 	}
-	sqls = append(sqls, fmt.Sprintf("replace into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, flag, correlation) values (%d, %d, %d, %d, %d, %d, X'%X', %d, %d, %d, %f)",
-		tableID, isIndex, hg.ID, hg.NDV, version, hg.NullCount, data, hg.TotColSize, statistics.CurStatsVersion, flag, hg.Correlation))
-	sqls = append(sqls, fmt.Sprintf("delete from mysql.stats_buckets where table_id = %d and is_index = %d and hist_id = %d", tableID, isIndex, hg.ID))
+	if _, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, flag, correlation) values (%?, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?)",
+		tableID, isIndex, hg.ID, hg.NDV, version, hg.NullCount, data, hg.TotColSize, statistics.CurStatsVersion, flag, hg.Correlation); err != nil {
+		return err
+	}
+	if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_buckets where table_id = %? and is_index = %? and hist_id = %?", tableID, isIndex, hg.ID); err != nil {
+		return err
+	}
 	sc := h.mu.ctx.GetSessionVars().StmtCtx
 	var lastAnalyzePos []byte
 	for i := range hg.Buckets {
@@ -607,12 +648,16 @@ func (h *Handle) SaveStatsToStorage(tableID int64, count int64, isIndex int, hg
 		if err != nil {
 			return
 		}
-		sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_buckets(table_id, is_index, hist_id, bucket_id, count, repeats, lower_bound, upper_bound) values(%d, %d, %d, %d, %d, %d, X'%X', X'%X')", tableID, isIndex, hg.ID, i, count, hg.Buckets[i].Repeat, lowerBound.GetBytes(), upperBound.GetBytes()))
+		if _, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_buckets(table_id, is_index, hist_id, bucket_id, count, repeats, lower_bound, upper_bound) values(%?, %?, %?, %?, %?, %?, %?, %?)", tableID, isIndex, hg.ID, i, count, hg.Buckets[i].Repeat, lowerBound.GetBytes(), upperBound.GetBytes()); err != nil {
+			return err
+		}
 	}
 	if isAnalyzed == 1 && len(lastAnalyzePos) > 0 {
-		sqls = append(sqls, fmt.Sprintf("update mysql.stats_histograms set last_analyze_pos = X'%X' where table_id = %d and is_index = %d and hist_id = %d", lastAnalyzePos, tableID, isIndex, hg.ID))
+		if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_histograms set last_analyze_pos = %? where table_id = %? and is_index = %? and hist_id = %?", lastAnalyzePos, tableID, isIndex, hg.ID); err != nil {
+			return err
+		}
 	}
-	return execSQLs(context.Background(), exec, sqls)
+	return
 }
 
 // SaveMetaToStorage will save stats_meta to storage.
@@ -621,7 +666,7 @@ func (h *Handle) SaveMetaToStorage(tableID, count, modifyCount int64) (err error
 	defer h.mu.Unlock()
 	ctx := context.TODO()
 	exec := h.mu.ctx.(sqlexec.SQLExecutor)
-	_, err = exec.Execute(ctx, "begin")
+	_, err = exec.ExecuteInternal(ctx, "begin")
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -632,16 +677,13 @@ func (h *Handle) SaveMetaToStorage(tableID, count, modifyCount int64) (err error
 	if err != nil {
 		return errors.Trace(err)
 	}
-	var sql string
 	version := txn.StartTS()
-	sql = fmt.Sprintf("replace into mysql.stats_meta (version, table_id, count, modify_count) values (%d, %d, %d, %d)", version, tableID, count, modifyCount)
-	_, err = exec.Execute(ctx, sql)
-	return
+	_, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_meta (version, table_id, count, modify_count) values (%?, %?, %?, %?)", version, tableID, count, modifyCount)
+	return err
 }
 
 func (h *Handle) histogramFromStorage(reader *statsReader, tableID int64, colID int64, tp *types.FieldType, distinct int64, isIndex int, ver uint64, nullCount int64, totColSize int64, corr float64) (_ *statistics.Histogram, err error) {
-	selSQL := fmt.Sprintf("select count, repeats, lower_bound, upper_bound from mysql.stats_buckets where table_id = %d and is_index = %d and hist_id = %d order by bucket_id", tableID, isIndex, colID)
-	rows, fields, err := reader.read(selSQL)
+	rows, fields, err := reader.read("select count, repeats, lower_bound, upper_bound from mysql.stats_buckets where table_id = %? and is_index = %? and hist_id = %? order by bucket_id", tableID, isIndex, colID)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -677,8 +719,7 @@ func (h *Handle) histogramFromStorage(reader *statsReader, tableID int64, colID
 }
 
 func (h *Handle) columnCountFromStorage(reader *statsReader, tableID, colID int64) (int64, error) {
-	selSQL := fmt.Sprintf("select sum(count) from mysql.stats_buckets where table_id = %d and is_index = %d and hist_id = %d", tableID, 0, colID)
-	rows, _, err := reader.read(selSQL)
+	rows, _, err := reader.read("select sum(count) from mysql.stats_buckets where table_id = %? and is_index = %? and hist_id = %?", tableID, 0, colID)
 	if err != nil {
 		return 0, errors.Trace(err)
 	}
@@ -688,13 +729,16 @@ func (h *Handle) columnCountFromStorage(reader *statsReader, tableID, colID int6
 	return rows[0].GetMyDecimal(0).ToInt()
 }
 
-func (h *Handle) statsMetaByTableIDFromStorage(tableID int64, historyStatsExec sqlexec.RestrictedSQLExecutor) (version uint64, modifyCount, count int64, err error) {
-	selSQL := fmt.Sprintf("SELECT version, modify_count, count from mysql.stats_meta where table_id = %d order by version", tableID)
+func (h *Handle) statsMetaByTableIDFromStorage(tableID int64, snapshot uint64) (version uint64, modifyCount, count int64, err error) {
+	ctx := context.Background()
 	var rows []chunk.Row
-	if historyStatsExec == nil {
-		rows, _, err = h.restrictedExec.ExecRestrictedSQL(selSQL)
+	if snapshot == 0 {
		rows, _, err = h.execRestrictedSQL(ctx, "SELECT version, modify_count, count from mysql.stats_meta where table_id = %? order by version", tableID)
 	} else {
-		rows, _, err = historyStatsExec.ExecRestrictedSQLWithSnapshot(selSQL)
+		rows, _, err = h.execRestrictedSQLWithSnapshot(ctx, "SELECT version, modify_count, count from mysql.stats_meta where table_id = %? order by version", snapshot, tableID)
+		if err != nil {
+			return 0, 0, 0, err
+		}
 	}
 	if err != nil || len(rows) == 0 {
 		return
@@ -708,49 +752,34 @@ func (h *Handle) statsMetaByTableIDFromStorage(tableID int64, historyStatsExec s
 // statsReader is used for simplify code that needs to read system tables in different sqls
 // but requires the same transactions.
 type statsReader struct {
-	ctx     sessionctx.Context
-	history sqlexec.RestrictedSQLExecutor
+	ctx      sqlexec.RestrictedSQLExecutor
+	snapshot uint64
 }
 
-func (sr *statsReader) read(sql string) (rows []chunk.Row, fields []*ast.ResultField, err error) {
-	if sr.history != nil {
-		return sr.history.ExecRestrictedSQLWithSnapshot(sql)
-	}
-	rc, err := sr.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
-	if len(rc) > 0 {
-		defer terror.Call(rc[0].Close)
-	}
+func (sr *statsReader) read(sql string, args ...interface{}) (rows []chunk.Row, fields []*ast.ResultField, err error) {
+	ctx := context.TODO()
+	stmt, err := sr.ctx.ParseWithParams(ctx, sql, args...)
 	if err != nil {
-		return nil, nil, err
+		return nil, nil, errors.Trace(err)
 	}
-	for {
-		req := rc[0].NewChunk()
-		err := rc[0].Next(context.TODO(), req)
-		if err != nil {
-			return nil, nil, err
-		}
-		if req.NumRows() == 0 {
-			break
-		}
-		for i := 0; i < req.NumRows(); i++ {
-			rows = append(rows, req.GetRow(i))
-		}
+	if sr.snapshot > 0 {
+		return sr.ctx.ExecRestrictedStmt(ctx, stmt, sqlexec.ExecOptionWithSnapshot(sr.snapshot))
 	}
-	return rows, rc[0].Fields(), nil
+	return sr.ctx.ExecRestrictedStmt(ctx, stmt)
 }
 
 func (sr *statsReader) isHistory() bool {
-	return sr.history != nil
+	return sr.snapshot > 0
 }
 
-func (h *Handle) getStatsReader(history sqlexec.RestrictedSQLExecutor) (reader *statsReader, err error) {
+func (h *Handle) getStatsReader(snapshot uint64) (reader *statsReader, err error) {
 	failpoint.Inject("mockGetStatsReaderFail", func(val failpoint.Value) {
 		if val.(bool) {
 			failpoint.Return(nil, errors.New("gofail genStatsReader error"))
 		}
 	})
-	if history != nil {
-		return &statsReader{history: history}, nil
+	if snapshot > 0 {
+		return &statsReader{ctx: h.mu.ctx.(sqlexec.RestrictedSQLExecutor), snapshot: snapshot}, nil
 	}
 	h.mu.Lock()
 	defer func() {
@@ -762,18 +791,18 @@ func (h *Handle) getStatsReader(history sqlexec.RestrictedSQLExecutor) (reader *
 		}
 	}()
 	failpoint.Inject("mockGetStatsReaderPanic", nil)
-	_, err = h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), "begin")
+	_, err = h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), "begin")
 	if err != nil {
 		return nil, err
 	}
-	return &statsReader{ctx: h.mu.ctx}, nil
+	return &statsReader{ctx: h.mu.ctx.(sqlexec.RestrictedSQLExecutor)}, nil
 }
 
 func (h *Handle) releaseStatsReader(reader *statsReader) error {
-	if reader.history != nil {
+	if reader.snapshot > 0 {
 		return nil
 	}
-	_, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), "commit")
+	_, err := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), "commit")
 	h.mu.Unlock()
 	return err
 }
diff --git a/statistics/handle/handle_test.go b/statistics/handle/handle_test.go
index eefcc08dee748..32b491ce7ef35 100644
--- a/statistics/handle/handle_test.go
+++ b/statistics/handle/handle_test.go
@@ -265,7 +265,7 @@ func (s *testStatsSuite) TestVersion(c *C) {
 	tbl1, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t1"))
 	c.Assert(err, IsNil)
 	tableInfo1 := tbl1.Meta()
-	h := handle.NewHandle(testKit.Se, time.Millisecond)
+	h := handle.NewHandle(testKit.Se, time.Millisecond, do.SysSessionPool())
 	unit := oracle.ComposeTS(1, 0)
 	testKit.MustExec("update mysql.stats_meta set version = ? where table_id = ?", 2*unit, tableInfo1.ID)
 
@@ -499,7 +499,7 @@ func (s *testStatsSuite) TestCorrelation(c *C) {
 	result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort()
 	c.Assert(len(result.Rows()), Equals, 2)
 	c.Assert(result.Rows()[0][9], Equals, "0")
-	c.Assert(result.Rows()[1][9], Equals, "0.828571")
+	c.Assert(result.Rows()[1][9], Equals, "0.8285714285714286")
 
 	testKit.MustExec("truncate table t")
 	result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort()
@@ -515,7 +515,7 @@ func (s *testStatsSuite) TestCorrelation(c *C) {
 	result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort()
 	c.Assert(len(result.Rows()), Equals, 2)
 	c.Assert(result.Rows()[0][9], Equals, "0")
-	c.Assert(result.Rows()[1][9], Equals, "-0.942857")
+	c.Assert(result.Rows()[1][9], Equals, "-0.9428571428571428")
 
 	testKit.MustExec("truncate table t")
 	testKit.MustExec("insert into t values (1,1),(2,1),(3,1),(4,1),(5,1),(6,1),(7,1),(8,1),(9,1),(10,1),(11,1),(12,1),(13,1),(14,1),(15,1),(16,1),(17,1),(18,1),(19,1),(20,2),(21,2),(22,2),(23,2),(24,2),(25,2)")
@@ -532,14 +532,14 @@ func (s *testStatsSuite) TestCorrelation(c *C) {
 	result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort()
 	c.Assert(len(result.Rows()), Equals, 2)
 	c.Assert(result.Rows()[0][9], Equals, "1")
-	c.Assert(result.Rows()[1][9], Equals, "0.828571")
+	c.Assert(result.Rows()[1][9], Equals, "0.8285714285714286")
 
 	testKit.MustExec("truncate table t")
 	testKit.MustExec("insert into t values(1,1),(2,7),(3,12),(8,18),(4,20),(5,21)")
 	testKit.MustExec("analyze table t")
 	result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort()
 	c.Assert(len(result.Rows()), Equals, 2)
-	c.Assert(result.Rows()[0][9], Equals, "0.828571")
+	c.Assert(result.Rows()[0][9], Equals, "0.8285714285714286")
 	c.Assert(result.Rows()[1][9], Equals, "1")
 
 	testKit.MustExec("drop table t")
diff --git a/statistics/handle/update.go b/statistics/handle/update.go
index a17357cef2c5b..0ee940b559f05 100644
--- a/statistics/handle/update.go
+++ b/statistics/handle/update.go
@@ -331,7 +331,7 @@ func (h *Handle) dumpTableStatCountToKV(id int64, delta variable.TableDelta) (up
 	defer h.mu.Unlock()
 	ctx := context.TODO()
 	exec := h.mu.ctx.(sqlexec.SQLExecutor)
-	_, err = exec.Execute(ctx, "begin")
+	_, err = exec.ExecuteInternal(ctx, "begin")
 	if err != nil {
 		return false, errors.Trace(err)
 	}
@@ -344,13 +344,14 @@ func (h *Handle) dumpTableStatCountToKV(id int64, delta variable.TableDelta) (up
 		return false, errors.Trace(err)
 	}
 	startTS := txn.StartTS()
-	var sql string
 	if delta.Delta < 0 {
-		sql = fmt.Sprintf("update mysql.stats_meta set version = %d, count = count - %d, modify_count = modify_count + %d where table_id = %d and count >= %d", startTS, -delta.Delta, delta.Count, id, -delta.Delta)
+		_, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %?, count = count - %?, modify_count = modify_count + %? where table_id = %? and count >= %?", startTS, -delta.Delta, delta.Count, id, -delta.Delta)
 	} else {
-		sql = fmt.Sprintf("update mysql.stats_meta set version = %d, count = count + %d, modify_count = modify_count + %d where table_id = %d", startTS, delta.Delta, delta.Count, id)
+		_, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %?, count = count + %?, modify_count = modify_count + %? where table_id = %?", startTS, delta.Delta, delta.Count, id)
+	}
+	if err != nil {
+		return false, errors.Trace(err)
 	}
-	err = execSQLs(context.Background(), exec, []string{sql})
 	updated = h.mu.ctx.GetSessionVars().StmtCtx.AffectedRows() > 0
 	return
 }
@@ -371,7 +372,7 @@ func (h *Handle) dumpTableStatColSizeToKV(id int64, delta variable.TableDelta) e
 	}
 	sql := fmt.Sprintf("insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, tot_col_size) "+
 		"values %s on duplicate key update tot_col_size = tot_col_size + values(tot_col_size)", strings.Join(values, ","))
-	_, _, err := h.restrictedExec.ExecRestrictedSQL(sql)
+	_, _, err := h.execRestrictedSQL(context.Background(), sql)
 	return errors.Trace(err)
 }
 
@@ -409,10 +410,9 @@ func (h *Handle) DumpFeedbackToKV(fb *statistics.QueryFeedback) error {
 	if fb.Tp == statistics.IndexType {
 		isIndex = 1
 	}
-	sql := fmt.Sprintf("insert into mysql.stats_feedback (table_id, hist_id, is_index, feedback) values "+
-		"(%d, %d, %d, X'%X')", fb.PhysicalID, fb.Hist.ID, isIndex, vals)
+	const sql = "insert into mysql.stats_feedback (table_id, hist_id, is_index, feedback) values (%?, %?, %?, %?)"
 	h.mu.Lock()
-	_, err = h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
+	_, err = h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), sql, fb.PhysicalID, fb.Hist.ID, isIndex, vals)
 	h.mu.Unlock()
 	if err != nil {
 		metrics.DumpFeedbackCounter.WithLabelValues(metrics.LblError).Inc()
@@ -503,8 +503,8 @@ func (h *Handle) UpdateErrorRate(is infoschema.InfoSchema) {
 
 // HandleUpdateStats update the stats using feedback.
 func (h *Handle) HandleUpdateStats(is infoschema.InfoSchema) error {
-	sql := "SELECT distinct table_id from mysql.stats_feedback"
-	tables, _, err := h.restrictedExec.ExecRestrictedSQL(sql)
+	ctx := context.Background()
+	tables, _, err := h.execRestrictedSQL(ctx, "SELECT distinct table_id from mysql.stats_feedback")
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -516,20 +516,18 @@ func (h *Handle) HandleUpdateStats(is infoschema.InfoSchema) error {
 		// this func lets `defer` works normally, where `Close()` should be called before any return
 		err = func() error {
 			tbl := ptbl.GetInt64(0)
-			sql = fmt.Sprintf("select table_id, hist_id, is_index, feedback from mysql.stats_feedback where table_id=%d order by hist_id, is_index", tbl)
-			rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
-			if len(rc) > 0 {
-				defer terror.Call(rc[0].Close)
-			}
+			const sql = "select table_id, hist_id, is_index, feedback from mysql.stats_feedback where table_id=%? order by hist_id, is_index"
+			rc, err := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), sql, tbl)
 			if err != nil {
 				return errors.Trace(err)
 			}
+			defer terror.Call(rc.Close)
 			tableID, histID, isIndex := int64(-1), int64(-1), int64(-1)
 			var rows []chunk.Row
 			for {
-				req := rc[0].NewChunk()
+				req := rc.NewChunk()
 				iter := chunk.NewIterator4Chunk(req)
-				err := rc[0].Next(context.TODO(), req)
+				err := rc.Next(context.TODO(), req)
 				if err != nil {
 					return errors.Trace(err)
 				}
@@ -622,8 +620,8 @@ func (h *Handle) deleteOutdatedFeedback(tableID, histID, isIndex int64) error {
 	defer h.mu.Unlock()
 	hasData := true
 	for hasData {
-		sql := fmt.Sprintf("delete from mysql.stats_feedback where table_id = %d and hist_id = %d and is_index = %d limit 10000", tableID, histID, isIndex)
-		_, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
+		sql := "delete from mysql.stats_feedback where table_id = %? and hist_id = %? and is_index = %? limit 10000"
+		_, err := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), sql, tableID, histID, isIndex)
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -694,9 +692,9 @@ func NeedAnalyzeTable(tbl *statistics.Table, limit time.Duration, autoAnalyzeRat
 }
 
 func (h *Handle) getAutoAnalyzeParameters() map[string]string {
-	sql := fmt.Sprintf("select variable_name, variable_value from mysql.global_variables where variable_name in ('%s', '%s', '%s')",
-		variable.TiDBAutoAnalyzeRatio, variable.TiDBAutoAnalyzeStartTime, variable.TiDBAutoAnalyzeEndTime)
-	rows, _, err := h.restrictedExec.ExecRestrictedSQL(sql)
+	ctx := context.Background()
+	sql := "select variable_name, variable_value from mysql.global_variables where variable_name in (%?, %?, %?)"
+	rows, _, err := h.execRestrictedSQL(ctx, sql, variable.TiDBAutoAnalyzeRatio, variable.TiDBAutoAnalyzeStartTime, variable.TiDBAutoAnalyzeEndTime)
 	if err != nil {
 		return map[string]string{}
 	}
@@ -745,11 +743,10 @@ func (h *Handle) HandleAutoAnalyze(is infoschema.InfoSchema) (analyzed bool) {
 		for _, tbl := range tbls {
 			tblInfo := tbl.Meta()
 			pi := tblInfo.GetPartitionInfo()
-			tblName := "`" + db + "`.`" + tblInfo.Name.O + "`"
 			if pi == nil {
 				statsTbl := h.GetTableStats(tblInfo)
-				sql := fmt.Sprintf("analyze table %s", tblName)
-				analyzed := h.autoAnalyzeTable(tblInfo, statsTbl, start, end, autoAnalyzeRatio, sql)
+				sql := "analyze table %n.%n"
+				analyzed := h.autoAnalyzeTable(tblInfo, statsTbl, start, end, autoAnalyzeRatio, sql, db, tblInfo.Name.O)
 				if analyzed {
 					// analyze one table at a time to let it get the freshest parameters.
 					// others will be analyzed next round which is just 3s later.
@@ -758,9 +755,9 @@ func (h *Handle) HandleAutoAnalyze(is infoschema.InfoSchema) (analyzed bool) {
 				continue
 			}
 			for _, def := range pi.Definitions {
-				sql := fmt.Sprintf("analyze table %s partition `%s`", tblName, def.Name.O)
+				sql := "analyze table %n.%n partition %n"
 				statsTbl := h.GetPartitionStats(tblInfo, def.ID)
-				analyzed := h.autoAnalyzeTable(tblInfo, statsTbl, start, end, autoAnalyzeRatio, sql)
+				analyzed := h.autoAnalyzeTable(tblInfo, statsTbl, start, end, autoAnalyzeRatio, sql, db, tblInfo.Name.O, def.Name.O)
 				if analyzed {
 					return true
 				}
@@ -771,29 +768,28 @@ func (h *Handle) HandleAutoAnalyze(is infoschema.InfoSchema) (analyzed bool) {
 	return false
 }
 
-func (h *Handle) autoAnalyzeTable(tblInfo *model.TableInfo, statsTbl *statistics.Table, start, end time.Time, ratio float64, sql string) bool {
+func (h *Handle) autoAnalyzeTable(tblInfo *model.TableInfo, statsTbl *statistics.Table, start, end time.Time, ratio float64, sql string, params ...interface{}) bool {
 	if statsTbl.Pseudo || statsTbl.Count < AutoAnalyzeMinCnt {
 		return false
 	}
 	if needAnalyze, reason := NeedAnalyzeTable(statsTbl, 20*h.Lease(), ratio, start, end, time.Now()); needAnalyze {
 		logutil.BgLogger().Info("[stats] auto analyze triggered", zap.String("sql", sql), zap.String("reason", reason))
-		h.execAutoAnalyze(sql)
+		h.execAutoAnalyze(sql, params...)
 		return true
 	}
 	for _, idx := range tblInfo.Indices {
 		if _, ok := statsTbl.Indices[idx.ID]; !ok && idx.State == model.StatePublic {
-			sql = fmt.Sprintf("%s index `%s`", sql, idx.Name.O)
 			logutil.BgLogger().Info("[stats] auto analyze for unanalyzed", zap.String("sql", sql))
-			h.execAutoAnalyze(sql)
+			h.execAutoAnalyze(sql+" index %n", append(params, idx.Name.O)...)
 			return true
 		}
 	}
 	return false
 }
 
-func (h *Handle) execAutoAnalyze(sql string) {
+func (h *Handle) execAutoAnalyze(sql string, params ...interface{}) {
 	startTime := time.Now()
-	_, _, err := h.restrictedExec.ExecRestrictedSQL(sql)
+	_, _, err := h.execRestrictedSQL(context.Background(), sql, params...)
 	dur := time.Since(startTime)
 	metrics.AutoAnalyzeHistogram.Observe(dur.Seconds())
 	if err != nil {
diff --git a/types/datum.go b/types/datum.go
index 08c989d366a74..7a72024488006 100644
--- a/types/datum.go
+++ b/types/datum.go
@@ -194,7 +194,10 @@ var sink = func(s string) {
 
 // GetBytes gets bytes value.
 func (d *Datum) GetBytes() []byte {
-	return d.b
+	if d.b != nil {
+		return d.b
+	}
+	return []byte{}
 }
 
 // SetBytes sets bytes value to datum.
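
For readers unfamiliar with the `%?` placeholders used throughout the patch, the sketch below illustrates the general shape of the change: SQL statements stop being assembled with `fmt.Sprintf` and are instead passed as constant templates with the values supplied as separate arguments. This is a hedged, illustrative example only; `fakeExecutor` is a hypothetical stand-in invented here and does not reflect the real `sqlexec` interfaces or their full signatures.

// Sketch only: contrasts fmt.Sprintf-spliced SQL (removed by this diff) with a
// parameterized call shape similar to ExecuteInternal. fakeExecutor is a
// hypothetical stand-in, not TiDB's sqlexec.SQLExecutor.
package main

import (
	"context"
	"fmt"
)

type fakeExecutor struct{}

// ExecuteInternal mimics the new call shape: the statement text stays constant
// and the values travel separately as arguments bound to %? placeholders.
func (fakeExecutor) ExecuteInternal(ctx context.Context, sql string, args ...interface{}) error {
	fmt.Printf("exec: %q args=%v\n", sql, args)
	return nil
}

func main() {
	exec := fakeExecutor{}
	ctx := context.Background()
	startTS, tableID := uint64(425), int64(42)

	// Old style: values formatted directly into the SQL string.
	old := fmt.Sprintf("update mysql.stats_meta set version = %d where table_id = %d", startTS, tableID)
	fmt.Println("old:", old)

	// New style: constant template, values passed as parameters.
	_ = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %? where table_id = %?", startTS, tableID)
}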