Skip to content

Commit

Permalink
stats: fix panic when init stats for cm sketch (pingcap#14421)
Browse files Browse the repository at this point in the history
  • Loading branch information
alivxxx committed Jan 14, 2020
1 parent d1e0cfa commit 0cb5f9a
Show file tree
Hide file tree
Showing 4 changed files with 197 additions and 88 deletions.
39 changes: 15 additions & 24 deletions statistics/cmsketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package statistics

import (
"bytes"
"fmt"
"math"
"sort"

Expand All @@ -25,8 +24,8 @@ import (
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tipb/go-tipb"
"github.com/spaolacci/murmur3"
)
Expand Down Expand Up @@ -403,7 +402,7 @@ func CMSketchToProto(c *CMSketch) *tipb.CMSketch {

// CMSketchFromProto converts CMSketch from its protobuf representation.
func CMSketchFromProto(protoSketch *tipb.CMSketch) *CMSketch {
if protoSketch == nil {
if protoSketch == nil || len(protoSketch.Rows) == 0 {
return nil
}
c := NewCMSketch(int32(len(protoSketch.Rows)), int32(len(protoSketch.Rows[0].Counters)))
Expand Down Expand Up @@ -438,8 +437,8 @@ func EncodeCMSketchWithoutTopN(c *CMSketch) ([]byte, error) {
return protoData, err
}

// decodeCMSketch decode a CMSketch from the given byte slice.
func decodeCMSketch(data []byte, topN []*TopNMeta) (*CMSketch, error) {
// DecodeCMSketch decode a CMSketch from the given byte slice.
func DecodeCMSketch(data []byte, topNRows []chunk.Row) (*CMSketch, error) {
if data == nil {
return nil, nil
}
Expand All @@ -448,29 +447,12 @@ func decodeCMSketch(data []byte, topN []*TopNMeta) (*CMSketch, error) {
if err != nil {
return nil, errors.Trace(err)
}
if len(p.Rows) == 0 && len(topN) == 0 {
return nil, nil
}
for _, meta := range topN {
p.TopN = append(p.TopN, &tipb.CMSketchTopN{Data: meta.Data, Count: meta.Count})
}
return CMSketchFromProto(p), nil
}

// LoadCMSketchWithTopN loads the CM sketch with topN from storage.
func LoadCMSketchWithTopN(exec sqlexec.RestrictedSQLExecutor, tableID, isIndex, histID int64, cms []byte) (*CMSketch, error) {
sql := fmt.Sprintf("select HIGH_PRIORITY value, count from mysql.stats_top_n where table_id = %d and is_index = %d and hist_id = %d", tableID, isIndex, histID)
topNRows, _, err := exec.ExecRestrictedSQL(nil, sql)
if err != nil {
return nil, err
}
topN := make([]*TopNMeta, 0, len(topNRows))
for _, row := range topNRows {
data := make([]byte, len(row.GetBytes(0)))
copy(data, row.GetBytes(0))
topN = append(topN, &TopNMeta{Data: data, Count: row.GetUint64(1)})
p.TopN = append(p.TopN, &tipb.CMSketchTopN{Data: data, Count: row.GetUint64(1)})
}
return decodeCMSketch(cms, topN)
return CMSketchFromProto(p), nil
}

// TotalCount returns the total count in the sketch, it is only used for test.
Expand Down Expand Up @@ -554,6 +536,15 @@ func (c *CMSketch) TopN() []*TopNMeta {
return topN
}

// AppendTopN adds one top-N entry, made of data and its count, to the sketch.
//
// data is hashed with murmur3.Sum128; the first hash value selects the bucket
// in c.topN and the second is stored in the new entry next to data and count.
// The data slice is stored as given (no copy is made here), so callers that
// reuse their buffer must pass in a private copy.
func (c *CMSketch) AppendTopN(data []byte, count uint64) {
	// The bucket map is created lazily on the first append.
	if c.topN == nil {
		c.topN = make(map[uint64][]*TopNMeta)
	}
	bucketKey, secondHash := murmur3.Sum128(data)
	entry := &TopNMeta{secondHash, data, count}
	c.topN[bucketKey] = append(c.topN[bucketKey], entry)
}

// GetWidthAndDepth returns the width and depth of CM Sketch.
func (c *CMSketch) GetWidthAndDepth() (int32, int32) {
return c.width, c.depth
Expand Down
15 changes: 13 additions & 2 deletions statistics/cmsketch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/spaolacci/murmur3"
)
Expand Down Expand Up @@ -153,7 +155,7 @@ func (s *testStatisticsSuite) TestCMSketchCoding(c *C) {
bytes, err := EncodeCMSketchWithoutTopN(lSketch)
c.Assert(err, IsNil)
c.Assert(len(bytes), Equals, 61457)
rSketch, err := decodeCMSketch(bytes, nil)
rSketch, err := DecodeCMSketch(bytes, nil)
c.Assert(err, IsNil)
c.Assert(lSketch.Equal(rSketch), IsTrue)
}
Expand Down Expand Up @@ -226,16 +228,25 @@ func (s *testStatisticsSuite) TestCMSketchCodingTopN(c *C) {
}
}
lSketch.topN = make(map[uint64][]*TopNMeta)
unsignedLong := types.NewFieldType(mysql.TypeLonglong)
unsignedLong.Flag |= mysql.UnsignedFlag
chk := chunk.New([]*types.FieldType{types.NewFieldType(mysql.TypeBlob), unsignedLong}, 20, 20)
var rows []chunk.Row
for i := 0; i < 20; i++ {
tString := []byte(fmt.Sprintf("%20000d", i))
h1, h2 := murmur3.Sum128(tString)
lSketch.topN[h1] = []*TopNMeta{{h2, tString, math.MaxUint64}}
chk.AppendBytes(0, tString)
chk.AppendUint64(1, math.MaxUint64)
rows = append(rows, chk.GetRow(i))
}

bytes, err := EncodeCMSketchWithoutTopN(lSketch)
c.Assert(err, IsNil)
c.Assert(len(bytes), Equals, 61457)
rSketch, err := decodeCMSketch(bytes, lSketch.TopN())
rSketch, err := DecodeCMSketch(bytes, rows)
c.Assert(err, IsNil)
c.Assert(lSketch.Equal(rSketch), IsTrue)
// do not panic
DecodeCMSketch([]byte{}, rows)
}
66 changes: 58 additions & 8 deletions statistics/handle/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, cache *statsCache
}

func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (statsCache, error) {
h.mu.Lock()
defer h.mu.Unlock()
sql := "select HIGH_PRIORITY version, table_id, modify_count, count from mysql.stats_meta"
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
if len(rc) > 0 {
Expand Down Expand Up @@ -105,7 +103,7 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache *stat
if idxInfo == nil {
continue
}
cms, err := statistics.LoadCMSketchWithTopN(h.restrictedExec, row.GetInt64(0), row.GetInt64(1), row.GetInt64(2), row.GetBytes(6))
cms, err := statistics.DecodeCMSketch(row.GetBytes(6), nil)
if err != nil {
cms = nil
terror.Log(errors.Trace(err))
Expand Down Expand Up @@ -139,8 +137,6 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache *stat
}

func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache *statsCache) error {
h.mu.Lock()
defer h.mu.Unlock()
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms"
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
if len(rc) > 0 {
Expand All @@ -164,6 +160,46 @@ func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache *statsCache
return nil
}

// initStatsTopN4Chunk walks every row exposed by iter and appends it as a
// top-N entry to the CM sketch of the matching index in cache.
//
// Columns are read by position: 0 is the table key into cache.tables, 1 is
// the key into that table's Indices, 2 is the top-N value bytes and 3 is its
// count. Rows whose table or index is not cached, or whose index has no CM
// sketch, are skipped.
func (h *Handle) initStatsTopN4Chunk(cache *statsCache, iter *chunk.Iterator4Chunk) {
	for row := iter.Begin(); row != iter.End(); row = iter.Next() {
		tbl, found := cache.tables[row.GetInt64(0)]
		if !found {
			continue
		}
		index, found := tbl.Indices[row.GetInt64(1)]
		if !found || index.CMSketch == nil {
			continue
		}
		// Store a private copy of the value rather than the slice returned by
		// the row — presumably the row bytes alias chunk memory that is
		// overwritten when the chunk is refilled; confirm against chunk.Row.
		raw := row.GetBytes(2)
		value := append(make([]byte, 0, len(raw)), raw...)
		index.CMSketch.AppendTopN(value, row.GetUint64(3))
	}
}

// initStatsTopN reads the index rows (is_index = 1) of mysql.stats_top_n
// through the handle's session context and feeds them, chunk by chunk, to
// initStatsTopN4Chunk so they end up in the CM sketches held by cache.
//
// It returns a traced error if the query or fetching any chunk fails, and nil
// once an empty chunk signals the end of the result set. The method does not
// take h.mu itself although it uses h.mu.ctx.
func (h *Handle) initStatsTopN(cache *statsCache) error {
	const query = "select HIGH_PRIORITY table_id, hist_id, value, count from mysql.stats_top_n where is_index = 1"
	executor := h.mu.ctx.(sqlexec.SQLExecutor)
	rc, err := executor.Execute(context.TODO(), query)
	// Register the close before looking at err so that a record set returned
	// together with an error is still released.
	if len(rc) > 0 {
		defer terror.Call(rc[0].Close)
	}
	if err != nil {
		return errors.Trace(err)
	}
	recordSet := rc[0]
	// One chunk is allocated up front and refilled by every Next call; the
	// iterator stays bound to that same chunk.
	chk := recordSet.NewChunk()
	rowIter := chunk.NewIterator4Chunk(chk)
	for {
		if err = recordSet.Next(context.TODO(), chk); err != nil {
			return errors.Trace(err)
		}
		// An empty chunk means the result set has been fully consumed.
		if chk.NumRows() == 0 {
			return nil
		}
		h.initStatsTopN4Chunk(cache, rowIter)
	}
}

func initStatsBuckets4Chunk(ctx sessionctx.Context, cache *statsCache, iter *chunk.Iterator4Chunk) {
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
tableID, isIndex, histID := row.GetInt64(0), row.GetInt64(1), row.GetInt64(2)
Expand Down Expand Up @@ -211,8 +247,6 @@ func initStatsBuckets4Chunk(ctx sessionctx.Context, cache *statsCache, iter *chu
}

func (h *Handle) initStatsBuckets(cache *statsCache) error {
h.mu.Lock()
defer h.mu.Unlock()
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound from mysql.stats_buckets order by table_id, is_index, hist_id, bucket_id"
rc, err := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
if len(rc) > 0 {
Expand Down Expand Up @@ -254,7 +288,19 @@ func (h *Handle) initStatsBuckets(cache *statsCache) error {
}

// InitStats will init the stats cache using full load strategy.
func (h *Handle) InitStats(is infoschema.InfoSchema) error {
func (h *Handle) InitStats(is infoschema.InfoSchema) (err error) {
h.mu.Lock()
defer func() {
_, err1 := h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), "commit")
if err == nil && err1 != nil {
err = err1
}
h.mu.Unlock()
}()
_, err = h.mu.ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), "begin")
if err != nil {
return err
}
cache, err := h.initStatsMeta(is)
if err != nil {
return errors.Trace(err)
Expand All @@ -263,6 +309,10 @@ func (h *Handle) InitStats(is infoschema.InfoSchema) error {
if err != nil {
return errors.Trace(err)
}
err = h.initStatsTopN(&cache)
if err != nil {
return err
}
err = h.initStatsBuckets(&cache)
if err != nil {
return errors.Trace(err)
Expand Down
Loading

0 comments on commit 0cb5f9a

Please sign in to comment.