Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: merge partition-level TopN to global-level TopN #22433

Merged
merged 18 commits into from
Feb 19, 2021
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func (e *AnalyzeIndexExec) buildStatsFromResult(result distsql.SelectResult, nee
if err := cms.MergeCMSketch(cm); err != nil {
return nil, nil, nil, err
}
statistics.MergeTopN(topn, tmpTopN, cms, uint32(e.opts[ast.AnalyzeOptNumTopN]), false)
statistics.MergeTopNAndUpdateCMSketch(topn, tmpTopN, cms, uint32(e.opts[ast.AnalyzeOptNumTopN]))
}
}
}
Expand Down Expand Up @@ -1157,7 +1157,7 @@ func (e *AnalyzeFastExec) buildIndexStats(idxInfo *model.IndexInfo, collector *s
if err != nil {
return nil, nil, nil, err
}
statistics.MergeTopN(topN, curTopN, cmSketch, numTop, false)
statistics.MergeTopNAndUpdateCMSketch(topN, curTopN, cmSketch, numTop)
}
// Build Histogram.
hist, err := statistics.BuildColumnHist(e.ctx, int64(e.opts[ast.AnalyzeOptNumBuckets]), idxInfo.ID, collector, types.NewFieldType(mysql.TypeBlob), rowCount, int64(ndv), collector.NullCount*int64(scaleRatio))
Expand Down Expand Up @@ -1312,7 +1312,7 @@ func analyzeIndexIncremental(idxExec *analyzeIndexIncrementalExec) analyzeResult
cms.CalcDefaultValForAnalyze(uint64(hist.NDV))
}
if statsVer == statistics.Version2 {
poped := statistics.MergeTopN(topN, idxExec.oldTopN, cms, uint32(idxExec.opts[ast.AnalyzeOptNumTopN]), false)
poped := statistics.MergeTopNAndUpdateCMSketch(topN, idxExec.oldTopN, cms, uint32(idxExec.opts[ast.AnalyzeOptNumTopN]))
hist.AddIdxVals(poped)
}
result := analyzeResult{
Expand Down
100 changes: 64 additions & 36 deletions statistics/cmsketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,43 +304,18 @@ func (c *CMSketch) queryHashValue(h1, h2 uint64) uint64 {
return uint64(res)
}

// MergeTopN merges the src TopN into the dst, and spilled values will be inserted into the CMSketch.
func MergeTopN(dst, src *TopN, c *CMSketch, numTop uint32, usingMax bool) []TopNMeta {
if dst.TotalCount()+src.TotalCount() == 0 {
return nil
}
popedTopNPair := make([]TopNMeta, 0, 4)
counter := make(map[hack.MutableString]uint64)
for _, meta := range dst.TopN {
counter[hack.String(meta.Encoded)] += meta.Count
}
for _, meta := range src.TopN {
if usingMax {
counter[hack.String(meta.Encoded)] = mathutil.MaxUint64(counter[hack.String(meta.Encoded)], meta.Count)
} else {
counter[hack.String(meta.Encoded)] += meta.Count
}
}
sorted := make([]uint64, len(counter))
for _, cnt := range counter {
sorted = append(sorted, cnt)
}
sort.Slice(sorted, func(i, j int) bool {
return sorted[i] > sorted[j]
})
numTop = mathutil.MinUint32(uint32(len(counter)), numTop)
lastTopCnt := sorted[numTop-1]
dst.TopN = make([]TopNMeta, 0, numTop)
for value, cnt := range counter {
data := hack.Slice(string(value))
if cnt >= lastTopCnt {
dst.AppendTopN(data, cnt)
} else {
popedTopNPair = append(popedTopNPair, TopNMeta{Encoded: data, Count: cnt})
c.InsertBytesByCount(data, cnt)
}
// MergeTopNAndUpdateCMSketch merges the src TopN into the dst, and spilled values will be inserted into the CMSketch.
func MergeTopNAndUpdateCMSketch(dst, src *TopN, c *CMSketch, numTop uint32) []TopNMeta {
qw4990 marked this conversation as resolved.
Show resolved Hide resolved
topNs := []*TopN{src, dst}
mergedTopN, popedTopNPair := MergeTopN(topNs, numTop)
if mergedTopN == nil {
// mergedTopN == nil means the total count of the input TopN are equal to zero
return popedTopNPair
}
dst.TopN = mergedTopN.TopN
for _, topNMeta := range popedTopNPair {
c.InsertBytesByCount(topNMeta.Encoded, topNMeta.Count)
}
dst.Sort()
return popedTopNPair
}

Expand Down Expand Up @@ -652,3 +627,56 @@ func (c *TopN) RemoveVal(val []byte) {
func NewTopN(n int) *TopN {
return &TopN{TopN: make([]TopNMeta, 0, n)}
}

// MergeTopN is used to merge more TopN structures to generate a new TopN struct by the given size.
// The input parameters are multiple TopN structures to be merged and the size of the new TopN that will be generated.
// The output parameters are the newly generated TopN structure and the remaining numbers.
// Notice: The n can be 0. So n has no default value, we must explicitly specify this value.
func MergeTopN(topNs []*TopN, n uint32) (*TopN, []TopNMeta) {
totCnt := uint64(0)
for _, topN := range topNs {
totCnt += topN.TotalCount()
}
if totCnt == 0 {
return nil, nil
}
Comment on lines +636 to +642
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to check this? It seems unnecessary.

// Different TopN structures may hold the same value, we have to merge them.
counter := make(map[hack.MutableString]uint64)
for _, topN := range topNs {
if topN.TotalCount() == 0 {
continue
}
for _, val := range topN.TopN {
counter[hack.String(val.Encoded)] += val.Count
}
}

numTop := len(counter)
if numTop == 0 {
return nil, nil
}
sorted := make([]uint64, numTop)
for _, cnt := range counter {
sorted = append(sorted, cnt)
}
sort.Slice(sorted, func(i, j int) bool {
return sorted[i] > sorted[j]
})
n = mathutil.MinUint32(uint32(numTop), n)
// lastTopCnt is the smallest value in the new TopN structure
lastTopCnt := sorted[numTop-1]

var finalTopN TopN
finalTopN.TopN = make([]TopNMeta, 0, n)
popedTopNPair := make([]TopNMeta, 0, uint32(numTop)-n)
for value, cnt := range counter {
data := hack.Slice(string(value))
if cnt >= lastTopCnt {
finalTopN.AppendTopN(data, cnt)
qw4990 marked this conversation as resolved.
Show resolved Hide resolved
} else {
popedTopNPair = append(popedTopNPair, TopNMeta{Encoded: data, Count: cnt})
}
}
finalTopN.Sort()
return &finalTopN, popedTopNPair
}
87 changes: 87 additions & 0 deletions statistics/cmsketch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"math"
"math/rand"
"strconv"
"time"

. "github.com/pingcap/check"
Expand Down Expand Up @@ -303,3 +304,89 @@ func (s *testStatisticsSuite) TestCMSketchCodingTopN(c *C) {
// do not panic
DecodeCMSketchAndTopN([]byte{}, rows)
}

// TestMergeTopN builds several random TopN structures, merges them with
// MergeTopN and checks that
//   - every returned entry (kept or remaining) carries the summed count of its value,
//   - every kept entry has a strictly larger count than every remaining entry,
//   - kept and remaining entries together cover every generated value.
func (s *testStatisticsSuite) TestMergeTopN(c *C) {
	tests := []struct {
		topnNum    int
		n          int
		maxTopNVal int
		maxTopNCnt int
	}{
		{
			topnNum:    10,
			n:          5,
			maxTopNVal: 50,
			maxTopNCnt: 100,
		},
		{
			topnNum:    1,
			n:          5,
			maxTopNVal: 50,
			maxTopNCnt: 100,
		},
		{
			topnNum:    5,
			n:          5,
			maxTopNVal: 5,
			maxTopNCnt: 100,
		},
		{
			topnNum:    5,
			n:          5,
			maxTopNVal: 10,
			maxTopNCnt: 100,
		},
	}
	// Seed once: re-seeding per case with a one-second-resolution clock would
	// make every case replay the same random sequence.
	rand.Seed(time.Now().Unix())
	for _, t := range tests {
		topnNum, n := t.topnNum, t.n
		maxTopNVal, maxTopNCnt := t.maxTopNVal, t.maxTopNCnt

		// Each TopN holds n distinct values drawn from [0, maxTopNVal), so the
		// value pool must contain at least n values or generation never ends.
		c.Assert(maxTopNVal >= n, Equals, true)

		topNs := make([]*TopN, 0, topnNum)
		// res maps each generated value to its expected merged count.
		res := make(map[int]uint64)
		for i := 0; i < topnNum; i++ {
			topN := NewTopN(n)
			occur := make(map[int]bool)
			for j := 0; j < n; j++ {
				// The range of numbers in the topn structure is in [0, maxTopNVal)
				// But there cannot be repeated occurrences of value in a topN structure.
				randNum := rand.Intn(maxTopNVal)
				for occur[randNum] {
					randNum = rand.Intn(maxTopNVal)
				}
				occur[randNum] = true
				tString := []byte(fmt.Sprintf("%d", randNum))
				// The range of the number of occurrences in the topn structure is in [0, maxTopNCnt)
				randCnt := uint64(rand.Intn(maxTopNCnt))
				res[randNum] += randCnt
				topNMeta := TopNMeta{tString, randCnt}
				topN.TopN = append(topN.TopN, topNMeta)
			}
			topNs = append(topNs, topN)
		}

		topN, remainTopN := MergeTopN(topNs, uint32(n))
		if topN == nil {
			// MergeTopN reports nothing to merge; that is only legitimate when
			// every generated count happened to be zero.
			for _, total := range res {
				c.Assert(total, Equals, uint64(0))
			}
			c.Assert(len(remainTopN), Equals, 0)
			continue
		}

		cnt := len(topN.TopN)
		// Track the real minimum instead of relying on the iteration order of
		// topN.TopN, which is not necessarily ordered by count.
		minTopNCnt := uint64(math.MaxUint64)
		for _, topNMeta := range topN.TopN {
			val, err := strconv.Atoi(string(topNMeta.Encoded))
			c.Assert(err, IsNil)
			c.Assert(topNMeta.Count, Equals, res[val])
			if topNMeta.Count < minTopNCnt {
				minTopNCnt = topNMeta.Count
			}
		}

		cnt += len(remainTopN)
		for _, remainTopNMeta := range remainTopN {
			val, err := strconv.Atoi(string(remainTopNMeta.Encoded))
			c.Assert(err, IsNil)
			c.Assert(remainTopNMeta.Count, Equals, res[val])
			// Every kept entry must outweigh every remaining entry.
			c.Assert(minTopNCnt > remainTopNMeta.Count, Equals, true)
		}
		c.Assert(cnt, Equals, len(res))
	}
}
13 changes: 9 additions & 4 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,10 +382,15 @@ func (h *Handle) MergePartitionStats2GlobalStats(sc *stmtctx.StatementContext, i

// Merge topN. We need to merge TopN before merging the histogram.
// Because after merging TopN, some numbers will be left.
// These left numbers should be inserted into the histogram.
err = errors.Errorf("TODO: The merge function of the topN structure has not been implemented yet")
if err != nil {
return
// These remaining topN numbers will be used as a separate bucket for later histogram merging.
var popedTopN []statistics.TopNMeta
n := uint32(0)
for _, topN := range allTopN[i] {
n = mathutil.MaxUint32(n, uint32(len(topN.TopN)))
}
globalStats.TopN[i], popedTopN = statistics.MergeTopN(allTopN[i], n)
if len(popedTopN) != 0 {
// TODO: use the popedTopN as a bucket for later histogram merging.
}

// Merge histogram
Expand Down
4 changes: 2 additions & 2 deletions statistics/handle/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ func (s *testStatsSuite) TestBuildGlobalLevelStats(c *C) {
// Test the 'dynamic-only' mode
testKit.MustExec("set @@tidb_partition_prune_mode = 'dynamic-only';")
err := testKit.ExecToErr("analyze table t, t1;")
c.Assert(err.Error(), Equals, "TODO: The merge function of the topN structure has not been implemented yet")
c.Assert(err.Error(), Equals, "TODO: The merge function of the NDV has not been implemented yet")
result = testKit.MustQuery("show stats_meta where table_name = 't'").Sort()
c.Assert(len(result.Rows()), Equals, 3)
c.Assert(result.Rows()[0][5], Equals, "1")
Expand All @@ -722,7 +722,7 @@ func (s *testStatsSuite) TestBuildGlobalLevelStats(c *C) {
c.Assert(len(result.Rows()), Equals, 1)

err = testKit.ExecToErr("analyze table t index idx_t_ab, idx_t_b;")
c.Assert(err.Error(), Equals, "TODO: The merge function of the topN structure has not been implemented yet")
c.Assert(err.Error(), Equals, "TODO: The merge function of the NDV has not been implemented yet")
result = testKit.MustQuery("show stats_meta where table_name = 't'").Sort()
c.Assert(len(result.Rows()), Equals, 3)
c.Assert(result.Rows()[0][5], Equals, "1")
Expand Down