Skip to content

Commit

Permalink
schedule: change merge match peers strategy (tikv#1339)
Browse files Browse the repository at this point in the history
* change merge leader strategy
  • Loading branch information
Connor1996 committed Dec 11, 2018
1 parent 757e91d commit f5322c4
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 46 deletions.
111 changes: 77 additions & 34 deletions server/schedule/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package schedule

import (
"bytes"
"errors"
"fmt"
"reflect"
"sync/atomic"
Expand Down Expand Up @@ -485,68 +486,110 @@ func CreateMergeRegionOperator(desc string, cluster Cluster, source *core.Region

// matchPeerSteps returns the steps to match the location of peer stores of source region with target's.
func matchPeerSteps(cluster Cluster, source *core.RegionInfo, target *core.RegionInfo) ([]OperatorStep, OperatorKind, error) {
storeIDs := make(map[uint64]struct{})
var steps []OperatorStep
var kind OperatorKind

sourcePeers := source.GetPeers()
targetPeers := target.GetPeers()

for _, peer := range targetPeers {
storeIDs[peer.GetStoreId()] = struct{}{}
// make sure the peer count is same
if len(sourcePeers) != len(targetPeers) {
return nil, kind, errors.New("mismatch count of peer")
}

// Add missing peers.
for id := range storeIDs {
if source.GetStorePeer(id) != nil {
continue
}
peer, err := cluster.AllocPeer(id)
if err != nil {
log.Debugf("peer alloc failed: %v", err)
return nil, kind, err
}
if cluster.IsRaftLearnerEnabled() {
steps = append(steps,
AddLearner{ToStore: id, PeerID: peer.Id},
PromoteLearner{ToStore: id, PeerID: peer.Id},
)
} else {
steps = append(steps, AddPeer{ToStore: id, PeerID: peer.Id})
// There is a case that a follower is added and transfer leader to it,
// and the apply process of it is slow so leader regards it as voter
// but actually it is still learner. Once that, the follower can't be leader,
// but old leader can't know that so there is no leader to serve for a while.
// So target leader should be the first added follower if there is no transection stores.
var targetLeader uint64
var toAdds [][]OperatorStep

// get overlapped part of the peers of two regions
intersection := getIntersectionStores(sourcePeers, targetPeers)
for _, peer := range targetPeers {
storeID := peer.GetStoreId()
// find missing peers.
if _, found := intersection[storeID]; !found {
var addSteps []OperatorStep

peer, err := cluster.AllocPeer(storeID)
if err != nil {
log.Debugf("peer alloc failed: %v", err)
return nil, kind, err
}
if cluster.IsRaftLearnerEnabled() {
addSteps = append(addSteps,
AddLearner{ToStore: storeID, PeerID: peer.Id},
PromoteLearner{ToStore: storeID, PeerID: peer.Id},
)
} else {
addSteps = append(addSteps, AddPeer{ToStore: storeID, PeerID: peer.Id})
}
toAdds = append(toAdds, addSteps)

// record the first added peer
if targetLeader == 0 {
targetLeader = storeID
}
kind |= OpRegion
}
kind |= OpRegion
}

// Check whether to transfer leader or not
intersection := getIntersectionStores(sourcePeers, targetPeers)
leaderID := source.GetLeader().GetStoreId()
isFound := false
for _, storeID := range intersection {
for storeID := range intersection {
// if leader belongs to overlapped part, no need to transfer
if storeID == leaderID {
isFound = true
targetLeader = 0
break
}
targetLeader = storeID
}
if !isFound {
steps = append(steps, TransferLeader{FromStore: source.GetLeader().GetStoreId(), ToStore: target.GetLeader().GetStoreId()})

// if intersection is not empty and leader doesn't belong to intersection, transfer leader to store in overlapped part
if len(intersection) != 0 && targetLeader != 0 {
steps = append(steps, TransferLeader{FromStore: source.GetLeader().GetStoreId(), ToStore: targetLeader})
kind |= OpLeader
targetLeader = 0
}

// Remove redundant peers.
index := 0
// remove redundant peers.
for _, peer := range sourcePeers {
if _, ok := storeIDs[peer.GetStoreId()]; ok {
if _, found := intersection[peer.GetStoreId()]; found {
continue
}

// the leader should be the last to remove
if targetLeader != 0 && peer.GetStoreId() == leaderID {
continue
}

steps = append(steps, toAdds[index]...)
steps = append(steps, RemovePeer{FromStore: peer.GetStoreId()})
kind |= OpRegion
index++
}

// transfer leader before remove leader
if targetLeader != 0 {
steps = append(steps, toAdds[index]...)
steps = append(steps, TransferLeader{FromStore: leaderID, ToStore: targetLeader})
steps = append(steps, RemovePeer{FromStore: leaderID})
kind |= OpLeader | OpRegion
index++
}

if index != len(toAdds) {
return nil, kind, errors.New("wrong count of add steps")
}

return steps, kind, nil
}

// getIntersectionStores returns the stores included in two region's peers.
func getIntersectionStores(a []*metapb.Peer, b []*metapb.Peer) []uint64 {
set := make([]uint64, 0)
func getIntersectionStores(a []*metapb.Peer, b []*metapb.Peer) map[uint64]struct{} {
intersection := make(map[uint64]struct{})
hash := make(map[uint64]struct{})

for _, peer := range a {
Expand All @@ -555,9 +598,9 @@ func getIntersectionStores(a []*metapb.Peer, b []*metapb.Peer) []uint64 {

for _, peer := range b {
if _, found := hash[peer.GetStoreId()]; found {
set = append(set, peer.GetStoreId())
intersection[peer.GetStoreId()] = struct{}{}
}
}

return set
return intersection
}
67 changes: 55 additions & 12 deletions server/schedulers/balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,7 @@ func (s *testMergeCheckerSuite) SetUpTest(c *C) {
StartKey: []byte("t"),
EndKey: []byte("x"),
Peers: []*metapb.Peer{
{Id: 106, StoreId: 1},
{Id: 106, StoreId: 2},
{Id: 107, StoreId: 5},
{Id: 108, StoreId: 6},
},
Expand Down Expand Up @@ -945,11 +945,14 @@ func (s *testMergeCheckerSuite) checkSteps(c *C, op *schedule.Operator, steps []

func (s *testMergeCheckerSuite) TestMatchPeers(c *C) {
// partial store overlap not including leader
op1, op2 := s.mc.Check(s.regions[2])
s.checkSteps(c, op1, []schedule.OperatorStep{
schedule.AddLearner{ToStore: 4, PeerID: 1},
schedule.PromoteLearner{ToStore: 4, PeerID: 1},
schedule.TransferLeader{FromStore: 6, ToStore: 4},
ops := s.mc.Check(s.regions[2])
s.checkSteps(c, ops[0], []schedule.OperatorStep{
schedule.TransferLeader{FromStore: 6, ToStore: 5},
schedule.AddLearner{ToStore: 1, PeerID: 1},
schedule.PromoteLearner{ToStore: 1, PeerID: 1},
schedule.RemovePeer{FromStore: 2},
schedule.AddLearner{ToStore: 4, PeerID: 2},
schedule.PromoteLearner{ToStore: 4, PeerID: 2},
schedule.RemovePeer{FromStore: 6},
schedule.MergeRegion{
FromRegion: s.regions[2].GetMeta(),
Expand All @@ -966,13 +969,20 @@ func (s *testMergeCheckerSuite) TestMatchPeers(c *C) {
})

// partial store overlap including leader
newRegion := s.regions[2].Clone(core.WithLeader(&metapb.Peer{Id: 106, StoreId: 1}))
newRegion := s.regions[2].Clone(
core.SetPeers([]*metapb.Peer{
{Id: 106, StoreId: 1},
{Id: 107, StoreId: 5},
{Id: 108, StoreId: 6},
}),
core.WithLeader(&metapb.Peer{Id: 106, StoreId: 1}),
)
s.regions[2] = newRegion
s.cluster.PutRegion(s.regions[2])
op1, op2 = s.mc.Check(s.regions[2])
s.checkSteps(c, op1, []schedule.OperatorStep{
schedule.AddLearner{ToStore: 4, PeerID: 2},
schedule.PromoteLearner{ToStore: 4, PeerID: 2},
ops = s.mc.Check(s.regions[2])
s.checkSteps(c, ops[0], []schedule.OperatorStep{
schedule.AddLearner{ToStore: 4, PeerID: 3},
schedule.PromoteLearner{ToStore: 4, PeerID: 3},
schedule.RemovePeer{FromStore: 6},
schedule.MergeRegion{
FromRegion: s.regions[2].GetMeta(),
Expand All @@ -988,7 +998,7 @@ func (s *testMergeCheckerSuite) TestMatchPeers(c *C) {
},
})

// all store overlap
// all stores overlap
s.regions[2] = s.regions[2].Clone(core.SetPeers([]*metapb.Peer{
{Id: 106, StoreId: 1},
{Id: 107, StoreId: 5},
Expand All @@ -1010,6 +1020,39 @@ func (s *testMergeCheckerSuite) TestMatchPeers(c *C) {
IsPassive: true,
},
})

// all stores not overlap
s.regions[2] = s.regions[2].Clone(core.SetPeers([]*metapb.Peer{
{Id: 109, StoreId: 2},
{Id: 110, StoreId: 3},
{Id: 111, StoreId: 6},
}), core.WithLeader(&metapb.Peer{Id: 109, StoreId: 2}))
s.cluster.PutRegion(s.regions[2])
ops = s.mc.Check(s.regions[2])
s.checkSteps(c, ops[0], []schedule.OperatorStep{
schedule.AddLearner{ToStore: 1, PeerID: 4},
schedule.PromoteLearner{ToStore: 1, PeerID: 4},
schedule.RemovePeer{FromStore: 3},
schedule.AddLearner{ToStore: 4, PeerID: 5},
schedule.PromoteLearner{ToStore: 4, PeerID: 5},
schedule.RemovePeer{FromStore: 6},
schedule.AddLearner{ToStore: 5, PeerID: 6},
schedule.PromoteLearner{ToStore: 5, PeerID: 6},
schedule.TransferLeader{FromStore: 2, ToStore: 1},
schedule.RemovePeer{FromStore: 2},
schedule.MergeRegion{
FromRegion: s.regions[2].GetMeta(),
ToRegion: s.regions[1].GetMeta(),
IsPassive: false,
},
})
s.checkSteps(c, ops[1], []schedule.OperatorStep{
schedule.MergeRegion{
FromRegion: s.regions[2].GetMeta(),
ToRegion: s.regions[1].GetMeta(),
IsPassive: true,
},
})
}

var _ = Suite(&testBalanceHotWriteRegionSchedulerSuite{})
Expand Down

0 comments on commit f5322c4

Please sign in to comment.