diff --git a/server/cluster_info.go b/server/cluster_info.go
index f8d0295540a8..c40028b8d04b 100644
--- a/server/cluster_info.go
+++ b/server/cluster_info.go
@@ -415,6 +415,13 @@ func (c *clusterInfo) RandFollowerRegion(storeID uint64, opts ...core.RegionOpti
 	return c.core.RandFollowerRegion(storeID, opts...)
 }
 
+// RandPendingRegion returns a random region that has a pending peer on the store.
+func (c *clusterInfo) RandPendingRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
+	c.RLock()
+	defer c.RUnlock()
+	return c.core.RandPendingRegion(storeID, opts...)
+}
+
 // GetAverageRegionSize returns the average region approximate size.
 func (c *clusterInfo) GetAverageRegionSize() int64 {
 	c.RLock()
diff --git a/server/core/basic_cluster.go b/server/core/basic_cluster.go
index fd0020aa6568..af6109de4fcf 100644
--- a/server/core/basic_cluster.go
+++ b/server/core/basic_cluster.go
@@ -99,6 +99,11 @@ func (bc *BasicCluster) RandLeaderRegion(storeID uint64, opts ...RegionOption) *
 	return bc.Regions.RandLeaderRegion(storeID, opts...)
 }
 
+// RandPendingRegion returns a random region that has a pending peer on the store.
+func (bc *BasicCluster) RandPendingRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
+	return bc.Regions.RandPendingRegion(storeID, opts...)
+}
+
 // GetAverageRegionSize returns the average region approximate size.
 func (bc *BasicCluster) GetAverageRegionSize() int64 {
 	return bc.Regions.GetAverageRegionSize()
diff --git a/server/core/region.go b/server/core/region.go
index 4e7e213070ee..a5d101d17358 100644
--- a/server/core/region.go
+++ b/server/core/region.go
@@ -668,6 +668,11 @@ func (r *RegionsInfo) RandRegion(opts ...RegionOption) *RegionInfo {
 	return randRegion(r.regions, opts...)
 }
 
+// RandPendingRegion randomly picks a region that has a pending peer on the store.
+func (r *RegionsInfo) RandPendingRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
+	return randRegion(r.pendingPeers[storeID], opts...)
+}
+
 // RandLeaderRegion get a store's leader region by random
 func (r *RegionsInfo) RandLeaderRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
 	return randRegion(r.leaders[storeID], opts...)
diff --git a/server/core/region_option.go b/server/core/region_option.go
index bf772045ba0b..4aeedd90ac0f 100644
--- a/server/core/region_option.go
+++ b/server/core/region_option.go
@@ -28,6 +28,13 @@ func HealthRegion() RegionOption {
 	}
 }
 
+// HealthRegionAllowPending checks if the region is healthy, ignoring its pending peers.
+func HealthRegionAllowPending() RegionOption {
+	return func(region *RegionInfo) bool {
+		return len(region.downPeers) == 0 && len(region.learners) == 0
+	}
+}
+
 // RegionCreateOption used to create region.
 type RegionCreateOption func(region *RegionInfo)
 
diff --git a/server/schedule/filters.go b/server/schedule/filters.go
index e2b7b1c979a8..73a3f0df42c1 100644
--- a/server/schedule/filters.go
+++ b/server/schedule/filters.go
@@ -415,6 +415,11 @@ func (f StoreStateFilter) FilterTarget(opt Options, store *core.StoreInfo) bool
 		return true
 	}
 
+	// Only the target store checks the pending-peer count, because more pending peers usually mean a slower disk.
+	if opt.GetMaxPendingPeerCount() > 0 && store.GetPendingPeerCount() > int(opt.GetMaxPendingPeerCount()) {
+		return true
+	}
+
 	if f.MoveRegion && f.filterMoveRegion(opt, store) {
 		return true
 	}
@@ -430,9 +435,6 @@ func (f StoreStateFilter) filterMoveRegion(opt Options, store *core.StoreInfo) b
 		return true
 	}
 
-	if opt.GetMaxPendingPeerCount() > 0 && store.GetPendingPeerCount() > int(opt.GetMaxPendingPeerCount()) {
-		return true
-	}
 	if uint64(store.GetSendingSnapCount()) > opt.GetMaxSnapshotCount() ||
 		uint64(store.GetReceivingSnapCount()) > opt.GetMaxSnapshotCount() ||
 		uint64(store.GetApplyingSnapCount()) > opt.GetMaxSnapshotCount() {
diff --git a/server/schedule/scheduler.go b/server/schedule/scheduler.go
index 008f1914ee38..54d2bb9584ed 100644
--- a/server/schedule/scheduler.go
+++ b/server/schedule/scheduler.go
@@ -25,22 +25,28 @@ import (
 	"go.uber.org/zap"
 )
 
-// Cluster provides an overview of a cluster's regions distribution.
-type Cluster interface {
+// RegionSetInformer provides access to a shared informer of regions.
+// TODO: move to core package
+type RegionSetInformer interface {
 	RandFollowerRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo
 	RandLeaderRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo
+	RandPendingRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo
 
 	GetAverageRegionSize() int64
 	GetStoreRegionCount(storeID uint64) int
+	GetRegion(id uint64) *core.RegionInfo
+	GetAdjacentRegions(region *core.RegionInfo) (*core.RegionInfo, *core.RegionInfo)
+	ScanRegions(startKey []byte, limit int) []*core.RegionInfo
+}
 
+// Cluster provides an overview of a cluster's regions distribution.
+type Cluster interface {
+	RegionSetInformer
 	GetStores() []*core.StoreInfo
 	GetStore(id uint64) *core.StoreInfo
-	GetRegion(id uint64) *core.RegionInfo
+
 	GetRegionStores(region *core.RegionInfo) []*core.StoreInfo
 	GetFollowerStores(region *core.RegionInfo) []*core.StoreInfo
 	GetLeaderStore(region *core.RegionInfo) *core.StoreInfo
-	GetAdjacentRegions(region *core.RegionInfo) (*core.RegionInfo, *core.RegionInfo)
-	ScanRegions(startKey []byte, limit int) []*core.RegionInfo
-
 	BlockStore(id uint64) error
 	UnblockStore(id uint64)
diff --git a/server/schedulers/balance_region.go b/server/schedulers/balance_region.go
index aa53feb37e95..ece84d1922fb 100644
--- a/server/schedulers/balance_region.go
+++ b/server/schedulers/balance_region.go
@@ -95,10 +95,15 @@ func (s *balanceRegionScheduler) Schedule(cluster schedule.Cluster) []*schedule.
 	opInfluence := s.opController.GetOpInfluence(cluster)
 	var hasPotentialTarget bool
 	for i := 0; i < balanceRegionRetryLimit; i++ {
-		// Priority the region that has a follower in the source store.
-		region := cluster.RandFollowerRegion(sourceID, core.HealthRegion())
+		// First, pick the region that has a pending peer on the source store.
+		// A pending peer may mean the disk is overloaded, so move such regions away first.
+		region := cluster.RandPendingRegion(sourceID, core.HealthRegionAllowPending())
 		if region == nil {
-			// Then the region has the leader in the source store
+			// Then pick the region that has a follower on the source store.
+			region = cluster.RandFollowerRegion(sourceID, core.HealthRegion())
+		}
+		if region == nil {
+			// Finally, pick the region whose leader is on the source store.
 			region = cluster.RandLeaderRegion(sourceID, core.HealthRegion())
 		}
 		if region == nil {
diff --git a/server/schedulers/balance_test.go b/server/schedulers/balance_test.go
index f7467ba2168f..a379a1bcd476 100644
--- a/server/schedulers/balance_test.go
+++ b/server/schedulers/balance_test.go
@@ -489,6 +489,36 @@ func (s *testBalanceRegionSchedulerSuite) TestStoreWeight(c *C) {
 	testutil.CheckTransferPeer(c, sb.Schedule(tc)[0], schedule.OpBalance, 1, 3)
 }
 
+func (s *testBalanceRegionSchedulerSuite) TestReplacePendingRegion(c *C) {
+	opt := mockoption.NewScheduleOptions()
+	tc := mockcluster.NewCluster(opt)
+	oc := schedule.NewOperatorController(nil, nil)
+
+	newTestReplication(opt, 3, "zone", "rack", "host")
+
+	sb, err := schedule.CreateScheduler("balance-region", oc)
+	c.Assert(err, IsNil)
+
+	// Store 1 has the largest region score, so the balancer tries to replace a peer in store 1.
+	tc.AddLabelsStore(1, 16, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})
+	tc.AddLabelsStore(2, 7, map[string]string{"zone": "z1", "rack": "r2", "host": "h1"})
+	tc.AddLabelsStore(3, 15, map[string]string{"zone": "z1", "rack": "r2", "host": "h2"})
+	// Store 4 has a smaller region score than store 1 and is a better placement than store 2.
+	tc.AddLabelsStore(4, 10, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})
+
+	// Set up regions; region 3 has a pending peer on store 1.
+	tc.AddLeaderRegion(1, 1, 2, 3)
+	tc.AddLeaderRegion(2, 1, 2, 3)
+	tc.AddLeaderRegion(3, 2, 1, 3)
+	region := tc.GetRegion(3)
+	region = region.Clone(core.WithPendingPeers([]*metapb.Peer{region.GetStorePeer(1)}))
+	tc.PutRegion(region)
+	region = tc.GetRegion(3)
+
+	c.Assert(sb.Schedule(tc)[0].RegionID(), Equals, uint64(3))
+	testutil.CheckTransferPeer(c, sb.Schedule(tc)[0], schedule.OpBalance, 1, 4)
+}
+
 var _ = Suite(&testReplicaCheckerSuite{})
 
 type testReplicaCheckerSuite struct{}
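
Illustrative sketch (not part of the patch): the selection order that balance-region now follows, written as a standalone helper. The Rand*Region methods, HealthRegion, and HealthRegionAllowPending are the identifiers touched by this diff; the package name, the assumed github.com/pingcap/pd import path, and the regionPicker/pickSourceRegion names are hypothetical.

package example

import (
	"github.com/pingcap/pd/server/core"
)

// regionPicker mirrors the random-pick subset of schedule.Cluster
// (now grouped under RegionSetInformer) that balance-region relies on.
type regionPicker interface {
	RandPendingRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo
	RandFollowerRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo
	RandLeaderRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo
}

// pickSourceRegion applies the new priority: a region with a pending peer on the
// source store first (pending peers often point at an overloaded disk), then a
// region with a follower there, and finally a region whose leader is there.
func pickSourceRegion(cluster regionPicker, sourceID uint64) *core.RegionInfo {
	if region := cluster.RandPendingRegion(sourceID, core.HealthRegionAllowPending()); region != nil {
		return region
	}
	if region := cluster.RandFollowerRegion(sourceID, core.HealthRegion()); region != nil {
		return region
	}
	return cluster.RandLeaderRegion(sourceID, core.HealthRegion())
}

Because HealthRegionAllowPending skips the pending-peer check, a region stays eligible for the first step even though one of its peers is pending, while down peers and learners still disqualify it.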