Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: batch insert topn and bucket when saving table stats (#35326) #35545

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 238 additions & 0 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/json"
"fmt"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -953,6 +954,243 @@ func (h *Handle) extendedStatsFromStorage(reader *statsReader, table *statistics
return table, nil
}

<<<<<<< HEAD
=======
// StatsMetaCountAndModifyCount reads count and modify_count for the given table from mysql.stats_meta.
//
// It returns (0, 0, nil) when mysql.stats_meta has no row for tableID.
//
// The results are named on purpose: the deferred release of the stats reader
// assigns to err, and only a named result lets that assignment reach the
// caller. With unnamed results the release error was silently discarded.
func (h *Handle) StatsMetaCountAndModifyCount(tableID int64) (count, modifyCount int64, err error) {
	reader, err := h.getGlobalStatsReader(0)
	if err != nil {
		return 0, 0, err
	}
	defer func() {
		// Report a release failure only when the read itself succeeded, so
		// the original error is never masked.
		err1 := h.releaseGlobalStatsReader(reader)
		if err1 != nil && err == nil {
			err = err1
		}
	}()
	rows, _, err := reader.read("select count, modify_count from mysql.stats_meta where table_id = %?", tableID)
	if err != nil {
		return 0, 0, err
	}
	if len(rows) == 0 {
		return 0, 0, nil
	}
	// Column 0 is read as an unsigned value and converted; column 1 is read
	// as a signed value directly.
	count = int64(rows[0].GetUint64(0))
	modifyCount = rows[0].GetInt64(1)
	return count, modifyCount, nil
}

// SaveTableStatsToStorage saves the stats of a table to storage.
//
// While holding h.mu it opens a pessimistic transaction and writes, in order:
//  1. the table's row in mysql.stats_meta,
//  2. for every non-nil histogram in results.Ars: its top-N values, FM sketch,
//     mysql.stats_histograms row and buckets,
//  3. the extended statistics in results.ExtStats, if there are any.
//
// needDumpFMS controls whether a non-nil encoded FM sketch is inserted into
// mysql.stats_fm_sketch; existing sketch rows for the histogram are deleted
// either way.
//
// The first error encountered is returned through the named result err, which
// the deferred calls below also read and may overwrite.
func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, needDumpFMS bool) (err error) {
tableID := results.TableID.GetStatisticsID()
// statsVer stays 0 unless stats_meta is actually written below.
statsVer := uint64(0)
// Deferred calls run in reverse order of registration, so this one runs last:
// after finishTransaction and after h.mu has been unlocked. It records the
// historical stats meta only when everything succeeded and stats_meta was
// written (statsVer != 0).
defer func() {
if err == nil && statsVer != 0 {
err = h.recordHistoricalStatsMeta(tableID, statsVer)
}
}()
h.mu.Lock()
defer h.mu.Unlock()
ctx := context.TODO()
exec := h.mu.ctx.(sqlexec.SQLExecutor)
_, err = exec.ExecuteInternal(ctx, "begin pessimistic")
if err != nil {
return err
}
// finishTransaction receives the error accumulated so far and its result
// replaces err. Presumably it commits on nil and rolls back otherwise —
// confirm against its definition.
defer func() {
err = finishTransaction(context.Background(), exec, err)
}()
txn, err := h.mu.ctx.Txn(true)
if err != nil {
return err
}
// The transaction's start timestamp is the version stamped on the
// stats_meta, stats_histograms and stats_extended rows written below.
version := txn.StartTS()
// 1. Save mysql.stats_meta.
var rs sqlexec.RecordSet
// Lock this row to prevent writing of concurrent analyze.
rs, err = exec.ExecuteInternal(ctx, "select snapshot, count, modify_count from mysql.stats_meta where table_id = %? for update", tableID)
if err != nil {
return err
}
var rows []chunk.Row
rows, err = sqlexec.DrainRecordSet(ctx, rs, h.mu.ctx.GetSessionVars().MaxChunkSize)
if err != nil {
return err
}
var curCnt, curModifyCnt int64
if len(rows) > 0 {
snapshot := rows[0].GetUint64(0)
// A newer version analyze result has been written, so skip this writing.
// Returning here leaves statsVer at 0, so no historical stats meta is recorded.
if snapshot >= results.Snapshot && results.StatsVer == statistics.Version2 {
return nil
}
curCnt = int64(rows[0].GetUint64(1))
curModifyCnt = rows[0].GetInt64(2)
}
if len(rows) == 0 || results.StatsVer != statistics.Version2 {
// No existing row, or results are not Version2: write the row wholesale.
// modify_count is not listed in the statement, so it takes whatever the
// column default is.
if _, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_meta (version, table_id, count, snapshot) values (%?, %?, %?, %?)", version, tableID, results.Count, results.Snapshot); err != nil {
return err
}
statsVer = version
} else {
// Existing row and Version2 results: adjust the stored counters by the
// difference between the analyze result and its recorded base values,
// clamping both at zero.
modifyCnt := curModifyCnt - results.BaseModifyCnt
if modifyCnt < 0 {
modifyCnt = 0
}
cnt := curCnt + results.Count - results.BaseCount
if cnt < 0 {
cnt = 0
}
if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version=%?, modify_count=%?, count=%?, snapshot=%? where table_id=%?", version, modifyCnt, cnt, results.Snapshot, tableID); err != nil {
return err
}
statsVer = version
}
// 2. Save histograms.
// Upper limit, in bytes of SQL text, for one batched insert statement (1 MiB).
const maxInsertLength = 1024 * 1024
for _, result := range results.Ars {
for i, hg := range result.Hist {
// A nil histogram is skipped; the original note attributes this to a
// normal virtual column.
if hg == nil {
continue
}
// The CM sketch is only passed to the encoder for non-Version2 stats;
// for Version2 a nil sketch is encoded.
var cms *statistics.CMSketch
if results.StatsVer != statistics.Version2 {
cms = result.Cms[i]
}
// The := below declares a new err scoped to this loop body, shadowing the
// named result. Every return inside the loop is an explicit `return err`,
// so the value still reaches the caller.
cmSketch, err := statistics.EncodeCMSketchWithoutTopN(cms)
if err != nil {
return err
}
fmSketch, err := statistics.EncodeFMSketch(result.Fms[i])
if err != nil {
return err
}
// Delete the outdated top-N rows before inserting the new ones.
if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_top_n where table_id = %? and is_index = %? and hist_id = %?", tableID, result.IsIndex, hg.ID); err != nil {
return err
}
if topN := result.TopNs[i]; topN != nil {
// Insert top-N values in batches of at most batchInsertSize rows
// (batchInsertSize is defined outside this block).
for j := 0; j < len(topN.TopN); {
end := j + batchInsertSize
if end > len(topN.TopN) {
end = len(topN.TopN)
}
sql := new(strings.Builder)
sql.WriteString("insert into mysql.stats_top_n (table_id, is_index, hist_id, value, count) values ")
for k := j; k < end; k++ {
val := sqlexec.MustEscapeSQL("(%?, %?, %?, %?, %?)", tableID, result.IsIndex, hg.ID, topN.TopN[k].Encoded, topN.TopN[k].Count)
if k > j {
val = "," + val
}
// Cut the batch short once the statement would exceed maxInsertLength.
// The k > j guard means the first row of a batch is always written,
// so every batch makes progress.
if k > j && sql.Len()+len(val) > maxInsertLength {
end = k
break
}
sql.WriteString(val)
}
// The next batch starts at the first row that was not written.
j = end
if _, err = exec.ExecuteInternal(ctx, sql.String()); err != nil {
return err
}
}
}
// The old FM sketch rows are removed unconditionally; a new one is inserted
// only when an encoded sketch exists and the caller asked for it.
if _, err := exec.ExecuteInternal(ctx, "delete from mysql.stats_fm_sketch where table_id = %? and is_index = %? and hist_id = %?", tableID, result.IsIndex, hg.ID); err != nil {
return err
}
if fmSketch != nil && needDumpFMS {
if _, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_fm_sketch (table_id, is_index, hist_id, value) values (%?, %?, %?, %?)", tableID, result.IsIndex, hg.ID, fmSketch); err != nil {
return err
}
}
if _, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, flag, correlation) values (%?, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?)",
tableID, result.IsIndex, hg.ID, hg.NDV, version, hg.NullCount, cmSketch, hg.TotColSize, results.StatsVer, statistics.AnalyzeFlag, hg.Correlation); err != nil {
return err
}
// Replace the histogram's buckets: delete the old rows, then insert in batches.
if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_buckets where table_id = %? and is_index = %? and hist_id = %?", tableID, result.IsIndex, hg.ID); err != nil {
return err
}
sc := h.mu.ctx.GetSessionVars().StmtCtx
// Holds the blob-converted upper bound of the final bucket, if there is one.
var lastAnalyzePos []byte
for j := 0; j < len(hg.Buckets); {
end := j + batchInsertSize
if end > len(hg.Buckets) {
end = len(hg.Buckets)
}
sql := new(strings.Builder)
sql.WriteString("insert into mysql.stats_buckets (table_id, is_index, hist_id, bucket_id, count, repeats, lower_bound, upper_bound, ndv) values ")
for k := j; k < end; k++ {
// The stored count is this bucket's Count minus the previous bucket's
// Count (the first bucket is stored as is).
count := hg.Buckets[k].Count
if k > 0 {
count -= hg.Buckets[k-1].Count
}
// Both bounds are converted to blob datums before being written.
var upperBound types.Datum
upperBound, err = hg.GetUpper(k).ConvertTo(sc, types.NewFieldType(mysql.TypeBlob))
if err != nil {
return err
}
if k == len(hg.Buckets)-1 {
lastAnalyzePos = upperBound.GetBytes()
}
var lowerBound types.Datum
lowerBound, err = hg.GetLower(k).ConvertTo(sc, types.NewFieldType(mysql.TypeBlob))
if err != nil {
return err
}
val := sqlexec.MustEscapeSQL("(%?, %?, %?, %?, %?, %?, %?, %?, %?)", tableID, result.IsIndex, hg.ID, k, count, hg.Buckets[k].Repeat, lowerBound.GetBytes(), upperBound.GetBytes(), hg.Buckets[k].NDV)
if k > j {
val = "," + val
}
// Same size cap as for top-N: stop before exceeding maxInsertLength, but
// always write at least one row per batch.
if k > j && sql.Len()+len(val) > maxInsertLength {
end = k
break
}
sql.WriteString(val)
}
// The next batch starts at the first bucket that was not written.
j = end
if _, err = exec.ExecuteInternal(ctx, sql.String()); err != nil {
return err
}
}
if len(lastAnalyzePos) > 0 {
if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_histograms set last_analyze_pos = %? where table_id = %? and is_index = %? and hist_id = %?", lastAnalyzePos, tableID, result.IsIndex, hg.ID); err != nil {
return err
}
}
// For results with IsIndex == 0, hg.ID is written as column_id and the
// column's last_analyzed_at timestamp is inserted or refreshed.
if result.IsIndex == 0 {
if _, err = exec.ExecuteInternal(ctx, "insert into mysql.column_stats_usage (table_id, column_id, last_analyzed_at) values(%?, %?, current_timestamp()) on duplicate key update last_analyzed_at = values(last_analyzed_at)", tableID, hg.ID); err != nil {
return err
}
}
}
}
// 3. Save extended statistics.
extStats := results.ExtStats
if extStats == nil || len(extStats.Stats) == 0 {
return nil
}
var bytes []byte
// NOTE(review): statsStr is declared outside the loop, so for an item whose Tp
// matches neither case below it keeps the previous iteration's value — confirm
// that no other Tp values can occur here.
var statsStr string
for name, item := range extStats.Stats {
// The column IDs are stored as their JSON encoding.
bytes, err = json.Marshal(item.ColIDs)
if err != nil {
return err
}
strColIDs := string(bytes)
switch item.Tp {
case ast.StatsTypeCardinality, ast.StatsTypeCorrelation:
statsStr = fmt.Sprintf("%f", item.ScalarVals)
case ast.StatsTypeDependency:
statsStr = item.StringVals
}
if _, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_extended values (%?, %?, %?, %?, %?, %?, %?)", name, item.Tp, tableID, strColIDs, statsStr, version, StatsStatusAnalyzed); err != nil {
return err
}
}
// Bare return of the named result; err is nil here because the loop above ran
// at least once and its last statement succeeded.
return
}

>>>>>>> 6266817ce... statistics: batch insert topn and bucket when saving table stats (#35326)
// SaveStatsToStorage saves the stats to storage.
func (h *Handle) SaveStatsToStorage(tableID int64, count int64, isIndex int, hg *statistics.Histogram, cms *statistics.CMSketch, topN *statistics.TopN, fms *statistics.FMSketch, statsVersion int, isAnalyzed int64) (err error) {
h.mu.Lock()
Expand Down