diff --git a/server/schedule/operator/step.go b/server/schedule/operator/step.go index 422402964806..47f5f0d83642 100644 --- a/server/schedule/operator/step.go +++ b/server/schedule/operator/step.go @@ -27,6 +27,7 @@ import ( "github.com/tikv/pd/pkg/typeutil" "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/core/storelimit" + "github.com/tikv/pd/server/schedule/opt" "go.uber.org/zap" ) @@ -35,7 +36,7 @@ type OpStep interface { fmt.Stringer ConfVerChanged(region *core.RegionInfo) uint64 IsFinish(region *core.RegionInfo) bool - CheckSafety(region *core.RegionInfo) error + CheckInProgress(cluster opt.Cluster, region *core.RegionInfo) error Influence(opInfluence OpInfluence, region *core.RegionInfo) } @@ -58,8 +59,8 @@ func (tl TransferLeader) IsFinish(region *core.RegionInfo) bool { return region.GetLeader().GetStoreId() == tl.ToStore } -// CheckSafety checks if the step meets the safety properties. -func (tl TransferLeader) CheckSafety(region *core.RegionInfo) error { +// CheckInProgress checks if the step is in the progress of advancing. +func (tl TransferLeader) CheckInProgress(cluster opt.Cluster, region *core.RegionInfo) error { peer := region.GetStorePeer(tl.ToStore) if peer == nil { return errors.New("peer does not existed") @@ -67,7 +68,7 @@ func (tl TransferLeader) CheckSafety(region *core.RegionInfo) error { if core.IsLearner(peer) { return errors.New("peer already is a learner") } - return nil + return validateStore(cluster, tl.ToStore) } // Influence calculates the store difference that current step makes. @@ -122,13 +123,13 @@ func (ap AddPeer) Influence(opInfluence OpInfluence, region *core.RegionInfo) { to.AdjustStepCost(storelimit.AddPeer, regionSize) } -// CheckSafety checks if the step meets the safety properties. -func (ap AddPeer) CheckSafety(region *core.RegionInfo) error { +// CheckInProgress checks if the step is in the progress of advancing. +func (ap AddPeer) CheckInProgress(cluster opt.Cluster, region *core.RegionInfo) error { peer := region.GetStorePeer(ap.ToStore) if peer != nil && peer.GetId() != ap.PeerID { return errors.Errorf("peer %d has already existed in store %d, the operator is trying to add peer %d on the same store", peer.GetId(), ap.ToStore, ap.PeerID) } - return nil + return validateStore(cluster, ap.ToStore) } // AddLearner is an OpStep that adds a region learner peer. @@ -159,8 +160,8 @@ func (al AddLearner) IsFinish(region *core.RegionInfo) bool { return false } -// CheckSafety checks if the step meets the safety properties. -func (al AddLearner) CheckSafety(region *core.RegionInfo) error { +// CheckInProgress checks if the step is in the progress of advancing. +func (al AddLearner) CheckInProgress(cluster opt.Cluster, region *core.RegionInfo) error { peer := region.GetStorePeer(al.ToStore) if peer == nil { return nil @@ -171,7 +172,7 @@ func (al AddLearner) CheckSafety(region *core.RegionInfo) error { if !core.IsLearner(peer) { return errors.New("peer already is a voter") } - return nil + return validateStore(cluster, al.ToStore) } // Influence calculates the store difference that current step makes. @@ -213,13 +214,13 @@ func (pl PromoteLearner) IsFinish(region *core.RegionInfo) bool { return false } -// CheckSafety checks if the step meets the safety properties. -func (pl PromoteLearner) CheckSafety(region *core.RegionInfo) error { +// CheckInProgress checks if the step is in the progress of advancing. +func (pl PromoteLearner) CheckInProgress(cluster opt.Cluster, region *core.RegionInfo) error { peer := region.GetStorePeer(pl.ToStore) if peer.GetId() != pl.PeerID { return errors.New("peer does not exist") } - return nil + return validateStore(cluster, pl.ToStore) } // Influence calculates the store difference that current step makes. @@ -254,8 +255,8 @@ func (rp RemovePeer) IsFinish(region *core.RegionInfo) bool { return region.GetStorePeer(rp.FromStore) == nil } -// CheckSafety checks if the step meets the safety properties. -func (rp RemovePeer) CheckSafety(region *core.RegionInfo) error { +// CheckInProgress checks if the step is in the progress of advancing. +func (rp RemovePeer) CheckInProgress(cluster opt.Cluster, region *core.RegionInfo) error { if rp.FromStore == region.GetLeader().GetStoreId() { return errors.New("cannot remove leader peer") } @@ -306,8 +307,8 @@ func (mr MergeRegion) IsFinish(region *core.RegionInfo) bool { return false } -// CheckSafety checks if the step meets the safety properties. -func (mr MergeRegion) CheckSafety(region *core.RegionInfo) error { +// CheckInProgress checks if the step is in the progress of advancing. +func (mr MergeRegion) CheckInProgress(cluster opt.Cluster, region *core.RegionInfo) error { return nil } @@ -356,8 +357,8 @@ func (sr SplitRegion) Influence(opInfluence OpInfluence, region *core.RegionInfo } } -// CheckSafety checks if the step meets the safety properties. -func (sr SplitRegion) CheckSafety(region *core.RegionInfo) error { +// CheckInProgress checks if the step is in the progress of advancing. +func (sr SplitRegion) CheckInProgress(cluster opt.Cluster, region *core.RegionInfo) error { return nil } @@ -387,8 +388,8 @@ func (df DemoteFollower) IsFinish(region *core.RegionInfo) bool { return false } -// CheckSafety checks if the step meets the safety properties. -func (df DemoteFollower) CheckSafety(region *core.RegionInfo) error { +// CheckInProgress checks if the step is in the progress of advancing. +func (df DemoteFollower) CheckInProgress(cluster opt.Cluster, region *core.RegionInfo) error { peer := region.GetStorePeer(df.ToStore) if peer.GetId() != df.PeerID { return errors.New("peer does not exist") @@ -487,8 +488,8 @@ func (cpe ChangePeerV2Enter) IsFinish(region *core.RegionInfo) bool { return true } -// CheckSafety checks if the step meets the safety properties. -func (cpe ChangePeerV2Enter) CheckSafety(region *core.RegionInfo) error { +// CheckInProgress checks if the step is in the progress of advancing. +func (cpe ChangePeerV2Enter) CheckInProgress(cluster opt.Cluster, region *core.RegionInfo) error { inJointState, notInJointState := false, false for _, pl := range cpe.PromoteLearners { peer := region.GetStorePeer(pl.ToStore) @@ -627,8 +628,8 @@ func (cpl ChangePeerV2Leave) IsFinish(region *core.RegionInfo) bool { return true } -// CheckSafety checks if the step meets the safety properties. -func (cpl ChangePeerV2Leave) CheckSafety(region *core.RegionInfo) error { +// CheckInProgress checks if the step is in the progress of advancing. +func (cpl ChangePeerV2Leave) CheckInProgress(cluster opt.Cluster, region *core.RegionInfo) error { inJointState, notInJointState, demoteLeader := false, false, false leaderStoreID := region.GetLeader().GetStoreId() @@ -688,3 +689,14 @@ func (cpl ChangePeerV2Leave) CheckSafety(region *core.RegionInfo) error { // Influence calculates the store difference that current step makes. func (cpl ChangePeerV2Leave) Influence(opInfluence OpInfluence, region *core.RegionInfo) {} + +func validateStore(cluster opt.Cluster, id uint64) error { + store := cluster.GetStore(id) + if store == nil { + return errors.New("target store does not exist") + } + if store.DownTime() > cluster.GetOpts().GetMaxStoreDownTime() { + return errors.New("target store is down") + } + return nil +} diff --git a/server/schedule/operator/step_test.go b/server/schedule/operator/step_test.go index 2f5b7f5a8440..cad0a37682c3 100644 --- a/server/schedule/operator/step_test.go +++ b/server/schedule/operator/step_test.go @@ -15,12 +15,18 @@ package operator import ( + "context" + . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/tikv/pd/pkg/mock/mockcluster" + "github.com/tikv/pd/server/config" "github.com/tikv/pd/server/core" ) -type testStepSuite struct{} +type testStepSuite struct { + cluster *mockcluster.Cluster +} var _ = Suite(&testStepSuite{}) @@ -28,7 +34,14 @@ type testCase struct { Peers []*metapb.Peer // first is leader ConfVerChanged uint64 IsFinish bool - CheckSafety Checker + CheckInProgres Checker +} + +func (s *testStepSuite) SetUpTest(c *C) { + s.cluster = mockcluster.NewCluster(context.Background(), config.NewTestOptions()) + for i := 1; i <= 10; i++ { + s.cluster.PutStoreWithLabels(uint64(i)) + } } func (s *testStepSuite) TestDemoteFollower(c *C) { @@ -345,6 +358,6 @@ func (s *testStepSuite) check(c *C, step OpStep, desc string, cases []testCase) region := core.NewRegionInfo(&metapb.Region{Id: 1, Peers: tc.Peers}, tc.Peers[0]) c.Assert(step.ConfVerChanged(region), Equals, tc.ConfVerChanged) c.Assert(step.IsFinish(region), Equals, tc.IsFinish) - c.Assert(step.CheckSafety(region), tc.CheckSafety) + c.Assert(step.CheckInProgress(s.cluster, region), tc.CheckInProgres) } } diff --git a/server/schedule/operator_controller.go b/server/schedule/operator_controller.go index e8c6c4e4db60..36d28e60bb05 100644 --- a/server/schedule/operator_controller.go +++ b/server/schedule/operator_controller.go @@ -160,7 +160,7 @@ func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) { } func (oc *OperatorController) checkStaleOperator(op *operator.Operator, step operator.OpStep, region *core.RegionInfo) bool { - err := step.CheckSafety(region) + err := step.CheckInProgress(oc.cluster, region) if err != nil { if oc.RemoveOperator(op, zap.String("reason", err.Error())) { operatorCounter.WithLabelValues(op.Desc(), "stale").Inc()