Skip to content

Commit

Permalink
statistics: add comments and change isIndex to bool (#47062)
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 committed Sep 19, 2023
1 parent 51f6bb9 commit 4bd39b5
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 49 deletions.
3 changes: 2 additions & 1 deletion executor/analyze_global_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, globalStatsMap glob
e.Ctx(),
globalOpts, e.Ctx().GetInfoSchema().(infoschema.InfoSchema),
globalStatsID.tableID,
info.isIndex, info.histIDs,
info.isIndex == 1,
info.histIDs,
tableAllPartitionStats,
)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions statistics/handle/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (h *Handle) updateGlobalStats(tblInfo *model.TableInfo) error {
opts[ast.AnalyzeOptNumBuckets] = uint64(globalColStatsBucketNum)
}
// Generate the new column global-stats
newColGlobalStats, err := h.mergePartitionStats2GlobalStats(opts, is, tblInfo, 0, nil, nil)
newColGlobalStats, err := h.mergePartitionStats2GlobalStats(opts, is, tblInfo, false, nil, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -240,7 +240,7 @@ func (h *Handle) updateGlobalStats(tblInfo *model.TableInfo) error {
if globalIdxStatsBucketNum != 0 {
opts[ast.AnalyzeOptNumBuckets] = uint64(globalIdxStatsBucketNum)
}
newIndexGlobalStats, err := h.mergePartitionStats2GlobalStats(opts, is, tblInfo, 1, []int64{idx.ID}, nil)
newIndexGlobalStats, err := h.mergePartitionStats2GlobalStats(opts, is, tblInfo, true, []int64{idx.ID}, nil)
if err != nil {
return err
}
Expand Down
116 changes: 76 additions & 40 deletions statistics/handle/globalstats/global_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,24 @@ type GlobalStats struct {
ModifyCount int64
}

// newGlobalStats builds an empty GlobalStats sized for histCount column or
// index statistics. Num is set to histCount, Count starts at zero, and the
// Hg, Cms, TopN and Fms slices are each allocated with histCount nil entries
// for the caller to fill in.
func newGlobalStats(histCount int) *GlobalStats {
	return &GlobalStats{
		Num:   histCount,
		Count: 0,
		Hg:    make([]*statistics.Histogram, histCount),
		Cms:   make([]*statistics.CMSketch, histCount),
		TopN:  make([]*statistics.TopN, histCount),
		Fms:   make([]*statistics.FMSketch, histCount),
	}
}

type (
getTableByPhysicalIDFunc func(is infoschema.InfoSchema, physicalID int64) (table.Table, bool)
loadTablePartitionStatsFunc func(tableInfo *model.TableInfo, partitionDef *model.PartitionDefinition) (*statistics.Table, error)
// GlobalStatusHandler is used to handle the global-level stats.
GlobalStatusHandler struct {
// this gpool is used to reuse goroutine in the mergeGlobalStatsTopN.
// This gpool is used to reuse goroutine in the mergeGlobalStatsTopN.
gpool *gp.Pool
}
)
Expand All @@ -69,14 +81,18 @@ func NewGlobalStatusHandler(gpool *gp.Pool) *GlobalStatusHandler {
}

// MergePartitionStats2GlobalStats merges the partition-level stats to global-level stats based on the tableInfo.
func (g *GlobalStatusHandler) MergePartitionStats2GlobalStats(sc sessionctx.Context,
opts map[ast.AnalyzeOptionType]uint64, is infoschema.InfoSchema, globalTableInfo *model.TableInfo,
isIndex int, histIDs []int64,
allPartitionStats map[int64]*statistics.Table, getTableByPhysicalIDFn getTableByPhysicalIDFunc, loadTablePartitionStatsFn loadTablePartitionStatsFunc) (globalStats *GlobalStats, err error) {
func (g *GlobalStatusHandler) MergePartitionStats2GlobalStats(
sc sessionctx.Context,
opts map[ast.AnalyzeOptionType]uint64,
is infoschema.InfoSchema,
globalTableInfo *model.TableInfo,
isIndex bool,
histIDs []int64,
allPartitionStats map[int64]*statistics.Table,
getTableByPhysicalIDFn getTableByPhysicalIDFunc,
loadTablePartitionStatsFn loadTablePartitionStatsFunc,
) (globalStats *GlobalStats, err error) {
partitionNum := len(globalTableInfo.Partition.Definitions)

// initialized the globalStats
globalStats = new(GlobalStats)
if len(histIDs) == 0 {
for _, col := range globalTableInfo.Columns {
// The virtual generated column stats can not be merged to the global stats.
Expand All @@ -86,17 +102,15 @@ func (g *GlobalStatusHandler) MergePartitionStats2GlobalStats(sc sessionctx.Cont
histIDs = append(histIDs, col.ID)
}
}
globalStats.Num = len(histIDs)
globalStats.Count = 0
globalStats.Hg = make([]*statistics.Histogram, globalStats.Num)
globalStats.Cms = make([]*statistics.CMSketch, globalStats.Num)
globalStats.TopN = make([]*statistics.TopN, globalStats.Num)
globalStats.Fms = make([]*statistics.FMSketch, globalStats.Num)

// The first dimension of slice is means the number of column or index stats in the globalStats.
// The second dimension of slice is means the number of partition tables.
// Initialized the globalStats.
globalStats = newGlobalStats(len(histIDs))

// Slice Dimensions Explanation
// First dimension: Column or Index Stats
// Second dimension: Partition Tables
// Because all topN and histograms need to be collected before they can be merged.
// So we should store all of the partition-level stats first, and merge them together.
// So we should store all the partition-level stats first, and merge them together.
allHg := make([][]*statistics.Histogram, globalStats.Num)
allCms := make([][]*statistics.CMSketch, globalStats.Num)
allTopN := make([][]*statistics.TopN, globalStats.Num)
Expand All @@ -116,12 +130,14 @@ func (g *GlobalStatusHandler) MergePartitionStats2GlobalStats(sc sessionctx.Cont
err = errors.Errorf("unknown physical ID %d in stats meta table, maybe it has been dropped", partitionID)
return
}

tableInfo := partitionTable.Meta()
var partitionStats *statistics.Table
if allPartitionStats != nil {
partitionStats, ok = allPartitionStats[partitionID]
}
// If pre-load partition stats isn't provided, then we load partition stats directly and set it into allPartitionStats

// If preloaded partition stats aren't provided, then we load the partition stats directly and store them in allPartitionStats.
if allPartitionStats == nil || partitionStats == nil || !ok {
var err1 error
partitionStats, err1 = loadTablePartitionStatsFn(tableInfo, &def)
Expand All @@ -138,12 +154,13 @@ func (g *GlobalStatusHandler) MergePartitionStats2GlobalStats(sc sessionctx.Cont
}
allPartitionStats[partitionID] = partitionStats
}

for i := 0; i < globalStats.Num; i++ {
hg, cms, topN, fms, analyzed := partitionStats.GetStatsInfo(histIDs[i], isIndex == 1)
hg, cms, topN, fms, analyzed := partitionStats.GetStatsInfo(histIDs[i], isIndex)
skipPartition := false
if !analyzed {
var missingPart string
if isIndex == 0 {
if !isIndex {
missingPart = fmt.Sprintf("partition `%s` column `%s`", def.Name.L, tableInfo.FindColumnNameByID(histIDs[i]))
} else {
missingPart = fmt.Sprintf("partition `%s` index `%s`", def.Name.L, tableInfo.FindIndexNameByID(histIDs[i]))
Expand All @@ -155,10 +172,11 @@ func (g *GlobalStatusHandler) MergePartitionStats2GlobalStats(sc sessionctx.Cont
globalStats.MissingPartitionStats = append(globalStats.MissingPartitionStats, missingPart)
skipPartition = true
}
// partition stats is not empty but column stats(hist, topn) is missing

// Partition stats is not empty but column stats(hist, topN) is missing.
if partitionStats.RealtimeCount > 0 && (hg == nil || hg.TotalRowCount() <= 0) && (topN == nil || topN.TotalCount() <= 0) {
var missingPart string
if isIndex == 0 {
if !isIndex {
missingPart = fmt.Sprintf("partition `%s` column `%s`", def.Name.L, tableInfo.FindColumnNameByID(histIDs[i]))
} else {
missingPart = fmt.Sprintf("partition `%s` index `%s`", def.Name.L, tableInfo.FindIndexNameByID(histIDs[i]))
Expand All @@ -167,14 +185,16 @@ func (g *GlobalStatusHandler) MergePartitionStats2GlobalStats(sc sessionctx.Cont
err = types.ErrPartitionColumnStatsMissing.GenWithStackByArgs(fmt.Sprintf("table `%s` %s", tableInfo.Name.L, missingPart))
return
}
globalStats.MissingPartitionStats = append(globalStats.MissingPartitionStats, missingPart+" hist and topn")
globalStats.MissingPartitionStats = append(globalStats.MissingPartitionStats, missingPart+" hist and topN")
skipPartition = true
}

if i == 0 {
// In a partition, we will only update globalStats.Count once
// In a partition, we will only update globalStats.Count once.
globalStats.Count += partitionStats.RealtimeCount
globalStats.ModifyCount += partitionStats.ModifyCount
}

if !skipPartition {
allHg[i] = append(allHg[i], hg)
allCms[i] = append(allCms[i], cms)
Expand All @@ -184,15 +204,15 @@ func (g *GlobalStatusHandler) MergePartitionStats2GlobalStats(sc sessionctx.Cont
}
}

// After collect all of the statistics from the partition-level stats,
// After collecting all the statistics from the partition-level stats,
// we should merge them together.
for i := 0; i < globalStats.Num; i++ {
if len(allHg[i]) == 0 {
// If all partitions have no stats, we skip merging global stats because it may not handle the case `len(allHg[i]) == 0`
// correctly. It can avoid unexpected behaviors such as nil pointer panic.
continue
}
// Merge CMSketch
// Merge CMSketch.
globalStats.Cms[i] = allCms[i][0].Copy()
for j := 1; j < len(allCms[i]); j++ {
err = globalStats.Cms[i].MergeCMSketch(allCms[i][j])
Expand All @@ -201,18 +221,21 @@ func (g *GlobalStatusHandler) MergePartitionStats2GlobalStats(sc sessionctx.Cont
}
}

// Merge topN. We need to merge TopN before merging the histogram.
// Merge topN.
// Note: We need to merge TopN before merging the histogram.
// Because after merging TopN, some numbers will be left.
// These remaining topN numbers will be used as a separate bucket for later histogram merging.
var popedTopN []statistics.TopNMeta
var poppedTopN []statistics.TopNMeta
wrapper := NewStatsWrapper(allHg[i], allTopN[i])
globalStats.TopN[i], popedTopN, allHg[i], err = mergeGlobalStatsTopN(g.gpool, sc, wrapper, sc.GetSessionVars().StmtCtx.TimeZone, sc.GetSessionVars().AnalyzeVersion, uint32(opts[ast.AnalyzeOptNumTopN]), isIndex == 1)
globalStats.TopN[i], poppedTopN, allHg[i], err = mergeGlobalStatsTopN(g.gpool, sc, wrapper,
sc.GetSessionVars().StmtCtx.TimeZone, sc.GetSessionVars().AnalyzeVersion, uint32(opts[ast.AnalyzeOptNumTopN]), isIndex)
if err != nil {
return
}

// Merge histogram
globalStats.Hg[i], err = statistics.MergePartitionHist2GlobalHist(sc.GetSessionVars().StmtCtx, allHg[i], popedTopN, int64(opts[ast.AnalyzeOptNumBuckets]), isIndex == 1)
// Merge histogram.
globalStats.Hg[i], err = statistics.MergePartitionHist2GlobalHist(sc.GetSessionVars().StmtCtx, allHg[i], poppedTopN,
int64(opts[ast.AnalyzeOptNumBuckets]), isIndex)
if err != nil {
return
}
Expand All @@ -222,51 +245,64 @@ func (g *GlobalStatusHandler) MergePartitionStats2GlobalStats(sc sessionctx.Cont
globalStats.Hg[i].Buckets[j].NDV = 0
}

// Update NDV of global-level stats
// Merge FMSketch.
globalStats.Fms[i] = allFms[i][0].Copy()
for j := 1; j < len(allFms[i]); j++ {
globalStats.Fms[i].MergeFMSketch(allFms[i][j])
}

// update the NDV
// Update the global NDV.
globalStatsNDV := globalStats.Fms[i].NDV()
if globalStatsNDV > globalStats.Count {
globalStatsNDV = globalStats.Count
}
globalStats.Hg[i].NDV = globalStatsNDV
}

return
}

// MergePartitionStats2GlobalStatsByTableID merges the partition-level stats to global-level stats based on the tableID.
func (g *GlobalStatusHandler) MergePartitionStats2GlobalStatsByTableID(sc sessionctx.Context,
opts map[ast.AnalyzeOptionType]uint64, is infoschema.InfoSchema,
physicalID int64, isIndex int, histIDs []int64,
tablePartitionStats map[int64]*statistics.Table, getTableByPhysicalIDFn getTableByPhysicalIDFunc, loadTablePartitionStatsFn loadTablePartitionStatsFunc) (globalStats *GlobalStats, err error) {
// get the partition table IDs
func (g *GlobalStatusHandler) MergePartitionStats2GlobalStatsByTableID(
sc sessionctx.Context,
opts map[ast.AnalyzeOptionType]uint64,
is infoschema.InfoSchema,
physicalID int64,
isIndex bool,
histIDs []int64,
tablePartitionStats map[int64]*statistics.Table,
getTableByPhysicalIDFn getTableByPhysicalIDFunc,
loadTablePartitionStatsFn loadTablePartitionStatsFunc,
) (globalStats *GlobalStats, err error) {
// Get the partition table IDs.
globalTable, ok := getTableByPhysicalIDFn(is, physicalID)
if !ok {
err = errors.Errorf("unknown physical ID %d in stats meta table, maybe it has been dropped", physicalID)
return
}

globalTableInfo := globalTable.Meta()
globalStats, err = g.MergePartitionStats2GlobalStats(sc, opts, is, globalTableInfo, isIndex, histIDs, tablePartitionStats, getTableByPhysicalIDFn, loadTablePartitionStatsFn)
globalStats, err = g.MergePartitionStats2GlobalStats(sc, opts, is, globalTableInfo, isIndex, histIDs,
tablePartitionStats, getTableByPhysicalIDFn, loadTablePartitionStatsFn)
if err != nil {
return
}

if len(globalStats.MissingPartitionStats) > 0 {
var item string
if isIndex == 0 {
if !isIndex {
item = "columns"
} else {
item = "index"
if len(histIDs) > 0 {
item += " " + globalTableInfo.FindIndexNameByID(histIDs[0])
}
}

logutil.BgLogger().Warn("missing partition stats when merging global stats", zap.String("table", globalTableInfo.Name.L),
zap.String("item", item), zap.Strings("missing", globalStats.MissingPartitionStats))
}

return
}

Expand Down
21 changes: 15 additions & 6 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,10 +375,14 @@ func UpdateSCtxVarsForStats(sctx sessionctx.Context) error {
}

// MergePartitionStats2GlobalStatsByTableID merges the partition-level stats to global-level stats based on the tableID.
func (h *Handle) MergePartitionStats2GlobalStatsByTableID(sc sessionctx.Context,
func (h *Handle) MergePartitionStats2GlobalStatsByTableID(
sc sessionctx.Context,
opts map[ast.AnalyzeOptionType]uint64, is infoschema.InfoSchema,
physicalID int64, isIndex int, histIDs []int64,
tablePartitionStats map[int64]*statistics.Table) (globalStats *globalstats.GlobalStats, err error) {
physicalID int64,
isIndex bool,
histIDs []int64,
tablePartitionStats map[int64]*statistics.Table,
) (globalStats *globalstats.GlobalStats, err error) {
return h.globalstatushandler.MergePartitionStats2GlobalStatsByTableID(sc, opts, is, physicalID, isIndex, histIDs, tablePartitionStats, h.getTableByPhysicalID, h.loadTablePartitionStats)
}

Expand All @@ -398,9 +402,14 @@ func (h *Handle) loadTablePartitionStats(tableInfo *model.TableInfo, partitionDe
}

// mergePartitionStats2GlobalStats merges the partition-level stats to global-level stats based on the tableInfo.
func (h *Handle) mergePartitionStats2GlobalStats(opts map[ast.AnalyzeOptionType]uint64,
is infoschema.InfoSchema, globalTableInfo *model.TableInfo, isIndex int, histIDs []int64,
allPartitionStats map[int64]*statistics.Table) (globalStats *globalstats.GlobalStats, err error) {
func (h *Handle) mergePartitionStats2GlobalStats(
opts map[ast.AnalyzeOptionType]uint64,
is infoschema.InfoSchema,
globalTableInfo *model.TableInfo,
isIndex bool,
histIDs []int64,
allPartitionStats map[int64]*statistics.Table,
) (globalStats *globalstats.GlobalStats, err error) {
se, err := h.pool.Get()
if err != nil {
return nil, err
Expand Down

0 comments on commit 4bd39b5

Please sign in to comment.