schedule: change merge match peers strategy #1339

Merged · 7 commits · Dec 6, 2018
111 changes: 77 additions & 34 deletions server/schedule/operator.go
@@ -15,6 +15,7 @@ package schedule

import (
"bytes"
"errors"
"fmt"
"reflect"
"sync/atomic"
@@ -492,68 +493,110 @@ func CreateMergeRegionOperator(desc string, cluster Cluster, source *core.Region

// matchPeerSteps returns the steps needed to match the peer store locations of the source region with the target's.
func matchPeerSteps(cluster Cluster, source *core.RegionInfo, target *core.RegionInfo) ([]OperatorStep, OperatorKind, error) {
storeIDs := make(map[uint64]struct{})
var steps []OperatorStep
var kind OperatorKind

sourcePeers := source.GetPeers()
targetPeers := target.GetPeers()

for _, peer := range targetPeers {
storeIDs[peer.GetStoreId()] = struct{}{}
// make sure the peer counts are the same
if len(sourcePeers) != len(targetPeers) {
return nil, kind, errors.New("mismatch count of peer")
}

// Add missing peers.
for id := range storeIDs {
if source.GetStorePeer(id) != nil {
continue
}
peer, err := cluster.AllocPeer(id)
if err != nil {
log.Debugf("peer alloc failed: %v", err)
return nil, kind, err
}
if cluster.IsRaftLearnerEnabled() {
steps = append(steps,
AddLearner{ToStore: id, PeerID: peer.Id},
PromoteLearner{ToStore: id, PeerID: peer.Id},
)
} else {
steps = append(steps, AddPeer{ToStore: id, PeerID: peer.Id})
// There is a case where a follower is added and leadership is transferred to it,
// but its apply process is slow, so the old leader still regards it as a voter
// while it is actually still a learner. The follower then cannot become leader,
// yet the old leader has no way to know that, so there is no leader to serve for a while.
// Therefore the target leader should be the first added follower when there are no
// intersection stores (a stand-alone sketch of this ordering follows the operator.go diff below).
var targetLeader uint64
var toAdds [][]OperatorStep

// get the overlapping part of the peers of the two regions
intersection := getIntersectionStores(sourcePeers, targetPeers)
for _, peer := range targetPeers {
storeID := peer.GetStoreId()
// find missing peers.
if _, found := intersection[storeID]; !found {
var addSteps []OperatorStep

peer, err := cluster.AllocPeer(storeID)
if err != nil {
log.Debugf("peer alloc failed: %v", err)
return nil, kind, err
}
if cluster.IsRaftLearnerEnabled() {
addSteps = append(addSteps,
AddLearner{ToStore: storeID, PeerID: peer.Id},
PromoteLearner{ToStore: storeID, PeerID: peer.Id},
)
} else {
addSteps = append(addSteps, AddPeer{ToStore: storeID, PeerID: peer.Id})
}
toAdds = append(toAdds, addSteps)

// record the first added peer
if targetLeader == 0 {
targetLeader = storeID
}
kind |= OpRegion
}
kind |= OpRegion
}

// Check whether to transfer leader or not
intersection := getIntersectionStores(sourcePeers, targetPeers)
leaderID := source.GetLeader().GetStoreId()
isFound := false
for _, storeID := range intersection {
for storeID := range intersection {
// if leader belongs to overlapped part, no need to transfer
if storeID == leaderID {
isFound = true
targetLeader = 0
break
}
targetLeader = storeID
}
if !isFound {
steps = append(steps, TransferLeader{FromStore: source.GetLeader().GetStoreId(), ToStore: target.GetLeader().GetStoreId()})

// if the intersection is not empty and the leader does not belong to it, transfer leadership to a store in the overlapping part
if len(intersection) != 0 && targetLeader != 0 {
steps = append(steps, TransferLeader{FromStore: source.GetLeader().GetStoreId(), ToStore: targetLeader})
kind |= OpLeader
targetLeader = 0
}

// Remove redundant peers.
index := 0
// remove redundant peers.
for _, peer := range sourcePeers {
if _, ok := storeIDs[peer.GetStoreId()]; ok {
if _, found := intersection[peer.GetStoreId()]; found {
continue
}

// the leader should be the last to remove
if targetLeader != 0 && peer.GetStoreId() == leaderID {
continue
}

steps = append(steps, toAdds[index]...)
steps = append(steps, RemovePeer{FromStore: peer.GetStoreId()})
kind |= OpRegion
index++
Review comment: Please check if index == len(toAdds) after this loop.

}

// transfer leadership before removing the old leader
if targetLeader != 0 {
steps = append(steps, toAdds[index]...)
steps = append(steps, TransferLeader{FromStore: leaderID, ToStore: targetLeader})
steps = append(steps, RemovePeer{FromStore: leaderID})
kind |= OpLeader | OpRegion
index++
}

if index != len(toAdds) {
return nil, kind, errors.New("wrong count of add steps")
}

return steps, kind, nil
}

// getIntersectionStores returns the stores that appear in both regions' peers.
func getIntersectionStores(a []*metapb.Peer, b []*metapb.Peer) []uint64 {
set := make([]uint64, 0)
func getIntersectionStores(a []*metapb.Peer, b []*metapb.Peer) map[uint64]struct{} {
intersection := make(map[uint64]struct{})
hash := make(map[uint64]struct{})

for _, peer := range a {
@@ -562,9 +605,9 @@ func getIntersectionStores(a []*metapb.Peer, b []*metapb.Peer) []uint64 {

for _, peer := range b {
if _, found := hash[peer.GetStoreId()]; found {
set = append(set, peer.GetStoreId())
intersection[peer.GetStoreId()] = struct{}{}
}
}

return set
return intersection
}
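Before the test changes below, a minimal, self-contained sketch of the new ordering may help. It is not PD code: the peer type, the store IDs, and the plain-string steps are stand-ins for illustration, and it only covers the case where the two regions share no stores (the "all stores not overlap" test case). The idea is to pair each added peer with a removed one and to handle the old leader last, after leadership has moved to the first added store.

package main

import "fmt"

// peer is a local stand-in for metapb.Peer, used only in this sketch.
type peer struct {
	id      uint64
	storeID uint64
}

// intersectionStores mirrors the idea of getIntersectionStores: the store IDs
// that host a peer of both regions.
func intersectionStores(a, b []peer) map[uint64]struct{} {
	hash := make(map[uint64]struct{})
	for _, p := range a {
		hash[p.storeID] = struct{}{}
	}
	out := make(map[uint64]struct{})
	for _, p := range b {
		if _, ok := hash[p.storeID]; ok {
			out[p.storeID] = struct{}{}
		}
	}
	return out
}

func main() {
	// Hypothetical layout mirroring the "all stores not overlap" test case:
	// source peers on stores 2 (leader), 3 and 6; target peers on stores 1, 4 and 5.
	source := []peer{{id: 109, storeID: 2}, {id: 110, storeID: 3}, {id: 111, storeID: 6}}
	target := []peer{{id: 1, storeID: 1}, {id: 2, storeID: 4}, {id: 3, storeID: 5}}
	leader := uint64(2)

	overlap := intersectionStores(source, target)

	// Stores that need a new peer, in target order; the first one becomes the
	// leader candidate because the old leader has to be removed.
	var toAdd []uint64
	for _, p := range target {
		if _, ok := overlap[p.storeID]; !ok {
			toAdd = append(toAdd, p.storeID)
		}
	}

	var steps []string
	i := 0
	for _, p := range source {
		if _, ok := overlap[p.storeID]; ok {
			continue // shared store: its peer is kept
		}
		if p.storeID == leader {
			continue // the old leader is handled last
		}
		// interleave: add one peer, then remove one, so the replica count
		// stays close to the target count throughout
		steps = append(steps, fmt.Sprintf("add+promote peer on store %d", toAdd[i]))
		steps = append(steps, fmt.Sprintf("remove peer on store %d", p.storeID))
		i++
	}
	// add the last missing peer, move leadership to the first added store,
	// then remove the old leader (valid here because nothing overlaps)
	steps = append(steps, fmt.Sprintf("add+promote peer on store %d", toAdd[i]))
	steps = append(steps, fmt.Sprintf("transfer leader from store %d to store %d", leader, toAdd[0]))
	steps = append(steps, fmt.Sprintf("remove peer on store %d", leader))

	for _, s := range steps {
		fmt.Println(s)
	}
}

Interleaving the adds and removes this way is presumably why the add steps are collected into toAdds instead of being emitted up front: the region never carries more than one extra replica while it is being matched.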
59 changes: 51 additions & 8 deletions server/schedulers/balance_test.go
@@ -888,7 +888,7 @@ func (s *testMergeCheckerSuite) SetUpTest(c *C) {
StartKey: []byte("t"),
EndKey: []byte("x"),
Peers: []*metapb.Peer{
{Id: 106, StoreId: 1},
{Id: 106, StoreId: 2},
{Id: 107, StoreId: 5},
{Id: 108, StoreId: 6},
},
@@ -951,9 +951,12 @@ func (s *testMergeCheckerSuite) TestMatchPeers(c *C) {
// partial store overlap not including leader
ops := s.mc.Check(s.regions[2])
s.checkSteps(c, ops[0], []schedule.OperatorStep{
schedule.AddLearner{ToStore: 4, PeerID: 1},
schedule.PromoteLearner{ToStore: 4, PeerID: 1},
schedule.TransferLeader{FromStore: 6, ToStore: 4},
schedule.TransferLeader{FromStore: 6, ToStore: 5},
schedule.AddLearner{ToStore: 1, PeerID: 1},
schedule.PromoteLearner{ToStore: 1, PeerID: 1},
schedule.RemovePeer{FromStore: 2},
schedule.AddLearner{ToStore: 4, PeerID: 2},
schedule.PromoteLearner{ToStore: 4, PeerID: 2},
schedule.RemovePeer{FromStore: 6},
schedule.MergeRegion{
FromRegion: s.regions[2].GetMeta(),
@@ -970,13 +973,20 @@ func (s *testMergeCheckerSuite) TestMatchPeers(c *C) {
})

// partial store overlap including leader
newRegion := s.regions[2].Clone(core.WithLeader(&metapb.Peer{Id: 106, StoreId: 1}))
newRegion := s.regions[2].Clone(
core.SetPeers([]*metapb.Peer{
{Id: 106, StoreId: 1},
{Id: 107, StoreId: 5},
{Id: 108, StoreId: 6},
}),
core.WithLeader(&metapb.Peer{Id: 106, StoreId: 1}),
)
s.regions[2] = newRegion
s.cluster.PutRegion(s.regions[2])
ops = s.mc.Check(s.regions[2])
s.checkSteps(c, ops[0], []schedule.OperatorStep{
schedule.AddLearner{ToStore: 4, PeerID: 2},
schedule.PromoteLearner{ToStore: 4, PeerID: 2},
schedule.AddLearner{ToStore: 4, PeerID: 3},
schedule.PromoteLearner{ToStore: 4, PeerID: 3},
schedule.RemovePeer{FromStore: 6},
schedule.MergeRegion{
FromRegion: s.regions[2].GetMeta(),
@@ -992,7 +1002,7 @@ func (s *testMergeCheckerSuite) TestMatchPeers(c *C) {
},
})

// all store overlap
// all stores overlap
s.regions[2] = s.regions[2].Clone(core.SetPeers([]*metapb.Peer{
{Id: 106, StoreId: 1},
{Id: 107, StoreId: 5},
@@ -1014,6 +1024,39 @@ func (s *testMergeCheckerSuite) TestMatchPeers(c *C) {
IsPassive: true,
},
})

// all stores not overlap
s.regions[2] = s.regions[2].Clone(core.SetPeers([]*metapb.Peer{
{Id: 109, StoreId: 2},
{Id: 110, StoreId: 3},
{Id: 111, StoreId: 6},
}), core.WithLeader(&metapb.Peer{Id: 109, StoreId: 2}))
s.cluster.PutRegion(s.regions[2])
ops = s.mc.Check(s.regions[2])
s.checkSteps(c, ops[0], []schedule.OperatorStep{
schedule.AddLearner{ToStore: 1, PeerID: 4},
schedule.PromoteLearner{ToStore: 1, PeerID: 4},
schedule.RemovePeer{FromStore: 3},
schedule.AddLearner{ToStore: 4, PeerID: 5},
schedule.PromoteLearner{ToStore: 4, PeerID: 5},
schedule.RemovePeer{FromStore: 6},
schedule.AddLearner{ToStore: 5, PeerID: 6},
schedule.PromoteLearner{ToStore: 5, PeerID: 6},
schedule.TransferLeader{FromStore: 2, ToStore: 1},
schedule.RemovePeer{FromStore: 2},
schedule.MergeRegion{
FromRegion: s.regions[2].GetMeta(),
ToRegion: s.regions[1].GetMeta(),
IsPassive: false,
},
})
s.checkSteps(c, ops[1], []schedule.OperatorStep{
schedule.MergeRegion{
FromRegion: s.regions[2].GetMeta(),
ToRegion: s.regions[1].GetMeta(),
IsPassive: true,
},
})
}

var _ = Suite(&testBalanceHotWriteRegionSchedulerSuite{})
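One detail the test expectations above make visible: in the "partial store overlap not including leader" case, the very first step is TransferLeader{FromStore: 6, ToStore: 5}, before any peer is added or removed. A small hedged sketch of that branch follows; the store numbers are copied from the test, everything else is a stand-in for illustration.

package main

import "fmt"

func main() {
	// Hypothetical numbers mirroring the "partial store overlap not including
	// leader" case: source peers on stores 2, 5, 6 with the leader on store 6,
	// target peers on stores 1, 4, 5, so only store 5 is shared.
	sourceStores := []uint64{2, 5, 6}
	targetStores := []uint64{1, 4, 5}
	leader := uint64(6)

	inTarget := make(map[uint64]struct{})
	for _, s := range targetStores {
		inTarget[s] = struct{}{}
	}
	shared := make(map[uint64]struct{})
	for _, s := range sourceStores {
		if _, ok := inTarget[s]; ok {
			shared[s] = struct{}{}
		}
	}

	// When the regions share at least one store but the leader is not on one of
	// them, leadership is moved to a shared store before any peers are touched.
	if _, ok := shared[leader]; !ok && len(shared) > 0 {
		for s := range shared {
			fmt.Printf("transfer leader from store %d to store %d first\n", leader, s)
			break
		}
	}
}

Moving leadership onto a shared store up front means the later removal of the old leader's peer does not leave the region without a leader.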