Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: refactor the statistics package use the RestrictedSQLExecutor API (#22636) (#22961) #23225

Merged
merged 4 commits into from
Mar 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1015,7 +1015,8 @@ func (do *Domain) StatsHandle() *handle.Handle {

// CreateStatsHandle is used only for test.
func (do *Domain) CreateStatsHandle(ctx sessionctx.Context) {
atomic.StorePointer(&do.statsHandle, unsafe.Pointer(handle.NewHandle(ctx, do.statsLease)))
h := handle.NewHandle(ctx, do.statsLease, do.sysSessionPool)
atomic.StorePointer(&do.statsHandle, unsafe.Pointer(h))
}

// StatsUpdating checks if the stats worker is updating.
Expand All @@ -1040,7 +1041,7 @@ var RunAutoAnalyze = true
// It should be called only once in BootstrapSession.
func (do *Domain) UpdateTableStatsLoop(ctx sessionctx.Context) error {
ctx.GetSessionVars().InRestrictedSQL = true
statsHandle := handle.NewHandle(ctx, do.statsLease)
statsHandle := handle.NewHandle(ctx, do.statsLease, do.sysSessionPool)
atomic.StorePointer(&do.statsHandle, unsafe.Pointer(statsHandle))
do.ddl.RegisterEventCh(statsHandle.DDLEventCh())
// Negative stats lease indicates that it is in test, it does not need update.
Expand Down
5 changes: 1 addition & 4 deletions server/statistics_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/gcutil"
"github.com/pingcap/tidb/util/sqlexec"
)

// StatsHandler is the handler for dumping statistics.
Expand Down Expand Up @@ -122,9 +121,7 @@ func (sh StatsHistoryHandler) ServeHTTP(w http.ResponseWriter, req *http.Request
writeError(w, err)
return
}
se.GetSessionVars().SnapshotInfoschema, se.GetSessionVars().SnapshotTS = is, snapshot
historyStatsExec := se.(sqlexec.RestrictedSQLExecutor)
js, err := h.DumpStatsToJSON(params[pDBName], tbl.Meta(), historyStatsExec)
js, err := h.DumpStatsToJSONBySnapshot(params[pDBName], tbl.Meta(), snapshot)
if err != nil {
writeError(w, err)
} else {
Expand Down
44 changes: 18 additions & 26 deletions statistics/handle/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,16 @@ func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, cache *statsCache

func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (statsCache, error) {
sql := "select HIGH_PRIORITY version, table_id, modify_count, count from mysql.stats_meta"
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
if len(rc) > 0 {
defer terror.Call(rc[0].Close)
}
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), sql)
if err != nil {
return statsCache{}, errors.Trace(err)
}
defer terror.Call(rc.Close)
tables := statsCache{tables: make(map[int64]*statistics.Table)}
req := rc[0].NewChunk()
req := rc.NewChunk()
iter := chunk.NewIterator4Chunk(req)
for {
err := rc[0].Next(context.TODO(), req)
err := rc.Next(context.TODO(), req)
if err != nil {
return statsCache{}, errors.Trace(err)
}
Expand Down Expand Up @@ -147,17 +145,15 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache *stat

func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache *statsCache) error {
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms"
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
if len(rc) > 0 {
defer terror.Call(rc[0].Close)
}
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), sql)
if err != nil {
return errors.Trace(err)
}
req := rc[0].NewChunk()
defer terror.Call(rc.Close)
req := rc.NewChunk()
iter := chunk.NewIterator4Chunk(req)
for {
err := rc[0].Next(context.TODO(), req)
err := rc.Next(context.TODO(), req)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -187,17 +183,15 @@ func (h *Handle) initStatsTopN4Chunk(cache *statsCache, iter *chunk.Iterator4Chu

func (h *Handle) initStatsTopN(cache *statsCache) error {
sql := "select HIGH_PRIORITY table_id, hist_id, value, count from mysql.stats_top_n where is_index = 1"
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
if len(rc) > 0 {
defer terror.Call(rc[0].Close)
}
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), sql)
if err != nil {
return errors.Trace(err)
}
req := rc[0].NewChunk()
defer terror.Call(rc.Close)
req := rc.NewChunk()
iter := chunk.NewIterator4Chunk(req)
for {
err := rc[0].Next(context.TODO(), req)
err := rc.Next(context.TODO(), req)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -257,17 +251,15 @@ func initStatsBuckets4Chunk(ctx sessionctx.Context, cache *statsCache, iter *chu

func (h *Handle) initStatsBuckets(cache *statsCache) error {
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound from mysql.stats_buckets order by table_id, is_index, hist_id, bucket_id"
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
if len(rc) > 0 {
defer terror.Call(rc[0].Close)
}
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), sql)
if err != nil {
return errors.Trace(err)
}
req := rc[0].NewChunk()
defer terror.Call(rc.Close)
req := rc.NewChunk()
iter := chunk.NewIterator4Chunk(req)
for {
err := rc[0].Next(context.TODO(), req)
err := rc.Next(context.TODO(), req)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -300,13 +292,13 @@ func (h *Handle) initStatsBuckets(cache *statsCache) error {
func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) {
h.mu.Lock()
defer func() {
_, err1 := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), "commit")
_, err1 := h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), "commit")
if err == nil && err1 != nil {
err = err1
}
h.mu.Unlock()
}()
_, err = h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), "begin")
_, err = h.mu.ctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), "begin")
if err != nil {
return err
}
Expand Down
70 changes: 34 additions & 36 deletions statistics/handle/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package handle

import (
"context"
"fmt"

"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -75,28 +74,34 @@ func (h *Handle) DDLEventCh() chan *util.Event {
func (h *Handle) insertTableStats2KV(info *model.TableInfo, physicalID int64) (err error) {
h.mu.Lock()
defer h.mu.Unlock()
ctx := context.Background()
exec := h.mu.ctx.(sqlexec.SQLExecutor)
_, err = exec.Execute(context.Background(), "begin")
_, err = exec.ExecuteInternal(ctx, "begin")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = finishTransaction(context.Background(), exec, err)
err = finishTransaction(ctx, exec, err)
}()
txn, err := h.mu.ctx.Txn(true)
if err != nil {
return errors.Trace(err)
}
startTS := txn.StartTS()
sqls := make([]string, 0, 1+len(info.Columns)+len(info.Indices))
sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_meta (version, table_id) values(%d, %d)", startTS, physicalID))
if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_meta (version, table_id) values(%?, %?)", startTS, physicalID); err != nil {
return err
}
for _, col := range info.Columns {
sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%d, 0, %d, 0, %d)", physicalID, col.ID, startTS))
if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%?, 0, %?, 0, %?)", physicalID, col.ID, startTS); err != nil {
return err
}
}
for _, idx := range info.Indices {
sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%d, 1, %d, 0, %d)", physicalID, idx.ID, startTS))
if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%?, 1, %?, 0, %?)", physicalID, idx.ID, startTS); err != nil {
return err
}
}
return execSQLs(context.Background(), exec, sqls)
return nil
}

// insertColStats2KV insert a record to stats_histograms with distinct_count 1 and insert a bucket to stats_buckets with default value.
Expand All @@ -105,38 +110,36 @@ func (h *Handle) insertColStats2KV(physicalID int64, colInfo *model.ColumnInfo)
h.mu.Lock()
defer h.mu.Unlock()

ctx := context.TODO()
exec := h.mu.ctx.(sqlexec.SQLExecutor)
_, err = exec.Execute(context.Background(), "begin")
_, err = exec.ExecuteInternal(ctx, "begin")
if err != nil {
return errors.Trace(err)
}
defer func() {
err = finishTransaction(context.Background(), exec, err)
err = finishTransaction(ctx, exec, err)
}()
txn, err := h.mu.ctx.Txn(true)
if err != nil {
return errors.Trace(err)
}
startTS := txn.StartTS()
// First of all, we update the version.
_, err = exec.Execute(context.Background(), fmt.Sprintf("update mysql.stats_meta set version = %d where table_id = %d ", startTS, physicalID))
_, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %? where table_id = %?", startTS, physicalID)
if err != nil {
return
}
ctx := context.TODO()
// If we didn't update anything by last SQL, it means the stats of this table does not exist.
if h.mu.ctx.GetSessionVars().StmtCtx.AffectedRows() > 0 {
// By this step we can get the count of this table, then we can sure the count and repeats of bucket.
var rs []sqlexec.RecordSet
rs, err = exec.Execute(ctx, fmt.Sprintf("select count from mysql.stats_meta where table_id = %d", physicalID))
if len(rs) > 0 {
defer terror.Call(rs[0].Close)
}
var rs sqlexec.RecordSet
rs, err = exec.ExecuteInternal(ctx, "select count from mysql.stats_meta where table_id = %?", physicalID)
if err != nil {
return
}
req := rs[0].NewChunk()
err = rs[0].Next(ctx, req)
defer terror.Call(rs.Close)
req := rs.NewChunk()
err = rs.Next(ctx, req)
if err != nil {
return
}
Expand All @@ -146,42 +149,37 @@ func (h *Handle) insertColStats2KV(physicalID int64, colInfo *model.ColumnInfo)
if err != nil {
return
}
sqls := make([]string, 0, 1)
if value.IsNull() {
// If the adding column has default value null, all the existing rows have null value on the newly added column.
sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, null_count) values (%d, %d, 0, %d, 0, %d)", startTS, physicalID, colInfo.ID, count))
if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, null_count) values (%?, %?, 0, %?, 0, %?)", startTS, physicalID, colInfo.ID, count); err != nil {
return err
}
} else {
// If this stats exists, we insert histogram meta first, the distinct_count will always be one.
sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, tot_col_size) values (%d, %d, 0, %d, 1, %d)", startTS, physicalID, colInfo.ID, int64(len(value.GetBytes()))*count))
if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, tot_col_size) values (%?, %?, 0, %?, 1, %?)", startTS, physicalID, colInfo.ID, int64(len(value.GetBytes()))*count); err != nil {
return err
}

value, err = value.ConvertTo(h.mu.ctx.GetSessionVars().StmtCtx, types.NewFieldType(mysql.TypeBlob))
if err != nil {
return
}
// There must be only one bucket for this new column and the value is the default value.
sqls = append(sqls, fmt.Sprintf("insert into mysql.stats_buckets (table_id, is_index, hist_id, bucket_id, repeats, count, lower_bound, upper_bound) values (%d, 0, %d, 0, %d, %d, X'%X', X'%X')", physicalID, colInfo.ID, count, count, value.GetBytes(), value.GetBytes()))
if _, err := exec.ExecuteInternal(ctx, "insert into mysql.stats_buckets (table_id, is_index, hist_id, bucket_id, repeats, count, lower_bound, upper_bound) values (%?, 0, %?, 0, %?, %?, %?, %?)", physicalID, colInfo.ID, count, count, value.GetBytes(), value.GetBytes()); err != nil {
return err
}
}
return execSQLs(context.Background(), exec, sqls)
}
return
}

// finishTransaction will execute `commit` when error is nil, otherwise `rollback`.
func finishTransaction(ctx context.Context, exec sqlexec.SQLExecutor, err error) error {
if err == nil {
_, err = exec.Execute(ctx, "commit")
_, err = exec.ExecuteInternal(ctx, "commit")
} else {
_, err1 := exec.Execute(ctx, "rollback")
_, err1 := exec.ExecuteInternal(ctx, "rollback")
terror.Log(errors.Trace(err1))
}
return errors.Trace(err)
}

func execSQLs(ctx context.Context, exec sqlexec.SQLExecutor, sqls []string) error {
for _, sql := range sqls {
_, err := exec.Execute(ctx, sql)
if err != nil {
return err
}
}
return nil
}
21 changes: 16 additions & 5 deletions statistics/handle/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -63,17 +64,27 @@ func dumpJSONCol(hist *statistics.Histogram, CMSketch *statistics.CMSketch) *jso

// DumpStatsToJSON dumps statistic to json.
func (h *Handle) DumpStatsToJSON(dbName string, tableInfo *model.TableInfo, historyStatsExec sqlexec.RestrictedSQLExecutor) (*JSONTable, error) {
var snapshot uint64
if historyStatsExec != nil {
sctx := historyStatsExec.(sessionctx.Context)
snapshot = sctx.GetSessionVars().SnapshotTS
}
return h.DumpStatsToJSONBySnapshot(dbName, tableInfo, snapshot)
}

// DumpStatsToJSONBySnapshot dumps statistic to json.
func (h *Handle) DumpStatsToJSONBySnapshot(dbName string, tableInfo *model.TableInfo, snapshot uint64) (*JSONTable, error) {
pi := tableInfo.GetPartitionInfo()
if pi == nil {
return h.tableStatsToJSON(dbName, tableInfo, tableInfo.ID, historyStatsExec)
return h.tableStatsToJSON(dbName, tableInfo, tableInfo.ID, snapshot)
}
jsonTbl := &JSONTable{
DatabaseName: dbName,
TableName: tableInfo.Name.L,
Partitions: make(map[string]*JSONTable, len(pi.Definitions)),
}
for _, def := range pi.Definitions {
tbl, err := h.tableStatsToJSON(dbName, tableInfo, def.ID, historyStatsExec)
tbl, err := h.tableStatsToJSON(dbName, tableInfo, def.ID, snapshot)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -85,12 +96,12 @@ func (h *Handle) DumpStatsToJSON(dbName string, tableInfo *model.TableInfo, hist
return jsonTbl, nil
}

func (h *Handle) tableStatsToJSON(dbName string, tableInfo *model.TableInfo, physicalID int64, historyStatsExec sqlexec.RestrictedSQLExecutor) (*JSONTable, error) {
tbl, err := h.tableStatsFromStorage(tableInfo, physicalID, true, historyStatsExec)
func (h *Handle) tableStatsToJSON(dbName string, tableInfo *model.TableInfo, physicalID int64, snapshot uint64) (*JSONTable, error) {
tbl, err := h.tableStatsFromStorage(tableInfo, physicalID, true, snapshot)
if err != nil || tbl == nil {
return nil, err
}
tbl.Version, tbl.ModifyCount, tbl.Count, err = h.statsMetaByTableIDFromStorage(physicalID, historyStatsExec)
tbl.Version, tbl.ModifyCount, tbl.Count, err = h.statsMetaByTableIDFromStorage(physicalID, snapshot)
if err != nil {
return nil, err
}
Expand Down
Loading