From f5322c4ef5cc40fbb56a09980dfde167fa659191 Mon Sep 17 00:00:00 2001
From: Connor
Date: Thu, 6 Dec 2018 17:37:37 +0800
Subject: [PATCH] schedule: change merge match peers strategy (#1339)

* change merge leader strategy
---
 server/schedule/operator.go | 111 +++++++++++++++++++++---------
 server/schedulers/balance_test.go | 67 ++++++++++++++----
 2 files changed, 132 insertions(+), 46 deletions(-)

diff --git a/server/schedule/operator.go b/server/schedule/operator.go
index d7128cac6539..14cdf55a9064 100644
--- a/server/schedule/operator.go
+++ b/server/schedule/operator.go
@@ -15,6 +15,7 @@ package schedule

 import (
 "bytes"
+ "errors"
 "fmt"
 "reflect"
 "sync/atomic"
@@ -485,68 +486,110 @@ func CreateMergeRegionOperator(desc string, cluster Cluster, source *core.Region
 // matchPeerSteps returns the steps to match the location of peer stores of source region with target's.
 func matchPeerSteps(cluster Cluster, source *core.RegionInfo, target *core.RegionInfo) ([]OperatorStep, OperatorKind, error) {
- storeIDs := make(map[uint64]struct{})
 var steps []OperatorStep
 var kind OperatorKind

 sourcePeers := source.GetPeers()
 targetPeers := target.GetPeers()

- for _, peer := range targetPeers {
- storeIDs[peer.GetStoreId()] = struct{}{}
+ // make sure the peer counts are the same
+ if len(sourcePeers) != len(targetPeers) {
+ return nil, kind, errors.New("mismatch count of peer")
 }

- // Add missing peers.
- for id := range storeIDs {
- if source.GetStorePeer(id) != nil {
- continue
- }
- peer, err := cluster.AllocPeer(id)
- if err != nil {
- log.Debugf("peer alloc failed: %v", err)
- return nil, kind, err
- }
- if cluster.IsRaftLearnerEnabled() {
- steps = append(steps,
- AddLearner{ToStore: id, PeerID: peer.Id},
- PromoteLearner{ToStore: id, PeerID: peer.Id},
- )
- } else {
- steps = append(steps, AddPeer{ToStore: id, PeerID: peer.Id})
+ // There is a case where a follower is added and the leadership is transferred to it,
+ // but its apply process is slow, so the leader regards it as a voter
+ // while it is actually still a learner. When that happens, the follower cannot become leader,
+ // but the old leader cannot know that, so there is no leader to serve for a while.
+ // So the target leader should be the first added follower when there are no overlapping stores.
+ var targetLeader uint64
+ var toAdds [][]OperatorStep
+
+ // get the overlapping stores of the two regions' peers
+ intersection := getIntersectionStores(sourcePeers, targetPeers)
+ for _, peer := range targetPeers {
+ storeID := peer.GetStoreId()
+ // find missing peers.
+ if _, found := intersection[storeID]; !found {
+ var addSteps []OperatorStep
+
+ peer, err := cluster.AllocPeer(storeID)
+ if err != nil {
+ log.Debugf("peer alloc failed: %v", err)
+ return nil, kind, err
+ }
+ if cluster.IsRaftLearnerEnabled() {
+ addSteps = append(addSteps,
+ AddLearner{ToStore: storeID, PeerID: peer.Id},
+ PromoteLearner{ToStore: storeID, PeerID: peer.Id},
+ )
+ } else {
+ addSteps = append(addSteps, AddPeer{ToStore: storeID, PeerID: peer.Id})
+ }
+ toAdds = append(toAdds, addSteps)
+
+ // record the first added peer
+ if targetLeader == 0 {
+ targetLeader = storeID
+ }
+ kind |= OpRegion
 }
- kind |= OpRegion
 }

- // Check whether to transfer leader or not
- intersection := getIntersectionStores(sourcePeers, targetPeers)
 leaderID := source.GetLeader().GetStoreId()
- isFound := false
- for _, storeID := range intersection {
+ for storeID := range intersection {
+ // if the leader belongs to the overlapping part, there is no need to transfer it
 if storeID == leaderID {
- isFound = true
+ targetLeader = 0
 break
 }
+ targetLeader = storeID
 }
- if !isFound {
- steps = append(steps, TransferLeader{FromStore: source.GetLeader().GetStoreId(), ToStore: target.GetLeader().GetStoreId()})
+
+ // if the intersection is not empty and the leader does not belong to it, transfer the leadership to a store in the overlapping part
+ if len(intersection) != 0 && targetLeader != 0 {
+ steps = append(steps, TransferLeader{FromStore: source.GetLeader().GetStoreId(), ToStore: targetLeader})
 kind |= OpLeader
+ targetLeader = 0
 }

- // Remove redundant peers.
+ index := 0
+ // remove redundant peers.
 for _, peer := range sourcePeers {
- if _, ok := storeIDs[peer.GetStoreId()]; ok {
+ if _, found := intersection[peer.GetStoreId()]; found {
+ continue
+ }
+
+ // the leader's peer should be removed last
+ if targetLeader != 0 && peer.GetStoreId() == leaderID {
 continue
 }
+
+ steps = append(steps, toAdds[index]...)
 steps = append(steps, RemovePeer{FromStore: peer.GetStoreId()})
 kind |= OpRegion
+ index++
+ }
+
+ // transfer the leadership before removing the old leader
+ if targetLeader != 0 {
+ steps = append(steps, toAdds[index]...)
+ steps = append(steps, TransferLeader{FromStore: leaderID, ToStore: targetLeader})
+ steps = append(steps, RemovePeer{FromStore: leaderID})
+ kind |= OpLeader | OpRegion
+ index++
+ }
+
+ if index != len(toAdds) {
+ return nil, kind, errors.New("wrong count of add steps")
 }

 return steps, kind, nil
 }

 // getIntersectionStores returns the stores included in two region's peers.
-func getIntersectionStores(a []*metapb.Peer, b []*metapb.Peer) []uint64 { - set := make([]uint64, 0) +func getIntersectionStores(a []*metapb.Peer, b []*metapb.Peer) map[uint64]struct{} { + intersection := make(map[uint64]struct{}) hash := make(map[uint64]struct{}) for _, peer := range a { @@ -555,9 +598,9 @@ func getIntersectionStores(a []*metapb.Peer, b []*metapb.Peer) []uint64 { for _, peer := range b { if _, found := hash[peer.GetStoreId()]; found { - set = append(set, peer.GetStoreId()) + intersection[peer.GetStoreId()] = struct{}{} } } - return set + return intersection } diff --git a/server/schedulers/balance_test.go b/server/schedulers/balance_test.go index eb94f7db6097..97c65620c000 100644 --- a/server/schedulers/balance_test.go +++ b/server/schedulers/balance_test.go @@ -879,7 +879,7 @@ func (s *testMergeCheckerSuite) SetUpTest(c *C) { StartKey: []byte("t"), EndKey: []byte("x"), Peers: []*metapb.Peer{ - {Id: 106, StoreId: 1}, + {Id: 106, StoreId: 2}, {Id: 107, StoreId: 5}, {Id: 108, StoreId: 6}, }, @@ -945,11 +945,14 @@ func (s *testMergeCheckerSuite) checkSteps(c *C, op *schedule.Operator, steps [] func (s *testMergeCheckerSuite) TestMatchPeers(c *C) { // partial store overlap not including leader - op1, op2 := s.mc.Check(s.regions[2]) - s.checkSteps(c, op1, []schedule.OperatorStep{ - schedule.AddLearner{ToStore: 4, PeerID: 1}, - schedule.PromoteLearner{ToStore: 4, PeerID: 1}, - schedule.TransferLeader{FromStore: 6, ToStore: 4}, + ops := s.mc.Check(s.regions[2]) + s.checkSteps(c, ops[0], []schedule.OperatorStep{ + schedule.TransferLeader{FromStore: 6, ToStore: 5}, + schedule.AddLearner{ToStore: 1, PeerID: 1}, + schedule.PromoteLearner{ToStore: 1, PeerID: 1}, + schedule.RemovePeer{FromStore: 2}, + schedule.AddLearner{ToStore: 4, PeerID: 2}, + schedule.PromoteLearner{ToStore: 4, PeerID: 2}, schedule.RemovePeer{FromStore: 6}, schedule.MergeRegion{ FromRegion: s.regions[2].GetMeta(), @@ -966,13 +969,20 @@ func (s *testMergeCheckerSuite) TestMatchPeers(c *C) { }) // partial store overlap including leader - newRegion := s.regions[2].Clone(core.WithLeader(&metapb.Peer{Id: 106, StoreId: 1})) + newRegion := s.regions[2].Clone( + core.SetPeers([]*metapb.Peer{ + {Id: 106, StoreId: 1}, + {Id: 107, StoreId: 5}, + {Id: 108, StoreId: 6}, + }), + core.WithLeader(&metapb.Peer{Id: 106, StoreId: 1}), + ) s.regions[2] = newRegion s.cluster.PutRegion(s.regions[2]) - op1, op2 = s.mc.Check(s.regions[2]) - s.checkSteps(c, op1, []schedule.OperatorStep{ - schedule.AddLearner{ToStore: 4, PeerID: 2}, - schedule.PromoteLearner{ToStore: 4, PeerID: 2}, + ops = s.mc.Check(s.regions[2]) + s.checkSteps(c, ops[0], []schedule.OperatorStep{ + schedule.AddLearner{ToStore: 4, PeerID: 3}, + schedule.PromoteLearner{ToStore: 4, PeerID: 3}, schedule.RemovePeer{FromStore: 6}, schedule.MergeRegion{ FromRegion: s.regions[2].GetMeta(), @@ -988,7 +998,7 @@ func (s *testMergeCheckerSuite) TestMatchPeers(c *C) { }, }) - // all store overlap + // all stores overlap s.regions[2] = s.regions[2].Clone(core.SetPeers([]*metapb.Peer{ {Id: 106, StoreId: 1}, {Id: 107, StoreId: 5}, @@ -1010,6 +1020,39 @@ func (s *testMergeCheckerSuite) TestMatchPeers(c *C) { IsPassive: true, }, }) + + // all stores not overlap + s.regions[2] = s.regions[2].Clone(core.SetPeers([]*metapb.Peer{ + {Id: 109, StoreId: 2}, + {Id: 110, StoreId: 3}, + {Id: 111, StoreId: 6}, + }), core.WithLeader(&metapb.Peer{Id: 109, StoreId: 2})) + s.cluster.PutRegion(s.regions[2]) + ops = s.mc.Check(s.regions[2]) + s.checkSteps(c, ops[0], []schedule.OperatorStep{ + 
+ schedule.AddLearner{ToStore: 1, PeerID: 4},
+ schedule.PromoteLearner{ToStore: 1, PeerID: 4},
+ schedule.RemovePeer{FromStore: 3},
+ schedule.AddLearner{ToStore: 4, PeerID: 5},
+ schedule.PromoteLearner{ToStore: 4, PeerID: 5},
+ schedule.RemovePeer{FromStore: 6},
+ schedule.AddLearner{ToStore: 5, PeerID: 6},
+ schedule.PromoteLearner{ToStore: 5, PeerID: 6},
+ schedule.TransferLeader{FromStore: 2, ToStore: 1},
+ schedule.RemovePeer{FromStore: 2},
+ schedule.MergeRegion{
+ FromRegion: s.regions[2].GetMeta(),
+ ToRegion: s.regions[1].GetMeta(),
+ IsPassive: false,
+ },
+ })
+ s.checkSteps(c, ops[1], []schedule.OperatorStep{
+ schedule.MergeRegion{
+ FromRegion: s.regions[2].GetMeta(),
+ ToRegion: s.regions[1].GetMeta(),
+ IsPassive: true,
+ },
+ })
 }

 var _ = Suite(&testBalanceHotWriteRegionSchedulerSuite{})
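
The Go program below is not part of the patch; it is a minimal, self-contained sketch of the matching strategy that the new matchPeerSteps follows, for readers who want the idea without the diff context. It works under simplified assumptions: peers are reduced to store IDs, AddLearner/PromoteLearner pairs are collapsed into a single add step, and Step, matchPeers, addPeer, removePeer and transferTo are illustrative stand-ins rather than pd's real types or API.

package main

import (
	"errors"
	"fmt"
)

// Step is a simplified stand-in for a scheduling operator step.
type Step string

func addPeer(store uint64) Step    { return Step(fmt.Sprintf("add peer on store %d", store)) }
func removePeer(store uint64) Step { return Step(fmt.Sprintf("remove peer on store %d", store)) }
func transferTo(store uint64) Step { return Step(fmt.Sprintf("transfer leader to store %d", store)) }

// matchPeers lines the source region's peers up with the target region's.
// source and target hold the store IDs of each region's peers; leader is the
// store of the source region's current leader (one of the source stores).
func matchPeers(source, target []uint64, leader uint64) ([]Step, error) {
	if len(source) != len(target) {
		return nil, errors.New("mismatched peer counts")
	}

	// Stores holding a peer of both regions need no movement at all.
	intersection := make(map[uint64]struct{})
	inSource := make(map[uint64]struct{})
	for _, s := range source {
		inSource[s] = struct{}{}
	}
	var toAdd []uint64 // target stores that still need a peer, in target order
	for _, t := range target {
		if _, ok := inSource[t]; ok {
			intersection[t] = struct{}{}
		} else {
			toAdd = append(toAdd, t)
		}
	}

	var steps []Step

	// Decide where the leadership should end up if the current leader's store
	// has to be emptied: prefer an overlapping store, otherwise fall back to
	// the first store that will be added (it has the most time to catch up).
	var targetLeader uint64
	if _, keep := intersection[leader]; !keep {
		for s := range intersection {
			targetLeader = s
			break
		}
		if targetLeader == 0 && len(toAdd) > 0 {
			targetLeader = toAdd[0]
		}
	}

	// If the new leader already holds a peer, move the leadership up front.
	if _, ok := intersection[targetLeader]; ok {
		steps = append(steps, transferTo(targetLeader))
		targetLeader = 0
	}

	// Pair each add with a remove so the replica count stays stable, keeping
	// the old leader's peer for last while the leadership still has to move.
	i := 0
	for _, s := range source {
		if _, keep := intersection[s]; keep {
			continue
		}
		if targetLeader != 0 && s == leader {
			continue // handled below, after the leadership is transferred
		}
		steps = append(steps, addPeer(toAdd[i]), removePeer(s))
		i++
	}
	if targetLeader != 0 {
		steps = append(steps, addPeer(toAdd[i]), transferTo(targetLeader), removePeer(leader))
	}
	return steps, nil
}

func main() {
	// Mirrors the "all stores not overlap" test case above: source peers on
	// stores 2 (leader), 3 and 6, target peers on stores 1, 4 and 5.
	steps, err := matchPeers([]uint64{2, 3, 6}, []uint64{1, 4, 5}, 2)
	if err != nil {
		panic(err)
	}
	for _, s := range steps {
		fmt.Println(s)
	}
}

Interleaving each add with a remove keeps the replica count constant throughout the match, and deferring the old leader's removal until after the leadership transfer avoids the window described in the patch's comment where no peer is able to serve as leader.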