Commit

schedulers: let hot region balance not be affected by balance-region-scheduler-limit (tikv#1522)

Signed-off-by: nolouch <nolouch@gmail.com>
nolouch committed May 29, 2019
1 parent f0679a7 commit d8adae9
Showing 3 changed files with 23 additions and 96 deletions.
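In short, the commit stops the hot-region scheduler's peer-transfer path from being throttled by the generic region-balance limit; only `hot-region-schedule-limit` (plus the scheduler's own per-round limit) should gate it. The snippet below is a minimal, self-contained sketch of that gating change, not PD's real API: opCounts, limits and the two allowBalanceRegion* functions are invented stand-ins.

package main

import "fmt"

// opCounts mimics the running operator counters kept by the limiter.
type opCounts struct {
	hotRegion, region uint64
}

// limits mimics the cluster-level schedule limits.
type limits struct {
	hotRegionScheduleLimit uint64 // `hot-region-schedule-limit`
	regionScheduleLimit    uint64 // `region-schedule-limit`
}

func min(a, b uint64) uint64 {
	if a < b {
		return a
	}
	return b
}

// Old behaviour: hot peer transfers were also capped by the generic region
// limit, so setting region-schedule-limit to 0 silently froze them.
func allowBalanceRegionOld(ops opCounts, peerLimit uint64, l limits) bool {
	return ops.hotRegion < min(peerLimit, l.hotRegionScheduleLimit) &&
		ops.region < l.regionScheduleLimit
}

// New behaviour: only the hot-region limit (and the scheduler's own peer
// limit) decides whether a hot peer transfer may be scheduled.
func allowBalanceRegionNew(ops opCounts, peerLimit uint64, l limits) bool {
	return ops.hotRegion < min(peerLimit, l.hotRegionScheduleLimit)
}

func main() {
	ops := opCounts{}
	l := limits{hotRegionScheduleLimit: 2, regionScheduleLimit: 0}
	fmt.Println(allowBalanceRegionOld(ops, 1, l)) // false: blocked by region-schedule-limit = 0
	fmt.Println(allowBalanceRegionNew(ops, 1, l)) // true: the hot-region limit alone decides
}

With ordinary region balancing paused (regionScheduleLimit = 0), the old check refuses to schedule anything while the new one still allows hot peer transfers, which is exactly what the test change below asserts.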
10 changes: 7 additions & 3 deletions server/schedulers/balance_test.go
@@ -889,10 +889,14 @@ func (s *testBalanceHotWriteRegionSchedulerSuite) TestBalance(c *C) {
 		}
 	}
 
-	// hot region scheduler is restricted by schedule limit.
-	opt.RegionScheduleLimit, opt.LeaderScheduleLimit = 0, 0
+	// hot region scheduler is restricted by `hot-region-schedule-limit`.
+	opt.HotRegionScheduleLimit = 0
 	c.Assert(hb.Schedule(tc, schedule.NewOpInfluence(nil, tc)), HasLen, 0)
-	opt.LeaderScheduleLimit = schedule.NewMockSchedulerOptions().LeaderScheduleLimit
+	// hot region scheduler is not affected by `balance-region-schedule-limit`.
+	opt.HotRegionScheduleLimit = schedule.NewMockSchedulerOptions().HotRegionScheduleLimit
+	opt.RegionScheduleLimit = 0
+	fmt.Println(hb.Schedule(tc, schedule.NewOpInfluence(nil, tc)))
+	c.Assert(hb.Schedule(tc, schedule.NewOpInfluence(nil, tc)), HasLen, 1)
 	opt.RegionScheduleLimit = schedule.NewMockSchedulerOptions().RegionScheduleLimit
 
 	// After transfer a hot region from store 1 to store 5
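The two assertions above encode the intended behaviour: zeroing `hot-region-schedule-limit` must yield no hot-region operators, while zeroing `region-schedule-limit` must not block them. For illustration only, here is the same expectation as a stand-alone, table-driven Go test; fakeOptions and fakeScheduler are invented stand-ins rather than the real mock cluster and scheduler types.

package sketch

import "testing"

type fakeOptions struct {
	hotRegionScheduleLimit uint64
	regionScheduleLimit    uint64
}

type fakeScheduler struct{ opts *fakeOptions }

// Schedule emits one pretend operator whenever the hot-region limit allows it;
// the generic region limit is deliberately ignored, mirroring this commit.
func (s *fakeScheduler) Schedule() []string {
	if s.opts.hotRegionScheduleLimit == 0 {
		return nil
	}
	return []string{"move-hot-peer"}
}

func TestHotRegionLimitGating(t *testing.T) {
	cases := []struct {
		name    string
		opts    fakeOptions
		wantOps int
	}{
		{"blocked by hot-region-schedule-limit", fakeOptions{hotRegionScheduleLimit: 0, regionScheduleLimit: 4}, 0},
		{"unaffected by region-schedule-limit", fakeOptions{hotRegionScheduleLimit: 4, regionScheduleLimit: 0}, 1},
	}
	for _, tt := range cases {
		s := &fakeScheduler{opts: &tt.opts}
		if got := len(s.Schedule()); got != tt.wantOps {
			t.Errorf("%s: got %d operators, want %d", tt.name, got, tt.wantOps)
		}
	}
}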
30 changes: 16 additions & 14 deletions server/schedulers/hot_region.go
@@ -69,8 +69,9 @@ func newStoreStaticstics() *storeStatistics {
 type balanceHotRegionsScheduler struct {
 	*baseScheduler
 	sync.RWMutex
-	limit uint64
-	types []BalanceType
+	leaderLimit uint64
+	peerLimit   uint64
+	types       []BalanceType
 
 	// store id -> hot regions statistics as the role of leader
 	stats *storeStatistics
@@ -81,7 +82,8 @@ func newBalanceHotRegionsScheduler(limiter *schedule.Limiter) *balanceHotRegions
 	base := newBaseScheduler(limiter)
 	return &balanceHotRegionsScheduler{
 		baseScheduler: base,
-		limit:         1,
+		leaderLimit:   1,
+		peerLimit:     1,
 		stats:         newStoreStaticstics(),
 		types:         []BalanceType{hotWriteRegionBalance, hotReadRegionBalance},
 		r:             rand.New(rand.NewSource(time.Now().UnixNano())),
@@ -92,7 +94,8 @@ func newBalanceHotReadRegionsScheduler(limiter *schedule.Limiter) *balanceHotReg
 	base := newBaseScheduler(limiter)
 	return &balanceHotRegionsScheduler{
 		baseScheduler: base,
-		limit:         1,
+		leaderLimit:   1,
+		peerLimit:     1,
 		stats:         newStoreStaticstics(),
 		types:         []BalanceType{hotReadRegionBalance},
 		r:             rand.New(rand.NewSource(time.Now().UnixNano())),
@@ -103,7 +106,8 @@ func newBalanceHotWriteRegionsScheduler(limiter *schedule.Limiter) *balanceHotRe
 	base := newBaseScheduler(limiter)
 	return &balanceHotRegionsScheduler{
 		baseScheduler: base,
-		limit:         1,
+		leaderLimit:   1,
+		peerLimit:     1,
 		stats:         newStoreStaticstics(),
 		types:         []BalanceType{hotWriteRegionBalance},
 		r:             rand.New(rand.NewSource(time.Now().UnixNano())),
@@ -130,13 +134,12 @@ func min(a, b uint64) uint64 {
 }
 
 func (h *balanceHotRegionsScheduler) allowBalanceLeader(cluster schedule.Cluster) bool {
-	return h.limiter.OperatorCount(schedule.OpHotRegion) < min(h.limit, cluster.GetHotRegionScheduleLimit()) &&
+	return h.limiter.OperatorCount(schedule.OpHotRegion) < min(h.leaderLimit, cluster.GetHotRegionScheduleLimit()) &&
 		h.limiter.OperatorCount(schedule.OpLeader) < cluster.GetLeaderScheduleLimit()
 }
 
 func (h *balanceHotRegionsScheduler) allowBalanceRegion(cluster schedule.Cluster) bool {
-	return h.limiter.OperatorCount(schedule.OpHotRegion) < min(h.limit, cluster.GetHotRegionScheduleLimit()) &&
-		h.limiter.OperatorCount(schedule.OpRegion) < cluster.GetRegionScheduleLimit()
+	return h.limiter.OperatorCount(schedule.OpHotRegion) < min(h.peerLimit, cluster.GetHotRegionScheduleLimit())
 }
 
 func (h *balanceHotRegionsScheduler) Schedule(cluster schedule.Cluster, opInfluence schedule.OpInfluence) []*schedule.Operator {
@@ -217,7 +220,6 @@ func (h *balanceHotRegionsScheduler) balanceHotWriteRegions(cluster schedule.Clu
 			}
 		}
 	}
-
 	schedulerCounter.WithLabelValues(h.GetName(), "skip").Inc()
 	return nil
 }
@@ -311,7 +313,7 @@ func (h *balanceHotRegionsScheduler) balanceByPeer(cluster schedule.Cluster, sto
 
 		destStoreID = h.selectDestStore(destStoreIDs, rs.FlowBytes, srcStoreID, storesStat)
 		if destStoreID != 0 {
-			h.adjustBalanceLimit(srcStoreID, storesStat)
+			h.peerLimit = h.adjustBalanceLimit(srcStoreID, storesStat)
 
 			srcPeer := srcRegion.GetStorePeer(srcStoreID)
 			if srcPeer == nil {
@@ -368,7 +370,8 @@ func (h *balanceHotRegionsScheduler) balanceByLeader(cluster schedule.Cluster, s
 
 		destPeer := srcRegion.GetStoreVoter(destStoreID)
 		if destPeer != nil {
-			h.adjustBalanceLimit(srcStoreID, storesStat)
+			h.leaderLimit = h.adjustBalanceLimit(srcStoreID, storesStat)
+
 			return srcRegion, destPeer
 		}
 	}
@@ -427,7 +430,7 @@ func (h *balanceHotRegionsScheduler) selectDestStore(candidateStoreIDs []uint64,
 	return
 }
 
-func (h *balanceHotRegionsScheduler) adjustBalanceLimit(storeID uint64, storesStat core.StoreHotRegionsStat) {
+func (h *balanceHotRegionsScheduler) adjustBalanceLimit(storeID uint64, storesStat core.StoreHotRegionsStat) uint64 {
 	srcStoreStatistics := storesStat[storeID]
 
 	var hotRegionTotalCount float64
@@ -437,8 +440,7 @@ func (h *balanceHotRegionsScheduler) adjustBalanceLimit(storeID uint64, storesSt
 
 	avgRegionCount := hotRegionTotalCount / float64(len(storesStat))
 	// Multiplied by hotRegionLimitFactor to avoid transfer back and forth
-	limit := uint64((float64(srcStoreStatistics.RegionsStat.Len()) - avgRegionCount) * hotRegionLimitFactor)
-	h.limit = maxUint64(1, limit)
+	return uint64(math.Max((float64(srcStoreStatistics.RegionsStat.Len())-avgRegionCount)*hotRegionLimitFactor, 1))
 }
 
 func (h *balanceHotRegionsScheduler) GetHotReadStatus() *core.StoreHotRegionInfos {
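The reworked adjustBalanceLimit now returns max(1, (hot regions on the source store minus the per-store average) * hotRegionLimitFactor) instead of writing the result to a shared field, so the leader and peer paths can each keep their own limit. Below is a small sketch of that arithmetic; the simplified signature is not the real one, and the factor's actual value is defined elsewhere in PD, with 0.75 used only as an assumed illustrative constant.

package main

import (
	"fmt"
	"math"
)

// Assumed illustrative value; the real hotRegionLimitFactor lives in PD's source.
const hotRegionLimitFactor = 0.75

// adjustBalanceLimit mirrors the formula in the new return statement above:
// the further a store sits above the average hot-region count, the more
// transfers it is allowed per round, but never fewer than one.
func adjustBalanceLimit(srcHotRegions int, hotRegionsPerStore []int) uint64 {
	total := 0
	for _, n := range hotRegionsPerStore {
		total += n
	}
	avg := float64(total) / float64(len(hotRegionsPerStore))
	return uint64(math.Max((float64(srcHotRegions)-avg)*hotRegionLimitFactor, 1))
}

func main() {
	// Five stores hold 10, 4, 3, 2 and 1 hot regions; the source store holds 10.
	// avg = 4, so the limit is max((10-4)*0.75, 1) = 4.5, truncated to 4.
	fmt.Println(adjustBalanceLimit(10, []int{10, 4, 3, 2, 1}))
}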
79 changes: 0 additions & 79 deletions server/schedulers/scheduler_test.go
@@ -272,85 +272,6 @@ func (s *testRejectLeaderSuite) TestRejectLeader(c *C) {
 		}
 	}
 	tc.Regions.AddRegion(region)
-<<<<<<< HEAD
 	op = sl.Schedule(tc, schedule.NewOpInfluence(nil, tc))
-=======
-	op = sl.Schedule(tc)
-	testutil.CheckTransferLeader(c, op[0], schedule.OpLeader, 1, 2)
-}
-
-var _ = Suite(&testShuffleHotRegionSchedulerSuite{})
-
-type testShuffleHotRegionSchedulerSuite struct{}
-
-func (s *testShuffleHotRegionSchedulerSuite) TestBalance(c *C) {
-	opt := schedule.NewMockSchedulerOptions()
-	newTestReplication(opt, 3, "zone", "host")
-	tc := schedule.NewMockCluster(opt)
-	hb, err := schedule.CreateScheduler("shuffle-hot-region", schedule.NewOperatorController(nil, nil))
-	c.Assert(err, IsNil)
-
-	// Add stores 1, 2, 3, 4, 5, 6 with hot peer counts 3, 2, 2, 2, 0, 0.
-	tc.AddLabelsStore(1, 3, map[string]string{"zone": "z1", "host": "h1"})
-	tc.AddLabelsStore(2, 2, map[string]string{"zone": "z2", "host": "h2"})
-	tc.AddLabelsStore(3, 2, map[string]string{"zone": "z3", "host": "h3"})
-	tc.AddLabelsStore(4, 2, map[string]string{"zone": "z4", "host": "h4"})
-	tc.AddLabelsStore(5, 0, map[string]string{"zone": "z5", "host": "h5"})
-	tc.AddLabelsStore(6, 0, map[string]string{"zone": "z4", "host": "h6"})
-
-	// Report store written bytes.
-	tc.UpdateStorageWrittenBytes(1, 75*1024*1024)
-	tc.UpdateStorageWrittenBytes(2, 45*1024*1024)
-	tc.UpdateStorageWrittenBytes(3, 45*1024*1024)
-	tc.UpdateStorageWrittenBytes(4, 60*1024*1024)
-	tc.UpdateStorageWrittenBytes(5, 0)
-	tc.UpdateStorageWrittenBytes(6, 0)
-
-	// Region 1, 2 and 3 are hot regions.
-	//| region_id | leader_store | follower_store | follower_store | written_bytes |
-	//|-----------|--------------|----------------|----------------|---------------|
-	//|     1     |      1       |       2        |       3        |     512KB     |
-	//|     2     |      1       |       3        |       4        |     512KB     |
-	//|     3     |      1       |       2        |       4        |     512KB     |
-	tc.AddLeaderRegionWithWriteInfo(1, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 2, 3)
-	tc.AddLeaderRegionWithWriteInfo(2, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 3, 4)
-	tc.AddLeaderRegionWithWriteInfo(3, 1, 512*1024*schedule.RegionHeartBeatReportInterval, 2, 4)
-	opt.HotRegionCacheHitsThreshold = 0
-
-	// try to get an operator
-	var op []*schedule.Operator
-	for i := 0; i < 100; i++ {
-		op = hb.Schedule(tc)
-		if op != nil {
-			break
-		}
-	}
-	c.Assert(op, NotNil)
-	c.Assert(op[0].Step(1).(schedule.PromoteLearner).ToStore, Equals, op[0].Step(2).(schedule.TransferLeader).ToStore)
-	c.Assert(op[0].Step(1).(schedule.PromoteLearner).ToStore, Not(Equals), 6)
-}
-
-var _ = Suite(&testEvictLeaderSuite{})
-
-type testEvictLeaderSuite struct{}
-
-func (s *testEvictLeaderSuite) TestEvictLeader(c *C) {
-	opt := schedule.NewMockSchedulerOptions()
-	tc := schedule.NewMockCluster(opt)
-
-	// Add stores 1, 2, 3
-	tc.AddLeaderStore(1, 0)
-	tc.AddLeaderStore(2, 0)
-	tc.AddLeaderStore(3, 0)
-	// Add regions 1, 2, 3 with leaders in stores 1, 2, 3
-	tc.AddLeaderRegion(1, 1, 2)
-	tc.AddLeaderRegion(2, 2, 1)
-	tc.AddLeaderRegion(3, 3, 1)
-
-	sl, err := schedule.CreateScheduler("evict-leader", schedule.NewOperatorController(nil, nil), "1")
-	c.Assert(err, IsNil)
-	c.Assert(sl.IsScheduleAllowed(tc), IsTrue)
-	op := sl.Schedule(tc)
->>>>>>> 8d6c9367... *: make hot region scheduler configurable (#1412)
 	testutil.CheckTransferLeader(c, op[0], schedule.OpLeader, 1, 2)
 }
