Skip to content

Commit

Permalink
remove after add
Browse files Browse the repository at this point in the history
Signed-off-by: Connor1996 <zbk602423539@gmail.com>
  • Loading branch information
Connor1996 committed Nov 30, 2018
1 parent 51acd2e commit 61ed900
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 36 deletions.
92 changes: 59 additions & 33 deletions server/schedule/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package schedule

import (
"bytes"
"errors"
"fmt"
"reflect"
"sync/atomic"
Expand Down Expand Up @@ -492,77 +493,102 @@ func CreateMergeRegionOperator(desc string, cluster Cluster, source *core.Region

// matchPeerSteps returns the steps to match the location of peer stores of source region with target's.
func matchPeerSteps(cluster Cluster, source *core.RegionInfo, target *core.RegionInfo) ([]OperatorStep, OperatorKind, error) {
storeIDs := make(map[uint64]struct{})
var steps []OperatorStep
var kind OperatorKind

sourcePeers := source.GetPeers()
targetPeers := target.GetPeers()

for _, peer := range targetPeers {
storeIDs[peer.GetStoreId()] = struct{}{}
// make sure the peer count is same
if len(sourcePeers) != len(targetPeers) {
return nil, kind, errors.New("mismatch count of peer")
}

// there is a case that a follower is added and transfer leader to it,
// and the apply process of it is slow so leader regards it as voter
// but actually it is still learner. Once that, the follower can't be leader,
// but old leader can't know that so there is no leader to serve for a while.
var targetLeader uint64
// Add missing peers.
for id := range storeIDs {
if source.GetStorePeer(id) != nil {
continue
}
peer, err := cluster.AllocPeer(id)
if err != nil {
log.Debugf("peer alloc failed: %v", err)
return nil, kind, err
}
if cluster.IsRaftLearnerEnabled() {
steps = append(steps,
AddLearner{ToStore: id, PeerID: peer.Id},
PromoteLearner{ToStore: id, PeerID: peer.Id},
)
} else {
steps = append(steps, AddPeer{ToStore: id, PeerID: peer.Id})
}
if targetLeader == 0 {
targetLeader = id
}
kind |= OpRegion
}
var toAdds [][]OperatorStep

// Check whether to transfer leader or not
intersection := getIntersectionStores(sourcePeers, targetPeers)
for _, peer := range targetPeers {
storeID := peer.GetStoreId()
if _, found := intersection[storeID]; !found {
// Find missing peers.
var addSteps []OperatorStep

peer, err := cluster.AllocPeer(storeID)
if err != nil {
log.Debugf("peer alloc failed: %v", err)
return nil, kind, err
}
if cluster.IsRaftLearnerEnabled() {
addSteps = append(addSteps,
AddLearner{ToStore: storeID, PeerID: peer.Id},
PromoteLearner{ToStore: storeID, PeerID: peer.Id},
)
} else {
addSteps = append(addSteps, AddPeer{ToStore: storeID, PeerID: peer.Id})
}
toAdds = append(toAdds, addSteps)

if targetLeader == 0 {
targetLeader = storeID
}
kind |= OpRegion
}
}

leaderID := source.GetLeader().GetStoreId()
for _, storeID := range intersection {
for storeID := range intersection {
if storeID == leaderID {
targetLeader = 0
break
}
targetLeader = storeID
}
if targetLeader != 0 {
// target leader should be to the first added follower if there is no transection stores.

if len(intersection) != 0 && targetLeader != 0 {
// if intersection is not empty and leader doesn't belong to intersection, transfer leader to store in intersection
steps = append(steps, TransferLeader{FromStore: source.GetLeader().GetStoreId(), ToStore: targetLeader})
kind |= OpLeader
targetLeader = 0
}

// target leader should be the first added follower if there is no transection stores.
index := 0
// Remove redundant peers.
for _, peer := range sourcePeers {
if _, ok := storeIDs[peer.GetStoreId()]; ok {
if _, found := intersection[peer.GetStoreId()]; found {
continue
}

// the leader should be the last to remove
if targetLeader != 0 && peer.GetStoreId() == leaderID {
continue
}

steps = append(steps, toAdds[index]...)
steps = append(steps, RemovePeer{FromStore: peer.GetStoreId()})
kind |= OpRegion
index++
}

// remove leader
if targetLeader != 0 {
steps = append(steps, toAdds[index]...)
steps = append(steps, TransferLeader{FromStore: leaderID, ToStore: targetLeader})
steps = append(steps, RemovePeer{FromStore: leaderID})
kind |= OpLeader | OpRegion
}
return steps, kind, nil
}

// getIntersectionStores returns the stores included in two region's peers.
func getIntersectionStores(a []*metapb.Peer, b []*metapb.Peer) []uint64 {
set := make([]uint64, 0)
func getIntersectionStores(a []*metapb.Peer, b []*metapb.Peer) map[uint64]struct{} {
set := make(map[uint64]struct{})
hash := make(map[uint64]struct{})

for _, peer := range a {
Expand All @@ -571,7 +597,7 @@ func getIntersectionStores(a []*metapb.Peer, b []*metapb.Peer) []uint64 {

for _, peer := range b {
if _, found := hash[peer.GetStoreId()]; found {
set = append(set, peer.GetStoreId())
set[peer.GetStoreId()] = struct{}{}
}
}

Expand Down
6 changes: 3 additions & 3 deletions server/schedulers/balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -943,9 +943,9 @@ func (s *testMergeCheckerSuite) TestMatchPeers(c *C) {
// partial store overlap not including leader
ops := s.mc.Check(s.regions[2])
s.checkSteps(c, ops[0], []schedule.OperatorStep{
schedule.TransferLeader{FromStore: 6, ToStore: 5},
schedule.AddLearner{ToStore: 4, PeerID: 1},
schedule.PromoteLearner{ToStore: 4, PeerID: 1},
schedule.TransferLeader{FromStore: 6, ToStore: 5},
schedule.RemovePeer{FromStore: 6},
schedule.MergeRegion{
FromRegion: s.regions[2].GetMeta(),
Expand Down Expand Up @@ -1018,14 +1018,14 @@ func (s *testMergeCheckerSuite) TestMatchPeers(c *C) {
s.checkSteps(c, ops[0], []schedule.OperatorStep{
schedule.AddLearner{ToStore: 1, PeerID: 3},
schedule.PromoteLearner{ToStore: 1, PeerID: 3},
schedule.RemovePeer{FromStore: 3},
schedule.AddLearner{ToStore: 4, PeerID: 4},
schedule.PromoteLearner{ToStore: 4, PeerID: 4},
schedule.RemovePeer{FromStore: 6},
schedule.AddLearner{ToStore: 5, PeerID: 5},
schedule.PromoteLearner{ToStore: 5, PeerID: 5},
schedule.TransferLeader{FromStore: 2, ToStore: 1},
schedule.RemovePeer{FromStore: 2},
schedule.RemovePeer{FromStore: 3},
schedule.RemovePeer{FromStore: 6},
schedule.MergeRegion{
FromRegion: s.regions[2].GetMeta(),
ToRegion: s.regions[1].GetMeta(),
Expand Down

0 comments on commit 61ed900

Please sign in to comment.