Skip to content

Commit

Permalink
domain: load stats when stats lease is 0 (#10771) (#10811)
Browse files Browse the repository at this point in the history
  • Loading branch information
alivxxx authored and zz-jason committed Jun 19, 2019
1 parent 076e3b2 commit a4461d4
Show file tree
Hide file tree
Showing 10 changed files with 81 additions and 59 deletions.
68 changes: 41 additions & 27 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,8 @@ func (do *Domain) UpdateTableStatsLoop(ctx sessionctx.Context) error {
statsHandle := handle.NewHandle(ctx, do.statsLease)
atomic.StorePointer(&do.statsHandle, unsafe.Pointer(statsHandle))
do.ddl.RegisterEventCh(statsHandle.DDLEventCh())
do.wg.Add(1)
go do.loadStatsWorker()
if do.statsLease <= 0 {
return nil
}
Expand Down Expand Up @@ -904,22 +906,15 @@ func (do *Domain) newStatsOwner() owner.Manager {
return statsOwner
}

func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager) {
defer recoverInDomain("updateStatsWorker", false)
func (do *Domain) loadStatsWorker() {
defer recoverInDomain("loadStatsWorker", false)
defer do.wg.Done()
lease := do.statsLease
deltaUpdateDuration := lease * 20
if lease == 0 {
lease = 3 * time.Second
}
loadTicker := time.NewTicker(lease)
defer loadTicker.Stop()
deltaUpdateTicker := time.NewTicker(deltaUpdateDuration)
defer deltaUpdateTicker.Stop()
loadHistogramTicker := time.NewTicker(lease)
defer loadHistogramTicker.Stop()
gcStatsTicker := time.NewTicker(100 * lease)
defer gcStatsTicker.Stop()
dumpFeedbackTicker := time.NewTicker(200 * lease)
defer dumpFeedbackTicker.Stop()
loadFeedbackTicker := time.NewTicker(5 * lease)
defer loadFeedbackTicker.Stop()
statsHandle := do.StatsHandle()
t := time.Now()
err := statsHandle.InitStats(do.InfoSchema())
Expand All @@ -928,56 +923,75 @@ func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager)
} else {
logutil.Logger(context.Background()).Info("init stats info time", zap.Duration("take time", time.Since(t)))
}
defer func() {
do.SetStatsUpdating(false)
do.wg.Done()
}()
for {
select {
case <-loadTicker.C:
err = statsHandle.Update(do.InfoSchema())
if err != nil {
logutil.Logger(context.Background()).Debug("update stats info failed", zap.Error(err))
}
err = statsHandle.LoadNeededHistograms()
if err != nil {
logutil.Logger(context.Background()).Debug("load histograms failed", zap.Error(err))
}
case <-do.exit:
return
}
}
}

func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager) {
defer recoverInDomain("updateStatsWorker", false)
lease := do.statsLease
deltaUpdateTicker := time.NewTicker(20 * lease)
defer deltaUpdateTicker.Stop()
gcStatsTicker := time.NewTicker(100 * lease)
defer gcStatsTicker.Stop()
dumpFeedbackTicker := time.NewTicker(200 * lease)
defer dumpFeedbackTicker.Stop()
loadFeedbackTicker := time.NewTicker(5 * lease)
defer loadFeedbackTicker.Stop()
statsHandle := do.StatsHandle()
defer func() {
do.SetStatsUpdating(false)
do.wg.Done()
}()
for {
select {
case <-do.exit:
statsHandle.FlushStats()
return
// This channel is sent only by ddl owner.
case t := <-statsHandle.DDLEventCh():
err = statsHandle.HandleDDLEvent(t)
err := statsHandle.HandleDDLEvent(t)
if err != nil {
logutil.Logger(context.Background()).Debug("handle ddl event failed", zap.Error(err))
}
case <-deltaUpdateTicker.C:
err = statsHandle.DumpStatsDeltaToKV(handle.DumpDelta)
err := statsHandle.DumpStatsDeltaToKV(handle.DumpDelta)
if err != nil {
logutil.Logger(context.Background()).Debug("dump stats delta failed", zap.Error(err))
}
statsHandle.UpdateErrorRate(do.InfoSchema())
case <-loadHistogramTicker.C:
err = statsHandle.LoadNeededHistograms()
if err != nil {
logutil.Logger(context.Background()).Debug("load histograms failed", zap.Error(err))
}
case <-loadFeedbackTicker.C:
statsHandle.UpdateStatsByLocalFeedback(do.InfoSchema())
if !owner.IsOwner() {
continue
}
err = statsHandle.HandleUpdateStats(do.InfoSchema())
err := statsHandle.HandleUpdateStats(do.InfoSchema())
if err != nil {
logutil.Logger(context.Background()).Debug("update stats using feedback failed", zap.Error(err))
}
case <-dumpFeedbackTicker.C:
err = statsHandle.DumpStatsFeedbackToKV()
err := statsHandle.DumpStatsFeedbackToKV()
if err != nil {
logutil.Logger(context.Background()).Debug("dump stats feedback failed", zap.Error(err))
}
case <-gcStatsTicker.C:
if !owner.IsOwner() {
continue
}
err = statsHandle.GCStats(do.InfoSchema(), do.DDL().GetLease())
err := statsHandle.GCStats(do.InfoSchema(), do.DDL().GetLease())
if err != nil {
logutil.Logger(context.Background()).Debug("GC stats failed", zap.Error(err))
}
Expand Down
2 changes: 1 addition & 1 deletion executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -992,7 +992,7 @@ func (e *AnalyzeFastExec) buildHist(ID int64, collector *statistics.SampleCollec
}
stats := domain.GetDomain(e.ctx).StatsHandle()
rowCount := int64(e.rowCount)
if stats.Lease > 0 {
if stats.Lease() > 0 {
rowCount = mathutil.MinInt64(stats.GetTableStats(e.tblInfo).Count, rowCount)
}
// build CMSketch
Expand Down
4 changes: 2 additions & 2 deletions executor/simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,12 +412,12 @@ func (s *testSuite3) TestDropStats(c *C) {
statsTbl = h.GetTableStats(tableInfo)
c.Assert(statsTbl.Pseudo, IsFalse)

h.Lease = 1
h.SetLease(1)
testKit.MustExec("drop stats t")
h.Update(is)
statsTbl = h.GetTableStats(tableInfo)
c.Assert(statsTbl.Pseudo, IsTrue)
h.Lease = 0
h.SetLease(0)
}

func (s *testSuite3) TestFlushTables(c *C) {
Expand Down
2 changes: 0 additions & 2 deletions planner/core/cbo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,8 +658,6 @@ func (s *testAnalyzeSuite) TestNullCount(c *C) {
))
h := dom.StatsHandle()
h.Clear()
h.Lease = 1
defer func() { h.Lease = 0 }()
c.Assert(h.Update(dom.InfoSchema()), IsNil)
testKit.MustQuery("explain select * from t where b = 1").Check(testkit.Rows(
"TableReader_7 0.00 root data:Selection_6",
Expand Down
6 changes: 3 additions & 3 deletions statistics/handle/dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ func (s *testStatsSuite) TestDumpAlteredTable(c *C) {
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
h := s.do.StatsHandle()
oriLease := h.Lease
h.Lease = 1
defer func() { h.Lease = oriLease }()
oriLease := h.Lease()
h.SetLease(1)
defer func() { h.SetLease(oriLease) }()
tk.MustExec("create table t(a int, b int)")
tk.MustExec("analyze table t")
tk.MustExec("alter table t drop column a")
Expand Down
2 changes: 1 addition & 1 deletion statistics/handle/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
func (h *Handle) GCStats(is infoschema.InfoSchema, ddlLease time.Duration) error {
// To make sure that all the deleted tables' schema and stats info have been acknowledged by all TiDB instances,
// we only garbage collect versions older than 10 leases.
lease := mathutil.MaxInt64(int64(h.Lease), int64(ddlLease))
lease := mathutil.MaxInt64(int64(h.Lease()), int64(ddlLease))
offset := DurationToTS(10 * time.Duration(lease))
if h.LastUpdateVersion() < offset {
return nil
Expand Down
18 changes: 14 additions & 4 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type Handle struct {
// feedback is used to store query feedback info.
feedback []*statistics.QueryFeedback

Lease time.Duration
lease atomic2.Duration
}

// Clear the StatsCache, only for test.
Expand Down Expand Up @@ -99,9 +99,9 @@ func NewHandle(ctx sessionctx.Context, lease time.Duration) *Handle {
ddlEventCh: make(chan *util.Event, 100),
listHead: &SessionStatsCollector{mapper: make(tableDeltaMap), rateMap: make(errorRateDeltaMap)},
globalMap: make(tableDeltaMap),
Lease: lease,
feedback: make([]*statistics.QueryFeedback, 0, MaxQueryFeedbackCount.Load()),
}
handle.lease.Store(lease)
// It is safe to use it concurrently because the exec won't touch the ctx.
if exec, ok := ctx.(sqlexec.RestrictedSQLExecutor); ok {
handle.restrictedExec = exec
Expand All @@ -112,6 +112,16 @@ func NewHandle(ctx sessionctx.Context, lease time.Duration) *Handle {
return handle
}

// Lease returns the stats lease.
// The value is read from the handle's lease field through that field's Load
// method, so it is whatever was last stored by NewHandle or SetLease.
// NOTE(review): lease is declared as atomic2.Duration; presumably this makes
// concurrent Lease/SetLease calls race-free — confirm against the atomic2 package.
func (h *Handle) Lease() time.Duration {
return h.lease.Load()
}

// SetLease sets the stats lease.
// The new value is written to the handle's lease field through that field's
// Store method and is what subsequent Lease calls return.
// NOTE(review): nothing here validates the value; a lease of 0 is accepted as-is.
func (h *Handle) SetLease(lease time.Duration) {
h.lease.Store(lease)
}

// GetQueryFeedback gets the query feedback. It is only used in tests.
func (h *Handle) GetQueryFeedback() []*statistics.QueryFeedback {
defer func() {
Expand All @@ -133,7 +143,7 @@ func (h *Handle) Update(is infoschema.InfoSchema) error {
// and A0 < B0 < B1 < A1. We will first read the stats of B, and update the lastVersion to B0, but we cannot read
// the table stats of A0 if we read stats that are greater than lastVersion, which is B0.
// We can read the stats if the diff between commit time and version is less than three lease.
offset := DurationToTS(3 * h.Lease)
offset := DurationToTS(3 * h.Lease())
if lastVersion >= offset {
lastVersion = lastVersion - offset
} else {
Expand Down Expand Up @@ -402,7 +412,7 @@ func (h *Handle) columnStatsFromStorage(row chunk.Row, table *statistics.Table,
// 2. this column is not handle, and:
// 3. the column doesn't has buckets before, and:
// 4. loadAll is false.
notNeedLoad := h.Lease > 0 &&
notNeedLoad := h.Lease() > 0 &&
!isHandle &&
(col == nil || col.Len() == 0 && col.LastUpdateVersion < histVer) &&
!loadAll
Expand Down
10 changes: 5 additions & 5 deletions statistics/handle/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ func (s *testStatsSuite) TestInitStats(c *C) {
c.Assert(err, IsNil)
// `Update` will not use load by need strategy when `Lease` is 0, and `InitStats` is only called when
// `Lease` is not 0, so here we just change it.
h.Lease = time.Millisecond
h.SetLease(time.Millisecond)

h.Clear()
c.Assert(h.InitStats(is), IsNil)
Expand All @@ -388,7 +388,7 @@ func (s *testStatsSuite) TestInitStats(c *C) {
c.Assert(h.Update(is), IsNil)
table1 := h.GetTableStats(tbl.Meta())
assertTableEqual(c, table0, table1)
h.Lease = 0
h.SetLease(0)
}

func (s *testStatsSuite) TestLoadStats(c *C) {
Expand All @@ -398,10 +398,10 @@ func (s *testStatsSuite) TestLoadStats(c *C) {
testKit.MustExec("create table t(a int, b int, c int, primary key(a), key idx(b))")
testKit.MustExec("insert into t values (1,1,1),(2,2,2),(3,3,3)")

oriLease := s.do.StatsHandle().Lease
s.do.StatsHandle().Lease = 1
oriLease := s.do.StatsHandle().Lease()
s.do.StatsHandle().SetLease(1)
defer func() {
s.do.StatsHandle().Lease = oriLease
s.do.StatsHandle().SetLease(oriLease)
}()
testKit.MustExec("analyze table t")

Expand Down
2 changes: 1 addition & 1 deletion statistics/handle/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,7 @@ func (h *Handle) autoAnalyzeTable(tblInfo *model.TableInfo, statsTbl *statistics
if statsTbl.Pseudo || statsTbl.Count < AutoAnalyzeMinCnt {
return false
}
if needAnalyze, reason := NeedAnalyzeTable(statsTbl, 20*h.Lease, ratio, start, end, time.Now()); needAnalyze {
if needAnalyze, reason := NeedAnalyzeTable(statsTbl, 20*h.Lease(), ratio, start, end, time.Now()); needAnalyze {
logutil.Logger(context.Background()).Info("[stats] auto analyze triggered", zap.String("sql", sql), zap.String("reason", reason))
h.execAutoAnalyze(sql)
return true
Expand Down
26 changes: 13 additions & 13 deletions statistics/handle/update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,8 +408,8 @@ func (s *testStatsSuite) TestAutoUpdate(c *C) {
}

// Test that even if the table is recently modified, we can still analyze the table.
h.Lease = time.Second
defer func() { h.Lease = 0 }()
h.SetLease(time.Second)
defer func() { h.SetLease(0) }()
_, err = testKit.Exec("insert into t values ('fff')")
c.Assert(err, IsNil)
c.Assert(h.DumpStatsDeltaToKV(handle.DumpAll), IsNil)
Expand Down Expand Up @@ -523,11 +523,11 @@ func (s *testStatsSuite) TestTableAnalyzed(c *C) {
c.Assert(handle.TableAnalyzed(statsTbl), IsTrue)

h.Clear()
oriLease := h.Lease
oriLease := h.Lease()
// set it to non-zero so we will use load by need strategy
h.Lease = 1
h.SetLease(1)
defer func() {
h.Lease = oriLease
h.SetLease(oriLease)
}()
h.Update(is)
statsTbl = h.GetTableStats(tableInfo)
Expand All @@ -538,7 +538,7 @@ func (s *testStatsSuite) TestUpdateErrorRate(c *C) {
defer cleanEnv(c, s.store, s.do)
h := s.do.StatsHandle()
is := s.do.InfoSchema()
h.Lease = 0
h.SetLease(0)
h.Update(is)

oriProbability := statistics.FeedbackProbability
Expand Down Expand Up @@ -608,7 +608,7 @@ func (s *testStatsSuite) TestUpdatePartitionErrorRate(c *C) {
defer cleanEnv(c, s.store, s.do)
h := s.do.StatsHandle()
is := s.do.InfoSchema()
h.Lease = 0
h.SetLease(0)
h.Update(is)

oriProbability := statistics.FeedbackProbability
Expand Down Expand Up @@ -1086,18 +1086,18 @@ func (s *testStatsSuite) TestLogDetailedInfo(c *C) {
oriMinLogCount := handle.MinLogScanCount
oriMinError := handle.MinLogErrorRate
oriLevel := log.GetLevel()
oriLease := s.do.StatsHandle().Lease
oriLease := s.do.StatsHandle().Lease()
defer func() {
statistics.FeedbackProbability = oriProbability
handle.MinLogScanCount = oriMinLogCount
handle.MinLogErrorRate = oriMinError
s.do.StatsHandle().Lease = oriLease
s.do.StatsHandle().SetLease(oriLease)
log.SetLevel(oriLevel)
}()
statistics.FeedbackProbability.Store(1)
handle.MinLogScanCount = 0
handle.MinLogErrorRate = 0
s.do.StatsHandle().Lease = 1
s.do.StatsHandle().SetLease(1)

testKit := testkit.NewTestKit(c, s.store)
testKit.MustExec("use test")
Expand Down Expand Up @@ -1612,9 +1612,9 @@ func (s *testStatsSuite) TestLoadHistCorrelation(c *C) {
defer cleanEnv(c, s.store, s.do)
testKit := testkit.NewTestKit(c, s.store)
h := s.do.StatsHandle()
origLease := h.Lease
h.Lease = time.Second
defer func() { h.Lease = origLease }()
origLease := h.Lease()
h.SetLease(time.Second)
defer func() { h.SetLease(origLease) }()
testKit.MustExec("use test")
testKit.MustExec("create table t(c int)")
testKit.MustExec("insert into t values(1),(2),(3),(4),(5)")
Expand Down

0 comments on commit a4461d4

Please sign in to comment.