From 3132e467666eefe357ac9436ba480642fea8d337 Mon Sep 17 00:00:00 2001 From: qiuyesuifeng Date: Sat, 11 Jun 2016 12:48:13 +0800 Subject: [PATCH 1/3] server: make transfer leader operator must be successful. --- server/balancer_worker_test.go | 10 ++++++++++ server/operator.go | 8 ++++++++ 2 files changed, 18 insertions(+) diff --git a/server/balancer_worker_test.go b/server/balancer_worker_test.go index 73e0fdc9e6a..1b474a7bdc2 100644 --- a/server/balancer_worker_test.go +++ b/server/balancer_worker_test.go @@ -75,6 +75,16 @@ func (s *testBalancerWorkerSuite) TestBalancerWorker(c *C) { c.Assert(op1.oldLeader.GetStoreId(), Equals, uint64(1)) c.Assert(op1.newLeader.GetStoreId(), Equals, uint64(4)) + ok, res, err := op1.Do(region, leaderPeer) + c.Assert(err, IsNil) + c.Assert(ok, IsFalse) + c.Assert(res.GetTransferLeader().GetPeer().GetStoreId(), Equals, uint64(4)) + + ok, res, err = op1.Do(region, leaderPeer) + c.Assert(err, NotNil) + c.Assert(ok, IsFalse) + c.Assert(res, IsNil) + op2 := bop.ops[1].(*ChangePeerOperator) c.Assert(op2.changePeer.GetChangeType(), Equals, raftpb.ConfChangeType_AddNode) c.Assert(op2.changePeer.GetPeer().GetStoreId(), Equals, uint64(2)) diff --git a/server/operator.go b/server/operator.go index 0058e28eb52..ccd57f3d60e 100644 --- a/server/operator.go +++ b/server/operator.go @@ -176,6 +176,8 @@ func (co *ChangePeerOperator) Do(region *metapb.Region, leader *metapb.Peer) (bo // TransferLeaderOperator is used to do leader transfer. type TransferLeaderOperator struct { + mustSuc bool + oldLeader *metapb.Peer newLeader *metapb.Peer } @@ -218,10 +220,16 @@ func (lto *TransferLeaderOperator) Do(region *metapb.Region, leader *metapb.Peer return true, nil, nil } + // If lto.mustSuc is true, then lto.check should always be true. 
+ if lto.mustSuc { + return false, nil, errors.Errorf("transfer leader operator called twice - %v", lto) + } + res := &pdpb.RegionHeartbeatResponse{ TransferLeader: &pdpb.TransferLeader{ Peer: lto.newLeader, }, } + lto.mustSuc = true return false, res, nil } From 8ac683ffba9923fbfea1acaac5fa7448ed75caa8 Mon Sep 17 00:00:00 2001 From: qiuyesuifeng Date: Sat, 11 Jun 2016 21:16:18 +0800 Subject: [PATCH 2/3] server: address comment. --- server/operator.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/operator.go b/server/operator.go index ccd57f3d60e..816c6e8ad27 100644 --- a/server/operator.go +++ b/server/operator.go @@ -176,7 +176,7 @@ func (co *ChangePeerOperator) Do(region *metapb.Region, leader *metapb.Peer) (bo // TransferLeaderOperator is used to do leader transfer. type TransferLeaderOperator struct { - mustSuc bool + mustSuccess bool oldLeader *metapb.Peer newLeader *metapb.Peer @@ -220,8 +220,8 @@ func (lto *TransferLeaderOperator) Do(region *metapb.Region, leader *metapb.Peer return true, nil, nil } - // If lto.mustSuc is true, then lto.check should always be true. - if lto.mustSuc { + // If lto.mustSuccess is true, then lto.check should always be true. + if lto.mustSuccess { return false, nil, errors.Errorf("transfer leader operator called twice - %v", lto) } @@ -230,6 +230,6 @@ func (lto *TransferLeaderOperator) Do(region *metapb.Region, leader *metapb.Peer Peer: lto.newLeader, }, } - lto.mustSuc = true + lto.mustSuccess = true return false, res, nil } From 20a353d5e93b8366ec7ef094a75f508f04d528df Mon Sep 17 00:00:00 2001 From: qiuyesuifeng Date: Sun, 12 Jun 2016 11:09:23 +0800 Subject: [PATCH 3/3] server: use max wait count for transfer leader operator. 
--- server/balancer.go | 2 +- server/balancer_worker_test.go | 12 ++++++++++++ server/operator.go | 30 ++++++++++++++++++++++-------- 3 files changed, 35 insertions(+), 9 deletions(-) diff --git a/server/balancer.go b/server/balancer.go index c5e50cd8cda..d3642bf3670 100644 --- a/server/balancer.go +++ b/server/balancer.go @@ -214,7 +214,7 @@ func (cb *capacityBalancer) Balance(cluster *ClusterInfo) (*BalanceOperator, err return nil, nil } - leaderTransferOperator := newTransferLeaderOperator(oldLeader, newLeader) + leaderTransferOperator := newTransferLeaderOperator(oldLeader, newLeader, maxWaitCount) addPeerOperator := newAddPeerOperator(newPeer) removePeerOperator := newRemovePeerOperator(oldLeader) diff --git a/server/balancer_worker_test.go b/server/balancer_worker_test.go index 1b474a7bdc2..65ac87bb626 100644 --- a/server/balancer_worker_test.go +++ b/server/balancer_worker_test.go @@ -72,18 +72,30 @@ func (s *testBalancerWorkerSuite) TestBalancerWorker(c *C) { c.Assert(bop.ops, HasLen, 3) op1 := bop.ops[0].(*TransferLeaderOperator) + c.Assert(op1.maxWaitCount, Equals, maxWaitCount) c.Assert(op1.oldLeader.GetStoreId(), Equals, uint64(1)) c.Assert(op1.newLeader.GetStoreId(), Equals, uint64(4)) + // Now we check the maxWaitCount for TransferLeaderOperator. 
+ op1.maxWaitCount = 2 + ok, res, err := op1.Do(region, leaderPeer) c.Assert(err, IsNil) c.Assert(ok, IsFalse) c.Assert(res.GetTransferLeader().GetPeer().GetStoreId(), Equals, uint64(4)) + c.Assert(op1.count, Equals, 1) + + ok, res, err = op1.Do(region, leaderPeer) + c.Assert(err, IsNil) + c.Assert(ok, IsFalse) + c.Assert(res, IsNil) + c.Assert(op1.count, Equals, 2) ok, res, err = op1.Do(region, leaderPeer) c.Assert(err, NotNil) c.Assert(ok, IsFalse) c.Assert(res, IsNil) + c.Assert(op1.count, Equals, 2) op2 := bop.ops[1].(*ChangePeerOperator) c.Assert(op2.changePeer.GetChangeType(), Equals, raftpb.ConfChangeType_AddNode) diff --git a/server/operator.go b/server/operator.go index 816c6e8ad27..6b593c3764f 100644 --- a/server/operator.go +++ b/server/operator.go @@ -21,6 +21,12 @@ import ( "github.com/pingcap/kvproto/pkg/raftpb" ) +const ( + // TODO: make this a config flag. + // maxWaitCount is the maximum number of heartbeats we wait for the operator to succeed before treating it as failed. + maxWaitCount = 3 +) + // Operator is the interface to do some operations. type Operator interface { // Do does the operator, if finished then return true. @@ -176,16 +182,19 @@ func (co *ChangePeerOperator) Do(region *metapb.Region, leader *metapb.Peer) (bo // TransferLeaderOperator is used to do leader transfer. 
type TransferLeaderOperator struct { - mustSuccess bool + count int + maxWaitCount int oldLeader *metapb.Peer newLeader *metapb.Peer } -func newTransferLeaderOperator(oldLeader, newLeader *metapb.Peer) *TransferLeaderOperator { +func newTransferLeaderOperator(oldLeader, newLeader *metapb.Peer, waitCount int) *TransferLeaderOperator { return &TransferLeaderOperator{ - oldLeader: oldLeader, - newLeader: newLeader, + oldLeader: oldLeader, + newLeader: newLeader, + count: 0, + maxWaitCount: waitCount, } } @@ -220,9 +229,14 @@ func (lto *TransferLeaderOperator) Do(region *metapb.Region, leader *metapb.Peer return true, nil, nil } - // If lto.mustSuccess is true, then lto.check should always be true. - if lto.mustSuccess { - return false, nil, errors.Errorf("transfer leader operator called twice - %v", lto) + // If lto.count is greater than 0, the transfer has already been requested, so check whether lto.count has reached lto.maxWaitCount. + if lto.count > 0 { + if lto.count >= lto.maxWaitCount { + return false, nil, errors.Errorf("transfer leader operator called %d times but still be unsucceessful - %v", lto.count, lto) + } + + lto.count++ + return false, nil, nil } res := &pdpb.RegionHeartbeatResponse{ @@ -230,6 +244,6 @@ func (lto *TransferLeaderOperator) Do(region *metapb.Region, leader *metapb.Peer Peer: lto.newLeader, }, } - lto.mustSuccess = true + lto.count++ return false, res, nil }