Skip to content

Commit

Permalink
schedule: set influence according to region size (#1613)
Browse files Browse the repository at this point in the history
* not consider store limit when executing merge

Signed-off-by: Ryan Leung <rleungx@gmail.com>

* set influence according to region size

Signed-off-by: Ryan Leung <rleungx@gmail.com>

* fix race problems

Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx authored and disksing committed Jul 5, 2019
1 parent bfcee70 commit c7052f6
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 22 deletions.
45 changes: 45 additions & 0 deletions server/checker/merge_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,23 @@
package checker

import (
"testing"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/pd/pkg/mock/mockcluster"
"github.com/pingcap/pd/pkg/mock/mockhbstream"
"github.com/pingcap/pd/pkg/mock/mockoption"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/namespace"
"github.com/pingcap/pd/server/schedule"
)

func TestChecker(t *testing.T) {
TestingT(t)
}

var _ = Suite(&testMergeCheckerSuite{})

type testMergeCheckerSuite struct {
Expand Down Expand Up @@ -258,3 +264,42 @@ func (s *testMergeCheckerSuite) TestMatchPeers(c *C) {
},
})
}

func (s *testMergeCheckerSuite) TestStorelimit(c *C) {
oc := schedule.NewOperatorController(s.cluster, mockhbstream.NewHeartbeatStream())
s.cluster.ScheduleOptions.SplitMergeInterval = time.Hour
s.cluster.ScheduleOptions.StoreBalanceRate = 60
s.regions[2] = s.regions[2].Clone(
core.SetPeers([]*metapb.Peer{
{Id: 109, StoreId: 2},
{Id: 110, StoreId: 3},
{Id: 111, StoreId: 6},
}),
core.WithLeader(&metapb.Peer{Id: 109, StoreId: 2}),
)
s.cluster.PutRegion(s.regions[2])
ops := s.mc.Check(s.regions[2])
c.Assert(ops, NotNil)
// The size of Region is less or equal than 1MB.
for i := 0; i < 50; i++ {
c.Assert(oc.AddOperator(ops...), IsTrue)
for _, op := range ops {
oc.RemoveOperator(op)
}
}
s.regions[2] = s.regions[2].Clone(
core.SetApproximateSize(2),
core.SetApproximateKeys(2),
)
s.cluster.PutRegion(s.regions[2])
ops = s.mc.Check(s.regions[2])
c.Assert(ops, NotNil)
// The size of Region is more than 1MB but no more than 20MB.
for i := 0; i < 5; i++ {
c.Assert(oc.AddOperator(ops...), IsTrue)
for _, op := range ops {
oc.RemoveOperator(op)
}
}
c.Assert(oc.AddOperator(ops...), IsFalse)
}
2 changes: 1 addition & 1 deletion server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,8 +379,8 @@ func (c *RaftCluster) GetAdjacentRegions(region *core.RegionInfo) (*core.RegionI
// UpdateStoreLabels updates a store's location labels.
func (c *RaftCluster) UpdateStoreLabels(storeID uint64, labels []*metapb.StoreLabel) error {
c.RLock()
defer c.RUnlock()
store := c.cachedCluster.GetStore(storeID)
c.RUnlock()
if store == nil {
return errors.Errorf("invalid store ID %d, not found", storeID)
}
Expand Down
23 changes: 19 additions & 4 deletions server/schedule/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ const (
RegionOperatorWaitTime = 10 * time.Minute
// RegionInfluence represents the influence of a operator step, which is used by ratelimit.
RegionInfluence int64 = 1000
// smallRegionInfluence represents the influence of a operator step
// when the region size is smaller than smallRegionThreshold, which is used by ratelimit.
smallRegionInfluence int64 = 200
// smallRegionThreshold is used to represent a region which can be regarded as a small region once the size is small than it.
smallRegionThreshold int64 = 20
)

// OperatorStep describes the basic scheduling steps that can not be subdivided.
Expand Down Expand Up @@ -98,9 +103,14 @@ func (ap AddPeer) IsFinish(region *core.RegionInfo) bool {
func (ap AddPeer) Influence(opInfluence OpInfluence, region *core.RegionInfo) {
to := opInfluence.GetStoreInfluence(ap.ToStore)

to.RegionSize += region.GetApproximateSize()
regionSize := region.GetApproximateSize()
to.RegionSize += regionSize
to.RegionCount++
to.StepCost += RegionInfluence
if regionSize > smallRegionThreshold {
to.StepCost += RegionInfluence
} else if regionSize <= smallRegionThreshold && regionSize > core.EmptyRegionApproximateSize {
to.StepCost += smallRegionInfluence
}
}

// AddLearner is an OperatorStep that adds a region learner peer.
Expand Down Expand Up @@ -128,9 +138,14 @@ func (al AddLearner) IsFinish(region *core.RegionInfo) bool {
func (al AddLearner) Influence(opInfluence OpInfluence, region *core.RegionInfo) {
to := opInfluence.GetStoreInfluence(al.ToStore)

to.RegionSize += region.GetApproximateSize()
regionSize := region.GetApproximateSize()
to.RegionSize += regionSize
to.RegionCount++
to.StepCost += RegionInfluence
if regionSize > smallRegionThreshold {
to.StepCost += RegionInfluence
} else if regionSize <= smallRegionThreshold && regionSize > core.EmptyRegionApproximateSize {
to.StepCost += smallRegionInfluence
}
}

// PromoteLearner is an OperatorStep that promotes a region learner peer to normal voter.
Expand Down
34 changes: 17 additions & 17 deletions server/schedule/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (s *testOperatorSuite) newTestRegion(regionID uint64, leaderPeer uint64, pe
leader = peer
}
}
regionInfo := core.NewRegionInfo(&region, leader, core.SetApproximateSize(10), core.SetApproximateKeys(10))
regionInfo := core.NewRegionInfo(&region, leader, core.SetApproximateSize(50), core.SetApproximateKeys(50))
return regionInfo
}

Expand Down Expand Up @@ -124,71 +124,71 @@ func (s *testOperatorSuite) TestInfluence(c *C) {
c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{
LeaderSize: 0,
LeaderCount: 0,
RegionSize: 10,
RegionSize: 50,
RegionCount: 1,
StepCost: 1000,
})

TransferLeader{FromStore: 1, ToStore: 2}.Influence(opInfluence, region)
c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{
LeaderSize: -10,
LeaderSize: -50,
LeaderCount: -1,
RegionSize: 0,
RegionCount: 0,
StepCost: 0,
})
c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{
LeaderSize: 10,
LeaderSize: 50,
LeaderCount: 1,
RegionSize: 10,
RegionSize: 50,
RegionCount: 1,
StepCost: 1000,
})

RemovePeer{FromStore: 1}.Influence(opInfluence, region)
c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{
LeaderSize: -10,
LeaderSize: -50,
LeaderCount: -1,
RegionSize: -10,
RegionSize: -50,
RegionCount: -1,
StepCost: 0,
})
c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{
LeaderSize: 10,
LeaderSize: 50,
LeaderCount: 1,
RegionSize: 10,
RegionSize: 50,
RegionCount: 1,
StepCost: 1000,
})

MergeRegion{IsPassive: false}.Influence(opInfluence, region)
c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{
LeaderSize: -10,
LeaderSize: -50,
LeaderCount: -1,
RegionSize: -10,
RegionSize: -50,
RegionCount: -1,
StepCost: 0,
})
c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{
LeaderSize: 10,
LeaderSize: 50,
LeaderCount: 1,
RegionSize: 10,
RegionSize: 50,
RegionCount: 1,
StepCost: 1000,
})

MergeRegion{IsPassive: true}.Influence(opInfluence, region)
c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{
LeaderSize: -10,
LeaderSize: -50,
LeaderCount: -2,
RegionSize: -10,
RegionSize: -50,
RegionCount: -2,
StepCost: 0,
})
c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{
LeaderSize: 10,
LeaderSize: 50,
LeaderCount: 1,
RegionSize: 10,
RegionSize: 50,
RegionCount: 0,
StepCost: 1000,
})
Expand Down
19 changes: 19 additions & 0 deletions server/statistics/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

// StoresStats is a cache hold hot regions.
type StoresStats struct {
sync.RWMutex
rollingStoresStats map[uint64]*RollingStoreStats
bytesReadRate float64
bytesWriteRate float64
Expand All @@ -36,26 +37,36 @@ func NewStoresStats() *StoresStats {

// CreateRollingStoreStats creates RollingStoreStats with a given store ID.
func (s *StoresStats) CreateRollingStoreStats(storeID uint64) {
s.Lock()
defer s.Unlock()
s.rollingStoresStats[storeID] = newRollingStoreStats()
}

// RemoveRollingStoreStats removes RollingStoreStats with a given store ID.
func (s *StoresStats) RemoveRollingStoreStats(storeID uint64) {
s.Lock()
defer s.Unlock()
delete(s.rollingStoresStats, storeID)
}

// GetRollingStoreStats gets RollingStoreStats with a given store ID.
func (s *StoresStats) GetRollingStoreStats(storeID uint64) *RollingStoreStats {
s.RLock()
defer s.RUnlock()
return s.rollingStoresStats[storeID]
}

// Observe records the current store status with a given store.
func (s *StoresStats) Observe(storeID uint64, stats *pdpb.StoreStats) {
s.RLock()
defer s.RUnlock()
s.rollingStoresStats[storeID].Observe(stats)
}

// UpdateTotalBytesRate updates the total bytes write rate and read rate.
func (s *StoresStats) UpdateTotalBytesRate(stores *core.StoresInfo) {
s.RLock()
defer s.RUnlock()
var totalBytesWriteRate float64
var totalBytesReadRate float64
var writeRate, readRate float64
Expand Down Expand Up @@ -83,6 +94,8 @@ func (s *StoresStats) TotalBytesReadRate() float64 {

// GetStoresBytesWriteStat returns the bytes write stat of all StoreInfo.
func (s *StoresStats) GetStoresBytesWriteStat() map[uint64]uint64 {
s.RLock()
defer s.RUnlock()
res := make(map[uint64]uint64, len(s.rollingStoresStats))
for storeID, stats := range s.rollingStoresStats {
writeRate, _ := stats.GetBytesRate()
Expand All @@ -93,6 +106,8 @@ func (s *StoresStats) GetStoresBytesWriteStat() map[uint64]uint64 {

// GetStoresBytesReadStat returns the bytes read stat of all StoreInfo.
func (s *StoresStats) GetStoresBytesReadStat() map[uint64]uint64 {
s.RLock()
defer s.RUnlock()
res := make(map[uint64]uint64, len(s.rollingStoresStats))
for storeID, stats := range s.rollingStoresStats {
_, readRate := stats.GetBytesRate()
Expand All @@ -103,6 +118,8 @@ func (s *StoresStats) GetStoresBytesReadStat() map[uint64]uint64 {

// GetStoresKeysWriteStat returns the keys write stat of all StoreInfo.
func (s *StoresStats) GetStoresKeysWriteStat() map[uint64]uint64 {
s.RLock()
defer s.RUnlock()
res := make(map[uint64]uint64, len(s.rollingStoresStats))
for storeID, stats := range s.rollingStoresStats {
res[storeID] = uint64(stats.GetKeysWriteRate())
Expand All @@ -112,6 +129,8 @@ func (s *StoresStats) GetStoresKeysWriteStat() map[uint64]uint64 {

// GetStoresKeysReadStat returns the bytes read stat of all StoreInfo.
func (s *StoresStats) GetStoresKeysReadStat() map[uint64]uint64 {
s.RLock()
defer s.RUnlock()
res := make(map[uint64]uint64, len(s.rollingStoresStats))
for storeID, stats := range s.rollingStoresStats {
res[storeID] = uint64(stats.GetKeysReadRate())
Expand Down

0 comments on commit c7052f6

Please sign in to comment.