schedulers: balance region consider pending peer
Signed-off-by: nolouch <nolouch@gmail.com>
nolouch committed Jul 3, 2019
1 parent edb930a commit 4b12d03
Showing 8 changed files with 79 additions and 12 deletions.
7 changes: 7 additions & 0 deletions server/cluster_info.go
@@ -415,6 +415,13 @@ func (c *clusterInfo) RandFollowerRegion(storeID uint64, opts ...core.RegionOpti
return c.core.RandFollowerRegion(storeID, opts...)
}

// RandPendingRegion returns a random region that has a pending peer on the store.
func (c *clusterInfo) RandPendingRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
c.RLock()
defer c.RUnlock()
return c.core.RandPendingRegion(storeID, opts...)
}

// GetAverageRegionSize returns the average region approximate size.
func (c *clusterInfo) GetAverageRegionSize() int64 {
c.RLock()
5 changes: 5 additions & 0 deletions server/core/basic_cluster.go
@@ -99,6 +99,11 @@ func (bc *BasicCluster) RandLeaderRegion(storeID uint64, opts ...RegionOption) *
return bc.Regions.RandLeaderRegion(storeID, opts...)
}

// RandPendingRegion returns a random region that has a pending peer on the store.
func (bc *BasicCluster) RandPendingRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
return bc.Regions.RandPendingRegion(storeID, opts...)
}

// GetAverageRegionSize returns the average region approximate size.
func (bc *BasicCluster) GetAverageRegionSize() int64 {
return bc.Regions.GetAverageRegionSize()
4 changes: 4 additions & 0 deletions server/core/region.go
@@ -668,6 +668,10 @@ func (r *RegionsInfo) RandRegion(opts ...RegionOption) *RegionInfo {
return randRegion(r.regions, opts...)
}

// RandPendingRegion randomly picks a store's region with a pending peer.
func (r *RegionsInfo) RandPendingRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
return randRegion(r.pendingPeers[storeID], opts...)
}

// RandLeaderRegion get a store's leader region by random
func (r *RegionsInfo) RandLeaderRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
return randRegion(r.leaders[storeID], opts...)
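randRegion itself is outside this diff. A minimal sketch of the retry-and-filter pattern it follows, with the region container simplified to a plain slice (an assumption for illustration; the real container is RegionsInfo's internal structure, and the snippet needs "math/rand"):

// randRegionSketch samples a candidate at random and accepts it only if
// every RegionOption predicate holds, giving up after a bounded number of tries.
func randRegionSketch(regions []*RegionInfo, opts ...RegionOption) *RegionInfo {
	if len(regions) == 0 {
		return nil
	}
	for i := 0; i < 10; i++ { // bounded retries, as in the real helper
		region := regions[rand.Intn(len(regions))]
		ok := true
		for _, opt := range opts {
			if !opt(region) {
				ok = false
				break
			}
		}
		if ok {
			return region
		}
	}
	return nil
}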
7 changes: 7 additions & 0 deletions server/core/region_option.go
@@ -28,6 +28,13 @@ func HealthRegion() RegionOption {
}
}

// HealthRegionAllowPending checks if the region is healthy, while allowing pending peers.
func HealthRegionAllowPending() RegionOption {
return func(region *RegionInfo) bool {
return len(region.downPeers) == 0 && len(region.learners) == 0
}
}

// RegionCreateOption used to create region.
type RegionCreateOption func(region *RegionInfo)

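For contrast: the pre-existing HealthRegion option (just above this hunk) also rejects regions that have pending peers, which is exactly the check the new option drops. A usage sketch, where region is assumed to carry one pending peer and no down peers or learners:

// HealthRegion still treats such a region as unhealthy; the new option does not.
core.HealthRegion()(region)             // false: the pending peer disqualifies it
core.HealthRegionAllowPending()(region) // true: pending peers are tolerated here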
8 changes: 5 additions & 3 deletions server/schedule/filters.go
@@ -415,6 +415,11 @@ func (f StoreStateFilter) FilterTarget(opt Options, store *core.StoreInfo) bool
return true
}

// Only the target store is checked against pending peers, because a large number of pending peers usually means the disk is slow.
if opt.GetMaxPendingPeerCount() > 0 && store.GetPendingPeerCount() > int(opt.GetMaxPendingPeerCount()) {
return true
}

if f.MoveRegion && f.filterMoveRegion(opt, store) {
return true
}
@@ -430,9 +435,6 @@ func (f StoreStateFilter) filterMoveRegion(opt Options, store *core.StoreInfo) b
return true
}

if opt.GetMaxPendingPeerCount() > 0 && store.GetPendingPeerCount() > int(opt.GetMaxPendingPeerCount()) {
return true
}
if uint64(store.GetSendingSnapCount()) > opt.GetMaxSnapshotCount() ||
uint64(store.GetReceivingSnapCount()) > opt.GetMaxSnapshotCount() ||
uint64(store.GetApplyingSnapCount()) > opt.GetMaxSnapshotCount() {
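A small sketch of the asymmetry this change introduces, assuming the schedule package's filter convention that returning true excludes the store (opt and overloadedStore are hypothetical placeholders):

f := schedule.StoreStateFilter{MoveRegion: true}
// A store exceeding MaxPendingPeerCount is now rejected only as a target;
// it can still act as a source, so its pending regions can be scheduled away.
_ = f.FilterTarget(opt, overloadedStore) // true once the pending-peer cap is exceeded
_ = f.FilterSource(opt, overloadedStore) // no longer affected by pending peers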
18 changes: 12 additions & 6 deletions server/schedule/scheduler.go
@@ -25,22 +25,28 @@ import (
"go.uber.org/zap"
)

// Cluster provides an overview of a cluster's regions distribution.
type Cluster interface {
// RegionSetInformer provides access to a shared informer of regions.
// TODO: move to core package
type RegionSetInformer interface {
RandFollowerRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo
RandLeaderRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo
RandPendingRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo
GetAverageRegionSize() int64
GetStoreRegionCount(storeID uint64) int
GetRegion(id uint64) *core.RegionInfo
GetAdjacentRegions(region *core.RegionInfo) (*core.RegionInfo, *core.RegionInfo)
ScanRegions(startKey []byte, limit int) []*core.RegionInfo
}

// Cluster provides an overview of a cluster's regions distribution.
type Cluster interface {
RegionSetInformer
GetStores() []*core.StoreInfo
GetStore(id uint64) *core.StoreInfo
GetRegion(id uint64) *core.RegionInfo

GetRegionStores(region *core.RegionInfo) []*core.StoreInfo
GetFollowerStores(region *core.RegionInfo) []*core.StoreInfo
GetLeaderStore(region *core.RegionInfo) *core.StoreInfo
GetAdjacentRegions(region *core.RegionInfo) (*core.RegionInfo, *core.RegionInfo)
ScanRegions(startKey []byte, limit int) []*core.RegionInfo

BlockStore(id uint64) error
UnblockStore(id uint64)

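Because clusterInfo gained RandPendingRegion in the first hunk, it keeps satisfying the split-out contract. A hypothetical compile-time assertion (not part of this commit, and assuming the server package where clusterInfo lives) makes the relationship explicit:

var _ schedule.RegionSetInformer = (*clusterInfo)(nil)
var _ schedule.Cluster = (*clusterInfo)(nil)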
11 changes: 8 additions & 3 deletions server/schedulers/balance_region.go
@@ -95,10 +95,15 @@ func (s *balanceRegionScheduler) Schedule(cluster schedule.Cluster) []*schedule.
opInfluence := s.opController.GetOpInfluence(cluster)
var hasPotentialTarget bool
for i := 0; i < balanceRegionRetryLimit; i++ {
// Priority the region that has a follower in the source store.
region := cluster.RandFollowerRegion(sourceID, core.HealthRegion())
// First, pick the region that has a pending peer.
// A pending peer may mean the disk is overloaded, so such regions are moved away first.
region := cluster.RandPendingRegion(sourceID, core.HealthRegionAllowPending())
if region == nil {
// Then the region has the leader in the source store
// Then, pick a region that has a follower in the source store.
region = cluster.RandFollowerRegion(sourceID, core.HealthRegion())
}
if region == nil {
// Finally, pick a region whose leader is in the source store.
region = cluster.RandLeaderRegion(sourceID, core.HealthRegion())
}
if region == nil {
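Read as a standalone helper, the three-stage fallback above would look roughly like this (a sketch against the schedule.Cluster interface shown earlier; pickSourceRegion is a hypothetical name, not part of this commit):

// pickSourceRegion mirrors Schedule's priority order: a region with a pending
// peer first (a possible slow disk), then a follower, then finally a leader.
func pickSourceRegion(cluster schedule.Cluster, sourceID uint64) *core.RegionInfo {
	if r := cluster.RandPendingRegion(sourceID, core.HealthRegionAllowPending()); r != nil {
		return r
	}
	if r := cluster.RandFollowerRegion(sourceID, core.HealthRegion()); r != nil {
		return r
	}
	return cluster.RandLeaderRegion(sourceID, core.HealthRegion())
}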
31 changes: 31 additions & 0 deletions server/schedulers/balance_test.go
@@ -489,6 +489,37 @@ func (s *testBalanceRegionSchedulerSuite) TestStoreWeight(c *C) {
testutil.CheckTransferPeer(c, sb.Schedule(tc)[0], schedule.OpBalance, 1, 3)
}

func (s *testBalanceRegionSchedulerSuite) TestReplacePendingRegion(c *C) {
opt := mockoption.NewScheduleOptions()
tc := mockcluster.NewCluster(opt)
oc := schedule.NewOperatorController(nil, nil)

newTestReplication(opt, 3, "zone", "rack", "host")

sb, err := schedule.CreateScheduler("balance-region", oc)
c.Assert(err, IsNil)

// Store 1 has the largest region score, so the balancer tries to replace a peer in store 1.
tc.AddLabelsStore(1, 16, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})
tc.AddLabelsStore(2, 7, map[string]string{"zone": "z1", "rack": "r2", "host": "h1"})
tc.AddLabelsStore(3, 15, map[string]string{"zone": "z1", "rack": "r2", "host": "h2"})
// Store 4 has a smaller region score than store 1 and is a better placement than store 2.
tc.AddLabelsStore(4, 10, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})

// Set a pending peer on store 1 for region 3.
tc.AddLeaderRegion(1, 1, 2, 3)
tc.AddLeaderRegion(2, 1, 2, 3)
tc.AddLeaderRegion(3, 2, 1, 3)
region := tc.GetRegion(3)
region = region.Clone(core.WithPendingPeers([]*metapb.Peer{region.GetStorePeer(1)}))
tc.PutRegion(region)
region = tc.GetRegion(3)

c.Assert(sb.Schedule(tc)[0].RegionID(), Equals, uint64(3))
testutil.CheckTransferPeer(c, sb.Schedule(tc)[0], schedule.OpBalance, 1, 4)
}

var _ = Suite(&testReplicaCheckerSuite{})

type testReplicaCheckerSuite struct{}
