region_scatter: fix a bug that the leaders may be unbalanced after scatter region #6054

Merged: 4 commits, Feb 28, 2023
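The diff below makes the scatterer track, while peers are being placed, which stores are actually eligible to take the region leader, and then chooses the leader only from that set. As a rough orientation, here is a minimal, standalone sketch (hypothetical types and names, not PD's actual API) of the eligibility rule the new allowLeader helper encodes: learner and demoting-voter peers, witness peers, and peers whose matched placement rule is a witness or is not a Voter/Leader rule can never hold the leader.

package main

import "fmt"

// Hypothetical, simplified stand-ins for the metapb peer roles and placement
// rule roles; the real PD types live in the metapb and placement packages.
type peerRole int

const (
	roleVoter peerRole = iota
	roleLearner
	roleDemotingVoter
)

type ruleRole string

const (
	ruleLeader   ruleRole = "leader"
	ruleVoter    ruleRole = "voter"
	ruleFollower ruleRole = "follower"
)

type peer struct {
	storeID   uint64
	role      peerRole
	isWitness bool
}

type rule struct {
	role      ruleRole
	isWitness bool
}

// allowLeader mirrors the shape of the new helper in the diff: a peer may take
// the leader only if neither the peer nor its matched rule is a witness, the
// peer is not a learner or demoting voter, and the rule role is Voter or Leader.
func allowLeader(p peer, matched rule) bool {
	switch p.role {
	case roleLearner, roleDemotingVoter:
		return false
	}
	if p.isWitness || matched.isWitness {
		return false
	}
	switch matched.role {
	case ruleVoter, ruleLeader:
		return true
	}
	return false
}

func main() {
	fmt.Println(allowLeader(peer{storeID: 2, role: roleVoter}, rule{role: ruleVoter}))    // true
	fmt.Println(allowLeader(peer{storeID: 4, role: roleLearner}, rule{role: ruleVoter}))  // false: learners cannot lead
	fmt.Println(allowLeader(peer{storeID: 6, role: roleVoter}, rule{role: ruleFollower})) // false: a Follower rule excludes leadership
}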
56 changes: 38 additions & 18 deletions server/schedule/region_scatterer.go
@@ -304,6 +304,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
engineFilter := filter.NewEngineFilter(r.name, filter.NotSpecialEngines)
ordinaryPeers := make(map[uint64]*metapb.Peer, len(region.GetPeers()))
specialPeers := make(map[string]map[uint64]*metapb.Peer)
oldFit := r.cluster.GetRuleManager().FitRegion(r.cluster, region)
// Group peers by the engine of their stores
for _, peer := range region.GetPeers() {
store := r.cluster.GetStore(peer.GetStoreId())
@@ -322,15 +323,19 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
}

targetPeers := make(map[uint64]*metapb.Peer, len(region.GetPeers())) // StoreID -> Peer
selectedStores := make(map[uint64]struct{}, len(region.GetPeers())) // StoreID set
selectedStores := make(map[uint64]struct{}, len(region.GetPeers())) // selected StoreID set
leaderCandidateStores := make([]uint64, 0, len(region.GetPeers())) // StoreID allowed to become Leader
scatterWithSameEngine := func(peers map[uint64]*metapb.Peer, context engineContext) { // peers: StoreID -> Peer
for _, peer := range peers {
if _, ok := selectedStores[peer.GetStoreId()]; ok {
if allowLeader(oldFit, peer) {
leaderCandidateStores = append(leaderCandidateStores, peer.GetStoreId())
}
// It is both sourcePeer and targetPeer itself, no need to select.
continue
}
for {
candidates := r.selectCandidates(region, peer.GetStoreId(), selectedStores, context)
candidates := r.selectCandidates(region, oldFit, peer.GetStoreId(), selectedStores, context)
newPeer := r.selectStore(group, peer, peer.GetStoreId(), candidates, context)
targetPeers[newPeer.GetStoreId()] = newPeer
selectedStores[newPeer.GetStoreId()] = struct{}{}
@@ -339,6 +344,9 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
// This origin peer re-selects.
if _, ok := peers[newPeer.GetStoreId()]; !ok || peer.GetStoreId() == newPeer.GetStoreId() {
selectedStores[peer.GetStoreId()] = struct{}{}
if allowLeader(oldFit, peer) {
leaderCandidateStores = append(leaderCandidateStores, newPeer.GetStoreId())
}
break
}
}
@@ -349,7 +357,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
// FIXME: the target leader only considers ordinary stores; we may need to consider
// special engine stores if the engine supports becoming a leader. For now there is only
// one such engine, tiflash, which does not support being a leader, so it is not considered.
targetLeader := r.selectAvailableLeaderStore(group, region, targetPeers, r.ordinaryEngine)
targetLeader := r.selectAvailableLeaderStore(group, region, leaderCandidateStores, r.ordinaryEngine)
if targetLeader == 0 {
scatterSkipNoLeaderCounter.Inc()
return nil
@@ -389,6 +397,26 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
return op
}

func allowLeader(fit *placement.RegionFit, peer *metapb.Peer) bool {
switch peer.GetRole() {
case metapb.PeerRole_Learner, metapb.PeerRole_DemotingVoter:
return false
}
if peer.IsWitness {
return false
}

rule := fit.GetRuleFit(peer.GetId()).Rule
if rule.IsWitness {
return false
}
switch rule.Role {
case placement.Voter, placement.Leader:
return true
}
return false
}

func isSameDistribution(region *core.RegionInfo, targetPeers map[uint64]*metapb.Peer, targetLeader uint64) bool {
peers := region.GetPeers()
for _, peer := range peers {
@@ -399,7 +427,7 @@ func isSameDistribution(region *core.RegionInfo, targetPeers map[uint64]*metapb.
return region.GetLeader().GetStoreId() == targetLeader
}

func (r *RegionScatterer) selectCandidates(region *core.RegionInfo, sourceStoreID uint64, selectedStores map[uint64]struct{}, context engineContext) []uint64 {
func (r *RegionScatterer) selectCandidates(region *core.RegionInfo, oldFit *placement.RegionFit, sourceStoreID uint64, selectedStores map[uint64]struct{}, context engineContext) []uint64 {
sourceStore := r.cluster.GetStore(sourceStoreID)
if sourceStore == nil {
log.Error("failed to get the store", zap.Uint64("store-id", sourceStoreID), errs.ZapError(errs.ErrGetSourceStore))
@@ -408,7 +436,7 @@ func (r *RegionScatterer) selectCandidates(region *core.RegionInfo, sourceStoreI
filters := []filter.Filter{
filter.NewExcludedFilter(r.name, nil, selectedStores),
}
scoreGuard := filter.NewPlacementSafeguard(r.name, r.cluster.GetOpts(), r.cluster.GetBasicCluster(), r.cluster.GetRuleManager(), region, sourceStore, nil)
scoreGuard := filter.NewPlacementSafeguard(r.name, r.cluster.GetOpts(), r.cluster.GetBasicCluster(), r.cluster.GetRuleManager(), region, sourceStore, oldFit)
for _, filterFunc := range context.filterFuncs {
filters = append(filters, filterFunc())
}
@@ -470,27 +498,19 @@ func (r *RegionScatterer) selectStore(group string, peer *metapb.Peer, sourceSto

// selectAvailableLeaderStore selects the target leader store from the candidates. The candidates are collected from
// the existing peers' stores based on the leader counts at the group level. Please use this func before scattering special engines.
func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core.RegionInfo, peers map[uint64]*metapb.Peer, context engineContext) uint64 {
func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core.RegionInfo, leaderCandidateStores []uint64, context engineContext) uint64 {
sourceStore := r.cluster.GetStore(region.GetLeader().GetStoreId())
if sourceStore == nil {
log.Error("failed to get the store", zap.Uint64("store-id", region.GetLeader().GetStoreId()), errs.ZapError(errs.ErrGetSourceStore))
return 0
}
leaderCandidateStores := make([]uint64, 0)
// use PlacementLeaderSafeguard for filtering follower and learner in rule
filter := filter.NewPlacementLeaderSafeguard(r.name, r.cluster.GetOpts(), r.cluster.GetBasicCluster(), r.cluster.GetRuleManager(), region, sourceStore, true /*allowMoveLeader*/)
for storeID := range peers {
store := r.cluster.GetStore(storeID)
if store == nil {
return 0
}
if filter == nil || filter.Target(r.cluster.GetOpts(), store).IsOK() {
leaderCandidateStores = append(leaderCandidateStores, storeID)
}
}
minStoreGroupLeader := uint64(math.MaxUint64)
id := uint64(0)
for _, storeID := range leaderCandidateStores {
store := r.cluster.GetStore(storeID)
if store == nil {
continue
}
storeGroupLeaderCount := context.selectedLeader.Get(storeID, group)
if minStoreGroupLeader > storeGroupLeaderCount {
minStoreGroupLeader = storeGroupLeaderCount
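For context, a tiny self-contained sketch (hypothetical names, not PD's API) of the selection loop that selectAvailableLeaderStore keeps after this change: among the pre-filtered leader-candidate stores, the one currently holding the fewest leaders for the group wins.

package main

import (
	"fmt"
	"math"
)

// pickLeaderStore returns the candidate store holding the fewest leaders in
// the group; it returns 0 when there are no candidates, matching the
// "no target leader found" convention in the diff above.
func pickLeaderStore(candidates []uint64, groupLeaderCount map[uint64]uint64) uint64 {
	minLeaders := uint64(math.MaxUint64)
	picked := uint64(0)
	for _, storeID := range candidates {
		if c := groupLeaderCount[storeID]; c < minLeaders {
			minLeaders = c
			picked = storeID
		}
	}
	return picked
}

func main() {
	counts := map[uint64]uint64{2: 3, 4: 1, 6: 2}
	fmt.Println(pickLeaderStore([]uint64{2, 4, 6}, counts)) // 4: fewest leaders in the group
	fmt.Println(pickLeaderStore(nil, counts))               // 0: no candidate available
}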
31 changes: 31 additions & 0 deletions server/schedule/region_scatterer_test.go
@@ -19,6 +19,7 @@ import (
"fmt"
"math"
"math/rand"
"strconv"
"sync"
"testing"
"time"
@@ -729,6 +730,36 @@ func TestSelectedStoresTooManyPeers(t *testing.T) {
}
}

// TestBalanceRegion tests whether region peers and leaders are balanced after scatter.
// ref https://github.com/tikv/pd/issues/6017
func TestBalanceRegion(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
opt := config.NewTestOptions()
opt.SetLocationLabels([]string{"host"})
tc := mockcluster.NewCluster(ctx, opt)
stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false)
oc := NewOperatorController(ctx, tc, stream)
// Add 6 stores in 3 hosts.
for i := uint64(2); i <= 7; i++ {
tc.AddLabelsStore(i, 0, map[string]string{"host": strconv.FormatUint(i/2, 10)})
// prevent store from being disconnected
tc.SetStoreLastHeartbeatInterval(i, -10*time.Minute)
}
group := "group"
scatterer := NewRegionScatterer(ctx, tc, oc)
for i := uint64(1001); i <= 1300; i++ {
region := tc.AddLeaderRegion(i, 2, 4, 6)
op := scatterer.scatterRegion(region, group)
re.False(isPeerCountChanged(op))
}
for i := uint64(2); i <= 7; i++ {
re.Equal(uint64(150), scatterer.ordinaryEngine.selectedPeer.Get(i, group))
re.Equal(uint64(50), scatterer.ordinaryEngine.selectedLeader.Get(i, group))
}
}

func isPeerCountChanged(op *operator.Operator) bool {
if op == nil {
return false
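The expected counts in TestBalanceRegion follow from simple arithmetic, assuming the scatterer achieves the perfectly even spread the fix is asserting: 300 regions with 3 peers each spread over 6 stores gives 150 peers per store, and 300 leaders over 6 stores gives 50 leaders per store. A throwaway snippet of that arithmetic:

package main

import "fmt"

func main() {
	regions := 300      // region IDs 1001..1300 in the test
	peersPerRegion := 3 // each region starts on stores 2, 4, 6
	stores := 6         // store IDs 2..7

	fmt.Println(regions * peersPerRegion / stores) // 150 peers expected per store
	fmt.Println(regions / stores)                  // 50 leaders expected per store
}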