From d13d8b860d93b7647ca2b7563a704920dfc3cb87 Mon Sep 17 00:00:00 2001 From: xuyifan <675434007@qq.com> Date: Sat, 6 Aug 2022 19:55:47 +0800 Subject: [PATCH 1/2] batch insert topn and buckets to storage to speed up loading stats --- statistics/handle/handle.go | 178 ++++++++++++++++++------------------ statistics/handle/update.go | 5 +- 2 files changed, 91 insertions(+), 92 deletions(-) diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go index e6e9a1cfb6799..2ff053d350f96 100644 --- a/statistics/handle/handle.go +++ b/statistics/handle/handle.go @@ -1107,6 +1107,83 @@ func (h *Handle) StatsMetaCountAndModifyCount(tableID int64) (int64, int64, erro return count, modifyCount, nil } +func saveTopNToStorage(ctx context.Context, exec sqlexec.SQLExecutor, tableID int64, isIndex int, histID int64, topN *statistics.TopN) error { + if topN == nil { + return nil + } + for i := 0; i < len(topN.TopN); { + end := i + batchInsertSize + if end > len(topN.TopN) { + end = len(topN.TopN) + } + sql := new(strings.Builder) + sql.WriteString("insert into mysql.stats_top_n (table_id, is_index, hist_id, value, count) values ") + for j := i; j < end; j++ { + val := sqlexec.MustEscapeSQL("(%?, %?, %?, %?, %?)", tableID, isIndex, histID, topN.TopN[j].Encoded, topN.TopN[j].Count) + if j > i { + val = "," + val + } + if j > i && sql.Len()+len(val) > maxInsertLength { + end = j + break + } + sql.WriteString(val) + } + i = end + if _, err := exec.ExecuteInternal(ctx, sql.String()); err != nil { + return err + } + } + return nil +} + +func saveBucketsToStorage(ctx context.Context, exec sqlexec.SQLExecutor, sc *stmtctx.StatementContext, tableID int64, isIndex int, hg *statistics.Histogram) (lastAnalyzePos []byte, err error) { + if hg == nil { + return + } + for i := 0; i < len(hg.Buckets); { + end := i + batchInsertSize + if end > len(hg.Buckets) { + end = len(hg.Buckets) + } + sql := new(strings.Builder) + sql.WriteString("insert into mysql.stats_buckets (table_id, is_index, hist_id, bucket_id, count, repeats, lower_bound, upper_bound, ndv) values ") + for j := i; j < end; j++ { + count := hg.Buckets[j].Count + if j > 0 { + count -= hg.Buckets[j-1].Count + } + var upperBound types.Datum + upperBound, err = hg.GetUpper(j).ConvertTo(sc, types.NewFieldType(mysql.TypeBlob)) + if err != nil { + return + } + if j == len(hg.Buckets)-1 { + lastAnalyzePos = upperBound.GetBytes() + } + var lowerBound types.Datum + lowerBound, err = hg.GetLower(j).ConvertTo(sc, types.NewFieldType(mysql.TypeBlob)) + if err != nil { + return + } + val := sqlexec.MustEscapeSQL("(%?, %?, %?, %?, %?, %?, %?, %?, %?)", tableID, isIndex, hg.ID, j, count, hg.Buckets[j].Repeat, lowerBound.GetBytes(), upperBound.GetBytes(), hg.Buckets[j].NDV) + if j > i { + val = "," + val + } + if j > i && sql.Len()+len(val) > maxInsertLength { + end = j + break + } + sql.WriteString(val) + } + i = end + if _, err = exec.ExecuteInternal(ctx, sql.String()); err != nil { + return + } + } + return +} + // SaveTableStatsToStorage saves the stats of a table to storage. func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, needDumpFMS, analyzeSnapshot bool) (err error) { tableID := results.TableID.GetStatisticsID() @@ -1197,7 +1274,6 @@ func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, nee statsVer = version } // 2. Save histograms. - const maxInsertLength = 1024 * 1024 for _, result := range results.Ars { for i, hg := range result.Hist { // It's normal virtual column, skip it. @@ -1220,30 +1296,8 @@ func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, nee if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_top_n where table_id = %? and is_index = %? and hist_id = %?", tableID, result.IsIndex, hg.ID); err != nil { return err } - if topN := result.TopNs[i]; topN != nil { - for j := 0; j < len(topN.TopN); { - end := j + batchInsertSize - if end > len(topN.TopN) { - end = len(topN.TopN) - } - sql := new(strings.Builder) - sql.WriteString("insert into mysql.stats_top_n (table_id, is_index, hist_id, value, count) values ") - for k := j; k < end; k++ { - val := sqlexec.MustEscapeSQL("(%?, %?, %?, %?, %?)", tableID, result.IsIndex, hg.ID, topN.TopN[k].Encoded, topN.TopN[k].Count) - if k > j { - val = "," + val - } - if k > j && sql.Len()+len(val) > maxInsertLength { - end = k - break - } - sql.WriteString(val) - } - j = end - if _, err = exec.ExecuteInternal(ctx, sql.String()); err != nil { - return err - } - } + if err = saveTopNToStorage(ctx, exec, tableID, result.IsIndex, hg.ID, result.TopNs[i]); err != nil { + return err } if _, err := exec.ExecuteInternal(ctx, "delete from mysql.stats_fm_sketch where table_id = %? and is_index = %? and hist_id = %?", tableID, result.IsIndex, hg.ID); err != nil { return err @@ -1262,45 +1316,9 @@ func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, nee } sc := h.mu.ctx.GetSessionVars().StmtCtx var lastAnalyzePos []byte - for j := 0; j < len(hg.Buckets); { - end := j + batchInsertSize - if end > len(hg.Buckets) { - end = len(hg.Buckets) - } - sql := new(strings.Builder) - sql.WriteString("insert into mysql.stats_buckets (table_id, is_index, hist_id, bucket_id, count, repeats, lower_bound, upper_bound, ndv) values ") - for k := j; k < end; k++ { - count := hg.Buckets[k].Count - if k > 0 { - count -= hg.Buckets[k-1].Count - } - var upperBound types.Datum - upperBound, err = hg.GetUpper(k).ConvertTo(sc, types.NewFieldType(mysql.TypeBlob)) - if err != nil { - return err - } - if k == len(hg.Buckets)-1 { - lastAnalyzePos = upperBound.GetBytes() - } - var lowerBound types.Datum - lowerBound, err = hg.GetLower(k).ConvertTo(sc, types.NewFieldType(mysql.TypeBlob)) - if err != nil { - return err - } - val := sqlexec.MustEscapeSQL("(%?, %?, %?, %?, %?, %?, %?, %?, %?)", tableID, result.IsIndex, hg.ID, k, count, hg.Buckets[k].Repeat, lowerBound.GetBytes(), upperBound.GetBytes(), hg.Buckets[k].NDV) - if k > j { - val = "," + val - } - if k > j && sql.Len()+len(val) > maxInsertLength { - end = k - break - } - sql.WriteString(val) - } - j = end - if _, err = exec.ExecuteInternal(ctx, sql.String()); err != nil { - return err - } + lastAnalyzePos, err = saveBucketsToStorage(ctx, exec, sc, tableID, result.IsIndex, hg) + if err != nil { + return err } if len(lastAnalyzePos) > 0 { if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_histograms set last_analyze_pos = %? where table_id = %? and is_index = %? and hist_id = %?", lastAnalyzePos, tableID, result.IsIndex, hg.ID); err != nil { @@ -1384,12 +1402,8 @@ func (h *Handle) SaveStatsToStorage(tableID int64, count int64, isIndex int, hg if _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_top_n where table_id = %? and is_index = %? and hist_id = %?", tableID, isIndex, hg.ID); err != nil { return err } - if topN != nil { - for _, meta := range topN.TopN { - if _, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_top_n (table_id, is_index, hist_id, value, count) values (%?, %?, %?, %?, %?)", tableID, isIndex, hg.ID, meta.Encoded, meta.Count); err != nil { - return err - } - } + if err = saveTopNToStorage(ctx, exec, tableID, isIndex, hg.ID, topN); err != nil { + return err } if _, err := exec.ExecuteInternal(ctx, "delete from mysql.stats_fm_sketch where table_id = %? and is_index = %? and hist_id = %?", tableID, isIndex, hg.ID); err != nil { return err @@ -1407,27 +1421,9 @@ func (h *Handle) SaveStatsToStorage(tableID int64, count int64, isIndex int, hg } sc := h.mu.ctx.GetSessionVars().StmtCtx var lastAnalyzePos []byte - for i := range hg.Buckets { - count := hg.Buckets[i].Count - if i > 0 { - count -= hg.Buckets[i-1].Count - } - var upperBound types.Datum - upperBound, err = hg.GetUpper(i).ConvertTo(sc, types.NewFieldType(mysql.TypeBlob)) - if err != nil { - return - } - if i == len(hg.Buckets)-1 { - lastAnalyzePos = upperBound.GetBytes() - } - var lowerBound types.Datum - lowerBound, err = hg.GetLower(i).ConvertTo(sc, types.NewFieldType(mysql.TypeBlob)) - if err != nil { - return - } - if _, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_buckets(table_id, is_index, hist_id, bucket_id, count, repeats, lower_bound, upper_bound, ndv) values(%?, %?, %?, %?, %?, %?, %?, %?, %?)", tableID, isIndex, hg.ID, i, count, hg.Buckets[i].Repeat, lowerBound.GetBytes(), upperBound.GetBytes(), hg.Buckets[i].NDV); err != nil { - return err - } + lastAnalyzePos, err = saveBucketsToStorage(ctx, exec, sc, tableID, isIndex, hg) + if err != nil { + return err } if isAnalyzed == 1 && len(lastAnalyzePos) > 0 { if _, err = exec.ExecuteInternal(ctx, "update mysql.stats_histograms set last_analyze_pos = %? where table_id = %? and is_index = %? and hist_id = %?", lastAnalyzePos, tableID, isIndex, hg.ID); err != nil { diff --git a/statistics/handle/update.go b/statistics/handle/update.go index 82ce5c50af7ef..7f8f35a64e4df 100644 --- a/statistics/handle/update.go +++ b/statistics/handle/update.go @@ -340,9 +340,12 @@ func (h *Handle) sweepIdxUsageList() indexUsageMap { return mapper } -//batchInsertSize is the every insert values size limit.Used in such as DumpIndexUsageToKV,DumpColStatsUsageToKV +// batchInsertSize is the batch size used by internal SQL to insert values to some system table. const batchInsertSize = 10 +// maxInsertLength is the length limit for internal insert SQL. +const maxInsertLength = 1024 * 1024 + // DumpIndexUsageToKV will dump in-memory index usage information to KV. func (h *Handle) DumpIndexUsageToKV() error { ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) From aff13dce9385d9101291629e6190427f75a0e607 Mon Sep 17 00:00:00 2001 From: xuyifan <675434007@qq.com> Date: Wed, 10 Aug 2022 18:01:56 +0800 Subject: [PATCH 2/2] reduce bounds check --- statistics/handle/handle.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go index 2ff053d350f96..fdf592f364610 100644 --- a/statistics/handle/handle.go +++ b/statistics/handle/handle.go @@ -1119,7 +1119,8 @@ func saveTopNToStorage(ctx context.Context, exec sqlexec.SQLExecutor, tableID in sql := new(strings.Builder) sql.WriteString("insert into mysql.stats_top_n (table_id, is_index, hist_id, value, count) values ") for j := i; j < end; j++ { - val := sqlexec.MustEscapeSQL("(%?, %?, %?, %?, %?)", tableID, isIndex, histID, topN.TopN[j].Encoded, topN.TopN[j].Count) + topn := topN.TopN[j] + val := sqlexec.MustEscapeSQL("(%?, %?, %?, %?, %?)", tableID, isIndex, histID, topn.Encoded, topn.Count) if j > i { val = "," + val } @@ -1149,7 +1150,8 @@ func saveBucketsToStorage(ctx context.Context, exec sqlexec.SQLExecutor, sc *stm sql := new(strings.Builder) sql.WriteString("insert into mysql.stats_buckets (table_id, is_index, hist_id, bucket_id, count, repeats, lower_bound, upper_bound, ndv) values ") for j := i; j < end; j++ { - count := hg.Buckets[j].Count + bucket := hg.Buckets[j] + count := bucket.Count if j > 0 { count -= hg.Buckets[j-1].Count } @@ -1166,7 +1168,7 @@ func saveBucketsToStorage(ctx context.Context, exec sqlexec.SQLExecutor, sc *stm if err != nil { return } - val := sqlexec.MustEscapeSQL("(%?, %?, %?, %?, %?, %?, %?, %?, %?)", tableID, isIndex, hg.ID, j, count, hg.Buckets[j].Repeat, lowerBound.GetBytes(), upperBound.GetBytes(), hg.Buckets[j].NDV) + val := sqlexec.MustEscapeSQL("(%?, %?, %?, %?, %?, %?, %?, %?, %?)", tableID, isIndex, hg.ID, j, count, bucket.Repeat, lowerBound.GetBytes(), upperBound.GetBytes(), bucket.NDV) if j > i { val = "," + val }