checker: replace down check with disconnect check when fixing orphan peer #7294

Merged · 4 commits · Nov 3, 2023
Changes from 1 commit
3 changes: 3 additions & 0 deletions pkg/core/store.go
@@ -551,6 +551,9 @@ var (
// tikv's store heartbeat for a short time, maybe caused by process restart or
// temporary network failure.
func (s *StoreInfo) IsDisconnected() bool {
if s == nil {
return true
}
return s.DownTime() > storeDisconnectDuration
}

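The nil guard added here matters because callers chain GetStore(...).IsDisconnected() directly (see rule_checker.go below), and the store lookup can come back nil for a store PD no longer tracks. A minimal sketch of the idea, using stand-in types and an assumed threshold rather than pd's real ones:

```go
package main

import (
	"fmt"
	"time"
)

// Assumed threshold for illustration only; pd defines its own storeDisconnectDuration.
const storeDisconnectDuration = 20 * time.Second

// StoreInfo is a stand-in for pd's core.StoreInfo.
type StoreInfo struct {
	lastHeartbeat time.Time
}

// IsDisconnected treats a nil (unknown or removed) store as disconnected,
// so a chained call like getStore(id).IsDisconnected() cannot dereference nil.
func (s *StoreInfo) IsDisconnected() bool {
	if s == nil {
		return true
	}
	return time.Since(s.lastHeartbeat) > storeDisconnectDuration
}

func main() {
	stores := map[uint64]*StoreInfo{1: {lastHeartbeat: time.Now()}}
	getStore := func(id uint64) *StoreInfo { return stores[id] } // nil when the store is unknown

	fmt.Println(getStore(1).IsDisconnected()) // false: heartbeat is recent
	fmt.Println(getStore(2).IsDisconnected()) // true: store 2 does not exist, nil receiver
}
```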
62 changes: 38 additions & 24 deletions pkg/schedule/checker/rule_checker.go
@@ -447,7 +447,7 @@ func (c *RuleChecker) fixOrphanPeers(region *core.RegionInfo, fit *placement.Reg
if len(fit.OrphanPeers) == 0 {
return nil, nil
}
var pinDownPeer *metapb.Peer

isUnhealthyPeer := func(id uint64) bool {
for _, downPeer := range region.GetDownPeers() {
if downPeer.Peer.GetId() == id {
Expand All @@ -461,31 +461,41 @@ func (c *RuleChecker) fixOrphanPeers(region *core.RegionInfo, fit *placement.Reg
}
return false
}

isDisconnectedPeer := func(p *metapb.Peer) bool {
// avoid hitting a down store when fixing orphan peers;
// IsDisconnected is a stricter check than IsUnhealthy.
return c.cluster.GetStore(p.GetStoreId()).IsDisconnected()
}

checkDownPeer := func(peers []*metapb.Peer) (*metapb.Peer, bool) {
for _, p := range peers {
if isUnhealthyPeer(p.GetId()) {
// make sure it is a down peer.
if region.GetDownPeer(p.GetId()) != nil {
return p, true
}
return nil, true
}
if isDisconnectedPeer(p) {
return p, true
}
}
return nil, false
}

// remove orphan peers only when all rules are satisfied (count+role) and none of the peers selected
// by RuleFits are pending or down.
var pinDownPeer *metapb.Peer
hasUnhealthyFit := false
loopFits:
for _, rf := range fit.RuleFits {
if !rf.IsSatisfied() {
hasUnhealthyFit = true
break
}
for _, p := range rf.Peers {
if isUnhealthyPeer(p.GetId()) {
// make sure it is a down peer.
if region.GetDownPeer(p.GetId()) != nil {
pinDownPeer = p
}
hasUnhealthyFit = true
break loopFits
}
// avoid hitting a down store when fixing orphan peers;
// IsDisconnected is a stricter check than IsUnhealthy.
if c.cluster.GetStore(p.GetStoreId()).IsDisconnected() {
hasUnhealthyFit = true
pinDownPeer = p
break loopFits
}
pinDownPeer, hasUnhealthyFit = checkDownPeer(rf.Peers)
if hasUnhealthyFit {
break
}
}

@@ -502,15 +512,15 @@ loopFits:
continue
}
// make sure the orphan peer is healthy.
if isUnhealthyPeer(orphanPeer.GetId()) {
if isUnhealthyPeer(orphanPeer.GetId()) || isDisconnectedPeer(orphanPeer) {
continue
}
// do not consider witness in this path.
if pinDownPeer.GetIsWitness() || orphanPeer.GetIsWitness() {
continue
}
// down peer's store should be down.
if !c.isStoreDownTimeHitMaxDownTime(pinDownPeer.GetStoreId()) {
// down peer's store should be disconnected
if !isDisconnectedPeer(pinDownPeer) {
Contributor Author: use isDisconnectedPeer rather than isStoreDownTimeHitMaxDownTime, because isDisconnectedPeer uses a stricter check.
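Put differently, the gate for replacing the pinned peer is relaxed from "its store has been down for the maximum down time" to "its store is merely disconnected", which triggers much earlier. A rough sketch of the relationship between the two thresholds — the concrete durations below are assumptions for illustration, not pd's configured values:

```go
package main

import (
	"fmt"
	"time"
)

// Assumed values: in pd the disconnect threshold is a small constant, while
// the maximum store down time is a much larger configuration option.
const (
	storeDisconnectDuration = 20 * time.Second
	maxStoreDownTime        = 30 * time.Minute
)

func main() {
	downTime := 5 * time.Minute // the store stopped heartbeating five minutes ago

	// The old gate (down-time check) would not fire yet...
	fmt.Println("hit max down time:", downTime >= maxStoreDownTime) // false
	// ...while the new gate (disconnect check) already does.
	fmt.Println("disconnected:", downTime > storeDisconnectDuration) // true
}
```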

continue
}
// check if down peer can replace with orphan peer.
@@ -525,7 +535,7 @@
case orphanPeerRole == metapb.PeerRole_Voter && destRole == metapb.PeerRole_Learner:
return operator.CreateDemoteLearnerOperatorAndRemovePeer("replace-down-peer-with-orphan-peer", c.cluster, region, orphanPeer, pinDownPeer)
case orphanPeerRole == metapb.PeerRole_Voter && destRole == metapb.PeerRole_Voter &&
c.cluster.GetStore(pinDownPeer.GetStoreId()).IsDisconnected() && !dstStore.IsDisconnected():
isDisconnectedPeer(pinDownPeer) && !dstStore.IsDisconnected():
return operator.CreateRemovePeerOperator("remove-replaced-orphan-peer", c.cluster, 0, region, pinDownPeer.GetStoreId())
default:
// destRole should not be the same as orphanPeerRole; if the roles are the same, the fit with orphanPeer should already be better than it is now.
@@ -541,8 +551,12 @@
hasHealthPeer := false
for _, orphanPeer := range fit.OrphanPeers {
if isUnhealthyPeer(orphanPeer.GetId()) {
ruleCheckerRemoveOrphanPeerCounter.Inc()
return operator.CreateRemovePeerOperator("remove-orphan-peer", c.cluster, 0, region, orphanPeer.StoreId)
checkerCounter.WithLabelValues("rule_checker", "remove-orphan-peer").Inc()
return operator.CreateRemovePeerOperator("remove-unhealthy-orphan-peer", c.cluster, 0, region, orphanPeer.StoreId)
Contributor Author: use "remove-unhealthy-orphan-peer" and "remove-disconnected-orphan-peer" to distinguish the two removal reasons (a condensed sketch follows this file's diff).

}
if isDisconnectedPeer(orphanPeer) {
checkerCounter.WithLabelValues("rule_checker", "remove-orphan-peer").Inc()
return operator.CreateRemovePeerOperator("remove-disconnected-orphan-peer", c.cluster, 0, region, orphanPeer.StoreId)
}
if hasHealthPeer {
// there already exists a healthy orphan peer, so we can remove other orphan Peers.
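A note on the two removal branches above: in this commit both still increment the same "remove-orphan-peer" counter label, and only the operator description distinguishes the unhealthy case from the disconnected one, which is what the updated tests assert on via op.Desc(). A hypothetical condensation of that decision (not pd code):

```go
// orphanRemovalDesc condenses the two branches: the counter label stays the
// same, while the operator description records why the peer is removed.
func orphanRemovalDesc(unhealthy, disconnected bool) (counterLabel, opDesc string, remove bool) {
	switch {
	case unhealthy:
		return "remove-orphan-peer", "remove-unhealthy-orphan-peer", true
	case disconnected:
		return "remove-orphan-peer", "remove-disconnected-orphan-peer", true
	default:
		return "", "", false
	}
}
```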
71 changes: 69 additions & 2 deletions pkg/schedule/checker/rule_checker_test.go
@@ -235,7 +235,7 @@ func (suite *ruleCheckerTestSuite) TestFixToManyOrphanPeers() {
suite.cluster.PutRegion(region)
op = suite.rc.Check(suite.cluster.GetRegion(1))
suite.NotNil(op)
suite.Equal("remove-orphan-peer", op.Desc())
suite.Equal("remove-unhealthy-orphan-peer", op.Desc())
suite.Equal(uint64(4), op.Step(0).(operator.RemovePeer).FromStore)
}

@@ -702,7 +702,7 @@ func (suite *ruleCheckerTestSuite) TestPriorityFixOrphanPeer() {
suite.cluster.PutRegion(testRegion)
op = suite.rc.Check(suite.cluster.GetRegion(1))
suite.NotNil(op)
suite.Equal("remove-orphan-peer", op.Desc())
suite.Equal("remove-unhealthy-orphan-peer", op.Desc())
suite.IsType(remove, op.Step(0))
// Ref #3521
suite.cluster.SetStoreOffline(2)
@@ -723,6 +723,73 @@
suite.Equal("remove-orphan-peer", op.Desc())
}

// Ref https://github.com/tikv/pd/issues/7249 https://github.com/tikv/tikv/issues/15799
func (suite *ruleCheckerTestSuite) TestFixOrphanPeerWithDisconnectedStoreAndRuleChanged() {
// init cluster with 5 replicas
suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"})
suite.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"})
suite.cluster.AddLabelsStore(4, 1, map[string]string{"host": "host4"})
suite.cluster.AddLabelsStore(5, 1, map[string]string{"host": "host5"})
storeIDs := []uint64{1, 2, 3, 4, 5}
suite.cluster.AddLeaderRegionWithRange(1, "", "", storeIDs[0], storeIDs[1:]...)
rule := &placement.Rule{
GroupID: "pd",
ID: "default",
Role: placement.Voter,
Count: 5,
StartKey: []byte{},
EndKey: []byte{},
}
suite.ruleManager.SetRule(rule)
op := suite.rc.Check(suite.cluster.GetRegion(1))
suite.Nil(op)

// set store 1, 2 to disconnected
suite.cluster.SetStoreDisconnect(1)
suite.cluster.SetStoreDisconnect(2)

// change rule to 3 replicas
rule = &placement.Rule{
GroupID: "pd",
ID: "default",
Role: placement.Voter,
Count: 3,
StartKey: []byte{},
EndKey: []byte{},
Override: true,
}
suite.ruleManager.SetRule(rule)

// remove store 1 from region 1
op = suite.rc.Check(suite.cluster.GetRegion(1))
suite.NotNil(op)
suite.Equal("remove-replaced-orphan-peer", op.Desc())
newLeaderID := op.Step(0).(operator.TransferLeader).ToStore
removedPeerID := op.Step(1).(operator.RemovePeer).FromStore
r1 := suite.cluster.GetRegion(1)
r1 = r1.Clone(
core.WithLeader(r1.GetPeer(newLeaderID)),
core.WithRemoveStorePeer(removedPeerID))
suite.cluster.PutRegion(r1)
r1 = suite.cluster.GetRegion(1)
suite.Len(r1.GetPeers(), 4)

// remove store 2 from region 1
op = suite.rc.Check(suite.cluster.GetRegion(1))
suite.NotNil(op)
suite.Equal("remove-replaced-orphan-peer", op.Desc())
removedPeerID = op.Step(0).(operator.RemovePeer).FromStore
r1 = r1.Clone(core.WithRemoveStorePeer(removedPeerID))
suite.cluster.PutRegion(r1)
r1 = suite.cluster.GetRegion(1)
suite.Len(r1.GetPeers(), 3)
for _, p := range r1.GetPeers() {
suite.NotEqual(p.GetStoreId(), 1)
suite.NotEqual(p.GetStoreId(), 2)
}
}

func (suite *ruleCheckerTestSuite) TestPriorityFitHealthWithDifferentRole1() {
suite.cluster.SetEnableUseJointConsensus(true)
suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})