Skip to content

Commit

Permalink
statistics: async merge global stats to reduce memory usage (#47391)
Browse files Browse the repository at this point in the history
close #47219
  • Loading branch information
hawkingrei authored Oct 11, 2023
1 parent 512934e commit 7f9ab7f
Show file tree
Hide file tree
Showing 8 changed files with 678 additions and 207 deletions.
39 changes: 39 additions & 0 deletions statistics/cmsketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,45 @@ func DecodeCMSketchAndTopN(data []byte, topNRows []chunk.Row) (*CMSketch, *TopN,
return cm, topN, nil
}

// DecodeTopN decodes a TopN from the given byte slice.
func DecodeTopN(topNRows []chunk.Row) (*TopN, error) {
pbTopN := make([]*tipb.CMSketchTopN, 0, len(topNRows))
for _, row := range topNRows {
data := make([]byte, len(row.GetBytes(0)))
copy(data, row.GetBytes(0))
pbTopN = append(pbTopN, &tipb.CMSketchTopN{
Data: data,
Count: row.GetUint64(1),
})
}
return TopNFromProto(pbTopN), nil
}

// DecodeCMSketch encodes the given CMSketch to byte slice.
func DecodeCMSketch(data []byte) (*CMSketch, error) {
if len(data) == 0 {
return nil, nil
}
protoSketch := &tipb.CMSketch{}
err := protoSketch.Unmarshal(data)
if err != nil {
return nil, errors.Trace(err)
}
if len(protoSketch.Rows) == 0 {
return nil, nil
}
c := NewCMSketch(int32(len(protoSketch.Rows)), int32(len(protoSketch.Rows[0].Counters)))
for i, row := range protoSketch.Rows {
c.count = 0
for j, counter := range row.Counters {
c.table[i][j] = counter
c.count = c.count + uint64(counter)
}
}
c.defaultValue = protoSketch.DefaultValue
return c, nil
}

// TotalCount returns the total count in the sketch, it is only used for test.
func (c *CMSketch) TotalCount() uint64 {
if c == nil {
Expand Down
6 changes: 6 additions & 0 deletions statistics/handle/globalstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go_library(
name = "globalstats",
srcs = [
"global_stats.go",
"global_stats_async.go",
"merge_worker.go",
"topn.go",
],
Expand All @@ -14,13 +15,18 @@ go_library(
"//parser/ast",
"//parser/model",
"//sessionctx",
"//sessionctx/stmtctx",
"//statistics",
"//statistics/handle/storage",
"//statistics/handle/util",
"//table",
"//types",
"//util/hack",
"//util/logutil",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_tiancaiamao_gp//:gp",
"@org_golang_x_sync//errgroup",
"@org_uber_go_zap//:zap",
],
)
Expand Down
214 changes: 22 additions & 192 deletions statistics/handle/globalstats/global_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,13 @@
package globalstats

import (
"fmt"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/logutil"
"github.com/tiancaiamao/gp"
"go.uber.org/zap"
Expand Down Expand Up @@ -66,9 +63,8 @@ func newGlobalStats(histCount int) *GlobalStats {
}

type (
getTableByPhysicalIDFunc func(is infoschema.InfoSchema, physicalID int64) (table.Table, bool)
loadTablePartitionStatsFunc func(tableInfo *model.TableInfo, partitionDef *model.PartitionDefinition) (*statistics.Table, error)
// GlobalStatusHandler is used to handle the global-level stats.
getTableByPhysicalIDFunc func(is infoschema.InfoSchema, tableID int64) (table.Table, bool)
callWithSCtxFunc func(f func(sctx sessionctx.Context) error, flags ...int) (err error)
)

// MergePartitionStats2GlobalStats merge the partition-level stats to global-level stats based on the tableInfo.
Expand All @@ -80,188 +76,18 @@ func MergePartitionStats2GlobalStats(
globalTableInfo *model.TableInfo,
isIndex bool,
histIDs []int64,
allPartitionStats map[int64]*statistics.Table,
getTableByPhysicalIDFn getTableByPhysicalIDFunc,
loadTablePartitionStatsFn loadTablePartitionStatsFunc,
callWithSCtxFunc callWithSCtxFunc,
) (globalStats *GlobalStats, err error) {
externalCache := false
if allPartitionStats != nil {
externalCache = true
}

partitionNum := len(globalTableInfo.Partition.Definitions)
if len(histIDs) == 0 {
for _, col := range globalTableInfo.Columns {
// The virtual generated column stats can not be merged to the global stats.
if col.IsVirtualGenerated() {
continue
}
histIDs = append(histIDs, col.ID)
}
}

// Initialized the globalStats.
globalStats = newGlobalStats(len(histIDs))

// Slice Dimensions Explanation
// First dimension: Column or Index Stats
// Second dimension: Partition Tables
// Because all topN and histograms need to be collected before they can be merged.
// So we should store all the partition-level stats first, and merge them together.
allHg := make([][]*statistics.Histogram, globalStats.Num)
allCms := make([][]*statistics.CMSketch, globalStats.Num)
allTopN := make([][]*statistics.TopN, globalStats.Num)
allFms := make([][]*statistics.FMSketch, globalStats.Num)
for i := 0; i < globalStats.Num; i++ {
allHg[i] = make([]*statistics.Histogram, 0, partitionNum)
allCms[i] = make([]*statistics.CMSketch, 0, partitionNum)
allTopN[i] = make([]*statistics.TopN, 0, partitionNum)
allFms[i] = make([]*statistics.FMSketch, 0, partitionNum)
}

skipMissingPartitionStats := sc.GetSessionVars().SkipMissingPartitionStats
for _, def := range globalTableInfo.Partition.Definitions {
partitionID := def.ID
partitionTable, ok := getTableByPhysicalIDFn(is, partitionID)
if !ok {
err = errors.Errorf("unknown physical ID %d in stats meta table, maybe it has been dropped", partitionID)
return
}
tableInfo := partitionTable.Meta()
var partitionStats *statistics.Table
var okLoad bool
if allPartitionStats != nil {
partitionStats, okLoad = allPartitionStats[partitionID]
} else {
okLoad = false
}
// If pre-load partition stats isn't provided, then we load partition stats directly and set it into allPartitionStats
if !okLoad {
var err1 error
partitionStats, err1 = loadTablePartitionStatsFn(tableInfo, &def)
if err1 != nil {
if skipMissingPartitionStats && types.ErrPartitionStatsMissing.Equal(err1) {
globalStats.MissingPartitionStats = append(globalStats.MissingPartitionStats, fmt.Sprintf("partition `%s`", def.Name.L))
continue
}
err = err1
return
}
if externalCache {
allPartitionStats[partitionID] = partitionStats
}
}

for i := 0; i < globalStats.Num; i++ {
// GetStatsInfo will return the copy of the statsInfo, so we don't need to worry about the data race.
// partitionStats will be released after the for loop.
hg, cms, topN, fms, analyzed := partitionStats.GetStatsInfo(histIDs[i], isIndex, externalCache)
skipPartition := false
if !analyzed {
var missingPart string
if !isIndex {
missingPart = fmt.Sprintf("partition `%s` column `%s`", def.Name.L, tableInfo.FindColumnNameByID(histIDs[i]))
} else {
missingPart = fmt.Sprintf("partition `%s` index `%s`", def.Name.L, tableInfo.FindIndexNameByID(histIDs[i]))
}
if !skipMissingPartitionStats {
err = types.ErrPartitionStatsMissing.GenWithStackByArgs(fmt.Sprintf("table `%s` %s", tableInfo.Name.L, missingPart))
return
}
globalStats.MissingPartitionStats = append(globalStats.MissingPartitionStats, missingPart)
skipPartition = true
}

// Partition stats is not empty but column stats(hist, topN) is missing.
if partitionStats.RealtimeCount > 0 && (hg == nil || hg.TotalRowCount() <= 0) && (topN == nil || topN.TotalCount() <= 0) {
var missingPart string
if !isIndex {
missingPart = fmt.Sprintf("partition `%s` column `%s`", def.Name.L, tableInfo.FindColumnNameByID(histIDs[i]))
} else {
missingPart = fmt.Sprintf("partition `%s` index `%s`", def.Name.L, tableInfo.FindIndexNameByID(histIDs[i]))
}
if !skipMissingPartitionStats {
err = types.ErrPartitionColumnStatsMissing.GenWithStackByArgs(fmt.Sprintf("table `%s` %s", tableInfo.Name.L, missingPart))
return
}
globalStats.MissingPartitionStats = append(globalStats.MissingPartitionStats, missingPart+" hist and topN")
skipPartition = true
}

if i == 0 {
// In a partition, we will only update globalStats.Count once.
globalStats.Count += partitionStats.RealtimeCount
globalStats.ModifyCount += partitionStats.ModifyCount
}

if !skipPartition {
allHg[i] = append(allHg[i], hg)
allCms[i] = append(allCms[i], cms)
allTopN[i] = append(allTopN[i], topN)
allFms[i] = append(allFms[i], fms)
}
}
worker, err := NewAsyncMergePartitionStats2GlobalStats(gpool, globalTableInfo, histIDs, is, getTableByPhysicalIDFn, callWithSCtxFunc)
if err != nil {
return nil, errors.Trace(err)
}

// After collect all the statistics from the partition-level stats,
// we should merge them together.
for i := 0; i < globalStats.Num; i++ {
if len(allHg[i]) == 0 {
// If all partitions have no stats, we skip merging global stats because it may not handle the case `len(allHg[i]) == 0`
// correctly. It can avoid unexpected behaviors such as nil pointer panic.
continue
}
// FMSketch use many memory, so we first deal with it and then destroy it.
// Merge FMSketch.
globalStats.Fms[i] = allFms[i][0]
for j := 1; j < len(allFms[i]); j++ {
globalStats.Fms[i].MergeFMSketch(allFms[i][j])
allFms[i][j].DestroyAndPutToPool()
}

// Update the global NDV.
globalStatsNDV := globalStats.Fms[i].NDV()
if globalStatsNDV > globalStats.Count {
globalStatsNDV = globalStats.Count
}
globalStats.Fms[i].DestroyAndPutToPool()

// Merge CMSketch.
globalStats.Cms[i] = allCms[i][0]
for j := 1; j < len(allCms[i]); j++ {
err = globalStats.Cms[i].MergeCMSketch(allCms[i][j])
if err != nil {
return
}
}

// Merge topN.
// Note: We need to merge TopN before merging the histogram.
// Because after merging TopN, some numbers will be left.
// These remaining topN numbers will be used as a separate bucket for later histogram merging.
var poppedTopN []statistics.TopNMeta
wrapper := NewStatsWrapper(allHg[i], allTopN[i])
globalStats.TopN[i], poppedTopN, allHg[i], err = mergeGlobalStatsTopN(gpool, sc, wrapper,
sc.GetSessionVars().StmtCtx.TimeZone, sc.GetSessionVars().AnalyzeVersion, uint32(opts[ast.AnalyzeOptNumTopN]), isIndex)
if err != nil {
return
}

// Merge histogram.
globalStats.Hg[i], err = statistics.MergePartitionHist2GlobalHist(sc.GetSessionVars().StmtCtx, allHg[i], poppedTopN,
int64(opts[ast.AnalyzeOptNumBuckets]), isIndex)
if err != nil {
return
}

// NOTICE: after merging bucket NDVs have the trend to be underestimated, so for safe we don't use them.
for j := range globalStats.Hg[i].Buckets {
globalStats.Hg[i].Buckets[j].NDV = 0
}

globalStats.Hg[i].NDV = globalStatsNDV
err = worker.MergePartitionStats2GlobalStats(sc, opts, isIndex)
if err != nil {
return nil, errors.Trace(err)
}
return
return worker.Result(), nil
}

// MergePartitionStats2GlobalStatsByTableID merge the partition-level stats to global-level stats based on the tableID.
Expand All @@ -270,26 +96,30 @@ func MergePartitionStats2GlobalStatsByTableID(
gpool *gp.Pool,
opts map[ast.AnalyzeOptionType]uint64,
is infoschema.InfoSchema,
physicalID int64,
tableID int64,
isIndex bool,
histIDs []int64,
allPartitionStats map[int64]*statistics.Table,
getTableByPhysicalIDFn getTableByPhysicalIDFunc,
loadTablePartitionStatsFn loadTablePartitionStatsFunc,
callWithSCtxFunc callWithSCtxFunc,
) (globalStats *GlobalStats, err error) {
// Get the partition table IDs.
globalTable, ok := getTableByPhysicalIDFn(is, physicalID)
globalTable, ok := getTableByPhysicalIDFn(is, tableID)
if !ok {
err = errors.Errorf("unknown physical ID %d in stats meta table, maybe it has been dropped", physicalID)
err = errors.Errorf("unknown physical ID %d in stats meta table, maybe it has been dropped", tableID)
return
}

globalTableInfo := globalTable.Meta()
globalStats, err = MergePartitionStats2GlobalStats(sc, gpool, opts, is, globalTableInfo, isIndex, histIDs, allPartitionStats, getTableByPhysicalIDFn, loadTablePartitionStatsFn)

worker, err := NewAsyncMergePartitionStats2GlobalStats(gpool, globalTableInfo, histIDs, is, getTableByPhysicalIDFn, callWithSCtxFunc)
if err != nil {
return
return nil, errors.Trace(err)
}

err = worker.MergePartitionStats2GlobalStats(sc, opts, isIndex)
if err != nil {
return nil, errors.Trace(err)
}
globalStats = worker.Result()
if len(globalStats.MissingPartitionStats) > 0 {
var item string
if !isIndex {
Expand Down
Loading

0 comments on commit 7f9ab7f

Please sign in to comment.