scheduler: fix unstable transfer leader in hot scheduler and add some log (tikv#3507) (tikv#3508)

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>

Co-authored-by: lhy1024 <admin@liudos.us>
Co-authored-by: Ti Chi Robot <71242396+ti-chi-bot@users.noreply.github.com>
3 people authored Mar 24, 2021
1 parent 6e3b24f commit 9a70cb5
Showing 4 changed files with 47 additions and 5 deletions.
3 changes: 2 additions & 1 deletion server/schedulers/hot_region.go
@@ -250,6 +250,7 @@ func (h *hotScheduler) gcRegionPendings() {
 		for ty, op := range pendings {
 			if op != nil && op.IsEnd() {
 				if time.Now().After(op.GetCreateTime().Add(h.conf.GetMaxZombieDuration())) {
+					log.Debug("gc pending influence in hot region scheduler", zap.Uint64("region-id", regionID), zap.Time("create", op.GetCreateTime()), zap.Time("now", time.Now()), zap.Duration("zombie", h.conf.GetMaxZombieDuration()))
 					schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Dec()
 					pendings[ty] = nil
 				}
@@ -616,7 +617,7 @@ func (bs *balanceSolver) filterHotPeers() []*statistics.HotPeerStat {
 
 	// filter pending region
 	appendItem := func(items []*statistics.HotPeerStat, item *statistics.HotPeerStat) []*statistics.HotPeerStat {
-		if _, ok := bs.sche.regionPendings[item.ID()]; !ok {
+		if _, ok := bs.sche.regionPendings[item.ID()]; !ok && !item.IsJustTransferLeader() {
 			items = append(items, item)
 		}
 		return items
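As context for the hunk above: the core of the fix is the extra !item.IsJustTransferLeader() condition, so a peer whose leader was just transferred is not immediately picked for another transfer. A minimal, self-contained sketch of that idea follows; hotPeer and filterSchedulable are hypothetical stand-ins for illustration, not PD types.

package main

import "fmt"

// hotPeer is a hypothetical stand-in for statistics.HotPeerStat, keeping only
// the fields the filter below needs.
type hotPeer struct {
	regionID           uint64
	justTransferLeader bool
}

// filterSchedulable mirrors the patched appendItem closure: a peer is skipped
// when its region already has a pending operator or when its leader was just
// transferred, so the hot scheduler does not bounce the leader right back.
func filterSchedulable(peers []hotPeer, pending map[uint64]struct{}) []hotPeer {
	var out []hotPeer
	for _, p := range peers {
		if _, ok := pending[p.regionID]; !ok && !p.justTransferLeader {
			out = append(out, p)
		}
	}
	return out
}

func main() {
	peers := []hotPeer{
		{regionID: 1, justTransferLeader: true},  // leader just moved; skipped for now
		{regionID: 2, justTransferLeader: false}, // still a scheduling candidate
	}
	fmt.Println(filterSchedulable(peers, map[uint64]struct{}{})) // only region 2 remains
}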
21 changes: 17 additions & 4 deletions server/schedulers/hot_test.go
@@ -1079,16 +1079,21 @@ func addRegionInfo(tc *mockcluster.Cluster, rwTy rwType, regions []testRegionInf
 }
 
 func (s *testHotCacheSuite) TestCheckRegionFlow(c *C) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
 	opt := config.NewTestOptions()
 	tc := mockcluster.NewCluster(opt)
 	tc.SetMaxReplicas(3)
 	tc.SetLocationLabels([]string{"zone", "host"})
 	tc.DisableFeature(versioninfo.JointConsensus)
-	s.checkRegionFlowTest(c, tc.AddLeaderRegionWithWriteInfo)
-	s.checkRegionFlowTest(c, tc.AddLeaderRegionWithReadInfo)
+	sche, err := schedule.CreateScheduler(HotRegionType, schedule.NewOperatorController(ctx, tc, nil), core.NewStorage(kv.NewMemoryKV()), schedule.ConfigJSONDecoder([]byte("null")))
+	c.Assert(err, IsNil)
+	hb := sche.(*hotScheduler)
+	s.checkRegionFlowTest(c, tc, hb, tc.AddLeaderRegionWithWriteInfo)
+	s.checkRegionFlowTest(c, tc, hb, tc.AddLeaderRegionWithReadInfo)
 }
 
-func (s *testHotCacheSuite) checkRegionFlowTest(c *C, heartbeat func(
+func (s *testHotCacheSuite) checkRegionFlowTest(c *C, tc *mockcluster.Cluster, hb *hotScheduler, heartbeat func(
 	regionID uint64, leaderID uint64,
 	readBytes, readKeys uint64,
 	reportInterval uint64,
@@ -1101,12 +1106,20 @@ func (s *testHotCacheSuite) checkRegionFlowTest(c *C, heartbeat func(
 		c.Check(item.HotDegree, Equals, 3)
 	}
 
-	// transfer leader and skip the first heartbeat
+	// transfer leader, skip the first heartbeat and schedule.
 	items = heartbeat(1, 2, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 3}, 1)
 	for _, item := range items {
 		c.Check(item.HotDegree, Equals, 3)
+		c.Check(item.IsJustTransferLeader(), Equals, true)
 	}
 
+	tc.AddRegionStore(2, 20)
+	tc.UpdateStorageReadStats(2, 9.5*MB*statistics.StoreHeartBeatReportInterval, 9.5*MB*statistics.StoreHeartBeatReportInterval)
+	hb.prepareForBalance(tc)
+	leaderSolver := newBalanceSolver(hb, tc, read, transferLeader)
+	leaderSolver.cur = &solution{srcStoreID: 2}
+	c.Check(leaderSolver.filterHotPeers(), HasLen, 0) // skip schedule
+
 	// move peer: add peer and remove peer
 	items = heartbeat(1, 2, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 3, 4}, 1)
 	for _, item := range items {
26 changes: 26 additions & 0 deletions server/statistics/hot_peer.go
@@ -18,6 +18,7 @@ import (
 	"time"
 
 	"github.com/tikv/pd/pkg/movingaverage"
+	"go.uber.org/zap"
 )
 
 const (
@@ -114,6 +115,31 @@ func (stat *HotPeerStat) Less(k int, than TopNItem) bool {
 	}
 }
 
+// Log is used to output some info
+func (stat *HotPeerStat) Log(str string, level func(msg string, fields ...zap.Field)) {
+	level(str,
+		zap.Uint64("interval", stat.interval),
+		zap.Uint64("region-id", stat.RegionID),
+		zap.Uint64("store", stat.StoreID),
+		zap.Float64("byte-rate", stat.GetByteRate()),
+		zap.Float64("byte-rate-instant", stat.ByteRate),
+		zap.Float64("byte-rate-threshold", stat.thresholds[byteDim]),
+		zap.Float64("key-rate", stat.GetKeyRate()),
+		zap.Float64("key-rate-instant", stat.KeyRate),
+		zap.Float64("key-rate-threshold", stat.thresholds[keyDim]),
+		zap.Int("hot-degree", stat.HotDegree),
+		zap.Int("hot-anti-count", stat.AntiCount),
+		zap.Bool("just-transfer-leader", stat.justTransferLeader),
+		zap.Bool("is-leader", stat.isLeader),
+		zap.Bool("need-delete", stat.IsNeedDelete()),
+		zap.String("type", stat.Kind.String()))
+}
+
+// IsJustTransferLeader indicates the item belong to the leader.
+func (stat *HotPeerStat) IsJustTransferLeader() bool {
+	return stat.justTransferLeader
+}
+
 // IsNeedDelete to delete the item in cache.
 func (stat *HotPeerStat) IsNeedDelete() bool {
 	return stat.needDelete
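The new Log helper takes the log level as a function value with zap's (msg string, fields ...zap.Field) shape, which is how hot_peer_cache.go below passes log.Debug. A minimal standalone sketch of the same pattern, assuming a plain zap example logger rather than PD's pingcap/log wrapper:

package main

import "go.uber.org/zap"

func main() {
	logger := zap.NewExample()

	// emit mimics the shape of HotPeerStat.Log: the caller chooses the level
	// by passing any function matching (msg string, fields ...zap.Field).
	emit := func(msg string, level func(msg string, fields ...zap.Field)) {
		level(msg,
			zap.Uint64("region-id", 1),
			zap.Bool("just-transfer-leader", true))
	}

	emit("region heartbeat update", logger.Debug)
	emit("region heartbeat delete from cache", logger.Warn)
}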
2 changes: 2 additions & 0 deletions server/statistics/hot_peer_cache.go
@@ -94,6 +94,7 @@ func (f *hotPeerCache) Update(item *HotPeerStat) {
 		if stores, ok := f.storesOfRegion[item.RegionID]; ok {
 			delete(stores, item.StoreID)
 		}
+		item.Log("region heartbeat delete from cache", log.Debug)
 	} else {
 		peers, ok := f.peersOfStore[item.StoreID]
 		if !ok {
@@ -108,6 +109,7 @@ func (f *hotPeerCache) Update(item *HotPeerStat) {
 			f.storesOfRegion[item.RegionID] = stores
 		}
 		stores[item.StoreID] = struct{}{}
+		item.Log("region heartbeat update", log.Debug)
 	}
 }
 
