diff --git a/statistics/cmsketch.go b/statistics/cmsketch.go
index fca5964101337..d5752ff8527b6 100644
--- a/statistics/cmsketch.go
+++ b/statistics/cmsketch.go
@@ -15,7 +15,6 @@ package statistics
 
 import (
 	"bytes"
-	"fmt"
 	"math"
 	"sort"
 
@@ -25,8 +24,8 @@ import (
 	"github.com/pingcap/tidb/sessionctx/stmtctx"
 	"github.com/pingcap/tidb/tablecodec"
 	"github.com/pingcap/tidb/types"
+	"github.com/pingcap/tidb/util/chunk"
 	"github.com/pingcap/tidb/util/hack"
-	"github.com/pingcap/tidb/util/sqlexec"
 	"github.com/pingcap/tipb/go-tipb"
 	"github.com/spaolacci/murmur3"
 )
@@ -403,7 +402,7 @@ func CMSketchToProto(c *CMSketch) *tipb.CMSketch {
 
 // CMSketchFromProto converts CMSketch from its protobuf representation.
 func CMSketchFromProto(protoSketch *tipb.CMSketch) *CMSketch {
-	if protoSketch == nil {
+	if protoSketch == nil || len(protoSketch.Rows) == 0 {
 		return nil
 	}
 	c := NewCMSketch(int32(len(protoSketch.Rows)), int32(len(protoSketch.Rows[0].Counters)))
@@ -438,8 +437,8 @@ func EncodeCMSketchWithoutTopN(c *CMSketch) ([]byte, error) {
 	return protoData, err
 }
 
-// decodeCMSketch decode a CMSketch from the given byte slice.
-func decodeCMSketch(data []byte, topN []*TopNMeta) (*CMSketch, error) {
+// DecodeCMSketch decodes a CMSketch from the given byte slice.
+func DecodeCMSketch(data []byte, topNRows []chunk.Row) (*CMSketch, error) {
 	if data == nil {
 		return nil, nil
 	}
@@ -448,29 +447,12 @@ func decodeCMSketch(data []byte, topN []*TopNMeta) (*CMSketch, error) {
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
-	if len(p.Rows) == 0 && len(topN) == 0 {
-		return nil, nil
-	}
-	for _, meta := range topN {
-		p.TopN = append(p.TopN, &tipb.CMSketchTopN{Data: meta.Data, Count: meta.Count})
-	}
-	return CMSketchFromProto(p), nil
-}
-
-// LoadCMSketchWithTopN loads the CM sketch with topN from storage.
-func LoadCMSketchWithTopN(exec sqlexec.RestrictedSQLExecutor, tableID, isIndex, histID int64, cms []byte) (*CMSketch, error) {
-	sql := fmt.Sprintf("select HIGH_PRIORITY value, count from mysql.stats_top_n where table_id = %d and is_index = %d and hist_id = %d", tableID, isIndex, histID)
-	topNRows, _, err := exec.ExecRestrictedSQL(nil, sql)
-	if err != nil {
-		return nil, err
-	}
-	topN := make([]*TopNMeta, 0, len(topNRows))
 	for _, row := range topNRows {
 		data := make([]byte, len(row.GetBytes(0)))
 		copy(data, row.GetBytes(0))
-		topN = append(topN, &TopNMeta{Data: data, Count: row.GetUint64(1)})
+		p.TopN = append(p.TopN, &tipb.CMSketchTopN{Data: data, Count: row.GetUint64(1)})
 	}
-	return decodeCMSketch(cms, topN)
+	return CMSketchFromProto(p), nil
 }
 
 // TotalCount returns the total count in the sketch, it is only used for test.
@@ -554,6 +536,15 @@ func (c *CMSketch) TopN() []*TopNMeta {
 	return topN
 }
 
+// AppendTopN appends a TopN meta into the CM sketch.
+func (c *CMSketch) AppendTopN(data []byte, count uint64) {
+	if c.topN == nil {
+		c.topN = make(map[uint64][]*TopNMeta)
+	}
+	h1, h2 := murmur3.Sum128(data)
+	c.topN[h1] = append(c.topN[h1], &TopNMeta{h2, data, count})
+}
+
 // GetWidthAndDepth returns the width and depth of CM Sketch.
 func (c *CMSketch) GetWidthAndDepth() (int32, int32) {
 	return c.width, c.depth
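The change above replaces LoadCMSketchWithTopN with a storage-agnostic DecodeCMSketch that takes the top-n rows as chunk.Row values, plus an AppendTopN helper used by the bootstrap path. A minimal round-trip sketch of how the two are meant to fit together; the depth/width values, the sample payload, and the function name roundTripSketch are illustrative and not part of the patch:

```go
package statistics_test

import (
	"github.com/pingcap/parser/mysql"
	"github.com/pingcap/tidb/statistics"
	"github.com/pingcap/tidb/types"
	"github.com/pingcap/tidb/util/chunk"
)

// roundTripSketch encodes a sketch without its top-n, rebuilds the top-n as
// (value, count) chunk rows -- the same layout mysql.stats_top_n is read with --
// and decodes both parts back into one CMSketch.
func roundTripSketch() (*statistics.CMSketch, error) {
	sk := statistics.NewCMSketch(5, 2048)
	sk.AppendTopN([]byte("frequent-value"), 100)

	data, err := statistics.EncodeCMSketchWithoutTopN(sk)
	if err != nil {
		return nil, err
	}

	// Column 0: value (blob), column 1: count (unsigned bigint).
	unsignedLong := types.NewFieldType(mysql.TypeLonglong)
	unsignedLong.Flag |= mysql.UnsignedFlag
	chk := chunk.New([]*types.FieldType{types.NewFieldType(mysql.TypeBlob), unsignedLong}, 1, 1)
	chk.AppendBytes(0, []byte("frequent-value"))
	chk.AppendUint64(1, 100)

	return statistics.DecodeCMSketch(data, []chunk.Row{chk.GetRow(0)})
}
```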
"github.com/pingcap/check" "github.com/pingcap/errors" + "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" "github.com/spaolacci/murmur3" ) @@ -153,7 +155,7 @@ func (s *testStatisticsSuite) TestCMSketchCoding(c *C) { bytes, err := EncodeCMSketchWithoutTopN(lSketch) c.Assert(err, IsNil) c.Assert(len(bytes), Equals, 61457) - rSketch, err := decodeCMSketch(bytes, nil) + rSketch, err := DecodeCMSketch(bytes, nil) c.Assert(err, IsNil) c.Assert(lSketch.Equal(rSketch), IsTrue) } @@ -226,16 +228,25 @@ func (s *testStatisticsSuite) TestCMSketchCodingTopN(c *C) { } } lSketch.topN = make(map[uint64][]*TopNMeta) + unsignedLong := types.NewFieldType(mysql.TypeLonglong) + unsignedLong.Flag |= mysql.UnsignedFlag + chk := chunk.New([]*types.FieldType{types.NewFieldType(mysql.TypeBlob), unsignedLong}, 20, 20) + var rows []chunk.Row for i := 0; i < 20; i++ { tString := []byte(fmt.Sprintf("%20000d", i)) h1, h2 := murmur3.Sum128(tString) lSketch.topN[h1] = []*TopNMeta{{h2, tString, math.MaxUint64}} + chk.AppendBytes(0, tString) + chk.AppendUint64(1, math.MaxUint64) + rows = append(rows, chk.GetRow(i)) } bytes, err := EncodeCMSketchWithoutTopN(lSketch) c.Assert(err, IsNil) c.Assert(len(bytes), Equals, 61457) - rSketch, err := decodeCMSketch(bytes, lSketch.TopN()) + rSketch, err := DecodeCMSketch(bytes, rows) c.Assert(err, IsNil) c.Assert(lSketch.Equal(rSketch), IsTrue) + // do not panic + DecodeCMSketch([]byte{}, rows) } diff --git a/statistics/handle/bootstrap.go b/statistics/handle/bootstrap.go index b1b04fb828e9f..68b4b941db7a0 100644 --- a/statistics/handle/bootstrap.go +++ b/statistics/handle/bootstrap.go @@ -59,8 +59,6 @@ func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, cache *statsCache } func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (statsCache, error) { - h.mu.Lock() - defer h.mu.Unlock() sql := "select HIGH_PRIORITY version, table_id, modify_count, count from mysql.stats_meta" rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql) if len(rc) > 0 { @@ -105,7 +103,7 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache *stat if idxInfo == nil { continue } - cms, err := statistics.LoadCMSketchWithTopN(h.restrictedExec, row.GetInt64(0), row.GetInt64(1), row.GetInt64(2), row.GetBytes(6)) + cms, err := statistics.DecodeCMSketch(row.GetBytes(6), nil) if err != nil { cms = nil terror.Log(errors.Trace(err)) @@ -139,8 +137,6 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache *stat } func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache *statsCache) error { - h.mu.Lock() - defer h.mu.Unlock() sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms" rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql) if len(rc) > 0 { @@ -164,6 +160,46 @@ func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache *statsCache return nil } +func (h *Handle) initStatsTopN4Chunk(cache *statsCache, iter *chunk.Iterator4Chunk) { + for row := iter.Begin(); row != iter.End(); row = iter.Next() { + table, ok := cache.tables[row.GetInt64(0)] + if !ok { + continue + } + idx, ok := table.Indices[row.GetInt64(1)] + if !ok || idx.CMSketch == nil { + continue + } + data := make([]byte, len(row.GetBytes(2))) + copy(data, row.GetBytes(2)) + 
+		idx.CMSketch.AppendTopN(data, row.GetUint64(3))
+	}
+}
+
+func (h *Handle) initStatsTopN(cache *statsCache) error {
+	sql := "select HIGH_PRIORITY table_id, hist_id, value, count from mysql.stats_top_n where is_index = 1"
+	rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
+	if len(rc) > 0 {
+		defer terror.Call(rc[0].Close)
+	}
+	if err != nil {
+		return errors.Trace(err)
+	}
+	req := rc[0].NewChunk()
+	iter := chunk.NewIterator4Chunk(req)
+	for {
+		err := rc[0].Next(context.TODO(), req)
+		if err != nil {
+			return errors.Trace(err)
+		}
+		if req.NumRows() == 0 {
+			break
+		}
+		h.initStatsTopN4Chunk(cache, iter)
+	}
+	return nil
+}
+
 func initStatsBuckets4Chunk(ctx sessionctx.Context, cache *statsCache, iter *chunk.Iterator4Chunk) {
 	for row := iter.Begin(); row != iter.End(); row = iter.Next() {
 		tableID, isIndex, histID := row.GetInt64(0), row.GetInt64(1), row.GetInt64(2)
@@ -211,8 +247,6 @@ func initStatsBuckets4Chunk(ctx sessionctx.Context, cache *statsCache, iter *chu
 }
 
 func (h *Handle) initStatsBuckets(cache *statsCache) error {
-	h.mu.Lock()
-	defer h.mu.Unlock()
 	sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound from mysql.stats_buckets order by table_id, is_index, hist_id, bucket_id"
 	rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
 	if len(rc) > 0 {
@@ -254,7 +288,19 @@ func (h *Handle) initStatsBuckets(cache *statsCache) error {
 }
 
 // InitStats will init the stats cache using full load strategy.
-func (h *Handle) InitStats(is infoschema.InfoSchema) error {
+func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) {
+	h.mu.Lock()
+	defer func() {
+		_, err1 := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), "commit")
+		if err == nil && err1 != nil {
+			err = err1
+		}
+		h.mu.Unlock()
+	}()
+	_, err = h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), "begin")
+	if err != nil {
+		return err
+	}
 	cache, err := h.initStatsMeta(is)
 	if err != nil {
 		return errors.Trace(err)
@@ -263,6 +309,10 @@ func (h *Handle) InitStats(is infoschema.InfoSchema) error {
 	if err != nil {
 		return errors.Trace(err)
 	}
+	err = h.initStatsTopN(&cache)
+	if err != nil {
+		return err
+	}
 	err = h.initStatsBuckets(&cache)
 	if err != nil {
 		return errors.Trace(err)
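With the per-loader h.mu locking removed, InitStats now owns the lock and runs initStatsMeta, initStatsHistograms, initStatsTopN, and initStatsBuckets inside one explicit transaction on the handle's internal session. A condensed sketch of that wrapper shape; the helper name runInTxn is illustrative and is not something the patch adds:

```go
package handle

import (
	"context"

	"github.com/pingcap/tidb/sessionctx"
	"github.com/pingcap/tidb/util/sqlexec"
)

// runInTxn mirrors the begin/commit discipline InitStats uses: a COMMIT error
// is folded into the named return value from the deferred function, so the
// caller still sees a failure even when body() itself succeeded.
func runInTxn(sctx sessionctx.Context, body func() error) (err error) {
	exec := sctx.(sqlexec.SQLExecutor)
	if _, err = exec.Execute(context.TODO(), "begin"); err != nil {
		return err
	}
	defer func() {
		_, err1 := exec.Execute(context.TODO(), "commit")
		if err == nil && err1 != nil {
			err = err1
		}
	}()
	return body()
}
```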
diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go
index e2b65287490ec..22c6fc7514dba 100644
--- a/statistics/handle/handle.go
+++ b/statistics/handle/handle.go
@@ -24,6 +24,7 @@ import (
 	"github.com/pingcap/parser/ast"
 	"github.com/pingcap/parser/model"
 	"github.com/pingcap/parser/mysql"
+	"github.com/pingcap/parser/terror"
 	"github.com/pingcap/tidb/ddl/util"
 	"github.com/pingcap/tidb/infoschema"
 	"github.com/pingcap/tidb/sessionctx"
@@ -279,8 +280,19 @@ func (sc statsCache) update(tables []*statistics.Table, deletedIDs []int64, newV
 }
 
 // LoadNeededHistograms will load histograms for those needed columns.
-func (h *Handle) LoadNeededHistograms() error {
+func (h *Handle) LoadNeededHistograms() (err error) {
 	cols := statistics.HistogramNeededColumns.AllCols()
+	reader, err := h.getStatsReader(nil)
+	defer func() {
+		err1 := h.releaseStatsReader(reader)
+		if err1 != nil && err == nil {
+			err = err1
+		}
+	}()
+	if err != nil {
+		return err
+	}
+
 	for _, col := range cols {
 		statsCache := h.statsCache.Load().(statsCache)
 		tbl, ok := statsCache.tables[col.TableID]
@@ -293,11 +305,11 @@ func (h *Handle) LoadNeededHistograms() error {
 			statistics.HistogramNeededColumns.Delete(col)
 			continue
 		}
-		hg, err := h.histogramFromStorage(col.TableID, c.ID, &c.Info.FieldType, c.NDV, 0, c.LastUpdateVersion, c.NullCount, c.TotColSize, c.Correlation, nil)
+		hg, err := h.histogramFromStorage(reader, col.TableID, c.ID, &c.Info.FieldType, c.NDV, 0, c.LastUpdateVersion, c.NullCount, c.TotColSize, c.Correlation)
 		if err != nil {
 			return errors.Trace(err)
 		}
-		cms, err := h.cmSketchFromStorage(col.TableID, 0, col.ColumnID, nil)
+		cms, err := h.cmSketchFromStorage(reader, col.TableID, 0, col.ColumnID)
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -342,24 +354,21 @@ func (h *Handle) FlushStats() {
 	}
 }
 
-func (h *Handle) cmSketchFromStorage(tblID int64, isIndex, histID int64, historyStatsExec sqlexec.RestrictedSQLExecutor) (_ *statistics.CMSketch, err error) {
+func (h *Handle) cmSketchFromStorage(reader *statsReader, tblID int64, isIndex, histID int64) (_ *statistics.CMSketch, err error) {
 	selSQL := fmt.Sprintf("select cm_sketch from mysql.stats_histograms where table_id = %d and is_index = %d and hist_id = %d", tblID, isIndex, histID)
-	var rows []chunk.Row
-	if historyStatsExec != nil {
-		rows, _, err = historyStatsExec.ExecRestrictedSQLWithSnapshot(nil, selSQL)
-	} else {
-		rows, _, err = h.restrictedExec.ExecRestrictedSQL(nil, selSQL)
+	rows, _, err := reader.read(selSQL)
+	if err != nil || len(rows) == 0 {
+		return nil, err
 	}
+	selSQL = fmt.Sprintf("select HIGH_PRIORITY value, count from mysql.stats_top_n where table_id = %d and is_index = %d and hist_id = %d", tblID, isIndex, histID)
+	topNRows, _, err := reader.read(selSQL)
 	if err != nil {
-		return nil, errors.Trace(err)
-	}
-	if len(rows) == 0 {
-		return nil, nil
+		return nil, err
 	}
-	return statistics.LoadCMSketchWithTopN(h.restrictedExec, tblID, isIndex, histID, rows[0].GetBytes(0))
+	return statistics.DecodeCMSketch(rows[0].GetBytes(0), topNRows)
 }
 
-func (h *Handle) indexStatsFromStorage(row chunk.Row, table *statistics.Table, tableInfo *model.TableInfo, historyStatsExec sqlexec.RestrictedSQLExecutor) error {
+func (h *Handle) indexStatsFromStorage(reader *statsReader, row chunk.Row, table *statistics.Table, tableInfo *model.TableInfo) error {
 	histID := row.GetInt64(2)
 	distinct := row.GetInt64(3)
 	histVer := row.GetUint64(4)
@@ -368,10 +377,8 @@ func (h *Handle) indexStatsFromStorage(row chunk.Row, table *statistics.Table, t
 	errorRate := statistics.ErrorRate{}
 	flag := row.GetInt64(8)
 	lastAnalyzePos := row.GetDatum(10, types.NewFieldType(mysql.TypeBlob))
-	if statistics.IsAnalyzed(flag) {
-		h.mu.Lock()
+	if statistics.IsAnalyzed(flag) && !reader.isHistory() {
 		h.mu.rateMap.clear(table.PhysicalID, histID, true)
-		h.mu.Unlock()
 	} else if idx != nil {
 		errorRate = idx.ErrorRate
 	}
@@ -380,11 +387,11 @@ func (h *Handle) indexStatsFromStorage(row chunk.Row, table *statistics.Table, t
 			continue
 		}
 		if idx == nil || idx.LastUpdateVersion < histVer {
-			hg, err := h.histogramFromStorage(table.PhysicalID, histID, types.NewFieldType(mysql.TypeBlob), distinct, 1, histVer, nullCount, 0, 0, historyStatsExec)
+			hg, err := h.histogramFromStorage(reader, table.PhysicalID, histID, types.NewFieldType(mysql.TypeBlob), distinct, 1, histVer, nullCount, 0, 0)
 			if err != nil {
 				return errors.Trace(err)
 			}
-			cms, err := h.cmSketchFromStorage(table.PhysicalID, 1, idxInfo.ID, historyStatsExec)
+			cms, err := h.cmSketchFromStorage(reader, table.PhysicalID, 1, idxInfo.ID)
 			if err != nil {
 				return errors.Trace(err)
 			}
@@ -400,7 +407,7 @@ func (h *Handle) indexStatsFromStorage(row chunk.Row, table *statistics.Table, t
 	return nil
 }
 
-func (h *Handle) columnStatsFromStorage(row chunk.Row, table *statistics.Table, tableInfo *model.TableInfo, loadAll bool, historyStatsExec sqlexec.RestrictedSQLExecutor) error {
+func (h *Handle) columnStatsFromStorage(reader *statsReader, row chunk.Row, table *statistics.Table, tableInfo *model.TableInfo, loadAll bool) error {
 	histID := row.GetInt64(2)
 	distinct := row.GetInt64(3)
 	histVer := row.GetUint64(4)
@@ -411,10 +418,8 @@ func (h *Handle) columnStatsFromStorage(row chunk.Row, table *statistics.Table,
 	col := table.Columns[histID]
 	errorRate := statistics.ErrorRate{}
 	flag := row.GetInt64(8)
-	if statistics.IsAnalyzed(flag) {
-		h.mu.Lock()
+	if statistics.IsAnalyzed(flag) && !reader.isHistory() {
 		h.mu.rateMap.clear(table.PhysicalID, histID, false)
-		h.mu.Unlock()
 	} else if col != nil {
 		errorRate = col.ErrorRate
 	}
@@ -433,7 +438,7 @@ func (h *Handle) columnStatsFromStorage(row chunk.Row, table *statistics.Table,
 			(col == nil || col.Len() == 0 && col.LastUpdateVersion < histVer) &&
 			!loadAll
 		if notNeedLoad {
-			count, err := h.columnCountFromStorage(table.PhysicalID, histID)
+			count, err := h.columnCountFromStorage(reader, table.PhysicalID, histID)
 			if err != nil {
 				return errors.Trace(err)
 			}
@@ -451,11 +456,11 @@ func (h *Handle) columnStatsFromStorage(row chunk.Row, table *statistics.Table,
 			break
 		}
 		if col == nil || col.LastUpdateVersion < histVer || loadAll {
-			hg, err := h.histogramFromStorage(table.PhysicalID, histID, &colInfo.FieldType, distinct, 0, histVer, nullCount, totColSize, correlation, historyStatsExec)
+			hg, err := h.histogramFromStorage(reader, table.PhysicalID, histID, &colInfo.FieldType, distinct, 0, histVer, nullCount, totColSize, correlation)
			if err != nil {
				return errors.Trace(err)
			}
-			cms, err := h.cmSketchFromStorage(table.PhysicalID, 0, colInfo.ID, historyStatsExec)
+			cms, err := h.cmSketchFromStorage(reader, table.PhysicalID, 0, colInfo.ID)
 			if err != nil {
 				return errors.Trace(err)
 			}
@@ -492,6 +497,16 @@ func (h *Handle) columnStatsFromStorage(row chunk.Row, table *statistics.Table,
 
 // tableStatsFromStorage loads table stats info from storage.
 func (h *Handle) tableStatsFromStorage(tableInfo *model.TableInfo, physicalID int64, loadAll bool, historyStatsExec sqlexec.RestrictedSQLExecutor) (_ *statistics.Table, err error) {
+	reader, err := h.getStatsReader(historyStatsExec)
+	defer func() {
+		err1 := h.releaseStatsReader(reader)
+		if err == nil && err1 != nil {
+			err = err1
+		}
+	}()
+	if err != nil {
+		return nil, err
+	}
 	table, ok := h.statsCache.Load().(statsCache).tables[physicalID]
 	// If table stats is pseudo, we also need to copy it, since we will use the column stats when
 	// the average error rate of it is small.
@@ -511,28 +526,19 @@ func (h *Handle) tableStatsFromStorage(tableInfo *model.TableInfo, physicalID in
 	}
 	table.Pseudo = false
 	selSQL := fmt.Sprintf("select table_id, is_index, hist_id, distinct_count, version, null_count, tot_col_size, stats_ver, flag, correlation, last_analyze_pos from mysql.stats_histograms where table_id = %d", physicalID)
-	var rows []chunk.Row
-	if historyStatsExec != nil {
-		rows, _, err = historyStatsExec.ExecRestrictedSQLWithSnapshot(nil, selSQL)
-	} else {
-		rows, _, err = h.restrictedExec.ExecRestrictedSQL(nil, selSQL)
-	}
-	if err != nil {
-		return nil, err
-	}
+	rows, _, err := reader.read(selSQL)
 	// Check deleted table.
 	if len(rows) == 0 {
 		return nil, nil
 	}
 	for _, row := range rows {
 		if row.GetInt64(1) > 0 {
-			if err := h.indexStatsFromStorage(row, table, tableInfo, historyStatsExec); err != nil {
-				return nil, errors.Trace(err)
-			}
+			err = h.indexStatsFromStorage(reader, row, table, tableInfo)
 		} else {
-			if err := h.columnStatsFromStorage(row, table, tableInfo, loadAll, historyStatsExec); err != nil {
-				return nil, errors.Trace(err)
-			}
+			err = h.columnStatsFromStorage(reader, row, table, tableInfo, loadAll)
+		}
+		if err != nil {
+			return nil, err
 		}
 	}
 	return table, nil
@@ -660,17 +666,9 @@ func (h *Handle) SaveMetaToStorage(tableID, count, modifyCount int64) (err error
 	return
 }
 
-func (h *Handle) histogramFromStorage(tableID int64, colID int64, tp *types.FieldType, distinct int64, isIndex int, ver uint64, nullCount int64, totColSize int64, corr float64, historyStatsExec sqlexec.RestrictedSQLExecutor) (_ *statistics.Histogram, err error) {
+func (h *Handle) histogramFromStorage(reader *statsReader, tableID int64, colID int64, tp *types.FieldType, distinct int64, isIndex int, ver uint64, nullCount int64, totColSize int64, corr float64) (_ *statistics.Histogram, err error) {
 	selSQL := fmt.Sprintf("select count, repeats, lower_bound, upper_bound from mysql.stats_buckets where table_id = %d and is_index = %d and hist_id = %d order by bucket_id", tableID, isIndex, colID)
-	var (
-		rows   []chunk.Row
-		fields []*ast.ResultField
-	)
-	if historyStatsExec != nil {
-		rows, fields, err = historyStatsExec.ExecRestrictedSQLWithSnapshot(nil, selSQL)
-	} else {
-		rows, fields, err = h.restrictedExec.ExecRestrictedSQL(nil, selSQL)
-	}
+	rows, fields, err := reader.read(selSQL)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -705,9 +703,9 @@ func (h *Handle) histogramFromStorage(tableID int64, colID int64, tp *types.Fiel
 	return hg, nil
 }
 
-func (h *Handle) columnCountFromStorage(tableID, colID int64) (int64, error) {
+func (h *Handle) columnCountFromStorage(reader *statsReader, tableID, colID int64) (int64, error) {
 	selSQL := fmt.Sprintf("select sum(count) from mysql.stats_buckets where table_id = %d and is_index = %d and hist_id = %d", tableID, 0, colID)
-	rows, _, err := h.restrictedExec.ExecRestrictedSQL(nil, selSQL)
+	rows, _, err := reader.read(selSQL)
 	if err != nil {
 		return 0, errors.Trace(err)
 	}
@@ -733,3 +731,62 @@ func (h *Handle) statsMetaByTableIDFromStorage(tableID int64, historyStatsExec s
 	count = rows[0].GetInt64(2)
 	return
 }
+
+// statsReader is used to simplify code that needs to read system tables in different SQL statements
+// but requires the same transaction.
+type statsReader struct {
+	ctx     sessionctx.Context
+	history sqlexec.RestrictedSQLExecutor
+}
+
+func (sr *statsReader) read(sql string) (rows []chunk.Row, fields []*ast.ResultField, err error) {
+	if sr.history != nil {
+		return sr.history.ExecRestrictedSQLWithSnapshot(nil, sql)
+	}
+	rc, err := sr.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
+	if len(rc) > 0 {
+		defer terror.Call(rc[0].Close)
+	}
+	if err != nil {
+		return nil, nil, err
+	}
+	for {
+		req := rc[0].NewChunk()
+		err := rc[0].Next(context.TODO(), req)
+		if err != nil {
+			return nil, nil, err
+		}
+		if req.NumRows() == 0 {
+			break
+		}
+		for i := 0; i < req.NumRows(); i++ {
+			rows = append(rows, req.GetRow(i))
+		}
+	}
+	return rows, rc[0].Fields(), nil
+}
+
+func (sr *statsReader) isHistory() bool {
+	return sr.history != nil
+}
+
+func (h *Handle) getStatsReader(history sqlexec.RestrictedSQLExecutor) (*statsReader, error) {
+	if history != nil {
+		return &statsReader{history: history}, nil
+	}
+	h.mu.Lock()
+	_, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), "begin")
+	if err != nil {
+		return nil, err
+	}
+	return &statsReader{ctx: h.mu.ctx}, nil
+}
+
+func (h *Handle) releaseStatsReader(reader *statsReader) error {
+	if reader.history != nil {
+		return nil
+	}
+	_, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), "commit")
+	h.mu.Unlock()
+	return err
+}
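statsReader gives every storage read in this file one of two backends: a history snapshot executor, or the handle's own session wrapped in begin/commit and serialized by h.mu. A hedged sketch of a caller following the same acquire/release pattern as tableStatsFromStorage; the method name and its stats_meta query are illustrative, while the reader API is the one added above:

```go
package handle

import "fmt"

// rowCountFromMeta reads mysql.stats_meta through a statsReader, folding a
// release (commit) error into the named return, as the patch does elsewhere.
func (h *Handle) rowCountFromMeta(physicalID int64) (count int64, err error) {
	reader, err := h.getStatsReader(nil) // nil: read current stats, not a history snapshot
	if err != nil {
		return 0, err
	}
	defer func() {
		if err1 := h.releaseStatsReader(reader); err == nil && err1 != nil {
			err = err1
		}
	}()
	rows, _, err := reader.read(fmt.Sprintf("select version, modify_count, count from mysql.stats_meta where table_id = %d", physicalID))
	if err != nil || len(rows) == 0 {
		return 0, err
	}
	// Same column layout statsMetaByTableIDFromStorage already relies on.
	return rows[0].GetInt64(2), nil
}
```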