diff --git a/server/balancer.go b/server/balancer.go index c5e50cd8cda..d3642bf3670 100644 --- a/server/balancer.go +++ b/server/balancer.go @@ -214,7 +214,7 @@ func (cb *capacityBalancer) Balance(cluster *ClusterInfo) (*BalanceOperator, err return nil, nil } - leaderTransferOperator := newTransferLeaderOperator(oldLeader, newLeader) + leaderTransferOperator := newTransferLeaderOperator(oldLeader, newLeader, maxWaitCount) addPeerOperator := newAddPeerOperator(newPeer) removePeerOperator := newRemovePeerOperator(oldLeader) diff --git a/server/balancer_worker_test.go b/server/balancer_worker_test.go index 73e0fdc9e6a..65ac87bb626 100644 --- a/server/balancer_worker_test.go +++ b/server/balancer_worker_test.go @@ -72,9 +72,31 @@ func (s *testBalancerWorkerSuite) TestBalancerWorker(c *C) { c.Assert(bop.ops, HasLen, 3) op1 := bop.ops[0].(*TransferLeaderOperator) + c.Assert(op1.maxWaitCount, Equals, maxWaitCount) c.Assert(op1.oldLeader.GetStoreId(), Equals, uint64(1)) c.Assert(op1.newLeader.GetStoreId(), Equals, uint64(4)) + // Now we check the maxWaitCount for TransferLeaderOperator. 
+ op1.maxWaitCount = 2 + + ok, res, err := op1.Do(region, leaderPeer) + c.Assert(err, IsNil) + c.Assert(ok, IsFalse) + c.Assert(res.GetTransferLeader().GetPeer().GetStoreId(), Equals, uint64(4)) + c.Assert(op1.count, Equals, 1) + + ok, res, err = op1.Do(region, leaderPeer) + c.Assert(err, IsNil) + c.Assert(ok, IsFalse) + c.Assert(res, IsNil) + c.Assert(op1.count, Equals, 2) + + ok, res, err = op1.Do(region, leaderPeer) + c.Assert(err, NotNil) + c.Assert(ok, IsFalse) + c.Assert(res, IsNil) + c.Assert(op1.count, Equals, 2) + op2 := bop.ops[1].(*ChangePeerOperator) c.Assert(op2.changePeer.GetChangeType(), Equals, raftpb.ConfChangeType_AddNode) c.Assert(op2.changePeer.GetPeer().GetStoreId(), Equals, uint64(2)) diff --git a/server/operator.go b/server/operator.go index 0058e28eb52..6b593c3764f 100644 --- a/server/operator.go +++ b/server/operator.go @@ -21,6 +21,12 @@ import ( "github.com/pingcap/kvproto/pkg/raftpb" ) +const ( + // TODO: make this a config flag. + // maxWaitCount is the maximum number of heartbeats to wait for an operator to succeed before it is reported as failed. + maxWaitCount = 3 +) + // Operator is the interface to do some operations. type Operator interface { // Do does the operator, if finished then return true. @@ -176,14 +182,19 @@ func (co *ChangePeerOperator) Do(region *metapb.Region, leader *metapb.Peer) (bo // TransferLeaderOperator is used to do leader transfer. 
type TransferLeaderOperator struct { + count int + maxWaitCount int + oldLeader *metapb.Peer newLeader *metapb.Peer } -func newTransferLeaderOperator(oldLeader, newLeader *metapb.Peer) *TransferLeaderOperator { +func newTransferLeaderOperator(oldLeader, newLeader *metapb.Peer, waitCount int) *TransferLeaderOperator { return &TransferLeaderOperator{ - oldLeader: oldLeader, - newLeader: newLeader, + oldLeader: oldLeader, + newLeader: newLeader, + count: 0, + maxWaitCount: waitCount, } } @@ -218,10 +229,21 @@ func (lto *TransferLeaderOperator) Do(region *metapb.Region, leader *metapb.Peer return true, nil, nil } + // A non-zero lto.count means the transfer was already requested: fail once the count reaches lto.maxWaitCount, otherwise keep waiting. + if lto.count > 0 { + if lto.count >= lto.maxWaitCount { + return false, nil, errors.Errorf("transfer leader operator called %d times but still be unsuccessful - %v", lto.count, lto) + } + + lto.count++ + return false, nil, nil + } + res := &pdpb.RegionHeartbeatResponse{ TransferLeader: &pdpb.TransferLeader{ Peer: lto.newLeader, }, } + lto.count++ return false, res, nil }