diff --git a/pkg/cache/ttl.go b/pkg/cache/ttl.go
index 1a22f745b6e..c4ece50075b 100644
--- a/pkg/cache/ttl.go
+++ b/pkg/cache/ttl.go
@@ -256,3 +256,16 @@ func (c *TTLString) Pop() (string, interface{}, bool) {
 func (c *TTLString) Get(id string) (interface{}, bool) {
 	return c.ttlCache.get(id)
 }
+
+// GetAllID returns all key IDs in the cache.
+func (c *TTLString) GetAllID() []string {
+	keys := c.ttlCache.getKeys()
+	var ids []string
+	for _, key := range keys {
+		id, ok := key.(string)
+		if ok {
+			ids = append(ids, id)
+		}
+	}
+	return ids
+}
diff --git a/server/api/region_test.go b/server/api/region_test.go
index bf0d073d48e..ecec8dcf489 100644
--- a/server/api/region_test.go
+++ b/server/api/region_test.go
@@ -294,9 +294,10 @@ func (s *testRegionSuite) TestScatterRegions(c *C) {
 	err := postJSON(testDialClient, fmt.Sprintf("%s/regions/scatter", s.urlPrefix), []byte(body))
 	c.Assert(err, IsNil)
 	op1 := s.svr.GetRaftCluster().GetOperatorController().GetOperator(601)
-	c.Assert(op1 != nil, Equals, true)
 	op2 := s.svr.GetRaftCluster().GetOperatorController().GetOperator(602)
-	c.Assert(op2 != nil, Equals, true)
+	op3 := s.svr.GetRaftCluster().GetOperatorController().GetOperator(603)
+	// At least one operator should be created to scatter these regions.
+	c.Assert(op1 != nil || op2 != nil || op3 != nil, IsTrue)
 }
 
 func (s *testRegionSuite) TestSplitRegions(c *C) {
diff --git a/server/schedule/operator/builder.go b/server/schedule/operator/builder.go
index c36718c6dd5..5af39eb91fd 100644
--- a/server/schedule/operator/builder.go
+++ b/server/schedule/operator/builder.go
@@ -304,7 +304,7 @@ func (b *Builder) EnableLightWeight() *Builder {
 	return b
 }
 
-// EnableForceTargetLeader marks the step of transferring leader to target is forcible. It is used for grant leader.
+// EnableForceTargetLeader marks the step of transferring the leader to the target as forcible.
 func (b *Builder) EnableForceTargetLeader() *Builder {
 	b.forceTargetLeader = true
 	return b
diff --git a/server/schedule/operator/create_operator.go b/server/schedule/operator/create_operator.go
index 4b9573a20a7..dfc4b6d137f 100644
--- a/server/schedule/operator/create_operator.go
+++ b/server/schedule/operator/create_operator.go
@@ -195,6 +195,8 @@ func CreateScatterRegionOperator(desc string, cluster opt.Cluster, origin *core.
 		SetPeers(targetPeers).
 		SetLeader(leader).
 		EnableLightWeight().
+		// EnableForceTargetLeader is used here to ignore the leader schedule limit.
+		EnableForceTargetLeader().
 		Build(0)
 }
 
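Note: the new TTLString.GetAllID simply enumerates the cache's interface{} keys and keeps the ones that type-assert to string. Below is a minimal, self-contained sketch of that pattern; toyCache and its getKeys method are invented stand-ins for illustration, not the real ttlCache from pkg/cache.

package main

import "fmt"

// toyCache stands in for the real ttlCache, whose keys are stored as interface{}.
type toyCache struct {
	items map[interface{}]interface{}
}

// getKeys returns every key currently held by the cache.
func (c *toyCache) getKeys() []interface{} {
	keys := make([]interface{}, 0, len(c.items))
	for k := range c.items {
		keys = append(keys, k)
	}
	return keys
}

// getAllID mirrors the new TTLString.GetAllID: it keeps only the keys that
// type-assert to string and silently drops everything else.
func getAllID(c *toyCache) []string {
	var ids []string
	for _, key := range c.getKeys() {
		if id, ok := key.(string); ok {
			ids = append(ids, id)
		}
	}
	return ids
}

func main() {
	c := &toyCache{items: map[interface{}]interface{}{
		"group-a": 1,
		"group-b": 2,
		uint64(7): 3, // non-string key is skipped
	}}
	fmt.Println(getAllID(c)) // e.g. [group-a group-b] (map iteration order is random)
}

The region_scatterer.go change that follows relies on exactly this enumeration to sum one store's counts across every scatter group.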
diff --git a/server/schedule/region_scatterer.go b/server/schedule/region_scatterer.go
index 03b480877f1..5c423eb6758 100644
--- a/server/schedule/region_scatterer.go
+++ b/server/schedule/region_scatterer.go
@@ -94,6 +94,23 @@ func (s *selectedStores) getDistributionByGroupLocked(group string) (map[uint64]
 	return nil, false
 }
 
+func (s *selectedStores) totalCountByStore(storeID uint64) uint64 {
+	groups := s.groupDistribution.GetAllID()
+	totalCount := uint64(0)
+	for _, group := range groups {
+		storeDistribution, ok := s.getDistributionByGroupLocked(group)
+		if !ok {
+			continue
+		}
+		count, ok := storeDistribution[storeID]
+		if !ok {
+			continue
+		}
+		totalCount += count
+	}
+	return totalCount
+}
+
 // RegionScatterer scatters regions.
 type RegionScatterer struct {
 	ctx            context.Context
@@ -328,9 +345,26 @@ func (r *RegionScatterer) selectCandidates(region *core.RegionInfo, sourceStoreI
 	filters = append(filters, scoreGuard)
 	stores := r.cluster.GetStores()
 	candidates := make([]uint64, 0)
+	maxStoreTotalCount := uint64(0)
+	minStoreTotalCount := uint64(math.MaxUint64)
+	for _, store := range r.cluster.GetStores() {
+		count := context.selectedPeer.totalCountByStore(store.GetID())
+		if count > maxStoreTotalCount {
+			maxStoreTotalCount = count
+		}
+		if count < minStoreTotalCount {
+			minStoreTotalCount = count
+		}
+	}
 	for _, store := range stores {
-		if filter.Target(r.cluster.GetOpts(), store, filters) {
-			candidates = append(candidates, store.GetID())
+		storeCount := context.selectedPeer.totalCountByStore(store.GetID())
+		// If storeCount equals maxStoreTotalCount, skip this store as a candidate.
+		// If the store counts are all the same across the whole cluster (maxStoreTotalCount == minStoreTotalCount),
+		// any store can be selected as a candidate.
+		if storeCount < maxStoreTotalCount || maxStoreTotalCount == minStoreTotalCount {
+			if filter.Target(r.cluster.GetOpts(), store, filters) {
+				candidates = append(candidates, store.GetID())
+			}
 		}
 	}
 	return candidates
@@ -367,12 +401,17 @@ func (r *RegionScatterer) selectStore(group string, peer *metapb.Peer, sourceSto
 // selectAvailableLeaderStores select the target leader store from the candidates. The candidates would be collected by
 // the existed peers store depended on the leader counts in the group level.
 func (r *RegionScatterer) selectAvailableLeaderStores(group string, peers map[uint64]*metapb.Peer, context engineContext) uint64 {
-	minStoreGroupLeader := uint64(math.MaxUint64)
-	id := uint64(0)
+	leaderCandidateStores := make([]uint64, 0)
 	for storeID := range peers {
-		if id == 0 {
-			id = storeID
+		store := r.cluster.GetStore(storeID)
+		engine := store.GetLabelValue(filter.EngineKey)
+		if len(engine) < 1 {
+			leaderCandidateStores = append(leaderCandidateStores, storeID)
 		}
+	}
+	minStoreGroupLeader := uint64(math.MaxUint64)
+	id := uint64(0)
+	for _, storeID := range leaderCandidateStores {
 		storeGroupLeaderCount := context.selectedLeader.Get(storeID, group)
 		if minStoreGroupLeader > storeGroupLeaderCount {
 			minStoreGroupLeader = storeGroupLeaderCount
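Note: the core of the selectCandidates change is a max/min rule over totalCountByStore: a store stops being a candidate once its total scattered-peer count reaches the current cluster-wide maximum, unless every store already has the same count. The sketch below isolates that rule with a plain map; pickCandidates and totalCounts are hypothetical names, and the real code additionally applies filter.Target before accepting a store.

package main

import (
	"fmt"
	"math"
)

// pickCandidates keeps the stores whose total count is below the cluster-wide
// maximum; when max == min (perfectly even), every store stays a candidate.
// totalCounts maps store ID to the per-store sum that totalCountByStore reports.
func pickCandidates(totalCounts map[uint64]uint64) []uint64 {
	maxCount, minCount := uint64(0), uint64(math.MaxUint64)
	for _, count := range totalCounts {
		if count > maxCount {
			maxCount = count
		}
		if count < minCount {
			minCount = count
		}
	}
	candidates := make([]uint64, 0)
	for storeID, count := range totalCounts {
		if count < maxCount || maxCount == minCount {
			// The real code also runs filter.Target here before appending.
			candidates = append(candidates, storeID)
		}
	}
	return candidates
}

func main() {
	// Store 3 already holds the most peers, so it is excluded.
	fmt.Println(pickCandidates(map[uint64]uint64{1: 4, 2: 5, 3: 7}))
	// All counts equal: every store remains a candidate.
	fmt.Println(pickCandidates(map[uint64]uint64{1: 5, 2: 5, 3: 5}))
}

Counting across all groups (rather than per group) is what keeps concurrent scatters of many small groups from piling peers onto one store.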
diff --git a/server/schedule/region_scatterer_test.go b/server/schedule/region_scatterer_test.go
index 935e6815202..49e1d31a7e7 100644
--- a/server/schedule/region_scatterer_test.go
+++ b/server/schedule/region_scatterer_test.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"math"
+	"math/rand"
 	"time"
 
 	. "github.com/pingcap/check"
@@ -292,7 +293,7 @@ func (s *testScatterRegionSuite) TestScatterCheck(c *C) {
 	}
 }
 
-func (s *testScatterRegionSuite) TestScatterGroup(c *C) {
+func (s *testScatterRegionSuite) TestScatterGroupInConcurrency(c *C) {
 	opt := config.NewTestOptions()
 	tc := mockcluster.NewCluster(opt)
 	// Add 5 stores.
@@ -318,6 +319,7 @@
 		},
 	}
 
+	// We send interleaved scatter requests for each group to simulate scattering multiple region groups concurrently.
 	for _, testcase := range testcases {
 		c.Logf(testcase.name)
 		ctx, cancel := context.WithCancel(context.Background())
@@ -325,36 +327,35 @@
 		regionID := 1
 		for i := 0; i < 100; i++ {
 			for j := 0; j < testcase.groupCount; j++ {
-				_, err := scatterer.Scatter(tc.AddLeaderRegion(uint64(regionID), 1, 2, 3),
+				scatterer.scatterRegion(tc.AddLeaderRegion(uint64(regionID), 1, 2, 3),
 					fmt.Sprintf("group-%v", j))
-				c.Assert(err, IsNil)
 				regionID++
 			}
-			// insert region with no group
-			_, err := scatterer.Scatter(tc.AddLeaderRegion(uint64(regionID), 1, 2, 3), "")
-			c.Assert(err, IsNil)
-			regionID++
 		}
 
-		for i := 0; i < testcase.groupCount; i++ {
-			// comparing the leader distribution
-			group := fmt.Sprintf("group-%v", i)
-			max := uint64(0)
-			min := uint64(math.MaxUint64)
-			groupDistribution, _ := scatterer.ordinaryEngine.selectedLeader.groupDistribution.Get(group)
-			for _, count := range groupDistribution.(map[uint64]uint64) {
-				if count > max {
-					max = count
-				}
-				if count < min {
-					min = count
+		checker := func(ss *selectedStores, expected uint64, delta float64) {
+			for i := 0; i < testcase.groupCount; i++ {
+				// comparing the leader distribution
+				group := fmt.Sprintf("group-%v", i)
+				max := uint64(0)
+				min := uint64(math.MaxUint64)
+				groupDistribution, _ := ss.groupDistribution.Get(group)
+				for _, count := range groupDistribution.(map[uint64]uint64) {
+					if count > max {
+						max = count
+					}
+					if count < min {
+						min = count
+					}
 				}
+				c.Assert(math.Abs(float64(max)-float64(expected)), LessEqual, delta)
+				c.Assert(math.Abs(float64(min)-float64(expected)), LessEqual, delta)
 			}
-			// 100 regions divided 5 stores, each store expected to have about 20 regions.
-			c.Assert(min, LessEqual, uint64(20))
-			c.Assert(max, GreaterEqual, uint64(20))
-			c.Assert(max-min, LessEqual, uint64(5))
 		}
+		// For leaders, we expect each store to hold about 20 leaders for each group.
+		checker(scatterer.ordinaryEngine.selectedLeader, 20, 5)
+		// For peers, we expect each store to hold about 50 peers for each group.
+		checker(scatterer.ordinaryEngine.selectedPeer, 50, 15)
 		cancel()
 	}
 }
@@ -440,3 +441,38 @@ func (s *testScatterRegionSuite) TestSelectedStoreGC(c *C) {
 	_, ok = stores.GetGroupDistribution("testgroup")
 	c.Assert(ok, Equals, false)
 }
+
+// TestRegionFromDifferentGroups tests scattering regions where each region has its own group.
+// After scattering, the distribution across the whole cluster should still be even.
+func (s *testScatterRegionSuite) TestRegionFromDifferentGroups(c *C) {
+	opt := config.NewTestOptions()
+	tc := mockcluster.NewCluster(opt)
+	// Add 6 stores.
+	storeCount := 6
+	for i := uint64(1); i <= uint64(storeCount); i++ {
+		tc.AddRegionStore(i, 0)
+	}
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	scatterer := NewRegionScatterer(ctx, tc)
+	regionCount := 50
+	for i := 1; i <= regionCount; i++ {
+		p := rand.Perm(storeCount)
+		scatterer.scatterRegion(tc.AddLeaderRegion(uint64(i), uint64(p[0])+1, uint64(p[1])+1, uint64(p[2])+1), fmt.Sprintf("t%d", i))
+	}
+	check := func(ss *selectedStores) {
+		max := uint64(0)
+		min := uint64(math.MaxUint64)
+		for i := uint64(1); i <= uint64(storeCount); i++ {
+			count := ss.totalCountByStore(i)
+			if count > max {
+				max = count
+			}
+			if count < min {
+				min = count
+			}
+		}
+		c.Assert(max-min, LessEqual, uint64(2))
+	}
+	check(scatterer.ordinaryEngine.selectedPeer)
+}
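Note: the selectAvailableLeaderStores rework first restricts leader candidates to stores without an engine label (for example, TiFlash stores carry one and must not take leaders) and then picks the candidate with the fewest leaders already placed for the group. Below is a simplified, self-contained sketch of that selection; storeInfo and the flattened inputs are invented stand-ins for the cluster metadata and selectedLeader counters, not PD's actual API.

package main

import (
	"fmt"
	"math"
)

// storeInfo is a stand-in for the peer stores' metadata:
// engine mirrors the "engine" store label (empty for ordinary stores),
// groupLeaders mirrors selectedLeader.Get(storeID, group) for one group.
type storeInfo struct {
	engine       string
	groupLeaders uint64
}

// selectLeaderStore picks, among the peers' stores without an engine label,
// the store that currently holds the fewest leaders for this group.
// It returns 0 when no eligible candidate exists.
func selectLeaderStore(peers map[uint64]storeInfo) uint64 {
	minLeaders := uint64(math.MaxUint64)
	id := uint64(0)
	for storeID, info := range peers {
		if len(info.engine) > 0 {
			continue // special-engine stores are never leader candidates
		}
		if info.groupLeaders < minLeaders {
			minLeaders = info.groupLeaders
			id = storeID
		}
	}
	return id
}

func main() {
	peers := map[uint64]storeInfo{
		1: {engine: "", groupLeaders: 3},
		2: {engine: "", groupLeaders: 1},
		3: {engine: "tiflash", groupLeaders: 0}, // excluded despite the lowest count
	}
	fmt.Println(selectLeaderStore(peers)) // 2
}

This is also why the previous fallback of "take the first peer's store" was dropped: it could hand the leader to a store that should never hold one.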