Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

checker: refactor fixOrphanPeers #7342

Merged
merged 6 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 25 additions & 18 deletions pkg/schedule/checker/rule_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,23 +449,31 @@ func (c *RuleChecker) fixOrphanPeers(region *core.RegionInfo, fit *placement.Reg
return nil, nil
}

isUnhealthyPeer := func(id uint64) bool {
for _, downPeer := range region.GetDownPeers() {
if downPeer.Peer.GetId() == id {
isPendingPeer := func(id uint64) bool {
for _, pendingPeer := range region.GetPendingPeers() {
if pendingPeer.GetId() == id {
return true
}
}
for _, pendingPeer := range region.GetPendingPeers() {
if pendingPeer.GetId() == id {
return false
}

isDownPeer := func(id uint64) bool {
for _, downPeer := range region.GetDownPeers() {
if downPeer.Peer.GetId() == id {
return true
}
}
return false
}

isDisconnectedPeer := func(p *metapb.Peer) bool {
isUnhealthyPeer := func(id uint64) bool {
return isPendingPeer(id) || isDownPeer(id)
}

isInDisconnectedStore := func(p *metapb.Peer) bool {
// avoid to meet down store when fix orphan peers,
// Isdisconnected is more strictly than IsUnhealthy.
// isInDisconnectedStore is usually more strictly than IsUnhealthy.
store := c.cluster.GetStore(p.GetStoreId())
if store == nil {
return true
Expand All @@ -475,16 +483,15 @@ func (c *RuleChecker) fixOrphanPeers(region *core.RegionInfo, fit *placement.Reg

checkDownPeer := func(peers []*metapb.Peer) (*metapb.Peer, bool) {
for _, p := range peers {
if isUnhealthyPeer(p.GetId()) {
// make sure is down peer.
if region.GetDownPeer(p.GetId()) != nil {
return p, true
}
return nil, true
if isInDisconnectedStore(p) {
return p, true
}
if isDisconnectedPeer(p) {
if isDownPeer(p.GetId()) {
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
return p, true
}
if isPendingPeer(p.GetId()) {
return nil, true
}
}
return nil, false
}
Expand Down Expand Up @@ -517,15 +524,15 @@ func (c *RuleChecker) fixOrphanPeers(region *core.RegionInfo, fit *placement.Reg
continue
}
// make sure the orphan peer is healthy.
if isUnhealthyPeer(orphanPeer.GetId()) || isDisconnectedPeer(orphanPeer) {
if isUnhealthyPeer(orphanPeer.GetId()) || isInDisconnectedStore(orphanPeer) {
continue
}
// no consider witness in this path.
if pinDownPeer.GetIsWitness() || orphanPeer.GetIsWitness() {
continue
}
// pinDownPeer's store should be disconnected, because we use more strict judge before.
if !isDisconnectedPeer(pinDownPeer) {
if !isInDisconnectedStore(pinDownPeer) {
continue
}
// check if down peer can replace with orphan peer.
Expand All @@ -539,7 +546,7 @@ func (c *RuleChecker) fixOrphanPeers(region *core.RegionInfo, fit *placement.Reg
return operator.CreatePromoteLearnerOperatorAndRemovePeer("replace-down-peer-with-orphan-peer", c.cluster, region, orphanPeer, pinDownPeer)
case orphanPeerRole == metapb.PeerRole_Voter && destRole == metapb.PeerRole_Learner:
return operator.CreateDemoteLearnerOperatorAndRemovePeer("replace-down-peer-with-orphan-peer", c.cluster, region, orphanPeer, pinDownPeer)
case orphanPeerRole == destRole && isDisconnectedPeer(pinDownPeer) && !dstStore.IsDisconnected():
case orphanPeerRole == destRole && isInDisconnectedStore(pinDownPeer) && !dstStore.IsDisconnected():
return operator.CreateRemovePeerOperator("remove-replaced-orphan-peer", c.cluster, 0, region, pinDownPeer.GetStoreId())
default:
// destRole should not same with orphanPeerRole. if role is same, it fit with orphanPeer should be better than now.
Expand All @@ -557,7 +564,7 @@ func (c *RuleChecker) fixOrphanPeers(region *core.RegionInfo, fit *placement.Reg
hasHealthPeer := false
var disconnectedPeer *metapb.Peer
for _, orphanPeer := range fit.OrphanPeers {
if isDisconnectedPeer(orphanPeer) {
if isInDisconnectedStore(orphanPeer) {
disconnectedPeer = orphanPeer
break
}
Expand Down
2 changes: 1 addition & 1 deletion server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2792,7 +2792,7 @@ func TestReplica(t *testing.T) {
re.NoError(dispatchHeartbeat(co, region, stream))
waitNoResponse(re, stream)

// Remove peer from store 4.
// Remove peer from store 3.
re.NoError(tc.addLeaderRegion(2, 1, 2, 3, 4))
region = tc.GetRegion(2)
re.NoError(dispatchHeartbeat(co, region, stream))
Expand Down
Loading