Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: update stats using query feedback #6197

Merged
merged 12 commits into from
Apr 10, 2018
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ type Performance struct {
RunAutoAnalyze bool `toml:"run-auto-analyze" json:"run-auto-analyze"`
StmtCountLimit uint `toml:"stmt-count-limit" json:"stmt-count-limit"`
FeedbackProbability float64 `toml:"feedback-probability" json:"feedback-probability"`
QueryFeedbackLimit uint `toml:"query-feedback-limit" json:"query-feedback-limit"`
}

// XProtocol is the XProtocol section of the config.
Expand Down Expand Up @@ -256,6 +257,7 @@ var defaultConf = Config{
RunAutoAnalyze: true,
StmtCountLimit: 5000,
FeedbackProbability: 0,
QueryFeedbackLimit: 1024,
},
XProtocol: XProtocol{
XHost: "",
Expand Down
3 changes: 3 additions & 0 deletions config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ run-auto-analyze = true
# Probability to use the query feedback to update stats, 0 or 1 for always false/true.
feedback-probability = 0.0

# The max number of query feedback that cache in memory.
query-feedback-limit = 1024

[proxy-protocol]
# PROXY protocol acceptable client networks.
# Empty string means disable PROXY protocol, * means all networks.
Expand Down
29 changes: 23 additions & 6 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,10 @@ func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager)
defer loadHistogramTicker.Stop()
gcStatsTicker := time.NewTicker(100 * lease)
defer gcStatsTicker.Stop()
dumpFeedbackTicker := time.NewTicker(200 * lease)
defer dumpFeedbackTicker.Stop()
loadFeedbackTicker := time.NewTicker(5 * lease)
defer loadFeedbackTicker.Stop()
statsHandle := do.StatsHandle()
t := time.Now()
err := statsHandle.InitStats(do.InfoSchema())
Expand All @@ -644,7 +648,7 @@ func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager)
for {
select {
case <-loadTicker.C:
err := statsHandle.Update(do.InfoSchema())
err = statsHandle.Update(do.InfoSchema())
if err != nil {
log.Error("[stats] update stats info fail: ", errors.ErrorStack(err))
}
Expand All @@ -653,32 +657,45 @@ func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager)
return
// This channel is sent only by ddl owner or the drop stats executor.
case t := <-statsHandle.DDLEventCh():
err := statsHandle.HandleDDLEvent(t)
err = statsHandle.HandleDDLEvent(t)
if err != nil {
log.Error("[stats] handle ddl event fail: ", errors.ErrorStack(err))
}
case t := <-statsHandle.AnalyzeResultCh():
for i, hg := range t.Hist {
err := statistics.SaveStatsToStorage(ctx, t.TableID, t.Count, t.IsIndex, hg, t.Cms[i])
err = statistics.SaveStatsToStorage(ctx, t.TableID, t.Count, t.IsIndex, hg, t.Cms[i])
if err != nil {
log.Error("[stats] save histogram to storage fail: ", errors.ErrorStack(err))
}
}
case <-deltaUpdateTicker.C:
err := statsHandle.DumpStatsDeltaToKV()
err = statsHandle.DumpStatsDeltaToKV()
if err != nil {
log.Error("[stats] dump stats delta fail: ", errors.ErrorStack(err))
}
case <-loadHistogramTicker.C:
err := statsHandle.LoadNeededHistograms()
err = statsHandle.LoadNeededHistograms()
if err != nil {
log.Error("[stats] load histograms fail: ", errors.ErrorStack(err))
}
case <-loadFeedbackTicker.C:
if !owner.IsOwner() {
continue
}
err = statsHandle.HandleUpdateStats(do.InfoSchema())
if err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a metrics here.

Copy link
Contributor Author

@alivxxx alivxxx Apr 3, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is better to add metrics in a separate pr, it is already too large now.

log.Errorf("[stats] update stats using feedback fail: ", errors.ErrorStack(err))
}
case <-dumpFeedbackTicker.C:
err = statsHandle.DumpStatsFeedbackToKV()
if err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will do it in a separate pr.

log.Error("[stats] dump stats feedback fail: ", errors.ErrorStack(err))
}
case <-gcStatsTicker.C:
if !owner.IsOwner() {
continue
}
err := statsHandle.GCStats(do.InfoSchema(), do.DDL().GetLease())
err = statsHandle.GCStats(do.InfoSchema(), do.DDL().GetLease())
if err != nil {
log.Error("[stats] gc stats fail: ", errors.ErrorStack(err))
}
Expand Down
2 changes: 1 addition & 1 deletion executor/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (s *testSuite) TestAggregation(c *C) {

result = tk.MustQuery("select count(*) from information_schema.columns")
// When adding new memory columns in information_schema, please update this variable.
columnCountOfAllInformationSchemaTables := "737"
columnCountOfAllInformationSchemaTables := "741"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

5 column added instead of 4?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is 4 column added.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, i misread it.

result.Check(testkit.Rows(columnCountOfAllInformationSchemaTables))

tk.MustExec("drop table if exists t1")
Expand Down
20 changes: 20 additions & 0 deletions session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,15 @@ const (
UNIQUE KEY (element_id),
KEY (job_id, element_id)
);`

// CreateStatsFeedbackTable stores the feedback info which is used to update stats.
CreateStatsFeedbackTable = `CREATE TABLE IF NOT EXISTS mysql.stats_feedback (
table_id bigint(64) NOT NULL,
is_index tinyint(2) NOT NULL,
hist_id bigint(64) NOT NULL,
feedback blob NOT NULL,
index hist(table_id, is_index, hist_id)
);`
)

// bootstrap initiates system DB for a store.
Expand Down Expand Up @@ -233,6 +242,7 @@ const (
version17 = 17
version18 = 18
version19 = 19
version20 = 20
)

func checkBootstrapped(s Session) (bool, error) {
Expand Down Expand Up @@ -367,6 +377,10 @@ func upgrade(s Session) {
upgradeToVer19(s)
}

if ver < version20 {
upgradeToVer20(s)
}

updateBootstrapVer(s)
_, err = s.Execute(context.Background(), "COMMIT")

Expand Down Expand Up @@ -589,6 +603,10 @@ func upgradeToVer19(s Session) {
doReentrantDDL(s, "ALTER TABLE mysql.columns_priv MODIFY User CHAR(32)")
}

func upgradeToVer20(s Session) {
doReentrantDDL(s, CreateStatsFeedbackTable)
}

// updateBootstrapVer updates bootstrap version variable in mysql.TiDB table.
func updateBootstrapVer(s Session) {
// Update bootstrap version.
Expand Down Expand Up @@ -635,6 +653,8 @@ func doDDLWorks(s Session) {
mustExecute(s, CreateStatsBucketsTable)
// Create gc_delete_range table.
mustExecute(s, CreateGCDeleteRangeTable)
// Create stats_feedback table.
mustExecute(s, CreateStatsFeedbackTable)
}

// doDMLWorks executes DML statements in bootstrap stage.
Expand Down
2 changes: 1 addition & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1197,7 +1197,7 @@ func createSessionWithDomain(store kv.Storage, dom *domain.Domain) (*session, er

const (
notBootstrapped = 0
currentBootstrapVersion = 19
currentBootstrapVersion = 20
)

func getStoreBootstrapVersion(store kv.Storage) int64 {
Expand Down
30 changes: 29 additions & 1 deletion statistics/cmsketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
tipb "github.com/pingcap/tipb/go-tipb"
"github.com/pingcap/tipb/go-tipb"
"github.com/spaolacci/murmur3"
)

Expand Down Expand Up @@ -54,6 +54,18 @@ func (c *CMSketch) InsertBytes(bytes []byte) {
}
}

// setValue sets the count for value that hashed into (h1, h2).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(h1, h2) is an interval?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it means the hash value pair.

func (c *CMSketch) setValue(h1, h2 uint64, count uint32) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add comment for this function.

oriCount := c.queryHashValue(h1, h2)
c.count += uint64(count) - uint64(oriCount)
// let it overflow naturally
deltaCount := count - oriCount
for i := range c.table {
j := (h1 + h2*uint64(i)) % uint64(c.width)
c.table[i][j] = c.table[i][j] + deltaCount
}
}

func (c *CMSketch) queryValue(sc *stmtctx.StatementContext, val types.Datum) (uint32, error) {
bytes, err := codec.EncodeValue(sc, nil, val)
if err != nil {
Expand All @@ -64,6 +76,10 @@ func (c *CMSketch) queryValue(sc *stmtctx.StatementContext, val types.Datum) (ui

func (c *CMSketch) queryBytes(bytes []byte) uint32 {
h1, h2 := murmur3.Sum128(bytes)
return c.queryHashValue(h1, h2)
}

func (c *CMSketch) queryHashValue(h1, h2 uint64) uint32 {
vals := make([]uint32, c.depth)
min := uint32(math.MaxUint32)
for i := range c.table {
Expand Down Expand Up @@ -173,3 +189,15 @@ func (c *CMSketch) Equal(rc *CMSketch) bool {
}
return true
}

func (c *CMSketch) copy() *CMSketch {
if c == nil {
return nil
}
tbl := make([][]uint32, c.depth)
for i := range tbl {
tbl[i] = make([]uint32, c.width)
copy(tbl[i], c.table[i])
}
return &CMSketch{count: c.count, width: c.width, depth: c.depth, table: tbl}
}
Loading