Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: merge the partition-level histograms to a global-level histogram #22603

Merged
merged 12 commits into from
Feb 19, 2021
3 changes: 2 additions & 1 deletion executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ func (e *AnalyzeExec) Next(ctx context.Context, req *chunk.Chunk) error {
}
if needGlobalStats {
for globalStatsID, info := range globalStatsMap {
globalStats, err := statsHandle.MergePartitionStats2GlobalStats(infoschema.GetInfoSchema(e.ctx), globalStatsID.tableID, info.isIndex, info.idxID)
sc := e.ctx.GetSessionVars().StmtCtx
globalStats, err := statsHandle.MergePartitionStats2GlobalStats(sc, infoschema.GetInfoSchema(e.ctx), globalStatsID.tableID, info.isIndex, info.idxID)
if err != nil {
return err
}
Expand Down
10 changes: 8 additions & 2 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ type GlobalStats struct {
}

// MergePartitionStats2GlobalStats merge the partition-level stats to global-level stats based on the tableID.
func (h *Handle) MergePartitionStats2GlobalStats(is infoschema.InfoSchema, physicalID int64, isIndex int, idxID int64) (globalStats *GlobalStats, err error) {
func (h *Handle) MergePartitionStats2GlobalStats(sc *stmtctx.StatementContext, is infoschema.InfoSchema, physicalID int64, isIndex int, idxID int64) (globalStats *GlobalStats, err error) {
// get the partition table IDs
h.mu.Lock()
globalTable, ok := h.getTableByPhysicalID(is, physicalID)
Expand Down Expand Up @@ -389,7 +389,13 @@ func (h *Handle) MergePartitionStats2GlobalStats(is infoschema.InfoSchema, physi
}

// Merge histogram
err = errors.Errorf("TODO: The merge function of the histogram structure has not been implemented yet")
globalStats.Hg[i], err = statistics.MergePartitionHist2GlobalHist(sc, allHg[i], 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we explicitly set the number of histogram buckets (the third parameter of the MergePartitionHist2GlobalHist function)?

if err != nil {
return
}

// Merge NDV
err = errors.Errorf("TODO: The merge function of the NDV has not been implemented yet")
if err != nil {
return
}
Expand Down
307 changes: 307 additions & 0 deletions statistics/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -1495,3 +1495,310 @@ func (hg *Histogram) ExtractTopN(cms *CMSketch, topN *TopN, numCols int, numTopN
topN.Sort()
return nil
}

// bucket4Merging is only used for merging partition hists to global hist.
// Unlike Histogram.Buckets, the embedded Bucket.Count here holds the count of
// this bucket alone, not a prefix sum (see buildBucket4Merging).
type bucket4Merging struct {
	// lower and upper are the bucket bounds; they are copies owned by this
	// bucket, so they can be mutated during merging without affecting the
	// source Histogram.
	lower *types.Datum
	upper *types.Datum
	Bucket
	// disjointNDV accumulates the NDV of buckets found to be fully disjoint
	// from the one being merged; see mergeBucketNDV for more details.
	disjointNDV int64
}

// newBucket4Meging returns an empty bucket4Merging whose bounds are freshly
// allocated zero Datums, ready to be filled by Datum.Copy.
// NOTE(review): the name contains a typo ("Meging"); kept as-is because it is
// referenced by callers elsewhere in this file.
func newBucket4Meging() *bucket4Merging {
	// The zero value of Bucket (Repeat/NDV/Count == 0) and disjointNDV == 0
	// are exactly what we want, so only the bounds need allocation.
	b := bucket4Merging{
		lower: new(types.Datum),
		upper: new(types.Datum),
	}
	return &b
}

// buildBucket4Merging converts hg's buckets into bucket4Merging values.
// Notice: Count in Histogram.Buckets is a prefix sum, while bucket4Merging
// stores each bucket's own count, so the previous prefix sum is subtracted
// while walking the buckets in order.
func (hg *Histogram) buildBucket4Merging() []*bucket4Merging {
	n := hg.Len()
	res := make([]*bucket4Merging, 0, n)
	var prevCount int64
	for i := 0; i < n; i++ {
		cur := newBucket4Meging()
		// Deep-copy the bounds so later merging cannot mutate hg.
		hg.GetLower(i).Copy(cur.lower)
		hg.GetUpper(i).Copy(cur.upper)
		src := hg.Buckets[i]
		cur.Repeat = src.Repeat
		cur.NDV = src.NDV
		// De-accumulate the prefix sum into a per-bucket count.
		cur.Count = src.Count - prevCount
		prevCount = src.Count
		res = append(res, cur)
	}
	return res
}

// Clone returns a deep copy of b: the Datum bounds are cloned, so the copy
// can be mutated independently of the receiver.
func (b *bucket4Merging) Clone() bucket4Merging {
	res := bucket4Merging{
		lower:       b.lower.Clone(),
		upper:       b.upper.Clone(),
		disjointNDV: b.disjointNDV,
	}
	res.Repeat = b.Repeat
	res.NDV = b.NDV
	res.Count = b.Count
	return res
}

// mergeBucketNDV merges the NDV of two buckets, `left` and `right`, and
// returns the merged bucket (its bounds cover both inputs).
// Before merging, you need to make sure that when using (upper, lower) as the
// comparison key, `right` is greater than `left`; otherwise an
// "illegal bucket order" error is returned.
// The ratio-based estimates below assume values are roughly uniformly
// distributed within each bucket.
func mergeBucketNDV(sc *stmtctx.StatementContext, left *bucket4Merging, right *bucket4Merging) (*bucket4Merging, error) {
	res := right.Clone()
	// An empty side contributes nothing; keep the non-empty side's bounds/NDV.
	if left.NDV == 0 {
		return &res, nil
	}
	if right.NDV == 0 {
		res.lower = left.lower.Clone()
		res.upper = left.upper.Clone()
		res.NDV = left.NDV
		return &res, nil
	}
	upperCompare, err := right.upper.CompareDatum(sc, left.upper)
	if err != nil {
		return nil, err
	}
	//      __right__|
	// _______left____|
	// illegal order.
	if upperCompare < 0 {
		return nil, errors.Errorf("illegal bucket order")
	}
	// ___right_|
	// ___left__|
	// They have the same upper.
	if upperCompare == 0 {
		lowerCompare, err := right.lower.CompareDatum(sc, left.lower)
		if err != nil {
			return nil, err
		}
		// |____right____|
		//   |__left____|
		// illegal order.
		if lowerCompare < 0 {
			return nil, errors.Errorf("illegal bucket order")
		}
		// |___right___|
		// |____left___|
		// Identical ranges: values overlap entirely, so
		// ndv = max(right.ndv, left.ndv)
		if lowerCompare == 0 {
			if left.NDV > right.NDV {
				res.NDV = left.NDV
			}
			return &res, nil
		}
		//         |_right_|
		// |_____left______|
		// |-ratio-|
		// `right` is contained in `left`'s tail:
		// ndv = ratio * left.ndv + max((1-ratio) * left.ndv, right.ndv)
		ratio := calcFraction4Datums(left.lower, left.upper, right.lower)
		res.NDV = int64(ratio*float64(left.NDV) + math.Max((1-ratio)*float64(left.NDV), float64(right.NDV)))
		res.lower = left.lower.Clone()
		return &res, nil
	}
	// ____right___|
	// ____left__|
	// right.upper > left.upper
	lowerCompareUpper, err := right.lower.CompareDatum(sc, left.upper)
	if err != nil {
		return nil, err
	}
	//            |_right_|
	// |___left____|
	// `left` and `right` do not intersect.
	// We add right.ndv in `disjointNDV`, and let `right.ndv = left.ndv` be used for subsequent merges.
	// This is because, for the merging of many buckets, we merge them from back to front:
	// the next `left` can only overlap with the current `left`'s range.
	if lowerCompareUpper >= 0 {
		res.upper = left.upper.Clone()
		res.lower = left.lower.Clone()
		res.disjointNDV += right.NDV
		res.NDV = left.NDV
		return &res, nil
	}
	upperRatio := calcFraction4Datums(right.lower, right.upper, left.upper)
	lowerCompare, err := right.lower.CompareDatum(sc, left.lower)
	if err != nil {
		return nil, err
	}
	//              |-upperRatio-|
	//              |_______right_____|
	// |_______left______________|
	// |-lowerRatio-|
	// Partial overlap, `left` starting first:
	// ndv = lowerRatio * left.ndv
	//     + max((1-lowerRatio) * left.ndv, upperRatio * right.ndv)
	//     + (1-upperRatio) * right.ndv
	if lowerCompare >= 0 {
		lowerRatio := calcFraction4Datums(left.lower, left.upper, right.lower)
		res.NDV = int64(lowerRatio*float64(left.NDV) +
			math.Max((1-lowerRatio)*float64(left.NDV), upperRatio*float64(right.NDV)) +
			(1-upperRatio)*float64(right.NDV))
		res.lower = left.lower.Clone()
		return &res, nil
	}
	// |------upperRatio--------|
	// |-lowerRatio-|
	// |____________right______________|
	//        |___left____|
	// `left` is strictly inside `right`:
	// ndv = lowerRatio * right.ndv
	//     + max(left.ndv, (upperRatio - lowerRatio) * right.ndv)
	//     + (1-upperRatio) * right.ndv
	lowerRatio := calcFraction4Datums(right.lower, right.upper, left.lower)
	res.NDV = int64(lowerRatio*float64(right.NDV) +
		math.Max(float64(left.NDV), (upperRatio-lowerRatio)*float64(right.NDV)) +
		(1-upperRatio)*float64(right.NDV))
	return &res, nil
}

// mergeParitionBuckets merges buckets[l...r) to one global bucket.
// global bucket:
// upper = buckets[r-1].upper
// count = sum of buckets[l...r).count
// repeat = sum of buckets[i] (buckets[i].upper == global bucket.upper && i in [l...r))
// ndv = merge bucket ndv from r-1 to l by mergeBucketNDV
// Notice: lower is not calculated here.
func mergePartitionBuckets(sc *stmtctx.StatementContext, buckets []*bucket4Merging) (*bucket4Merging, error) {
if len(buckets) == 0 {
return nil, errors.Errorf("not enough buckets to merge")
Copy link
Contributor

@Reminiscent Reminiscent Feb 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we return the error here? Or check whether bucket4Merging is nil in the calling function.

}
res := bucket4Merging{}
res.upper = buckets[len(buckets)-1].upper.Clone()
right := buckets[len(buckets)-1].Clone()
for i := len(buckets) - 1; i >= 0; i-- {
res.Count += buckets[i].Count
compare, err := buckets[i].upper.CompareDatum(sc, res.upper)
if err != nil {
return nil, err
}
if compare == 0 {
res.Repeat += buckets[i].Repeat
}
if i != len(buckets)-1 {
tmp, err := mergeBucketNDV(sc, buckets[i], &right)
if err != nil {
return nil, err
}
right = *tmp
}
}
res.NDV = right.NDV + right.disjointNDV
return &res, nil
}

// MergePartitionHist2GlobalHist merges hists (partition-level Histograms) into
// a single global-level Histogram with about expBucketNumber buckets.
// Notice: If expBucketNumber == 0, we will let expBucketNumber = max(hists[i].Len()).
// The returned histogram's NDV is set to 0 here; the overall NDV is merged
// separately by the caller (see handle.MergePartitionStats2GlobalStats).
func MergePartitionHist2GlobalHist(sc *stmtctx.StatementContext, hists []*Histogram, expBucketNumber int64) (*Histogram, error) {
	var totCount, totNull, bucketNumber, totColSize int64
	needBucketNumber := false
	if expBucketNumber == 0 {
		needBucketNumber = true
	}
	// minValue is the smallest lower bound over all partitions; it becomes the
	// lower bound of the first global bucket.
	var minValue *types.Datum
	for _, hist := range hists {
		totColSize += hist.TotColSize
		totNull += hist.NullCount
		bucketNumber += int64(hist.Len())
		if hist.Len() > 0 {
			// Buckets[].Count is a prefix sum, so the last one is the total.
			totCount += hist.Buckets[hist.Len()-1].Count
			if needBucketNumber && int64(hist.Len()) > expBucketNumber {
				expBucketNumber = int64(hist.Len())
			}
			if minValue == nil {
				minValue = hist.GetLower(0).Clone()
				continue
			}
			res, err := hist.GetLower(0).CompareDatum(sc, minValue)
			if err != nil {
				return nil, err
			}
			if res < 0 {
				minValue = hist.GetLower(0).Clone()
			}
		}
	}
	buckets := make([]*bucket4Merging, 0, bucketNumber)
	globalBuckets := make([]*bucket4Merging, 0, expBucketNumber)

	// init `buckets` with every partition bucket, then sort them by
	// (upper, lower) so adjacent buckets can be merged back to front.
	for _, hist := range hists {
		buckets = append(buckets, hist.buildBucket4Merging()...)
	}
	// CompareDatum may fail inside the sort comparator; stash the error and
	// check it afterwards since sort.Slice cannot propagate it.
	var sortError error
	sort.Slice(buckets, func(i, j int) bool {
		res, err := buckets[i].upper.CompareDatum(sc, buckets[j].upper)
		if err != nil {
			sortError = err
		}
		if res != 0 {
			return res < 0
		}
		res, err = buckets[i].lower.CompareDatum(sc, buckets[j].lower)
		if err != nil {
			sortError = err
		}
		return res < 0
	})
	if sortError != nil {
		return nil, sortError
	}
	// Walk the sorted buckets from the back; each time the running total
	// crosses the next 1/expBucketNumber quantile of totCount, merge the
	// accumulated range [i, r) into one global bucket.
	var sum int64
	r := len(buckets)
	bucketCount := int64(1)
	for i := len(buckets) - 1; i >= 0; i-- {
		sum += buckets[i].Count
		// `sum` is deliberately NOT reset after a merge: the threshold
		// totCount*bucketCount/expBucketNumber grows with bucketCount, so the
		// comparison is against the cumulative quantile boundary.
		if sum >= totCount*bucketCount/expBucketNumber {
			// if the buckets have the same upper, we merge them into the same new bucket.
			for ; i > 0; i-- {
				res, err := buckets[i-1].upper.CompareDatum(sc, buckets[i].upper)
				if err != nil {
					return nil, err
				}
				if res != 0 {
					break
				}
			}
			merged, err := mergePartitionBuckets(sc, buckets[i:r])
			if err != nil {
				return nil, err
			}
			globalBuckets = append(globalBuckets, merged)
			r = i
			bucketCount++
		}
	}
	// Merge whatever remains below the last quantile boundary.
	if r > 0 {
		merged, err := mergePartitionBuckets(sc, buckets[0:r])
		if err != nil {
			return nil, err
		}
		globalBuckets = append(globalBuckets, merged)
	}
	// Because we merge backwards, we need to flip the slice into ascending order.
	for i, j := 0, len(globalBuckets)-1; i < j; i, j = i+1, j-1 {
		globalBuckets[i], globalBuckets[j] = globalBuckets[j], globalBuckets[i]
	}

	// Calc the bucket lowers: mergePartitionBuckets leaves lower unset, so
	// chain each bucket's lower to the previous upper, and convert per-bucket
	// counts back into the prefix-sum form Histogram expects.
	// minValue == nil means every partition histogram was empty.
	if minValue == nil {
		return nil, errors.Errorf("merge partition-level hist failed")
	}
	globalBuckets[0].lower = minValue.Clone()
	for i := 1; i < len(globalBuckets); i++ {
		globalBuckets[i].lower = globalBuckets[i-1].upper.Clone()
		globalBuckets[i].Count = globalBuckets[i].Count + globalBuckets[i-1].Count
	}
	// NDV argument is 0 here; see the function comment above.
	globalHist := NewHistogram(hists[0].ID, 0, totNull, hists[0].LastUpdateVersion, hists[0].Tp, len(globalBuckets), totColSize)
	for _, bucket := range globalBuckets {
		globalHist.AppendBucketWithNDV(bucket.lower, bucket.upper, bucket.Count, bucket.Repeat, bucket.NDV)
	}
	return globalHist, nil
}
Loading