Skip to content

Commit

Permalink
operator: check store status for running operators (#4223)
Browse files Browse the repository at this point in the history
* operator: check store status for running operators

close #3353

Signed-off-by: disksing <i@disksing.com>

* add test

Signed-off-by: disksing <i@disksing.com>

* add tests

Signed-off-by: disksing <i@disksing.com>

* address comment

Signed-off-by: disksing <i@disksing.com>
  • Loading branch information
disksing authored Nov 23, 2021
1 parent dbe5e29 commit fb3d204
Show file tree
Hide file tree
Showing 4 changed files with 205 additions and 26 deletions.
62 changes: 40 additions & 22 deletions server/schedule/operator/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/tikv/pd/pkg/typeutil"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/core/storelimit"
"github.com/tikv/pd/server/schedule/opt"
"go.uber.org/zap"
)

Expand All @@ -35,7 +36,7 @@ type OpStep interface {
fmt.Stringer
ConfVerChanged(region *core.RegionInfo) uint64
IsFinish(region *core.RegionInfo) bool
CheckSafety(region *core.RegionInfo) error
CheckInProgress(cluster opt.Cluster, region *core.RegionInfo) error
Influence(opInfluence OpInfluence, region *core.RegionInfo)
}

Expand All @@ -58,16 +59,16 @@ func (tl TransferLeader) IsFinish(region *core.RegionInfo) bool {
return region.GetLeader().GetStoreId() == tl.ToStore
}

// CheckSafety checks if the step meets the safety properties.
func (tl TransferLeader) CheckSafety(region *core.RegionInfo) error {
// CheckInProgress reports whether the transfer-leader step can still advance.
// It returns an error when the region has no peer on tl.ToStore, when that
// peer is a learner, or when validateStore rejects tl.ToStore; otherwise it
// returns nil.
func (tl TransferLeader) CheckInProgress(cluster opt.Cluster, region *core.RegionInfo) error {
	peer := region.GetStorePeer(tl.ToStore)
	if peer == nil {
		return errors.New("peer does not exist")
	}
	if core.IsLearner(peer) {
		return errors.New("peer already is a learner")
	}
	// The region-level checks passed; the outcome now depends only on the
	// state of the target store. No unconditional `return nil` may precede
	// this call, or the store check becomes unreachable.
	return validateStore(cluster, tl.ToStore)
}

// Influence calculates the store difference that current step makes.
Expand Down Expand Up @@ -122,8 +123,11 @@ func (ap AddPeer) Influence(opInfluence OpInfluence, region *core.RegionInfo) {
to.AdjustStepCost(storelimit.AddPeer, regionSize)
}

// CheckSafety checks if the step meets the safety properties.
func (ap AddPeer) CheckSafety(region *core.RegionInfo) error {
// CheckInProgress checks if the step is in the progress of advancing.
func (ap AddPeer) CheckInProgress(cluster opt.Cluster, region *core.RegionInfo) error {
if err := validateStore(cluster, ap.ToStore); err != nil {
return err
}
peer := region.GetStorePeer(ap.ToStore)
if peer != nil && peer.GetId() != ap.PeerID {
return errors.Errorf("peer %d has already existed in store %d, the operator is trying to add peer %d on the same store", peer.GetId(), ap.ToStore, ap.PeerID)
Expand Down Expand Up @@ -159,8 +163,11 @@ func (al AddLearner) IsFinish(region *core.RegionInfo) bool {
return false
}

// CheckSafety checks if the step meets the safety properties.
func (al AddLearner) CheckSafety(region *core.RegionInfo) error {
// CheckInProgress checks if the step is in the progress of advancing.
func (al AddLearner) CheckInProgress(cluster opt.Cluster, region *core.RegionInfo) error {
if err := validateStore(cluster, al.ToStore); err != nil {
return err
}
peer := region.GetStorePeer(al.ToStore)
if peer == nil {
return nil
Expand Down Expand Up @@ -213,8 +220,8 @@ func (pl PromoteLearner) IsFinish(region *core.RegionInfo) bool {
return false
}

// CheckSafety checks if the step meets the safety properties.
func (pl PromoteLearner) CheckSafety(region *core.RegionInfo) error {
// CheckInProgress checks if the step is in the progress of advancing.
func (pl PromoteLearner) CheckInProgress(cluster opt.Cluster, region *core.RegionInfo) error {
peer := region.GetStorePeer(pl.ToStore)
if peer.GetId() != pl.PeerID {
return errors.New("peer does not exist")
Expand Down Expand Up @@ -254,8 +261,8 @@ func (rp RemovePeer) IsFinish(region *core.RegionInfo) bool {
return region.GetStorePeer(rp.FromStore) == nil
}

// CheckSafety checks if the step meets the safety properties.
func (rp RemovePeer) CheckSafety(region *core.RegionInfo) error {
// CheckInProgress checks if the step is in the progress of advancing.
func (rp RemovePeer) CheckInProgress(cluster opt.Cluster, region *core.RegionInfo) error {
if rp.FromStore == region.GetLeader().GetStoreId() {
return errors.New("cannot remove leader peer")
}
Expand Down Expand Up @@ -306,8 +313,8 @@ func (mr MergeRegion) IsFinish(region *core.RegionInfo) bool {
return false
}

// CheckSafety checks if the step meets the safety properties.
func (mr MergeRegion) CheckSafety(region *core.RegionInfo) error {
// CheckInProgress is the OpStep in-progress check for MergeRegion. It performs
// no validation: cluster and region are not inspected and the result is
// always nil.
func (mr MergeRegion) CheckInProgress(cluster opt.Cluster, region *core.RegionInfo) error {
return nil
}

Expand Down Expand Up @@ -356,8 +363,8 @@ func (sr SplitRegion) Influence(opInfluence OpInfluence, region *core.RegionInfo
}
}

// CheckSafety checks if the step meets the safety properties.
func (sr SplitRegion) CheckSafety(region *core.RegionInfo) error {
// CheckInProgress is the OpStep in-progress check for SplitRegion. It performs
// no validation: cluster and region are not inspected and the result is
// always nil.
func (sr SplitRegion) CheckInProgress(cluster opt.Cluster, region *core.RegionInfo) error {
return nil
}

Expand Down Expand Up @@ -387,8 +394,8 @@ func (df DemoteFollower) IsFinish(region *core.RegionInfo) bool {
return false
}

// CheckSafety checks if the step meets the safety properties.
func (df DemoteFollower) CheckSafety(region *core.RegionInfo) error {
// CheckInProgress checks if the step is in the progress of advancing.
func (df DemoteFollower) CheckInProgress(cluster opt.Cluster, region *core.RegionInfo) error {
peer := region.GetStorePeer(df.ToStore)
if peer.GetId() != df.PeerID {
return errors.New("peer does not exist")
Expand Down Expand Up @@ -487,8 +494,8 @@ func (cpe ChangePeerV2Enter) IsFinish(region *core.RegionInfo) bool {
return true
}

// CheckSafety checks if the step meets the safety properties.
func (cpe ChangePeerV2Enter) CheckSafety(region *core.RegionInfo) error {
// CheckInProgress checks if the step is in the progress of advancing.
func (cpe ChangePeerV2Enter) CheckInProgress(cluster opt.Cluster, region *core.RegionInfo) error {
inJointState, notInJointState := false, false
for _, pl := range cpe.PromoteLearners {
peer := region.GetStorePeer(pl.ToStore)
Expand Down Expand Up @@ -627,8 +634,8 @@ func (cpl ChangePeerV2Leave) IsFinish(region *core.RegionInfo) bool {
return true
}

// CheckSafety checks if the step meets the safety properties.
func (cpl ChangePeerV2Leave) CheckSafety(region *core.RegionInfo) error {
// CheckInProgress checks if the step is in the progress of advancing.
func (cpl ChangePeerV2Leave) CheckInProgress(cluster opt.Cluster, region *core.RegionInfo) error {
inJointState, notInJointState, demoteLeader := false, false, false
leaderStoreID := region.GetLeader().GetStoreId()

Expand Down Expand Up @@ -688,3 +695,14 @@ func (cpl ChangePeerV2Leave) CheckSafety(region *core.RegionInfo) error {

// Influence is the OpStep influence hook for ChangePeerV2Leave. The body is
// empty, so opInfluence is left untouched by this step.
func (cpl ChangePeerV2Leave) Influence(opInfluence OpInfluence, region *core.RegionInfo) {}

// validateStore checks that store id is usable as an operator target.
// It returns an error when cluster has no store with that id, or when the
// store's DownTime exceeds the cluster option GetMaxStoreDownTime; otherwise
// it returns nil.
func validateStore(cluster opt.Cluster, id uint64) error {
	target := cluster.GetStore(id)
	switch {
	case target == nil:
		return errors.New("target store does not exist")
	case target.DownTime() > cluster.GetOpts().GetMaxStoreDownTime():
		return errors.New("target store is down")
	}
	return nil
}
148 changes: 145 additions & 3 deletions server/schedule/operator/step_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,162 @@
package operator

import (
"context"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/mock/mockcluster"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/core"
)

type testStepSuite struct{}
// testStepSuite is the check suite for the operator step tests.
type testStepSuite struct {
// cluster is the mock cluster assigned by SetUpTest and passed to
// CheckInProgress by the check helper.
cluster *mockcluster.Cluster
}

var _ = Suite(&testStepSuite{})

// testCase describes one region layout and the results a step is expected to
// produce for it in the check helper.
type testCase struct {
// Peers are the region's peers; the check helper uses Peers[0] as the leader.
Peers []*metapb.Peer // first is leader
// ConfVerChanged is the expected result of step.ConfVerChanged(region).
ConfVerChanged uint64
// IsFinish is the expected result of step.IsFinish(region).
IsFinish bool
// CheckSafety is the checker applied to the result of step.CheckSafety.
// NOTE(review): this listing looks like a flattened diff in which this
// field is superseded by the one below — confirm against the repository.
CheckSafety Checker
// CheckInProgres is the checker applied to the result of
// step.CheckInProgress(s.cluster, region).
CheckInProgres Checker
}

// SetUpTest installs a fresh mock cluster on the suite: stores 1 through 10
// are registered, and stores 8, 9 and 10 are then marked as down.
func (s *testStepSuite) SetUpTest(c *C) {
	const storeCount = 10

	cluster := mockcluster.NewCluster(context.Background(), config.NewTestOptions())
	for id := uint64(1); id <= storeCount; id++ {
		cluster.PutStoreWithLabels(id)
	}
	for _, downID := range []uint64{8, 9, 10} {
		cluster.SetStoreDown(downID)
	}
	s.cluster = cluster
}

// TestTransferLeader runs TransferLeader through the check helper, first with
// store 2 as the target and then with store 9, which SetUpTest marks as down.
func (s *testStepSuite) TestTransferLeader(c *C) {
	// voters builds a fresh list of voter peers whose peer ID equals the store
	// ID; the first store listed becomes the leader in the check helper.
	voters := func(storeIDs ...uint64) []*metapb.Peer {
		peers := make([]*metapb.Peer, 0, len(storeIDs))
		for _, id := range storeIDs {
			peers = append(peers, &metapb.Peer{Id: id, StoreId: id, Role: metapb.PeerRole_Voter})
		}
		return peers
	}

	toHealthy := TransferLeader{FromStore: 1, ToStore: 2}
	s.check(c, toHealthy, "transfer leader from store 1 to store 2", []testCase{
		// Leader still on store 1: not finished.
		{Peers: voters(1, 2, 3), ConfVerChanged: 0, IsFinish: false, CheckInProgres: IsNil},
		// Leader already on store 2: finished.
		{Peers: voters(2, 1, 3), ConfVerChanged: 0, IsFinish: true, CheckInProgres: IsNil},
		// Leader on store 3: not finished.
		{Peers: voters(3, 1, 2), ConfVerChanged: 0, IsFinish: false, CheckInProgres: IsNil},
	})

	toDown := TransferLeader{FromStore: 1, ToStore: 9} // 9 is down
	s.check(c, toDown, "transfer leader from store 1 to store 9", []testCase{
		{Peers: voters(1, 2, 9), ConfVerChanged: 0, IsFinish: false, CheckInProgres: NotNil},
	})
}

// TestAddPeer runs AddPeer through the check helper, first targeting store 2
// and then store 9, which SetUpTest marks as down.
func (s *testStepSuite) TestAddPeer(c *C) {
	onHealthy := AddPeer{ToStore: 2, PeerID: 2}
	healthyCases := []testCase{
		// Peer not added yet.
		{
			Peers: []*metapb.Peer{
				{Id: 1, StoreId: 1, Role: metapb.PeerRole_Voter},
			},
			ConfVerChanged: 0,
			IsFinish:       false,
			CheckInProgres: IsNil,
		},
		// Peer 2 is present on store 2 as a voter.
		{
			Peers: []*metapb.Peer{
				{Id: 1, StoreId: 1, Role: metapb.PeerRole_Voter},
				{Id: 2, StoreId: 2, Role: metapb.PeerRole_Voter},
			},
			ConfVerChanged: 1,
			IsFinish:       true,
			CheckInProgres: IsNil,
		},
	}
	s.check(c, onHealthy, "add peer 2 on store 2", healthyCases)

	onDown := AddPeer{ToStore: 9, PeerID: 9}
	downCases := []testCase{
		{
			Peers: []*metapb.Peer{
				{Id: 1, StoreId: 1, Role: metapb.PeerRole_Voter},
			},
			ConfVerChanged: 0,
			IsFinish:       false,
			CheckInProgres: NotNil,
		},
	}
	s.check(c, onDown, "add peer 9 on store 9", downCases)
}

// TestAddLearner runs AddLearner through the check helper, first targeting
// store 2 and then store 9, which SetUpTest marks as down.
func (s *testStepSuite) TestAddLearner(c *C) {
	onHealthy := AddLearner{ToStore: 2, PeerID: 2}
	healthyCases := []testCase{
		// Learner not added yet.
		{
			Peers: []*metapb.Peer{
				{Id: 1, StoreId: 1, Role: metapb.PeerRole_Voter},
			},
			ConfVerChanged: 0,
			IsFinish:       false,
			CheckInProgres: IsNil,
		},
		// Peer 2 is present on store 2 as a learner.
		{
			Peers: []*metapb.Peer{
				{Id: 1, StoreId: 1, Role: metapb.PeerRole_Voter},
				{Id: 2, StoreId: 2, Role: metapb.PeerRole_Learner},
			},
			ConfVerChanged: 1,
			IsFinish:       true,
			CheckInProgres: IsNil,
		},
	}
	s.check(c, onHealthy, "add learner peer 2 on store 2", healthyCases)

	onDown := AddLearner{ToStore: 9, PeerID: 9}
	downCases := []testCase{
		{
			Peers: []*metapb.Peer{
				{Id: 1, StoreId: 1, Role: metapb.PeerRole_Voter},
			},
			ConfVerChanged: 0,
			IsFinish:       false,
			CheckInProgres: NotNil,
		},
	}
	s.check(c, onDown, "add learner peer 9 on store 9", downCases)
}

func (s *testStepSuite) TestDemoteFollower(c *C) {
Expand Down Expand Up @@ -345,6 +487,6 @@ func (s *testStepSuite) check(c *C, step OpStep, desc string, cases []testCase)
region := core.NewRegionInfo(&metapb.Region{Id: 1, Peers: tc.Peers}, tc.Peers[0])
c.Assert(step.ConfVerChanged(region), Equals, tc.ConfVerChanged)
c.Assert(step.IsFinish(region), Equals, tc.IsFinish)
c.Assert(step.CheckSafety(region), tc.CheckSafety)
c.Assert(step.CheckInProgress(s.cluster, region), tc.CheckInProgres)
}
}
2 changes: 1 addition & 1 deletion server/schedule/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) {
}

func (oc *OperatorController) checkStaleOperator(op *operator.Operator, step operator.OpStep, region *core.RegionInfo) bool {
err := step.CheckSafety(region)
err := step.CheckInProgress(oc.cluster, region)
if err != nil {
if oc.RemoveOperator(op, zap.String("reason", err.Error())) {
operatorCounter.WithLabelValues(op.Desc(), "stale").Inc()
Expand Down
19 changes: 19 additions & 0 deletions server/schedule/operator_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,25 @@ func (t *testOperatorControllerSuite) TestFastFailOperator(c *C) {
c.Assert(oc.GetOperator(region.GetID()), IsNil)
}

// TestFastFailWithUnhealthyStore covers issue 3353. A transfer-leader operator
// targeting store 2 is not reported stale by checkStaleOperator at first, and
// is reported stale once store 2 has been set down.
func (t *testOperatorControllerSuite) TestFastFailWithUnhealthyStore(c *C) {
	cluster := mockcluster.NewCluster(t.ctx, config.NewTestOptions())
	streams := hbstream.NewTestHeartbeatStreams(t.ctx, cluster.ID, cluster, false /* no need to run */)
	controller := NewOperatorController(t.ctx, cluster, streams)

	cluster.AddLeaderStore(1, 2)
	cluster.AddLeaderStore(2, 0)
	cluster.AddLeaderStore(3, 0)
	cluster.AddLeaderRegion(1, 1, 2)

	region := cluster.GetRegion(1)
	steps := []operator.OpStep{operator.TransferLeader{ToStore: 2}}
	transfer := steps[0]
	op := operator.NewOperator("test", "test", 1, region.GetRegionEpoch(), operator.OpLeader, steps...)
	controller.SetOperator(op)

	// Target store still healthy: the operator must be kept.
	c.Assert(controller.checkStaleOperator(op, transfer, region), IsFalse)

	// Target store down: the operator must be detected as stale.
	cluster.SetStoreDown(2)
	c.Assert(controller.checkStaleOperator(op, transfer, region), IsTrue)
}

func (t *testOperatorControllerSuite) TestCheckAddUnexpectedStatus(c *C) {
c.Assert(failpoint.Disable("github.com/tikv/pd/server/schedule/unexpectedOperator"), IsNil)
opt := config.NewTestOptions()
Expand Down

0 comments on commit fb3d204

Please sign in to comment.