diff --git a/pkg/core/store.go b/pkg/core/store.go index 1d3362cac0e..b3c62f45750 100644 --- a/pkg/core/store.go +++ b/pkg/core/store.go @@ -551,6 +551,9 @@ var ( // tikv's store heartbeat for a short time, maybe caused by process restart or // temporary network failure. func (s *StoreInfo) IsDisconnected() bool { + if s == nil { + return true + } return s.DownTime() > storeDisconnectDuration } diff --git a/pkg/schedule/checker/rule_checker.go b/pkg/schedule/checker/rule_checker.go index 7012359ca36..84cafaa871e 100644 --- a/pkg/schedule/checker/rule_checker.go +++ b/pkg/schedule/checker/rule_checker.go @@ -447,7 +447,7 @@ func (c *RuleChecker) fixOrphanPeers(region *core.RegionInfo, fit *placement.Reg if len(fit.OrphanPeers) == 0 { return nil, nil } - var pinDownPeer *metapb.Peer + isUnhealthyPeer := func(id uint64) bool { for _, downPeer := range region.GetDownPeers() { if downPeer.Peer.GetId() == id { @@ -461,31 +461,41 @@ func (c *RuleChecker) fixOrphanPeers(region *core.RegionInfo, fit *placement.Reg } return false } + + isDisconnectedPeer := func(p *metapb.Peer) bool { + // avoid to meet down store when fix orphan peers, + // Isdisconnected is more strictly than IsUnhealthy. + return c.cluster.GetStore(p.GetStoreId()).IsDisconnected() + } + + checkDownPeer := func(peers []*metapb.Peer) (*metapb.Peer, bool) { + for _, p := range peers { + if isUnhealthyPeer(p.GetId()) { + // make sure is down peer. + if region.GetDownPeer(p.GetId()) != nil { + return p, true + } + return nil, true + } + if isDisconnectedPeer(p) { + return p, true + } + } + return nil, false + } + // remove orphan peers only when all rules are satisfied (count+role) and all peers selected // by RuleFits is not pending or down. + var pinDownPeer *metapb.Peer hasUnhealthyFit := false -loopFits: for _, rf := range fit.RuleFits { if !rf.IsSatisfied() { hasUnhealthyFit = true break } - for _, p := range rf.Peers { - if isUnhealthyPeer(p.GetId()) { - // make sure is down peer. - if region.GetDownPeer(p.GetId()) != nil { - pinDownPeer = p - } - hasUnhealthyFit = true - break loopFits - } - // avoid to meet down store when fix orpahn peers, - // Isdisconnected is more strictly than IsUnhealthy. - if c.cluster.GetStore(p.GetStoreId()).IsDisconnected() { - hasUnhealthyFit = true - pinDownPeer = p - break loopFits - } + pinDownPeer, hasUnhealthyFit = checkDownPeer(rf.Peers) + if hasUnhealthyFit { + break } } @@ -502,15 +512,15 @@ loopFits: continue } // make sure the orphan peer is healthy. - if isUnhealthyPeer(orphanPeer.GetId()) { + if isUnhealthyPeer(orphanPeer.GetId()) || isDisconnectedPeer(orphanPeer) { continue } // no consider witness in this path. if pinDownPeer.GetIsWitness() || orphanPeer.GetIsWitness() { continue } - // down peer's store should be down. - if !c.isStoreDownTimeHitMaxDownTime(pinDownPeer.GetStoreId()) { + // down peer's store should be disconnected + if !isDisconnectedPeer(pinDownPeer) { continue } // check if down peer can replace with orphan peer. @@ -525,7 +535,7 @@ loopFits: case orphanPeerRole == metapb.PeerRole_Voter && destRole == metapb.PeerRole_Learner: return operator.CreateDemoteLearnerOperatorAndRemovePeer("replace-down-peer-with-orphan-peer", c.cluster, region, orphanPeer, pinDownPeer) case orphanPeerRole == metapb.PeerRole_Voter && destRole == metapb.PeerRole_Voter && - c.cluster.GetStore(pinDownPeer.GetStoreId()).IsDisconnected() && !dstStore.IsDisconnected(): + isDisconnectedPeer(pinDownPeer) && !dstStore.IsDisconnected(): return operator.CreateRemovePeerOperator("remove-replaced-orphan-peer", c.cluster, 0, region, pinDownPeer.GetStoreId()) default: // destRole should not same with orphanPeerRole. if role is same, it fit with orphanPeer should be better than now. @@ -542,7 +552,11 @@ loopFits: for _, orphanPeer := range fit.OrphanPeers { if isUnhealthyPeer(orphanPeer.GetId()) { ruleCheckerRemoveOrphanPeerCounter.Inc() - return operator.CreateRemovePeerOperator("remove-orphan-peer", c.cluster, 0, region, orphanPeer.StoreId) + return operator.CreateRemovePeerOperator("remove-unhealthy-orphan-peer", c.cluster, 0, region, orphanPeer.StoreId) + } + if isDisconnectedPeer(orphanPeer) { + ruleCheckerRemoveOrphanPeerCounter.Inc() + return operator.CreateRemovePeerOperator("remove-disconnected-orphan-peer", c.cluster, 0, region, orphanPeer.StoreId) } if hasHealthPeer { // there already exists a healthy orphan peer, so we can remove other orphan Peers. diff --git a/pkg/schedule/checker/rule_checker_test.go b/pkg/schedule/checker/rule_checker_test.go index 8ee3b1eccfa..0c4a2a9ecc9 100644 --- a/pkg/schedule/checker/rule_checker_test.go +++ b/pkg/schedule/checker/rule_checker_test.go @@ -235,7 +235,7 @@ func (suite *ruleCheckerTestSuite) TestFixToManyOrphanPeers() { suite.cluster.PutRegion(region) op = suite.rc.Check(suite.cluster.GetRegion(1)) suite.NotNil(op) - suite.Equal("remove-orphan-peer", op.Desc()) + suite.Equal("remove-unhealthy-orphan-peer", op.Desc()) suite.Equal(uint64(4), op.Step(0).(operator.RemovePeer).FromStore) } @@ -702,7 +702,7 @@ func (suite *ruleCheckerTestSuite) TestPriorityFixOrphanPeer() { suite.cluster.PutRegion(testRegion) op = suite.rc.Check(suite.cluster.GetRegion(1)) suite.NotNil(op) - suite.Equal("remove-orphan-peer", op.Desc()) + suite.Equal("remove-unhealthy-orphan-peer", op.Desc()) suite.IsType(remove, op.Step(0)) // Ref #3521 suite.cluster.SetStoreOffline(2) @@ -723,6 +723,178 @@ func (suite *ruleCheckerTestSuite) TestPriorityFixOrphanPeer() { suite.Equal("remove-orphan-peer", op.Desc()) } +// Ref https://github.com/tikv/pd/issues/7249 https://github.com/tikv/tikv/issues/15799 +func (suite *ruleCheckerTestSuite) TestFixOrphanPeerWithDisconnectedStoreAndRuleChanged() { + // init cluster with 5 replicas + suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"}) + suite.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"}) + suite.cluster.AddLabelsStore(4, 1, map[string]string{"host": "host4"}) + suite.cluster.AddLabelsStore(5, 1, map[string]string{"host": "host5"}) + storeIDs := []uint64{1, 2, 3, 4, 5} + suite.cluster.AddLeaderRegionWithRange(1, "", "", storeIDs[0], storeIDs[1:]...) + rule := &placement.Rule{ + GroupID: "pd", + ID: "default", + Role: placement.Voter, + Count: 5, + StartKey: []byte{}, + EndKey: []byte{}, + } + suite.ruleManager.SetRule(rule) + op := suite.rc.Check(suite.cluster.GetRegion(1)) + suite.Nil(op) + + // set store 1, 2 to disconnected + suite.cluster.SetStoreDisconnect(1) + suite.cluster.SetStoreDisconnect(2) + + // change rule to 3 replicas + rule = &placement.Rule{ + GroupID: "pd", + ID: "default", + Role: placement.Voter, + Count: 3, + StartKey: []byte{}, + EndKey: []byte{}, + Override: true, + } + suite.ruleManager.SetRule(rule) + + // remove store 1 from region 1 + op = suite.rc.Check(suite.cluster.GetRegion(1)) + suite.NotNil(op) + suite.Equal("remove-replaced-orphan-peer", op.Desc()) + suite.Equal(op.Len(), 2) + newLeaderID := op.Step(0).(operator.TransferLeader).ToStore + removedPeerID := op.Step(1).(operator.RemovePeer).FromStore + r1 := suite.cluster.GetRegion(1) + r1 = r1.Clone( + core.WithLeader(r1.GetPeer(newLeaderID)), + core.WithRemoveStorePeer(removedPeerID)) + suite.cluster.PutRegion(r1) + r1 = suite.cluster.GetRegion(1) + suite.Len(r1.GetPeers(), 4) + + // remove store 2 from region 1 + op = suite.rc.Check(suite.cluster.GetRegion(1)) + suite.NotNil(op) + suite.Equal("remove-replaced-orphan-peer", op.Desc()) + suite.Equal(op.Len(), 1) + removedPeerID = op.Step(0).(operator.RemovePeer).FromStore + r1 = r1.Clone(core.WithRemoveStorePeer(removedPeerID)) + suite.cluster.PutRegion(r1) + r1 = suite.cluster.GetRegion(1) + suite.Len(r1.GetPeers(), 3) + for _, p := range r1.GetPeers() { + suite.NotEqual(p.GetStoreId(), 1) + suite.NotEqual(p.GetStoreId(), 2) + } +} + +// Ref https://github.com/tikv/pd/issues/7249 https://github.com/tikv/tikv/issues/15799 +func (suite *ruleCheckerTestSuite) TestFixOrphanPeerWithDisconnectedStoreAndRuleChanged2() { + // init cluster with 5 voters and 1 learner + suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"}) + suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"}) + suite.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"}) + suite.cluster.AddLabelsStore(4, 1, map[string]string{"host": "host4"}) + suite.cluster.AddLabelsStore(5, 1, map[string]string{"host": "host5"}) + suite.cluster.AddLabelsStore(6, 1, map[string]string{"host": "host6"}) + storeIDs := []uint64{1, 2, 3, 4, 5} + suite.cluster.AddLeaderRegionWithRange(1, "", "", storeIDs[0], storeIDs[1:]...) + r1 := suite.cluster.GetRegion(1) + r1 = r1.Clone(core.WithAddPeer(&metapb.Peer{Id: 6, StoreId: 6, Role: metapb.PeerRole_Learner})) + suite.cluster.PutRegion(r1) + err := suite.ruleManager.SetRules([]*placement.Rule{ + { + GroupID: "pd", + ID: "default", + Index: 100, + Override: true, + Role: placement.Voter, + Count: 5, + IsWitness: false, + }, + { + GroupID: "pd", + ID: "r1", + Index: 100, + Override: false, + Role: placement.Learner, + Count: 1, + IsWitness: false, + }, + }) + suite.NoError(err) + + op := suite.rc.Check(suite.cluster.GetRegion(1)) + suite.Nil(op) + + // set store 1, 2 to disconnected + suite.cluster.SetStoreDisconnect(1) + suite.cluster.SetStoreDisconnect(2) + suite.cluster.SetStoreDisconnect(3) + + // change rule to 3 replicas + suite.ruleManager.DeleteRuleGroup("pd") + suite.ruleManager.SetRule(&placement.Rule{ + GroupID: "pd", + ID: "default", + Role: placement.Voter, + Count: 2, + StartKey: []byte{}, + EndKey: []byte{}, + Override: true, + }) + + // remove store 1 from region 1 + op = suite.rc.Check(suite.cluster.GetRegion(1)) + suite.NotNil(op) + suite.Equal("remove-replaced-orphan-peer", op.Desc()) + suite.Equal(op.Len(), 2) + newLeaderID := op.Step(0).(operator.TransferLeader).ToStore + removedPeerID := op.Step(1).(operator.RemovePeer).FromStore + r1 = suite.cluster.GetRegion(1) + r1 = r1.Clone( + core.WithLeader(r1.GetPeer(newLeaderID)), + core.WithRemoveStorePeer(removedPeerID)) + suite.cluster.PutRegion(r1) + r1 = suite.cluster.GetRegion(1) + suite.Len(r1.GetPeers(), 5) + + // remove store 2 from region 1 + op = suite.rc.Check(suite.cluster.GetRegion(1)) + suite.NotNil(op) + suite.Equal("remove-replaced-orphan-peer", op.Desc()) + suite.Equal(op.Len(), 1) + removedPeerID = op.Step(0).(operator.RemovePeer).FromStore + r1 = r1.Clone(core.WithRemoveStorePeer(removedPeerID)) + suite.cluster.PutRegion(r1) + r1 = suite.cluster.GetRegion(1) + suite.Len(r1.GetPeers(), 4) + for _, p := range r1.GetPeers() { + fmt.Println(p.GetStoreId(), p.Role.String()) + } + + // remove store 3 from region 1 + op = suite.rc.Check(suite.cluster.GetRegion(1)) + suite.NotNil(op) + suite.Equal("remove-replaced-orphan-peer", op.Desc()) + suite.Equal(op.Len(), 1) + removedPeerID = op.Step(0).(operator.RemovePeer).FromStore + r1 = r1.Clone(core.WithRemoveStorePeer(removedPeerID)) + suite.cluster.PutRegion(r1) + r1 = suite.cluster.GetRegion(1) + suite.Len(r1.GetPeers(), 3) + + for _, p := range r1.GetPeers() { + suite.NotEqual(p.GetStoreId(), 1) + suite.NotEqual(p.GetStoreId(), 2) + suite.NotEqual(p.GetStoreId(), 3) + } +} + func (suite *ruleCheckerTestSuite) TestPriorityFitHealthWithDifferentRole1() { suite.cluster.SetEnableUseJointConsensus(true) suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})