Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: batch insert topn and buckets to storage to speed up loading stats #36948

Merged
merged 3 commits into from
Aug 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 89 additions & 91 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -1107,6 +1107,85 @@ func (h *Handle) StatsMetaCountAndModifyCount(tableID int64) (int64, int64, erro
return count, modifyCount, nil
}

func saveTopNToStorage(ctx context.Context, exec sqlexec.SQLExecutor, tableID int64, isIndex int, histID int64, topN *statistics.TopN) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when you enable bce to build it, you can see the bce case.

go build -gcflags="-d=ssa/check_bce/debug=1" ./statistics/handle/...
statistics/handle/handle.go:1122:92: Found IsInBounds
statistics/handle/handle.go:1122:114: Found IsInBounds
statistics/handle/handle.go:1152:23: Found IsInBounds
statistics/handle/handle.go:1169:118: Found IsInBounds
statistics/handle/handle.go:1169:186: Found IsInBounds

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you change the code to avoid some unnecessary bounds checks?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I will fix it.

if topN == nil {
return nil
}
for i := 0; i < len(topN.TopN); {
end := i + batchInsertSize
if end > len(topN.TopN) {
end = len(topN.TopN)
}
sql := new(strings.Builder)
sql.WriteString("insert into mysql.stats_top_n (table_id, is_index, hist_id, value, count) values ")
for j := i; j < end; j++ {
topn := topN.TopN[j]
val := sqlexec.MustEscapeSQL("(%?, %?, %?, %?, %?)", tableID, isIndex, histID, topn.Encoded, topn.Count)
if j > i {
val = "," + val
}
if j > i && sql.Len()+len(val) > maxInsertLength {
end = j
break
}
sql.WriteString(val)
}
i = end
if _, err := exec.ExecuteInternal(ctx, sql.String()); err != nil {
return err
}
}
return nil
}

// saveBucketsToStorage writes the buckets of hg into mysql.stats_buckets using
// multi-row insert statements.
//
// Rows are grouped into statements of at most batchInsertSize tuples, and a
// statement is cut short as soon as appending the next tuple would make it
// longer than maxInsertLength; the bucket that did not fit starts the next
// statement. The first tuple of a statement is always written.
//
// The stored count of a bucket is the difference between its Count and the
// Count of the preceding bucket (bucket 0 is stored as is). Lower and upper
// bounds are converted to mysql.TypeBlob datums before being written.
//
// On success it returns the blob-encoded upper bound of the last bucket, or
// nil when hg is nil or has no buckets. On failure it returns a nil position
// together with the error, so callers never see a half-finished result.
func saveBucketsToStorage(ctx context.Context, exec sqlexec.SQLExecutor, sc *stmtctx.StatementContext, tableID int64, isIndex int, hg *statistics.Histogram) (lastAnalyzePos []byte, err error) {
	if hg == nil {
		return nil, nil
	}
	const insertPrefix = "insert into mysql.stats_buckets (table_id, is_index, hist_id, bucket_id, count, repeats, lower_bound, upper_bound, ndv) values "
	total := len(hg.Buckets)
	for i := 0; i < total; {
		// Upper limit of this batch by row count alone.
		end := i + batchInsertSize
		if end > total {
			end = total
		}
		var sql strings.Builder
		sql.WriteString(insertPrefix)
		for j := i; j < end; j++ {
			bucket := hg.Buckets[j]
			// Counts are differenced against the previous bucket before
			// being stored.
			count := bucket.Count
			if j > 0 {
				count -= hg.Buckets[j-1].Count
			}
			var upperBound types.Datum
			upperBound, err = hg.GetUpper(j).ConvertTo(sc, types.NewFieldType(mysql.TypeBlob))
			if err != nil {
				return nil, err
			}
			if j == total-1 {
				lastAnalyzePos = upperBound.GetBytes()
			}
			var lowerBound types.Datum
			lowerBound, err = hg.GetLower(j).ConvertTo(sc, types.NewFieldType(mysql.TypeBlob))
			if err != nil {
				return nil, err
			}
			val := sqlexec.MustEscapeSQL("(%?, %?, %?, %?, %?, %?, %?, %?, %?)", tableID, isIndex, hg.ID, j, count, bucket.Repeat, lowerBound.GetBytes(), upperBound.GetBytes(), bucket.NDV)
			if j > i {
				val = "," + val
				// Stop before the statement exceeds the length limit; bucket j
				// is retried as the first tuple of the next statement.
				if sql.Len()+len(val) > maxInsertLength {
					end = j
					break
				}
			}
			sql.WriteString(val)
		}
		i = end
		if _, err = exec.ExecuteInternal(ctx, sql.String()); err != nil {
			return nil, err
		}
	}
	return lastAnalyzePos, nil
}

// SaveTableStatsToStorage saves the stats of a table to storage.
func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, needDumpFMS, analyzeSnapshot bool) (err error) {
tableID := results.TableID.GetStatisticsID()
Expand Down Expand Up @@ -1197,7 +1276,6 @@ func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, nee
statsVer = version
}
// 2. Save histograms.
const maxInsertLength = 1024 * 1024
for _, result := range results.Ars {
for i, hg := range result.Hist {
// It's normal virtual column, skip it.
Expand All @@ -1220,30 +1298,8 @@ func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, nee
if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_top_n where table_id = %? and is_index = %? and hist_id = %?", tableID, result.IsIndex, hg.ID); err != nil {
return err
}
if topN := result.TopNs[i]; topN != nil {
for j := 0; j < len(topN.TopN); {
end := j + batchInsertSize
if end > len(topN.TopN) {
end = len(topN.TopN)
}
sql := new(strings.Builder)
sql.WriteString("insert into mysql.stats_top_n (table_id, is_index, hist_id, value, count) values ")
for k := j; k < end; k++ {
val := sqlexec.MustEscapeSQL("(%?, %?, %?, %?, %?)", tableID, result.IsIndex, hg.ID, topN.TopN[k].Encoded, topN.TopN[k].Count)
if k > j {
val = "," + val
}
if k > j && sql.Len()+len(val) > maxInsertLength {
end = k
break
}
sql.WriteString(val)
}
j = end
if _, err = exec.ExecuteInternal(ctx, sql.String()); err != nil {
return err
}
}
if err = saveTopNToStorage(ctx, exec, tableID, result.IsIndex, hg.ID, result.TopNs[i]); err != nil {
return err
}
if _, err := exec.ExecuteInternal(ctx, "delete from mysql.stats_fm_sketch where table_id = %? and is_index = %? and hist_id = %?", tableID, result.IsIndex, hg.ID); err != nil {
return err
Expand All @@ -1262,45 +1318,9 @@ func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, nee
}
sc := h.mu.ctx.GetSessionVars().StmtCtx
var lastAnalyzePos []byte
for j := 0; j < len(hg.Buckets); {
end := j + batchInsertSize
if end > len(hg.Buckets) {
end = len(hg.Buckets)
}
sql := new(strings.Builder)
sql.WriteString("insert into mysql.stats_buckets (table_id, is_index, hist_id, bucket_id, count, repeats, lower_bound, upper_bound, ndv) values ")
for k := j; k < end; k++ {
count := hg.Buckets[k].Count
if k > 0 {
count -= hg.Buckets[k-1].Count
}
var upperBound types.Datum
upperBound, err = hg.GetUpper(k).ConvertTo(sc, types.NewFieldType(mysql.TypeBlob))
if err != nil {
return err
}
if k == len(hg.Buckets)-1 {
lastAnalyzePos = upperBound.GetBytes()
}
var lowerBound types.Datum
lowerBound, err = hg.GetLower(k).ConvertTo(sc, types.NewFieldType(mysql.TypeBlob))
if err != nil {
return err
}
val := sqlexec.MustEscapeSQL("(%?, %?, %?, %?, %?, %?, %?, %?, %?)", tableID, result.IsIndex, hg.ID, k, count, hg.Buckets[k].Repeat, lowerBound.GetBytes(), upperBound.GetBytes(), hg.Buckets[k].NDV)
if k > j {
val = "," + val
}
if k > j && sql.Len()+len(val) > maxInsertLength {
end = k
break
}
sql.WriteString(val)
}
j = end
if _, err = exec.ExecuteInternal(ctx, sql.String()); err != nil {
return err
}
lastAnalyzePos, err = saveBucketsToStorage(ctx, exec, sc, tableID, result.IsIndex, hg)
if err != nil {
return err
}
if len(lastAnalyzePos) > 0 {
if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_histograms set last_analyze_pos = %? where table_id = %? and is_index = %? and hist_id = %?", lastAnalyzePos, tableID, result.IsIndex, hg.ID); err != nil {
Expand Down Expand Up @@ -1384,12 +1404,8 @@ func (h *Handle) SaveStatsToStorage(tableID int64, count int64, isIndex int, hg
if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_top_n where table_id = %? and is_index = %? and hist_id = %?", tableID, isIndex, hg.ID); err != nil {
return err
}
if topN != nil {
for _, meta := range topN.TopN {
if _, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_top_n (table_id, is_index, hist_id, value, count) values (%?, %?, %?, %?, %?)", tableID, isIndex, hg.ID, meta.Encoded, meta.Count); err != nil {
return err
}
}
if err = saveTopNToStorage(ctx, exec, tableID, isIndex, hg.ID, topN); err != nil {
return err
}
if _, err := exec.ExecuteInternal(ctx, "delete from mysql.stats_fm_sketch where table_id = %? and is_index = %? and hist_id = %?", tableID, isIndex, hg.ID); err != nil {
return err
Expand All @@ -1407,27 +1423,9 @@ func (h *Handle) SaveStatsToStorage(tableID int64, count int64, isIndex int, hg
}
sc := h.mu.ctx.GetSessionVars().StmtCtx
var lastAnalyzePos []byte
for i := range hg.Buckets {
count := hg.Buckets[i].Count
if i > 0 {
count -= hg.Buckets[i-1].Count
}
var upperBound types.Datum
upperBound, err = hg.GetUpper(i).ConvertTo(sc, types.NewFieldType(mysql.TypeBlob))
if err != nil {
return
}
if i == len(hg.Buckets)-1 {
lastAnalyzePos = upperBound.GetBytes()
}
var lowerBound types.Datum
lowerBound, err = hg.GetLower(i).ConvertTo(sc, types.NewFieldType(mysql.TypeBlob))
if err != nil {
return
}
if _, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_buckets(table_id, is_index, hist_id, bucket_id, count, repeats, lower_bound, upper_bound, ndv) values(%?, %?, %?, %?, %?, %?, %?, %?, %?)", tableID, isIndex, hg.ID, i, count, hg.Buckets[i].Repeat, lowerBound.GetBytes(), upperBound.GetBytes(), hg.Buckets[i].NDV); err != nil {
return err
}
lastAnalyzePos, err = saveBucketsToStorage(ctx, exec, sc, tableID, isIndex, hg)
if err != nil {
return err
}
if isAnalyzed == 1 && len(lastAnalyzePos) > 0 {
if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_histograms set last_analyze_pos = %? where table_id = %? and is_index = %? and hist_id = %?", lastAnalyzePos, tableID, isIndex, hg.ID); err != nil {
Expand Down
5 changes: 4 additions & 1 deletion statistics/handle/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,9 +340,12 @@ func (h *Handle) sweepIdxUsageList() indexUsageMap {
return mapper
}

const (
	// batchInsertSize is the batch size used by internal SQL to insert values
	// into some system tables: the maximum number of value tuples placed in a
	// single insert statement.
	batchInsertSize = 10

	// maxInsertLength is the length limit, in bytes, for an internal insert
	// SQL statement (1 MiB).
	maxInsertLength = 1 << 20
)

// DumpIndexUsageToKV will dump in-memory index usage information to KV.
func (h *Handle) DumpIndexUsageToKV() error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
Expand Down