scheduler: check replica for hot region (tikv#1609)
* check replica for hot region scheduler

Signed-off-by: Ryan Leung <rleungx@gmail.com>
rleungx committed Jul 10, 2019
1 parent dc34a51 commit f8e3ab4
Showing 4 changed files with 84 additions and 2 deletions.
22 changes: 20 additions & 2 deletions server/schedulers/hot_region.go
@@ -288,7 +288,19 @@ func (h *balanceHotRegionsScheduler) balanceByPeer(cluster schedule.Cluster, sto
     for _, i := range h.r.Perm(storesStat[srcStoreID].RegionsStat.Len()) {
         rs := storesStat[srcStoreID].RegionsStat[i]
         srcRegion := cluster.GetRegion(rs.RegionID)
-        if srcRegion == nil || len(srcRegion.GetDownPeers()) != 0 || len(srcRegion.GetPendingPeers()) != 0 {
+        if srcRegion == nil {
+            schedulerCounter.WithLabelValues(h.GetName(), "no_region").Inc()
+            continue
+        }
+
+        if isRegionUnhealthy(srcRegion) {
+            schedulerCounter.WithLabelValues(h.GetName(), "unhealthy_replica").Inc()
+            continue
+        }
+
+        if len(srcRegion.GetPeers()) != cluster.GetMaxReplicas() {
+            log.Debug("region has abnormal replica count", zap.String("scheduler", h.GetName()), zap.Uint64("region-id", srcRegion.GetID()))
+            schedulerCounter.WithLabelValues(h.GetName(), "abnormal_replica").Inc()
             continue
         }

@@ -345,7 +357,13 @@ func (h *balanceHotRegionsScheduler) balanceByLeader(cluster schedule.Cluster, s
     for _, i := range h.r.Perm(storesStat[srcStoreID].RegionsStat.Len()) {
         rs := storesStat[srcStoreID].RegionsStat[i]
         srcRegion := cluster.GetRegion(rs.RegionID)
-        if srcRegion == nil || len(srcRegion.GetDownPeers()) != 0 || len(srcRegion.GetPendingPeers()) != 0 {
+        if srcRegion == nil {
+            schedulerCounter.WithLabelValues(h.GetName(), "no_region").Inc()
+            continue
+        }
+
+        if isRegionUnhealthy(srcRegion) {
+            schedulerCounter.WithLabelValues(h.GetName(), "unhealthy_replica").Inc()
             continue
         }

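For orientation, here is a consolidated sketch of how the candidate filtering in balanceByPeer reads after this change, reconstructed from the two hunks above (balanceByLeader gains the same nil-region and unhealthy-replica guards, without the replica-count check). The rest of the balance logic is elided and unchanged.

    for _, i := range h.r.Perm(storesStat[srcStoreID].RegionsStat.Len()) {
        rs := storesStat[srcStoreID].RegionsStat[i]
        srcRegion := cluster.GetRegion(rs.RegionID)

        // The region may no longer exist by the time the hot statistics are used.
        if srcRegion == nil {
            schedulerCounter.WithLabelValues(h.GetName(), "no_region").Inc()
            continue
        }

        // Skip regions with down peers or learners (see isRegionUnhealthy in utils.go below).
        if isRegionUnhealthy(srcRegion) {
            schedulerCounter.WithLabelValues(h.GetName(), "unhealthy_replica").Inc()
            continue
        }

        // Skip regions whose replica count differs from the configured max-replicas,
        // so a hot peer is never moved while the region is under- or over-replicated.
        if len(srcRegion.GetPeers()) != cluster.GetMaxReplicas() {
            log.Debug("region has abnormal replica count",
                zap.String("scheduler", h.GetName()),
                zap.Uint64("region-id", srcRegion.GetID()))
            schedulerCounter.WithLabelValues(h.GetName(), "abnormal_replica").Inc()
            continue
        }

        // ... remaining peer-selection and operator-creation logic unchanged ...
    }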
28 changes: 28 additions & 0 deletions server/schedulers/scheduler_test.go
@@ -361,6 +361,34 @@ func (s *testShuffleHotRegionSchedulerSuite) TestBalance(c *C) {
     c.Assert(op[0].Step(1).(schedule.PromoteLearner).ToStore, Not(Equals), 6)
 }
 
+var _ = Suite(&testHotRegionSchedulerSuite{})
+
+type testHotRegionSchedulerSuite struct{}
+
+func (s *testHotRegionSchedulerSuite) TestAbnormalReplica(c *C) {
+    opt := mockoption.NewScheduleOptions()
+    opt.LeaderScheduleLimit = 0
+    tc := mockcluster.NewCluster(opt)
+    hb, err := schedule.CreateScheduler("hot-read-region", schedule.NewOperatorController(nil, nil))
+    c.Assert(err, IsNil)
+
+    tc.AddRegionStore(1, 3)
+    tc.AddRegionStore(2, 2)
+    tc.AddRegionStore(3, 2)
+
+    // Report store read bytes.
+    tc.UpdateStorageReadBytes(1, 75*1024*1024)
+    tc.UpdateStorageReadBytes(2, 45*1024*1024)
+    tc.UpdateStorageReadBytes(3, 45*1024*1024)
+
+    tc.AddLeaderRegionWithReadInfo(1, 1, 512*1024*statistics.RegionHeartBeatReportInterval, 2)
+    tc.AddLeaderRegionWithReadInfo(2, 2, 512*1024*statistics.RegionHeartBeatReportInterval, 1, 3)
+    tc.AddLeaderRegionWithReadInfo(3, 1, 512*1024*statistics.RegionHeartBeatReportInterval, 2, 3)
+    opt.HotRegionCacheHitsThreshold = 0
+    c.Assert(tc.IsRegionHot(1), IsTrue)
+    c.Assert(hb.Schedule(tc), IsNil)
+}
+
 var _ = Suite(&testEvictLeaderSuite{})
 
 type testEvictLeaderSuite struct{}
4 changes: 4 additions & 0 deletions server/schedulers/utils.go
@@ -49,6 +49,10 @@ func minDuration(a, b time.Duration) time.Duration {
     return b
 }
 
+func isRegionUnhealthy(region *core.RegionInfo) bool {
+    return len(region.GetDownPeers()) != 0 || len(region.GetLearners()) != 0
+}
+
 func shouldBalance(cluster schedule.Cluster, source, target *core.StoreInfo, region *core.RegionInfo, kind core.ResourceKind, opInfluence schedule.OpInfluence) bool {
     // The reason we use max(regionSize, averageRegionSize) to check is:
     // 1. prevent moving small regions between stores with close scores, leading to unnecessary balance.
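The new helper centralizes the replica health check that was previously inlined in both balance paths: a region counts as unhealthy when it has down peers or learner peers. Note that pending peers, which the old inline condition also rejected, no longer mark a region as unhealthy on their own; the new unit test below asserts exactly that (r2). A minimal illustration mirroring that test setup (hypothetical snippet, not part of the commit):

    learner := &metapb.Peer{Id: 3, StoreId: 3, IsLearner: true}
    voters := []*metapb.Peer{{Id: 1, StoreId: 1}, {Id: 2, StoreId: 2}}

    // A region carrying a learner peer is flagged and will be skipped by the hot region scheduler.
    region := core.NewRegionInfo(
        &metapb.Region{Peers: append(voters, learner)},
        voters[0],
        core.WithLearners([]*metapb.Peer{learner}),
    )
    _ = isRegionUnhealthy(region) // true: the region has a learner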
32 changes: 32 additions & 0 deletions server/schedulers/utils_test.go
@@ -18,6 +18,9 @@ import (
     "time"
 
     . "github.com/pingcap/check"
+    "github.com/pingcap/kvproto/pkg/metapb"
+    "github.com/pingcap/kvproto/pkg/pdpb"
+    "github.com/pingcap/pd/server/core"
 )
 
 func TestSchedulers(t *testing.T) {
@@ -45,3 +48,32 @@ func (s *testMinMaxSuite) TestMinDuration(c *C) {
     c.Assert(minDuration(time.Second, time.Minute), Equals, time.Second)
     c.Assert(minDuration(time.Second, time.Second), Equals, time.Second)
 }
+
+var _ = Suite(&testRegionUnhealthySuite{})
+
+type testRegionUnhealthySuite struct{}
+
+func (s *testRegionUnhealthySuite) TestIsRegionUnhealthy(c *C) {
+    peers := make([]*metapb.Peer, 0, 3)
+    for i := uint64(0); i < 2; i++ {
+        p := &metapb.Peer{
+            Id:      i,
+            StoreId: i,
+        }
+        peers = append(peers, p)
+    }
+    peers = append(peers, &metapb.Peer{
+        Id:        2,
+        StoreId:   2,
+        IsLearner: true,
+    })
+
+    r1 := core.NewRegionInfo(&metapb.Region{Peers: peers[:2]}, peers[0], core.WithDownPeers([]*pdpb.PeerStats{{Peer: peers[1]}}))
+    r2 := core.NewRegionInfo(&metapb.Region{Peers: peers[:2]}, peers[0], core.WithPendingPeers([]*metapb.Peer{peers[1]}))
+    r3 := core.NewRegionInfo(&metapb.Region{Peers: peers[:3]}, peers[0], core.WithLearners([]*metapb.Peer{peers[2]}))
+    r4 := core.NewRegionInfo(&metapb.Region{Peers: peers[:2]}, peers[0])
+    c.Assert(isRegionUnhealthy(r1), IsTrue)
+    c.Assert(isRegionUnhealthy(r2), IsFalse)
+    c.Assert(isRegionUnhealthy(r3), IsTrue)
+    c.Assert(isRegionUnhealthy(r4), IsFalse)
+}
