From 0513b006e1d46bb46d1630a8a943fd55b438e62d Mon Sep 17 00:00:00 2001
From: ShuNing
Date: Wed, 29 May 2019 12:45:41 +0800
Subject: [PATCH] pick some hot region optimization to release 2.1 (#1551)

*: make hot region scheduler configurable (#1412)

Signed-off-by: nolouch

* use high priority for the hot region related operators (#1492)

Signed-off-by: Ryan Leung

* schedulers: let hot region balance not affect by balance-region-scheduler-limit (#1522)

Signed-off-by: nolouch
---
 server/cluster_info.go            | 12 ++++---
 server/config.go                  | 51 +++++++++++++++++++---------
 server/option.go                  | 16 +++++++--
 server/schedule/basic_cluster.go  |  5 ---
 server/schedule/mockcluster.go    | 56 +++++++++++++++++++------------
 server/schedule/opts.go           |  3 +-
 server/schedulers/balance_test.go | 14 +++++---
 server/schedulers/hot_region.go   | 52 ++++++++++++++++++----------
 server/server.go                  | 10 +++---
 9 files changed, 145 insertions(+), 74 deletions(-)

diff --git a/server/cluster_info.go b/server/cluster_info.go
index eba23903d73..91abab9c572 100644
--- a/server/cluster_info.go
+++ b/server/cluster_info.go
@@ -265,14 +265,14 @@ func (c *clusterInfo) GetRegion(regionID uint64) *core.RegionInfo {
 func (c *clusterInfo) IsRegionHot(id uint64) bool {
 	c.RLock()
 	defer c.RUnlock()
-	return c.core.IsRegionHot(id, c.GetHotRegionLowThreshold())
+	return c.core.IsRegionHot(id, c.GetHotRegionCacheHitsThreshold())
 }
 
 // RandHotRegionFromStore randomly picks a hot region in specified store.
 func (c *clusterInfo) RandHotRegionFromStore(store uint64, kind schedule.FlowKind) *core.RegionInfo {
 	c.RLock()
 	defer c.RUnlock()
-	r := c.core.HotCache.RandHotRegionFromStore(store, kind, c.GetHotRegionLowThreshold())
+	r := c.core.HotCache.RandHotRegionFromStore(store, kind, c.GetHotRegionCacheHitsThreshold())
 	if r == nil {
 		return nil
 	}
@@ -620,6 +620,10 @@ func (c *clusterInfo) GetMergeScheduleLimit() uint64 {
 	return c.opt.GetMergeScheduleLimit(namespace.DefaultNamespace)
 }
 
+func (c *clusterInfo) GetHotRegionScheduleLimit() uint64 {
+	return c.opt.GetHotRegionScheduleLimit(namespace.DefaultNamespace)
+}
+
 func (c *clusterInfo) GetTolerantSizeRatio() float64 {
 	return c.opt.GetTolerantSizeRatio()
 }
@@ -668,8 +672,8 @@ func (c *clusterInfo) GetLocationLabels() []string {
 	return c.opt.GetLocationLabels()
 }
 
-func (c *clusterInfo) GetHotRegionLowThreshold() int {
-	return c.opt.GetHotRegionLowThreshold()
+func (c *clusterInfo) GetHotRegionCacheHitsThreshold() int {
+	return c.opt.GetHotRegionCacheHitsThreshold()
 }
 
 func (c *clusterInfo) IsRaftLearnerEnabled() bool {
diff --git a/server/config.go b/server/config.go
index 90c82ec01fe..ed770f3330d 100644
--- a/server/config.go
+++ b/server/config.go
@@ -446,6 +446,12 @@ type ScheduleConfig struct {
 	ReplicaScheduleLimit uint64 `toml:"replica-schedule-limit,omitempty" json:"replica-schedule-limit"`
 	// MergeScheduleLimit is the max coexist merge schedules.
 	MergeScheduleLimit uint64 `toml:"merge-schedule-limit,omitempty" json:"merge-schedule-limit"`
+	// HotRegionScheduleLimit is the max coexist hot region schedules.
+	HotRegionScheduleLimit uint64 `toml:"hot-region-schedule-limit,omitempty" json:"hot-region-schedule-limit"`
+	// HotRegionCacheHitsThreshold is the cache hits threshold of the hot region.
+	// If the number of times a region hits the hot cache is greater than this
+	// threshold, it is considered a hot region.
+	HotRegionCacheHitsThreshold uint64 `toml:"hot-region-cache-hits-threshold,omitempty" json:"hot-region-cache-hits-threshold"`
 	// TolerantSizeRatio is the ratio of buffer size for balance scheduler.
 	TolerantSizeRatio float64 `toml:"tolerant-size-ratio,omitempty" json:"tolerant-size-ratio"`
 	//
@@ -501,6 +507,8 @@ func (c *ScheduleConfig) clone() *ScheduleConfig {
 		RegionScheduleLimit:         c.RegionScheduleLimit,
 		ReplicaScheduleLimit:        c.ReplicaScheduleLimit,
 		MergeScheduleLimit:          c.MergeScheduleLimit,
+		HotRegionScheduleLimit:      c.HotRegionScheduleLimit,
+		HotRegionCacheHitsThreshold: c.HotRegionCacheHitsThreshold,
 		TolerantSizeRatio:           c.TolerantSizeRatio,
 		LowSpaceRatio:               c.LowSpaceRatio,
 		HighSpaceRatio:              c.HighSpaceRatio,
@@ -516,21 +524,25 @@ func (c *ScheduleConfig) clone() *ScheduleConfig {
 }
 
 const (
-	defaultMaxReplicas          = 3
-	defaultMaxSnapshotCount     = 3
-	defaultMaxPendingPeerCount  = 16
-	defaultMaxMergeRegionSize   = 0
-	defaultMaxMergeRegionKeys   = 0
-	defaultSplitMergeInterval   = 1 * time.Hour
-	defaultPatrolRegionInterval = 100 * time.Millisecond
-	defaultMaxStoreDownTime     = 30 * time.Minute
-	defaultLeaderScheduleLimit  = 4
-	defaultRegionScheduleLimit  = 4
-	defaultReplicaScheduleLimit = 8
-	defaultMergeScheduleLimit   = 8
-	defaultTolerantSizeRatio    = 5
-	defaultLowSpaceRatio        = 0.8
-	defaultHighSpaceRatio       = 0.6
+	defaultMaxReplicas                 = 3
+	defaultMaxSnapshotCount            = 3
+	defaultMaxPendingPeerCount         = 16
+	defaultMaxMergeRegionSize          = 0
+	defaultMaxMergeRegionKeys          = 0
+	defaultSplitMergeInterval          = 1 * time.Hour
+	defaultPatrolRegionInterval        = 100 * time.Millisecond
+	defaultMaxStoreDownTime            = 30 * time.Minute
+	defaultLeaderScheduleLimit         = 4
+	defaultRegionScheduleLimit         = 4
+	defaultReplicaScheduleLimit        = 8
+	defaultMergeScheduleLimit          = 8
+	defaultHotRegionScheduleLimit      = 2
+	defaultTolerantSizeRatio           = 5
+	defaultLowSpaceRatio               = 0.8
+	defaultHighSpaceRatio              = 0.6
+	// defaultHotRegionCacheHitsThreshold is the low hit number threshold of the
+	// hot region.
+	defaultHotRegionCacheHitsThreshold = 3
 )
 
 func (c *ScheduleConfig) adjust(meta *configMetaData) error {
@@ -561,6 +573,12 @@ func (c *ScheduleConfig) adjust(meta *configMetaData) error {
 	if !meta.IsDefined("merge-schedule-limit") {
 		adjustUint64(&c.MergeScheduleLimit, defaultMergeScheduleLimit)
 	}
+	if !meta.IsDefined("hot-region-schedule-limit") {
+		adjustUint64(&c.HotRegionScheduleLimit, defaultHotRegionScheduleLimit)
+	}
+	if !meta.IsDefined("hot-region-cache-hits-threshold") {
+		adjustUint64(&c.HotRegionCacheHitsThreshold, defaultHotRegionCacheHitsThreshold)
+	}
 	if !meta.IsDefined("tolerant-size-ratio") {
 		adjustFloat64(&c.TolerantSizeRatio, defaultTolerantSizeRatio)
 	}
@@ -660,6 +678,8 @@ type NamespaceConfig struct {
 	ReplicaScheduleLimit uint64 `json:"replica-schedule-limit"`
 	// MergeScheduleLimit is the max coexist merge schedules.
 	MergeScheduleLimit uint64 `json:"merge-schedule-limit"`
+	// HotRegionScheduleLimit is the max coexist hot region schedules.
+	HotRegionScheduleLimit uint64 `json:"hot-region-schedule-limit"`
 	// MaxReplicas is the number of replicas for each region.
 	MaxReplicas uint64 `json:"max-replicas"`
 }
 
@@ -669,6 +689,7 @@ func (c *NamespaceConfig) adjust(opt *scheduleOption) {
 	adjustUint64(&c.RegionScheduleLimit, opt.GetRegionScheduleLimit(namespace.DefaultNamespace))
 	adjustUint64(&c.ReplicaScheduleLimit, opt.GetReplicaScheduleLimit(namespace.DefaultNamespace))
 	adjustUint64(&c.MergeScheduleLimit, opt.GetMergeScheduleLimit(namespace.DefaultNamespace))
+	adjustUint64(&c.HotRegionScheduleLimit, opt.GetHotRegionScheduleLimit(namespace.DefaultNamespace))
 	adjustUint64(&c.MaxReplicas, uint64(opt.GetMaxReplicas(namespace.DefaultNamespace)))
 }
 
diff --git a/server/option.go b/server/option.go
index b14fe566cc4..a0b2cd67847 100644
--- a/server/option.go
+++ b/server/option.go
@@ -130,6 +130,13 @@ func (o *scheduleOption) GetMergeScheduleLimit(name string) uint64 {
 	return o.load().MergeScheduleLimit
 }
 
+func (o *scheduleOption) GetHotRegionScheduleLimit(name string) uint64 {
+	if n, ok := o.ns[name]; ok {
+		return n.GetHotRegionScheduleLimit()
+	}
+	return o.load().HotRegionScheduleLimit
+}
+
 func (o *scheduleOption) GetTolerantSizeRatio() float64 {
 	return o.load().TolerantSizeRatio
 }
@@ -333,8 +340,8 @@ func (o *scheduleOption) adjustScheduleCfg(persistentCfg *Config) {
 	o.store(scheduleCfg)
 }
 
-func (o *scheduleOption) GetHotRegionLowThreshold() int {
-	return schedule.HotRegionLowThreshold
+func (o *scheduleOption) GetHotRegionCacheHitsThreshold() int {
+	return int(o.load().HotRegionCacheHitsThreshold)
 }
 
 func (o *scheduleOption) CheckLabelProperty(typ string, labels []*metapb.StoreLabel) bool {
@@ -429,3 +436,8 @@ func (n *namespaceOption) GetReplicaScheduleLimit() uint64 {
 func (n *namespaceOption) GetMergeScheduleLimit() uint64 {
 	return n.load().MergeScheduleLimit
 }
+
+// GetHotRegionScheduleLimit returns the limit for hot region schedule.
+func (n *namespaceOption) GetHotRegionScheduleLimit() uint64 {
+	return n.load().HotRegionScheduleLimit
+}
diff --git a/server/schedule/basic_cluster.go b/server/schedule/basic_cluster.go
index f21f61fd315..35c3d8d8e4e 100644
--- a/server/schedule/basic_cluster.go
+++ b/server/schedule/basic_cluster.go
@@ -17,11 +17,6 @@ import (
 	"github.com/pingcap/pd/server/core"
 )
 
-var (
-	// HotRegionLowThreshold is the low threadshold of hot region
-	HotRegionLowThreshold = 3
-)
-
 const (
 	// RegionHeartBeatReportInterval is the heartbeat report interval of a region
 	RegionHeartBeatReportInterval = 60
diff --git a/server/schedule/mockcluster.go b/server/schedule/mockcluster.go
index 6504cd85cfd..7f1fb08e66c 100644
--- a/server/schedule/mockcluster.go
+++ b/server/schedule/mockcluster.go
@@ -58,12 +58,12 @@ func (mc *MockCluster) LoadRegion(regionID uint64, followerIds ...uint64) {
 
 // IsRegionHot checks if the region is hot
 func (mc *MockCluster) IsRegionHot(id uint64) bool {
-	return mc.BasicCluster.IsRegionHot(id, mc.GetHotRegionLowThreshold())
+	return mc.BasicCluster.IsRegionHot(id, mc.GetHotRegionCacheHitsThreshold())
 }
 
 // RandHotRegionFromStore random picks a hot region in specify store.
 func (mc *MockCluster) RandHotRegionFromStore(store uint64, kind FlowKind) *core.RegionInfo {
-	r := mc.HotCache.RandHotRegionFromStore(store, kind, mc.GetHotRegionLowThreshold())
+	r := mc.HotCache.RandHotRegionFromStore(store, kind, mc.GetHotRegionCacheHitsThreshold())
 	if r == nil {
 		return nil
 	}
@@ -406,6 +406,11 @@ func (mc *MockCluster) GetMergeScheduleLimit() uint64 {
 	return mc.MockSchedulerOptions.GetMergeScheduleLimit(namespace.DefaultNamespace)
 }
 
+// GetHotRegionScheduleLimit mocks method.
+func (mc *MockCluster) GetHotRegionScheduleLimit() uint64 {
+	return mc.MockSchedulerOptions.GetHotRegionScheduleLimit(namespace.DefaultNamespace)
+}
+
 // GetMaxReplicas mocks method.
 func (mc *MockCluster) GetMaxReplicas() int {
 	return mc.MockSchedulerOptions.GetMaxReplicas(namespace.DefaultNamespace)
@@ -424,20 +429,22 @@ func (mc *MockCluster) CheckLabelProperty(typ string, labels []*metapb.StoreLabe
 }
 
 const (
-	defaultMaxReplicas          = 3
-	defaultMaxSnapshotCount     = 3
-	defaultMaxPendingPeerCount  = 16
-	defaultMaxMergeRegionSize   = 0
-	defaultMaxMergeRegionKeys   = 0
-	defaultSplitMergeInterval   = 0
-	defaultMaxStoreDownTime     = 30 * time.Minute
-	defaultLeaderScheduleLimit  = 4
-	defaultRegionScheduleLimit  = 4
-	defaultReplicaScheduleLimit = 8
-	defaultMergeScheduleLimit   = 8
-	defaultTolerantSizeRatio    = 2.5
-	defaultLowSpaceRatio        = 0.8
-	defaultHighSpaceRatio       = 0.6
+	defaultMaxReplicas                 = 3
+	defaultMaxSnapshotCount            = 3
+	defaultMaxPendingPeerCount         = 16
+	defaultMaxMergeRegionSize          = 0
+	defaultMaxMergeRegionKeys          = 0
+	defaultSplitMergeInterval          = 0
+	defaultMaxStoreDownTime            = 30 * time.Minute
+	defaultLeaderScheduleLimit         = 4
+	defaultRegionScheduleLimit         = 4
+	defaultReplicaScheduleLimit        = 8
+	defaultMergeScheduleLimit          = 8
+	defaultHotRegionScheduleLimit      = 2
+	defaultTolerantSizeRatio           = 2.5
+	defaultLowSpaceRatio               = 0.8
+	defaultHighSpaceRatio              = 0.6
+	defaultHotRegionCacheHitsThreshold = 3
 )
 
 // MockSchedulerOptions is a mock of SchedulerOptions
@@ -447,6 +454,7 @@ type MockSchedulerOptions struct {
 	LeaderScheduleLimit uint64
 	ReplicaScheduleLimit uint64
 	MergeScheduleLimit uint64
+	HotRegionScheduleLimit uint64
 	MaxSnapshotCount uint64
 	MaxPendingPeerCount uint64
 	MaxMergeRegionSize uint64
@@ -455,7 +463,7 @@ type MockSchedulerOptions struct {
 	MaxStoreDownTime time.Duration
 	MaxReplicas int
 	LocationLabels []string
-	HotRegionLowThreshold int
+	HotRegionCacheHitsThreshold int
 	TolerantSizeRatio float64
 	LowSpaceRatio float64
 	HighSpaceRatio float64
@@ -476,13 +484,14 @@ func NewMockSchedulerOptions() *MockSchedulerOptions {
 	mso.LeaderScheduleLimit = defaultLeaderScheduleLimit
 	mso.ReplicaScheduleLimit = defaultReplicaScheduleLimit
 	mso.MergeScheduleLimit = defaultMergeScheduleLimit
+	mso.HotRegionScheduleLimit = defaultHotRegionScheduleLimit
 	mso.MaxSnapshotCount = defaultMaxSnapshotCount
 	mso.MaxMergeRegionSize = defaultMaxMergeRegionSize
 	mso.MaxMergeRegionKeys = defaultMaxMergeRegionKeys
 	mso.SplitMergeInterval = defaultSplitMergeInterval
 	mso.MaxStoreDownTime = defaultMaxStoreDownTime
 	mso.MaxReplicas = defaultMaxReplicas
-	mso.HotRegionLowThreshold = HotRegionLowThreshold
+	mso.HotRegionCacheHitsThreshold = defaultHotRegionCacheHitsThreshold
 	mso.MaxPendingPeerCount = defaultMaxPendingPeerCount
 	mso.TolerantSizeRatio = defaultTolerantSizeRatio
 	mso.LowSpaceRatio = defaultLowSpaceRatio
@@ -510,6 +519,11 @@ func (mso *MockSchedulerOptions) GetMergeScheduleLimit(name string) uint64 {
 	return mso.MergeScheduleLimit
 }
 
+// GetHotRegionScheduleLimit mock method
+func (mso *MockSchedulerOptions) GetHotRegionScheduleLimit(name string) uint64 {
+	return mso.HotRegionScheduleLimit
+}
+
 // GetMaxSnapshotCount mock method
 func (mso *MockSchedulerOptions) GetMaxSnapshotCount() uint64 {
 	return mso.MaxSnapshotCount
@@ -550,9 +564,9 @@ func (mso *MockSchedulerOptions) GetLocationLabels() []string {
 	return mso.LocationLabels
 }
 
-// GetHotRegionLowThreshold mock method
-func (mso *MockSchedulerOptions) GetHotRegionLowThreshold() int {
-	return mso.HotRegionLowThreshold
+// GetHotRegionCacheHitsThreshold mock method
+func (mso *MockSchedulerOptions) GetHotRegionCacheHitsThreshold() int {
+	return mso.HotRegionCacheHitsThreshold
 }
 
 // GetTolerantSizeRatio mock method
diff --git a/server/schedule/opts.go b/server/schedule/opts.go
index 18c802c0210..5492ef3f20d 100644
--- a/server/schedule/opts.go
+++ b/server/schedule/opts.go
@@ -29,6 +29,7 @@ type Options interface {
 	GetRegionScheduleLimit() uint64
 	GetReplicaScheduleLimit() uint64
 	GetMergeScheduleLimit() uint64
+	GetHotRegionScheduleLimit() uint64
 
 	GetMaxSnapshotCount() uint64
 	GetMaxPendingPeerCount() uint64
@@ -40,7 +41,7 @@ type Options interface {
 	GetMaxReplicas() int
 	GetLocationLabels() []string
 
-	GetHotRegionLowThreshold() int
+	GetHotRegionCacheHitsThreshold() int
 	GetTolerantSizeRatio() float64
 	GetLowSpaceRatio() float64
 	GetHighSpaceRatio() float64
diff --git a/server/schedulers/balance_test.go b/server/schedulers/balance_test.go
index 72e65d9e454..dd160cc8a5c 100644
--- a/server/schedulers/balance_test.go
+++ b/server/schedulers/balance_test.go
@@ -868,7 +868,7 @@ func (s *testBalanceHotWriteRegionSchedulerSuite) TestBalance(c *C) {
 	tc.AddLeaderRegionWithWriteInfo(1, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 2, 3)
 	tc.AddLeaderRegionWithWriteInfo(2, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 3, 4)
 	tc.AddLeaderRegionWithWriteInfo(3, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 2, 4)
-	opt.HotRegionLowThreshold = 0
+	opt.HotRegionCacheHitsThreshold = 0
 
 	// Will transfer a hot region from store 1, because the total count of peers
 	// which is hot for store 1 is more larger than other stores.
@@ -889,10 +889,14 @@ func (s *testBalanceHotWriteRegionSchedulerSuite) TestBalance(c *C) {
 		}
 	}
 
-	// hot region scheduler is restricted by schedule limit.
-	opt.RegionScheduleLimit, opt.LeaderScheduleLimit = 0, 0
+	// hot region scheduler is restricted by `hot-region-schedule-limit`.
+	opt.HotRegionScheduleLimit = 0
 	c.Assert(hb.Schedule(tc, schedule.NewOpInfluence(nil, tc)), HasLen, 0)
-	opt.LeaderScheduleLimit = schedule.NewMockSchedulerOptions().LeaderScheduleLimit
+	// hot region scheduler is not affected by `balance-region-schedule-limit`.
+	opt.HotRegionScheduleLimit = schedule.NewMockSchedulerOptions().HotRegionScheduleLimit
+	opt.RegionScheduleLimit = 0
+	fmt.Println(hb.Schedule(tc, schedule.NewOpInfluence(nil, tc)))
+	c.Assert(hb.Schedule(tc, schedule.NewOpInfluence(nil, tc)), HasLen, 1)
 	opt.RegionScheduleLimit = schedule.NewMockSchedulerOptions().RegionScheduleLimit
 
 	// After transfer a hot region from store 1 to store 5
@@ -963,7 +967,7 @@ func (s *testBalanceHotReadRegionSchedulerSuite) TestBalance(c *C) {
 	tc.AddLeaderRegionWithReadInfo(3, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 2, 3)
 	// lower than hot read flow rate, but higher than write flow rate
 	tc.AddLeaderRegionWithReadInfo(11, 1, 24*1024*schedule.RegionHeartBeatReportInterval, 2, 3)
-	opt.HotRegionLowThreshold = 0
+	opt.HotRegionCacheHitsThreshold = 0
 	c.Assert(tc.IsRegionHot(1), IsTrue)
 	c.Assert(tc.IsRegionHot(11), IsFalse)
 	// check randomly pick hot region
diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go
index 7e5e7583349..5e2abb73e8f 100644
--- a/server/schedulers/hot_region.go
+++ b/server/schedulers/hot_region.go
@@ -69,8 +69,9 @@ func newStoreStaticstics() *storeStatistics {
 type balanceHotRegionsScheduler struct {
 	*baseScheduler
 	sync.RWMutex
-	limit uint64
-	types []BalanceType
+	leaderLimit uint64
+	peerLimit   uint64
+	types       []BalanceType
 
 	// store id -> hot regions statistics as the role of leader
 	stats *storeStatistics
@@ -81,7 +82,8 @@ func newBalanceHotRegionsScheduler(limiter *schedule.Limiter) *balanceHotRegions
 	base := newBaseScheduler(limiter)
 	return &balanceHotRegionsScheduler{
 		baseScheduler: base,
-		limit:         1,
+		leaderLimit:   1,
+		peerLimit:     1,
 		stats:         newStoreStaticstics(),
 		types:         []BalanceType{hotWriteRegionBalance, hotReadRegionBalance},
 		r:             rand.New(rand.NewSource(time.Now().UnixNano())),
@@ -92,7 +94,8 @@ func newBalanceHotReadRegionsScheduler(limiter *schedule.Limiter) *balanceHotReg
 	base := newBaseScheduler(limiter)
 	return &balanceHotRegionsScheduler{
 		baseScheduler: base,
-		limit:         1,
+		leaderLimit:   1,
+		peerLimit:     1,
 		stats:         newStoreStaticstics(),
 		types:         []BalanceType{hotReadRegionBalance},
 		r:             rand.New(rand.NewSource(time.Now().UnixNano())),
@@ -103,7 +106,8 @@ func newBalanceHotWriteRegionsScheduler(limiter *schedule.Limiter) *balanceHotRe
 	base := newBaseScheduler(limiter)
 	return &balanceHotRegionsScheduler{
 		baseScheduler: base,
-		limit:         1,
+		leaderLimit:   1,
+		peerLimit:     1,
 		stats:         newStoreStaticstics(),
 		types:         []BalanceType{hotWriteRegionBalance},
 		r:             rand.New(rand.NewSource(time.Now().UnixNano())),
@@ -122,14 +126,20 @@ func (h *balanceHotRegionsScheduler) IsScheduleAllowed(cluster schedule.Cluster)
 	return h.allowBalanceLeader(cluster) || h.allowBalanceRegion(cluster)
 }
 
+func min(a, b uint64) uint64 {
+	if a < b {
+		return a
+	}
+	return b
+}
+
 func (h *balanceHotRegionsScheduler) allowBalanceLeader(cluster schedule.Cluster) bool {
-	return h.limiter.OperatorCount(schedule.OpHotRegion) < h.limit &&
+	return h.limiter.OperatorCount(schedule.OpHotRegion) < min(h.leaderLimit, cluster.GetHotRegionScheduleLimit()) &&
 		h.limiter.OperatorCount(schedule.OpLeader) < cluster.GetLeaderScheduleLimit()
 }
 
 func (h *balanceHotRegionsScheduler) allowBalanceRegion(cluster schedule.Cluster) bool {
-	return h.limiter.OperatorCount(schedule.OpHotRegion) < h.limit &&
-		h.limiter.OperatorCount(schedule.OpRegion) < cluster.GetRegionScheduleLimit()
+	return h.limiter.OperatorCount(schedule.OpHotRegion) < min(h.peerLimit, cluster.GetHotRegionScheduleLimit())
 }
 
 func (h *balanceHotRegionsScheduler) Schedule(cluster schedule.Cluster, opInfluence schedule.OpInfluence) []*schedule.Operator {
@@ -158,7 +168,9 @@ func (h *balanceHotRegionsScheduler) balanceHotReadRegions(cluster schedule.Clus
 	if srcRegion != nil {
 		schedulerCounter.WithLabelValues(h.GetName(), "move_leader").Inc()
 		step := schedule.TransferLeader{FromStore: srcRegion.GetLeader().GetStoreId(), ToStore: newLeader.GetStoreId()}
-		return []*schedule.Operator{schedule.NewOperator("transferHotReadLeader", srcRegion.GetID(), srcRegion.GetRegionEpoch(), schedule.OpHotRegion|schedule.OpLeader, step)}
+		op := schedule.NewOperator("transferHotReadLeader", srcRegion.GetID(), srcRegion.GetRegionEpoch(), schedule.OpHotRegion|schedule.OpLeader, step)
+		op.SetPriorityLevel(core.HighPriority)
+		return []*schedule.Operator{op}
 	}
 
 	// balance by peer
@@ -169,6 +181,7 @@ func (h *balanceHotRegionsScheduler) balanceHotReadRegions(cluster schedule.Clus
 			schedulerCounter.WithLabelValues(h.GetName(), "create_operator_fail").Inc()
 			return nil
 		}
+		op.SetPriorityLevel(core.HighPriority)
 		schedulerCounter.WithLabelValues(h.GetName(), "move_peer").Inc()
 		return []*schedule.Operator{op}
 	}
@@ -191,6 +204,7 @@ func (h *balanceHotRegionsScheduler) balanceHotWriteRegions(cluster schedule.Clu
 				schedulerCounter.WithLabelValues(h.GetName(), "create_operator_fail").Inc()
 				return nil
 			}
+			op.SetPriorityLevel(core.HighPriority)
 			schedulerCounter.WithLabelValues(h.GetName(), "move_peer").Inc()
 			return []*schedule.Operator{op}
 		}
@@ -200,11 +214,12 @@ func (h *balanceHotRegionsScheduler) balanceHotWriteRegions(cluster schedule.Clu
 			if srcRegion != nil {
 				schedulerCounter.WithLabelValues(h.GetName(), "move_leader").Inc()
 				step := schedule.TransferLeader{FromStore: srcRegion.GetLeader().GetStoreId(), ToStore: newLeader.GetStoreId()}
-				return []*schedule.Operator{schedule.NewOperator("transferHotWriteLeader", srcRegion.GetID(), srcRegion.GetRegionEpoch(), schedule.OpHotRegion|schedule.OpLeader, step)}
+				op := schedule.NewOperator("transferHotWriteLeader", srcRegion.GetID(), srcRegion.GetRegionEpoch(), schedule.OpHotRegion|schedule.OpLeader, step)
+				op.SetPriorityLevel(core.HighPriority)
+				return []*schedule.Operator{op}
 			}
 		}
 	}
-
 	schedulerCounter.WithLabelValues(h.GetName(), "skip").Inc()
 	return nil
 }
@@ -212,7 +227,10 @@ func (h *balanceHotRegionsScheduler) balanceHotWriteRegions(cluster schedule.Clu
 func (h *balanceHotRegionsScheduler) calcScore(items []*core.RegionStat, cluster schedule.Cluster, kind core.ResourceKind) core.StoreHotRegionsStat {
 	stats := make(core.StoreHotRegionsStat)
 	for _, r := range items {
-		if r.HotDegree < cluster.GetHotRegionLowThreshold() {
+		// HotDegree is the number of times the region has been updated in the hot cache.
+		// If a heartbeat reports that the region's flow exceeds the threshold, the scheduler
+		// updates the region in the hot cache and its HotDegree increases.
+		if r.HotDegree < cluster.GetHotRegionCacheHitsThreshold() {
 			continue
 		}
 
@@ -295,7 +313,7 @@ func (h *balanceHotRegionsScheduler) balanceByPeer(cluster schedule.Cluster, sto
 
 	destStoreID = h.selectDestStore(destStoreIDs, rs.FlowBytes, srcStoreID, storesStat)
 	if destStoreID != 0 {
-		h.adjustBalanceLimit(srcStoreID, storesStat)
+		h.peerLimit = h.adjustBalanceLimit(srcStoreID, storesStat)
 
 		srcPeer := srcRegion.GetStorePeer(srcStoreID)
 		if srcPeer == nil {
@@ -352,7 +370,8 @@ func (h *balanceHotRegionsScheduler) balanceByLeader(cluster schedule.Cluster, s
 
 		destPeer := srcRegion.GetStoreVoter(destStoreID)
 		if destPeer != nil {
-			h.adjustBalanceLimit(srcStoreID, storesStat)
+			h.leaderLimit = h.adjustBalanceLimit(srcStoreID, storesStat)
+
 			return srcRegion, destPeer
 		}
 	}
@@ -411,7 +430,7 @@ func (h *balanceHotRegionsScheduler) selectDestStore(candidateStoreIDs []uint64,
 	return
 }
 
-func (h *balanceHotRegionsScheduler) adjustBalanceLimit(storeID uint64, storesStat core.StoreHotRegionsStat) {
+func (h *balanceHotRegionsScheduler) adjustBalanceLimit(storeID uint64, storesStat core.StoreHotRegionsStat) uint64 {
 	srcStoreStatistics := storesStat[storeID]
 
 	var hotRegionTotalCount float64
@@ -421,8 +440,7 @@ func (h *balanceHotRegionsScheduler) adjustBalanceLimit(storeID uint64, storesSt
 
 	avgRegionCount := hotRegionTotalCount / float64(len(storesStat))
 	// Multiplied by hotRegionLimitFactor to avoid transfer back and forth
-	limit := uint64((float64(srcStoreStatistics.RegionsStat.Len()) - avgRegionCount) * hotRegionLimitFactor)
-	h.limit = maxUint64(1, limit)
+	return uint64(math.Max((float64(srcStoreStatistics.RegionsStat.Len())-avgRegionCount)*hotRegionLimitFactor, 1))
 }
 
 func (h *balanceHotRegionsScheduler) GetHotReadStatus() *core.StoreHotRegionInfos {
diff --git a/server/server.go b/server/server.go
index 76856e045db..2086bba7ab5 100644
--- a/server/server.go
+++ b/server/server.go
@@ -524,10 +524,12 @@ func (s *Server) GetNamespaceConfig(name string) *NamespaceConfig {
 	}
 
 	cfg := &NamespaceConfig{
-		LeaderScheduleLimit:  s.scheduleOpt.GetLeaderScheduleLimit(name),
-		RegionScheduleLimit:  s.scheduleOpt.GetRegionScheduleLimit(name),
-		ReplicaScheduleLimit: s.scheduleOpt.GetReplicaScheduleLimit(name),
-		MaxReplicas:          uint64(s.scheduleOpt.GetMaxReplicas(name)),
+		LeaderScheduleLimit:    s.scheduleOpt.GetLeaderScheduleLimit(name),
+		RegionScheduleLimit:    s.scheduleOpt.GetRegionScheduleLimit(name),
+		ReplicaScheduleLimit:   s.scheduleOpt.GetReplicaScheduleLimit(name),
+		HotRegionScheduleLimit: s.scheduleOpt.GetHotRegionScheduleLimit(name),
+		MergeScheduleLimit:     s.scheduleOpt.GetMergeScheduleLimit(name),
+		MaxReplicas:            uint64(s.scheduleOpt.GetMaxReplicas(name)),
 	}
 
 	return cfg
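
Usage note: with this patch, the hot region scheduler's concurrency and the hot-cache hit threshold become ordinary schedule-config options (see the toml tags added to ScheduleConfig above). A minimal sketch of the corresponding configuration entries, using the defaults defined in config.go; the section name and syntax follow PD's usual TOML config layout and are shown for illustration only:

[schedule]
hot-region-schedule-limit = 2
hot-region-cache-hits-threshold = 3

Like the other *-schedule-limit options, these are expected to be adjustable at runtime as well (for example through pd-ctl's config set), though that tooling is not part of this patch.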