Skip to content

Commit

Permalink
planner, stats: assign high priority for sync load related internal S…
Browse files Browse the repository at this point in the history
  • Loading branch information
winoros authored Jun 4, 2024
1 parent 9c1a45a commit a44d409
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 28 deletions.
3 changes: 0 additions & 3 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2297,9 +2297,6 @@ func quitStatsOwner(do *Domain, mgr owner.Manager) {
func (do *Domain) StartLoadStatsSubWorkers(ctxList []sessionctx.Context) {
statsHandle := do.StatsHandle()
for _, ctx := range ctxList {
// The sync load will affect how optimizer choose the plan.
// We need to assign high priority to it so that we can get the stats as quick as we can.
ctx.GetSessionVars().StmtCtx.Priority = mysql.HighPriority
do.wg.Add(1)
go statsHandle.SubLoadWorker(ctx, do.exit, do.wg)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/statistics/handle/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
deps = [
"//pkg/config",
"//pkg/infoschema",
"//pkg/kv",
"//pkg/parser/ast",
"//pkg/parser/model",
"//pkg/parser/mysql",
Expand Down
3 changes: 2 additions & 1 deletion pkg/statistics/handle/storage/dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ func TestDumpCMSketchWithTopN(t *testing.T) {
testKit.MustExec("use test")
testKit.MustExec("create table t(a int)")
testKit.MustExec("insert into t values (1),(3),(4),(2),(5)")
testKit.MustExec("set @@tidb_analyze_version=1")
testKit.MustExec("analyze table t")

is := dom.InfoSchema()
Expand All @@ -360,7 +361,7 @@ func TestDumpCMSketchWithTopN(t *testing.T) {
cms, _, _, _ := statistics.NewCMSketchAndTopN(5, 2048, fakeData, 20, 100)

stat := h.GetTableStats(tableInfo)
err = h.SaveStatsToStorage(tableInfo.ID, 1, 0, 0, &stat.Columns[tableInfo.Columns[0].ID].Histogram, cms, nil, statistics.Version2, 1, false, handleutil.StatsMetaHistorySourceLoadStats)
err = h.SaveStatsToStorage(tableInfo.ID, 1, 0, 0, &stat.Columns[tableInfo.Columns[0].ID].Histogram, cms, nil, statistics.Version1, 1, false, handleutil.StatsMetaHistorySourceLoadStats)
require.NoError(t, err)
require.Nil(t, h.Update(is))

Expand Down
65 changes: 45 additions & 20 deletions pkg/statistics/handle/storage/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
Expand Down Expand Up @@ -53,8 +54,8 @@ func StatsMetaCountAndModifyCount(sctx sessionctx.Context, tableID int64) (count
return count, modifyCount, false, nil
}

// HistMetaFromStorage reads the meta info of the histogram from the storage.
func HistMetaFromStorage(sctx sessionctx.Context, item *model.TableItemID, possibleColInfo *model.ColumnInfo) (*statistics.Histogram, *types.Datum, int64, int64, error) {
// HistMetaFromStorageWithHighPriority reads the meta info of the histogram from the storage.
func HistMetaFromStorageWithHighPriority(sctx sessionctx.Context, item *model.TableItemID, possibleColInfo *model.ColumnInfo) (*statistics.Histogram, *types.Datum, int64, int64, error) {
isIndex := 0
var tp *types.FieldType
if item.IsIndex {
Expand All @@ -64,7 +65,7 @@ func HistMetaFromStorage(sctx sessionctx.Context, item *model.TableItemID, possi
tp = &possibleColInfo.FieldType
}
rows, _, err := util.ExecRows(sctx,
"select distinct_count, version, null_count, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms where table_id = %? and hist_id = %? and is_index = %?",
"select high_priority distinct_count, version, null_count, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms where table_id = %? and hist_id = %? and is_index = %?",
item.TableID,
item.ID,
isIndex,
Expand All @@ -81,9 +82,29 @@ func HistMetaFromStorage(sctx sessionctx.Context, item *model.TableItemID, possi
return hist, &lastPos, rows[0].GetInt64(4), rows[0].GetInt64(6), nil
}

// HistogramFromStorage reads histogram from storage.
func HistogramFromStorage(sctx sessionctx.Context, tableID int64, colID int64, tp *types.FieldType, distinct int64, isIndex int, ver uint64, nullCount int64, totColSize int64, corr float64) (_ *statistics.Histogram, err error) {
rows, fields, err := util.ExecRows(sctx, "select count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets where table_id = %? and is_index = %? and hist_id = %? order by bucket_id", tableID, isIndex, colID)
// HistogramFromStorageWithPriority wraps the HistogramFromStorage with the given kv.Priority.
// Sync load and async load will use high priority to get data.
func HistogramFromStorageWithPriority(
sctx sessionctx.Context,
tableID int64,
colID int64,
tp *types.FieldType,
distinct int64,
isIndex int,
ver uint64,
nullCount int64,
totColSize int64,
corr float64,
priority int,
) (*statistics.Histogram, error) {
selectPrefix := "select "
switch priority {
case kv.PriorityHigh:
selectPrefix += "high_priority "
case kv.PriorityLow:
selectPrefix += "low_priority "
}
rows, fields, err := util.ExecRows(sctx, selectPrefix+"count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets where table_id = %? and is_index = %? and hist_id = %? order by bucket_id", tableID, isIndex, colID)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -127,12 +148,16 @@ func HistogramFromStorage(sctx sessionctx.Context, tableID int64, colID int64, t
return hg, nil
}

// CMSketchAndTopNFromStorage reads CMSketch and TopN from storage.
func CMSketchAndTopNFromStorage(sctx sessionctx.Context, tblID int64, isIndex, histID int64) (_ *statistics.CMSketch, _ *statistics.TopN, err error) {
// CMSketchAndTopNFromStorageWithHighPriority reads CMSketch and TopN from storage.
func CMSketchAndTopNFromStorageWithHighPriority(sctx sessionctx.Context, tblID int64, isIndex, histID, statsVer int64) (_ *statistics.CMSketch, _ *statistics.TopN, err error) {
topNRows, _, err := util.ExecRows(sctx, "select HIGH_PRIORITY value, count from mysql.stats_top_n where table_id = %? and is_index = %? and hist_id = %?", tblID, isIndex, histID)
if err != nil {
return nil, nil, err
}
// If we are on version higher than 1. Don't read Count-Min Sketch.
if statsVer > statistics.Version1 {
return statistics.DecodeCMSketchAndTopN(nil, topNRows)
}
rows, _, err := util.ExecRows(sctx, "select cm_sketch from mysql.stats_histograms where table_id = %? and is_index = %? and hist_id = %?", tblID, isIndex, histID)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -294,11 +319,11 @@ func indexStatsFromStorage(sctx sessionctx.Context, row chunk.Row, table *statis
break
}
if idx == nil || idx.LastUpdateVersion < histVer || loadAll {
hg, err := HistogramFromStorage(sctx, table.PhysicalID, histID, types.NewFieldType(mysql.TypeBlob), distinct, 1, histVer, nullCount, 0, 0)
hg, err := HistogramFromStorageWithPriority(sctx, table.PhysicalID, histID, types.NewFieldType(mysql.TypeBlob), distinct, 1, histVer, nullCount, 0, 0, kv.PriorityNormal)
if err != nil {
return errors.Trace(err)
}
cms, topN, err := CMSketchAndTopNFromStorage(sctx, table.PhysicalID, 1, idxInfo.ID)
cms, topN, err := CMSketchAndTopNFromStorageWithHighPriority(sctx, table.PhysicalID, 1, idxInfo.ID, statsVer)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -404,11 +429,11 @@ func columnStatsFromStorage(sctx sessionctx.Context, row chunk.Row, table *stati
break
}
if col == nil || col.LastUpdateVersion < histVer || loadAll {
hg, err := HistogramFromStorage(sctx, table.PhysicalID, histID, &colInfo.FieldType, distinct, 0, histVer, nullCount, totColSize, correlation)
hg, err := HistogramFromStorageWithPriority(sctx, table.PhysicalID, histID, &colInfo.FieldType, distinct, 0, histVer, nullCount, totColSize, correlation, kv.PriorityNormal)
if err != nil {
return errors.Trace(err)
}
cms, topN, err := CMSketchAndTopNFromStorage(sctx, table.PhysicalID, 0, colInfo.ID)
cms, topN, err := CMSketchAndTopNFromStorageWithHighPriority(sctx, table.PhysicalID, 0, colInfo.ID, statsVer)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -533,9 +558,9 @@ func LoadHistogram(sctx sessionctx.Context, tableID int64, isIndex int, histID i
tp = colInfo.FieldType
break
}
return HistogramFromStorage(sctx, tableID, histID, &tp, distinct, isIndex, histVer, nullCount, totColSize, corr)
return HistogramFromStorageWithPriority(sctx, tableID, histID, &tp, distinct, isIndex, histVer, nullCount, totColSize, corr, kv.PriorityNormal)
}
return HistogramFromStorage(sctx, tableID, histID, types.NewFieldType(mysql.TypeBlob), distinct, isIndex, histVer, nullCount, 0, 0)
return HistogramFromStorageWithPriority(sctx, tableID, histID, types.NewFieldType(mysql.TypeBlob), distinct, isIndex, histVer, nullCount, 0, 0, kv.PriorityNormal)
}

// LoadNeededHistograms will load histograms for those needed columns/indices.
Expand Down Expand Up @@ -594,7 +619,7 @@ func loadNeededColumnHistograms(sctx sessionctx.Context, statsCache statstypes.S
return nil
}
colInfo = tbl.ColAndIdxExistenceMap.GetCol(col.ID)
hg, _, statsVer, _, err := HistMetaFromStorage(sctx, &col, colInfo)
hg, _, statsVer, _, err := HistMetaFromStorageWithHighPriority(sctx, &col, colInfo)
if hg == nil || err != nil {
asyncload.AsyncLoadHistogramNeededItems.Delete(col)
return err
Expand All @@ -605,11 +630,11 @@ func loadNeededColumnHistograms(sctx sessionctx.Context, statsCache statstypes.S
fms *statistics.FMSketch
)
if fullLoad {
hg, err = HistogramFromStorage(sctx, col.TableID, col.ID, &colInfo.FieldType, hg.NDV, 0, hg.LastUpdateVersion, hg.NullCount, hg.TotColSize, hg.Correlation)
hg, err = HistogramFromStorageWithPriority(sctx, col.TableID, col.ID, &colInfo.FieldType, hg.NDV, 0, hg.LastUpdateVersion, hg.NullCount, hg.TotColSize, hg.Correlation, kv.PriorityHigh)
if err != nil {
return errors.Trace(err)
}
cms, topN, err = CMSketchAndTopNFromStorage(sctx, col.TableID, 0, col.ID)
cms, topN, err = CMSketchAndTopNFromStorageWithHighPriority(sctx, col.TableID, 0, col.ID, statsVer)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -670,17 +695,17 @@ func loadNeededIndexHistograms(sctx sessionctx.Context, statsCache statstypes.St
asyncload.AsyncLoadHistogramNeededItems.Delete(idx)
return nil
}
hgMeta, lastAnalyzePos, statsVer, flag, err := HistMetaFromStorage(sctx, &idx, nil)
hgMeta, lastAnalyzePos, statsVer, flag, err := HistMetaFromStorageWithHighPriority(sctx, &idx, nil)
if hgMeta == nil || err != nil {
asyncload.AsyncLoadHistogramNeededItems.Delete(idx)
return err
}
idxInfo := tbl.ColAndIdxExistenceMap.GetIndex(idx.ID)
hg, err := HistogramFromStorage(sctx, idx.TableID, idx.ID, types.NewFieldType(mysql.TypeBlob), hgMeta.NDV, 1, hgMeta.LastUpdateVersion, hgMeta.NullCount, hgMeta.TotColSize, hgMeta.Correlation)
hg, err := HistogramFromStorageWithPriority(sctx, idx.TableID, idx.ID, types.NewFieldType(mysql.TypeBlob), hgMeta.NDV, 1, hgMeta.LastUpdateVersion, hgMeta.NullCount, hgMeta.TotColSize, hgMeta.Correlation, kv.PriorityHigh)
if err != nil {
return errors.Trace(err)
}
cms, topN, err := CMSketchAndTopNFromStorage(sctx, idx.TableID, 1, idx.ID)
cms, topN, err := CMSketchAndTopNFromStorageWithHighPriority(sctx, idx.TableID, 1, idx.ID, statsVer)
if err != nil {
return errors.Trace(err)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/statistics/handle/syncload/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/config",
"//pkg/kv",
"//pkg/metrics",
"//pkg/parser/model",
"//pkg/parser/mysql",
Expand Down
9 changes: 5 additions & 4 deletions pkg/statistics/handle/syncload/stats_syncload.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
Expand Down Expand Up @@ -370,7 +371,7 @@ func (*statsSyncLoad) readStatsForOneItem(sctx sessionctx.Context, item model.Ta
var hg *statistics.Histogram
var err error
isIndexFlag := int64(0)
hg, lastAnalyzePos, statsVer, flag, err := storage.HistMetaFromStorage(sctx, &item, w.colInfo)
hg, lastAnalyzePos, statsVer, flag, err := storage.HistMetaFromStorageWithHighPriority(sctx, &item, w.colInfo)
if err != nil {
return nil, err
}
Expand All @@ -387,17 +388,17 @@ func (*statsSyncLoad) readStatsForOneItem(sctx sessionctx.Context, item model.Ta
var fms *statistics.FMSketch
if fullLoad {
if item.IsIndex {
hg, err = storage.HistogramFromStorage(sctx, item.TableID, item.ID, types.NewFieldType(mysql.TypeBlob), hg.NDV, int(isIndexFlag), hg.LastUpdateVersion, hg.NullCount, hg.TotColSize, hg.Correlation)
hg, err = storage.HistogramFromStorageWithPriority(sctx, item.TableID, item.ID, types.NewFieldType(mysql.TypeBlob), hg.NDV, int(isIndexFlag), hg.LastUpdateVersion, hg.NullCount, hg.TotColSize, hg.Correlation, kv.PriorityHigh)
if err != nil {
return nil, errors.Trace(err)
}
} else {
hg, err = storage.HistogramFromStorage(sctx, item.TableID, item.ID, &w.colInfo.FieldType, hg.NDV, int(isIndexFlag), hg.LastUpdateVersion, hg.NullCount, hg.TotColSize, hg.Correlation)
hg, err = storage.HistogramFromStorageWithPriority(sctx, item.TableID, item.ID, &w.colInfo.FieldType, hg.NDV, int(isIndexFlag), hg.LastUpdateVersion, hg.NullCount, hg.TotColSize, hg.Correlation, kv.PriorityHigh)
if err != nil {
return nil, errors.Trace(err)
}
}
cms, topN, err = storage.CMSketchAndTopNFromStorage(sctx, item.TableID, isIndexFlag, item.ID)
cms, topN, err = storage.CMSketchAndTopNFromStorageWithHighPriority(sctx, item.TableID, isIndexFlag, item.ID, statsVer)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down

0 comments on commit a44d409

Please sign in to comment.