Skip to content

Commit

Permalink
statistics: refactor the statistics package use the RestrictedSQLExec…
Browse files Browse the repository at this point in the history
…utor API (#22636) (#22961) (#23225)
  • Loading branch information
ti-srebot authored Mar 16, 2021
1 parent 430a81c commit d4750fe
Show file tree
Hide file tree
Showing 10 changed files with 262 additions and 216 deletions.
5 changes: 3 additions & 2 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1015,7 +1015,8 @@ func (do *Domain) StatsHandle() *handle.Handle {

// CreateStatsHandle is used only for test.
func (do *Domain) CreateStatsHandle(ctx sessionctx.Context) {
atomic.StorePointer(&do.statsHandle, unsafe.Pointer(handle.NewHandle(ctx, do.statsLease)))
h := handle.NewHandle(ctx, do.statsLease, do.sysSessionPool)
atomic.StorePointer(&do.statsHandle, unsafe.Pointer(h))
}

// StatsUpdating checks if the stats worker is updating.
Expand All @@ -1040,7 +1041,7 @@ var RunAutoAnalyze = true
// It should be called only once in BootstrapSession.
func (do *Domain) UpdateTableStatsLoop(ctx sessionctx.Context) error {
ctx.GetSessionVars().InRestrictedSQL = true
statsHandle := handle.NewHandle(ctx, do.statsLease)
statsHandle := handle.NewHandle(ctx, do.statsLease, do.sysSessionPool)
atomic.StorePointer(&do.statsHandle, unsafe.Pointer(statsHandle))
do.ddl.RegisterEventCh(statsHandle.DDLEventCh())
// Negative stats lease indicates that it is in test, it does not need update.
Expand Down
5 changes: 1 addition & 4 deletions server/statistics_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/gcutil"
"github.com/pingcap/tidb/util/sqlexec"
)

// StatsHandler is the handler for dumping statistics.
Expand Down Expand Up @@ -122,9 +121,7 @@ func (sh StatsHistoryHandler) ServeHTTP(w http.ResponseWriter, req *http.Request
writeError(w, err)
return
}
se.GetSessionVars().SnapshotInfoschema, se.GetSessionVars().SnapshotTS = is, snapshot
historyStatsExec := se.(sqlexec.RestrictedSQLExecutor)
js, err := h.DumpStatsToJSON(params[pDBName], tbl.Meta(), historyStatsExec)
js, err := h.DumpStatsToJSONBySnapshot(params[pDBName], tbl.Meta(), snapshot)
if err != nil {
writeError(w, err)
} else {
Expand Down
44 changes: 18 additions & 26 deletions statistics/handle/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,16 @@ func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, cache *statsCache

func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (statsCache, error) {
sql := "select HIGH_PRIORITY version, table_id, modify_count, count from mysql.stats_meta"
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
if len(rc) > 0 {
defer terror.Call(rc[0].Close)
}
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), sql)
if err != nil {
return statsCache{}, errors.Trace(err)
}
defer terror.Call(rc.Close)
tables := statsCache{tables: make(map[int64]*statistics.Table)}
req := rc[0].NewChunk()
req := rc.NewChunk()
iter := chunk.NewIterator4Chunk(req)
for {
err := rc[0].Next(context.TODO(), req)
err := rc.Next(context.TODO(), req)
if err != nil {
return statsCache{}, errors.Trace(err)
}
Expand Down Expand Up @@ -147,17 +145,15 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache *stat

func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache *statsCache) error {
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms"
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
if len(rc) > 0 {
defer terror.Call(rc[0].Close)
}
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), sql)
if err != nil {
return errors.Trace(err)
}
req := rc[0].NewChunk()
defer terror.Call(rc.Close)
req := rc.NewChunk()
iter := chunk.NewIterator4Chunk(req)
for {
err := rc[0].Next(context.TODO(), req)
err := rc.Next(context.TODO(), req)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -187,17 +183,15 @@ func (h *Handle) initStatsTopN4Chunk(cache *statsCache, iter *chunk.Iterator4Chu

func (h *Handle) initStatsTopN(cache *statsCache) error {
sql := "select HIGH_PRIORITY table_id, hist_id, value, count from mysql.stats_top_n where is_index = 1"
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
if len(rc) > 0 {
defer terror.Call(rc[0].Close)
}
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), sql)
if err != nil {
return errors.Trace(err)
}
req := rc[0].NewChunk()
defer terror.Call(rc.Close)
req := rc.NewChunk()
iter := chunk.NewIterator4Chunk(req)
for {
err := rc[0].Next(context.TODO(), req)
err := rc.Next(context.TODO(), req)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -257,17 +251,15 @@ func initStatsBuckets4Chunk(ctx sessionctx.Context, cache *statsCache, iter *chu

func (h *Handle) initStatsBuckets(cache *statsCache) error {
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound from mysql.stats_buckets order by table_id, is_index, hist_id, bucket_id"
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
if len(rc) > 0 {
defer terror.Call(rc[0].Close)
}
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), sql)
if err != nil {
return errors.Trace(err)
}
req := rc[0].NewChunk()
defer terror.Call(rc.Close)
req := rc.NewChunk()
iter := chunk.NewIterator4Chunk(req)
for {
err := rc[0].Next(context.TODO(), req)
err := rc.Next(context.TODO(), req)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -300,13 +292,13 @@ func (h *Handle) initStatsBuckets(cache *statsCache) error {
func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) {
h.mu.Lock()
defer func() {
_, err1 := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), "commit")
_, err1 := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), "commit")
if err == nil && err1 != nil {
err = err1
}
h.mu.Unlock()
}()
_, err = h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), "begin")
_, err = h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), "begin")
if err != nil {
return err
}
Expand Down
70 changes: 34 additions & 36 deletions statistics/handle/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package handle

import (
"context"
"fmt"

"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -75,28 +74,34 @@ func (h *Handle) DDLEventCh() chan *util.Event {
func (h *Handle) insertTableStats2KV(info *model.TableInfo, physicalID int64) (err error) {
h.mu.Lock()
defer h.mu.Unlock()
ctx := context.Background()
exec := h.mu.ctx.(sqlexec.SQLExecutor)
_, err = exec.Execute(context.Background(), "begin")
_, err = exec.ExecuteInternal(ctx, "begin")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = finishTransaction(context.Background(), exec, err)
err = finishTransaction(ctx, exec, err)
}()
txn, err := h.mu.ctx.Txn(true)
if err != nil {
return errors.Trace(err)
}
startTS := txn.StartTS()
sqls := make([]string, 0, 1+len(info.Columns)+len(info.Indices))
sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_meta (version, table_id) values(%d, %d)", startTS, physicalID))
if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_meta (version, table_id) values(%?, %?)", startTS, physicalID); err != nil {
return err
}
for _, col := range info.Columns {
sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%d, 0, %d, 0, %d)", physicalID, col.ID, startTS))
if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%?, 0, %?, 0, %?)", physicalID, col.ID, startTS); err != nil {
return err
}
}
for _, idx := range info.Indices {
sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%d, 1, %d, 0, %d)", physicalID, idx.ID, startTS))
if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%?, 1, %?, 0, %?)", physicalID, idx.ID, startTS); err != nil {
return err
}
}
return execSQLs(context.Background(), exec, sqls)
return nil
}

// insertColStats2KV insert a record to stats_histograms with distinct_count 1 and insert a bucket to stats_buckets with default value.
Expand All @@ -105,38 +110,36 @@ func (h *Handle) insertColStats2KV(physicalID int64, colInfo *model.ColumnInfo)
h.mu.Lock()
defer h.mu.Unlock()

ctx := context.TODO()
exec := h.mu.ctx.(sqlexec.SQLExecutor)
_, err = exec.Execute(context.Background(), "begin")
_, err = exec.ExecuteInternal(ctx, "begin")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = finishTransaction(context.Background(), exec, err)
err = finishTransaction(ctx, exec, err)
}()
txn, err := h.mu.ctx.Txn(true)
if err != nil {
return errors.Trace(err)
}
startTS := txn.StartTS()
// First of all, we update the version.
_, err = exec.Execute(context.Background(), fmt.Sprintf("update mysql.stats_meta set version = %d where table_id = %d ", startTS, physicalID))
_, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %? where table_id = %?", startTS, physicalID)
if err != nil {
return
}
ctx := context.TODO()
// If we didn't update anything by last SQL, it means the stats of this table does not exist.
if h.mu.ctx.GetSessionVars().StmtCtx.AffectedRows() > 0 {
// By this step we can get the count of this table, then we can sure the count and repeats of bucket.
var rs []sqlexec.RecordSet
rs, err = exec.Execute(ctx, fmt.Sprintf("select count from mysql.stats_meta where table_id = %d", physicalID))
if len(rs) > 0 {
defer terror.Call(rs[0].Close)
}
var rs sqlexec.RecordSet
rs, err = exec.ExecuteInternal(ctx, "select count from mysql.stats_meta where table_id = %?", physicalID)
if err != nil {
return
}
req := rs[0].NewChunk()
err = rs[0].Next(ctx, req)
defer terror.Call(rs.Close)
req := rs.NewChunk()
err = rs.Next(ctx, req)
if err != nil {
return
}
Expand All @@ -146,42 +149,37 @@ func (h *Handle) insertColStats2KV(physicalID int64, colInfo *model.ColumnInfo)
if err != nil {
return
}
sqls := make([]string, 0, 1)
if value.IsNull() {
// If the adding column has default value null, all the existing rows have null value on the newly added column.
sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, null_count) values (%d, %d, 0, %d, 0, %d)", startTS, physicalID, colInfo.ID, count))
if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, null_count) values (%?, %?, 0, %?, 0, %?)", startTS, physicalID, colInfo.ID, count); err != nil {
return err
}
} else {
// If this stats exists, we insert histogram meta first, the distinct_count will always be one.
sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, tot_col_size) values (%d, %d, 0, %d, 1, %d)", startTS, physicalID, colInfo.ID, int64(len(value.GetBytes()))*count))
if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, tot_col_size) values (%?, %?, 0, %?, 1, %?)", startTS, physicalID, colInfo.ID, int64(len(value.GetBytes()))*count); err != nil {
return err
}

value, err = value.ConvertTo(h.mu.ctx.GetSessionVars().StmtCtx, types.NewFieldType(mysql.TypeBlob))
if err != nil {
return
}
// There must be only one bucket for this new column and the value is the default value.
sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_buckets (table_id, is_index, hist_id, bucket_id, repeats, count, lower_bound, upper_bound) values (%d, 0, %d, 0, %d, %d, X'%X', X'%X')", physicalID, colInfo.ID, count, count, value.GetBytes(), value.GetBytes()))
if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_buckets (table_id, is_index, hist_id, bucket_id, repeats, count, lower_bound, upper_bound) values (%?, 0, %?, 0, %?, %?, %?, %?)", physicalID, colInfo.ID, count, count, value.GetBytes(), value.GetBytes()); err != nil {
return err
}
}
return execSQLs(context.Background(), exec, sqls)
}
return
}

// finishTransaction will execute `commit` when error is nil, otherwise `rollback`.
func finishTransaction(ctx context.Context, exec sqlexec.SQLExecutor, err error) error {
if err == nil {
_, err = exec.Execute(ctx, "commit")
_, err = exec.ExecuteInternal(ctx, "commit")
} else {
_, err1 := exec.Execute(ctx, "rollback")
_, err1 := exec.ExecuteInternal(ctx, "rollback")
terror.Log(errors.Trace(err1))
}
return errors.Trace(err)
}

func execSQLs(ctx context.Context, exec sqlexec.SQLExecutor, sqls []string) error {
for _, sql := range sqls {
_, err := exec.Execute(ctx, sql)
if err != nil {
return err
}
}
return nil
}
21 changes: 16 additions & 5 deletions statistics/handle/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -63,17 +64,27 @@ func dumpJSONCol(hist *statistics.Histogram, CMSketch *statistics.CMSketch) *jso

// DumpStatsToJSON dumps statistic to json.
func (h *Handle) DumpStatsToJSON(dbName string, tableInfo *model.TableInfo, historyStatsExec sqlexec.RestrictedSQLExecutor) (*JSONTable, error) {
var snapshot uint64
if historyStatsExec != nil {
sctx := historyStatsExec.(sessionctx.Context)
snapshot = sctx.GetSessionVars().SnapshotTS
}
return h.DumpStatsToJSONBySnapshot(dbName, tableInfo, snapshot)
}

// DumpStatsToJSONBySnapshot dumps statistic to json.
func (h *Handle) DumpStatsToJSONBySnapshot(dbName string, tableInfo *model.TableInfo, snapshot uint64) (*JSONTable, error) {
pi := tableInfo.GetPartitionInfo()
if pi == nil {
return h.tableStatsToJSON(dbName, tableInfo, tableInfo.ID, historyStatsExec)
return h.tableStatsToJSON(dbName, tableInfo, tableInfo.ID, snapshot)
}
jsonTbl := &JSONTable{
DatabaseName: dbName,
TableName: tableInfo.Name.L,
Partitions: make(map[string]*JSONTable, len(pi.Definitions)),
}
for _, def := range pi.Definitions {
tbl, err := h.tableStatsToJSON(dbName, tableInfo, def.ID, historyStatsExec)
tbl, err := h.tableStatsToJSON(dbName, tableInfo, def.ID, snapshot)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -85,12 +96,12 @@ func (h *Handle) DumpStatsToJSON(dbName string, tableInfo *model.TableInfo, hist
return jsonTbl, nil
}

func (h *Handle) tableStatsToJSON(dbName string, tableInfo *model.TableInfo, physicalID int64, historyStatsExec sqlexec.RestrictedSQLExecutor) (*JSONTable, error) {
tbl, err := h.tableStatsFromStorage(tableInfo, physicalID, true, historyStatsExec)
func (h *Handle) tableStatsToJSON(dbName string, tableInfo *model.TableInfo, physicalID int64, snapshot uint64) (*JSONTable, error) {
tbl, err := h.tableStatsFromStorage(tableInfo, physicalID, true, snapshot)
if err != nil || tbl == nil {
return nil, err
}
tbl.Version, tbl.ModifyCount, tbl.Count, err = h.statsMetaByTableIDFromStorage(physicalID, historyStatsExec)
tbl.Version, tbl.ModifyCount, tbl.Count, err = h.statsMetaByTableIDFromStorage(physicalID, snapshot)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit d4750fe

Please sign in to comment.