Commit

resolve conflict
Signed-off-by: bufferflies <1045931706@qq.com>
bufferflies committed Sep 4, 2023
1 parent 62057c0 commit 86b3c58
Showing 2 changed files with 30 additions and 70 deletions.
24 changes: 1 addition & 23 deletions pkg/schedule/region_scatterer.go
@@ -327,7 +327,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
log.Error("failed to get the store", zap.Uint64("store-id", peer.GetStoreId()), errs.ZapError(errs.ErrGetSourceStore))
continue
}
filters[filterLen-1] = filter.NewPlacementSafeguard(r.name, r.cluster.GetSharedConfig(), r.cluster.GetBasicCluster(), r.cluster.GetRuleManager(), region, sourceStore, oldFit)
filters[filterLen-1] = filter.NewPlacementSafeguard(r.name, r.cluster.GetOpts(), r.cluster.GetBasicCluster(), r.cluster.GetRuleManager(), region, sourceStore, oldFit)
for {
newPeer := r.selectNewPeer(context, group, peer, filters)
targetPeers[newPeer.GetStoreId()] = newPeer
@@ -421,30 +421,13 @@ func isSameDistribution(region *core.RegionInfo, targetPeers map[uint64]*metapb.
return region.GetLeader().GetStoreId() == targetLeader
}

<<<<<<< HEAD:pkg/schedule/region_scatterer.go
func (r *RegionScatterer) selectCandidates(region *core.RegionInfo, oldFit *placement.RegionFit, sourceStoreID uint64, selectedStores map[uint64]struct{}, context engineContext) []uint64 {
sourceStore := r.cluster.GetStore(sourceStoreID)
if sourceStore == nil {
log.Error("failed to get the store", zap.Uint64("store-id", sourceStoreID), errs.ZapError(errs.ErrGetSourceStore))
return nil
}
filters := []filter.Filter{
filter.NewExcludedFilter(r.name, nil, selectedStores),
}
scoreGuard := filter.NewPlacementSafeguard(r.name, r.cluster.GetOpts(), r.cluster.GetBasicCluster(), r.cluster.GetRuleManager(), region, sourceStore, oldFit)
for _, filterFunc := range context.filterFuncs {
filters = append(filters, filterFunc())
}
filters = append(filters, scoreGuard)
=======
// selectNewPeer returns the new peer whose store has the fewest picked count.
// It keeps the origin peer if the origin store's picked count equals the fewest picked count.
// It can be divided into three steps:
// 1. find the max picked count and the min picked count.
// 2. if the max picked count equals the min picked count, every store has been picked the same number of times, so return the origin peer.
// 3. otherwise, select a store whose picked count is the min picked count and which passes all filters.
func (r *RegionScatterer) selectNewPeer(context engineContext, group string, peer *metapb.Peer, filters []filter.Filter) *metapb.Peer {
>>>>>>> 72a13c023 (Scatter: make peer scatter logic same with the leader (#6965)):pkg/schedule/scatter/region_scatterer.go
stores := r.cluster.GetStores()
maxStoreTotalCount := uint64(0)
minStoreTotalCount := uint64(math.MaxUint64)
@@ -470,19 +453,14 @@ func (r *RegionScatterer) selectNewPeer(context engineContext, group string, pee
// If the storeCount is the same for every store in the cluster (maxStoreTotalCount == minStoreTotalCount), any store
// could be selected as a candidate.
if storeCount < maxStoreTotalCount || maxStoreTotalCount == minStoreTotalCount {
<<<<<<< HEAD:pkg/schedule/region_scatterer.go
if filter.Target(r.cluster.GetOpts(), store, filters) {
candidates = append(candidates, store.GetID())
=======
if filter.Target(r.cluster.GetSharedConfig(), store, filters) {
if storeCount < minCount {
minCount = storeCount
newPeer = &metapb.Peer{
StoreId: store.GetID(),
Role: peer.GetRole(),
}
}
>>>>>>> 72a13c023 (Scatter: make peer scatter logic same with the leader (#6965)):pkg/schedule/scatter/region_scatterer.go
}
}
}
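For readers following the resolved selectNewPeer logic above, here is a minimal, self-contained Go sketch of the selection rule its comment describes: find the max and min picked counts, keep the origin store when every count is equal, and otherwise take the least-picked store that the filters accept. The pickStore helper, its parameters, and the accept callback are illustrative assumptions for this sketch, not PD's actual types or API.

```go
package main

import (
	"fmt"
	"math"
)

// pickStore mirrors the three steps from the selectNewPeer comment:
//  1. find the max and min picked counts across all stores;
//  2. if max == min, every store has been picked equally often, so keep the origin store;
//  3. otherwise, among stores the filter accepts, take the one with the fewest picks.
func pickStore(counts map[uint64]uint64, originStore uint64, accept func(id uint64) bool) uint64 {
	maxCount, minCount := uint64(0), uint64(math.MaxUint64)
	for _, c := range counts {
		if c > maxCount {
			maxCount = c
		}
		if c < minCount {
			minCount = c
		}
	}
	// Step 2: a fully even distribution means the origin store is as good as any other.
	if maxCount == minCount {
		return originStore
	}
	// Step 3: pick the least-picked store that passes the filter; fall back to the origin store.
	best, bestCount := originStore, uint64(math.MaxUint64)
	for id, c := range counts {
		if c < maxCount && accept(id) && c < bestCount {
			best, bestCount = id, c
		}
	}
	return best
}

func main() {
	counts := map[uint64]uint64{1: 3, 2: 1, 3: 2}
	chosen := pickStore(counts, 1, func(id uint64) bool { return true })
	fmt.Println("chosen store:", chosen) // store 2 has the fewest picks
}
```

In the real scatterer the counts come from selectedStores and the acceptance check is filter.Target with PD's placement filters; this sketch only mirrors the tie-breaking rule, not the surrounding bookkeeping.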
76 changes: 29 additions & 47 deletions pkg/schedule/region_scatterer_test.go
@@ -531,47 +531,6 @@ func TestSelectedStoreGC(t *testing.T) {
re.False(ok)
}

<<<<<<< HEAD:pkg/schedule/region_scatterer_test.go
// TestRegionFromDifferentGroups tests scattering multiple regions, each with its own group.
// After scatter, the distribution across the whole cluster should be even.
func TestRegionFromDifferentGroups(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
opt := mockconfig.NewTestOptions()
tc := mockcluster.NewCluster(ctx, opt)
stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false)
oc := NewOperatorController(ctx, tc, stream)
// Add 6 stores.
storeCount := 6
for i := uint64(1); i <= uint64(storeCount); i++ {
tc.AddRegionStore(i, 0)
}
scatterer := NewRegionScatterer(ctx, tc, oc)
regionCount := 50
for i := 1; i <= regionCount; i++ {
p := rand.Perm(storeCount)
scatterer.scatterRegion(tc.AddLeaderRegion(uint64(i), uint64(p[0])+1, uint64(p[1])+1, uint64(p[2])+1), fmt.Sprintf("t%d", i))
}
check := func(ss *selectedStores) {
max := uint64(0)
min := uint64(math.MaxUint64)
for i := uint64(1); i <= uint64(storeCount); i++ {
count := ss.TotalCountByStore(i)
if count > max {
max = count
}
if count < min {
min = count
}
}
re.LessOrEqual(max-min, uint64(2))
}
check(scatterer.ordinaryEngine.selectedPeer)
}

=======
>>>>>>> 72a13c023 (Scatter: make peer scatter logic same with the leader (#6965)):pkg/schedule/scatter/region_scatterer_test.go
func TestRegionHasLearner(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
@@ -619,11 +578,7 @@ func TestRegionHasLearner(t *testing.T) {
scatterer := NewRegionScatterer(ctx, tc, oc)
regionCount := 50
for i := 1; i <= regionCount; i++ {
<<<<<<< HEAD:pkg/schedule/region_scatterer_test.go
_, err := scatterer.Scatter(tc.AddRegionWithLearner(uint64(i), uint64(1), []uint64{uint64(2), uint64(3)}, []uint64{7}), "group")
=======
_, err := scatterer.Scatter(tc.AddRegionWithLearner(uint64(i), uint64(1), []uint64{uint64(2), uint64(3)}, []uint64{7}), group, false)
>>>>>>> 72a13c023 (Scatter: make peer scatter logic same with the leader (#6965)):pkg/schedule/scatter/region_scatterer_test.go
_, err := scatterer.Scatter(tc.AddRegionWithLearner(uint64(i), uint64(1), []uint64{uint64(2), uint64(3)}, []uint64{7}), group)
re.NoError(err)
}
check := func(ss *selectedStores) {
@@ -740,6 +695,34 @@ func TestSelectedStoresTooManyPeers(t *testing.T) {
}
}

// TestBalanceLeader only tests whether region leaders are balanced after scatter.
func TestBalanceLeader(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
opt := mockconfig.NewTestOptions()
tc := mockcluster.NewCluster(ctx, opt)
stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false)
oc := NewOperatorController(ctx, tc, stream)
// Add 3 stores
for i := uint64(2); i <= 4; i++ {
tc.AddLabelsStore(i, 0, nil)
// prevent store from being disconnected
tc.SetStoreLastHeartbeatInterval(i, -10*time.Minute)
}
group := "group"
scatterer := NewRegionScatterer(ctx, tc, oc)
for i := uint64(1001); i <= 1300; i++ {
region := tc.AddLeaderRegion(i, 2, 3, 4)
op := scatterer.scatterRegion(region, group)
re.False(isPeerCountChanged(op))
}
// all leaders will be balanced across the three stores.
for i := uint64(2); i <= 4; i++ {
re.Equal(uint64(100), scatterer.ordinaryEngine.selectedLeader.Get(i, group))
}
}

// TestBalanceRegion tests whether region peers and leaders are balanced after scatter.
// ref https://github.com/tikv/pd/issues/6017
func TestBalanceRegion(t *testing.T) {
@@ -766,7 +749,6 @@ func TestBalanceRegion(t *testing.T) {
}
for i := uint64(2); i <= 7; i++ {
re.Equal(uint64(150), scatterer.ordinaryEngine.selectedPeer.Get(i, group))
re.Equal(uint64(50), scatterer.ordinaryEngine.selectedLeader.Get(i, group))
}
// Test for unhealthy region
// ref https://github.com/tikv/pd/issues/6099
