stats: incremental analyze for index with feedback updates (#10355)
alivxxx authored May 8, 2019
1 parent d8589df commit b0549b7
Showing 11 changed files with 193 additions and 90 deletions.
28 changes: 11 additions & 17 deletions executor/analyze.go
@@ -1075,13 +1075,13 @@ func (e *AnalyzeTestFastExec) TestFastSample() error {

type analyzeIndexIncrementalExec struct {
AnalyzeIndexExec
index *statistics.Index
oldHist *statistics.Histogram
oldCMS *statistics.CMSketch
}

func analyzeIndexIncremental(idxExec *analyzeIndexIncrementalExec) analyzeResult {
idx := idxExec.index
highBound := idx.Histogram.GetUpper(idx.Len() - 1)
values, err := codec.Decode(highBound.GetBytes(), len(idxExec.idxInfo.Columns))
startPos := idxExec.oldHist.GetUpper(idxExec.oldHist.Len() - 1)
values, err := codec.DecodeRange(startPos.GetBytes(), len(idxExec.idxInfo.Columns))
if err != nil {
return analyzeResult{Err: err, job: idxExec.job}
}
@@ -1090,16 +1090,12 @@ func analyzeIndexIncremental(idxExec *analyzeIndexIncrementalExec) analyzeResult
if err != nil {
return analyzeResult{Err: err, job: idxExec.job}
}
oldHist, oldCMS, err := idx.RemoveUpperBound(idxExec.ctx.GetSessionVars().StmtCtx, values)
hist, err = statistics.MergeHistograms(idxExec.ctx.GetSessionVars().StmtCtx, idxExec.oldHist, hist, int(idxExec.maxNumBuckets))
if err != nil {
return analyzeResult{Err: err, job: idxExec.job}
}
hist, err = statistics.MergeHistograms(idxExec.ctx.GetSessionVars().StmtCtx, oldHist, hist, int(idxExec.maxNumBuckets))
if err != nil {
return analyzeResult{Err: err, job: idxExec.job}
}
if oldCMS != nil && cms != nil {
err = cms.MergeCMSketch(oldCMS)
if idxExec.oldCMS != nil && cms != nil {
err = cms.MergeCMSketch4IncrementalAnalyze(idxExec.oldCMS)
if err != nil {
return analyzeResult{Err: err, job: idxExec.job}
}
@@ -1120,26 +1116,24 @@ func analyzeIndexIncremental(idxExec *analyzeIndexIncrementalExec) analyzeResult

type analyzePKIncrementalExec struct {
AnalyzeColumnsExec
pkStats *statistics.Column
oldHist *statistics.Histogram
}

func analyzePKIncremental(colExec *analyzePKIncrementalExec) analyzeResult {
pkStats := colExec.pkStats
high := pkStats.GetUpper(pkStats.Len() - 1)
var maxVal types.Datum
if mysql.HasUnsignedFlag(colExec.pkInfo.Flag) {
maxVal = types.NewUintDatum(math.MaxUint64)
} else {
maxVal = types.NewIntDatum(math.MaxInt64)
}
ran := ranger.Range{LowVal: []types.Datum{*high}, LowExclude: true, HighVal: []types.Datum{maxVal}}
startPos := *colExec.oldHist.GetUpper(colExec.oldHist.Len() - 1)
ran := ranger.Range{LowVal: []types.Datum{startPos}, LowExclude: true, HighVal: []types.Datum{maxVal}}
hists, _, err := colExec.buildStats([]*ranger.Range{&ran})
if err != nil {
return analyzeResult{Err: err, job: colExec.job}
}
hist := hists[0]
oldHist := pkStats.Histogram.Copy()
hist, err = statistics.MergeHistograms(colExec.ctx.GetSessionVars().StmtCtx, oldHist, hist, int(colExec.maxNumBuckets))
hist, err = statistics.MergeHistograms(colExec.ctx.GetSessionVars().StmtCtx, colExec.oldHist, hist, int(colExec.maxNumBuckets))
if err != nil {
return analyzeResult{Err: err, job: colExec.job}
}
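The two incremental executors above share one flow: read the upper bound recorded by the previous analyze, scan only rows strictly above it, and merge the freshly built histogram into the old one. Below is a minimal, self-contained Go sketch of that flow under simplified assumptions; `bucket` and `incrementalAnalyze` are illustrative names with toy semantics, not TiDB's statistics package, and the bucket re-compression performed by MergeHistograms is omitted.

    package main

    import "fmt"

    // bucket is a toy histogram bucket; TiDB's statistics.Bucket also tracks
    // repeats and encoded bounds, which are not needed for this illustration.
    type bucket struct {
        lower, upper int64
        count        int64
    }

    // incrementalAnalyze mirrors analyzeIndexIncremental/analyzePKIncremental above:
    // keep the old buckets, scan only rows strictly greater than the old upper bound
    // (the LowExclude: true range), and append buckets built from those rows.
    func incrementalAnalyze(oldBuckets []bucket, newRows []int64) []bucket {
        if len(oldBuckets) == 0 {
            return oldBuckets // the builder only creates an incremental task when old stats exist
        }
        startPos := oldBuckets[len(oldBuckets)-1].upper // like oldHist.GetUpper(oldHist.Len()-1)
        var fresh []int64
        for _, v := range newRows {
            if v > startPos {
                fresh = append(fresh, v)
            }
        }
        merged := append([]bucket{}, oldBuckets...)
        // One bucket per new value keeps the toy simple; a real analyze builds
        // equi-depth buckets and then compresses the merged result to maxNumBuckets.
        for _, v := range fresh {
            merged = append(merged, bucket{lower: v, upper: v, count: 1})
        }
        return merged
    }

    func main() {
        old := []bucket{{1, 1, 1}, {2, 2, 1}}            // toy buckets for the existing values 1 and 2
        fmt.Println(incrementalAnalyze(old, []int64{3})) // appends a bucket covering the new row 3
    }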
36 changes: 36 additions & 0 deletions executor/analyze_test.go
@@ -24,9 +24,13 @@ import (
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/testkit"
)

@@ -303,4 +307,36 @@ func (s *testSuite1) TestAnalyzeIncremental(c *C) {
tk.MustExec("analyze incremental table t index")
// Result should not change.
tk.MustQuery("show stats_buckets").Check(testkit.Rows("test t a 0 0 1 1 1 1", "test t a 0 1 2 1 2 2", "test t idx 1 0 1 1 1 1", "test t idx 1 1 2 1 2 2"))

// Test analyze incremental with feedback.
tk.MustExec("insert into t values (3,3)")
oriProbability := statistics.FeedbackProbability.Load()
defer func() {
statistics.FeedbackProbability.Store(oriProbability)
}()
statistics.FeedbackProbability.Store(1)
is := s.dom.InfoSchema()
table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
tblInfo := table.Meta()
tk.MustQuery("select * from t use index(idx) where b = 3")
tk.MustQuery("select * from t where a > 1")
h := s.dom.StatsHandle()
c.Assert(h.DumpStatsDeltaToKV(handle.DumpAll), IsNil)
c.Assert(h.DumpStatsFeedbackToKV(), IsNil)
c.Assert(h.HandleUpdateStats(is), IsNil)
c.Assert(h.Update(is), IsNil)
tk.MustQuery("show stats_buckets").Check(testkit.Rows("test t a 0 0 1 1 1 1", "test t a 0 1 3 0 2 2147483647", "test t idx 1 0 1 1 1 1", "test t idx 1 1 2 1 2 2"))
tblStats := h.GetTableStats(tblInfo)
val, err := codec.EncodeKey(tk.Se.GetSessionVars().StmtCtx, nil, types.NewIntDatum(3))
c.Assert(err, IsNil)
c.Assert(tblStats.Indices[tblInfo.Indices[0].ID].CMSketch.QueryBytes(val), Equals, uint64(1))
c.Assert(statistics.IsAnalyzed(tblStats.Indices[tblInfo.Indices[0].ID].Flag), IsFalse)
c.Assert(statistics.IsAnalyzed(tblStats.Columns[tblInfo.Columns[0].ID].Flag), IsFalse)

tk.MustExec("analyze incremental table t index")
tk.MustQuery("show stats_buckets").Check(testkit.Rows("test t a 0 0 1 1 1 1", "test t a 0 1 2 1 2 2", "test t a 0 2 3 1 3 3",
"test t idx 1 0 1 1 1 1", "test t idx 1 1 2 1 2 2", "test t idx 1 2 3 1 3 3"))
tblStats = h.GetTableStats(tblInfo)
c.Assert(tblStats.Indices[tblInfo.Indices[0].ID].CMSketch.QueryBytes(val), Equals, uint64(1))
}
47 changes: 36 additions & 11 deletions executor/builder.go
@@ -1398,18 +1398,28 @@ func (b *executorBuilder) buildAnalyzeIndexIncremental(task plannercore.AnalyzeI
return analyzeTask
}
idx, ok := statsTbl.Indices[task.IndexInfo.ID]
// TODO: If the index contains feedback, we may use other strategy.
if !ok || idx.Len() == 0 || idx.ContainsFeedback() {
if !ok || idx.Len() == 0 || idx.LastAnalyzePos.IsNull() {
return analyzeTask
}
exec := analyzeTask.idxExec
if idx.CMSketch != nil {
width, depth := idx.CMSketch.GetWidthAndDepth()
exec.analyzePB.IdxReq.CmsketchWidth = &width
exec.analyzePB.IdxReq.CmsketchDepth = &depth
var oldHist *statistics.Histogram
if statistics.IsAnalyzed(idx.Flag) {
exec := analyzeTask.idxExec
if idx.CMSketch != nil {
width, depth := idx.CMSketch.GetWidthAndDepth()
exec.analyzePB.IdxReq.CmsketchWidth = &width
exec.analyzePB.IdxReq.CmsketchDepth = &depth
}
oldHist = idx.Histogram.Copy()
} else {
_, bktID := idx.LessRowCountWithBktIdx(idx.LastAnalyzePos)
if bktID == 0 {
return analyzeTask
}
oldHist = idx.TruncateHistogram(bktID)
}
oldHist = oldHist.RemoveUpperBound()
analyzeTask.taskType = idxIncrementalTask
analyzeTask.idxIncrementalExec = &analyzeIndexIncrementalExec{AnalyzeIndexExec: *analyzeTask.idxExec, index: idx}
analyzeTask.idxIncrementalExec = &analyzeIndexIncrementalExec{AnalyzeIndexExec: *analyzeTask.idxExec, oldHist: oldHist, oldCMS: idx.CMSketch}
analyzeTask.job = &statistics.AnalyzeJob{DBName: task.DBName, TableName: task.TableName, PartitionName: task.PartitionName, JobInfo: "analyze incremental index " + task.IndexInfo.Name.O}
return analyzeTask
}
@@ -1458,13 +1468,28 @@ func (b *executorBuilder) buildAnalyzePKIncremental(task plannercore.AnalyzeColu
return analyzeTask
}
col, ok := statsTbl.Columns[task.PKInfo.ID]
// TODO: If the primary key contains feedback, we may use other strategy.
if !ok || col.Len() == 0 || col.ContainsFeedback() {
if !ok || col.Len() == 0 || col.LastAnalyzePos.IsNull() {
return analyzeTask
}
var oldHist *statistics.Histogram
if statistics.IsAnalyzed(col.Flag) {
oldHist = col.Histogram.Copy()
} else {
d, err := col.LastAnalyzePos.ConvertTo(b.ctx.GetSessionVars().StmtCtx, col.Tp)
if err != nil {
b.err = err
return nil
}
_, bktID := col.LessRowCountWithBktIdx(d)
if bktID == 0 {
return analyzeTask
}
oldHist = col.TruncateHistogram(bktID)
oldHist.NDV = int64(oldHist.TotalRowCount())
}
exec := analyzeTask.colExec
analyzeTask.taskType = pkIncrementalTask
analyzeTask.colIncrementalExec = &analyzePKIncrementalExec{AnalyzeColumnsExec: *exec, pkStats: col}
analyzeTask.colIncrementalExec = &analyzePKIncrementalExec{AnalyzeColumnsExec: *exec, oldHist: oldHist}
analyzeTask.job = &statistics.AnalyzeJob{DBName: task.DBName, TableName: task.TableName, PartitionName: task.PartitionName, JobInfo: "analyze incremental primary key"}
return analyzeTask
}
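Both builder changes above come down to one decision about which part of the old histogram is still trustworthy: stats produced by a full analyze can be reused whole (for the index case the executor drops the upper bound before merging), while stats modified by feedback are kept only up to the bucket that precedes LastAnalyzePos, and everything beyond it is rebuilt by the incremental scan; if nothing can be kept, the plain analyze task is returned instead. A toy sketch of that decision, using illustrative names (`toyHist`, `chooseOldHist`) rather than TiDB's actual histogram API:

    package main

    import "fmt"

    // toyHist keeps only the upper bound of each bucket, in ascending order.
    type toyHist struct {
        uppers []int64
    }

    // bktIdxFor returns the index of the first bucket whose upper bound reaches pos,
    // loosely playing the role of LessRowCountWithBktIdx in the diff above.
    func (h toyHist) bktIdxFor(pos int64) int {
        for i, u := range h.uppers {
            if u >= pos {
                return i
            }
        }
        return len(h.uppers) - 1
    }

    // chooseOldHist: a full analyze means the whole histogram is reusable; otherwise
    // keep only the buckets that end before the position recorded at the last analyze,
    // because feedback may have stretched later buckets past the data actually scanned.
    func chooseOldHist(h toyHist, analyzed bool, lastAnalyzePos int64) (kept []int64, ok bool) {
        if analyzed {
            return h.uppers, true
        }
        bktID := h.bktIdxFor(lastAnalyzePos)
        if bktID == 0 {
            return nil, false // nothing safe to keep; fall back to a full analyze
        }
        return h.uppers[:bktID], true
    }

    func main() {
        // Shaped like column a in TestAnalyzeIncremental after feedback widened its
        // last bucket to 2147483647, with the previous analyze having ended at value 2.
        h := toyHist{uppers: []int64{1, 2147483647}}
        fmt.Println(chooseOldHist(h, false, 2)) // [1] true: only the first bucket survives
    }

This mirrors the test expectations earlier in the commit: feedback stretches one bucket of column a to 2147483647, the analyzed flag is cleared, and the next incremental analyze keeps only the untouched bucket and rebuilds the rest from a fresh scan.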
1 change: 1 addition & 0 deletions executor/executor_test.go
@@ -2514,6 +2514,7 @@ func (s *testSuite1) SetUpSuite(c *C) {
c.Assert(err, IsNil)
s.dom, err = session.BootstrapSession(s.store)
c.Assert(err, IsNil)
s.dom.SetStatsUpdating(true)
}

func (s *testSuite1) TearDownSuite(c *C) {
10 changes: 10 additions & 0 deletions session/bootstrap.go
@@ -171,6 +171,7 @@ const (
stats_ver bigint(64) NOT NULL DEFAULT 0,
flag bigint(64) NOT NULL DEFAULT 0,
correlation double NOT NULL DEFAULT 0,
last_analyze_pos blob DEFAULT NULL,
unique index tbl(table_id, is_index, hist_id)
);`

@@ -328,6 +329,7 @@ const (
version28 = 28
version29 = 29
version30 = 30
version31 = 31
)

func checkBootstrapped(s Session) (bool, error) {
@@ -507,6 +509,10 @@ func upgrade(s Session) {
upgradeToVer30(s)
}

if ver < version31 {
upgradeToVer31(s)
}

updateBootstrapVer(s)
_, err = s.Execute(context.Background(), "COMMIT")

@@ -799,6 +805,10 @@ func upgradeToVer30(s Session) {
mustExecute(s, CreateStatsTopNTable)
}

func upgradeToVer31(s Session) {
doReentrantDDL(s, "ALTER TABLE mysql.stats_histograms ADD COLUMN `last_analyze_pos` blob default null", infoschema.ErrColumnExists)
}

// updateBootstrapVer updates bootstrap version variable in mysql.TiDB table.
func updateBootstrapVer(s Session) {
// Update bootstrap version.
2 changes: 1 addition & 1 deletion session/session.go
@@ -1559,7 +1559,7 @@ func createSessionWithDomain(store kv.Storage, dom *domain.Domain) (*session, er

const (
notBootstrapped = 0
currentBootstrapVersion = 30
currentBootstrapVersion = 31
)

func getStoreBootstrapVersion(store kv.Storage) int64 {
25 changes: 25 additions & 0 deletions statistics/cmsketch.go
@@ -298,6 +298,31 @@ func (c *CMSketch) MergeCMSketch(rc *CMSketch) error {
return nil
}

// MergeCMSketch4IncrementalAnalyze merges two CM sketches for incremental analyze. Since no value
// appears partially in both `c` and `rc` during incremental analyze, it uses `max` to merge them.
// Here is a simple argument. Querying a CM sketch takes the `min` over the matching counters:
// (1): For values that appear only in `c`, merging with `max` distorts the `min` query result less than merging with `sum`;
// (2): For values that appear only in `rc`, the same reasoning as (1) applies;
// (3): The occurrences of a single value are never split between `c` and `rc` (for example, if `v` appears 5 times in the table,
// it cannot appear, say, 5 times in `c` and 3 times in `rc`), so `max` also gives the correct answer for such values.
// So in fact, if the number of appearances of each value were known up front, it would be better to construct the CM sketch with `max` rather than `sum`.
func (c *CMSketch) MergeCMSketch4IncrementalAnalyze(rc *CMSketch) error {
if c.depth != rc.depth || c.width != rc.width {
return errors.New("Dimensions of Count-Min Sketch should be the same")
}
if c.topN != nil || rc.topN != nil {
return errors.New("CMSketch with Top-N does not support merge")
}
for i := range c.table {
c.count = 0
for j := range c.table[i] {
c.table[i][j] = mathutil.MaxUint32(c.table[i][j], rc.table[i][j])
c.count += uint64(c.table[i][j])
}
}
return nil
}

// CMSketchToProto converts CMSketch to its protobuf representation.
func CMSketchToProto(c *CMSketch) *tipb.CMSketch {
protoSketch := &tipb.CMSketch{Rows: make([]*tipb.CMSketchRow, c.depth)}
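To make the max-merge argument in MergeCMSketch4IncrementalAnalyze concrete, here is a small runnable toy: a fixed 2x4 counter table with ad-hoc hashes, not TiDB's CMSketch type. Values whose occurrences land entirely in one sketch keep exact counts after a max merge, while a sum merge also inflates every value whose counters happen to collide.

    package main

    import "fmt"

    const depth, width = 2, 4

    type sketch [depth][width]uint32

    // Two toy hash rows derived from v; a real CM sketch uses independent hash functions.
    func hash(row int, v uint64) int {
        return int((v*uint64(row*7+3) + 1) % width)
    }

    func (s *sketch) insert(v uint64, n uint32) {
        for i := 0; i < depth; i++ {
            s[i][hash(i, v)] += n
        }
    }

    // query takes the minimum over the matching counters, as described in the comment above.
    func (s *sketch) query(v uint64) uint32 {
        best := ^uint32(0)
        for i := 0; i < depth; i++ {
            if c := s[i][hash(i, v)]; c < best {
                best = c
            }
        }
        return best
    }

    func merge(a, b sketch, useMax bool) sketch {
        var out sketch
        for i := range out {
            for j := range out[i] {
                if useMax {
                    out[i][j] = a[i][j]
                    if b[i][j] > out[i][j] {
                        out[i][j] = b[i][j]
                    }
                } else {
                    out[i][j] = a[i][j] + b[i][j]
                }
            }
        }
        return out
    }

    func main() {
        var old, delta sketch
        old.insert(1, 5)   // v=1 occurs 5 times, all in the previously analyzed range
        delta.insert(2, 3) // v=2 occurs 3 times, all in the newly scanned range
        delta.insert(5, 2) // v=5 occurs 2 times and collides with v=1 in every row of this toy hash

        byMax := merge(old, delta, true)
        bySum := merge(old, delta, false)
        fmt.Println(byMax.query(1), byMax.query(2), byMax.query(5)) // 5 3 5: only the colliding value is overestimated
        fmt.Println(bySum.query(1), bySum.query(2), bySum.query(5)) // 7 3 7: sum inflates both sides of the collision
    }

Since an incremental analyze counts each row in exactly one of the two sketches, a max merge never undercounts and only overestimates where counters collide; a sum merge additionally adds unrelated counts into every cell that both sketches touch.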
16 changes: 9 additions & 7 deletions statistics/handle/bootstrap.go
@@ -109,7 +109,7 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, tables Stat
terror.Log(errors.Trace(err))
}
hist := statistics.NewHistogram(id, ndv, nullCount, version, types.NewFieldType(mysql.TypeBlob), chunk.InitialCapacity, 0)
table.Indices[hist.ID] = &statistics.Index{Histogram: *hist, CMSketch: cms, Info: idxInfo, StatsVer: row.GetInt64(8)}
table.Indices[hist.ID] = &statistics.Index{Histogram: *hist, CMSketch: cms, Info: idxInfo, StatsVer: row.GetInt64(8), Flag: row.GetInt64(10), LastAnalyzePos: row.GetDatum(11, types.NewFieldType(mysql.TypeBlob))}
} else {
var colInfo *model.ColumnInfo
for _, col := range tbl.Meta().Columns {
@@ -124,11 +124,13 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, tables Stat
hist := statistics.NewHistogram(id, ndv, nullCount, version, &colInfo.FieldType, 0, totColSize)
hist.Correlation = row.GetFloat64(9)
table.Columns[hist.ID] = &statistics.Column{
Histogram: *hist,
PhysicalID: table.PhysicalID,
Info: colInfo,
Count: nullCount,
IsHandle: tbl.Meta().PKIsHandle && mysql.HasPriKeyFlag(colInfo.Flag),
Histogram: *hist,
PhysicalID: table.PhysicalID,
Info: colInfo,
Count: nullCount,
IsHandle: tbl.Meta().PKIsHandle && mysql.HasPriKeyFlag(colInfo.Flag),
Flag: row.GetInt64(10),
LastAnalyzePos: row.GetDatum(11, types.NewFieldType(mysql.TypeBlob)),
}
}
}
@@ -137,7 +139,7 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, tables Stat
func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, tables StatsCache) error {
h.mu.Lock()
defer h.mu.Unlock()
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation from mysql.stats_histograms"
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms"
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
if len(rc) > 0 {
defer terror.Call(rc[0].Close)