diff --git a/domain/domain.go b/domain/domain.go index 3a1fb6368ffd8..b3310426db8eb 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -873,6 +873,8 @@ func (do *Domain) UpdateTableStatsLoop(ctx sessionctx.Context) error { statsHandle := handle.NewHandle(ctx, do.statsLease) atomic.StorePointer(&do.statsHandle, unsafe.Pointer(statsHandle)) do.ddl.RegisterEventCh(statsHandle.DDLEventCh()) + do.wg.Add(1) + go do.loadStatsWorker() if do.statsLease <= 0 { return nil } @@ -904,22 +906,15 @@ func (do *Domain) newStatsOwner() owner.Manager { return statsOwner } -func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager) { - defer recoverInDomain("updateStatsWorker", false) +func (do *Domain) loadStatsWorker() { + defer recoverInDomain("loadStatsWorker", false) + defer do.wg.Done() lease := do.statsLease - deltaUpdateDuration := lease * 20 + if lease == 0 { + lease = 3 * time.Second + } loadTicker := time.NewTicker(lease) defer loadTicker.Stop() - deltaUpdateTicker := time.NewTicker(deltaUpdateDuration) - defer deltaUpdateTicker.Stop() - loadHistogramTicker := time.NewTicker(lease) - defer loadHistogramTicker.Stop() - gcStatsTicker := time.NewTicker(100 * lease) - defer gcStatsTicker.Stop() - dumpFeedbackTicker := time.NewTicker(200 * lease) - defer dumpFeedbackTicker.Stop() - loadFeedbackTicker := time.NewTicker(5 * lease) - defer loadFeedbackTicker.Stop() statsHandle := do.StatsHandle() t := time.Now() err := statsHandle.InitStats(do.InfoSchema()) @@ -928,10 +923,6 @@ func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager) } else { logutil.Logger(context.Background()).Info("init stats info time", zap.Duration("take time", time.Since(t))) } - defer func() { - do.SetStatsUpdating(false) - do.wg.Done() - }() for { select { case <-loadTicker.C: @@ -939,37 +930,60 @@ func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager) if err != nil { logutil.Logger(context.Background()).Debug("update stats info failed", zap.Error(err)) } + err = statsHandle.LoadNeededHistograms() + if err != nil { + logutil.Logger(context.Background()).Debug("load histograms failed", zap.Error(err)) + } + case <-do.exit: + return + } + } +} + +func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager) { + defer recoverInDomain("updateStatsWorker", false) + lease := do.statsLease + deltaUpdateTicker := time.NewTicker(20 * lease) + defer deltaUpdateTicker.Stop() + gcStatsTicker := time.NewTicker(100 * lease) + defer gcStatsTicker.Stop() + dumpFeedbackTicker := time.NewTicker(200 * lease) + defer dumpFeedbackTicker.Stop() + loadFeedbackTicker := time.NewTicker(5 * lease) + defer loadFeedbackTicker.Stop() + statsHandle := do.StatsHandle() + defer func() { + do.SetStatsUpdating(false) + do.wg.Done() + }() + for { + select { case <-do.exit: statsHandle.FlushStats() return // This channel is sent only by ddl owner. case t := <-statsHandle.DDLEventCh(): - err = statsHandle.HandleDDLEvent(t) + err := statsHandle.HandleDDLEvent(t) if err != nil { logutil.Logger(context.Background()).Debug("handle ddl event failed", zap.Error(err)) } case <-deltaUpdateTicker.C: - err = statsHandle.DumpStatsDeltaToKV(handle.DumpDelta) + err := statsHandle.DumpStatsDeltaToKV(handle.DumpDelta) if err != nil { logutil.Logger(context.Background()).Debug("dump stats delta failed", zap.Error(err)) } statsHandle.UpdateErrorRate(do.InfoSchema()) - case <-loadHistogramTicker.C: - err = statsHandle.LoadNeededHistograms() - if err != nil { - logutil.Logger(context.Background()).Debug("load histograms failed", zap.Error(err)) - } case <-loadFeedbackTicker.C: statsHandle.UpdateStatsByLocalFeedback(do.InfoSchema()) if !owner.IsOwner() { continue } - err = statsHandle.HandleUpdateStats(do.InfoSchema()) + err := statsHandle.HandleUpdateStats(do.InfoSchema()) if err != nil { logutil.Logger(context.Background()).Debug("update stats using feedback failed", zap.Error(err)) } case <-dumpFeedbackTicker.C: - err = statsHandle.DumpStatsFeedbackToKV() + err := statsHandle.DumpStatsFeedbackToKV() if err != nil { logutil.Logger(context.Background()).Debug("dump stats feedback failed", zap.Error(err)) } @@ -977,7 +991,7 @@ func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager) if !owner.IsOwner() { continue } - err = statsHandle.GCStats(do.InfoSchema(), do.DDL().GetLease()) + err := statsHandle.GCStats(do.InfoSchema(), do.DDL().GetLease()) if err != nil { logutil.Logger(context.Background()).Debug("GC stats failed", zap.Error(err)) } diff --git a/executor/analyze.go b/executor/analyze.go index de0176ed5e18c..62bd924ce4da1 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -992,7 +992,7 @@ func (e *AnalyzeFastExec) buildHist(ID int64, collector *statistics.SampleCollec } stats := domain.GetDomain(e.ctx).StatsHandle() rowCount := int64(e.rowCount) - if stats.Lease > 0 { + if stats.Lease() > 0 { rowCount = mathutil.MinInt64(stats.GetTableStats(e.tblInfo).Count, rowCount) } // build CMSketch diff --git a/executor/simple_test.go b/executor/simple_test.go index 267b2763c2ab3..cbbf4a76d921d 100644 --- a/executor/simple_test.go +++ b/executor/simple_test.go @@ -412,12 +412,12 @@ func (s *testSuite3) TestDropStats(c *C) { statsTbl = h.GetTableStats(tableInfo) c.Assert(statsTbl.Pseudo, IsFalse) - h.Lease = 1 + h.SetLease(1) testKit.MustExec("drop stats t") h.Update(is) statsTbl = h.GetTableStats(tableInfo) c.Assert(statsTbl.Pseudo, IsTrue) - h.Lease = 0 + h.SetLease(0) } func (s *testSuite3) TestFlushTables(c *C) { diff --git a/planner/core/cbo_test.go b/planner/core/cbo_test.go index 8dca13b7215d2..f3034acf1136f 100644 --- a/planner/core/cbo_test.go +++ b/planner/core/cbo_test.go @@ -658,8 +658,6 @@ func (s *testAnalyzeSuite) TestNullCount(c *C) { )) h := dom.StatsHandle() h.Clear() - h.Lease = 1 - defer func() { h.Lease = 0 }() c.Assert(h.Update(dom.InfoSchema()), IsNil) testKit.MustQuery("explain select * from t where b = 1").Check(testkit.Rows( "TableReader_7 0.00 root data:Selection_6", diff --git a/statistics/handle/dump_test.go b/statistics/handle/dump_test.go index 7f080b92ad32c..72fa7d3a6b7c0 100644 --- a/statistics/handle/dump_test.go +++ b/statistics/handle/dump_test.go @@ -105,9 +105,9 @@ func (s *testStatsSuite) TestDumpAlteredTable(c *C) { tk.MustExec("use test") tk.MustExec("drop table if exists t") h := s.do.StatsHandle() - oriLease := h.Lease - h.Lease = 1 - defer func() { h.Lease = oriLease }() + oriLease := h.Lease() + h.SetLease(1) + defer func() { h.SetLease(oriLease) }() tk.MustExec("create table t(a int, b int)") tk.MustExec("analyze table t") tk.MustExec("alter table t drop column a") diff --git a/statistics/handle/gc.go b/statistics/handle/gc.go index 0cc79e89667e7..5f1d9b907c3a9 100644 --- a/statistics/handle/gc.go +++ b/statistics/handle/gc.go @@ -29,7 +29,7 @@ import ( func (h *Handle) GCStats(is infoschema.InfoSchema, ddlLease time.Duration) error { // To make sure that all the deleted tables' schema and stats info have been acknowledged to all tidb, // we only garbage collect version before 10 lease. - lease := mathutil.MaxInt64(int64(h.Lease), int64(ddlLease)) + lease := mathutil.MaxInt64(int64(h.Lease()), int64(ddlLease)) offset := DurationToTS(10 * time.Duration(lease)) if h.LastUpdateVersion() < offset { return nil diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go index 912dbed1e6865..7cd5c8fca1a9a 100644 --- a/statistics/handle/handle.go +++ b/statistics/handle/handle.go @@ -70,7 +70,7 @@ type Handle struct { // feedback is used to store query feedback info. feedback []*statistics.QueryFeedback - Lease time.Duration + lease atomic2.Duration } // Clear the StatsCache, only for test. @@ -99,9 +99,9 @@ func NewHandle(ctx sessionctx.Context, lease time.Duration) *Handle { ddlEventCh: make(chan *util.Event, 100), listHead: &SessionStatsCollector{mapper: make(tableDeltaMap), rateMap: make(errorRateDeltaMap)}, globalMap: make(tableDeltaMap), - Lease: lease, feedback: make([]*statistics.QueryFeedback, 0, MaxQueryFeedbackCount.Load()), } + handle.lease.Store(lease) // It is safe to use it concurrently because the exec won't touch the ctx. if exec, ok := ctx.(sqlexec.RestrictedSQLExecutor); ok { handle.restrictedExec = exec @@ -112,6 +112,16 @@ func NewHandle(ctx sessionctx.Context, lease time.Duration) *Handle { return handle } +// Lease returns the stats lease. +func (h *Handle) Lease() time.Duration { + return h.lease.Load() +} + +// SetLease sets the stats lease. +func (h *Handle) SetLease(lease time.Duration) { + h.lease.Store(lease) +} + // GetQueryFeedback gets the query feedback. It is only use in test. func (h *Handle) GetQueryFeedback() []*statistics.QueryFeedback { defer func() { @@ -133,7 +143,7 @@ func (h *Handle) Update(is infoschema.InfoSchema) error { // and A0 < B0 < B1 < A1. We will first read the stats of B, and update the lastVersion to B0, but we cannot read // the table stats of A0 if we read stats that greater than lastVersion which is B0. // We can read the stats if the diff between commit time and version is less than three lease. - offset := DurationToTS(3 * h.Lease) + offset := DurationToTS(3 * h.Lease()) if lastVersion >= offset { lastVersion = lastVersion - offset } else { @@ -402,7 +412,7 @@ func (h *Handle) columnStatsFromStorage(row chunk.Row, table *statistics.Table, // 2. this column is not handle, and: // 3. the column doesn't has buckets before, and: // 4. loadAll is false. - notNeedLoad := h.Lease > 0 && + notNeedLoad := h.Lease() > 0 && !isHandle && (col == nil || col.Len() == 0 && col.LastUpdateVersion < histVer) && !loadAll diff --git a/statistics/handle/handle_test.go b/statistics/handle/handle_test.go index 2be9b8e798194..2c3aebfb00687 100644 --- a/statistics/handle/handle_test.go +++ b/statistics/handle/handle_test.go @@ -379,7 +379,7 @@ func (s *testStatsSuite) TestInitStats(c *C) { c.Assert(err, IsNil) // `Update` will not use load by need strategy when `Lease` is 0, and `InitStats` is only called when // `Lease` is not 0, so here we just change it. - h.Lease = time.Millisecond + h.SetLease(time.Millisecond) h.Clear() c.Assert(h.InitStats(is), IsNil) @@ -388,7 +388,7 @@ func (s *testStatsSuite) TestInitStats(c *C) { c.Assert(h.Update(is), IsNil) table1 := h.GetTableStats(tbl.Meta()) assertTableEqual(c, table0, table1) - h.Lease = 0 + h.SetLease(0) } func (s *testStatsSuite) TestLoadStats(c *C) { @@ -398,10 +398,10 @@ func (s *testStatsSuite) TestLoadStats(c *C) { testKit.MustExec("create table t(a int, b int, c int, primary key(a), key idx(b))") testKit.MustExec("insert into t values (1,1,1),(2,2,2),(3,3,3)") - oriLease := s.do.StatsHandle().Lease - s.do.StatsHandle().Lease = 1 + oriLease := s.do.StatsHandle().Lease() + s.do.StatsHandle().SetLease(1) defer func() { - s.do.StatsHandle().Lease = oriLease + s.do.StatsHandle().SetLease(oriLease) }() testKit.MustExec("analyze table t") diff --git a/statistics/handle/update.go b/statistics/handle/update.go index 5d6d13767ef1a..28a284b0922fe 100644 --- a/statistics/handle/update.go +++ b/statistics/handle/update.go @@ -740,7 +740,7 @@ func (h *Handle) autoAnalyzeTable(tblInfo *model.TableInfo, statsTbl *statistics if statsTbl.Pseudo || statsTbl.Count < AutoAnalyzeMinCnt { return false } - if needAnalyze, reason := NeedAnalyzeTable(statsTbl, 20*h.Lease, ratio, start, end, time.Now()); needAnalyze { + if needAnalyze, reason := NeedAnalyzeTable(statsTbl, 20*h.Lease(), ratio, start, end, time.Now()); needAnalyze { logutil.Logger(context.Background()).Info("[stats] auto analyze triggered", zap.String("sql", sql), zap.String("reason", reason)) h.execAutoAnalyze(sql) return true diff --git a/statistics/handle/update_test.go b/statistics/handle/update_test.go index 456dbad12d290..e6ffa3ad39f18 100644 --- a/statistics/handle/update_test.go +++ b/statistics/handle/update_test.go @@ -408,8 +408,8 @@ func (s *testStatsSuite) TestAutoUpdate(c *C) { } // Test that even if the table is recently modified, we can still analyze the table. - h.Lease = time.Second - defer func() { h.Lease = 0 }() + h.SetLease(time.Second) + defer func() { h.SetLease(0) }() _, err = testKit.Exec("insert into t values ('fff')") c.Assert(err, IsNil) c.Assert(h.DumpStatsDeltaToKV(handle.DumpAll), IsNil) @@ -523,11 +523,11 @@ func (s *testStatsSuite) TestTableAnalyzed(c *C) { c.Assert(handle.TableAnalyzed(statsTbl), IsTrue) h.Clear() - oriLease := h.Lease + oriLease := h.Lease() // set it to non-zero so we will use load by need strategy - h.Lease = 1 + h.SetLease(1) defer func() { - h.Lease = oriLease + h.SetLease(oriLease) }() h.Update(is) statsTbl = h.GetTableStats(tableInfo) @@ -538,7 +538,7 @@ func (s *testStatsSuite) TestUpdateErrorRate(c *C) { defer cleanEnv(c, s.store, s.do) h := s.do.StatsHandle() is := s.do.InfoSchema() - h.Lease = 0 + h.SetLease(0) h.Update(is) oriProbability := statistics.FeedbackProbability @@ -608,7 +608,7 @@ func (s *testStatsSuite) TestUpdatePartitionErrorRate(c *C) { defer cleanEnv(c, s.store, s.do) h := s.do.StatsHandle() is := s.do.InfoSchema() - h.Lease = 0 + h.SetLease(0) h.Update(is) oriProbability := statistics.FeedbackProbability @@ -1086,18 +1086,18 @@ func (s *testStatsSuite) TestLogDetailedInfo(c *C) { oriMinLogCount := handle.MinLogScanCount oriMinError := handle.MinLogErrorRate oriLevel := log.GetLevel() - oriLease := s.do.StatsHandle().Lease + oriLease := s.do.StatsHandle().Lease() defer func() { statistics.FeedbackProbability = oriProbability handle.MinLogScanCount = oriMinLogCount handle.MinLogErrorRate = oriMinError - s.do.StatsHandle().Lease = oriLease + s.do.StatsHandle().SetLease(oriLease) log.SetLevel(oriLevel) }() statistics.FeedbackProbability.Store(1) handle.MinLogScanCount = 0 handle.MinLogErrorRate = 0 - s.do.StatsHandle().Lease = 1 + s.do.StatsHandle().SetLease(1) testKit := testkit.NewTestKit(c, s.store) testKit.MustExec("use test") @@ -1612,9 +1612,9 @@ func (s *testStatsSuite) TestLoadHistCorrelation(c *C) { defer cleanEnv(c, s.store, s.do) testKit := testkit.NewTestKit(c, s.store) h := s.do.StatsHandle() - origLease := h.Lease - h.Lease = time.Second - defer func() { h.Lease = origLease }() + origLease := h.Lease() + h.SetLease(time.Second) + defer func() { h.SetLease(origLease) }() testKit.MustExec("use test") testKit.MustExec("create table t(c int)") testKit.MustExec("insert into t values(1),(2),(3),(4),(5)")