Skip to content

Commit

Permalink
schedule: combine store limit and store balance rate (#2437) (#2557)
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Jun 19, 2020
1 parent c459c25 commit 0534436
Show file tree
Hide file tree
Showing 34 changed files with 584 additions and 460 deletions.
1 change: 0 additions & 1 deletion conf/simconfig.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,3 @@ leader-schedule-limit = 32
region-schedule-limit = 128
replica-schedule-limit = 32
merge-schedule-limit = 32
store-balance-rate = 512.0
12 changes: 12 additions & 0 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/pd/v4/server/core"
"github.com/pingcap/pd/v4/server/kv"
"github.com/pingcap/pd/v4/server/schedule/placement"
"github.com/pingcap/pd/v4/server/schedule/storelimit"
"github.com/pingcap/pd/v4/server/statistics"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -203,6 +204,8 @@ func (mc *Cluster) AddLeaderStore(storeID uint64, leaderCount int, leaderSizes .
core.SetLeaderSize(leaderSize),
core.SetLastHeartbeatTS(time.Now()),
)
mc.SetStoreLimit(storeID, storelimit.AddPeer, 60)
mc.SetStoreLimit(storeID, storelimit.RemovePeer, 60)
mc.PutStore(store)
}

Expand All @@ -218,6 +221,8 @@ func (mc *Cluster) AddRegionStore(storeID uint64, regionCount int) {
core.SetRegionSize(int64(regionCount)*10),
core.SetLastHeartbeatTS(time.Now()),
)
mc.SetStoreLimit(storeID, storelimit.AddPeer, 60)
mc.SetStoreLimit(storeID, storelimit.RemovePeer, 60)
mc.PutStore(store)
}

Expand Down Expand Up @@ -253,6 +258,8 @@ func (mc *Cluster) AddLabelsStore(storeID uint64, regionCount int, labels map[st
core.SetRegionSize(int64(regionCount)*10),
core.SetLastHeartbeatTS(time.Now()),
)
mc.SetStoreLimit(storeID, storelimit.AddPeer, 60)
mc.SetStoreLimit(storeID, storelimit.RemovePeer, 60)
mc.PutStore(store)
}

Expand Down Expand Up @@ -539,6 +546,11 @@ func (mc *Cluster) GetMaxReplicas() int {
return mc.ScheduleOptions.GetMaxReplicas()
}

// GetStoreLimitByType mocks method.
func (mc *Cluster) GetStoreLimitByType(storeID uint64, typ storelimit.Type) float64 {
return mc.ScheduleOptions.GetStoreLimitByType(storeID, typ)
}

// CheckLabelProperty checks label property.
func (mc *Cluster) CheckLabelProperty(typ string, labels []*metapb.StoreLabel) bool {
for _, cfg := range mc.LabelProperties[typ] {
Expand Down
76 changes: 65 additions & 11 deletions pkg/mock/mockoption/mockoption.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/pd/v4/server/core"
"github.com/pingcap/pd/v4/server/schedule/storelimit"
)

const (
Expand All @@ -33,7 +34,6 @@ const (
defaultReplicaScheduleLimit = 64
defaultMergeScheduleLimit = 8
defaultHotRegionScheduleLimit = 4
defaultStoreBalanceRate = 60
defaultTolerantSizeRatio = 2.5
defaultLowSpaceRatio = 0.8
defaultHighSpaceRatio = 0.6
Expand All @@ -43,9 +43,15 @@ const (
defaultLeaderSchedulePolicy = "count"
defaultEnablePlacementRules = false
defaultKeyType = "table"
defaultStoreLimitMode = "manual"
defaultStoreLimit = 60
)

// StoreLimitConfig is a mock of StoreLimitConfig.
type StoreLimitConfig struct {
AddPeer float64 `toml:"add-peer" json:"add-peer"`
RemovePeer float64 `toml:"remove-peer" json:"remove-peer"`
}

// ScheduleOptions is a mock of ScheduleOptions
// which implements Options interface
type ScheduleOptions struct {
Expand All @@ -54,7 +60,7 @@ type ScheduleOptions struct {
ReplicaScheduleLimit uint64
MergeScheduleLimit uint64
HotRegionScheduleLimit uint64
StoreBalanceRate float64
StoreLimit map[uint64]StoreLimitConfig
MaxSnapshotCount uint64
MaxPendingPeerCount uint64
MaxMergeRegionSize uint64
Expand Down Expand Up @@ -97,7 +103,6 @@ func NewScheduleOptions() *ScheduleOptions {
mso.ReplicaScheduleLimit = defaultReplicaScheduleLimit
mso.MergeScheduleLimit = defaultMergeScheduleLimit
mso.HotRegionScheduleLimit = defaultHotRegionScheduleLimit
mso.StoreBalanceRate = defaultStoreBalanceRate
mso.MaxSnapshotCount = defaultMaxSnapshotCount
mso.MaxMergeRegionSize = defaultMaxMergeRegionSize
mso.MaxMergeRegionKeys = defaultMaxMergeRegionKeys
Expand All @@ -112,17 +117,55 @@ func NewScheduleOptions() *ScheduleOptions {
mso.TolerantSizeRatio = defaultTolerantSizeRatio
mso.LowSpaceRatio = defaultLowSpaceRatio
mso.HighSpaceRatio = defaultHighSpaceRatio
mso.StoreLimitMode = defaultStoreLimitMode
mso.EnableRemoveDownReplica = true
mso.EnableReplaceOfflineReplica = true
mso.EnableMakeUpReplica = true
mso.EnableRemoveExtraReplica = true
mso.EnableLocationReplacement = true
mso.LeaderSchedulePolicy = defaultLeaderSchedulePolicy
mso.KeyType = defaultKeyType
mso.StoreLimit = make(map[uint64]StoreLimitConfig)
return mso
}

// SetStoreLimit mocks method
func (mso *ScheduleOptions) SetStoreLimit(storeID uint64, typ storelimit.Type, ratePerMin float64) {
var sc StoreLimitConfig
if _, ok := mso.StoreLimit[storeID]; ok {
switch typ {
case storelimit.AddPeer:
sc = StoreLimitConfig{AddPeer: ratePerMin, RemovePeer: mso.StoreLimit[storeID].RemovePeer}
case storelimit.RemovePeer:
sc = StoreLimitConfig{AddPeer: mso.StoreLimit[storeID].AddPeer, RemovePeer: ratePerMin}
}
} else {
switch typ {
case storelimit.AddPeer:
sc = StoreLimitConfig{AddPeer: ratePerMin, RemovePeer: defaultStoreLimit}
case storelimit.RemovePeer:
sc = StoreLimitConfig{AddPeer: defaultStoreLimit, RemovePeer: ratePerMin}
}
}

mso.StoreLimit[storeID] = sc
}

// SetAllStoresLimit mocks method
func (mso *ScheduleOptions) SetAllStoresLimit(typ storelimit.Type, ratePerMin float64) {
switch typ {
case storelimit.AddPeer:
for storeID := range mso.StoreLimit {
sc := StoreLimitConfig{AddPeer: ratePerMin, RemovePeer: mso.StoreLimit[storeID].RemovePeer}
mso.StoreLimit[storeID] = sc
}
case storelimit.RemovePeer:
for storeID := range mso.StoreLimit {
sc := StoreLimitConfig{AddPeer: mso.StoreLimit[storeID].AddPeer, RemovePeer: ratePerMin}
mso.StoreLimit[storeID] = sc
}
}
}

// GetLeaderScheduleLimit mocks method
func (mso *ScheduleOptions) GetLeaderScheduleLimit() uint64 {
return mso.LeaderScheduleLimit
Expand All @@ -148,9 +191,20 @@ func (mso *ScheduleOptions) GetHotRegionScheduleLimit() uint64 {
return mso.HotRegionScheduleLimit
}

// GetStoreBalanceRate mocks method
func (mso *ScheduleOptions) GetStoreBalanceRate() float64 {
return mso.StoreBalanceRate
// GetStoreLimitByType mocks method
func (mso *ScheduleOptions) GetStoreLimitByType(storeID uint64, typ storelimit.Type) float64 {
limit, ok := mso.StoreLimit[storeID]
if !ok {
return 0
}
switch typ {
case storelimit.AddPeer:
return limit.AddPeer
case storelimit.RemovePeer:
return limit.RemovePeer
default:
panic("no such limit type")
}
}

// GetMaxSnapshotCount mocks method
Expand Down Expand Up @@ -283,7 +337,7 @@ func (mso *ScheduleOptions) GetKeyType() core.KeyType {
return core.StringToKeyType(mso.KeyType)
}

// GetStoreLimitMode returns the limit mode of store.
func (mso *ScheduleOptions) GetStoreLimitMode() string {
return mso.StoreLimitMode
// CheckLabelProperty mocks method
func (mso *ScheduleOptions) CheckLabelProperty(typ string, labels []*metapb.StoreLabel) bool {
return true
}
68 changes: 25 additions & 43 deletions server/api/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/pingcap/pd/v4/server"
"github.com/pingcap/pd/v4/server/config"
"github.com/pingcap/pd/v4/server/core"
"github.com/pingcap/pd/v4/server/schedule"
"github.com/pingcap/pd/v4/server/schedule/storelimit"
"github.com/pkg/errors"
"github.com/unrolled/render"
Expand Down Expand Up @@ -361,21 +360,23 @@ func (h *storeHandler) SetLimit(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusBadRequest, "rate unset")
return
}
rate, ok := rateVal.(float64)
if !ok || rate < 0 {
ratePerMin, ok := rateVal.(float64)
if !ok || ratePerMin < 0 {
h.rd.JSON(w, http.StatusBadRequest, "badformat rate")
return
}

typeValue, err := getStoreLimitType(input)
typeValues, err := getStoreLimitType(input)
if err != nil {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}

if err := h.SetStoreLimit(storeID, rate/schedule.StoreBalanceBaseTime, typeValue); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
for _, typ := range typeValues {
if err := h.SetStoreLimit(storeID, ratePerMin, typ); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
}

h.rd.JSON(w, http.StatusOK, nil)
Expand Down Expand Up @@ -431,21 +432,23 @@ func (h *storesHandler) SetAllLimit(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusBadRequest, "rate unset")
return
}
rate, ok := rateVal.(float64)
if !ok || rate < 0 {
ratePerMin, ok := rateVal.(float64)
if !ok || ratePerMin < 0 {
h.rd.JSON(w, http.StatusBadRequest, "badformat rate")
return
}

typeValue, err := getStoreLimitType(input)
typeValues, err := getStoreLimitType(input)
if err != nil {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}

if err := h.SetAllStoresLimit(rate/schedule.StoreBalanceBaseTime, typeValue); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
for _, typ := range typeValues {
if err := h.SetAllStoresLimit(ratePerMin, typ); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
}

h.rd.JSON(w, http.StatusOK, nil)
Expand All @@ -459,30 +462,8 @@ func (h *storesHandler) SetAllLimit(w http.ResponseWriter, r *http.Request) {
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /stores/limit [get]
func (h *storesHandler) GetAllLimit(w http.ResponseWriter, r *http.Request) {
typeName := r.URL.Query().Get("type")
typeValue, err := parseStoreLimitType(typeName)
if err != nil {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
limits, err := h.GetAllStoresLimit(typeValue)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
type LimitResp struct {
Rate float64 `json:"rate"`
Mode string `json:"mode"`
}
resp := make(map[uint64]*LimitResp)
for s, l := range limits {
resp[s] = &LimitResp{
Rate: l.Rate() * schedule.StoreBalanceBaseTime,
Mode: l.Mode().String(),
}
}

h.rd.JSON(w, http.StatusOK, resp)
limits := h.GetScheduleConfig().StoreLimit
h.rd.JSON(w, http.StatusOK, limits)
}

// @Tags store
Expand Down Expand Up @@ -607,23 +588,24 @@ func (filter *storeStateFilter) filter(stores []*metapb.Store) []*metapb.Store {
return ret
}

func getStoreLimitType(input map[string]interface{}) (storelimit.Type, error) {
func getStoreLimitType(input map[string]interface{}) ([]storelimit.Type, error) {
typeNameIface, ok := input["type"]
typeValue := storelimit.RegionAdd
var err error
if ok {
typeName, ok := typeNameIface.(string)
if !ok {
err = errors.New("bad format type")
} else {
return parseStoreLimitType(typeName)
return nil, err
}
typ, err := parseStoreLimitType(typeName)
return []storelimit.Type{typ}, err
}
return typeValue, err

return []storelimit.Type{storelimit.AddPeer, storelimit.RemovePeer}, err
}

func parseStoreLimitType(typeName string) (storelimit.Type, error) {
typeValue := storelimit.RegionAdd
typeValue := storelimit.AddPeer
var err error
if typeName != "" {
if value, ok := storelimit.TypeNameValue[typeName]; ok {
Expand Down
3 changes: 1 addition & 2 deletions server/api/trend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/pd/v4/server"
"github.com/pingcap/pd/v4/server/config"
"github.com/pingcap/pd/v4/server/core"
"github.com/pingcap/pd/v4/server/schedule/operator"
)
Expand All @@ -30,7 +29,7 @@ var _ = Suite(&testTrendSuite{})
type testTrendSuite struct{}

func (s *testTrendSuite) TestTrend(c *C) {
svr, cleanup := mustNewServer(c, func(cfg *config.Config) { cfg.Schedule.StoreBalanceRate = 60 })
svr, cleanup := mustNewServer(c)
defer cleanup()
mustWaitLeader(c, []*server.Server{svr})

Expand Down
Loading

0 comments on commit 0534436

Please sign in to comment.