Skip to content

Commit

Permalink
operator: check store status for running operators
Browse files Browse the repository at this point in the history
fix tikv#3353

Signed-off-by: disksing <i@disksing.com>
  • Loading branch information
disksing committed Oct 19, 2021
1 parent 4e41c8d commit 3d43310
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 29 deletions.
62 changes: 37 additions & 25 deletions server/schedule/operator/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/tikv/pd/pkg/typeutil"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/core/storelimit"
"github.com/tikv/pd/server/schedule/opt"
"go.uber.org/zap"
)

Expand All @@ -35,7 +36,7 @@ type OpStep interface {
fmt.Stringer
ConfVerChanged(region *core.RegionInfo) uint64
IsFinish(region *core.RegionInfo) bool
CheckSafety(region *core.RegionInfo) error
CheckInProgress(cluster opt.Cluster, region *core.RegionInfo) error
Influence(opInfluence OpInfluence, region *core.RegionInfo)
}

Expand All @@ -58,16 +59,16 @@ func (tl TransferLeader) IsFinish(region *core.RegionInfo) bool {
return region.GetLeader().GetStoreId() == tl.ToStore
}

// CheckSafety checks if the step meets the safety properties.
func (tl TransferLeader) CheckSafety(region *core.RegionInfo) error {
// CheckInProgress checks if the step is in the progress of advancing.
func (tl TransferLeader) CheckInProgress(cluster opt.Cluster, region *core.RegionInfo) error {
peer := region.GetStorePeer(tl.ToStore)
if peer == nil {
return errors.New("peer does not existed")
}
if core.IsLearner(peer) {
return errors.New("peer already is a learner")
}
return nil
return validateStore(cluster, tl.ToStore)
}

// Influence calculates the store difference that current step makes.
Expand Down Expand Up @@ -122,13 +123,13 @@ func (ap AddPeer) Influence(opInfluence OpInfluence, region *core.RegionInfo) {
to.AdjustStepCost(storelimit.AddPeer, regionSize)
}

// CheckSafety checks if the step meets the safety properties.
func (ap AddPeer) CheckSafety(region *core.RegionInfo) error {
// CheckInProgress checks if the step is in the progress of advancing.
func (ap AddPeer) CheckInProgress(cluster opt.Cluster, region *core.RegionInfo) error {
peer := region.GetStorePeer(ap.ToStore)
if peer != nil && peer.GetId() != ap.PeerID {
return errors.Errorf("peer %d has already existed in store %d, the operator is trying to add peer %d on the same store", peer.GetId(), ap.ToStore, ap.PeerID)
}
return nil
return validateStore(cluster, ap.ToStore)
}

// AddLearner is an OpStep that adds a region learner peer.
Expand Down Expand Up @@ -159,8 +160,8 @@ func (al AddLearner) IsFinish(region *core.RegionInfo) bool {
return false
}

// CheckSafety checks if the step meets the safety properties.
func (al AddLearner) CheckSafety(region *core.RegionInfo) error {
// CheckInProgress checks if the step is in the progress of advancing.
func (al AddLearner) CheckInProgress(cluster opt.Cluster, region *core.RegionInfo) error {
peer := region.GetStorePeer(al.ToStore)
if peer == nil {
return nil
Expand All @@ -171,7 +172,7 @@ func (al AddLearner) CheckSafety(region *core.RegionInfo) error {
if !core.IsLearner(peer) {
return errors.New("peer already is a voter")
}
return nil
return validateStore(cluster, al.ToStore)
}

// Influence calculates the store difference that current step makes.
Expand Down Expand Up @@ -213,13 +214,13 @@ func (pl PromoteLearner) IsFinish(region *core.RegionInfo) bool {
return false
}

// CheckSafety checks if the step meets the safety properties.
func (pl PromoteLearner) CheckSafety(region *core.RegionInfo) error {
// CheckInProgress checks if the step is in the progress of advancing.
func (pl PromoteLearner) CheckInProgress(cluster opt.Cluster, region *core.RegionInfo) error {
peer := region.GetStorePeer(pl.ToStore)
if peer.GetId() != pl.PeerID {
return errors.New("peer does not exist")
}
return nil
return validateStore(cluster, pl.ToStore)
}

// Influence calculates the store difference that current step makes.
Expand Down Expand Up @@ -254,8 +255,8 @@ func (rp RemovePeer) IsFinish(region *core.RegionInfo) bool {
return region.GetStorePeer(rp.FromStore) == nil
}

// CheckSafety checks if the step meets the safety properties.
func (rp RemovePeer) CheckSafety(region *core.RegionInfo) error {
// CheckInProgress checks if the step is in the progress of advancing.
func (rp RemovePeer) CheckInProgress(cluster opt.Cluster, region *core.RegionInfo) error {
if rp.FromStore == region.GetLeader().GetStoreId() {
return errors.New("cannot remove leader peer")
}
Expand Down Expand Up @@ -306,8 +307,8 @@ func (mr MergeRegion) IsFinish(region *core.RegionInfo) bool {
return false
}

// CheckSafety checks if the step meets the safety properties.
func (mr MergeRegion) CheckSafety(region *core.RegionInfo) error {
// CheckInProgress checks if the step is in the progress of advancing.
func (mr MergeRegion) CheckInProgress(cluster opt.Cluster, region *core.RegionInfo) error {
return nil
}

Expand Down Expand Up @@ -356,8 +357,8 @@ func (sr SplitRegion) Influence(opInfluence OpInfluence, region *core.RegionInfo
}
}

// CheckSafety checks if the step meets the safety properties.
func (sr SplitRegion) CheckSafety(region *core.RegionInfo) error {
// CheckInProgress checks if the step is in the progress of advancing.
func (sr SplitRegion) CheckInProgress(cluster opt.Cluster, region *core.RegionInfo) error {
return nil
}

Expand Down Expand Up @@ -387,8 +388,8 @@ func (df DemoteFollower) IsFinish(region *core.RegionInfo) bool {
return false
}

// CheckSafety checks if the step meets the safety properties.
func (df DemoteFollower) CheckSafety(region *core.RegionInfo) error {
// CheckInProgress checks if the step is in the progress of advancing.
func (df DemoteFollower) CheckInProgress(cluster opt.Cluster, region *core.RegionInfo) error {
peer := region.GetStorePeer(df.ToStore)
if peer.GetId() != df.PeerID {
return errors.New("peer does not exist")
Expand Down Expand Up @@ -487,8 +488,8 @@ func (cpe ChangePeerV2Enter) IsFinish(region *core.RegionInfo) bool {
return true
}

// CheckSafety checks if the step meets the safety properties.
func (cpe ChangePeerV2Enter) CheckSafety(region *core.RegionInfo) error {
// CheckInProgress checks if the step is in the progress of advancing.
func (cpe ChangePeerV2Enter) CheckInProgress(cluster opt.Cluster, region *core.RegionInfo) error {
inJointState, notInJointState := false, false
for _, pl := range cpe.PromoteLearners {
peer := region.GetStorePeer(pl.ToStore)
Expand Down Expand Up @@ -627,8 +628,8 @@ func (cpl ChangePeerV2Leave) IsFinish(region *core.RegionInfo) bool {
return true
}

// CheckSafety checks if the step meets the safety properties.
func (cpl ChangePeerV2Leave) CheckSafety(region *core.RegionInfo) error {
// CheckInProgress checks if the step is in the progress of advancing.
func (cpl ChangePeerV2Leave) CheckInProgress(cluster opt.Cluster, region *core.RegionInfo) error {
inJointState, notInJointState, demoteLeader := false, false, false
leaderStoreID := region.GetLeader().GetStoreId()

Expand Down Expand Up @@ -688,3 +689,14 @@ func (cpl ChangePeerV2Leave) CheckSafety(region *core.RegionInfo) error {

// Influence calculates the store difference that current step makes.
// The body is empty, so opInfluence is left unchanged by this step.
func (cpl ChangePeerV2Leave) Influence(opInfluence OpInfluence, region *core.RegionInfo) {
	// Nothing to record.
}

// validateStore looks up the store with the given id in cluster and reports
// whether it is usable as a target: it returns an error when the store is
// not found, or when the store's down time exceeds the max store down time
// configured in the cluster options. It returns nil otherwise.
func validateStore(cluster opt.Cluster, id uint64) error {
	target := cluster.GetStore(id)
	// Cases are evaluated top to bottom, so the nil case guards the
	// DownTime call in the case after it.
	switch {
	case target == nil:
		return errors.New("target store does not exist")
	case target.DownTime() > cluster.GetOpts().GetMaxStoreDownTime():
		return errors.New("target store is down")
	default:
		return nil
	}
}
19 changes: 16 additions & 3 deletions server/schedule/operator/step_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,33 @@
package operator

import (
"context"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/mock/mockcluster"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/core"
)

type testStepSuite struct{}
type testStepSuite struct {
cluster *mockcluster.Cluster
}

var _ = Suite(&testStepSuite{})

// testCase describes one region layout together with the results the step
// under test is expected to produce for it.
type testCase struct {
Peers []*metapb.Peer // first is leader
ConfVerChanged uint64 // expected return value of step.ConfVerChanged for the region
IsFinish bool // expected return value of step.IsFinish for the region
CheckSafety Checker // checker applied to the result of step.CheckSafety
CheckInProgres Checker // checker applied to the result of step.CheckInProgress; NOTE(review): name lacks the trailing "s"
}

// SetUpTest replaces s.cluster with a new mock cluster built from the test
// options and registers stores with IDs 1 through 10 in it, without labels.
func (s *testStepSuite) SetUpTest(c *C) {
	ctx := context.Background()
	opts := config.NewTestOptions()
	s.cluster = mockcluster.NewCluster(ctx, opts)

	const lastStoreID uint64 = 10
	for storeID := uint64(1); storeID <= lastStoreID; storeID++ {
		s.cluster.PutStoreWithLabels(storeID)
	}
}

func (s *testStepSuite) TestDemoteFollower(c *C) {
Expand Down Expand Up @@ -345,6 +358,6 @@ func (s *testStepSuite) check(c *C, step OpStep, desc string, cases []testCase)
region := core.NewRegionInfo(&metapb.Region{Id: 1, Peers: tc.Peers}, tc.Peers[0])
c.Assert(step.ConfVerChanged(region), Equals, tc.ConfVerChanged)
c.Assert(step.IsFinish(region), Equals, tc.IsFinish)
c.Assert(step.CheckSafety(region), tc.CheckSafety)
c.Assert(step.CheckInProgress(s.cluster, region), tc.CheckInProgres)
}
}
2 changes: 1 addition & 1 deletion server/schedule/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) {
}

func (oc *OperatorController) checkStaleOperator(op *operator.Operator, step operator.OpStep, region *core.RegionInfo) bool {
err := step.CheckSafety(region)
err := step.CheckInProgress(oc.cluster, region)
if err != nil {
if oc.RemoveOperator(op, zap.String("reason", err.Error())) {
operatorCounter.WithLabelValues(op.Desc(), "stale").Inc()
Expand Down

0 comments on commit 3d43310

Please sign in to comment.