Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rule_checker: can replace unhealthPeer with orphanPeer (#6831) #6844

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ static: install-tools
@ echo "gofmt ..."
@ gofmt -s -l -d $(PACKAGE_DIRECTORIES) 2>&1 | awk '{ print } END { if (NR > 0) { exit 1 } }'
@ echo "golangci-lint ..."
@ golangci-lint run --verbose $(PACKAGE_DIRECTORIES)
@ golangci-lint run --verbose $(PACKAGE_DIRECTORIES) --allow-parallel-runners
@ echo "revive ..."
@ revive -formatter friendly -config revive.toml $(PACKAGES)

Expand Down
6 changes: 4 additions & 2 deletions client/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ install-tools:

static: install-tools
@ gofmt -s -l -d . 2>&1 | awk '{ print } END { if (NR > 0) { exit 1 } }'
@ golangci-lint run -c ../.golangci.yml ./...
@ revive -formatter friendly -config ../revive.toml .
@ echo "golangci-lint ..."
@ golangci-lint run -c ../.golangci.yml --verbose ./... --allow-parallel-runners
@ echo "revive ..."
@ revive -formatter friendly -config ../revive.toml ./...

tidy:
@ go mod tidy
Expand Down
4 changes: 3 additions & 1 deletion client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ const (
// NotServedErr indicates an tso node/pod received the requests for the keyspace groups which are not served by it.
// Note: keep the same as the ones defined on the server side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
NotServedErr = "is not served"
NotServedErr = "is not served"
// RetryTimeoutErr indicates the request is timeout.
RetryTimeoutErr = "retry timeout"
)

Expand Down Expand Up @@ -87,6 +88,7 @@ var (
ErrClientResourceGroupThrottled = errors.Normalize("exceeded resource group quota limitation", errors.RFCCodeText("PD:client:ErrClientResourceGroupThrottled"))
)

// ErrClientGetResourceGroup is the error type for getting resource group.
type ErrClientGetResourceGroup struct {
ResourceGroupName string
Cause string
Expand Down
1 change: 1 addition & 0 deletions client/resource_group/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//revive:disable
package controller

import (
Expand Down
3 changes: 1 addition & 2 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1047,9 +1047,8 @@ func (gc *groupCostController) onRequestWait(
sub(gc.mu.consumption, delta)
gc.mu.Unlock()
return nil, nil, err
} else {
gc.successfulRequestDuration.Observe(d.Seconds())
}
gc.successfulRequestDuration.Observe(d.Seconds())
}

gc.mu.Lock()
Expand Down
2 changes: 2 additions & 0 deletions client/resource_group/controller/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ type tokenBucketReconfigureArgs struct {
NotifyThreshold float64
}

// LimiterOption is used to modify the Limiter during construction.
type LimiterOption func(*Limiter)

func resetLowProcess() func(*Limiter) {
Expand Down Expand Up @@ -386,6 +387,7 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur
return r
}

// ResetRemainingNotifyTimes resets remainingNotifyTimes.
func (lim *Limiter) ResetRemainingNotifyTimes() {
lim.mu.Lock()
defer lim.mu.Unlock()
Expand Down
1 change: 1 addition & 0 deletions client/resource_group/controller/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type TestResponseInfo struct {
succeed bool
}

// NewTestResponseInfo creates a new TestResponseInfo.
func NewTestResponseInfo(readBytes uint64, kvCPU time.Duration, succeed bool) *TestResponseInfo {
return &TestResponseInfo{
readBytes: readBytes,
Expand Down
54 changes: 50 additions & 4 deletions pkg/schedule/checker/rule_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ var (
ruleCheckerMoveToBetterLocationCounter = checkerCounter.WithLabelValues(ruleChecker, "move-to-better-location")
ruleCheckerSkipRemoveOrphanPeerCounter = checkerCounter.WithLabelValues(ruleChecker, "skip-remove-orphan-peer")
ruleCheckerRemoveOrphanPeerCounter = checkerCounter.WithLabelValues(ruleChecker, "remove-orphan-peer")
ruleCheckerReplaceOrphanPeerCounter = checkerCounter.WithLabelValues(ruleChecker, "replace-orphan-peer")
)

// RuleChecker fix/improve region by placement rules.
Expand Down Expand Up @@ -426,14 +427,15 @@ func (c *RuleChecker) fixOrphanPeers(region *core.RegionInfo, fit *placement.Reg
if len(fit.OrphanPeers) == 0 {
return nil, nil
}
var pinDownPeer *metapb.Peer
isUnhealthyPeer := func(id uint64) bool {
for _, pendingPeer := range region.GetPendingPeers() {
if pendingPeer.GetId() == id {
for _, downPeer := range region.GetDownPeers() {
if downPeer.Peer.GetId() == id {
return true
}
}
for _, downPeer := range region.GetDownPeers() {
if downPeer.Peer.GetId() == id {
for _, pendingPeer := range region.GetPendingPeers() {
if pendingPeer.GetId() == id {
return true
}
}
Expand All @@ -450,16 +452,56 @@ loopFits:
}
for _, p := range rf.Peers {
if isUnhealthyPeer(p.GetId()) {
// make sure is down peer.
if region.GetDownPeer(p.GetId()) != nil {
pinDownPeer = p
}
hasUnhealthyFit = true
break loopFits
}
}
}

// If hasUnhealthyFit is false, it is safe to delete the OrphanPeer.
if !hasUnhealthyFit {
ruleCheckerRemoveOrphanPeerCounter.Inc()
return operator.CreateRemovePeerOperator("remove-orphan-peer", c.cluster, 0, region, fit.OrphanPeers[0].StoreId)
}

// try to use orphan peers to replace unhealthy down peers.
for _, orphanPeer := range fit.OrphanPeers {
if pinDownPeer != nil {
// make sure the orphan peer is healthy.
if isUnhealthyPeer(orphanPeer.GetId()) {
continue
}
// no consider witness in this path.
if pinDownPeer.GetIsWitness() || orphanPeer.GetIsWitness() {
continue
}
// down peer's store should be down.
if !c.isStoreDownTimeHitMaxDownTime(pinDownPeer.GetStoreId()) {
continue
}
// check if down peer can replace with orphan peer.
dstStore := c.cluster.GetStore(orphanPeer.GetStoreId())
if fit.Replace(pinDownPeer.GetStoreId(), dstStore) {
destRole := pinDownPeer.GetRole()
orphanPeerRole := orphanPeer.GetRole()
ruleCheckerReplaceOrphanPeerCounter.Inc()
switch {
case orphanPeerRole == metapb.PeerRole_Learner && destRole == metapb.PeerRole_Voter:
return operator.CreatePromoteLearnerOperatorAndRemovePeer("replace-down-peer-with-orphan-peer", c.cluster, region, orphanPeer, pinDownPeer)
case orphanPeerRole == metapb.PeerRole_Voter && destRole == metapb.PeerRole_Learner:
return operator.CreateDemoteLearnerOperatorAndRemovePeer("replace-down-peer-with-orphan-peer", c.cluster, region, orphanPeer, pinDownPeer)
default:
// destRole should not same with orphanPeerRole. if role is same, it fit with orphanPeer should be better than now.
// destRole never be leader, so we not consider it.
}
}
}
}

// If hasUnhealthyFit is true, try to remove unhealthy orphan peers only if number of OrphanPeers is >= 2.
// Ref https://github.com/tikv/pd/issues/4045
if len(fit.OrphanPeers) >= 2 {
Expand Down Expand Up @@ -498,6 +540,10 @@ func (c *RuleChecker) isDownPeer(region *core.RegionInfo, peer *metapb.Peer) boo

func (c *RuleChecker) isStoreDownTimeHitMaxDownTime(storeID uint64) bool {
store := c.cluster.GetStore(storeID)
if store == nil {
log.Warn("lost the store, maybe you are recovering the PD cluster", zap.Uint64("store-id", storeID))
return false
}
return store.DownTime() >= c.cluster.GetOpts().GetMaxStoreDownTime()
}

Expand Down
127 changes: 126 additions & 1 deletion pkg/schedule/checker/rule_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,6 @@ func (suite *ruleCheckerTestSuite) TestFixRuleWitness() {
op := suite.rc.Check(suite.cluster.GetRegion(1))
suite.NotNil(op)
suite.Equal("add-rule-peer", op.Desc())
fmt.Println(op)
suite.Equal(uint64(3), op.Step(0).(operator.AddLearner).ToStore)
suite.True(op.Step(0).(operator.AddLearner).IsWitness)
}
Expand Down Expand Up @@ -686,6 +685,132 @@ func (suite *ruleCheckerTestSuite) TestPriorityFixOrphanPeer() {
suite.Equal("remove-orphan-peer", op.Desc())
}

func (suite *ruleCheckerTestSuite) TestPriorityFitHealthWithDifferentRole1() {
suite.cluster.SetEnableUseJointConsensus(true)
suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"})
suite.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"})
suite.cluster.AddLabelsStore(4, 1, map[string]string{"host": "host4"})
suite.cluster.AddRegionWithLearner(1, 1, []uint64{2, 3}, []uint64{4})
r1 := suite.cluster.GetRegion(1)
suite.cluster.GetStore(3).GetMeta().LastHeartbeat = time.Now().Add(-31 * time.Minute).UnixNano()

// set peer3 to pending and down
r1 = r1.Clone(core.WithPendingPeers([]*metapb.Peer{r1.GetPeer(3)}))
r1 = r1.Clone(core.WithDownPeers([]*pdpb.PeerStats{
{
Peer: r1.GetStorePeer(3),
DownSeconds: 30000,
},
}))
suite.cluster.PutRegion(r1)

op := suite.rc.Check(suite.cluster.GetRegion(1))
suite.Equal(uint64(3), op.Step(0).(operator.ChangePeerV2Enter).DemoteVoters[0].ToStore)
suite.Equal(uint64(4), op.Step(0).(operator.ChangePeerV2Enter).PromoteLearners[0].ToStore)
suite.Equal(uint64(3), op.Step(1).(operator.ChangePeerV2Leave).DemoteVoters[0].ToStore)
suite.Equal(uint64(4), op.Step(1).(operator.ChangePeerV2Leave).PromoteLearners[0].ToStore)
suite.Equal("replace-down-peer-with-orphan-peer", op.Desc())

// set peer3 only pending
r1 = r1.Clone(core.WithDownPeers(nil))
suite.cluster.PutRegion(r1)
op = suite.rc.Check(suite.cluster.GetRegion(1))
suite.Nil(op)
}

func (suite *ruleCheckerTestSuite) TestPriorityFitHealthWithDifferentRole2() {
suite.cluster.SetEnableUseJointConsensus(true)
suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"})
suite.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"})
suite.cluster.AddLabelsStore(4, 1, map[string]string{"host": "host4"})
suite.cluster.AddLabelsStore(5, 1, map[string]string{"host": "host5"})
suite.cluster.AddLeaderRegion(1, 1, 2, 3, 4, 5)
r1 := suite.cluster.GetRegion(1)

// set peer3 to pending and down, and peer 3 to learner, and store 3 is down
suite.cluster.GetStore(3).GetMeta().LastHeartbeat = time.Now().Add(-31 * time.Minute).UnixNano()
r1 = r1.Clone(core.WithLearners([]*metapb.Peer{r1.GetPeer(3)}))
r1 = r1.Clone(
core.WithPendingPeers([]*metapb.Peer{r1.GetPeer(3)}),
core.WithDownPeers([]*pdpb.PeerStats{
{
Peer: r1.GetStorePeer(3),
DownSeconds: 30000,
},
}),
)
suite.cluster.PutRegion(r1)

// default and test group => 3 voter + 1 learner
err := suite.ruleManager.SetRule(&placement.Rule{
GroupID: "test",
ID: "10",
Role: placement.Learner,
Count: 1,
})
suite.NoError(err)

op := suite.rc.Check(suite.cluster.GetRegion(1))
suite.Equal(uint64(5), op.Step(0).(operator.ChangePeerV2Enter).DemoteVoters[0].ToStore)
suite.Equal(uint64(3), op.Step(1).(operator.RemovePeer).FromStore)
suite.Equal("replace-down-peer-with-orphan-peer", op.Desc())
}

func (suite *ruleCheckerTestSuite) TestPriorityFitHealthPeersAndTiFlash() {
suite.cluster.SetEnableUseJointConsensus(true)
suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"})
suite.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"})
suite.cluster.AddLabelsStore(4, 1, map[string]string{"host": "host4", "engine": "tiflash"})
suite.cluster.AddRegionWithLearner(1, 1, []uint64{2, 3}, []uint64{4})
rule := &placement.Rule{
GroupID: "pd",
ID: "test",
Role: placement.Voter,
Count: 3,
}
rule2 := &placement.Rule{
GroupID: "pd",
ID: "test2",
Role: placement.Learner,
Count: 1,
LabelConstraints: []placement.LabelConstraint{
{
Key: "engine",
Op: placement.In,
Values: []string{"tiflash"},
},
},
}
suite.ruleManager.SetRule(rule)
suite.ruleManager.SetRule(rule2)
suite.ruleManager.DeleteRule("pd", "default")

r1 := suite.cluster.GetRegion(1)
// set peer3 to pending and down
r1 = r1.Clone(core.WithPendingPeers([]*metapb.Peer{r1.GetPeer(3)}))
r1 = r1.Clone(core.WithDownPeers([]*pdpb.PeerStats{
{
Peer: r1.GetStorePeer(3),
DownSeconds: 30000,
},
}))
suite.cluster.PutRegion(r1)
suite.cluster.GetStore(3).GetMeta().LastHeartbeat = time.Now().Add(-31 * time.Minute).UnixNano()

op := suite.rc.Check(suite.cluster.GetRegion(1))
// should not promote tiflash peer
suite.Nil(op)

// scale a node, can replace the down peer
suite.cluster.AddLabelsStore(5, 1, map[string]string{"host": "host5"})
op = suite.rc.Check(suite.cluster.GetRegion(1))
suite.NotNil(op)
suite.Equal("fast-replace-rule-down-peer", op.Desc())
}

func (suite *ruleCheckerTestSuite) TestIssue3293() {
suite.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
suite.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host1"})
Expand Down
5 changes: 4 additions & 1 deletion pkg/schedule/operator/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,6 @@ func (b *Builder) Build(kind OpKind) (*Operator, error) {
if brief, b.err = b.prepareBuild(); b.err != nil {
return nil, b.err
}

if b.useJointConsensus {
kind, b.err = b.buildStepsWithJointConsensus(kind)
} else {
Expand Down Expand Up @@ -549,6 +548,10 @@ func (b *Builder) brief() string {
return fmt.Sprintf("%s: store %s to %s", op, b.toRemove, b.toAdd)
case len(b.toAdd) > 0:
return fmt.Sprintf("add peer: store %s", b.toAdd)
case len(b.toRemove) > 0 && len(b.toPromote) > 0:
return fmt.Sprintf("promote peer: store %s, rm peer: store %s", b.toRemove, b.toPromote)
case len(b.toRemove) > 0 && len(b.toDemote) > 0:
return fmt.Sprintf("demote peer: store %s, rm peer: store %s", b.toDemote, b.toRemove)
case len(b.toRemove) > 0:
return fmt.Sprintf("rm peer: store %s", b.toRemove)
case len(b.toPromote) > 0:
Expand Down
21 changes: 20 additions & 1 deletion pkg/schedule/operator/create_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,25 @@ func CreatePromoteLearnerOperator(desc string, ci ClusterInformer, region *core.
Build(0)
}

// CreatePromoteLearnerOperatorAndRemovePeer creates an operator that promotes a learner and removes a peer.
func CreatePromoteLearnerOperatorAndRemovePeer(desc string, ci ClusterInformer, region *core.RegionInfo, toPromote *metapb.Peer, toRemove *metapb.Peer) (*Operator, error) {
return NewBuilder(desc, ci, region).
PromoteLearner(toPromote.GetStoreId()).
RemovePeer(toRemove.GetStoreId()).
Build(0)
}

// CreateDemoteLearnerOperatorAndRemovePeer creates an operator that demotes a learner and removes a peer.
func CreateDemoteLearnerOperatorAndRemovePeer(desc string, ci ClusterInformer, region *core.RegionInfo, toDemote *metapb.Peer, toRemove *metapb.Peer) (*Operator, error) {
if !ci.GetOpts().IsUseJointConsensus() {
return nil, errors.Errorf("cannot build demote learner operator due to disabling using joint state")
}
return NewBuilder(desc, ci, region).
DemoteVoter(toDemote.GetStoreId()).
RemovePeer(toRemove.GetStoreId()).
Build(0)
}

// CreateRemovePeerOperator creates an operator that removes a peer from region.
func CreateRemovePeerOperator(desc string, ci ClusterInformer, kind OpKind, region *core.RegionInfo, storeID uint64) (*Operator, error) {
return NewBuilder(desc, ci, region).
Expand Down Expand Up @@ -246,7 +265,7 @@ func CreateLeaveJointStateOperator(desc string, ci ClusterInformer, origin *core
b := NewBuilder(desc, ci, origin, SkipOriginJointStateCheck, SkipPlacementRulesCheck)

if b.err == nil && !core.IsInJointState(origin.GetPeers()...) {
b.err = errors.Errorf("cannot build leave joint state operator for region which is not in joint state")
b.err = errors.Errorf("cannot build leave joint state operator due to disabling using joint state")
}

if b.err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (o *Operator) Sync(other *Operator) {
func (o *Operator) String() string {
stepStrs := make([]string, len(o.steps))
for i := range o.steps {
stepStrs[i] = o.steps[i].String()
stepStrs[i] = fmt.Sprintf("%d:{%s}", i, o.steps[i].String())
}
s := fmt.Sprintf("%s {%s} (kind:%s, region:%v(%v, %v), createAt:%s, startAt:%s, currentStep:%v, size:%d, steps:[%s],timeout:[%s])",
o.desc, o.brief, o.kind, o.regionID, o.regionEpoch.GetVersion(), o.regionEpoch.GetConfVer(), o.GetCreateTime(),
Expand Down
3 changes: 3 additions & 0 deletions pkg/schedule/placement/fit.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ type RegionFit struct {

// Replace return true if the replacement store is fit all constraints and isolation score is not less than the origin.
func (f *RegionFit) Replace(srcStoreID uint64, dstStore *core.StoreInfo) bool {
if dstStore == nil {
return false
}
fit := f.getRuleFitByStoreID(srcStoreID)
// check the target store is fit all constraints.
if fit == nil {
Expand Down
Loading