diff --git a/config/config.go b/config/config.go
index fc47852bab3bd..74b099665fb39 100644
--- a/config/config.go
+++ b/config/config.go
@@ -150,6 +150,7 @@ type Performance struct {
 	RunAutoAnalyze      bool    `toml:"run-auto-analyze" json:"run-auto-analyze"`
 	StmtCountLimit      uint    `toml:"stmt-count-limit" json:"stmt-count-limit"`
 	FeedbackProbability float64 `toml:"feedback-probability" json:"feedback-probability"`
+	QueryFeedbackLimit  uint    `toml:"query-feedback-limit" json:"query-feedback-limit"`
 	PseudoEstimateRatio float64 `toml:"pseudo-estimate-ratio" json:"pseudo-estimate-ratio"`
 }
 
@@ -257,6 +258,7 @@ var defaultConf = Config{
 		RunAutoAnalyze:      true,
 		StmtCountLimit:      5000,
 		FeedbackProbability: 0,
+		QueryFeedbackLimit:  1024,
 		PseudoEstimateRatio: 0.7,
 	},
 	XProtocol: XProtocol{
diff --git a/config/config.toml.example b/config/config.toml.example
index 4b9d6f39cd039..7cfc1b345df03 100644
--- a/config/config.toml.example
+++ b/config/config.toml.example
@@ -139,6 +139,9 @@ run-auto-analyze = true
 # Probability to use the query feedback to update stats, 0 or 1 for always false/true.
 feedback-probability = 0.0
 
+# The maximum number of query feedback entries that can be cached in memory.
+query-feedback-limit = 1024
+
 # Pseudo stats will be used if the ratio between the modify count and
 # row count in statistics of a table is greater than it.
 pseudo-estimate-ratio = 0.7
diff --git a/domain/domain.go b/domain/domain.go
index 41fd0bef3bd07..0e659ea7973e8 100644
--- a/domain/domain.go
+++ b/domain/domain.go
@@ -632,6 +632,10 @@ func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager)
 	defer loadHistogramTicker.Stop()
 	gcStatsTicker := time.NewTicker(100 * lease)
 	defer gcStatsTicker.Stop()
+	dumpFeedbackTicker := time.NewTicker(200 * lease)
+	defer dumpFeedbackTicker.Stop()
+	loadFeedbackTicker := time.NewTicker(5 * lease)
+	defer loadFeedbackTicker.Stop()
 	statsHandle := do.StatsHandle()
 	t := time.Now()
 	err := statsHandle.InitStats(do.InfoSchema())
@@ -644,7 +648,7 @@ func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager)
 	for {
 		select {
 		case <-loadTicker.C:
-			err := statsHandle.Update(do.InfoSchema())
+			err = statsHandle.Update(do.InfoSchema())
 			if err != nil {
 				log.Error("[stats] update stats info fail: ", errors.ErrorStack(err))
 			}
@@ -653,32 +657,45 @@ func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager)
 			return
 		// This channel is sent only by ddl owner or the drop stats executor.
 		case t := <-statsHandle.DDLEventCh():
-			err := statsHandle.HandleDDLEvent(t)
+			err = statsHandle.HandleDDLEvent(t)
 			if err != nil {
 				log.Error("[stats] handle ddl event fail: ", errors.ErrorStack(err))
 			}
 		case t := <-statsHandle.AnalyzeResultCh():
 			for i, hg := range t.Hist {
-				err := statistics.SaveStatsToStorage(ctx, t.TableID, t.Count, t.IsIndex, hg, t.Cms[i])
+				err = statistics.SaveStatsToStorage(ctx, t.TableID, t.Count, t.IsIndex, hg, t.Cms[i])
 				if err != nil {
 					log.Error("[stats] save histogram to storage fail: ", errors.ErrorStack(err))
 				}
 			}
 		case <-deltaUpdateTicker.C:
-			err := statsHandle.DumpStatsDeltaToKV()
+			err = statsHandle.DumpStatsDeltaToKV()
 			if err != nil {
 				log.Error("[stats] dump stats delta fail: ", errors.ErrorStack(err))
 			}
 		case <-loadHistogramTicker.C:
-			err := statsHandle.LoadNeededHistograms()
+			err = statsHandle.LoadNeededHistograms()
 			if err != nil {
 				log.Error("[stats] load histograms fail: ", errors.ErrorStack(err))
 			}
+		case <-loadFeedbackTicker.C:
+			if !owner.IsOwner() {
+				continue
+			}
+			err = statsHandle.HandleUpdateStats(do.InfoSchema())
+			if err != nil {
+				log.Error("[stats] update stats using feedback fail: ", errors.ErrorStack(err))
+			}
+		case <-dumpFeedbackTicker.C:
+			err = statsHandle.DumpStatsFeedbackToKV()
+			if err != nil {
+				log.Error("[stats] dump stats feedback fail: ", errors.ErrorStack(err))
+			}
 		case <-gcStatsTicker.C:
 			if !owner.IsOwner() {
 				continue
 			}
-			err := statsHandle.GCStats(do.InfoSchema(), do.DDL().GetLease())
+			err = statsHandle.GCStats(do.InfoSchema(), do.DDL().GetLease())
 			if err != nil {
 				log.Error("[stats] gc stats fail: ", errors.ErrorStack(err))
 			}
diff --git a/executor/aggregate_test.go b/executor/aggregate_test.go
index 27ffe5e90b957..5be2bcb4e6282 100644
--- a/executor/aggregate_test.go
+++ b/executor/aggregate_test.go
@@ -243,7 +243,7 @@ func (s *testSuite) TestAggregation(c *C) {
 
 	result = tk.MustQuery("select count(*) from information_schema.columns")
 	// When adding new memory columns in information_schema, please update this variable.
-	columnCountOfAllInformationSchemaTables := "737"
+	columnCountOfAllInformationSchemaTables := "741"
 	result.Check(testkit.Rows(columnCountOfAllInformationSchemaTables))
 
 	tk.MustExec("drop table if exists t1")
diff --git a/session/bootstrap.go b/session/bootstrap.go
index 06b05084c5514..fcd0e930d1a7a 100644
--- a/session/bootstrap.go
+++ b/session/bootstrap.go
@@ -188,6 +188,15 @@ const (
 		UNIQUE KEY (element_id),
 		KEY (job_id, element_id)
 	);`
+
+	// CreateStatsFeedbackTable creates the mysql.stats_feedback table, which stores the query feedback used to update stats.
+	CreateStatsFeedbackTable = `CREATE TABLE IF NOT EXISTS mysql.stats_feedback (
+		table_id bigint(64) NOT NULL,
+		is_index tinyint(2) NOT NULL,
+		hist_id bigint(64) NOT NULL,
+		feedback blob NOT NULL,
+		index hist(table_id, is_index, hist_id)
+	);`
 )
 
 // bootstrap initiates system DB for a store.
@@ -233,6 +242,7 @@ const (
 	version17 = 17
 	version18 = 18
 	version19 = 19
+	version20 = 20
 )
 
 func checkBootstrapped(s Session) (bool, error) {
@@ -367,6 +377,10 @@ func upgrade(s Session) {
 		upgradeToVer19(s)
 	}
 
+	if ver < version20 {
+		upgradeToVer20(s)
+	}
+
 	updateBootstrapVer(s)
 	_, err = s.Execute(context.Background(), "COMMIT")
@@ -589,6 +603,10 @@ func upgradeToVer19(s Session) {
 	doReentrantDDL(s, "ALTER TABLE mysql.columns_priv MODIFY User CHAR(32)")
 }
 
+func upgradeToVer20(s Session) {
+	doReentrantDDL(s, CreateStatsFeedbackTable)
+}
+
 // updateBootstrapVer updates bootstrap version variable in mysql.TiDB table.
 func updateBootstrapVer(s Session) {
 	// Update bootstrap version.
@@ -635,6 +653,8 @@ func doDDLWorks(s Session) {
 	mustExecute(s, CreateStatsBucketsTable)
 	// Create gc_delete_range table.
 	mustExecute(s, CreateGCDeleteRangeTable)
+	// Create stats_feedback table.
+	mustExecute(s, CreateStatsFeedbackTable)
 }
 
 // doDMLWorks executes DML statements in bootstrap stage.
diff --git a/session/session.go b/session/session.go
index 212fa11d07321..3d841e4662545 100644
--- a/session/session.go
+++ b/session/session.go
@@ -1208,7 +1208,7 @@ func createSessionWithDomain(store kv.Storage, dom *domain.Domain) (*session, er
 
 const (
 	notBootstrapped         = 0
-	currentBootstrapVersion = 19
+	currentBootstrapVersion = 20
 )
 
 func getStoreBootstrapVersion(store kv.Storage) int64 {
diff --git a/statistics/cmsketch.go b/statistics/cmsketch.go
index a7f402f3600c8..181fbf2aa23fd 100644
--- a/statistics/cmsketch.go
+++ b/statistics/cmsketch.go
@@ -22,7 +22,7 @@ import (
 	"github.com/pingcap/tidb/sessionctx/stmtctx"
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/codec"
-	tipb "github.com/pingcap/tipb/go-tipb"
+	"github.com/pingcap/tipb/go-tipb"
 	"github.com/spaolacci/murmur3"
 )
 
@@ -54,6 +54,18 @@ func (c *CMSketch) InsertBytes(bytes []byte) {
 	}
 }
 
+// setValue sets the count for the value that hashes to (h1, h2).
+func (c *CMSketch) setValue(h1, h2 uint64, count uint32) {
+	oriCount := c.queryHashValue(h1, h2)
+	c.count += uint64(count) - uint64(oriCount)
+	// Let the uint32 subtraction overflow naturally: adding the wrapped delta yields the new count.
+	deltaCount := count - oriCount
+	for i := range c.table {
+		j := (h1 + h2*uint64(i)) % uint64(c.width)
+		c.table[i][j] = c.table[i][j] + deltaCount
+	}
+}
+
 func (c *CMSketch) queryValue(sc *stmtctx.StatementContext, val types.Datum) (uint32, error) {
 	bytes, err := codec.EncodeValue(sc, nil, val)
 	if err != nil {
@@ -64,6 +76,10 @@ func (c *CMSketch) queryBytes(bytes []byte) uint32 {
 	h1, h2 := murmur3.Sum128(bytes)
+	return c.queryHashValue(h1, h2)
+}
+
+func (c *CMSketch) queryHashValue(h1, h2 uint64) uint32 {
 	vals := make([]uint32, c.depth)
 	min := uint32(math.MaxUint32)
 	for i := range c.table {
@@ -173,3 +189,15 @@ func (c *CMSketch) Equal(rc *CMSketch) bool {
 	}
 	return true
 }
+
+func (c *CMSketch) copy() *CMSketch {
+	if c == nil {
+		return nil
+	}
+	tbl := make([][]uint32, c.depth)
+	for i := range tbl {
+		tbl[i] = make([]uint32, c.width)
+		copy(tbl[i], c.table[i])
+	}
+	return &CMSketch{count: c.count, width: c.width, depth: c.depth, table: tbl}
+}
diff --git a/statistics/feedback.go b/statistics/feedback.go
index 889e89b1f6ea0..55b9abb9d8793 100644
--- a/statistics/feedback.go
+++ b/statistics/feedback.go
@@ -15,6 +15,7 @@ package statistics
 
 import (
 	"bytes"
+	"encoding/gob"
 	"math"
 	"math/rand"
 	"sort"
@@ -30,6 +31,7 @@ import (
 	"github.com/pingcap/tidb/util/codec"
 	"github.com/pingcap/tidb/util/ranger"
 	log "github.com/sirupsen/logrus"
+	"github.com/spaolacci/murmur3"
 )
 
 // `feedback` represents the total scan count in range [lower, upper).
@@ -145,10 +147,10 @@ func (q *QueryFeedback) Update(startKey kv.Key, counts []int64) {
 		return
 	}
 
-	if q.hist.tp.Tp == mysql.TypeLong {
-		startKey = tablecodec.CutRowKeyPrefix(startKey)
-	} else {
+	if q.hist.tp.Tp == mysql.TypeBlob {
 		startKey = tablecodec.CutIndexPrefix(startKey)
+	} else {
+		startKey = tablecodec.CutRowKeyPrefix(startKey)
 	}
 
 	// Find the range that startKey falls in.
 	idx := sort.Search(len(q.feedback), func(i int) bool {
@@ -176,7 +178,7 @@ func (q *QueryFeedback) Update(startKey kv.Key, counts []int64) {
 	return
 }
 
-// DecodeInt decodes the int value stored in the feedback, only used for test.
+// DecodeInt decodes the int value stored in the feedback.
 func (q *QueryFeedback) DecodeInt() error {
 	for _, fb := range q.feedback {
 		_, v, err := codec.DecodeInt(fb.lower.GetBytes())
@@ -202,47 +204,45 @@ type BucketFeedback struct {
 }
 
 // buildBucketFeedback build the feedback for each bucket from the histogram feedback.
-func buildBucketFeedback(h *Histogram, feedbacks []*QueryFeedback) (map[int]*BucketFeedback, int) {
+func buildBucketFeedback(h *Histogram, feedback *QueryFeedback) (map[int]*BucketFeedback, int) {
 	bktID2FB := make(map[int]*BucketFeedback)
 	total := 0
-	for _, feedback := range feedbacks {
-		for _, ran := range feedback.feedback {
-			idx, _ := h.Bounds.LowerBound(0, ran.lower)
-			bktIdx := 0
-			// The last bucket also stores the feedback that falls outside the upper bound.
-			if idx >= h.Bounds.NumRows()-2 {
-				bktIdx = h.Len() - 1
-			} else {
-				bktIdx = idx / 2
-				// Make sure that this feedback lies within the bucket.
-				if chunk.Compare(h.Bounds.GetRow(2*bktIdx+1), 0, ran.upper) < 0 {
-					continue
-				}
-			}
-			total++
-			bkt := bktID2FB[bktIdx]
-			if bkt == nil {
-				bkt = &BucketFeedback{lower: h.GetLower(bktIdx), upper: h.GetUpper(bktIdx)}
-				bktID2FB[bktIdx] = bkt
-			}
-			bkt.feedback = append(bkt.feedback, ran)
-			// Update the bound if necessary.
-			res, err := bkt.lower.CompareDatum(nil, ran.lower)
-			if err != nil {
-				log.Debugf("compare datum %v with %v failed, err: %v", bkt.lower, ran.lower, errors.ErrorStack(err))
-				continue
-			}
-			if res > 0 {
-				bkt.lower = ran.lower
-			}
-			res, err = bkt.upper.CompareDatum(nil, ran.upper)
-			if err != nil {
-				log.Debugf("compare datum %v with %v failed, err: %v", bkt.upper, ran.upper, errors.ErrorStack(err))
-				continue
-			}
-			if res < 0 {
-				bkt.upper = ran.upper
-			}
+	for _, ran := range feedback.feedback {
+		idx, _ := h.Bounds.LowerBound(0, ran.lower)
+		bktIdx := 0
+		// The last bucket also stores the feedback that falls outside the upper bound.
+		if idx >= h.Bounds.NumRows()-2 {
+			bktIdx = h.Len() - 1
+		} else {
+			bktIdx = idx / 2
+			// Make sure that this feedback lies within the bucket.
+			if chunk.Compare(h.Bounds.GetRow(2*bktIdx+1), 0, ran.upper) < 0 {
+				continue
+			}
+		}
+		total++
+		bkt := bktID2FB[bktIdx]
+		if bkt == nil {
+			bkt = &BucketFeedback{lower: h.GetLower(bktIdx), upper: h.GetUpper(bktIdx)}
+			bktID2FB[bktIdx] = bkt
+		}
+		bkt.feedback = append(bkt.feedback, ran)
+		// Update the bound if necessary.
+		res, err := bkt.lower.CompareDatum(nil, ran.lower)
+		if err != nil {
+			log.Debugf("compare datum %v with %v failed, err: %v", bkt.lower, ran.lower, errors.ErrorStack(err))
+			continue
+		}
+		if res > 0 {
+			bkt.lower = ran.lower
+		}
+		res, err = bkt.upper.CompareDatum(nil, ran.upper)
+		if err != nil {
+			log.Debugf("compare datum %v with %v failed, err: %v", bkt.upper, ran.upper, errors.ErrorStack(err))
+			continue
+		}
+		if res < 0 {
+			bkt.upper = ran.upper
+		}
 	}
 	return bktID2FB, total
@@ -438,8 +438,8 @@ func mergeBuckets(bkts []bucket, isNewBuckets []bool, totalCount float64) []buck
 	return bkts
 }
 
-func splitBuckets(h *Histogram, feedbacks []*QueryFeedback) ([]bucket, []bool, int64) {
-	bktID2FB, fbNum := buildBucketFeedback(h, feedbacks)
+func splitBuckets(h *Histogram, feedback *QueryFeedback) ([]bucket, []bool, int64) {
+	bktID2FB, fbNum := buildBucketFeedback(h, feedback)
 	counts := make([]int64, 0, h.Len())
 	for i := 0; i < h.Len(); i++ {
 		bkt, ok := bktID2FB[i]
@@ -478,8 +478,8 @@ func splitBuckets(h *Histogram, feedback *QueryFeedback) ([]bucket, []bool, int6
 }
 
 // UpdateHistogram updates the histogram according buckets.
-func UpdateHistogram(h *Histogram, feedbacks []*QueryFeedback) *Histogram {
-	buckets, isNewBuckets, totalCount := splitBuckets(h, feedbacks)
+func UpdateHistogram(h *Histogram, feedback *QueryFeedback) *Histogram {
+	buckets, isNewBuckets, totalCount := splitBuckets(h, feedback)
 	buckets = mergeBuckets(buckets, isNewBuckets, float64(totalCount))
 	return buildNewHistogram(h, buckets)
 }
@@ -493,3 +493,127 @@ func buildNewHistogram(h *Histogram, buckets []bucket) *Histogram {
 	}
 	return hist
 }
+
+// queryFeedback is used to serialize the QueryFeedback.
+type queryFeedback struct {
+	IntRanges   []int64
+	HashValues  []uint64 // HashValues holds the murmur3 hash values for each index point.
+	IndexRanges [][]byte
+	Counts      []int64 // Counts holds the number of scan keys in each range.
+}
+
+func encodePKFeedback(q *QueryFeedback) (*queryFeedback, error) {
+	pb := &queryFeedback{}
+	// Decode the feedback bounds from bytes to ints once, before iterating.
+	if err := q.DecodeInt(); err != nil {
+		return nil, errors.Trace(err)
+	}
+	for _, fb := range q.feedback {
+		// There is no need to update the point queries.
+		if fb.upper.GetInt64()-fb.lower.GetInt64() <= 1 {
+			continue
+		}
+		pb.IntRanges = append(pb.IntRanges, fb.lower.GetInt64(), fb.upper.GetInt64())
+		pb.Counts = append(pb.Counts, fb.count)
+	}
+	return pb, nil
+}
+
+func encodeIndexFeedback(q *QueryFeedback) *queryFeedback {
+	pb := &queryFeedback{}
+	var pointCounts []int64
+	for _, fb := range q.feedback {
+		if bytes.Equal(kv.Key(fb.lower.GetBytes()).PrefixNext(), fb.upper.GetBytes()) {
+			h1, h2 := murmur3.Sum128(fb.lower.GetBytes())
+			pb.HashValues = append(pb.HashValues, h1, h2)
+			pointCounts = append(pointCounts, fb.count)
+		} else {
+			pb.IndexRanges = append(pb.IndexRanges, fb.lower.GetBytes(), fb.upper.GetBytes())
+			pb.Counts = append(pb.Counts, fb.count)
+		}
+	}
+	pb.Counts = append(pb.Counts, pointCounts...)
+	return pb
+}
+
+func encodeFeedback(q *QueryFeedback) ([]byte, error) {
+	var pb *queryFeedback
+	var err error
+	if q.hist.tp.Tp == mysql.TypeBlob {
+		pb = encodeIndexFeedback(q)
+	} else {
+		pb, err = encodePKFeedback(q)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+	}
+	var buf bytes.Buffer
+	enc := gob.NewEncoder(&buf)
+	err = enc.Encode(pb)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	return buf.Bytes(), nil
+}
+
+func decodeFeedback(val []byte, q *QueryFeedback, c *CMSketch) error {
+	buf := bytes.NewBuffer(val)
+	dec := gob.NewDecoder(buf)
+	pb := &queryFeedback{}
+	err := dec.Decode(pb)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	// decode feedback for index
+	if len(pb.IndexRanges) > 0 {
+		// decode the index range feedback
+		for i := 0; i < len(pb.IndexRanges); i += 2 {
+			lower, upper := types.NewBytesDatum(pb.IndexRanges[i]), types.NewBytesDatum(pb.IndexRanges[i+1])
+			q.feedback = append(q.feedback, feedback{&lower, &upper, pb.Counts[i/2], 0})
+		}
+		if c == nil {
+			return nil
+		}
+		// decode the index point feedback, just set value count in CM Sketch
+		start := len(pb.IndexRanges) / 2
+		for i := 0; i < len(pb.HashValues); i += 2 {
+			c.setValue(pb.HashValues[i], pb.HashValues[i+1], uint32(pb.Counts[start+i/2]))
+		}
+		return nil
+	}
+	// decode feedback for primary key
+	for i := 0; i < len(pb.IntRanges); i += 2 {
+		lower, upper := types.NewIntDatum(pb.IntRanges[i]), types.NewIntDatum(pb.IntRanges[i+1])
+		q.feedback = append(q.feedback, feedback{&lower, &upper, pb.Counts[i/2], 0})
+	}
+	return nil
+}
+
+// Equal tests whether two query feedbacks are equal; it is only used in tests.
+func (q *QueryFeedback) Equal(rq *QueryFeedback) bool {
+	if len(q.feedback) != len(rq.feedback) {
+		return false
+	}
+	for i, fb := range q.feedback {
+		rfb := rq.feedback[i]
+		if fb.count != rfb.count {
+			return false
+		}
+		if fb.lower.Kind() == types.KindInt64 {
+			if fb.lower.GetInt64() != rfb.lower.GetInt64() {
+				return false
+			}
+			if fb.upper.GetInt64() != rfb.upper.GetInt64() {
+				return false
+			}
+		} else {
+			if !bytes.Equal(fb.lower.GetBytes(), rfb.lower.GetBytes()) {
+				return false
+			}
+			if !bytes.Equal(fb.upper.GetBytes(), rfb.upper.GetBytes()) {
+				return false
+			}
+		}
+	}
+	return true
+}
diff --git a/statistics/feedback_test.go b/statistics/feedback_test.go
index 69eb0f60eb379..8eed6126cdce6 100644
--- a/statistics/feedback_test.go
+++ b/statistics/feedback_test.go
@@ -17,6 +17,7 @@ import (
 	. "github.com/pingcap/check"
 	"github.com/pingcap/tidb/mysql"
 	"github.com/pingcap/tidb/types"
+	"github.com/pingcap/tidb/util/codec"
 )
 
 var _ = Suite(&testFeedbackSuite{})
@@ -68,7 +69,7 @@ func (s *testFeedbackSuite) TestUpdateHistogram(c *C) {
 	originBucketCount := defaultBucketCount
 	defaultBucketCount = 5
 	defer func() { defaultBucketCount = originBucketCount }()
-	c.Assert(UpdateHistogram(q.Hist(), []*QueryFeedback{q}).ToString(0), Equals,
+	c.Assert(UpdateHistogram(q.Hist(), q).ToString(0), Equals,
 		"column:0 ndv:0\n"+
 			"num: 10000\tlower_bound: 0\tupper_bound: 1\trepeats: 0\n"+
 			"num: 10003\tlower_bound: 2\tupper_bound: 3\trepeats: 0\n"+
@@ -85,7 +86,7 @@ func (s *testFeedbackSuite) TestSplitBuckets(c *C) {
 	}
 	q := NewQueryFeedback(0, genHistogram(), 0, false)
 	q.feedback = feedbacks
-	buckets, isNewBuckets, totalCount := splitBuckets(q.Hist(), []*QueryFeedback{q})
+	buckets, isNewBuckets, totalCount := splitBuckets(q.Hist(), q)
 	c.Assert(buildNewHistogram(q.Hist(), buckets).ToString(0), Equals,
 		"column:0 ndv:0\n"+
 			"num: 1\tlower_bound: 0\tupper_bound: 1\trepeats: 0\n"+
@@ -104,7 +105,7 @@ func (s *testFeedbackSuite) TestSplitBuckets(c *C) {
 	}
 	q = NewQueryFeedback(0, genHistogram(), 0, false)
 	q.feedback = feedbacks
-	buckets, isNewBuckets, totalCount = splitBuckets(q.Hist(), []*QueryFeedback{q})
+	buckets, isNewBuckets, totalCount = splitBuckets(q.Hist(), q)
 	c.Assert(buildNewHistogram(q.Hist(), buckets).ToString(0), Equals,
 		"column:0 ndv:0\n"+
 			"num: 100000\tlower_bound: 0\tupper_bound: 1\trepeats: 0\n"+
@@ -124,7 +125,7 @@ func (s *testFeedbackSuite) TestSplitBuckets(c *C) {
 	}
 	q = NewQueryFeedback(0, h, 0, false)
 	q.feedback = feedbacks
-	buckets, isNewBuckets, totalCount = splitBuckets(q.Hist(), []*QueryFeedback{q})
+	buckets, isNewBuckets, totalCount = splitBuckets(q.Hist(), q)
 	c.Assert(buildNewHistogram(q.Hist(), buckets).ToString(0), Equals,
 		"column:0 ndv:0\n"+
 			"num: 1000000\tlower_bound: 0\tupper_bound: 1000000\trepeats: 0")
@@ -184,3 +185,34 @@ func (s *testFeedbackSuite) TestMergeBuckets(c *C) {
 		c.Assert(result, Equals, t.result)
 	}
 }
+
+func encodeInt(v int64) *types.Datum {
+	val := codec.EncodeInt(nil, v)
+	d := types.NewBytesDatum(val)
+	return &d
+}
+
+func (s *testFeedbackSuite) TestFeedbackEncoding(c *C) {
+	hist := NewHistogram(0, 0, 0, 0, types.NewFieldType(mysql.TypeLong), 0, 0)
+	q := &QueryFeedback{hist: hist}
+	q.feedback = append(q.feedback, feedback{encodeInt(0), encodeInt(3), 1, 0})
+	q.feedback = append(q.feedback, feedback{encodeInt(0), encodeInt(5), 1, 0})
+	val, err := encodeFeedback(q)
+	c.Assert(err, IsNil)
+	rq := &QueryFeedback{}
+	c.Assert(decodeFeedback(val, rq, nil), IsNil)
+	c.Assert(q.Equal(rq), IsTrue)
+
+	hist.tp = types.NewFieldType(mysql.TypeBlob)
+	q = &QueryFeedback{hist: hist}
+	q.feedback = append(q.feedback, feedback{encodeInt(0), encodeInt(3), 1, 0})
+	q.feedback = append(q.feedback, feedback{encodeInt(0), encodeInt(1), 1, 0})
+	val, err = encodeFeedback(q)
+	c.Assert(err, IsNil)
+	rq = &QueryFeedback{}
+	cms := NewCMSketch(4, 4)
+	c.Assert(decodeFeedback(val, rq, cms), IsNil)
+	c.Assert(cms.queryBytes(codec.EncodeInt(nil, 0)), Equals, uint32(1))
+	q.feedback = q.feedback[:1]
+	c.Assert(q.Equal(rq), IsTrue)
+}
diff --git a/statistics/handle.go b/statistics/handle.go
index f886e6e2e1d4d..fdb9ec9474604 100644
--- a/statistics/handle.go
+++ b/statistics/handle.go
@@ -72,8 +72,8 @@ func (h *Handle) Clear() {
 	h.globalMap = make(tableDeltaMap)
 }
 
-// For now, we do not use the query feedback, so just set it to 1.
-const maxQueryFeedBackCount = 1
+// MaxQueryFeedbackCount is the maximum number of query feedback entries that can be cached in memory.
+var MaxQueryFeedbackCount = 1 << 10
 
 // NewHandle creates a Handle for update stats.
@@ -84,7 +84,7 @@ func NewHandle(ctx sessionctx.Context, lease time.Duration) *Handle {
 		listHead:  &SessionStatsCollector{mapper: make(tableDeltaMap)},
 		globalMap: make(tableDeltaMap),
 		Lease:     lease,
-		feedback:  make([]*QueryFeedback, 0, maxQueryFeedBackCount),
+		feedback:  make([]*QueryFeedback, 0, MaxQueryFeedbackCount),
 	}
 	handle.statsCache.Store(statsCache{})
 	return handle
diff --git a/statistics/histogram.go b/statistics/histogram.go
index 7047f8eb2c9a1..f5f21177c8ea3 100644
--- a/statistics/histogram.go
+++ b/statistics/histogram.go
@@ -194,8 +194,14 @@ func SaveStatsToStorage(sctx sessionctx.Context, tableID int64, count int64, isI
 	}
 	txn := sctx.Txn()
 	version := txn.StartTS()
-	replaceSQL := fmt.Sprintf("replace into mysql.stats_meta (version, table_id, count) values (%d, %d, %d)", version, tableID, count)
-	_, err = exec.Execute(ctx, replaceSQL)
+	var sql string
+	// If the count is less than 0, we do not want to update the count and modify count, so only bump the version.
+	if count >= 0 {
+		sql = fmt.Sprintf("replace into mysql.stats_meta (version, table_id, count) values (%d, %d, %d)", version, tableID, count)
+	} else {
+		sql = fmt.Sprintf("update mysql.stats_meta set version = %d where table_id = %d", version, tableID)
+	}
+	_, err = exec.Execute(ctx, sql)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -203,7 +209,7 @@ func SaveStatsToStorage(sctx sessionctx.Context, tableID int64, count int64, isI
 	if err != nil {
 		return errors.Trace(err)
 	}
-	replaceSQL = fmt.Sprintf("replace into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size) values (%d, %d, %d, %d, %d, %d, X'%X', %d)",
+	replaceSQL := fmt.Sprintf("replace into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size) values (%d, %d, %d, %d, %d, %d, X'%X', %d)",
 		tableID, isIndex, hg.ID, hg.NDV, version, hg.NullCount, data, hg.TotColSize)
 	_, err = exec.Execute(ctx, replaceSQL)
 	if err != nil {
diff --git a/statistics/update.go b/statistics/update.go
index 57e2da921d8e1..fda83ecda32b4 100644
--- a/statistics/update.go
+++ b/statistics/update.go
@@ -23,6 +23,7 @@ import (
 	"github.com/pingcap/tidb/infoschema"
 	"github.com/pingcap/tidb/metrics"
 	"github.com/pingcap/tidb/model"
+	"github.com/pingcap/tidb/mysql"
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/store/tikv/oracle"
 	"github.com/pingcap/tidb/util/sqlexec"
@@ -78,7 +79,7 @@ func (s *SessionStatsCollector) Update(id int64, delta int64, count int64) {
 
 func mergeQueryFeedback(lq []*QueryFeedback, rq []*QueryFeedback) []*QueryFeedback {
 	for _, q := range rq {
-		if len(lq) >= maxQueryFeedBackCount {
+		if len(lq) >= MaxQueryFeedbackCount {
 			break
 		}
 		lq = append(lq, q)
@@ -108,7 +109,7 @@ func (s *SessionStatsCollector) StoreQueryFeedback(feedback interface{}) {
 	s.Lock()
 	defer s.Unlock()
-	if len(s.feedback) >= MaxQueryFeedbackCount {
+	if len(s.feedback) >= MaxQueryFeedbackCount {
 		return
 	}
 	s.feedback = append(s.feedback, q)
@@ -190,6 +191,116 @@ func (h *Handle) dumpTableStatDeltaToKV(id int64, delta variable.TableDelta) (bo
 	return updated, errors.Trace(err)
 }
 
+// DumpStatsFeedbackToKV dumps the stats feedback to KV.
+func (h *Handle) DumpStatsFeedbackToKV() error {
+	var err error
+	var successCount int
+	for _, fb := range h.feedback {
+		err = h.dumpFeedbackToKV(fb)
+		if err != nil {
+			break
+		}
+		successCount++
+	}
+	h.feedback = h.feedback[successCount:]
+	return errors.Trace(err)
+}
+
+func (h *Handle) dumpFeedbackToKV(fb *QueryFeedback) error {
+	vals, err := encodeFeedback(fb)
+	if err != nil {
+		log.Debugf("error occurred when encoding feedback, err: %s", errors.ErrorStack(err))
+		return nil
+	}
+	var isIndex int64
+	if fb.hist.tp.Tp == mysql.TypeBlob {
+		isIndex = 1
+	} else {
+		isIndex = 0
+	}
+	sql := fmt.Sprintf("insert into mysql.stats_feedback (table_id, hist_id, is_index, feedback) values "+
+		"(%d, %d, %d, X'%X')", fb.tableID, fb.hist.ID, isIndex, vals)
+	_, err = h.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
+	return errors.Trace(err)
+}
+
+// HandleUpdateStats updates the stats using feedback.
+func (h *Handle) HandleUpdateStats(is infoschema.InfoSchema) error {
+	sql := "select table_id, hist_id, is_index, feedback from mysql.stats_feedback order by table_id, hist_id, is_index"
+	rows, _, err := h.ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(h.ctx, sql)
+	if len(rows) == 0 || err != nil {
+		return errors.Trace(err)
+	}
+	tableID, histID, isIndex := int64(-1), int64(-1), int64(-1)
+	q := &QueryFeedback{}
+	var (
+		cms  *CMSketch
+		hist *Histogram
+	)
+	for _, row := range rows {
+		// merge into previous feedback
+		if row.GetInt64(0) == tableID && row.GetInt64(1) == histID && row.GetInt64(2) == isIndex {
+			err = decodeFeedback(row.GetBytes(3), q, cms)
+			if err != nil {
+				log.Debugf("decode feedback failed, err: %v", errors.ErrorStack(err))
+			}
+			continue
+		}
+		// dump the stats into kv
+		if hist != nil {
+			err = h.dumpStatsUpdateToKV(tableID, int(isIndex), q, hist, cms)
+			if err != nil {
+				return errors.Trace(err)
+			}
+		}
+		// initialize new feedback
+		// Drop any feedback left over from a previous group whose histogram no longer exists.
+		q.feedback = q.feedback[:0]
+		tableID, histID, isIndex = row.GetInt64(0), row.GetInt64(1), row.GetInt64(2)
+		table, ok := is.TableByID(tableID)
+		if !ok {
+			hist, cms = nil, nil
+			continue
+		}
+		tbl := h.GetTableStats(table.Meta())
+		if isIndex == 1 {
+			idx, ok := tbl.Indices[histID]
+			if !ok {
+				hist, cms = nil, nil
+				continue
+			}
+			hist = &idx.Histogram
+			cms = idx.CMSketch.copy()
+		} else {
+			col, ok := tbl.Columns[histID]
+			if !ok {
+				hist, cms = nil, nil
+				continue
+			}
+			hist = &col.Histogram
+			cms = nil
+		}
+		err = decodeFeedback(row.GetBytes(3), q, cms)
+		if err != nil {
+			log.Debugf("decode feedback failed, err: %v", errors.ErrorStack(err))
+		}
+	}
+	// dump the last feedback into kv
+	if hist == nil {
+		// The last group referred to a dropped table or histogram; nothing to dump.
+		return nil
+	}
+	err = h.dumpStatsUpdateToKV(tableID, int(isIndex), q, hist, cms)
+	return errors.Trace(err)
+}
+
+func (h *Handle) dumpStatsUpdateToKV(tableID int64, isIndex int, q *QueryFeedback, hist *Histogram, cms *CMSketch) error {
+	hist = UpdateHistogram(hist, q)
+	err := SaveStatsToStorage(h.ctx, tableID, -1, isIndex, hist, cms)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	h.ctx.GetSessionVars().BatchDelete = true
+	sql := fmt.Sprintf("delete from mysql.stats_feedback where table_id = %d and hist_id = %d and is_index = %d", tableID, hist.ID, isIndex)
+	_, err = h.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
+	q.feedback = q.feedback[:0]
+	return errors.Trace(err)
+}
+
 const (
 	// StatsOwnerKey is the stats owner path that is saved to etcd.
 	StatsOwnerKey = "/tidb/stats/owner"
diff --git a/statistics/update_test.go b/statistics/update_test.go
index e1c0c22f1bdbe..391a8dd9aa531 100644
--- a/statistics/update_test.go
+++ b/statistics/update_test.go
@@ -397,7 +397,7 @@ func (s *testStatsUpdateSuite) TestQueryFeedback(c *C) {
 	defer cleanEnv(c, s.store, s.do)
 	testKit := testkit.NewTestKit(c, s.store)
 	testKit.MustExec("use test")
-	testKit.MustExec("create table t (a int, b int, primary key(a), index idx(b))")
+	testKit.MustExec("create table t (a bigint(64), b bigint(64), primary key(a), index idx(b))")
 	testKit.MustExec("insert into t values (1,2),(2,2),(4,5)")
 	testKit.MustExec("analyze table t")
 	testKit.MustExec("insert into t values (3,4)")
@@ -416,6 +416,7 @@ func (s *testStatsUpdateSuite) TestQueryFeedback(c *C) {
 		idxCols int
 	}{
 		{
+			// test primary key feedback
 			sql: "select * from t where t.a <= 5",
 			hist: "column:1 ndv:3\n" +
 				"num: 1\tlower_bound: 1\tupper_bound: 1\trepeats: 1\n" +
@@ -424,6 +425,7 @@ func (s *testStatsUpdateSuite) TestQueryFeedback(c *C) {
 			idxCols: 0,
 		},
 		{
+			// test index feedback by double read
 			sql: "select * from t use index(idx) where t.b <= 5",
 			hist: "index:1 ndv:2\n" +
 				"num: 2\tlower_bound: 2\tupper_bound: 2\trepeats: 2\n" +
@@ -431,6 +433,7 @@ func (s *testStatsUpdateSuite) TestQueryFeedback(c *C) {
 			idxCols: 1,
 		},
 		{
+			// test index feedback by single read
 			sql: "select b from t use index(idx) where t.b <= 5",
 			hist: "index:1 ndv:2\n" +
 				"num: 2\tlower_bound: 2\tupper_bound: 2\trepeats: 2\n" +
@@ -446,9 +449,24 @@ func (s *testStatsUpdateSuite) TestQueryFeedback(c *C) {
 		if t.idxCols == 0 {
 			c.Assert(feedback[0].DecodeInt(), IsNil)
 		}
-		c.Assert(statistics.UpdateHistogram(feedback[0].Hist(), feedback).ToString(t.idxCols), Equals, t.hist)
+		c.Assert(statistics.UpdateHistogram(feedback[0].Hist(), feedback[0]).ToString(t.idxCols), Equals, t.hist)
 	}
 
+	for _, t := range tests {
+		testKit.MustQuery(t.sql)
+	}
+	c.Assert(h.DumpStatsDeltaToKV(), IsNil)
+	c.Assert(h.DumpStatsFeedbackToKV(), IsNil)
+	c.Assert(h.HandleUpdateStats(s.do.InfoSchema()), IsNil)
+	is := s.do.InfoSchema()
+	table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
+	c.Assert(err, IsNil)
+	h.Update(s.do.InfoSchema())
+	tblInfo := table.Meta()
+	tbl := h.GetTableStats(tblInfo)
+	c.Assert(tbl.Columns[tblInfo.Columns[0].ID].ToString(0), Equals, tests[0].hist)
+	c.Assert(tbl.Indices[tblInfo.Indices[0].ID].ToString(1), Equals, tests[1].hist)
+
 	// Feedback from limit executor may not be accurate.
 	testKit.MustQuery("select * from t where t.a <= 2 limit 1")
 	h.DumpStatsDeltaToKV()
diff --git a/tidb-server/main.go b/tidb-server/main.go
index 23e7a8ad6d423..24d7fbc936ddc 100644
--- a/tidb-server/main.go
+++ b/tidb-server/main.go
@@ -365,6 +365,7 @@ func setGlobalVars() {
 	session.SetStatsLease(statsLeaseDuration)
 	domain.RunAutoAnalyze = cfg.Performance.RunAutoAnalyze
 	statistics.FeedbackProbability = cfg.Performance.FeedbackProbability
+	statistics.MaxQueryFeedbackCount = int(cfg.Performance.QueryFeedbackLimit)
 	plan.RatioOfPseudoEstimate = cfg.Performance.PseudoEstimateRatio
 	ddl.RunWorker = cfg.RunDDL
 	ddl.EnableSplitTableRegion = cfg.SplitTable
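A note on the on-disk format introduced above: the feedback column of mysql.stats_feedback holds plain encoding/gob output of the unexported queryFeedback struct, so the row can be decoded by any Go code that declares a structurally identical type. The following standalone sketch (not part of the patch; package main and the sample values are illustrative only) shows the round trip that encodeFeedback and decodeFeedback perform for primary-key ranges:

package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// queryFeedback mirrors the serialization struct added in statistics/feedback.go.
// Int ranges are flattened as [lower0, upper0, lower1, upper1, ...], and Counts
// holds one scan count per range (index point counts are appended at the end).
type queryFeedback struct {
	IntRanges   []int64
	HashValues  []uint64
	IndexRanges [][]byte
	Counts      []int64
}

func main() {
	in := queryFeedback{
		IntRanges: []int64{0, 5, 10, 20}, // two ranges: [0, 5) and [10, 20)
		Counts:    []int64{3, 7},
	}
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(&in); err != nil {
		panic(err)
	}
	// buf.Bytes() is the kind of payload stored in the feedback blob column.
	var out queryFeedback
	if err := gob.NewDecoder(&buf).Decode(&out); err != nil {
		panic(err)
	}
	fmt.Println(out.IntRanges, out.Counts) // [0 5 10 20] [3 7]
}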
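Similarly, CMSketch.setValue in cmsketch.go relies on unsigned wraparound: deltaCount := count - oriCount may "underflow" when the new count is smaller, but adding the wrapped delta to each uint32 cell still lands exactly on the new count modulo 2^32, which is what the "overflow naturally" comment refers to. A tiny sketch of that arithmetic assumption (illustrative, not part of the patch):

package main

import "fmt"

func main() {
	oriCount := uint32(10)    // count currently stored in a sketch cell
	count := uint32(3)        // new count reported by feedback
	delta := count - oriCount // wraps around to 1<<32 - 7
	cell := uint32(10)
	cell += delta
	fmt.Println(cell) // 3: oriCount cancels out even though delta wrapped
}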