Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: move statsReader out of statistics/handle #40790

Merged
merged 9 commits into from
Jan 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions statistics/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
"fmsketch.go",
"histogram.go",
"index.go",
"interact_with_storage.go",
"merge_worker.go",
"row_sampler.go",
"sample.go",
Expand Down
8 changes: 4 additions & 4 deletions statistics/handle/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func (h *Handle) tableHistoricalStatsToJSON(physicalID int64, snapshot uint64) (
}()

// get meta version
rows, _, err := reader.read("select distinct version from mysql.stats_meta_history where table_id = %? and version <= %? order by version desc limit 1", physicalID, snapshot)
rows, _, err := reader.Read("select distinct version from mysql.stats_meta_history where table_id = %? and version <= %? order by version desc limit 1", physicalID, snapshot)
if err != nil {
return nil, errors.AddStack(err)
}
Expand All @@ -272,14 +272,14 @@ func (h *Handle) tableHistoricalStatsToJSON(physicalID int64, snapshot uint64) (
}
statsMetaVersion := rows[0].GetInt64(0)
// get stats meta
rows, _, err = reader.read("select modify_count, count from mysql.stats_meta_history where table_id = %? and version = %?", physicalID, statsMetaVersion)
rows, _, err = reader.Read("select modify_count, count from mysql.stats_meta_history where table_id = %? and version = %?", physicalID, statsMetaVersion)
if err != nil {
return nil, errors.AddStack(err)
}
modifyCount, count := rows[0].GetInt64(0), rows[0].GetInt64(1)

// get stats version
rows, _, err = reader.read("select distinct version from mysql.stats_history where table_id = %? and version <= %? order by version desc limit 1", physicalID, snapshot)
rows, _, err = reader.Read("select distinct version from mysql.stats_history where table_id = %? and version <= %? order by version desc limit 1", physicalID, snapshot)
if err != nil {
return nil, errors.AddStack(err)
}
Expand All @@ -289,7 +289,7 @@ func (h *Handle) tableHistoricalStatsToJSON(physicalID int64, snapshot uint64) (
statsVersion := rows[0].GetInt64(0)

// get stats
rows, _, err = reader.read("select stats_data from mysql.stats_history where table_id = %? and version = %? order by seq_no", physicalID, statsVersion)
rows, _, err = reader.Read("select stats_data from mysql.stats_history where table_id = %? and version = %? order by seq_no", physicalID, statsVersion)
if err != nil {
return nil, errors.AddStack(err)
}
Expand Down
105 changes: 27 additions & 78 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -1067,7 +1067,7 @@ func (h *Handle) LoadNeededHistograms() (err error) {
return nil
}

func (h *Handle) loadNeededColumnHistograms(reader *statsReader, col model.TableItemID, loadFMSketch bool) (err error) {
func (h *Handle) loadNeededColumnHistograms(reader *statistics.StatsReader, col model.TableItemID, loadFMSketch bool) (err error) {
oldCache := h.statsCache.Load().(statsCache)
tbl, ok := oldCache.Get(col.TableID)
if !ok {
Expand All @@ -1093,7 +1093,7 @@ func (h *Handle) loadNeededColumnHistograms(reader *statsReader, col model.Table
return errors.Trace(err)
}
}
rows, _, err := reader.read("select stats_ver from mysql.stats_histograms where is_index = 0 and table_id = %? and hist_id = %?", col.TableID, col.ID)
rows, _, err := reader.Read("select stats_ver from mysql.stats_histograms where is_index = 0 and table_id = %? and hist_id = %?", col.TableID, col.ID)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -1134,7 +1134,7 @@ func (h *Handle) loadNeededColumnHistograms(reader *statsReader, col model.Table
return nil
}

func (h *Handle) loadNeededIndexHistograms(reader *statsReader, idx model.TableItemID, loadFMSketch bool) (err error) {
func (h *Handle) loadNeededIndexHistograms(reader *statistics.StatsReader, idx model.TableItemID, loadFMSketch bool) (err error) {
oldCache := h.statsCache.Load().(statsCache)
tbl, ok := oldCache.Get(idx.TableID)
if !ok {
Expand All @@ -1160,7 +1160,7 @@ func (h *Handle) loadNeededIndexHistograms(reader *statsReader, idx model.TableI
return errors.Trace(err)
}
}
rows, _, err := reader.read("select stats_ver from mysql.stats_histograms where is_index = 1 and table_id = %? and hist_id = %?", idx.TableID, idx.ID)
rows, _, err := reader.Read("select stats_ver from mysql.stats_histograms where is_index = 1 and table_id = %? and hist_id = %?", idx.TableID, idx.ID)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -1214,12 +1214,12 @@ func (h *Handle) FlushStats() {
}
}

func (h *Handle) cmSketchAndTopNFromStorage(reader *statsReader, tblID int64, isIndex, histID int64) (_ *statistics.CMSketch, _ *statistics.TopN, err error) {
topNRows, _, err := reader.read("select HIGH_PRIORITY value, count from mysql.stats_top_n where table_id = %? and is_index = %? and hist_id = %?", tblID, isIndex, histID)
func (h *Handle) cmSketchAndTopNFromStorage(reader *statistics.StatsReader, tblID int64, isIndex, histID int64) (_ *statistics.CMSketch, _ *statistics.TopN, err error) {
topNRows, _, err := reader.Read("select HIGH_PRIORITY value, count from mysql.stats_top_n where table_id = %? and is_index = %? and hist_id = %?", tblID, isIndex, histID)
if err != nil {
return nil, nil, err
}
rows, _, err := reader.read("select cm_sketch from mysql.stats_histograms where table_id = %? and is_index = %? and hist_id = %?", tblID, isIndex, histID)
rows, _, err := reader.Read("select cm_sketch from mysql.stats_histograms where table_id = %? and is_index = %? and hist_id = %?", tblID, isIndex, histID)
if err != nil {
return nil, nil, err
}
Expand All @@ -1229,15 +1229,15 @@ func (h *Handle) cmSketchAndTopNFromStorage(reader *statsReader, tblID int64, is
return statistics.DecodeCMSketchAndTopN(rows[0].GetBytes(0), topNRows)
}

func (h *Handle) fmSketchFromStorage(reader *statsReader, tblID int64, isIndex, histID int64) (_ *statistics.FMSketch, err error) {
rows, _, err := reader.read("select value from mysql.stats_fm_sketch where table_id = %? and is_index = %? and hist_id = %?", tblID, isIndex, histID)
func (h *Handle) fmSketchFromStorage(reader *statistics.StatsReader, tblID int64, isIndex, histID int64) (_ *statistics.FMSketch, err error) {
rows, _, err := reader.Read("select value from mysql.stats_fm_sketch where table_id = %? and is_index = %? and hist_id = %?", tblID, isIndex, histID)
if err != nil || len(rows) == 0 {
return nil, err
}
return statistics.DecodeFMSketch(rows[0].GetBytes(0))
}

func (h *Handle) indexStatsFromStorage(reader *statsReader, row chunk.Row, table *statistics.Table, tableInfo *model.TableInfo) error {
func (h *Handle) indexStatsFromStorage(reader *statistics.StatsReader, row chunk.Row, table *statistics.Table, tableInfo *model.TableInfo) error {
histID := row.GetInt64(2)
distinct := row.GetInt64(3)
histVer := row.GetUint64(4)
Expand All @@ -1247,7 +1247,7 @@ func (h *Handle) indexStatsFromStorage(reader *statsReader, row chunk.Row, table
errorRate := statistics.ErrorRate{}
flag := row.GetInt64(8)
lastAnalyzePos := row.GetDatum(10, types.NewFieldType(mysql.TypeBlob))
if statistics.IsAnalyzed(flag) && !reader.isHistory() {
if statistics.IsAnalyzed(flag) && !reader.IsHistory() {
h.mu.rateMap.clear(table.PhysicalID, histID, true)
} else if idx != nil {
errorRate = idx.ErrorRate
Expand Down Expand Up @@ -1295,7 +1295,7 @@ func (h *Handle) indexStatsFromStorage(reader *statsReader, row chunk.Row, table
return nil
}

func (h *Handle) columnStatsFromStorage(reader *statsReader, row chunk.Row, table *statistics.Table, tableInfo *model.TableInfo, loadAll bool) error {
func (h *Handle) columnStatsFromStorage(reader *statistics.StatsReader, row chunk.Row, table *statistics.Table, tableInfo *model.TableInfo, loadAll bool) error {
histID := row.GetInt64(2)
distinct := row.GetInt64(3)
histVer := row.GetUint64(4)
Expand All @@ -1307,7 +1307,7 @@ func (h *Handle) columnStatsFromStorage(reader *statsReader, row chunk.Row, tabl
col := table.Columns[histID]
errorRate := statistics.ErrorRate{}
flag := row.GetInt64(8)
if statistics.IsAnalyzed(flag) && !reader.isHistory() {
if statistics.IsAnalyzed(flag) && !reader.IsHistory() {
h.mu.rateMap.clear(table.PhysicalID, histID, false)
} else if col != nil {
errorRate = col.ErrorRate
Expand Down Expand Up @@ -1439,14 +1439,14 @@ func (h *Handle) TableStatsFromStorage(tableInfo *model.TableInfo, physicalID in
}
table.Pseudo = false

rows, _, err := reader.read("select modify_count, count from mysql.stats_meta where table_id = %?", physicalID)
rows, _, err := reader.Read("select modify_count, count from mysql.stats_meta where table_id = %?", physicalID)
if err != nil || len(rows) == 0 {
return nil, err
}
table.ModifyCount = rows[0].GetInt64(0)
table.Count = rows[0].GetInt64(1)

rows, _, err = reader.read("select table_id, is_index, hist_id, distinct_count, version, null_count, tot_col_size, stats_ver, flag, correlation, last_analyze_pos from mysql.stats_histograms where table_id = %?", physicalID)
rows, _, err = reader.Read("select table_id, is_index, hist_id, distinct_count, version, null_count, tot_col_size, stats_ver, flag, correlation, last_analyze_pos from mysql.stats_histograms where table_id = %?", physicalID)
// Check deleted table.
if err != nil || len(rows) == 0 {
return nil, nil
Expand All @@ -1464,7 +1464,7 @@ func (h *Handle) TableStatsFromStorage(tableInfo *model.TableInfo, physicalID in
return h.extendedStatsFromStorage(reader, table, physicalID, loadAll)
}

func (h *Handle) extendedStatsFromStorage(reader *statsReader, table *statistics.Table, physicalID int64, loadAll bool) (*statistics.Table, error) {
func (h *Handle) extendedStatsFromStorage(reader *statistics.StatsReader, table *statistics.Table, physicalID int64, loadAll bool) (*statistics.Table, error) {
failpoint.Inject("injectExtStatsLoadErr", func() {
failpoint.Return(nil, errors.New("gofail extendedStatsFromStorage error"))
})
Expand All @@ -1474,7 +1474,7 @@ func (h *Handle) extendedStatsFromStorage(reader *statsReader, table *statistics
} else {
table.ExtendedStats = statistics.NewExtendedStatsColl()
}
rows, _, err := reader.read("select name, status, type, column_ids, stats, version from mysql.stats_extended where table_id = %? and status in (%?, %?, %?) and version > %?", physicalID, StatsStatusInited, StatsStatusAnalyzed, StatsStatusDeleted, lastVersion)
rows, _, err := reader.Read("select name, status, type, column_ids, stats, version from mysql.stats_extended where table_id = %? and status in (%?, %?, %?) and version > %?", physicalID, StatsStatusInited, StatsStatusAnalyzed, StatsStatusDeleted, lastVersion)
if err != nil || len(rows) == 0 {
return table, nil
}
Expand Down Expand Up @@ -1525,7 +1525,7 @@ func (h *Handle) StatsMetaCountAndModifyCount(tableID int64) (int64, int64, erro
err = err1
}
}()
rows, _, err := reader.read("select count, modify_count from mysql.stats_meta where table_id = %?", tableID)
rows, _, err := reader.Read("select count, modify_count from mysql.stats_meta where table_id = %?", tableID)
if err != nil {
return 0, 0, err
}
Expand Down Expand Up @@ -1913,8 +1913,8 @@ func (h *Handle) SaveMetaToStorage(tableID, count, modifyCount int64, source str
return err
}

func (h *Handle) histogramFromStorage(reader *statsReader, tableID int64, colID int64, tp *types.FieldType, distinct int64, isIndex int, ver uint64, nullCount int64, totColSize int64, corr float64) (_ *statistics.Histogram, err error) {
rows, fields, err := reader.read("select count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets where table_id = %? and is_index = %? and hist_id = %? order by bucket_id", tableID, isIndex, colID)
func (h *Handle) histogramFromStorage(reader *statistics.StatsReader, tableID int64, colID int64, tp *types.FieldType, distinct int64, isIndex int, ver uint64, nullCount int64, totColSize int64, corr float64) (_ *statistics.Histogram, err error) {
rows, fields, err := reader.Read("select count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets where table_id = %? and is_index = %? and hist_id = %? order by bucket_id", tableID, isIndex, colID)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -1961,9 +1961,9 @@ func (h *Handle) histogramFromStorage(reader *statsReader, tableID int64, colID
return hg, nil
}

func (h *Handle) columnCountFromStorage(reader *statsReader, tableID, colID, statsVer int64) (int64, error) {
func (h *Handle) columnCountFromStorage(reader *statistics.StatsReader, tableID, colID, statsVer int64) (int64, error) {
count := int64(0)
rows, _, err := reader.read("select sum(count) from mysql.stats_buckets where table_id = %? and is_index = 0 and hist_id = %?", tableID, colID)
rows, _, err := reader.Read("select sum(count) from mysql.stats_buckets where table_id = %? and is_index = 0 and hist_id = %?", tableID, colID)
if err != nil {
return 0, errors.Trace(err)
}
Expand All @@ -1979,7 +1979,7 @@ func (h *Handle) columnCountFromStorage(reader *statsReader, tableID, colID, sta
// Before stats ver 2, histogram represents all data in this column.
// In stats ver 2, histogram + TopN represent all data in this column.
// So we need to add TopN total count here.
rows, _, err = reader.read("select sum(count) from mysql.stats_top_n where table_id = %? and is_index = 0 and hist_id = %?", tableID, colID)
rows, _, err = reader.Read("select sum(count) from mysql.stats_top_n where table_id = %? and is_index = 0 and hist_id = %?", tableID, colID)
if err != nil {
return 0, errors.Trace(err)
}
Expand Down Expand Up @@ -2014,26 +2014,7 @@ func (h *Handle) statsMetaByTableIDFromStorage(tableID int64, snapshot uint64) (
return
}

// statsReader is used for simplify code that needs to read system tables in different sqls
// but requires the same transactions.
type statsReader struct {
ctx sqlexec.RestrictedSQLExecutor
snapshot uint64
}

func (sr *statsReader) read(sql string, args ...interface{}) (rows []chunk.Row, fields []*ast.ResultField, err error) {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
if sr.snapshot > 0 {
return sr.ctx.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseSessionPool, sqlexec.ExecOptionWithSnapshot(sr.snapshot)}, sql, args...)
}
return sr.ctx.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, sql, args...)
}

func (sr *statsReader) isHistory() bool {
return sr.snapshot > 0
}

func (h *Handle) getGlobalStatsReader(snapshot uint64) (reader *statsReader, err error) {
func (h *Handle) getGlobalStatsReader(snapshot uint64) (reader *statistics.StatsReader, err error) {
h.mu.Lock()
defer func() {
if r := recover(); r != nil {
Expand All @@ -2043,44 +2024,12 @@ func (h *Handle) getGlobalStatsReader(snapshot uint64) (reader *statsReader, err
h.mu.Unlock()
}
}()
return h.getStatsReader(snapshot, h.mu.ctx.(sqlexec.RestrictedSQLExecutor))
return statistics.GetStatsReader(snapshot, h.mu.ctx.(sqlexec.RestrictedSQLExecutor))
}

func (h *Handle) releaseGlobalStatsReader(reader *statsReader) error {
func (h *Handle) releaseGlobalStatsReader(reader *statistics.StatsReader) error {
defer h.mu.Unlock()
return h.releaseStatsReader(reader, h.mu.ctx.(sqlexec.RestrictedSQLExecutor))
}

func (h *Handle) getStatsReader(snapshot uint64, exec sqlexec.RestrictedSQLExecutor) (reader *statsReader, err error) {
failpoint.Inject("mockGetStatsReaderFail", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(nil, errors.New("gofail genStatsReader error"))
}
})
if snapshot > 0 {
return &statsReader{ctx: exec, snapshot: snapshot}, nil
}
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("getStatsReader panic %v", r)
}
}()
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
failpoint.Inject("mockGetStatsReaderPanic", nil)
_, err = exec.(sqlexec.SQLExecutor).ExecuteInternal(ctx, "begin")
if err != nil {
return nil, err
}
return &statsReader{ctx: exec}, nil
}

func (h *Handle) releaseStatsReader(reader *statsReader, exec sqlexec.RestrictedSQLExecutor) error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function asks that reader.ctx and exec must be the same one, which is error-prone. So I changed the function to func ReleaseStatsReader(reader *StatsReader) error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After discussing with @qw4990, it is better to rename releaseStatsReader to (*StatsReader).Close, which is typical golang style.

if reader.snapshot > 0 {
return nil
}
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
_, err := exec.(sqlexec.SQLExecutor).ExecuteInternal(ctx, "commit")
return err
return reader.Close()
}

const (
Expand Down
12 changes: 6 additions & 6 deletions statistics/handle/handle_hist.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ var errExit = errors.New("Stop loading since domain is closed")

// StatsReaderContext exported for testing
type StatsReaderContext struct {
reader *statsReader
reader *statistics.StatsReader
createdTime time.Time
}

Expand All @@ -188,7 +188,7 @@ func (h *Handle) SubLoadWorker(ctx sessionctx.Context, exit chan struct{}, exitW
exitWg.Done()
logutil.BgLogger().Info("SubLoadWorker exited.")
if readerCtx.reader != nil {
err := h.releaseStatsReader(readerCtx.reader, ctx.(sqlexec.RestrictedSQLExecutor))
err := readerCtx.reader.Close()
if err != nil {
logutil.BgLogger().Error("Fail to release stats loader: ", zap.Error(err))
}
Expand Down Expand Up @@ -295,13 +295,13 @@ func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderC
func (h *Handle) loadFreshStatsReader(readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor) {
if readerCtx.reader == nil || readerCtx.createdTime.Add(h.Lease()).Before(time.Now()) {
if readerCtx.reader != nil {
err := h.releaseStatsReader(readerCtx.reader, ctx)
err := readerCtx.reader.Close()
if err != nil {
logutil.BgLogger().Warn("Fail to release stats loader: ", zap.Error(err))
}
}
for {
newReader, err := h.getStatsReader(0, ctx)
newReader, err := statistics.GetStatsReader(0, ctx)
if err != nil {
logutil.BgLogger().Error("Fail to new stats loader, retry after a while.", zap.Error(err))
time.Sleep(h.Lease() / 10)
Expand All @@ -317,7 +317,7 @@ func (h *Handle) loadFreshStatsReader(readerCtx *StatsReaderContext, ctx sqlexec
}

// readStatsForOneItem reads hist for one column/index, TODO load data via kv-get asynchronously
func (h *Handle) readStatsForOneItem(item model.TableItemID, w *statsWrapper, reader *statsReader) (*statsWrapper, error) {
func (h *Handle) readStatsForOneItem(item model.TableItemID, w *statsWrapper, reader *statistics.StatsReader) (*statsWrapper, error) {
failpoint.Inject("mockReadStatsForOnePanic", nil)
failpoint.Inject("mockReadStatsForOneFail", func(val failpoint.Value) {
if val.(bool) {
Expand Down Expand Up @@ -357,7 +357,7 @@ func (h *Handle) readStatsForOneItem(item model.TableItemID, w *statsWrapper, re
return nil, errors.Trace(err)
}
}
rows, _, err := reader.read("select stats_ver from mysql.stats_histograms where table_id = %? and hist_id = %? and is_index = %?", item.TableID, item.ID, int(isIndexFlag))
rows, _, err := reader.Read("select stats_ver from mysql.stats_histograms where table_id = %? and hist_id = %? and is_index = %?", item.TableID, item.ID, int(isIndexFlag))
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
8 changes: 4 additions & 4 deletions statistics/handle/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,16 +622,16 @@ func TestLoadStats(t *testing.T) {
require.True(t, idx.IsFullLoad())

// Following test tests whether the LoadNeededHistograms would panic.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/statistics/handle/mockGetStatsReaderFail", `return(true)`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/statistics/mockGetStatsReaderFail", `return(true)`))
err = h.LoadNeededHistograms()
require.Error(t, err)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/statistics/handle/mockGetStatsReaderFail"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/statistics/mockGetStatsReaderFail"))

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/statistics/handle/mockGetStatsReaderPanic", "panic"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/statistics/mockGetStatsReaderPanic", "panic"))
err = h.LoadNeededHistograms()
require.Error(t, err)
require.Regexp(t, ".*getStatsReader panic.*", err.Error())
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/statistics/handle/mockGetStatsReaderPanic"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/statistics/mockGetStatsReaderPanic"))
err = h.LoadNeededHistograms()
require.NoError(t, err)
}
Expand Down
Loading