Skip to content

Commit

Permalink
rule_checker: can replace unhealthPeer with orphanPeer (#6831)
Browse files Browse the repository at this point in the history
close #6559

add logic try to replace unhealthy peer with orphan peer

Signed-off-by: nolouch <nolouch@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
nolouch and ti-chi-bot[bot] authored Jul 26, 2023
1 parent de985b8 commit f916e90
Show file tree
Hide file tree
Showing 12 changed files with 226 additions and 24 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ static: install-tools
@ echo "gofmt ..."
@ gofmt -s -l -d $(PACKAGE_DIRECTORIES) 2>&1 | awk '{ print } END { if (NR > 0) { exit 1 } }'
@ echo "golangci-lint ..."
@ golangci-lint run --verbose $(PACKAGE_DIRECTORIES)
@ golangci-lint run --verbose $(PACKAGE_DIRECTORIES) --allow-parallel-runners
@ echo "revive ..."
@ revive -formatter friendly -config revive.toml $(PACKAGES)

Expand Down
2 changes: 1 addition & 1 deletion client/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ static: install-tools
@ echo "gofmt ..."
@ gofmt -s -l -d . 2>&1 | awk '{ print } END { if (NR > 0) { exit 1 } }'
@ echo "golangci-lint ..."
@ golangci-lint run -c ../.golangci.yml --verbose ./...
@ golangci-lint run -c ../.golangci.yml --verbose ./... --allow-parallel-runners
@ echo "revive ..."
@ revive -formatter friendly -config ../revive.toml ./...

Expand Down
54 changes: 50 additions & 4 deletions pkg/schedule/checker/rule_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ var (
ruleCheckerMoveToBetterLocationCounter = checkerCounter.WithLabelValues(ruleChecker, "move-to-better-location")
ruleCheckerSkipRemoveOrphanPeerCounter = checkerCounter.WithLabelValues(ruleChecker, "skip-remove-orphan-peer")
ruleCheckerRemoveOrphanPeerCounter = checkerCounter.WithLabelValues(ruleChecker, "remove-orphan-peer")
ruleCheckerReplaceOrphanPeerCounter = checkerCounter.WithLabelValues(ruleChecker, "replace-orphan-peer")
)

// RuleChecker fix/improve region by placement rules.
Expand Down Expand Up @@ -426,14 +427,15 @@ func (c *RuleChecker) fixOrphanPeers(region *core.RegionInfo, fit *placement.Reg
if len(fit.OrphanPeers) == 0 {
return nil, nil
}
var pinDownPeer *metapb.Peer
isUnhealthyPeer := func(id uint64) bool {
for _, pendingPeer := range region.GetPendingPeers() {
if pendingPeer.GetId() == id {
for _, downPeer := range region.GetDownPeers() {
if downPeer.Peer.GetId() == id {
return true
}
}
for _, downPeer := range region.GetDownPeers() {
if downPeer.Peer.GetId() == id {
for _, pendingPeer := range region.GetPendingPeers() {
if pendingPeer.GetId() == id {
return true
}
}
Expand All @@ -450,16 +452,56 @@ loopFits:
}
for _, p := range rf.Peers {
if isUnhealthyPeer(p.GetId()) {
// make sure is down peer.
if region.GetDownPeer(p.GetId()) != nil {
pinDownPeer = p
}
hasUnhealthyFit = true
break loopFits
}
}
}

// If hasUnhealthyFit is false, it is safe to delete the OrphanPeer.
if !hasUnhealthyFit {
ruleCheckerRemoveOrphanPeerCounter.Inc()
return operator.CreateRemovePeerOperator("remove-orphan-peer", c.cluster, 0, region, fit.OrphanPeers[0].StoreId)
}

// try to use orphan peers to replace unhealthy down peers.
for _, orphanPeer := range fit.OrphanPeers {
if pinDownPeer != nil {
// make sure the orphan peer is healthy.
if isUnhealthyPeer(orphanPeer.GetId()) {
continue
}
// no consider witness in this path.
if pinDownPeer.GetIsWitness() || orphanPeer.GetIsWitness() {
continue
}
// down peer's store should be down.
if !c.isStoreDownTimeHitMaxDownTime(pinDownPeer.GetStoreId()) {
continue
}
// check if down peer can replace with orphan peer.
dstStore := c.cluster.GetStore(orphanPeer.GetStoreId())
if fit.Replace(pinDownPeer.GetStoreId(), dstStore) {
destRole := pinDownPeer.GetRole()
orphanPeerRole := orphanPeer.GetRole()
ruleCheckerReplaceOrphanPeerCounter.Inc()
switch {
case orphanPeerRole == metapb.PeerRole_Learner && destRole == metapb.PeerRole_Voter:
return operator.CreatePromoteLearnerOperatorAndRemovePeer("replace-down-peer-with-orphan-peer", c.cluster, region, orphanPeer, pinDownPeer)
case orphanPeerRole == metapb.PeerRole_Voter && destRole == metapb.PeerRole_Learner:
return operator.CreateDemoteLearnerOperatorAndRemovePeer("replace-down-peer-with-orphan-peer", c.cluster, region, orphanPeer, pinDownPeer)
default:
// destRole should not same with orphanPeerRole. if role is same, it fit with orphanPeer should be better than now.
// destRole never be leader, so we not consider it.
}
}
}
}

// If hasUnhealthyFit is true, try to remove unhealthy orphan peers only if number of OrphanPeers is >= 2.
// Ref https://github.com/tikv/pd/issues/4045
if len(fit.OrphanPeers) >= 2 {
Expand Down Expand Up @@ -498,6 +540,10 @@ func (c *RuleChecker) isDownPeer(region *core.RegionInfo, peer *metapb.Peer) boo

func (c *RuleChecker) isStoreDownTimeHitMaxDownTime(storeID uint64) bool {
store := c.cluster.GetStore(storeID)
if store == nil {
log.Warn("lost the store, maybe you are recovering the PD cluster", zap.Uint64("store-id", storeID))
return false
}
return store.DownTime() >= c.cluster.GetCheckerConfig().GetMaxStoreDownTime()
}

Expand Down
127 changes: 126 additions & 1 deletion pkg/schedule/checker/rule_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,6 @@ func (suite *ruleCheckerTestSuite) TestFixRuleWitness() {
op := suite.rc.Check(suite.cluster.GetRegion(1))
suite.NotNil(op)
suite.Equal("add-rule-peer", op.Desc())
fmt.Println(op)
suite.Equal(uint64(3), op.Step(0).(operator.AddLearner).ToStore)
suite.True(op.Step(0).(operator.AddLearner).IsWitness)
}
Expand Down Expand Up @@ -686,6 +685,132 @@ func (suite *ruleCheckerTestSuite) TestPriorityFixOrphanPeer() {
suite.Equal("remove-orphan-peer", op.Desc())
}

func (suite *ruleCheckerTestSuite) TestPriorityFitHealthWithDifferentRole1() {
suite.cluster.SetEnableUseJointConsensus(true)
suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"})
suite.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"})
suite.cluster.AddLabelsStore(4, 1, map[string]string{"host": "host4"})
suite.cluster.AddRegionWithLearner(1, 1, []uint64{2, 3}, []uint64{4})
r1 := suite.cluster.GetRegion(1)
suite.cluster.GetStore(3).GetMeta().LastHeartbeat = time.Now().Add(-31 * time.Minute).UnixNano()

// set peer3 to pending and down
r1 = r1.Clone(core.WithPendingPeers([]*metapb.Peer{r1.GetPeer(3)}))
r1 = r1.Clone(core.WithDownPeers([]*pdpb.PeerStats{
{
Peer: r1.GetStorePeer(3),
DownSeconds: 30000,
},
}))
suite.cluster.PutRegion(r1)

op := suite.rc.Check(suite.cluster.GetRegion(1))
suite.Equal(uint64(3), op.Step(0).(operator.ChangePeerV2Enter).DemoteVoters[0].ToStore)
suite.Equal(uint64(4), op.Step(0).(operator.ChangePeerV2Enter).PromoteLearners[0].ToStore)
suite.Equal(uint64(3), op.Step(1).(operator.ChangePeerV2Leave).DemoteVoters[0].ToStore)
suite.Equal(uint64(4), op.Step(1).(operator.ChangePeerV2Leave).PromoteLearners[0].ToStore)
suite.Equal("replace-down-peer-with-orphan-peer", op.Desc())

// set peer3 only pending
r1 = r1.Clone(core.WithDownPeers(nil))
suite.cluster.PutRegion(r1)
op = suite.rc.Check(suite.cluster.GetRegion(1))
suite.Nil(op)
}

func (suite *ruleCheckerTestSuite) TestPriorityFitHealthWithDifferentRole2() {
suite.cluster.SetEnableUseJointConsensus(true)
suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"})
suite.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"})
suite.cluster.AddLabelsStore(4, 1, map[string]string{"host": "host4"})
suite.cluster.AddLabelsStore(5, 1, map[string]string{"host": "host5"})
suite.cluster.AddLeaderRegion(1, 1, 2, 3, 4, 5)
r1 := suite.cluster.GetRegion(1)

// set peer3 to pending and down, and peer 3 to learner, and store 3 is down
suite.cluster.GetStore(3).GetMeta().LastHeartbeat = time.Now().Add(-31 * time.Minute).UnixNano()
r1 = r1.Clone(core.WithLearners([]*metapb.Peer{r1.GetPeer(3)}))
r1 = r1.Clone(
core.WithPendingPeers([]*metapb.Peer{r1.GetPeer(3)}),
core.WithDownPeers([]*pdpb.PeerStats{
{
Peer: r1.GetStorePeer(3),
DownSeconds: 30000,
},
}),
)
suite.cluster.PutRegion(r1)

// default and test group => 3 voter + 1 learner
err := suite.ruleManager.SetRule(&placement.Rule{
GroupID: "test",
ID: "10",
Role: placement.Learner,
Count: 1,
})
suite.NoError(err)

op := suite.rc.Check(suite.cluster.GetRegion(1))
suite.Equal(uint64(5), op.Step(0).(operator.ChangePeerV2Enter).DemoteVoters[0].ToStore)
suite.Equal(uint64(3), op.Step(1).(operator.RemovePeer).FromStore)
suite.Equal("replace-down-peer-with-orphan-peer", op.Desc())
}

func (suite *ruleCheckerTestSuite) TestPriorityFitHealthPeersAndTiFlash() {
suite.cluster.SetEnableUseJointConsensus(true)
suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"})
suite.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"})
suite.cluster.AddLabelsStore(4, 1, map[string]string{"host": "host4", "engine": "tiflash"})
suite.cluster.AddRegionWithLearner(1, 1, []uint64{2, 3}, []uint64{4})
rule := &placement.Rule{
GroupID: "pd",
ID: "test",
Role: placement.Voter,
Count: 3,
}
rule2 := &placement.Rule{
GroupID: "pd",
ID: "test2",
Role: placement.Learner,
Count: 1,
LabelConstraints: []placement.LabelConstraint{
{
Key: "engine",
Op: placement.In,
Values: []string{"tiflash"},
},
},
}
suite.ruleManager.SetRule(rule)
suite.ruleManager.SetRule(rule2)
suite.ruleManager.DeleteRule("pd", "default")

r1 := suite.cluster.GetRegion(1)
// set peer3 to pending and down
r1 = r1.Clone(core.WithPendingPeers([]*metapb.Peer{r1.GetPeer(3)}))
r1 = r1.Clone(core.WithDownPeers([]*pdpb.PeerStats{
{
Peer: r1.GetStorePeer(3),
DownSeconds: 30000,
},
}))
suite.cluster.PutRegion(r1)
suite.cluster.GetStore(3).GetMeta().LastHeartbeat = time.Now().Add(-31 * time.Minute).UnixNano()

op := suite.rc.Check(suite.cluster.GetRegion(1))
// should not promote tiflash peer
suite.Nil(op)

// scale a node, can replace the down peer
suite.cluster.AddLabelsStore(5, 1, map[string]string{"host": "host5"})
op = suite.rc.Check(suite.cluster.GetRegion(1))
suite.NotNil(op)
suite.Equal("fast-replace-rule-down-peer", op.Desc())
}

func (suite *ruleCheckerTestSuite) TestIssue3293() {
suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host1"})
Expand Down
5 changes: 4 additions & 1 deletion pkg/schedule/operator/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,6 @@ func (b *Builder) Build(kind OpKind) (*Operator, error) {
if brief, b.err = b.prepareBuild(); b.err != nil {
return nil, b.err
}

if b.useJointConsensus {
kind, b.err = b.buildStepsWithJointConsensus(kind)
} else {
Expand Down Expand Up @@ -539,6 +538,10 @@ func (b *Builder) brief() string {
return fmt.Sprintf("%s: store %s to %s", op, b.toRemove, b.toAdd)
case len(b.toAdd) > 0:
return fmt.Sprintf("add peer: store %s", b.toAdd)
case len(b.toRemove) > 0 && len(b.toPromote) > 0:
return fmt.Sprintf("promote peer: store %s, rm peer: store %s", b.toRemove, b.toPromote)
case len(b.toRemove) > 0 && len(b.toDemote) > 0:
return fmt.Sprintf("demote peer: store %s, rm peer: store %s", b.toDemote, b.toRemove)
case len(b.toRemove) > 0:
return fmt.Sprintf("rm peer: store %s", b.toRemove)
case len(b.toPromote) > 0:
Expand Down
21 changes: 20 additions & 1 deletion pkg/schedule/operator/create_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,25 @@ func CreatePromoteLearnerOperator(desc string, ci sche.SharedCluster, region *co
Build(0)
}

// CreatePromoteLearnerOperatorAndRemovePeer creates an operator that promotes a learner and removes a peer.
func CreatePromoteLearnerOperatorAndRemovePeer(desc string, ci sche.SharedCluster, region *core.RegionInfo, toPromote *metapb.Peer, toRemove *metapb.Peer) (*Operator, error) {
return NewBuilder(desc, ci, region).
PromoteLearner(toPromote.GetStoreId()).
RemovePeer(toRemove.GetStoreId()).
Build(0)
}

// CreateDemoteLearnerOperatorAndRemovePeer creates an operator that demotes a learner and removes a peer.
func CreateDemoteLearnerOperatorAndRemovePeer(desc string, ci sche.SharedCluster, region *core.RegionInfo, toDemote *metapb.Peer, toRemove *metapb.Peer) (*Operator, error) {
if !ci.GetSharedConfig().IsUseJointConsensus() {
return nil, errors.Errorf("cannot build demote learner operator due to disabling using joint state")
}
return NewBuilder(desc, ci, region).
DemoteVoter(toDemote.GetStoreId()).
RemovePeer(toRemove.GetStoreId()).
Build(0)
}

// CreateRemovePeerOperator creates an operator that removes a peer from region.
func CreateRemovePeerOperator(desc string, ci sche.SharedCluster, kind OpKind, region *core.RegionInfo, storeID uint64) (*Operator, error) {
return NewBuilder(desc, ci, region).
Expand Down Expand Up @@ -247,7 +266,7 @@ func CreateLeaveJointStateOperator(desc string, ci sche.SharedCluster, origin *c
b := NewBuilder(desc, ci, origin, SkipOriginJointStateCheck, SkipPlacementRulesCheck)

if b.err == nil && !core.IsInJointState(origin.GetPeers()...) {
b.err = errors.Errorf("cannot build leave joint state operator for region which is not in joint state")
b.err = errors.Errorf("cannot build leave joint state operator due to disabling using joint state")
}

if b.err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (o *Operator) Sync(other *Operator) {
func (o *Operator) String() string {
stepStrs := make([]string, len(o.steps))
for i := range o.steps {
stepStrs[i] = o.steps[i].String()
stepStrs[i] = fmt.Sprintf("%d:{%s}", i, o.steps[i].String())
}
s := fmt.Sprintf("%s {%s} (kind:%s, region:%v(%v, %v), createAt:%s, startAt:%s, currentStep:%v, size:%d, steps:[%s], timeout:[%s])",
o.desc, o.brief, o.kind, o.regionID, o.regionEpoch.GetVersion(), o.regionEpoch.GetConfVer(), o.GetCreateTime(),
Expand Down
3 changes: 3 additions & 0 deletions pkg/schedule/placement/fit.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ type RegionFit struct {

// Replace return true if the replacement store is fit all constraints and isolation score is not less than the origin.
func (f *RegionFit) Replace(srcStoreID uint64, dstStore *core.StoreInfo) bool {
if dstStore == nil {
return false
}
fit := f.getRuleFitByStoreID(srcStoreID)
// check the target store is fit all constraints.
if fit == nil {
Expand Down
Loading

0 comments on commit f916e90

Please sign in to comment.