Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

domain: load stats when stats lease is 0 (#10771) #10811

Merged
merged 2 commits into from
Jun 19, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 41 additions & 27 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,8 @@ func (do *Domain) UpdateTableStatsLoop(ctx sessionctx.Context) error {
statsHandle := handle.NewHandle(ctx, do.statsLease)
atomic.StorePointer(&do.statsHandle, unsafe.Pointer(statsHandle))
do.ddl.RegisterEventCh(statsHandle.DDLEventCh())
do.wg.Add(1)
go do.loadStatsWorker()
if do.statsLease <= 0 {
return nil
}
Expand Down Expand Up @@ -904,22 +906,15 @@ func (do *Domain) newStatsOwner() owner.Manager {
return statsOwner
}

func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager) {
defer recoverInDomain("updateStatsWorker", false)
func (do *Domain) loadStatsWorker() {
defer recoverInDomain("loadStatsWorker", false)
defer do.wg.Done()
lease := do.statsLease
deltaUpdateDuration := lease * 20
if lease == 0 {
lease = 3 * time.Second
}
loadTicker := time.NewTicker(lease)
defer loadTicker.Stop()
deltaUpdateTicker := time.NewTicker(deltaUpdateDuration)
defer deltaUpdateTicker.Stop()
loadHistogramTicker := time.NewTicker(lease)
defer loadHistogramTicker.Stop()
gcStatsTicker := time.NewTicker(100 * lease)
defer gcStatsTicker.Stop()
dumpFeedbackTicker := time.NewTicker(200 * lease)
defer dumpFeedbackTicker.Stop()
loadFeedbackTicker := time.NewTicker(5 * lease)
defer loadFeedbackTicker.Stop()
statsHandle := do.StatsHandle()
t := time.Now()
err := statsHandle.InitStats(do.InfoSchema())
Expand All @@ -928,56 +923,75 @@ func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager)
} else {
logutil.Logger(context.Background()).Info("init stats info time", zap.Duration("take time", time.Since(t)))
}
defer func() {
do.SetStatsUpdating(false)
do.wg.Done()
}()
for {
select {
case <-loadTicker.C:
err = statsHandle.Update(do.InfoSchema())
if err != nil {
logutil.Logger(context.Background()).Debug("update stats info failed", zap.Error(err))
}
err = statsHandle.LoadNeededHistograms()
if err != nil {
logutil.Logger(context.Background()).Debug("load histograms failed", zap.Error(err))
}
case <-do.exit:
return
}
}
}

func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager) {
defer recoverInDomain("updateStatsWorker", false)
lease := do.statsLease
deltaUpdateTicker := time.NewTicker(20 * lease)
defer deltaUpdateTicker.Stop()
gcStatsTicker := time.NewTicker(100 * lease)
defer gcStatsTicker.Stop()
dumpFeedbackTicker := time.NewTicker(200 * lease)
defer dumpFeedbackTicker.Stop()
loadFeedbackTicker := time.NewTicker(5 * lease)
defer loadFeedbackTicker.Stop()
statsHandle := do.StatsHandle()
defer func() {
do.SetStatsUpdating(false)
do.wg.Done()
}()
for {
select {
case <-do.exit:
statsHandle.FlushStats()
return
// This channel is sent only by ddl owner.
case t := <-statsHandle.DDLEventCh():
err = statsHandle.HandleDDLEvent(t)
err := statsHandle.HandleDDLEvent(t)
if err != nil {
logutil.Logger(context.Background()).Debug("handle ddl event failed", zap.Error(err))
}
case <-deltaUpdateTicker.C:
err = statsHandle.DumpStatsDeltaToKV(handle.DumpDelta)
err := statsHandle.DumpStatsDeltaToKV(handle.DumpDelta)
if err != nil {
logutil.Logger(context.Background()).Debug("dump stats delta failed", zap.Error(err))
}
statsHandle.UpdateErrorRate(do.InfoSchema())
case <-loadHistogramTicker.C:
err = statsHandle.LoadNeededHistograms()
if err != nil {
logutil.Logger(context.Background()).Debug("load histograms failed", zap.Error(err))
}
case <-loadFeedbackTicker.C:
statsHandle.UpdateStatsByLocalFeedback(do.InfoSchema())
if !owner.IsOwner() {
continue
}
err = statsHandle.HandleUpdateStats(do.InfoSchema())
err := statsHandle.HandleUpdateStats(do.InfoSchema())
if err != nil {
logutil.Logger(context.Background()).Debug("update stats using feedback failed", zap.Error(err))
}
case <-dumpFeedbackTicker.C:
err = statsHandle.DumpStatsFeedbackToKV()
err := statsHandle.DumpStatsFeedbackToKV()
if err != nil {
logutil.Logger(context.Background()).Debug("dump stats feedback failed", zap.Error(err))
}
case <-gcStatsTicker.C:
if !owner.IsOwner() {
continue
}
err = statsHandle.GCStats(do.InfoSchema(), do.DDL().GetLease())
err := statsHandle.GCStats(do.InfoSchema(), do.DDL().GetLease())
if err != nil {
logutil.Logger(context.Background()).Debug("GC stats failed", zap.Error(err))
}
Expand Down
2 changes: 1 addition & 1 deletion executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -992,7 +992,7 @@ func (e *AnalyzeFastExec) buildHist(ID int64, collector *statistics.SampleCollec
}
stats := domain.GetDomain(e.ctx).StatsHandle()
rowCount := int64(e.rowCount)
if stats.Lease > 0 {
if stats.Lease() > 0 {
rowCount = mathutil.MinInt64(stats.GetTableStats(e.tblInfo).Count, rowCount)
}
// build CMSketch
Expand Down
4 changes: 2 additions & 2 deletions executor/simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,12 +412,12 @@ func (s *testSuite3) TestDropStats(c *C) {
statsTbl = h.GetTableStats(tableInfo)
c.Assert(statsTbl.Pseudo, IsFalse)

h.Lease = 1
h.SetLease(1)
testKit.MustExec("drop stats t")
h.Update(is)
statsTbl = h.GetTableStats(tableInfo)
c.Assert(statsTbl.Pseudo, IsTrue)
h.Lease = 0
h.SetLease(0)
}

func (s *testSuite3) TestFlushTables(c *C) {
Expand Down
2 changes: 0 additions & 2 deletions planner/core/cbo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,8 +658,6 @@ func (s *testAnalyzeSuite) TestNullCount(c *C) {
))
h := dom.StatsHandle()
h.Clear()
h.Lease = 1
defer func() { h.Lease = 0 }()
c.Assert(h.Update(dom.InfoSchema()), IsNil)
testKit.MustQuery("explain select * from t where b = 1").Check(testkit.Rows(
"TableReader_7 0.00 root data:Selection_6",
Expand Down
6 changes: 3 additions & 3 deletions statistics/handle/dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ func (s *testStatsSuite) TestDumpAlteredTable(c *C) {
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
h := s.do.StatsHandle()
oriLease := h.Lease
h.Lease = 1
defer func() { h.Lease = oriLease }()
oriLease := h.Lease()
h.SetLease(1)
defer func() { h.SetLease(oriLease) }()
tk.MustExec("create table t(a int, b int)")
tk.MustExec("analyze table t")
tk.MustExec("alter table t drop column a")
Expand Down
2 changes: 1 addition & 1 deletion statistics/handle/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
func (h *Handle) GCStats(is infoschema.InfoSchema, ddlLease time.Duration) error {
// To make sure that all the deleted tables' schema and stats info have been acknowledged to all tidb,
// we only garbage collect version before 10 lease.
lease := mathutil.MaxInt64(int64(h.Lease), int64(ddlLease))
lease := mathutil.MaxInt64(int64(h.Lease()), int64(ddlLease))
offset := DurationToTS(10 * time.Duration(lease))
if h.LastUpdateVersion() < offset {
return nil
Expand Down
18 changes: 14 additions & 4 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type Handle struct {
// feedback is used to store query feedback info.
feedback []*statistics.QueryFeedback

Lease time.Duration
lease atomic2.Duration
}

// Clear the StatsCache, only for test.
Expand Down Expand Up @@ -99,9 +99,9 @@ func NewHandle(ctx sessionctx.Context, lease time.Duration) *Handle {
ddlEventCh: make(chan *util.Event, 100),
listHead: &SessionStatsCollector{mapper: make(tableDeltaMap), rateMap: make(errorRateDeltaMap)},
globalMap: make(tableDeltaMap),
Lease: lease,
feedback: make([]*statistics.QueryFeedback, 0, MaxQueryFeedbackCount.Load()),
}
handle.lease.Store(lease)
// It is safe to use it concurrently because the exec won't touch the ctx.
if exec, ok := ctx.(sqlexec.RestrictedSQLExecutor); ok {
handle.restrictedExec = exec
Expand All @@ -112,6 +112,16 @@ func NewHandle(ctx sessionctx.Context, lease time.Duration) *Handle {
return handle
}

// Lease returns the stats lease.
func (h *Handle) Lease() time.Duration {
return h.lease.Load()
}

// SetLease sets the stats lease.
func (h *Handle) SetLease(lease time.Duration) {
h.lease.Store(lease)
}

// GetQueryFeedback gets the query feedback. It is only use in test.
func (h *Handle) GetQueryFeedback() []*statistics.QueryFeedback {
defer func() {
Expand All @@ -133,7 +143,7 @@ func (h *Handle) Update(is infoschema.InfoSchema) error {
// and A0 < B0 < B1 < A1. We will first read the stats of B, and update the lastVersion to B0, but we cannot read
// the table stats of A0 if we read stats that greater than lastVersion which is B0.
// We can read the stats if the diff between commit time and version is less than three lease.
offset := DurationToTS(3 * h.Lease)
offset := DurationToTS(3 * h.Lease())
if lastVersion >= offset {
lastVersion = lastVersion - offset
} else {
Expand Down Expand Up @@ -402,7 +412,7 @@ func (h *Handle) columnStatsFromStorage(row chunk.Row, table *statistics.Table,
// 2. this column is not handle, and:
// 3. the column doesn't has buckets before, and:
// 4. loadAll is false.
notNeedLoad := h.Lease > 0 &&
notNeedLoad := h.Lease() > 0 &&
!isHandle &&
(col == nil || col.Len() == 0 && col.LastUpdateVersion < histVer) &&
!loadAll
Expand Down
10 changes: 5 additions & 5 deletions statistics/handle/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ func (s *testStatsSuite) TestInitStats(c *C) {
c.Assert(err, IsNil)
// `Update` will not use load by need strategy when `Lease` is 0, and `InitStats` is only called when
// `Lease` is not 0, so here we just change it.
h.Lease = time.Millisecond
h.SetLease(time.Millisecond)

h.Clear()
c.Assert(h.InitStats(is), IsNil)
Expand All @@ -388,7 +388,7 @@ func (s *testStatsSuite) TestInitStats(c *C) {
c.Assert(h.Update(is), IsNil)
table1 := h.GetTableStats(tbl.Meta())
assertTableEqual(c, table0, table1)
h.Lease = 0
h.SetLease(0)
}

func (s *testStatsSuite) TestLoadStats(c *C) {
Expand All @@ -398,10 +398,10 @@ func (s *testStatsSuite) TestLoadStats(c *C) {
testKit.MustExec("create table t(a int, b int, c int, primary key(a), key idx(b))")
testKit.MustExec("insert into t values (1,1,1),(2,2,2),(3,3,3)")

oriLease := s.do.StatsHandle().Lease
s.do.StatsHandle().Lease = 1
oriLease := s.do.StatsHandle().Lease()
s.do.StatsHandle().SetLease(1)
defer func() {
s.do.StatsHandle().Lease = oriLease
s.do.StatsHandle().SetLease(oriLease)
}()
testKit.MustExec("analyze table t")

Expand Down
2 changes: 1 addition & 1 deletion statistics/handle/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,7 @@ func (h *Handle) autoAnalyzeTable(tblInfo *model.TableInfo, statsTbl *statistics
if statsTbl.Pseudo || statsTbl.Count < AutoAnalyzeMinCnt {
return false
}
if needAnalyze, reason := NeedAnalyzeTable(statsTbl, 20*h.Lease, ratio, start, end, time.Now()); needAnalyze {
if needAnalyze, reason := NeedAnalyzeTable(statsTbl, 20*h.Lease(), ratio, start, end, time.Now()); needAnalyze {
logutil.Logger(context.Background()).Info("[stats] auto analyze triggered", zap.String("sql", sql), zap.String("reason", reason))
h.execAutoAnalyze(sql)
return true
Expand Down
26 changes: 13 additions & 13 deletions statistics/handle/update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,8 +408,8 @@ func (s *testStatsSuite) TestAutoUpdate(c *C) {
}

// Test that even if the table is recently modified, we can still analyze the table.
h.Lease = time.Second
defer func() { h.Lease = 0 }()
h.SetLease(time.Second)
defer func() { h.SetLease(0) }()
_, err = testKit.Exec("insert into t values ('fff')")
c.Assert(err, IsNil)
c.Assert(h.DumpStatsDeltaToKV(handle.DumpAll), IsNil)
Expand Down Expand Up @@ -523,11 +523,11 @@ func (s *testStatsSuite) TestTableAnalyzed(c *C) {
c.Assert(handle.TableAnalyzed(statsTbl), IsTrue)

h.Clear()
oriLease := h.Lease
oriLease := h.Lease()
// set it to non-zero so we will use load by need strategy
h.Lease = 1
h.SetLease(1)
defer func() {
h.Lease = oriLease
h.SetLease(oriLease)
}()
h.Update(is)
statsTbl = h.GetTableStats(tableInfo)
Expand All @@ -538,7 +538,7 @@ func (s *testStatsSuite) TestUpdateErrorRate(c *C) {
defer cleanEnv(c, s.store, s.do)
h := s.do.StatsHandle()
is := s.do.InfoSchema()
h.Lease = 0
h.SetLease(0)
h.Update(is)

oriProbability := statistics.FeedbackProbability
Expand Down Expand Up @@ -608,7 +608,7 @@ func (s *testStatsSuite) TestUpdatePartitionErrorRate(c *C) {
defer cleanEnv(c, s.store, s.do)
h := s.do.StatsHandle()
is := s.do.InfoSchema()
h.Lease = 0
h.SetLease(0)
h.Update(is)

oriProbability := statistics.FeedbackProbability
Expand Down Expand Up @@ -1086,18 +1086,18 @@ func (s *testStatsSuite) TestLogDetailedInfo(c *C) {
oriMinLogCount := handle.MinLogScanCount
oriMinError := handle.MinLogErrorRate
oriLevel := log.GetLevel()
oriLease := s.do.StatsHandle().Lease
oriLease := s.do.StatsHandle().Lease()
defer func() {
statistics.FeedbackProbability = oriProbability
handle.MinLogScanCount = oriMinLogCount
handle.MinLogErrorRate = oriMinError
s.do.StatsHandle().Lease = oriLease
s.do.StatsHandle().SetLease(oriLease)
log.SetLevel(oriLevel)
}()
statistics.FeedbackProbability.Store(1)
handle.MinLogScanCount = 0
handle.MinLogErrorRate = 0
s.do.StatsHandle().Lease = 1
s.do.StatsHandle().SetLease(1)

testKit := testkit.NewTestKit(c, s.store)
testKit.MustExec("use test")
Expand Down Expand Up @@ -1612,9 +1612,9 @@ func (s *testStatsSuite) TestLoadHistCorrelation(c *C) {
defer cleanEnv(c, s.store, s.do)
testKit := testkit.NewTestKit(c, s.store)
h := s.do.StatsHandle()
origLease := h.Lease
h.Lease = time.Second
defer func() { h.Lease = origLease }()
origLease := h.Lease()
h.SetLease(time.Second)
defer func() { h.SetLease(origLease) }()
testKit.MustExec("use test")
testKit.MustExec("create table t(c int)")
testKit.MustExec("insert into t values(1),(2),(3),(4),(5)")
Expand Down