From e63348dc7eb2d9100d0faa39f73ab8a40ec6d555 Mon Sep 17 00:00:00 2001
From: lightmelodies
Date: Fri, 28 Jan 2022 14:42:59 +0800
Subject: [PATCH] Support pausing scheduling on specific regions

Signed-off-by: lightmelodies
---
 pkg/mock/mockcluster/mockcluster.go           | 15 +++++
 server/api/region.go                          | 64 ++++++++++++++++++
 server/api/router.go                          |  2 +
 server/cluster/cluster.go                     | 37 +++++++++++
 server/schedule/checker/merge_checker.go      | 10 +++
 server/schedule/checker/merge_checker_test.go | 15 +++++
 server/schedule/checker/split_checker.go      |  5 ++
 server/schedule/cluster.go                    |  1 +
 server/schedule/healthy.go                    |  5 ++
 server/schedulers/balance_leader.go           | 12 ++--
 server/schedulers/balance_region.go           | 12 ++--
 server/schedulers/balance_test.go             | 48 ++++++++++++++
 server/schedulers/hot_region.go               |  5 ++
 server/schedulers/random_merge.go             |  3 +
 server/schedulers/shuffle_hot_region.go       |  3 +
 server/schedulers/shuffle_leader.go           |  2 +-
 server/schedulers/shuffle_region.go           |  9 ++-
 17 files changed, 234 insertions(+), 14 deletions(-)

diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go
index 728d645afbe..2621296ad6d 100644
--- a/pkg/mock/mockcluster/mockcluster.go
+++ b/pkg/mock/mockcluster/mockcluster.go
@@ -24,6 +24,7 @@ import (
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/pingcap/log"
+	"github.com/tikv/pd/pkg/cache"
 	"github.com/tikv/pd/pkg/errs"
 	"github.com/tikv/pd/pkg/mock/mockid"
 	"github.com/tikv/pd/server/config"
@@ -53,6 +54,7 @@ type Cluster struct {
 	*config.PersistOptions
 	ID             uint64
 	suspectRegions map[uint64]struct{}
+	pinnedRegions  *cache.TTLUint64
 }

 // NewCluster creates a new Cluster
@@ -63,6 +65,7 @@ func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster {
 		HotStat:        statistics.NewHotStat(ctx),
 		PersistOptions: opts,
 		suspectRegions: map[uint64]struct{}{},
+		pinnedRegions:  cache.NewIDTTL(ctx, time.Minute, 3*time.Minute),
 	}
 	if clus.PersistOptions.GetReplicationConfig().EnablePlacementRules {
 		clus.initRuleManager()
@@ -730,6 +733,18 @@ func (mc *Cluster) SetStoreLabel(storeID uint64, labels map[string]string) {
 	mc.PutStore(newStore)
 }

+// PinRegions pauses scheduling on the given regions for a duration.
+func (mc *Cluster) PinRegions(regionIDs []uint64, duration time.Duration) {
+	for _, regionID := range regionIDs {
+		mc.pinnedRegions.PutWithTTL(regionID, nil, duration)
+	}
+}
+
+// IsRegionPinned checks whether the given region is pinned.
+func (mc *Cluster) IsRegionPinned(region *core.RegionInfo) bool {
+	return mc.pinnedRegions.Exists(region.GetID())
+}
+
 // AddSuspectRegions mock method
 func (mc *Cluster) AddSuspectRegions(ids ...uint64) {
 	for _, id := range ids {
diff --git a/server/api/region.go b/server/api/region.go
index 955484bda30..615371458cd 100644
--- a/server/api/region.go
+++ b/server/api/region.go
@@ -22,6 +22,7 @@ import (
 	"net/url"
 	"sort"
 	"strconv"
+	"time"

 	"github.com/gorilla/mux"
 	"github.com/pingcap/failpoint"
@@ -505,6 +506,18 @@ func (h *regionsHandler) GetEmptyRegion(w http.ResponseWriter, r *http.Request)
 	h.rd.JSON(w, http.StatusOK, regionsInfo)
 }

+// @Tags region
+// @Summary List all pinned regions.
+// @Produce json
+// @Success 200 {object} RegionsInfo
+// @Router /regions/check/pinned-region [get]
+func (h *regionsHandler) GetPinnedRegions(w http.ResponseWriter, r *http.Request) {
+	rc := getCluster(r)
+	regions := rc.GetPinnedRegions()
+	regionsInfo := convertToAPIRegions(regions)
+	h.rd.JSON(w, http.StatusOK, regionsInfo)
+}
+
 type histItem struct {
 	Start int64 `json:"start"`
 	End   int64 `json:"end"`
@@ -914,6 +927,57 @@ func (h *regionsHandler) SplitRegions(w http.ResponseWriter, r *http.Request) {
 	h.rd.JSON(w, http.StatusOK, &s)
 }

+// @Tags region
+// @Summary Pin regions by a given key range or region IDs for a given duration.
+// @Accept json
+// @Param body body object true "json params"
+// @Produce json
+// @Success 200 {string} string "Pin regions successfully."
+// @Failure 400 {string} string "The input is invalid."
+// @Router /regions/pin [post]
+func (h *regionsHandler) PinRegions(w http.ResponseWriter, r *http.Request) {
+	rc := getCluster(r)
+	var input struct {
+		StartKey  *string   `json:"start_key"`
+		EndKey    *string   `json:"end_key"`
+		RegionsID *[]uint64 `json:"regions_id"`
+		Duration  *uint64   `json:"duration"`
+	}
+	if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil {
+		return
+	}
+	if input.Duration == nil {
+		h.rd.JSON(w, http.StatusBadRequest, "missing pin duration")
+		return
+	}
+	duration := time.Second * time.Duration(*input.Duration)
+	if input.StartKey != nil && input.EndKey != nil {
+		startKey, err := hex.DecodeString(*input.StartKey)
+		if err != nil {
+			h.rd.JSON(w, http.StatusBadRequest, "start_key is not in hex format")
+			return
+		}
+		endKey, err := hex.DecodeString(*input.EndKey)
+		if err != nil {
+			h.rd.JSON(w, http.StatusBadRequest, "end_key is not in hex format")
+			return
+		}
+		regions := rc.ScanRegions(startKey, endKey, -1)
+		regionsID := make([]uint64, len(regions))
+		for i, region := range regions {
+			regionsID[i] = region.GetID()
+		}
+		rc.PinRegions(regionsID, duration)
+	} else {
+		if input.RegionsID == nil {
+			h.rd.JSON(w, http.StatusBadRequest, "missing key range or regions id")
+			return
+		}
+		rc.PinRegions(*input.RegionsID, duration)
+	}
+	h.rd.JSON(w, http.StatusOK, "Pin regions successfully.")
+}
+
 // RegionHeap implements heap.Interface, used for selecting top n regions.
 type RegionHeap struct {
 	regions []*core.RegionInfo
diff --git a/server/api/router.go b/server/api/router.go
index e2abc5e4250..de5945b5891 100644
--- a/server/api/router.go
+++ b/server/api/router.go
@@ -266,6 +266,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
 	registerFunc(clusterRouter, "GetLearnerPeerRegions", "/regions/check/learner-peer", regionsHandler.GetLearnerPeerRegions, setMethods("GET"))
 	registerFunc(clusterRouter, "GetEmptyRegion", "/regions/check/empty-region", regionsHandler.GetEmptyRegion, setMethods("GET"))
 	registerFunc(clusterRouter, "GetOfflinePeer", "/regions/check/offline-peer", regionsHandler.GetOfflinePeer, setMethods("GET"))
+	registerFunc(clusterRouter, "GetPinnedRegions", "/regions/check/pinned-region", regionsHandler.GetPinnedRegions, setMethods("GET"))

 	registerFunc(clusterRouter, "GetSizeHistogram", "/regions/check/hist-size", regionsHandler.GetSizeHistogram, setMethods("GET"))
 	registerFunc(clusterRouter, "GetKeysHistogram", "/regions/check/hist-keys", regionsHandler.GetKeysHistogram, setMethods("GET"))
@@ -273,6 +274,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
 	registerFunc(clusterRouter, "AccelerateRegionsSchedule", "/regions/accelerate-schedule", regionsHandler.AccelerateRegionsScheduleInRange, setMethods("POST"))
 	registerFunc(clusterRouter, "ScatterRegions", "/regions/scatter", regionsHandler.ScatterRegions, setMethods("POST"))
 	registerFunc(clusterRouter, "SplitRegions", "/regions/split", regionsHandler.SplitRegions, setMethods("POST"))
+	registerFunc(clusterRouter, "PinRegions", "/regions/pin", regionsHandler.PinRegions, setMethods("POST"))
 	registerFunc(clusterRouter, "GetRangeHoles", "/regions/range-holes", regionsHandler.GetRangeHoles, setMethods("GET"))
 	registerFunc(clusterRouter, "CheckRegionsReplicated", "/regions/replicated", regionsHandler.CheckRegionsReplicated, setMethods("GET"), setQueries("startKey", "{startKey}", "endKey", "{endKey}"))
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index 487d908817e..c24653249a5 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -29,6 +29,7 @@ import (
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/pingcap/log"
+	"github.com/tikv/pd/pkg/cache"
 	"github.com/tikv/pd/pkg/component"
 	"github.com/tikv/pd/pkg/errs"
 	"github.com/tikv/pd/pkg/etcdutil"
@@ -110,6 +111,8 @@ type RaftCluster struct {
 	regionStats *statistics.RegionStatistics
 	hotStat     *statistics.HotStat

+	pinnedRegions *cache.TTLUint64
+
 	coordinator *coordinator

 	regionSyncer *syncer.RegionSyncer
@@ -209,6 +212,7 @@ func (c *RaftCluster) InitCluster(
 	c.labelLevelStats = statistics.NewLabelStatistics()
 	c.hotStat = statistics.NewHotStat(c.ctx)
 	c.changedRegions = make(chan *core.RegionInfo, defaultChangedRegionsLimit)
+	c.pinnedRegions = cache.NewIDTTL(c.ctx, time.Minute, 3*time.Minute)
 }

 // Start starts a cluster.
@@ -489,6 +493,39 @@ func (c *RaftCluster) GetOpts() *config.PersistOptions {
 	return c.opt
 }

+// PinRegions pauses scheduling on the given regions for a duration; a non-positive duration unpins them.
+func (c *RaftCluster) PinRegions(regionIDs []uint64, duration time.Duration) {
+	c.Lock()
+	defer c.Unlock()
+	if duration > 0 {
+		for _, regionID := range regionIDs {
+			c.pinnedRegions.PutWithTTL(regionID, nil, duration)
+		}
+	} else {
+		for _, regionID := range regionIDs {
+			c.pinnedRegions.Remove(regionID)
+		}
+	}
+}
+
+// IsRegionPinned checks whether the given region is pinned.
+func (c *RaftCluster) IsRegionPinned(region *core.RegionInfo) bool {
+	return c.pinnedRegions.Exists(region.GetID())
+}
+
+// GetPinnedRegions returns all pinned regions' information in detail.
+func (c *RaftCluster) GetPinnedRegions() []*core.RegionInfo {
+	regionIDs := c.pinnedRegions.GetAllID()
+	regions := make([]*core.RegionInfo, 0, len(regionIDs))
+	for _, regionID := range regionIDs {
+		region := c.GetRegion(regionID)
+		if region != nil {
+			regions = append(regions, region)
+		}
+	}
+	return regions
+}
+
 // AddSuspectRegions adds regions to suspect list.
 func (c *RaftCluster) AddSuspectRegions(regionIDs ...uint64) {
 	c.coordinator.checkers.AddSuspectRegions(regionIDs...)
diff --git a/server/schedule/checker/merge_checker.go b/server/schedule/checker/merge_checker.go
index 932f285ad24..7d60b8dd5d9 100644
--- a/server/schedule/checker/merge_checker.go
+++ b/server/schedule/checker/merge_checker.go
@@ -84,6 +84,11 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator {
 		return nil
 	}

+	if m.cluster.IsRegionPinned(region) {
+		checkerCounter.WithLabelValues("merge_checker", "pinned").Inc()
+		return nil
+	}
+
 	expireTime := m.startTime.Add(m.opts.GetSplitMergeInterval())
 	if time.Now().Before(expireTime) {
 		checkerCounter.WithLabelValues("merge_checker", "recently-start").Inc()
@@ -172,6 +177,11 @@ func (m *MergeChecker) checkTarget(region, adjacent *core.RegionInfo) bool {
 		return false
 	}

+	if m.cluster.IsRegionPinned(adjacent) {
+		checkerCounter.WithLabelValues("merge_checker", "adj-pinned").Inc()
+		return false
+	}
+
 	if m.splitCache.Exists(adjacent.GetID()) {
 		checkerCounter.WithLabelValues("merge_checker", "adj-recently-split").Inc()
 		return false
diff --git a/server/schedule/checker/merge_checker_test.go b/server/schedule/checker/merge_checker_test.go
index f1e61be6c54..fe477fe328d 100644
--- a/server/schedule/checker/merge_checker_test.go
+++ b/server/schedule/checker/merge_checker_test.go
@@ -165,6 +165,12 @@ func (s *testMergeCheckerSuite) TestBasic(c *C) {
 	c.Assert(ops[0].RegionID(), Equals, s.regions[2].GetID())
 	c.Assert(ops[1].RegionID(), Equals, s.regions[1].GetID())

+	// Test pinning a region: a pinned region cannot be merged.
+	s.cluster.PinRegions([]uint64{s.regions[2].GetID()}, time.Minute)
+	ops = s.mc.Check(s.regions[2])
+	c.Assert(ops, IsNil)
+	s.cluster.PinRegions([]uint64{s.regions[2].GetID()}, 0)
+
 	// Enable one way merge
 	s.cluster.SetEnableOneWayMerge(true)
 	ops = s.mc.Check(s.regions[2])
@@ -180,6 +186,15 @@ func (s *testMergeCheckerSuite) TestBasic(c *C) {
 	c.Assert(ops[0].RegionID(), Equals, s.regions[2].GetID())
 	c.Assert(ops[1].RegionID(), Equals, s.regions[3].GetID())

+	// Test pinning the next region.
+	s.cluster.PinRegions([]uint64{s.regions[3].GetID()}, time.Minute)
+	ops = s.mc.Check(s.regions[2])
+	c.Assert(ops, NotNil)
+	// Now it merges with the previous region instead.
+	c.Assert(ops[0].RegionID(), Equals, s.regions[2].GetID())
+	c.Assert(ops[1].RegionID(), Equals, s.regions[1].GetID())
+	s.cluster.PinRegions([]uint64{s.regions[3].GetID()}, 0)
+
 	// merge cannot across rule key.
 	s.cluster.SetEnablePlacementRules(true)
 	s.cluster.RuleManager.SetRule(&placement.Rule{
diff --git a/server/schedule/checker/split_checker.go b/server/schedule/checker/split_checker.go
index 1b0db44df47..56c9f95f492 100644
--- a/server/schedule/checker/split_checker.go
+++ b/server/schedule/checker/split_checker.go
@@ -56,6 +56,11 @@ func (c *SplitChecker) Check(region *core.RegionInfo) *operator.Operator {
 		return nil
 	}

+	if c.cluster.IsRegionPinned(region) {
+		checkerCounter.WithLabelValues("split_checker", "pinned").Inc()
+		return nil
+	}
+
 	start, end := region.GetStartKey(), region.GetEndKey()
 	// We may consider to merge labeler split keys and rule split keys together
 	// before creating operator. It can help to reduce operator count. However,
diff --git a/server/schedule/cluster.go b/server/schedule/cluster.go
index e5cee73645d..30decf85066 100644
--- a/server/schedule/cluster.go
+++ b/server/schedule/cluster.go
@@ -33,4 +33,5 @@ type Cluster interface {
 	RemoveScheduler(name string) error

 	AddSuspectRegions(ids ...uint64)
+	IsRegionPinned(region *core.RegionInfo) bool
 }
diff --git a/server/schedule/healthy.go b/server/schedule/healthy.go
index 906c85ae0e3..65e0a73e96f 100644
--- a/server/schedule/healthy.go
+++ b/server/schedule/healthy.go
@@ -44,3 +44,8 @@ func IsRegionReplicated(cluster Cluster, region *core.RegionInfo) bool {
 func ReplicatedRegion(cluster Cluster) func(*core.RegionInfo) bool {
 	return func(region *core.RegionInfo) bool { return IsRegionReplicated(cluster, region) }
 }
+
+// NonPinnedRegion returns a function that checks if a region is not pinned.
+func NonPinnedRegion(cluster Cluster) func(*core.RegionInfo) bool {
+	return func(region *core.RegionInfo) bool { return !cluster.IsRegionPinned(region) }
+}
diff --git a/server/schedulers/balance_leader.go b/server/schedulers/balance_leader.go
index 0805d73ab00..13c1f85ab43 100644
--- a/server/schedulers/balance_leader.go
+++ b/server/schedulers/balance_leader.go
@@ -169,7 +169,7 @@ func (l *balanceLeaderScheduler) Schedule(cluster schedule.Cluster) []*operator.
 			l.counter.WithLabelValues("high-score", plan.SourceMetricLabel()).Inc()
 			for j := 0; j < retryLimit; j++ {
 				schedulerCounter.WithLabelValues(l.GetName(), "total").Inc()
-				if ops := l.transferLeaderOut(plan); len(ops) > 0 {
+				if ops := l.transferLeaderOut(cluster, plan); len(ops) > 0 {
 					l.retryQuota.ResetLimit(plan.source)
 					ops[0].Counters = append(ops[0].Counters, l.counter.WithLabelValues("transfer-out", plan.SourceMetricLabel()))
 					return ops
@@ -185,7 +185,7 @@ func (l *balanceLeaderScheduler) Schedule(cluster schedule.Cluster) []*operator.
 			l.counter.WithLabelValues("low-score", plan.TargetMetricLabel()).Inc()
 			for j := 0; j < retryLimit; j++ {
 				schedulerCounter.WithLabelValues(l.GetName(), "total").Inc()
-				if ops := l.transferLeaderIn(plan); len(ops) > 0 {
+				if ops := l.transferLeaderIn(cluster, plan); len(ops) > 0 {
 					l.retryQuota.ResetLimit(plan.target)
 					ops[0].Counters = append(ops[0].Counters, l.counter.WithLabelValues("transfer-in", plan.TargetMetricLabel()))
 					return ops
@@ -202,8 +202,8 @@ func (l *balanceLeaderScheduler) Schedule(cluster schedule.Cluster) []*operator.
 // transferLeaderOut transfers leader from the source store.
 // It randomly selects a health region from the source store, then picks
 // the best follower peer and transfers the leader.
-func (l *balanceLeaderScheduler) transferLeaderOut(plan *balancePlan) []*operator.Operator {
-	plan.region = plan.RandLeaderRegion(plan.SourceStoreID(), l.conf.Ranges, schedule.IsRegionHealthy)
+func (l *balanceLeaderScheduler) transferLeaderOut(cluster schedule.Cluster, plan *balancePlan) []*operator.Operator {
+	plan.region = plan.RandLeaderRegion(plan.SourceStoreID(), l.conf.Ranges, schedule.IsRegionHealthy, schedule.NonPinnedRegion(cluster))
 	if plan.region == nil {
 		log.Debug("store has no leader", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", plan.SourceStoreID()))
 		schedulerCounter.WithLabelValues(l.GetName(), "no-leader-region").Inc()
@@ -235,8 +235,8 @@ func (l *balanceLeaderScheduler) transferLeaderOut(plan *balancePlan) []*operato
 // transferLeaderIn transfers leader to the target store.
 // It randomly selects a health region from the target store, then picks
 // the worst follower peer and transfers the leader.
-func (l *balanceLeaderScheduler) transferLeaderIn(plan *balancePlan) []*operator.Operator {
-	plan.region = plan.RandFollowerRegion(plan.TargetStoreID(), l.conf.Ranges, schedule.IsRegionHealthy)
+func (l *balanceLeaderScheduler) transferLeaderIn(cluster schedule.Cluster, plan *balancePlan) []*operator.Operator {
+	plan.region = plan.RandFollowerRegion(plan.TargetStoreID(), l.conf.Ranges, schedule.IsRegionHealthy, schedule.NonPinnedRegion(cluster))
 	if plan.region == nil {
 		log.Debug("store has no follower", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", plan.TargetStoreID()))
 		schedulerCounter.WithLabelValues(l.GetName(), "no-follower-region").Inc()
diff --git a/server/schedulers/balance_region.go b/server/schedulers/balance_region.go
index b1a55f6916c..0bab045c0ab 100644
--- a/server/schedulers/balance_region.go
+++ b/server/schedulers/balance_region.go
@@ -171,18 +171,22 @@ func (s *balanceRegionScheduler) Schedule(cluster schedule.Cluster) []*operator.
 		schedulerCounter.WithLabelValues(s.GetName(), "total").Inc()
 		// Priority pick the region that has a pending peer.
 		// Pending region may means the disk is overload, remove the pending region firstly.
-		plan.region = cluster.RandPendingRegion(plan.SourceStoreID(), s.conf.Ranges, schedule.IsRegionHealthyAllowPending, schedule.ReplicatedRegion(cluster), allowBalanceEmptyRegion)
+		plan.region = cluster.RandPendingRegion(plan.SourceStoreID(), s.conf.Ranges, schedule.IsRegionHealthyAllowPending,
+			schedule.ReplicatedRegion(cluster), schedule.NonPinnedRegion(cluster), allowBalanceEmptyRegion)
 		if plan.region == nil {
 			// Then pick the region that has a follower in the source store.
-			plan.region = cluster.RandFollowerRegion(plan.SourceStoreID(), s.conf.Ranges, schedule.IsRegionHealthy, schedule.ReplicatedRegion(cluster), allowBalanceEmptyRegion)
+			plan.region = cluster.RandFollowerRegion(plan.SourceStoreID(), s.conf.Ranges, schedule.IsRegionHealthy,
+				schedule.ReplicatedRegion(cluster), schedule.NonPinnedRegion(cluster), allowBalanceEmptyRegion)
 		}
 		if plan.region == nil {
 			// Then pick the region has the leader in the source store.
-			plan.region = cluster.RandLeaderRegion(plan.SourceStoreID(), s.conf.Ranges, schedule.IsRegionHealthy, schedule.ReplicatedRegion(cluster), allowBalanceEmptyRegion)
+			plan.region = cluster.RandLeaderRegion(plan.SourceStoreID(), s.conf.Ranges, schedule.IsRegionHealthy,
+				schedule.ReplicatedRegion(cluster), schedule.NonPinnedRegion(cluster), allowBalanceEmptyRegion)
 		}
 		if plan.region == nil {
 			// Finally pick learner.
-			plan.region = cluster.RandLearnerRegion(plan.SourceStoreID(), s.conf.Ranges, schedule.IsRegionHealthy, schedule.ReplicatedRegion(cluster), allowBalanceEmptyRegion)
+			plan.region = cluster.RandLearnerRegion(plan.SourceStoreID(), s.conf.Ranges, schedule.IsRegionHealthy,
+				schedule.ReplicatedRegion(cluster), schedule.NonPinnedRegion(cluster), allowBalanceEmptyRegion)
 		}
 		if plan.region == nil {
 			schedulerCounter.WithLabelValues(s.GetName(), "no-region").Inc()
diff --git a/server/schedulers/balance_test.go b/server/schedulers/balance_test.go
index 358bd1f2789..80078954eae 100644
--- a/server/schedulers/balance_test.go
+++ b/server/schedulers/balance_test.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"fmt"
 	"math/rand"
+	"time"

 	. "github.com/pingcap/check"
 	"github.com/pingcap/kvproto/pkg/metapb"
@@ -490,6 +491,22 @@ func (s *testBalanceLeaderSchedulerSuite) TestBalanceSelector(c *C) {
 	testutil.CheckTransferLeader(c, s.schedule()[0], operator.OpKind(0), 4, 3)
 }

+func (s *testBalanceLeaderSchedulerSuite) TestBalanceSelectorWithPinnedRegion(c *C) {
+	// Stores:  1    2    3    4
+	// Leaders: 1    2    3    16
+	// Region1: -    F    F    L
+	// Region2: -    F    F    L
+	s.tc.AddLeaderStore(1, 1)
+	s.tc.AddLeaderStore(2, 2)
+	s.tc.AddLeaderStore(3, 3)
+	s.tc.AddLeaderStore(4, 16)
+	s.tc.AddLeaderRegion(1, 4, 2, 3)
+	s.tc.PinRegions([]uint64{1}, 5*time.Minute)
+	c.Check(s.schedule(), IsNil)
+	s.tc.AddLeaderRegion(2, 4, 2, 3)
+	c.Assert(s.schedule()[0].RegionID(), Not(Equals), 1)
+}
+
 var _ = Suite(&testBalanceLeaderRangeSchedulerSuite{})

 type testBalanceLeaderRangeSchedulerSuite struct {
@@ -633,6 +650,37 @@ func (s *testBalanceRegionSchedulerSuite) TestBalance(c *C) {
 	c.Assert(sb.Schedule(tc), NotNil)
 }

+func (s *testBalanceRegionSchedulerSuite) TestBalanceWithPinnedRegion(c *C) {
+	opt := config.NewTestOptions()
+	// TODO: enable placementrules
+	opt.SetPlacementRuleEnabled(false)
+	tc := mockcluster.NewCluster(s.ctx, opt)
+	tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0))
+	oc := schedule.NewOperatorController(s.ctx, nil, nil)
+
+	sb, err := schedule.CreateScheduler(BalanceRegionType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceRegionType, []string{"", ""}))
+	c.Assert(err, IsNil)
+
+	opt.SetMaxReplicas(1)
+
+	tc.AddRegionStore(1, 6)
+	tc.AddRegionStore(2, 8)
+	tc.AddRegionStore(3, 8)
+	tc.AddRegionStore(4, 16)
+
+	tc.AddLeaderRegion(1, 4)
+	tc.AddLeaderRegion(2, 3)
+	tc.AddLeaderRegion(3, 3)
+	tc.PinRegions([]uint64{1, 2}, 5*time.Minute)
+
+	c.Check(sb.Schedule(tc), IsNil)
+
+	tc.UpdateRegionCount(3, 16)
+	tc.UpdateRegionCount(4, 8)
+
+	c.Check(sb.Schedule(tc)[0].RegionID(), Not(Equals), 2)
+}
+
 func (s *testBalanceRegionSchedulerSuite) TestReplicas3(c *C) {
 	opt := config.NewTestOptions()
 	// TODO: enable placementrules
diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go
index e8ece7b54c8..986a3384d69 100644
--- a/server/schedulers/hot_region.go
+++ b/server/schedulers/hot_region.go
@@ -664,6 +664,11 @@ func (bs *balanceSolver) isRegionAvailable(region *core.RegionInfo) bool {
 		return false
 	}

+	if bs.IsRegionPinned(region) {
+		schedulerCounter.WithLabelValues(bs.sche.GetName(), "pinned-region").Inc()
+		return false
+	}
+
 	return true
 }
diff --git a/server/schedulers/random_merge.go b/server/schedulers/random_merge.go
index 0dabfbac900..a792c1c24e1 100644
--- a/server/schedulers/random_merge.go
+++ b/server/schedulers/random_merge.go
@@ -148,5 +148,8 @@ func (s *randomMergeScheduler) allowMerge(cluster schedule.Cluster, region, targ
 	if cluster.IsRegionHot(region) || cluster.IsRegionHot(target) {
 		return false
 	}
+	if cluster.IsRegionPinned(region) || cluster.IsRegionPinned(target) {
+		return false
+	}
 	return checker.AllowMerge(cluster, region, target)
 }
diff --git a/server/schedulers/shuffle_hot_region.go b/server/schedulers/shuffle_hot_region.go
index 55a894f3c70..dba53f837f1 100644
--- a/server/schedulers/shuffle_hot_region.go
+++ b/server/schedulers/shuffle_hot_region.go
@@ -171,6 +171,9 @@ func (s *shuffleHotRegionScheduler) randomSchedule(cluster schedule.Cluster, loa
 		if srcRegion == nil || len(srcRegion.GetDownPeers()) != 0 || len(srcRegion.GetPendingPeers()) != 0 {
 			continue
 		}
+		if cluster.IsRegionPinned(srcRegion) {
+			continue
+		}
 		srcStoreID := srcRegion.GetLeader().GetStoreId()
 		srcStore := cluster.GetStore(srcStoreID)
 		if srcStore == nil {
diff --git a/server/schedulers/shuffle_leader.go b/server/schedulers/shuffle_leader.go
index 30bc74804e9..75288ad46f4 100644
--- a/server/schedulers/shuffle_leader.go
+++ b/server/schedulers/shuffle_leader.go
@@ -115,7 +115,7 @@ func (s *shuffleLeaderScheduler) Schedule(cluster schedule.Cluster) []*operator.
 		schedulerCounter.WithLabelValues(s.GetName(), "no-target-store").Inc()
 		return nil
 	}
-	region := cluster.RandFollowerRegion(targetStore.GetID(), s.conf.Ranges, schedule.IsRegionHealthy)
+	region := cluster.RandFollowerRegion(targetStore.GetID(), s.conf.Ranges, schedule.IsRegionHealthy, schedule.NonPinnedRegion(cluster))
 	if region == nil {
 		schedulerCounter.WithLabelValues(s.GetName(), "no-follower").Inc()
 		return nil
diff --git a/server/schedulers/shuffle_region.go b/server/schedulers/shuffle_region.go
index b0a6286ceb0..45f1da88332 100644
--- a/server/schedulers/shuffle_region.go
+++ b/server/schedulers/shuffle_region.go
@@ -135,13 +135,16 @@ func (s *shuffleRegionScheduler) scheduleRemovePeer(cluster schedule.Cluster) (*
 	for _, source := range candidates.Stores {
 		var region *core.RegionInfo
 		if s.conf.IsRoleAllow(roleFollower) {
-			region = cluster.RandFollowerRegion(source.GetID(), s.conf.GetRanges(), schedule.IsRegionHealthy, schedule.ReplicatedRegion(cluster))
+			region = cluster.RandFollowerRegion(source.GetID(), s.conf.GetRanges(), schedule.IsRegionHealthy,
+				schedule.ReplicatedRegion(cluster), schedule.NonPinnedRegion(cluster))
 		}
 		if region == nil && s.conf.IsRoleAllow(roleLeader) {
-			region = cluster.RandLeaderRegion(source.GetID(), s.conf.GetRanges(), schedule.IsRegionHealthy, schedule.ReplicatedRegion(cluster))
+			region = cluster.RandLeaderRegion(source.GetID(), s.conf.GetRanges(), schedule.IsRegionHealthy,
+				schedule.ReplicatedRegion(cluster), schedule.NonPinnedRegion(cluster))
 		}
 		if region == nil && s.conf.IsRoleAllow(roleLearner) {
-			region = cluster.RandLearnerRegion(source.GetID(), s.conf.GetRanges(), schedule.IsRegionHealthy, schedule.ReplicatedRegion(cluster))
+			region = cluster.RandLearnerRegion(source.GetID(), s.conf.GetRanges(), schedule.IsRegionHealthy,
+				schedule.ReplicatedRegion(cluster), schedule.NonPinnedRegion(cluster))
 		}
 		if region != nil {
 			return region, region.GetStorePeer(source.GetID())
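---
A minimal client-side sketch of the new endpoints, for trying the feature out; it is illustrative and not part of the diff. It assumes a PD server reachable at 127.0.0.1:2379 and the usual /pd/api/v1 prefix; the region IDs and duration are made up. Key-range pinning works the same way, with hex-encoded start_key and end_key fields in place of regions_id.

package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Pin regions 1 and 2 for 300 seconds. Sending "duration": 0 unpins
	// them again, matching the RaftCluster.PinRegions behavior above.
	body := bytes.NewBufferString(`{"regions_id": [1, 2], "duration": 300}`)
	resp, err := http.Post("http://127.0.0.1:2379/pd/api/v1/regions/pin", "application/json", body)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	msg, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(msg))

	// List the regions that are currently pinned.
	resp, err = http.Get("http://127.0.0.1:2379/pd/api/v1/regions/check/pinned-region")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	pinned, _ := io.ReadAll(resp.Body)
	fmt.Println(string(pinned))
}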