Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add max wait count for transfer leader operator. #147

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (cb *capacityBalancer) Balance(cluster *ClusterInfo) (*BalanceOperator, err
return nil, nil
}

leaderTransferOperator := newTransferLeaderOperator(oldLeader, newLeader)
leaderTransferOperator := newTransferLeaderOperator(oldLeader, newLeader, maxWaitCount)
addPeerOperator := newAddPeerOperator(newPeer)
removePeerOperator := newRemovePeerOperator(oldLeader)

Expand Down
12 changes: 12 additions & 0 deletions server/balancer_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,30 @@ func (s *testBalancerWorkerSuite) TestBalancerWorker(c *C) {
c.Assert(bop.ops, HasLen, 3)

op1 := bop.ops[0].(*TransferLeaderOperator)
c.Assert(op1.maxWaitCount, Equals, maxWaitCount)
c.Assert(op1.oldLeader.GetStoreId(), Equals, uint64(1))
c.Assert(op1.newLeader.GetStoreId(), Equals, uint64(4))

// Now we check the maxWaitCount for TransferLeaderOperator.
op1.maxWaitCount = 2

ok, res, err := op1.Do(region, leaderPeer)
c.Assert(err, IsNil)
c.Assert(ok, IsFalse)
c.Assert(res.GetTransferLeader().GetPeer().GetStoreId(), Equals, uint64(4))
c.Assert(op1.count, Equals, 1)

ok, res, err = op1.Do(region, leaderPeer)
c.Assert(err, IsNil)
c.Assert(ok, IsFalse)
c.Assert(res, IsNil)
c.Assert(op1.count, Equals, 2)

ok, res, err = op1.Do(region, leaderPeer)
c.Assert(err, NotNil)
c.Assert(ok, IsFalse)
c.Assert(res, IsNil)
c.Assert(op1.count, Equals, 2)

op2 := bop.ops[1].(*ChangePeerOperator)
c.Assert(op2.changePeer.GetChangeType(), Equals, raftpb.ConfChangeType_AddNode)
Expand Down
30 changes: 22 additions & 8 deletions server/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ import (
"github.com/pingcap/kvproto/pkg/raftpb"
)

const (
// TODO: we can make this as a config flag.
// maxWaitCount is the heartbeat count when we check whether the operator is successful.
maxWaitCount = 3
)

// Operator is the interface to do some operations.
type Operator interface {
// Do does the operator, if finished then return true.
Expand Down Expand Up @@ -176,16 +182,19 @@ func (co *ChangePeerOperator) Do(region *metapb.Region, leader *metapb.Peer) (bo

// TransferLeaderOperator is used to do leader transfer.
type TransferLeaderOperator struct {
mustSuccess bool
count int
maxWaitCount int

oldLeader *metapb.Peer
newLeader *metapb.Peer
}

func newTransferLeaderOperator(oldLeader, newLeader *metapb.Peer) *TransferLeaderOperator {
func newTransferLeaderOperator(oldLeader, newLeader *metapb.Peer, waitCount int) *TransferLeaderOperator {
return &TransferLeaderOperator{
oldLeader: oldLeader,
newLeader: newLeader,
oldLeader: oldLeader,
newLeader: newLeader,
count: 0,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default value of count is already zero.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is a variable like index, we should update it in Do function.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

count and maxWaitCount confuse me.

you can initialize count as waitCount and then use lto.count < 0 lt.count-- to check.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems not, we must store the origin maxWaitCount to make sure the first time we return the transfer leader response.
If we only use count, then we cannot distinct it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it.

maxWaitCount: waitCount,
}
}

Expand Down Expand Up @@ -220,16 +229,21 @@ func (lto *TransferLeaderOperator) Do(region *metapb.Region, leader *metapb.Peer
return true, nil, nil
}

// If lto.mustSuccess is true, then lto.check should always be true.
if lto.mustSuccess {
return false, nil, errors.Errorf("transfer leader operator called twice - %v", lto)
// If lto.count is greater than 0, then we should check whether it exceeds the lto.maxWaitCount.
if lto.count > 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If some one set maxWaitCount to zero, you will get infinite loop. So it is better to add a check in newTransferLeaderOperator or here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, later we will move it as config flag.
Then we can check it.
Now we have set it directly, so it will never be zero.

if lto.count >= lto.maxWaitCount {
return false, nil, errors.Errorf("transfer leader operator called %d times but still be unsucceessful - %v", lto.count, lto)
}

lto.count++
return false, nil, nil
}

res := &pdpb.RegionHeartbeatResponse{
TransferLeader: &pdpb.TransferLeader{
Peer: lto.newLeader,
},
}
lto.mustSuccess = true
lto.count++
return false, res, nil
}