Commit
cherry pick #3422 to release-5.0
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
Yisaer authored and ti-srebot committed Apr 21, 2021
1 parent ea1449c commit 1ff15bc
Showing 6 changed files with 123 additions and 32 deletions.
13 changes: 13 additions & 0 deletions pkg/cache/ttl.go
@@ -256,3 +256,16 @@ func (c *TTLString) Pop() (string, interface{}, bool) {
func (c *TTLString) Get(id string) (interface{}, bool) {
return c.ttlCache.get(id)
}

// GetAllID returns all key IDs
func (c *TTLString) GetAllID() []string {
keys := c.ttlCache.getKeys()
var ids []string
for _, key := range keys {
id, ok := key.(string)
if ok {
ids = append(ids, id)
}
}
return ids
}
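
Illustration (not part of this commit): a minimal, self-contained sketch of the aggregation pattern GetAllID enables, walking every group ID and summing per-store counts, which is what region_scatterer.go does below. The groupCounter type and its fields are hypothetical stand-ins, not the pd cache API.

package main

import "fmt"

// groupCounter is a hypothetical stand-in for a cache keyed by group ID, where
// each value is a per-store counter (storeID -> count).
type groupCounter struct {
    groups map[string]map[uint64]uint64
}

// GetAllID mirrors the new cache method: it returns every group key currently stored.
func (g *groupCounter) GetAllID() []string {
    ids := make([]string, 0, len(g.groups))
    for id := range g.groups {
        ids = append(ids, id)
    }
    return ids
}

// totalCountByStore sums one store's count across all groups, the same shape of
// aggregation that the scatterer builds on top of GetAllID.
func (g *groupCounter) totalCountByStore(storeID uint64) uint64 {
    total := uint64(0)
    for _, id := range g.GetAllID() {
        total += g.groups[id][storeID]
    }
    return total
}

func main() {
    c := &groupCounter{groups: map[string]map[uint64]uint64{
        "group-0": {1: 3, 2: 1},
        "group-1": {1: 2, 3: 4},
    }}
    fmt.Println(c.totalCountByStore(1)) // 5
}
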
5 changes: 3 additions & 2 deletions server/api/region_test.go
@@ -294,9 +294,10 @@ func (s *testRegionSuite) TestScatterRegions(c *C) {
err := postJSON(testDialClient, fmt.Sprintf("%s/regions/scatter", s.urlPrefix), []byte(body))
c.Assert(err, IsNil)
op1 := s.svr.GetRaftCluster().GetOperatorController().GetOperator(601)
c.Assert(op1 != nil, Equals, true)
op2 := s.svr.GetRaftCluster().GetOperatorController().GetOperator(602)
c.Assert(op2 != nil, Equals, true)
op3 := s.svr.GetRaftCluster().GetOperatorController().GetOperator(603)
// At least one operator is used to scatter the region
c.Assert(op1 != nil || op2 != nil || op3 != nil, IsTrue)
}

func (s *testRegionSuite) TestSplitRegions(c *C) {
2 changes: 1 addition & 1 deletion server/schedule/operator/builder.go
@@ -304,7 +304,7 @@ func (b *Builder) EnableLightWeight() *Builder {
return b
}

// EnableForceTargetLeader marks the step of transferring leader to target is forcible. It is used for grant leader.
// EnableForceTargetLeader marks the step of transferring the leader to the target as forcible.
func (b *Builder) EnableForceTargetLeader() *Builder {
b.forceTargetLeader = true
return b
2 changes: 2 additions & 0 deletions server/schedule/operator/create_operator.go
@@ -195,6 +195,8 @@ func CreateScatterRegionOperator(desc string, cluster opt.Cluster, origin *core.
SetPeers(targetPeers).
SetLeader(leader).
EnableLightWeight().
// EnableForceTargetLeader is used here in order to ignore the leader schedule limit
EnableForceTargetLeader().
Build(0)
}

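Illustration (not part of this commit): a toy, hypothetical builder, not PD's actual operator.Builder, showing why the scatter path sets this flag. A force-target-leader switch bypasses a leader schedule limit check that would otherwise reject the operator. The field names and limit values are invented for the sketch.

package main

import (
    "errors"
    "fmt"
)

// miniBuilder is a toy version of an operator builder: it only models the
// force-target-leader switch and a leader schedule limit.
type miniBuilder struct {
    forceTargetLeader bool
    pendingLeaderOps  int
    leaderLimit       int
}

func (b *miniBuilder) EnableForceTargetLeader() *miniBuilder {
    b.forceTargetLeader = true
    return b
}

// Build refuses the leader transfer when the limit is reached, unless the
// force flag is set, mirroring the intent of the comment above.
func (b *miniBuilder) Build() (string, error) {
    if !b.forceTargetLeader && b.pendingLeaderOps >= b.leaderLimit {
        return "", errors.New("leader schedule limit reached")
    }
    return "transfer-leader operator", nil
}

func main() {
    b := &miniBuilder{pendingLeaderOps: 4, leaderLimit: 4}
    if _, err := b.Build(); err != nil {
        fmt.Println("without force:", err)
    }
    op, _ := b.EnableForceTargetLeader().Build()
    fmt.Println("with force:", op)
}
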
51 changes: 45 additions & 6 deletions server/schedule/region_scatterer.go
@@ -94,6 +94,23 @@ func (s *selectedStores) getDistributionByGroupLocked(group string) (map[uint64]
return nil, false
}

func (s *selectedStores) totalCountByStore(storeID uint64) uint64 {
groups := s.groupDistribution.GetAllID()
totalCount := uint64(0)
for _, group := range groups {
storeDistribution, ok := s.getDistributionByGroupLocked(group)
if !ok {
continue
}
count, ok := storeDistribution[storeID]
if !ok {
continue
}
totalCount += count
}
return totalCount
}

// RegionScatterer scatters regions.
type RegionScatterer struct {
ctx context.Context
@@ -328,9 +345,26 @@ func (r *RegionScatterer) selectCandidates(region *core.RegionInfo, sourceStoreI
filters = append(filters, scoreGuard)
stores := r.cluster.GetStores()
candidates := make([]uint64, 0)
maxStoreTotalCount := uint64(0)
minStoreTotalCount := uint64(math.MaxUint64)
for _, store := range r.cluster.GetStores() {
count := context.selectedPeer.totalCountByStore(store.GetID())
if count > maxStoreTotalCount {
maxStoreTotalCount = count
}
if count < minStoreTotalCount {
minStoreTotalCount = count
}
}
for _, store := range stores {
if filter.Target(r.cluster.GetOpts(), store, filters) {
candidates = append(candidates, store.GetID())
storeCount := context.selectedPeer.totalCountByStore(store.GetID())
// If storeCount is equal to maxStoreTotalCount, we should skip this store as a candidate.
// If the counts are all the same across the whole cluster (maxStoreTotalCount == minStoreTotalCount), any store
// could be selected as a candidate.
if storeCount < maxStoreTotalCount || maxStoreTotalCount == minStoreTotalCount {
if filter.Target(r.cluster.GetOpts(), store, filters) {
candidates = append(candidates, store.GetID())
}
}
}
return candidates
@@ -367,12 +401,17 @@ func (r *RegionScatterer) selectStore(group string, peer *metapb.Peer, sourceSto
// selectAvailableLeaderStores selects the target leader store from the candidates. The candidates are collected from
// the stores of the existing peers, based on the leader counts at the group level.
func (r *RegionScatterer) selectAvailableLeaderStores(group string, peers map[uint64]*metapb.Peer, context engineContext) uint64 {
minStoreGroupLeader := uint64(math.MaxUint64)
id := uint64(0)
leaderCandidateStores := make([]uint64, 0)
for storeID := range peers {
if id == 0 {
id = storeID
store := r.cluster.GetStore(storeID)
engine := store.GetLabelValue(filter.EngineKey)
if len(engine) < 1 {
leaderCandidateStores = append(leaderCandidateStores, storeID)
}
}
minStoreGroupLeader := uint64(math.MaxUint64)
id := uint64(0)
for _, storeID := range leaderCandidateStores {
storeGroupLeaderCount := context.selectedLeader.Get(storeID, group)
if minStoreGroupLeader > storeGroupLeaderCount {
minStoreGroupLeader = storeGroupLeaderCount
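
Illustration (not part of this commit): the new candidate rule from selectCandidates restated as a self-contained sketch. A store stays a candidate only if its total peer count across all groups is below the cluster-wide maximum, or if every store already has the same count. Taking the counts as a plain map is an assumption of the sketch; the real code reads them through selectedStores.totalCountByStore and still applies the placement filters.

package main

import (
    "fmt"
    "math"
    "sort"
)

// filterCandidates keeps store IDs whose total count is below the maximum,
// unless all stores share the same count (then every store qualifies).
func filterCandidates(totalCounts map[uint64]uint64) []uint64 {
    maxCount, minCount := uint64(0), uint64(math.MaxUint64)
    for _, c := range totalCounts {
        if c > maxCount {
            maxCount = c
        }
        if c < minCount {
            minCount = c
        }
    }
    candidates := make([]uint64, 0, len(totalCounts))
    for storeID, c := range totalCounts {
        if c < maxCount || maxCount == minCount {
            candidates = append(candidates, storeID)
        }
    }
    sort.Slice(candidates, func(i, j int) bool { return candidates[i] < candidates[j] })
    return candidates
}

func main() {
    // Store 3 already holds the most peers across all groups, so it is skipped.
    fmt.Println(filterCandidates(map[uint64]uint64{1: 4, 2: 5, 3: 7})) // [1 2]
    // All counts equal: every store remains a candidate.
    fmt.Println(filterCandidates(map[uint64]uint64{1: 2, 2: 2, 3: 2})) // [1 2 3]
}
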
82 changes: 59 additions & 23 deletions server/schedule/region_scatterer_test.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math"
"math/rand"
"time"

. "github.com/pingcap/check"
@@ -292,7 +293,7 @@ func (s *testScatterRegionSuite) TestScatterCheck(c *C) {
}
}

func (s *testScatterRegionSuite) TestScatterGroup(c *C) {
func (s *testScatterRegionSuite) TestScatterGroupInConcurrency(c *C) {
opt := config.NewTestOptions()
tc := mockcluster.NewCluster(opt)
// Add 5 stores.
@@ -318,43 +319,43 @@ func (s *testScatterRegionSuite) TestScatterGroup(c *C) {
},
}

// We send interleaved scatter requests for each group to simulate scattering multiple region groups concurrently.
for _, testcase := range testcases {
c.Logf(testcase.name)
ctx, cancel := context.WithCancel(context.Background())
scatterer := NewRegionScatterer(ctx, tc)
regionID := 1
for i := 0; i < 100; i++ {
for j := 0; j < testcase.groupCount; j++ {
_, err := scatterer.Scatter(tc.AddLeaderRegion(uint64(regionID), 1, 2, 3),
scatterer.scatterRegion(tc.AddLeaderRegion(uint64(regionID), 1, 2, 3),
fmt.Sprintf("group-%v", j))
c.Assert(err, IsNil)
regionID++
}
// insert region with no group
_, err := scatterer.Scatter(tc.AddLeaderRegion(uint64(regionID), 1, 2, 3), "")
c.Assert(err, IsNil)
regionID++
}

for i := 0; i < testcase.groupCount; i++ {
// comparing the leader distribution
group := fmt.Sprintf("group-%v", i)
max := uint64(0)
min := uint64(math.MaxUint64)
groupDistribution, _ := scatterer.ordinaryEngine.selectedLeader.groupDistribution.Get(group)
for _, count := range groupDistribution.(map[uint64]uint64) {
if count > max {
max = count
}
if count < min {
min = count
checker := func(ss *selectedStores, expected uint64, delta float64) {
for i := 0; i < testcase.groupCount; i++ {
// comparing the leader distribution
group := fmt.Sprintf("group-%v", i)
max := uint64(0)
min := uint64(math.MaxUint64)
groupDistribution, _ := ss.groupDistribution.Get(group)
for _, count := range groupDistribution.(map[uint64]uint64) {
if count > max {
max = count
}
if count < min {
min = count
}
}
c.Assert(math.Abs(float64(max)-float64(expected)), LessEqual, delta)
c.Assert(math.Abs(float64(min)-float64(expected)), LessEqual, delta)
}
// 100 regions divided among 5 stores, so each store is expected to have about 20 regions.
c.Assert(min, LessEqual, uint64(20))
c.Assert(max, GreaterEqual, uint64(20))
c.Assert(max-min, LessEqual, uint64(5))
}
// For leaders, we expect each store to have about 20 leaders for each group
checker(scatterer.ordinaryEngine.selectedLeader, 20, 5)
// For peers, we expect each store to have about 50 peers for each group
checker(scatterer.ordinaryEngine.selectedPeer, 50, 15)
cancel()
}
}
@@ -440,3 +441,38 @@ func (s *testScatterRegionSuite) TestSelectedStoreGC(c *C) {
_, ok = stores.GetGroupDistribution("testgroup")
c.Assert(ok, Equals, false)
}

// TestRegionFromDifferentGroups tests scattering multiple regions, each in its own group.
// After scattering, the distribution across the whole cluster should be even.
func (s *testScatterRegionSuite) TestRegionFromDifferentGroups(c *C) {
opt := config.NewTestOptions()
tc := mockcluster.NewCluster(opt)
// Add 6 stores.
storeCount := 6
for i := uint64(1); i <= uint64(storeCount); i++ {
tc.AddRegionStore(i, 0)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
scatterer := NewRegionScatterer(ctx, tc)
regionCount := 50
for i := 1; i <= regionCount; i++ {
p := rand.Perm(storeCount)
scatterer.scatterRegion(tc.AddLeaderRegion(uint64(i), uint64(p[0])+1, uint64(p[1])+1, uint64(p[2])+1), fmt.Sprintf("t%d", i))
}
check := func(ss *selectedStores) {
max := uint64(0)
min := uint64(math.MaxUint64)
for i := uint64(1); i <= uint64(storeCount); i++ {
count := ss.totalCountByStore(i)
if count > max {
max = count
}
if count < min {
min = count
}
}
c.Assert(max-min, LessEqual, uint64(2))
}
check(scatterer.ordinaryEngine.selectedPeer)
}
