Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: merge partition-level TopN to global-level TopN #22433

Merged
merged 18 commits into from
Feb 19, 2021
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func (e *AnalyzeIndexExec) buildStatsFromResult(result distsql.SelectResult, nee
if err := cms.MergeCMSketch(cm); err != nil {
return nil, nil, nil, err
}
statistics.MergeTopN(topn, tmpTopN, cms, uint32(e.opts[ast.AnalyzeOptNumTopN]), false)
statistics.MergeTopNAndUpdateCMSketch(topn, tmpTopN, cms, uint32(e.opts[ast.AnalyzeOptNumTopN]))
}
}
}
Expand Down Expand Up @@ -1157,7 +1157,7 @@ func (e *AnalyzeFastExec) buildIndexStats(idxInfo *model.IndexInfo, collector *s
if err != nil {
return nil, nil, nil, err
}
statistics.MergeTopN(topN, curTopN, cmSketch, numTop, false)
statistics.MergeTopNAndUpdateCMSketch(topN, curTopN, cmSketch, numTop)
}
// Build Histogram.
hist, err := statistics.BuildColumnHist(e.ctx, int64(e.opts[ast.AnalyzeOptNumBuckets]), idxInfo.ID, collector, types.NewFieldType(mysql.TypeBlob), rowCount, int64(ndv), collector.NullCount*int64(scaleRatio))
Expand Down Expand Up @@ -1312,7 +1312,7 @@ func analyzeIndexIncremental(idxExec *analyzeIndexIncrementalExec) analyzeResult
cms.CalcDefaultValForAnalyze(uint64(hist.NDV))
}
if statsVer == statistics.Version2 {
poped := statistics.MergeTopN(topN, idxExec.oldTopN, cms, uint32(idxExec.opts[ast.AnalyzeOptNumTopN]), false)
poped := statistics.MergeTopNAndUpdateCMSketch(topN, idxExec.oldTopN, cms, uint32(idxExec.opts[ast.AnalyzeOptNumTopN]))
hist.AddIdxVals(poped)
}
result := analyzeResult{
Expand Down
101 changes: 65 additions & 36 deletions statistics/cmsketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,43 +304,18 @@ func (c *CMSketch) queryHashValue(h1, h2 uint64) uint64 {
return uint64(res)
}

// MergeTopN merges the src TopN into the dst, and spilled values will be inserted into the CMSketch.
func MergeTopN(dst, src *TopN, c *CMSketch, numTop uint32, usingMax bool) []TopNMeta {
if dst.TotalCount()+src.TotalCount() == 0 {
return nil
}
popedTopNPair := make([]TopNMeta, 0, 4)
counter := make(map[hack.MutableString]uint64)
for _, meta := range dst.TopN {
counter[hack.String(meta.Encoded)] += meta.Count
}
for _, meta := range src.TopN {
if usingMax {
counter[hack.String(meta.Encoded)] = mathutil.MaxUint64(counter[hack.String(meta.Encoded)], meta.Count)
} else {
counter[hack.String(meta.Encoded)] += meta.Count
}
// MergeTopNAndUpdateCMSketch merges the src TopN into the dst, and spilled values will be inserted into the CMSketch.
func MergeTopNAndUpdateCMSketch(dst, src *TopN, c *CMSketch, numTop uint32) []TopNMeta {
qw4990 marked this conversation as resolved.
Show resolved Hide resolved
topNs := []*TopN{src, dst}
mergedTopN, popedTopNPair := MergeTopN(topNs, numTop)
if mergedTopN == nil {
// mergedTopN == nil means the total count of the input TopNs is equal to zero.
return popedTopNPair
}
dst.TopN = mergedTopN.TopN
for _, topNMeta := range popedTopNPair {
c.InsertBytesByCount(topNMeta.Encoded, topNMeta.Count)
}
sorted := make([]uint64, len(counter))
for _, cnt := range counter {
sorted = append(sorted, cnt)
}
sort.Slice(sorted, func(i, j int) bool {
return sorted[i] > sorted[j]
})
numTop = mathutil.MinUint32(uint32(len(counter)), numTop)
lastTopCnt := sorted[numTop-1]
dst.TopN = make([]TopNMeta, 0, numTop)
for value, cnt := range counter {
data := hack.Slice(string(value))
if cnt >= lastTopCnt {
dst.AppendTopN(data, cnt)
} else {
popedTopNPair = append(popedTopNPair, TopNMeta{Encoded: data, Count: cnt})
c.InsertBytesByCount(data, cnt)
}
}
dst.Sort()
return popedTopNPair
}

Expand Down Expand Up @@ -652,3 +627,57 @@ func (c *TopN) RemoveVal(val []byte) {
func NewTopN(n int) *TopN {
return &TopN{TopN: make([]TopNMeta, 0, n)}
}

// MergeTopN is used to merge more TopN structures to generate a new TopN struct by the given size.
// The input parameters are multiple TopN structures to be merged and the size of the new TopN that will be generated.
// The output parameters are the newly generated TopN structure and the remaining numbers.
// Notice: If n == 0, we will let n = max(len(TopN.TopN))
func MergeTopN(topNs []*TopN, n uint32) (*TopN, []TopNMeta) {
needTopNNum := false
if n == 0 {
needTopNNum = true
}
Reminiscent marked this conversation as resolved.
Show resolved Hide resolved
totCnt := uint64(0)
for _, topN := range topNs {
totCnt += topN.TotalCount()
}
if totCnt == 0 {
return nil, nil
}
Comment on lines +636 to +642
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to check this? It seems unnecessary.

// Different TopN structures may hold the same value, we have to merge them.
counter := make(map[hack.MutableString]uint64)
for _, topN := range topNs {
if needTopNNum && uint32(len(topN.TopN)) > n {
n = uint32(len(topN.TopN))
}
for _, val := range topN.TopN {
counter[hack.String(val.Encoded)] += val.Count
}
}

numTop := len(counter)
sorted := make([]uint64, numTop)
for _, cnt := range counter {
sorted = append(sorted, cnt)
}
sort.Slice(sorted, func(i, j int) bool {
return sorted[i] > sorted[j]
})
n = mathutil.MinUint32(uint32(numTop), n)
// lastTopCnt is the smallest value in the new TopN structure
lastTopCnt := sorted[numTop-1]

var finalTopN TopN
finalTopN.TopN = make([]TopNMeta, 0, n)
popedTopNPair := make([]TopNMeta, 0, uint32(numTop)-n)
for value, cnt := range counter {
data := hack.Slice(string(value))
if cnt >= lastTopCnt {
finalTopN.AppendTopN(data, cnt)
qw4990 marked this conversation as resolved.
Show resolved Hide resolved
} else {
popedTopNPair = append(popedTopNPair, TopNMeta{Encoded: data, Count: cnt})
}
}
finalTopN.Sort()
return &finalTopN, popedTopNPair
}
87 changes: 87 additions & 0 deletions statistics/cmsketch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"math"
"math/rand"
"strconv"
"time"

. "github.com/pingcap/check"
Expand Down Expand Up @@ -303,3 +304,89 @@ func (s *testStatisticsSuite) TestCMSketchCodingTopN(c *C) {
// do not panic
DecodeCMSketchAndTopN([]byte{}, rows)
}

// TestMergeTopN builds several random TopN structures, merges them with MergeTopN and checks that
// every merged count equals the sum of the generated counts, that no value is lost, and that every
// value left out of the merged TopN has a strictly smaller count than every value kept in it.
func (s *testStatisticsSuite) TestMergeTopN(c *C) {
	tests := []struct {
		topnNum    int // number of TopN structures to merge
		n          int // entries per input TopN, also the requested size of the merged TopN
		maxTopNVal int // values are drawn from [0, maxTopNVal)
		maxTopNCnt int // counts are drawn from [0, maxTopNCnt)
	}{
		{
			topnNum:    10,
			n:          5,
			maxTopNVal: 50,
			maxTopNCnt: 100,
		},
		{
			topnNum:    1,
			n:          5,
			maxTopNVal: 50,
			maxTopNCnt: 100,
		},
		{
			topnNum:    5,
			n:          5,
			maxTopNVal: 5,
			maxTopNCnt: 100,
		},
		{
			topnNum:    5,
			n:          5,
			maxTopNVal: 10,
			maxTopNCnt: 100,
		},
	}
	// Seeding once is enough; re-seeding per case with a second-granularity clock adds nothing.
	rand.Seed(time.Now().Unix())
	for _, t := range tests {
		topnNum, n := t.topnNum, t.n
		maxTopNVal, maxTopNCnt := t.maxTopNVal, t.maxTopNCnt

		// maxTopNVal must be at least n, otherwise a TopN cannot be filled with n distinct values.
		ok := maxTopNVal >= n
		c.Assert(ok, Equals, true)

		topNs := make([]*TopN, 0, topnNum)
		// res maps every generated value to its expected merged count.
		res := make(map[int]uint64)
		for i := 0; i < topnNum; i++ {
			topN := NewTopN(n)
			occur := make(map[int]bool)
			for j := 0; j < n; j++ {
				// The range of numbers in the topn structure is in [0, maxTopNVal)
				// But there cannot be repeated occurrences of value in a topN structure.
				randNum := rand.Intn(maxTopNVal)
				for occur[randNum] {
					randNum = rand.Intn(maxTopNVal)
				}
				occur[randNum] = true
				tString := []byte(fmt.Sprintf("%d", randNum))
				// The range of the number of occurrences in the topn structure is in [0, maxTopNCnt)
				randCnt := uint64(rand.Intn(maxTopNCnt))
				res[randNum] += randCnt
				topNMeta := TopNMeta{tString, randCnt}
				topN.TopN = append(topN.TopN, topNMeta)
			}
			topNs = append(topNs, topN)
		}

		topN, remainTopN := MergeTopN(topNs, uint32(n))
		if topN == nil {
			// MergeTopN returns nil when the inputs carry no counts, which can only
			// happen here if every generated count was zero.
			for _, total := range res {
				c.Assert(total, Equals, uint64(0))
			}
			c.Assert(len(remainTopN), Equals, 0)
			continue
		}

		cnt := len(topN.TopN)
		// Track the real minimum instead of relying on the order of topN.TopN.
		minTopNCnt := uint64(math.MaxUint64)
		for _, topNMeta := range topN.TopN {
			val, err := strconv.Atoi(string(topNMeta.Encoded))
			c.Assert(err, IsNil)
			c.Assert(topNMeta.Count, Equals, res[val])
			if topNMeta.Count < minTopNCnt {
				minTopNCnt = topNMeta.Count
			}
		}
		cnt += len(remainTopN)
		for _, remainTopNMeta := range remainTopN {
			val, err := strconv.Atoi(string(remainTopNMeta.Encoded))
			c.Assert(err, IsNil)
			c.Assert(remainTopNMeta.Count, Equals, res[val])
			// Every left-over value must be strictly rarer than everything kept in the TopN.
			ok = minTopNCnt > remainTopNMeta.Count
			c.Assert(ok, Equals, true)
		}
		// No value may be lost or duplicated by the merge.
		c.Assert(cnt, Equals, len(res))
	}
}
19 changes: 14 additions & 5 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,13 +330,15 @@ func (h *Handle) MergePartitionStats2GlobalStats(sc *stmtctx.StatementContext, i
allHg := make([][]*statistics.Histogram, globalStats.Num)
allCms := make([][]*statistics.CMSketch, globalStats.Num)
allTopN := make([][]*statistics.TopN, globalStats.Num)
allID := make([]int64, 0, globalStats.Num)
allFieldType := make([]*types.FieldType, 0, globalStats.Num)
qw4990 marked this conversation as resolved.
Show resolved Hide resolved
for i := 0; i < globalStats.Num; i++ {
allHg[i] = make([]*statistics.Histogram, 0, partitionNum)
allCms[i] = make([]*statistics.CMSketch, 0, partitionNum)
allTopN[i] = make([]*statistics.TopN, 0, partitionNum)
}

for _, partitionID := range partitionIDs {
for idx, partitionID := range partitionIDs {
h.mu.Lock()
partitionTable, ok := h.getTableByPhysicalID(is, partitionID)
h.mu.Unlock()
Expand All @@ -357,9 +359,15 @@ func (h *Handle) MergePartitionStats2GlobalStats(sc *stmtctx.StatementContext, i
globalStats.Count += partitionStats.Count
for i := 0; i < globalStats.Num; i++ {
ID := tableInfo.Columns[i].ID
tp := &tableInfo.Columns[i].FieldType
if isIndex != 0 {
// If the statistics is the index stats, we should use the index ID to replace the column ID.
ID = idxID
tp = types.NewFieldType(mysql.TypeBlob)
}
if idx == 0 {
allID = append(allID, ID)
allFieldType = append(allFieldType, tp)
}
hg, cms, topN := partitionStats.GetStatsInfo(ID, isIndex == 1)
allHg[i] = append(allHg[i], hg)
Expand All @@ -382,10 +390,11 @@ func (h *Handle) MergePartitionStats2GlobalStats(sc *stmtctx.StatementContext, i

// Merge topN. We need to merge TopN before merging the histogram.
// Because after merging TopN, some numbers will be left.
// These left numbers should be inserted into the histogram.
err = errors.Errorf("TODO: The merge function of the topN structure has not been implemented yet")
if err != nil {
return
// These remaining topN numbers will be used as a separate bucket for later histogram merging.
var popedTopN []statistics.TopNMeta
globalStats.TopN[i], popedTopN = statistics.MergeTopN(allTopN[i], 0)
if len(popedTopN) != 0 {
// TODO: use the popedTopN as a bucket for later histogram merging.
}

// Merge histogram
Expand Down
4 changes: 2 additions & 2 deletions statistics/handle/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ func (s *testStatsSuite) TestBuildGlobalLevelStats(c *C) {
// Test the 'dynamic-only' mode
testKit.MustExec("set @@tidb_partition_prune_mode = 'dynamic-only';")
err := testKit.ExecToErr("analyze table t, t1;")
c.Assert(err.Error(), Equals, "TODO: The merge function of the topN structure has not been implemented yet")
c.Assert(err.Error(), Equals, "TODO: The merge function of the NDV structure has not been implemented yet")
result = testKit.MustQuery("show stats_meta where table_name = 't'").Sort()
c.Assert(len(result.Rows()), Equals, 3)
c.Assert(result.Rows()[0][5], Equals, "1")
Expand All @@ -722,7 +722,7 @@ func (s *testStatsSuite) TestBuildGlobalLevelStats(c *C) {
c.Assert(len(result.Rows()), Equals, 1)

err = testKit.ExecToErr("analyze table t index idx_t_ab, idx_t_b;")
c.Assert(err.Error(), Equals, "TODO: The merge function of the topN structure has not been implemented yet")
c.Assert(err.Error(), Equals, "TODO: The merge function of the NDV structure has not been implemented yet")
result = testKit.MustQuery("show stats_meta where table_name = 't'").Sort()
c.Assert(len(result.Rows()), Equals, 3)
c.Assert(result.Rows()[0][5], Equals, "1")
Expand Down