From cf1d5accdacb8e7210836e242ed00cc4a42645cf Mon Sep 17 00:00:00 2001
From: nolouch
Date: Fri, 21 Jul 2023 19:00:01 +0800
Subject: [PATCH] rule_checker: replace unhealthy peers with orphan peers

Signed-off-by: nolouch
---
 Makefile                                  |   2 +-
 client/Makefile                           |   2 +-
 pkg/schedule/checker/rule_checker.go      |  50 ++++++++-
 pkg/schedule/checker/rule_checker_test.go | 121 +++++++++++++++++++++-
 pkg/schedule/operator/builder.go          |   5 +-
 pkg/schedule/operator/create_operator.go  |  19 ++++
 pkg/schedule/operator/operator.go         |   2 +-
 pkg/schedule/placement/fit.go             |   3 +
 tests/integrations/client/Makefile        |   2 +-
 tests/integrations/mcs/Makefile           |   2 +-
 tests/integrations/tso/Makefile           |   2 +-
 11 files changed, 198 insertions(+), 12 deletions(-)

diff --git a/Makefile b/Makefile
index f412c998c69..94357321eab 100644
--- a/Makefile
+++ b/Makefile
@@ -149,7 +149,7 @@ static: install-tools
 	@ echo "gofmt ..."
 	@ gofmt -s -l -d $(PACKAGE_DIRECTORIES) 2>&1 | awk '{ print } END { if (NR > 0) { exit 1 } }'
 	@ echo "golangci-lint ..."
-	@ golangci-lint run --verbose $(PACKAGE_DIRECTORIES)
+	@ golangci-lint run --verbose $(PACKAGE_DIRECTORIES) --allow-parallel-runners
 	@ echo "revive ..."
 	@ revive -formatter friendly -config revive.toml $(PACKAGES)

diff --git a/client/Makefile b/client/Makefile
index fd34ef157bf..dae53222d92 100644
--- a/client/Makefile
+++ b/client/Makefile
@@ -34,7 +34,7 @@ static: install-tools
 	@ echo "gofmt ..."
 	@ gofmt -s -l -d . 2>&1 | awk '{ print } END { if (NR > 0) { exit 1 } }'
 	@ echo "golangci-lint ..."
-	@ golangci-lint run -c ../.golangci.yml --verbose ./...
+	@ golangci-lint run -c ../.golangci.yml --verbose ./... --allow-parallel-runners
 	@ echo "revive ..."
 	@ revive -formatter friendly -config ../revive.toml ./...

diff --git a/pkg/schedule/checker/rule_checker.go b/pkg/schedule/checker/rule_checker.go
index a7f9ba63b11..52ee4c66be0 100644
--- a/pkg/schedule/checker/rule_checker.go
+++ b/pkg/schedule/checker/rule_checker.go
@@ -76,6 +76,7 @@ var (
 	ruleCheckerMoveToBetterLocationCounter = checkerCounter.WithLabelValues(ruleChecker, "move-to-better-location")
 	ruleCheckerSkipRemoveOrphanPeerCounter = checkerCounter.WithLabelValues(ruleChecker, "skip-remove-orphan-peer")
 	ruleCheckerRemoveOrphanPeerCounter = checkerCounter.WithLabelValues(ruleChecker, "remove-orphan-peer")
+	ruleCheckerReplaceOrphanPeerCounter = checkerCounter.WithLabelValues(ruleChecker, "replace-orphan-peer")
 )

 // RuleChecker fix/improve region by placement rules.
@@ -426,14 +427,16 @@ func (c *RuleChecker) fixOrphanPeers(region *core.RegionInfo, fit *placement.Reg
 	if len(fit.OrphanPeers) == 0 {
 		return nil, nil
 	}
+	var pinDownPeer *pdpb.PeerStats
 	isUnhealthyPeer := func(id uint64) bool {
-		for _, pendingPeer := range region.GetPendingPeers() {
-			if pendingPeer.GetId() == id {
+		for _, downPeer := range region.GetDownPeers() {
+			if downPeer.Peer.GetId() == id {
+				pinDownPeer = downPeer
 				return true
 			}
 		}
-		for _, downPeer := range region.GetDownPeers() {
-			if downPeer.Peer.GetId() == id {
+		for _, pendingPeer := range region.GetPendingPeers() {
+			if pendingPeer.GetId() == id {
 				return true
 			}
 		}
@@ -455,11 +458,46 @@ loopFits:
 			}
 		}
 	}
+	// If hasUnhealthyFit is false, it is safe to simply remove the orphan peer.
 	if !hasUnhealthyFit {
 		ruleCheckerRemoveOrphanPeerCounter.Inc()
 		return operator.CreateRemovePeerOperator("remove-orphan-peer", c.cluster, 0, region, fit.OrphanPeers[0].StoreId)
 	}
+
+	// Try to use orphan peers to replace unhealthy down peers.
+	for _, orphanPeer := range fit.OrphanPeers {
+		if isUnhealthyPeer(orphanPeer.GetId()) {
+			continue
+		}
+		if pinDownPeer != nil {
+			// Witness peers are not considered in this path.
+			if pinDownPeer.GetPeer().GetIsWitness() || orphanPeer.GetIsWitness() {
+				continue
+			}
+			// The down peer's store must have been down for longer than max-store-down-time.
+			if !c.isStoreDownTimeHitMaxDownTime(pinDownPeer.GetPeer().GetStoreId()) {
+				continue
+			}
+			// Check whether the down peer can be replaced with the orphan peer.
+			ruleCheckerReplaceOrphanPeerCounter.Inc()
+			dstStore := c.cluster.GetStore(orphanPeer.GetStoreId())
+			if fit.Replace(pinDownPeer.GetPeer().GetStoreId(), dstStore) {
+				destRole := pinDownPeer.GetPeer().Role
+				orphanPeerRole := orphanPeer.GetRole()
+				switch {
+				case orphanPeerRole == metapb.PeerRole_Learner && destRole == metapb.PeerRole_Voter:
+					return operator.CreatePromoteLearnerOperatorAndRemovePeer("replace-down-peer-with-orphan-peer", c.cluster, region, orphanPeer, pinDownPeer.GetPeer())
+				case orphanPeerRole == metapb.PeerRole_Voter && destRole == metapb.PeerRole_Learner:
+					return operator.CreateDemoteLearnerOperatorAndRemovePeer("replace-down-peer-with-orphan-peer", c.cluster, region, orphanPeer, pinDownPeer.GetPeer())
+				default:
+					// destRole should never equal orphanPeerRole: if the roles matched, the fit with the orphan peer would already be no worse than the current one.
+					// destRole is never leader, so that case is not considered here.
+				}
+			}
+		}
+	}
+
 	// If hasUnhealthyFit is true, try to remove unhealthy orphan peers only if number of OrphanPeers is >= 2.
 	// Ref https://github.com/tikv/pd/issues/4045
 	if len(fit.OrphanPeers) >= 2 {
@@ -498,6 +536,10 @@ func (c *RuleChecker) isDownPeer(region *core.RegionInfo, peer *metapb.Peer) boo

 func (c *RuleChecker) isStoreDownTimeHitMaxDownTime(storeID uint64) bool {
 	store := c.cluster.GetStore(storeID)
+	if store == nil {
+		log.Warn("store not found, the PD cluster may be in the process of recovery", zap.Uint64("store-id", storeID))
+		return false
+	}
 	return store.DownTime() >= c.cluster.GetCheckerConfig().GetMaxStoreDownTime()
 }
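For readers outside the PD codebase, the role-matching switch in the hunk above reduces to a small decision table: an orphan learner can be promoted in place of a down voter, an orphan voter can be demoted in place of a down learner, and matching roles need no replacement operator. A minimal, self-contained Go sketch of that decision (the PeerRole type and result strings here are illustrative, not PD's metapb types):

```go
package main

import "fmt"

// PeerRole mirrors the two roles the checker distinguishes in this path.
type PeerRole int

const (
	Voter PeerRole = iota
	Learner
)

// replacementAction sketches the role-matching switch from fixOrphanPeers:
// only mismatched roles produce a promote/demote-plus-remove operator.
func replacementAction(orphanRole, downRole PeerRole) string {
	switch {
	case orphanRole == Learner && downRole == Voter:
		return "promote orphan learner, remove down voter"
	case orphanRole == Voter && downRole == Learner:
		return "demote orphan voter, remove down learner"
	default:
		return "no replacement operator needed"
	}
}

func main() {
	fmt.Println(replacementAction(Learner, Voter)) // promote orphan learner, remove down voter
	fmt.Println(replacementAction(Voter, Learner)) // demote orphan voter, remove down learner
}
```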
diff --git a/pkg/schedule/checker/rule_checker_test.go b/pkg/schedule/checker/rule_checker_test.go
index f01378249a6..f3d99626059 100644
--- a/pkg/schedule/checker/rule_checker_test.go
+++ b/pkg/schedule/checker/rule_checker_test.go
@@ -362,7 +362,6 @@ func (suite *ruleCheckerTestSuite) TestFixRuleWitness() {
 	op := suite.rc.Check(suite.cluster.GetRegion(1))
 	suite.NotNil(op)
 	suite.Equal("add-rule-peer", op.Desc())
-	fmt.Println(op)
 	suite.Equal(uint64(3), op.Step(0).(operator.AddLearner).ToStore)
 	suite.True(op.Step(0).(operator.AddLearner).IsWitness)
 }
@@ -686,6 +685,126 @@ func (suite *ruleCheckerTestSuite) TestPriorityFixOrphanPeer() {
 	suite.Equal("remove-orphan-peer", op.Desc())
 }

+func (suite *ruleCheckerTestSuite) TestPriorityFitHealthWithDifferentRole1() {
+	suite.cluster.SetEnableUseJointConsensus(true)
+	suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
+	suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"})
+	suite.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"})
+	suite.cluster.AddLabelsStore(4, 1, map[string]string{"host": "host4"})
+	suite.cluster.AddRegionWithLearner(1, 1, []uint64{2, 3}, []uint64{4})
+	r1 := suite.cluster.GetRegion(1)
+	suite.cluster.GetStore(3).GetMeta().LastHeartbeat = time.Now().Add(-31 * time.Minute).UnixNano()
+
+	// set peer 3 to pending and down
+	r1 = r1.Clone(core.WithPendingPeers([]*metapb.Peer{r1.GetPeer(3)}))
+	r1 = r1.Clone(core.WithDownPeers([]*pdpb.PeerStats{
+		{
+			Peer:        r1.GetStorePeer(3),
+			DownSeconds: 30000,
+		},
+	}))
+	suite.cluster.PutRegion(r1)
+
+	op := suite.rc.Check(suite.cluster.GetRegion(1))
+	suite.Equal(uint64(3), op.Step(0).(operator.ChangePeerV2Enter).DemoteVoters[0].ToStore)
+	suite.Equal(uint64(4), op.Step(0).(operator.ChangePeerV2Enter).PromoteLearners[0].ToStore)
+	suite.Equal(uint64(3), op.Step(1).(operator.ChangePeerV2Leave).DemoteVoters[0].ToStore)
+	suite.Equal(uint64(4), op.Step(1).(operator.ChangePeerV2Leave).PromoteLearners[0].ToStore)
+	suite.Equal("replace-down-peer-with-orphan-peer", op.Desc())
+}
+
+func (suite *ruleCheckerTestSuite) TestPriorityFitHealthWithDifferentRole2() {
+	suite.cluster.SetEnableUseJointConsensus(true)
+	suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
+	suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"})
+	suite.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"})
+	suite.cluster.AddLabelsStore(4, 1, map[string]string{"host": "host4"})
+	suite.cluster.AddLabelsStore(5, 1, map[string]string{"host": "host5"})
+	suite.cluster.AddLeaderRegion(1, 1, 2, 3, 4, 5)
+	r1 := suite.cluster.GetRegion(1)
+
+	// set peer 3 to pending and down, demote it to learner, and mark store 3 as down
+	suite.cluster.GetStore(3).GetMeta().LastHeartbeat = time.Now().Add(-31 * time.Minute).UnixNano()
+	r1 = r1.Clone(core.WithLearners([]*metapb.Peer{r1.GetPeer(3)}))
+	r1 = r1.Clone(
+		core.WithPendingPeers([]*metapb.Peer{r1.GetPeer(3)}),
+		core.WithDownPeers([]*pdpb.PeerStats{
+			{
+				Peer:        r1.GetStorePeer(3),
+				DownSeconds: 30000,
+			},
+		}),
+	)
+	suite.cluster.PutRegion(r1)
+
+	// default and test groups => 3 voters + 1 learner
+	err := suite.ruleManager.SetRule(&placement.Rule{
+		GroupID: "test",
+		ID:      "10",
+		Role:    placement.Learner,
+		Count:   1,
+	})
+	suite.NoError(err)
+
+	op := suite.rc.Check(suite.cluster.GetRegion(1))
+	suite.Equal(uint64(5), op.Step(0).(operator.ChangePeerV2Enter).DemoteVoters[0].ToStore)
+	suite.Equal(uint64(3), op.Step(1).(operator.RemovePeer).FromStore)
+	suite.Equal("replace-down-peer-with-orphan-peer", op.Desc())
+}
+
+func (suite *ruleCheckerTestSuite) TestPriorityFitHealthPeersAndTiFlash() {
+	suite.cluster.SetEnableUseJointConsensus(true)
+	suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
+	suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"})
+	suite.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"})
+	suite.cluster.AddLabelsStore(4, 1, map[string]string{"host": "host4", "engine": "tiflash"})
+	suite.cluster.AddRegionWithLearner(1, 1, []uint64{2, 3}, []uint64{4})
+	rule := &placement.Rule{
+		GroupID: "pd",
+		ID:      "test",
+		Role:    placement.Voter,
+		Count:   3,
+	}
+	rule2 := &placement.Rule{
+		GroupID: "pd",
+		ID:      "test2",
+		Role:    placement.Learner,
+		Count:   1,
+		LabelConstraints: []placement.LabelConstraint{
+			{
+				Key:    "engine",
+				Op:     placement.In,
+				Values: []string{"tiflash"},
+			},
+		},
+	}
+	suite.ruleManager.SetRule(rule)
+	suite.ruleManager.SetRule(rule2)
+	suite.ruleManager.DeleteRule("pd", "default")
+
+	r1 := suite.cluster.GetRegion(1)
+	// set peer 3 to pending and down
+	r1 = r1.Clone(core.WithPendingPeers([]*metapb.Peer{r1.GetPeer(3)}))
+	r1 = r1.Clone(core.WithDownPeers([]*pdpb.PeerStats{
+		{
+			Peer:        r1.GetStorePeer(3),
+			DownSeconds: 30000,
+		},
+	}))
+	suite.cluster.PutRegion(r1)
+	suite.cluster.GetStore(3).GetMeta().LastHeartbeat = time.Now().Add(-31 * time.Minute).UnixNano()
+
+	op := suite.rc.Check(suite.cluster.GetRegion(1))
+	// should not promote the tiflash learner
+	suite.Nil(op)
+
+	// after scaling out a new store, the down peer can be replaced
+	suite.cluster.AddLabelsStore(5, 1, map[string]string{"host": "host5"})
+	op = suite.rc.Check(suite.cluster.GetRegion(1))
+	suite.NotNil(op)
+	suite.Equal("fast-replace-rule-down-peer", op.Desc())
+}
+
 func (suite *ruleCheckerTestSuite) TestIssue3293() {
 	suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
 	suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host1"})
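These tests make store 3 eligible for replacement by backdating its last heartbeat 31 minutes, just past PD's default 30-minute max-store-down-time. A minimal sketch of the eligibility check the tests trigger (the function and parameter names here are illustrative; PD reads the threshold from the checker config via GetMaxStoreDownTime):

```go
package main

import (
	"fmt"
	"time"
)

// storeDownTooLong mirrors the condition the tests exercise: a store only
// qualifies for down-peer replacement once it has missed heartbeats for
// longer than the configured max-store-down-time (30m by default).
func storeDownTooLong(lastHeartbeat time.Time, maxStoreDownTime time.Duration) bool {
	return time.Since(lastHeartbeat) >= maxStoreDownTime
}

func main() {
	lastHeartbeat := time.Now().Add(-31 * time.Minute) // as in the tests
	fmt.Println(storeDownTooLong(lastHeartbeat, 30*time.Minute)) // true
}
```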
diff --git a/pkg/schedule/operator/builder.go b/pkg/schedule/operator/builder.go
index d197ce3b3aa..8f2936b946e 100644
--- a/pkg/schedule/operator/builder.go
+++ b/pkg/schedule/operator/builder.go
@@ -393,7 +393,6 @@ func (b *Builder) Build(kind OpKind) (*Operator, error) {
 	if brief, b.err = b.prepareBuild(); b.err != nil {
 		return nil, b.err
 	}
-
 	if b.useJointConsensus {
 		kind, b.err = b.buildStepsWithJointConsensus(kind)
 	} else {
@@ -539,6 +538,10 @@ func (b *Builder) brief() string {
 		return fmt.Sprintf("%s: store %s to %s", op, b.toRemove, b.toAdd)
 	case len(b.toAdd) > 0:
 		return fmt.Sprintf("add peer: store %s", b.toAdd)
+	case len(b.toRemove) > 0 && len(b.toPromote) > 0:
+		return fmt.Sprintf("promote peer: store %s, rm peer: store %s", b.toPromote, b.toRemove)
+	case len(b.toRemove) > 0 && len(b.toDemote) > 0:
+		return fmt.Sprintf("demote peer: store %s, rm peer: store %s", b.toDemote, b.toRemove)
 	case len(b.toRemove) > 0:
 		return fmt.Sprintf("rm peer: store %s", b.toRemove)
 	case len(b.toPromote) > 0:
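The two new brief cases interleave two store lists in one message, so the verbs must line up with the argument order: the promoted (or demoted) stores come first, the removed stores second. A tiny sanity check of that pairing with plain fmt (the store lists are placeholders):

```go
package main

import "fmt"

func main() {
	toPromote, toRemove := "[4]", "[3]"
	// Verb-to-argument pairing as in Builder.brief: promote first, remove second.
	fmt.Printf("promote peer: store %s, rm peer: store %s\n", toPromote, toRemove)
	// Output: promote peer: store [4], rm peer: store [3]
}
```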
diff --git a/pkg/schedule/operator/create_operator.go b/pkg/schedule/operator/create_operator.go
index f6de48691fc..7b7c0d9c1fc 100644
--- a/pkg/schedule/operator/create_operator.go
+++ b/pkg/schedule/operator/create_operator.go
@@ -51,6 +51,25 @@ func CreatePromoteLearnerOperator(desc string, ci sche.SharedCluster, region *co
 		Build(0)
 }

+// CreatePromoteLearnerOperatorAndRemovePeer creates an operator that promotes a learner and removes a peer.
+func CreatePromoteLearnerOperatorAndRemovePeer(desc string, ci sche.SharedCluster, region *core.RegionInfo, toPromote *metapb.Peer, toRemove *metapb.Peer) (*Operator, error) {
+	return NewBuilder(desc, ci, region).
+		PromoteLearner(toPromote.GetStoreId()).
+		RemovePeer(toRemove.GetStoreId()).
+		Build(0)
+}
+
+// CreateDemoteLearnerOperatorAndRemovePeer creates an operator that demotes a voter to learner and removes a peer.
+func CreateDemoteLearnerOperatorAndRemovePeer(desc string, ci sche.SharedCluster, region *core.RegionInfo, toDemote *metapb.Peer, toRemove *metapb.Peer) (*Operator, error) {
+	if !ci.GetSharedConfig().IsUseJointConsensus() {
+		return nil, errors.Errorf("cannot build demote learner operator when joint consensus is disabled")
+	}
+	return NewBuilder(desc, ci, region).
+		DemoteVoter(toDemote.GetStoreId()).
+		RemovePeer(toRemove.GetStoreId()).
+		Build(0)
+}
+
 // CreateRemovePeerOperator creates an operator that removes a peer from region.
 func CreateRemovePeerOperator(desc string, ci sche.SharedCluster, kind OpKind, region *core.RegionInfo, storeID uint64) (*Operator, error) {
 	return NewBuilder(desc, ci, region).
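Both constructors follow PD's fluent Builder pattern: methods record intended changes, and Build validates them and emits concrete steps (joint-consensus or sequential). A toy, self-contained sketch of that shape, including the joint-consensus guard mirrored from CreateDemoteLearnerOperatorAndRemovePeer (opBuilder and all names here are illustrative stand-ins, not PD's operator.Builder API):

```go
package main

import (
	"errors"
	"fmt"
)

// opBuilder is a toy stand-in for PD's operator.Builder: methods record
// intended changes and Build validates them and emits a step list.
type opBuilder struct {
	promote, demote, remove []uint64
	useJointConsensus       bool
}

func (b *opBuilder) PromoteLearner(storeID uint64) *opBuilder { b.promote = append(b.promote, storeID); return b }
func (b *opBuilder) DemoteVoter(storeID uint64) *opBuilder    { b.demote = append(b.demote, storeID); return b }
func (b *opBuilder) RemovePeer(storeID uint64) *opBuilder     { b.remove = append(b.remove, storeID); return b }

func (b *opBuilder) Build() ([]string, error) {
	if len(b.demote) > 0 && !b.useJointConsensus {
		// Mirrors the guard in CreateDemoteLearnerOperatorAndRemovePeer.
		return nil, errors.New("demote requires joint consensus")
	}
	var steps []string
	for _, s := range b.promote {
		steps = append(steps, fmt.Sprintf("promote learner on store %d", s))
	}
	for _, s := range b.demote {
		steps = append(steps, fmt.Sprintf("demote voter on store %d", s))
	}
	for _, s := range b.remove {
		steps = append(steps, fmt.Sprintf("remove peer on store %d", s))
	}
	return steps, nil
}

func main() {
	b := &opBuilder{useJointConsensus: true}
	steps, err := b.PromoteLearner(4).RemovePeer(3).Build()
	fmt.Println(steps, err) // [promote learner on store 4 remove peer on store 3] <nil>
}
```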
diff --git a/pkg/schedule/operator/operator.go b/pkg/schedule/operator/operator.go
index d841c785b59..7f5f77f9b5c 100644
--- a/pkg/schedule/operator/operator.go
+++ b/pkg/schedule/operator/operator.go
@@ -127,7 +127,7 @@ func (o *Operator) Sync(other *Operator) {
 func (o *Operator) String() string {
 	stepStrs := make([]string, len(o.steps))
 	for i := range o.steps {
-		stepStrs[i] = o.steps[i].String()
+		stepStrs[i] = fmt.Sprintf("step-%d:{%s}", i, o.steps[i].String())
 	}
 	s := fmt.Sprintf("%s {%s} (kind:%s, region:%v(%v, %v), createAt:%s, startAt:%s, currentStep:%v, size:%d, steps:[%s], timeout:[%s])",
 		o.desc, o.brief, o.kind, o.regionID, o.regionEpoch.GetVersion(), o.regionEpoch.GetConfVer(), o.GetCreateTime(),
diff --git a/pkg/schedule/placement/fit.go b/pkg/schedule/placement/fit.go
index 7d447a72926..45afc5bcfa3 100644
--- a/pkg/schedule/placement/fit.go
+++ b/pkg/schedule/placement/fit.go
@@ -37,6 +37,9 @@ type RegionFit struct {

 // Replace returns true if the replacement store fits all constraints and its isolation score is not less than the original's.
 func (f *RegionFit) Replace(srcStoreID uint64, dstStore *core.StoreInfo) bool {
+	if dstStore == nil {
+		return false
+	}
 	fit := f.getRuleFitByStoreID(srcStoreID)
 	// check that the target store fits all constraints.
 	if fit == nil {
diff --git a/tests/integrations/client/Makefile b/tests/integrations/client/Makefile
index 2d0cf748599..4b6f3336151 100644
--- a/tests/integrations/client/Makefile
+++ b/tests/integrations/client/Makefile
@@ -21,7 +21,7 @@ static: install-tools
 	@ echo "gofmt ..."
 	@ gofmt -s -l -d . 2>&1 | awk '{ print } END { if (NR > 0) { exit 1 } }'
 	@ echo "golangci-lint ..."
-	@ golangci-lint run -c $(ROOT_PATH)/.golangci.yml --verbose ./...
+	@ golangci-lint run -c $(ROOT_PATH)/.golangci.yml --verbose ./... --allow-parallel-runners
 	@ echo "revive ..."
 	@ revive -formatter friendly -config $(ROOT_PATH)/revive.toml ./...

diff --git a/tests/integrations/mcs/Makefile b/tests/integrations/mcs/Makefile
index 11862fc9e6c..01e63b5baec 100644
--- a/tests/integrations/mcs/Makefile
+++ b/tests/integrations/mcs/Makefile
@@ -21,7 +21,7 @@ static: install-tools
 	@ echo "gofmt ..."
 	@ gofmt -s -l -d . 2>&1 | awk '{ print } END { if (NR > 0) { exit 1 } }'
 	@ echo "golangci-lint ..."
-	@ golangci-lint run -c $(ROOT_PATH)/.golangci.yml --verbose ./...
+	@ golangci-lint run -c $(ROOT_PATH)/.golangci.yml --verbose ./... --allow-parallel-runners
 	@ echo "revive ..."
 	@ revive -formatter friendly -config $(ROOT_PATH)/revive.toml ./...

diff --git a/tests/integrations/tso/Makefile b/tests/integrations/tso/Makefile
index 25896ca50e4..e353f686fe7 100644
--- a/tests/integrations/tso/Makefile
+++ b/tests/integrations/tso/Makefile
@@ -21,7 +21,7 @@ static: install-tools
 	@ echo "gofmt ..."
 	@ gofmt -s -l -d . 2>&1 | awk '{ print } END { if (NR > 0) { exit 1 } }'
 	@ echo "golangci-lint ..."
-	@ golangci-lint run -c $(ROOT_PATH)/.golangci.yml --verbose ./...
+	@ golangci-lint run -c $(ROOT_PATH)/.golangci.yml --verbose ./... --allow-parallel-runners
 	@ echo "revive ..."
 	@ revive -formatter friendly -config $(ROOT_PATH)/revive.toml ./...
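The operator.go change indexes each step in the operator's log representation, which makes multi-step operators (such as the joint-consensus enter/leave pairs added above) easier to correlate with the currentStep field. A minimal sketch of the resulting formatting (the step strings are placeholders):

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	steps := []string{
		"add learner peer 5 on store 4",
		"promote learner peer 5 on store 4 to voter",
	}
	stepStrs := make([]string, len(steps))
	for i := range steps {
		// Same per-step formatting as the patched Operator.String.
		stepStrs[i] = fmt.Sprintf("step-%d:{%s}", i, steps[i])
	}
	fmt.Printf("steps:[%s]\n", strings.Join(stepStrs, ", "))
	// steps:[step-0:{add learner peer 5 on store 4}, step-1:{promote learner peer 5 on store 4 to voter}]
}
```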