Skip to content

Commit

Permalink
stats: handle feedback updates for topn items (#11507)
Browse files Browse the repository at this point in the history
All tests passed, auto merged by Bot
  • Loading branch information
sre-bot authored Jul 30, 2019
1 parent aeeeb15 commit 761ba58
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 7 deletions.
24 changes: 17 additions & 7 deletions statistics/feedback.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/ranger"
"github.com/spaolacci/murmur3"
"go.uber.org/atomic"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -690,8 +689,11 @@ func buildNewHistogram(h *Histogram, buckets []bucket) *Histogram {
type queryFeedback struct {
IntRanges []int64
// HashValues is the murmur hash values for each index point.
// Note that index points will be stored in `IndexPoints`, we keep it here only for compatibility.
HashValues []uint64
IndexRanges [][]byte
// IndexPoints stores the value of each equal condition.
IndexPoints [][]byte
// Counts is the number of scan keys in each range. It first stores the count for `IntRanges`, `IndexRanges` or `ColumnRanges`.
// After that, it stores the counts for the index points (`HashValues` or `IndexPoints`).
Counts []int64
Expand Down Expand Up @@ -724,8 +726,7 @@ func encodeIndexFeedback(q *QueryFeedback) *queryFeedback {
var pointCounts []int64
for _, fb := range q.Feedback {
if bytes.Compare(kv.Key(fb.Lower.GetBytes()).PrefixNext(), fb.Upper.GetBytes()) >= 0 {
h1, h2 := murmur3.Sum128(fb.Lower.GetBytes())
pb.HashValues = append(pb.HashValues, h1, h2)
pb.IndexPoints = append(pb.IndexPoints, fb.Lower.GetBytes())
pointCounts = append(pointCounts, fb.Count)
} else {
pb.IndexRanges = append(pb.IndexRanges, fb.Lower.GetBytes(), fb.Upper.GetBytes())
Expand Down Expand Up @@ -788,9 +789,18 @@ func decodeFeedbackForIndex(q *QueryFeedback, pb *queryFeedback, c *CMSketch) {
if c != nil {
// decode the index point feedback, just set value count in CM Sketch
start := len(pb.IndexRanges) / 2
for i := 0; i < len(pb.HashValues); i += 2 {
// TODO: update using raw bytes instead of hash values.
c.setValue(pb.HashValues[i], pb.HashValues[i+1], uint64(pb.Counts[start+i/2]))
if len(pb.HashValues) > 0 {
// It needs raw values to update the top n, so just skip it here.
if len(c.topN) > 0 {
return
}
for i := 0; i < len(pb.HashValues); i += 2 {
c.setValue(pb.HashValues[i], pb.HashValues[i+1], uint64(pb.Counts[start+i/2]))
}
return
}
for i := 0; i < len(pb.IndexPoints); i++ {
c.updateValueBytes(pb.IndexPoints[i], uint64(pb.Counts[start+i]))
}
}
}
Expand Down Expand Up @@ -860,7 +870,7 @@ func DecodeFeedback(val []byte, q *QueryFeedback, c *CMSketch, ft *types.FieldTy
if err != nil {
return errors.Trace(err)
}
if len(pb.IndexRanges) > 0 || len(pb.HashValues) > 0 {
if len(pb.IndexRanges) > 0 || len(pb.HashValues) > 0 || len(pb.IndexPoints) > 0 {
decodeFeedbackForIndex(q, pb, c)
} else if len(pb.IntRanges) > 0 {
decodeFeedbackForPK(q, pb, mysql.HasUnsignedFlag(ft.Flag))
Expand Down
41 changes: 41 additions & 0 deletions statistics/handle/update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1390,6 +1390,47 @@ func (s *testStatsSuite) TestIndexQueryFeedback(c *C) {
}
}

// TestIndexQueryFeedback4TopN checks that feedback from an index point query
// is applied to the index's CM sketch.
//
// It inserts 20 rows with a = 1, analyzes the table (fast analyze, 3 buckets),
// inserts 20 more rows, runs a point lookup through idx, and then expects the
// sketch to report 40 for the encoded key of 1 once the feedback is handled.
// NOTE(review): presumably the value 1 ends up in the sketch's top-N after
// analyze, which is the case this test targets; confirm against CMSketch.
func (s *testStatsSuite) TestIndexQueryFeedback4TopN(c *C) {
	defer cleanEnv(c, s.store, s.do)
	testKit := testkit.NewTestKit(c, s.store)

	// Snapshot the numeric value with Load and restore it with Store.
	// Store(1) below mutates the shared atomic in place, so re-assigning a
	// saved copy of the variable would not undo it if that variable is a
	// pointer to the atomic, and the forced probability would leak into
	// later tests.
	oriProbability := statistics.FeedbackProbability.Load()
	defer func() {
		statistics.FeedbackProbability.Store(oriProbability)
	}()
	// Force feedback to be collected for every query.
	statistics.FeedbackProbability.Store(1)

	testKit.MustExec("use test")
	testKit.MustExec("create table t (a bigint(64), index idx(a))")
	for i := 0; i < 20; i++ {
		testKit.MustExec(`insert into t values (1)`)
	}
	h := s.do.StatsHandle()
	h.HandleDDLEvent(<-h.DDLEventCh())
	c.Assert(h.DumpStatsDeltaToKV(handle.DumpAll), IsNil)
	testKit.MustExec("set @@tidb_enable_fast_analyze = 1")
	testKit.MustExec("analyze table t with 3 buckets")
	// Add 20 more rows after analyze so the analyzed stats are out of date
	// and only feedback can bring the count up to 40.
	for i := 0; i < 20; i++ {
		testKit.MustExec(`insert into t values (1)`)
	}
	c.Assert(h.DumpStatsDeltaToKV(handle.DumpAll), IsNil)
	is := s.do.InfoSchema()
	c.Assert(h.Update(is), IsNil)
	table, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
	c.Assert(err, IsNil)
	tblInfo := table.Meta()

	// The point query on idx produces the feedback under test.
	testKit.MustQuery("select * from t use index(idx) where a = 1")
	c.Assert(h.DumpStatsDeltaToKV(handle.DumpAll), IsNil)
	c.Assert(h.DumpStatsFeedbackToKV(), IsNil)
	c.Assert(h.HandleUpdateStats(s.do.InfoSchema()), IsNil)
	c.Assert(h.Update(is), IsNil)
	tbl := h.GetTableStats(tblInfo)
	val, err := codec.EncodeKey(testKit.Se.GetSessionVars().StmtCtx, nil, types.NewIntDatum(1))
	c.Assert(err, IsNil)
	c.Assert(tbl.Indices[1].CMSketch.QueryBytes(val), Equals, uint64(40))
}

func (s *testStatsSuite) TestAbnormalIndexFeedback(c *C) {
defer cleanEnv(c, s.store, s.do)
testKit := testkit.NewTestKit(c, s.store)
Expand Down

0 comments on commit 761ba58

Please sign in to comment.