diff --git a/pkg/mcs/scheduling/server/config/config.go b/pkg/mcs/scheduling/server/config/config.go
index ab26e1add29..cac835a1cb7 100644
--- a/pkg/mcs/scheduling/server/config/config.go
+++ b/pkg/mcs/scheduling/server/config/config.go
@@ -606,11 +606,21 @@ func (o *PersistConfig) GetRegionMaxKeys() uint64 {
     return o.GetStoreConfig().GetRegionMaxKeys()
 }
 
+// IsSynced returns true if the cluster config is synced.
+func (o *PersistConfig) IsSynced() bool {
+    return o.GetStoreConfig().IsSynced()
+}
+
 // IsEnableRegionBucket return true if the region bucket is enabled.
 func (o *PersistConfig) IsEnableRegionBucket() bool {
     return o.GetStoreConfig().IsEnableRegionBucket()
 }
 
+// IsRaftKV2 returns whether the cluster uses the `raft-kv2` engine.
+func (o *PersistConfig) IsRaftKV2() bool {
+    return o.GetStoreConfig().IsRaftKV2()
+}
+
 // TODO: implement the following methods
 
 // AddSchedulerCfg adds the scheduler configurations.
diff --git a/pkg/schedule/config/config_provider.go b/pkg/schedule/config/config_provider.go
index 920467a07e2..eeda9fbeb2c 100644
--- a/pkg/schedule/config/config_provider.go
+++ b/pkg/schedule/config/config_provider.go
@@ -139,5 +139,7 @@ type StoreConfigProvider interface {
     GetRegionMaxKeys() uint64
     CheckRegionSize(uint64, uint64) error
     CheckRegionKeys(uint64, uint64) error
+    IsSynced() bool
     IsEnableRegionBucket() bool
+    IsRaftKV2() bool
 }
diff --git a/pkg/schedule/config/store_config.go b/pkg/schedule/config/store_config.go
index 0d773d56471..d7c873fb9c9 100644
--- a/pkg/schedule/config/store_config.go
+++ b/pkg/schedule/config/store_config.go
@@ -48,6 +48,8 @@ type StoreConfig struct {
     RegionMaxSizeMB    uint64 `json:"-"`
     RegionSplitSizeMB  uint64 `json:"-"`
     RegionBucketSizeMB uint64 `json:"-"`
+
+    Sync bool `json:"sync"`
 }
 
 // Storage is the config for the tikv storage.
@@ -73,6 +75,7 @@ func (c *StoreConfig) Adjust() {
     if c == nil {
         return
     }
+
     c.RegionMaxSizeMB = typeutil.ParseMBFromText(c.RegionMaxSize, defaultRegionMaxSize)
     c.RegionSplitSizeMB = typeutil.ParseMBFromText(c.RegionSplitSize, defaultRegionSplitSize)
     c.RegionBucketSizeMB = typeutil.ParseMBFromText(c.RegionBucketSize, defaultBucketSize)
@@ -124,6 +127,21 @@ func (c *StoreConfig) GetRegionMaxKeys() uint64 {
     return uint64(c.RegionMaxKeys)
 }
 
+// SetSynced marks the StoreConfig as synced.
+func (c *StoreConfig) SetSynced() {
+    if c != nil {
+        c.Sync = true
+    }
+}
+
+// IsSynced returns whether the StoreConfig is synced or not.
+func (c *StoreConfig) IsSynced() bool {
+    if c == nil {
+        return false
+    }
+    return c.Sync
+}
+
 // IsEnableRegionBucket return true if the region bucket is enabled.
 func (c *StoreConfig) IsEnableRegionBucket() bool {
     if c == nil {
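(Aside: the new Sync flag defaults to false and flips to true only through SetSynced,
which is called once PD has successfully pulled the store config from TiKV (see
updateStoreConfig in server/cluster/cluster.go below). Both methods tolerate a nil
receiver. A minimal usage sketch, with a hypothetical fetch helper:

    var cfg *StoreConfig = fetchStoreConfigFromTiKV() // hypothetical; may be nil on failure
    if !cfg.IsSynced() { // safe even when cfg == nil: reports "not synced yet"
        // Callers such as the coordinator stay conservative until sync succeeds.
    }
    cfg.Adjust()    // nil-safe; fills the derived MB fields
    cfg.SetSynced() // nil-safe; marks the config usable
)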
diff --git a/pkg/schedule/coordinator.go b/pkg/schedule/coordinator.go
index 58fd3093299..5b2815271ec 100644
--- a/pkg/schedule/coordinator.go
+++ b/pkg/schedule/coordinator.go
@@ -19,6 +19,7 @@ import (
     "context"
     "strconv"
     "sync"
+    "testing"
     "time"
 
     "github.com/pingcap/errors"
@@ -375,7 +376,6 @@ func (c *Coordinator) initSchedulers() {
     if err != nil {
         log.Fatal("cannot load schedulers' config", errs.ZapError(err))
     }
 
-    scheduleCfg := c.cluster.GetSchedulerConfig().GetScheduleConfig().Clone()
     // The new way to create scheduler with the independent configuration.
     for i, name := range scheduleNames {
@@ -439,6 +439,20 @@ func (c *Coordinator) initSchedulers() {
     if err := c.cluster.GetSchedulerConfig().Persist(c.cluster.GetStorage()); err != nil {
         log.Error("cannot persist schedule config", errs.ZapError(err))
     }
+
+    // If the cluster was set up with the `raft-kv2` engine, it should enable
+    // the `evict-slow-trend` scheduler by default.
+    if c.GetCluster().GetStoreConfig().IsRaftKV2() {
+        name := schedulers.EvictSlowTrendType
+        args := []string{}
+
+        s, err := schedulers.CreateScheduler(name, c.opController, c.cluster.GetStorage(), schedulers.ConfigSliceDecoder(name, args), c.schedulers.RemoveScheduler)
+        if err != nil {
+            log.Warn("initializing evict-slow-trend scheduler failed", errs.ZapError(err))
+        } else if err = c.schedulers.AddScheduler(s, args...); err != nil {
+            log.Error("cannot add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args), errs.ZapError(err))
+        }
+    }
 }
 
 // LoadPlugin load user plugin
@@ -625,7 +639,11 @@ func (c *Coordinator) ResetHotSpotMetrics() {
 
 // ShouldRun returns true if the coordinator should run.
 func (c *Coordinator) ShouldRun() bool {
-    return c.prepareChecker.check(c.cluster.GetBasicCluster())
+    isSynced := c.cluster.GetStoreConfig().IsSynced()
+    if testing.Testing() {
+        isSynced = true
+    }
+    return c.prepareChecker.check(c.cluster.GetBasicCluster()) && isSynced
 }
 
 // GetSchedulersController returns the schedulers controller.
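(Aside: ShouldRun now gates scheduling on the store config having been synced from
TiKV, except under `go test`, where testing.Testing() (available since Go 1.21)
bypasses the gate so unit tests do not need a real TiKV to sync from. A condensed
restatement of the predicate, not the actual implementation:

    // shouldRun mirrors Coordinator.ShouldRun: prepared regions AND a synced
    // store config, unless running inside a Go test binary.
    func shouldRun(prepared, synced, inTest bool) bool {
        return prepared && (synced || inTest)
    }

Relatedly, RaftCluster.Start further down now launches runStoreConfigSync before
runCoordinator, so this gate can be satisfied promptly on real clusters.)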
diff --git a/pkg/schedule/schedulers/evict_slow_trend.go b/pkg/schedule/schedulers/evict_slow_trend.go
index 067f9e517f2..8737e3b8619 100644
--- a/pkg/schedule/schedulers/evict_slow_trend.go
+++ b/pkg/schedule/schedulers/evict_slow_trend.go
@@ -36,10 +36,24 @@ const (
     EvictSlowTrendType = "evict-slow-trend"
 )
 
+const (
+    alterEpsilon               = 1e-9
+    minReCheckDurationGap      = 120 // default gap for re-checking the slow node, unit: s
+    defaultRecoveryDurationGap = 600 // default gap for recovery, unit: s
+)
+
+type slowCandidate struct {
+    storeID   uint64
+    captureTS time.Time
+    recoverTS time.Time
+}
+
 type evictSlowTrendSchedulerConfig struct {
-    storage              endpoint.ConfigStorage
-    evictCandidate       uint64
-    candidateCaptureTime time.Time
+    storage endpoint.ConfigStorage
+    // Candidate for eviction in the current tick.
+    evictCandidate slowCandidate
+    // Last chosen candidate for eviction.
+    lastEvictCandidate slowCandidate
 
     // Only evict one store for now
     EvictedStores []uint64 `json:"evict-by-trend-stores"`
@@ -76,32 +90,58 @@ func (conf *evictSlowTrendSchedulerConfig) evictedStore() uint64 {
     if len(conf.EvictedStores) == 0 {
         return 0
     }
+    // If a candidate passes all checks and proves to be slow, it is recorded in
+    // `conf.EvictedStores`, and `conf.lastEvictCandidate` records the captured
+    // timestamp of this store.
     return conf.EvictedStores[0]
 }
 
 func (conf *evictSlowTrendSchedulerConfig) candidate() uint64 {
-    return conf.evictCandidate
+    return conf.evictCandidate.storeID
 }
 
 func (conf *evictSlowTrendSchedulerConfig) captureTS() time.Time {
-    return conf.candidateCaptureTime
+    return conf.evictCandidate.captureTS
 }
 
 func (conf *evictSlowTrendSchedulerConfig) candidateCapturedSecs() uint64 {
-    return uint64(time.Since(conf.candidateCaptureTime).Seconds())
+    return DurationSinceAsSecs(conf.evictCandidate.captureTS)
+}
+
+func (conf *evictSlowTrendSchedulerConfig) lastCapturedCandidate() *slowCandidate {
+    return &conf.lastEvictCandidate
+}
+
+func (conf *evictSlowTrendSchedulerConfig) lastCandidateCapturedSecs() uint64 {
+    return DurationSinceAsSecs(conf.lastEvictCandidate.captureTS)
 }
 
 func (conf *evictSlowTrendSchedulerConfig) captureCandidate(id uint64) {
-    conf.evictCandidate = id
-    conf.candidateCaptureTime = time.Now()
+    conf.evictCandidate = slowCandidate{
+        storeID:   id,
+        captureTS: time.Now(),
+        recoverTS: time.Now(),
+    }
+    if conf.lastEvictCandidate == (slowCandidate{}) {
+        conf.lastEvictCandidate = conf.evictCandidate
+    }
 }
 
-func (conf *evictSlowTrendSchedulerConfig) popCandidate() uint64 {
-    id := conf.evictCandidate
-    conf.evictCandidate = 0
+func (conf *evictSlowTrendSchedulerConfig) popCandidate(updLast bool) uint64 {
+    id := conf.evictCandidate.storeID
+    if updLast {
+        conf.lastEvictCandidate = conf.evictCandidate
+    }
+    conf.evictCandidate = slowCandidate{}
     return id
 }
 
+func (conf *evictSlowTrendSchedulerConfig) markCandidateRecovered() {
+    if conf.lastEvictCandidate != (slowCandidate{}) {
+        conf.lastEvictCandidate.recoverTS = time.Now()
+    }
+}
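+
+// Candidate lifecycle: captureCandidate records a fresh candidate with both
+// timestamps set to now; popCandidate clears it and, when `updLast` is true,
+// promotes it to lastEvictCandidate; markCandidateRecovered then refreshes only
+// recoverTS on lastEvictCandidate, recording how recently the evicted store was
+// deemed recovered.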
 
 func (conf *evictSlowTrendSchedulerConfig) setStoreAndPersist(id uint64) error {
     conf.EvictedStores = []uint64{id}
     return conf.Persist()
@@ -166,6 +206,8 @@ func (s *evictSlowTrendScheduler) cleanupEvictLeader(cluster sche.SchedulerClust
         log.Info("evict-slow-trend-scheduler persist config failed", zap.Uint64("store-id", evictedStoreID))
     }
     if evictedStoreID != 0 {
+        // Assertion: evictedStoreID == s.conf.lastEvictCandidate.storeID
+        s.conf.markCandidateRecovered()
         cluster.SlowTrendRecovered(evictedStoreID)
     }
 }
@@ -200,15 +242,13 @@ func (s *evictSlowTrendScheduler) Schedule(cluster sche.SchedulerCluster, dryRun
     if store == nil || store.IsRemoved() {
         // Previous slow store had been removed, remove the scheduler and check
         // slow node next time.
-        log.Info("store evicted by slow trend has been removed",
-            zap.Uint64("store-id", store.GetID()))
-        storeSlowTrendActionStatusGauge.WithLabelValues("evict.stop:removed").Inc()
-    } else if checkStoreCanRecover(cluster, store) {
-        log.Info("store evicted by slow trend has been recovered",
-            zap.Uint64("store-id", store.GetID()))
-        storeSlowTrendActionStatusGauge.WithLabelValues("evict.stop:recovered").Inc()
+        log.Info("store evicted by slow trend has been removed", zap.Uint64("store-id", store.GetID()))
+        storeSlowTrendActionStatusGauge.WithLabelValues("evict", "stop_removed").Inc()
+    } else if checkStoreCanRecover(cluster, store, s.conf.lastCandidateCapturedSecs()) {
+        log.Info("store evicted by slow trend has been recovered", zap.Uint64("store-id", store.GetID()))
+        storeSlowTrendActionStatusGauge.WithLabelValues("evict", "stop_recovered").Inc()
     } else {
-        storeSlowTrendActionStatusGauge.WithLabelValues("evict.continue").Inc()
+        storeSlowTrendActionStatusGauge.WithLabelValues("evict", "continue").Inc()
         return s.scheduleEvictLeader(cluster), nil
     }
     s.cleanupEvictLeader(cluster)
@@ -217,34 +257,32 @@ func (s *evictSlowTrendScheduler) Schedule(cluster sche.SchedulerCluster, dryRun
     candFreshCaptured := false
     if s.conf.candidate() == 0 {
-        candidate := chooseEvictCandidate(cluster)
+        candidate := chooseEvictCandidate(cluster, s.conf.lastCapturedCandidate())
         if candidate != nil {
-            storeSlowTrendActionStatusGauge.WithLabelValues("cand.captured").Inc()
+            storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "captured").Inc()
             s.conf.captureCandidate(candidate.GetID())
             candFreshCaptured = true
         }
     } else {
-        storeSlowTrendActionStatusGauge.WithLabelValues("cand.continue").Inc()
+        storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "continue").Inc()
     }
+
     slowStoreID := s.conf.candidate()
     if slowStoreID == 0 {
-        storeSlowTrendActionStatusGauge.WithLabelValues("cand.none").Inc()
+        storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "none").Inc()
         return ops, nil
     }
 
     slowStore := cluster.GetStore(slowStoreID)
     if !candFreshCaptured && checkStoreFasterThanOthers(cluster, slowStore) {
-        s.conf.popCandidate()
-        log.Info("slow store candidate by trend has been cancel",
-            zap.Uint64("store-id", slowStoreID))
-        storeSlowTrendActionStatusGauge.WithLabelValues("cand.cancel:too-faster").Inc()
+        s.conf.popCandidate(false)
+        log.Info("slow store candidate by trend has been canceled", zap.Uint64("store-id", slowStoreID))
+        storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "canceled_too_faster").Inc()
        return ops, nil
     }
-    slowStoreRecordTS := s.conf.captureTS()
-    if !checkStoresAreUpdated(cluster, slowStoreID, slowStoreRecordTS) {
-        log.Info("slow store candidate waiting for other stores to update heartbeats",
-            zap.Uint64("store-id", slowStoreID))
-        storeSlowTrendActionStatusGauge.WithLabelValues("cand.wait").Inc()
+    if slowStoreRecordTS := s.conf.captureTS(); !checkStoresAreUpdated(cluster, slowStoreID, slowStoreRecordTS) {
+        log.Info("slow store candidate waiting for other stores to update heartbeats", zap.Uint64("store-id", slowStoreID))
+        storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "wait").Inc()
         return ops, nil
     }
 
@@ -252,14 +290,13 @@ func (s *evictSlowTrendScheduler) Schedule(cluster sche.SchedulerCluster, dryRun
     log.Info("detected slow store by trend, start to evict leaders",
         zap.Uint64("store-id", slowStoreID),
         zap.Uint64("candidate-captured-secs", candCapturedSecs))
-    storeSlowTrendMiscGauge.WithLabelValues("cand.captured.secs").Set(float64(candCapturedSecs))
-
-    err := s.prepareEvictLeader(cluster, s.conf.popCandidate())
-    if err != nil {
+    storeSlowTrendMiscGauge.WithLabelValues("candidate", "captured_secs").Set(float64(candCapturedSecs))
+    if err := s.prepareEvictLeader(cluster, s.conf.popCandidate(true)); err != nil {
         log.Info("prepare for evicting leader by slow trend failed", zap.Error(err), zap.Uint64("store-id", slowStoreID))
-        storeSlowTrendActionStatusGauge.WithLabelValues("evict.prepare.err").Inc()
+        storeSlowTrendActionStatusGauge.WithLabelValues("evict", "prepare_err").Inc()
         return ops, nil
     }
-    storeSlowTrendActionStatusGauge.WithLabelValues("evict.start").Inc()
+    storeSlowTrendActionStatusGauge.WithLabelValues("evict", "start").Inc()
     return s.scheduleEvictLeader(cluster), nil
 }
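(Aside: a condensed restatement of Schedule's decision ladder above; each branch
corresponds to one of the gauge statuses, and all real checks are elided:

    // scheduleTick sketches one tick of the evict-slow-trend scheduler.
    func scheduleTick(evicting, haveCandidate, fresh, fasterAgain, peersUpdated bool) string {
        switch {
        case evicting:
            return "evict: continue, stop_recovered, or stop_removed"
        case !haveCandidate:
            return "candidate: captured or none"
        case fasterAgain && !fresh:
            return "candidate: canceled_too_faster"
        case !peersUpdated:
            return "candidate: wait"
        default:
            return "evict: start"
        }
    }
)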
 
@@ -270,12 +307,17 @@ func newEvictSlowTrendScheduler(opController *operator.Controller, conf *evictSl
     }
 }
 
-func chooseEvictCandidate(cluster sche.SchedulerCluster) (slowStore *core.StoreInfo) {
+func chooseEvictCandidate(cluster sche.SchedulerCluster, lastEvictCandidate *slowCandidate) (slowStore *core.StoreInfo) {
+    isRaftKV2 := cluster.GetStoreConfig().IsRaftKV2()
+    failpoint.Inject("mockRaftKV2", func() {
+        isRaftKV2 = true
+    })
     stores := cluster.GetStores()
     if len(stores) < 3 {
-        storeSlowTrendActionStatusGauge.WithLabelValues("cand.none:too-few").Inc()
+        storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "none_too_few").Inc()
         return
     }
+
     var candidates []*core.StoreInfo
     var affectedStoreCount int
     for _, store := range stores {
@@ -285,47 +327,68 @@ func chooseEvictCandidate(cluster sche.SchedulerCluster) (slowStore *core.StoreI
         if !(store.IsPreparing() || store.IsServing()) {
             continue
         }
-        slowTrend := store.GetSlowTrend()
-        if slowTrend != nil && slowTrend.CauseRate > alterEpsilon && slowTrend.ResultRate < -alterEpsilon {
-            candidates = append(candidates, store)
-            storeSlowTrendActionStatusGauge.WithLabelValues("cand.add").Inc()
-            log.Info("evict-slow-trend-scheduler pre-canptured candidate",
-                zap.Uint64("store-id", store.GetID()),
-                zap.Float64("cause-rate", slowTrend.CauseRate),
-                zap.Float64("result-rate", slowTrend.ResultRate),
-                zap.Float64("cause-value", slowTrend.CauseValue),
-                zap.Float64("result-value", slowTrend.ResultValue))
-        }
-        if slowTrend != nil && slowTrend.ResultRate < -alterEpsilon {
-            affectedStoreCount += 1
+        if slowTrend := store.GetSlowTrend(); slowTrend != nil {
+            if slowTrend.ResultRate < -alterEpsilon {
+                affectedStoreCount += 1
+            }
+            // For cases of disk I/O jitter.
+            // Normally, if there is jitter in disk I/O or network I/O, the slow store shows a descending
+            // trend in QPS and an ascending trend in duration, so its slowTrend must match the following pattern.
+            if slowTrend.CauseRate > alterEpsilon && slowTrend.ResultRate < -alterEpsilon {
+                candidates = append(candidates, store)
+                storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "add").Inc()
+                log.Info("evict-slow-trend-scheduler pre-captured candidate",
+                    zap.Uint64("store-id", store.GetID()),
+                    zap.Float64("cause-rate", slowTrend.CauseRate),
+                    zap.Float64("result-rate", slowTrend.ResultRate),
+                    zap.Float64("cause-value", slowTrend.CauseValue),
+                    zap.Float64("result-value", slowTrend.ResultValue))
+            } else if isRaftKV2 && slowTrend.CauseRate > alterEpsilon {
+                // Meanwhile, if the store was previously experiencing slowness in the `Duration` dimension,
+                // re-check whether this node is still encountering network I/O-related jitter. And if this
+                // node matches the last identified candidate, it indicates that the node is still affected
+                // by delays in network I/O and should consequently be re-designated as slow once more.
+                // Prerequisite: the `raft-kv2` engine is able to perceive slow trends caused by network I/O jitter.
+                // TODO: maybe make it compatible with `raft-kv` later.
+                if lastEvictCandidate != nil && lastEvictCandidate.storeID == store.GetID() && DurationSinceAsSecs(lastEvictCandidate.recoverTS) <= minReCheckDurationGap {
+                    candidates = append(candidates, store)
+                    storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "add").Inc()
+                    log.Info("evict-slow-trend-scheduler pre-captured candidate in raft-kv2 cluster",
+                        zap.Uint64("store-id", store.GetID()),
+                        zap.Float64("cause-rate", slowTrend.CauseRate),
+                        zap.Float64("result-rate", slowTrend.ResultRate),
+                        zap.Float64("cause-value", slowTrend.CauseValue),
+                        zap.Float64("result-value", slowTrend.ResultValue))
+                }
+            }
         }
     }
 
     if len(candidates) == 0 {
-        storeSlowTrendActionStatusGauge.WithLabelValues("cand.none:no-fit").Inc()
+        storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "none_no_fit").Inc()
         return
     }
-    // TODO: Calculate to judge if one store is way slower than the others
     if len(candidates) > 1 {
-        storeSlowTrendActionStatusGauge.WithLabelValues("cand.none:too-many").Inc()
+        storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "none_too_many").Inc()
         return
     }
+
     store := candidates[0]
     affectedStoreThreshold := int(float64(len(stores)) * cluster.GetSchedulerConfig().GetSlowStoreEvictingAffectedStoreRatioThreshold())
     if affectedStoreCount < affectedStoreThreshold {
         log.Info("evict-slow-trend-scheduler failed to confirm candidate: it only affect a few stores",
             zap.Uint64("store-id", store.GetID()))
-        storeSlowTrendActionStatusGauge.WithLabelValues("cand.none:affect-a-few").Inc()
+        storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "none_affect_a_few").Inc()
         return
     }
     if !checkStoreSlowerThanOthers(cluster, store) {
         log.Info("evict-slow-trend-scheduler failed to confirm candidate: it's not slower than others",
             zap.Uint64("store-id", store.GetID()))
-        storeSlowTrendActionStatusGauge.WithLabelValues("cand.none:not-slower").Inc()
+        storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "none_not_slower").Inc()
         return
     }
-    storeSlowTrendActionStatusGauge.WithLabelValues("cand.add").Inc()
+    storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "add").Inc()
     log.Info("evict-slow-trend-scheduler captured candidate", zap.Uint64("store-id", store.GetID()))
     return store
 }
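(Aside: a worked example of the two confirmation gates above, assuming 5 stores and an
affected-store ratio threshold of 0.3; the real ratio comes from the scheduler config:

    affectedStoreThreshold := int(5 * 0.3) // = 1: at least one store must show ResultRate < -alterEpsilon
    expected := (5*2 + 1) / 3              // = 3: the candidate must be slower than 3 peers (checkStoreSlowerThanOthers)

so a single jittery store in a five-node cluster is only confirmed once a clear
majority of peers corroborate that it is the slow one.)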
@@ -350,12 +413,12 @@ func checkStoresAreUpdated(cluster sche.SchedulerCluster, slowStoreID uint64, sl
             updatedStores += 1
             continue
         }
-        if slowStoreRecordTS.Before(store.GetLastHeartbeatTS()) {
+        if slowStoreRecordTS.Compare(store.GetLastHeartbeatTS()) <= 0 {
             updatedStores += 1
         }
     }
-    storeSlowTrendMiscGauge.WithLabelValues("stores.check-updated:count").Set(float64(updatedStores))
-    storeSlowTrendMiscGauge.WithLabelValues("stores.check-updated:expected").Set(float64(expected))
+    storeSlowTrendMiscGauge.WithLabelValues("store", "check_updated_count").Set(float64(updatedStores))
+    storeSlowTrendMiscGauge.WithLabelValues("store", "check_updated_expected").Set(float64(expected))
     return updatedStores >= expected
 }
 
@@ -364,7 +427,7 @@ func checkStoreSlowerThanOthers(cluster sche.SchedulerCluster, target *core.Stor
     expected := (len(stores)*2 + 1) / 3
     targetSlowTrend := target.GetSlowTrend()
     if targetSlowTrend == nil {
-        storeSlowTrendActionStatusGauge.WithLabelValues("cand.check-slower:no-data").Inc()
+        storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "check_slower_no_data").Inc()
         return false
     }
     slowerThanStoresNum := 0
@@ -385,12 +448,12 @@ func checkStoreSlowerThanOthers(cluster sche.SchedulerCluster, target *core.Stor
             slowerThanStoresNum += 1
         }
     }
-    storeSlowTrendMiscGauge.WithLabelValues("store.check-slower:count").Set(float64(slowerThanStoresNum))
-    storeSlowTrendMiscGauge.WithLabelValues("store.check-slower:expected").Set(float64(expected))
+    storeSlowTrendMiscGauge.WithLabelValues("store", "check_slower_count").Set(float64(slowerThanStoresNum))
+    storeSlowTrendMiscGauge.WithLabelValues("store", "check_slower_expected").Set(float64(expected))
     return slowerThanStoresNum >= expected
 }
 
-func checkStoreCanRecover(cluster sche.SchedulerCluster, target *core.StoreInfo) bool {
+func checkStoreCanRecover(cluster sche.SchedulerCluster, target *core.StoreInfo, recoveryGap uint64) bool {
     /*
         //
         // This might not be necessary,
@@ -410,7 +473,7 @@ func checkStoreCanRecover(cluster sche.SchedulerCluster, target *core.StoreInfo)
             storeSlowTrendActionStatusGauge.WithLabelValues("recover.judging:got-event").Inc()
         }
     */
-    return checkStoreFasterThanOthers(cluster, target)
+    return checkStoreFasterThanOthers(cluster, target) && checkStoreReadyForRecover(target, recoveryGap)
 }
 
 func checkStoreFasterThanOthers(cluster sche.SchedulerCluster, target *core.StoreInfo) bool {
@@ -418,7 +481,7 @@ func checkStoreFasterThanOthers(cluster sche.SchedulerCluster, target *core.Stor
     expected := (len(stores) + 1) / 2
     targetSlowTrend := target.GetSlowTrend()
     if targetSlowTrend == nil {
-        storeSlowTrendActionStatusGauge.WithLabelValues("cand.check-faster:no-data").Inc()
+        storeSlowTrendActionStatusGauge.WithLabelValues("candidate", "check_faster_no_data").Inc()
         return false
     }
     fasterThanStores := 0
@@ -433,15 +496,31 @@ func checkStoreFasterThanOthers(cluster sche.SchedulerCluster, target *core.Stor
             continue
         }
         slowTrend := store.GetSlowTrend()
-        // Greater `CuaseValue` means slower
+        // Greater `CauseValue` means slower
         if slowTrend != nil && targetSlowTrend.CauseValue <= slowTrend.CauseValue*1.1 &&
             slowTrend.CauseValue > alterEpsilon && targetSlowTrend.CauseValue > alterEpsilon {
             fasterThanStores += 1
         }
     }
-    storeSlowTrendMiscGauge.WithLabelValues("store.check-faster:count").Set(float64(fasterThanStores))
-    storeSlowTrendMiscGauge.WithLabelValues("store.check-faster:expected").Set(float64(expected))
+    storeSlowTrendMiscGauge.WithLabelValues("store", "check_faster_count").Set(float64(fasterThanStores))
+    storeSlowTrendMiscGauge.WithLabelValues("store", "check_faster_expected").Set(float64(expected))
     return fasterThanStores >= expected
 }
 
-const alterEpsilon = 1e-9
+// checkStoreReadyForRecover checks whether the given target store is ready to recover.
+func checkStoreReadyForRecover(target *core.StoreInfo, recoveryGap uint64) bool {
+    durationGap := uint64(defaultRecoveryDurationGap)
+    failpoint.Inject("transientRecoveryGap", func() {
+        durationGap = 0
+    })
+    if targetSlowTrend := target.GetSlowTrend(); targetSlowTrend != nil {
+        // TODO: set the recovery time in SlowTrend
+        return recoveryGap >= durationGap
+    }
+    return true
+}
+
+// DurationSinceAsSecs returns the duration gap since the given startTS, unit: s.
+func DurationSinceAsSecs(startTS time.Time) uint64 {
+    return uint64(time.Since(startTS).Seconds())
+}
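(Aside: recovery is now a conjunction of two checks: the evicted store must look
faster than a majority of its peers again, and enough wall time must have passed.
A minimal restatement under that reading:

    // readyToRecover sketches checkStoreCanRecover: fasterThanPeers comes from
    // checkStoreFasterThanOthers, lastCapturedSecs from lastCandidateCapturedSecs(),
    // and 600 is defaultRecoveryDurationGap (the transientRecoveryGap failpoint
    // collapses it to zero in tests).
    func readyToRecover(fasterThanPeers bool, lastCapturedSecs uint64) bool {
        const defaultRecoveryDurationGap = 600 // seconds
        return fasterThanPeers && lastCapturedSecs >= defaultRecoveryDurationGap
    }

so a transient dip inside the 600-second window never un-evicts the store.)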
diff --git a/pkg/schedule/schedulers/evict_slow_trend_test.go b/pkg/schedule/schedulers/evict_slow_trend_test.go
index c4ff7e501e8..9320c3ad422 100644
--- a/pkg/schedule/schedulers/evict_slow_trend_test.go
+++ b/pkg/schedule/schedulers/evict_slow_trend_test.go
@@ -20,6 +20,7 @@ import (
     "testing"
     "time"
 
+    "github.com/pingcap/failpoint"
     "github.com/pingcap/kvproto/pkg/pdpb"
     "github.com/stretchr/testify/suite"
     "github.com/tikv/pd/pkg/core"
@@ -78,9 +79,48 @@ func (suite *evictSlowTrendTestSuite) TearDownTest() {
     suite.cancel()
 }
 
+func (suite *evictSlowTrendTestSuite) TestEvictSlowTrendBasicFuncs() {
+    es2, ok := suite.es.(*evictSlowTrendScheduler)
+    suite.True(ok)
+
+    suite.Equal(es2.conf.evictedStore(), uint64(0))
+    suite.Equal(es2.conf.candidate(), uint64(0))
+
+    // Test capturing store 1
+    store := suite.tc.GetStore(1)
+    es2.conf.captureCandidate(store.GetID())
+    lastCapturedCandidate := es2.conf.lastCapturedCandidate()
+    suite.Equal(*lastCapturedCandidate, es2.conf.evictCandidate)
+    suite.Equal(es2.conf.candidateCapturedSecs(), uint64(0))
+    suite.Equal(es2.conf.lastCandidateCapturedSecs(), uint64(0))
+    suite.False(checkStoreReadyForRecover(store, es2.conf.lastCandidateCapturedSecs()))
+    recoverTS := lastCapturedCandidate.recoverTS
+    suite.True(recoverTS.After(lastCapturedCandidate.captureTS))
+    // Pop the captured store 1 and mark it as recovered.
+    time.Sleep(50 * time.Millisecond)
+    suite.Equal(es2.conf.popCandidate(true), store.GetID())
+    suite.True(es2.conf.evictCandidate == (slowCandidate{}))
+    es2.conf.markCandidateRecovered()
+    lastCapturedCandidate = es2.conf.lastCapturedCandidate()
+    suite.True(lastCapturedCandidate.recoverTS.Compare(recoverTS) > 0)
+    suite.Equal(lastCapturedCandidate.storeID, store.GetID())
+
+    // Test capturing another store 2
+    store = suite.tc.GetStore(2)
+    es2.conf.captureCandidate(store.GetID())
+    lastCapturedCandidate = es2.conf.lastCapturedCandidate()
+    suite.Equal(lastCapturedCandidate.storeID, uint64(1))
+    suite.Equal(es2.conf.candidate(), store.GetID())
+    suite.Equal(es2.conf.candidateCapturedSecs(), uint64(0))
+
+    suite.Equal(es2.conf.popCandidate(false), store.GetID())
+    suite.Equal(lastCapturedCandidate.storeID, uint64(1))
+}
+
 func (suite *evictSlowTrendTestSuite) TestEvictSlowTrend() {
     es2, ok := suite.es.(*evictSlowTrendScheduler)
     suite.True(ok)
+    suite.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/schedulers/transientRecoveryGap", "return(true)"))
 
     // Set store-1 to slow status, generate evict candidate
     suite.Equal(es2.conf.evictedStore(), uint64(0))
@@ -155,6 +195,59 @@ func (suite *evictSlowTrendTestSuite) TestEvictSlowTrend() {
     suite.NoError(err)
     suite.Equal(es2.conf.EvictedStores, persistValue.EvictedStores)
     suite.Zero(persistValue.evictedStore())
+    suite.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/schedulers/transientRecoveryGap"))
+}
+
+func (suite *evictSlowTrendTestSuite) TestEvictSlowTrendV2() {
+    es2, ok := suite.es.(*evictSlowTrendScheduler)
+    suite.True(ok)
+    suite.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/schedulers/transientRecoveryGap", "return(true)"))
+    suite.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/schedulers/mockRaftKV2", "return(true)"))
+
+    suite.Equal(es2.conf.evictedStore(), uint64(0))
+    suite.Equal(es2.conf.candidate(), uint64(0))
+    // Set store-1 to slow status: generate a slow candidate, but keep it under the faster-than-others limit
+    storeInfo := suite.tc.GetStore(1)
+    newStoreInfo := storeInfo.Clone(func(store *core.StoreInfo) {
+        store.GetStoreStats().SlowTrend = &pdpb.SlowTrend{
+            CauseValue:  5.0e6 + 100,
+            CauseRate:   1e7,
+            ResultValue: 3.0e3,
+            ResultRate:  -1e7,
+        }
+    })
+    suite.tc.PutStore(newStoreInfo)
+    suite.True(suite.es.IsScheduleAllowed(suite.tc))
+    ops, _ := suite.es.Schedule(suite.tc, false)
+    suite.Empty(ops)
+    suite.Equal(es2.conf.evictedStore(), uint64(0))
+    suite.Equal(es2.conf.candidate(), uint64(1))
+    suite.Equal(es2.conf.lastCandidateCapturedSecs(), uint64(0))
+    // Reschedule so that the candidate is filtered out by the faster-than-others judgment.
+    ops, _ = suite.es.Schedule(suite.tc, false)
+    suite.Empty(ops)
+    suite.Equal(es2.conf.evictedStore(), uint64(0))
+    suite.Equal(es2.conf.candidate(), uint64(0))
+
+    // Set store-1 to slow status caused by network-I/O delays
+    storeInfo = suite.tc.GetStore(1)
+    newStoreInfo = storeInfo.Clone(func(store *core.StoreInfo) {
+        store.GetStoreStats().SlowTrend = &pdpb.SlowTrend{
+            CauseValue:  5.0e6,
+            CauseRate:   1e7,
+            ResultValue: 0,
+            ResultRate:  0,
+        }
+    })
+    suite.tc.PutStore(newStoreInfo)
+    suite.True(suite.es.IsScheduleAllowed(suite.tc))
+    ops, _ = suite.es.Schedule(suite.tc, false)
+    suite.Empty(ops)
+    suite.Equal(es2.conf.evictedStore(), uint64(0))
+    suite.Equal(es2.conf.lastCandidateCapturedSecs(), uint64(0))
+
+    suite.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/schedulers/mockRaftKV2"))
+    suite.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/schedulers/transientRecoveryGap"))
 }
 
 func (suite *evictSlowTrendTestSuite) TestEvictSlowTrendPrepare() {
diff --git a/pkg/schedule/schedulers/init.go b/pkg/schedule/schedulers/init.go
index 8170f86cd2a..e6603778796 100644
--- a/pkg/schedule/schedulers/init.go
+++ b/pkg/schedule/schedulers/init.go
@@ -466,7 +466,7 @@ func schedulersRegister() {
     })
 
     RegisterScheduler(EvictSlowTrendType, func(opController *operator.Controller, storage endpoint.ConfigStorage, decoder ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error) {
-        conf := &evictSlowTrendSchedulerConfig{storage: storage, EvictedStores: make([]uint64, 0), evictCandidate: 0}
+        conf := &evictSlowTrendSchedulerConfig{storage: storage, EvictedStores: make([]uint64, 0), evictCandidate: slowCandidate{}, lastEvictCandidate: slowCandidate{}}
         if err := decoder(conf); err != nil {
             return nil, err
         }
diff --git a/pkg/schedule/schedulers/metrics.go b/pkg/schedule/schedulers/metrics.go
index 2c0fc1df935..6ff5b2edac1 100644
--- a/pkg/schedule/schedulers/metrics.go
+++ b/pkg/schedule/schedulers/metrics.go
@@ -116,15 +116,15 @@ var (
             Subsystem: "scheduler",
             Name:      "store_slow_trend_action_status",
             Help:      "Store trend scheduler calculating actions",
-        }, []string{"reason"})
+        }, []string{"type", "status"})
 
     storeSlowTrendMiscGauge = prometheus.NewGaugeVec(
         prometheus.GaugeOpts{
             Namespace: "pd",
             Subsystem: "scheduler",
             Name:      "store_slow_trend_misc",
             Help:      "Store trend internal uncatalogued values",
-        }, []string{"type"})
+        }, []string{"type", "dim"})
 
     // HotPendingSum is the sum of pending influence in hot region scheduler.
     HotPendingSum = prometheus.NewGaugeVec(
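(Aside: both gauges move from one free-form label to a two-label schema, which is what
allows the structured "evict"/"candidate"/"store" groupings used throughout the
scheduler. A minimal sketch of emitting against the new schema with the Prometheus
client:

    gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
        Namespace: "pd",
        Subsystem: "scheduler",
        Name:      "store_slow_trend_action_status",
        Help:      "Store trend scheduler calculating actions",
    }, []string{"type", "status"})
    gauge.WithLabelValues("evict", "start").Inc() // previously "evict.start" under the single "reason" label

Note that changing label names and values is a breaking change for any existing
dashboards or alert rules built on the old "reason"/"type" labels.)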
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index 35bf14b617a..4dd3f077f72 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -328,6 +328,7 @@ func (c *RaftCluster) Start(s Server) error {
     }
 
     c.wg.Add(10)
+    go c.runStoreConfigSync()
     go c.runCoordinator()
     go c.runMetricsCollectionJob()
     go c.runNodeStateCheckJob()
@@ -335,7 +336,6 @@ func (c *RaftCluster) Start(s Server) error {
     go c.syncRegions()
     go c.runReplicationMode()
     go c.runMinResolvedTSJob()
-    go c.runStoreConfigSync()
     go c.runUpdateStoreStats()
     go c.startGCTuner()
 
@@ -507,6 +507,8 @@ func (c *RaftCluster) observeStoreConfig(ctx context.Context, address string) (b
 // updateStoreConfig updates the store config. This is extracted for testing.
 func (c *RaftCluster) updateStoreConfig(oldCfg, cfg *sc.StoreConfig) (bool, error) {
     cfg.Adjust()
+    // Mark the config as synced.
+    cfg.SetSynced()
     c.opt.SetStoreConfig(cfg)
     return oldCfg.Storage.Engine != sc.RaftstoreV2 && cfg.Storage.Engine == sc.RaftstoreV2, nil
 }
diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go
index e5bf862174b..68aafd0e402 100644
--- a/server/cluster/cluster_test.go
+++ b/server/cluster/cluster_test.go
@@ -1354,12 +1354,14 @@ func TestStoreConfigUpdate(t *testing.T) {
             "perf-level": 2
         }}`
         var config sc.StoreConfig
+        re.False(config.IsSynced())
         re.NoError(json.Unmarshal([]byte(body), &config))
         tc.updateStoreConfig(opt.GetStoreConfig(), &config)
         re.Equal(uint64(144000000), opt.GetRegionMaxKeys())
         re.Equal(uint64(96000000), opt.GetRegionSplitKeys())
         re.Equal(uint64(15*units.GiB/units.MiB), opt.GetRegionMaxSize())
         re.Equal(uint64(10*units.GiB/units.MiB), opt.GetRegionSplitSize())
+        re.True(opt.IsSynced())
     }
     // Case2: empty config.
     {
@@ -1371,6 +1373,7 @@ func TestStoreConfigUpdate(t *testing.T) {
         re.Equal(uint64(960000), opt.GetRegionSplitKeys())
         re.Equal(uint64(144), opt.GetRegionMaxSize())
         re.Equal(uint64(96), opt.GetRegionSplitSize())
+        re.True(opt.IsSynced())
     }
     // Case3: raft-kv2 config.
     {
diff --git a/server/config/persist_options.go b/server/config/persist_options.go
index 1ea0b79424f..2b1f3edc266 100644
--- a/server/config/persist_options.go
+++ b/server/config/persist_options.go
@@ -1012,6 +1012,11 @@ func (o *PersistOptions) CheckRegionKeys(keys, mergeKeys uint64) error {
     return o.GetStoreConfig().CheckRegionKeys(keys, mergeKeys)
 }
 
+// IsSynced returns true if the store config is synced.
+func (o *PersistOptions) IsSynced() bool {
+    return o.GetStoreConfig().IsSynced()
+}
+
 // IsEnableRegionBucket return true if the region bucket is enabled.
 func (o *PersistOptions) IsEnableRegionBucket() bool {
     return o.GetStoreConfig().IsEnableRegionBucket()
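(Aside: PersistOptions here, like PersistConfig in the scheduling microservice above,
gains IsSynced by forwarding to the shared StoreConfig, evidently so both types keep
satisfying the StoreConfigProvider interface extended at the top of this patch. A
compile-time assertion is the idiomatic way to pin such a relationship down; a
hypothetical sketch, with the import alias assumed:

    import sc "github.com/tikv/pd/pkg/schedule/config"

    // Compilation fails if PersistOptions ever stops implementing StoreConfigProvider.
    var _ sc.StoreConfigProvider = (*PersistOptions)(nil)
)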
diff --git a/tests/integrations/mcs/scheduling/config_test.go b/tests/integrations/mcs/scheduling/config_test.go
index e1c124b965f..ef52203e349 100644
--- a/tests/integrations/mcs/scheduling/config_test.go
+++ b/tests/integrations/mcs/scheduling/config_test.go
@@ -142,7 +142,13 @@ func (suite *configTestSuite) TestSchedulerConfigWatch() {
     )
     testutil.Eventually(re, func() bool {
         schedulerNames = schedulerController.GetSchedulerNames()
-        return len(schedulerNames) == len(sc.DefaultSchedulers)
+        targetCount := len(sc.DefaultSchedulers)
+        // In the previous case, the raft-kv2 StoreConfig has been persisted, so
+        // the evict-slow-trend scheduler (EvictSlowTrendName) may already exist.
+        if exists, _ := schedulerController.IsSchedulerExisted(schedulers.EvictSlowTrendName); exists {
+            targetCount += 1
+        }
+        return len(schedulerNames) == targetCount
     })
     // Check all default schedulers' configs.
     for _, schedulerName := range schedulerNames {
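(Closing aside: the failpoints used by the new tests are ordinary pingcap/failpoint
switches; a standalone helper sketch showing how they are toggled, with the failpoint
paths copied verbatim from the tests above:

    package schedulers_test

    import "github.com/pingcap/failpoint"

    const (
        fpRaftKV2      = "github.com/tikv/pd/pkg/schedule/schedulers/mockRaftKV2"         // force the raft-kv2 candidate path
        fpTransientGap = "github.com/tikv/pd/pkg/schedule/schedulers/transientRecoveryGap" // collapse the 600s recovery gap
    )

    // withEvictSlowTrendFailpoints runs fn with both failpoints enabled and
    // disables them again afterwards.
    func withEvictSlowTrendFailpoints(fn func()) error {
        for _, fp := range []string{fpRaftKV2, fpTransientGap} {
            if err := failpoint.Enable(fp, "return(true)"); err != nil {
                return err
            }
            defer failpoint.Disable(fp) //nolint:errcheck
        }
        fn()
        return nil
    }
)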