schedule: combine store limit and store balance rate (#2437) #2557

Merged · 1 commit · Jun 19, 2020
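Reviewer orientation: this change removes the single, cluster-wide store-balance-rate and replaces it with per-store limits that are tracked separately for adding and removing peers, with rates handled as per-minute values end to end (the API handlers below stop dividing by schedule.StoreBalanceBaseTime). A rough sketch of the resulting shape, distilled from the diff below rather than copied from any one file:

package sketch

// StoreLimitConfig keeps one rate per direction; both are per-minute values.
type StoreLimitConfig struct {
    AddPeer    float64 `toml:"add-peer" json:"add-peer"`
    RemovePeer float64 `toml:"remove-peer" json:"remove-peer"`
}

// A map keyed by store ID replaces the old single store-balance-rate value.
var storeLimit = map[uint64]StoreLimitConfig{
    1: {AddPeer: 60, RemovePeer: 60}, // 60 per minute is the default used in the mocks below
}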
conf/simconfig.toml (1 change: 0 additions & 1 deletion)

@@ -29,4 +29,3 @@ leader-schedule-limit = 32
 region-schedule-limit = 128
 replica-schedule-limit = 32
 merge-schedule-limit = 32
-store-balance-rate = 512.0
pkg/mock/mockcluster/mockcluster.go (12 changes: 12 additions & 0 deletions)

@@ -26,6 +26,7 @@ import (
     "github.com/pingcap/pd/v4/server/core"
     "github.com/pingcap/pd/v4/server/kv"
     "github.com/pingcap/pd/v4/server/schedule/placement"
+    "github.com/pingcap/pd/v4/server/schedule/storelimit"
     "github.com/pingcap/pd/v4/server/statistics"
     "go.uber.org/zap"
 )
@@ -203,6 +204,8 @@ func (mc *Cluster) AddLeaderStore(storeID uint64, leaderCount int, leaderSizes .
         core.SetLeaderSize(leaderSize),
         core.SetLastHeartbeatTS(time.Now()),
     )
+    mc.SetStoreLimit(storeID, storelimit.AddPeer, 60)
+    mc.SetStoreLimit(storeID, storelimit.RemovePeer, 60)
     mc.PutStore(store)
 }
 
@@ -218,6 +221,8 @@ func (mc *Cluster) AddRegionStore(storeID uint64, regionCount int) {
         core.SetRegionSize(int64(regionCount)*10),
         core.SetLastHeartbeatTS(time.Now()),
     )
+    mc.SetStoreLimit(storeID, storelimit.AddPeer, 60)
+    mc.SetStoreLimit(storeID, storelimit.RemovePeer, 60)
     mc.PutStore(store)
 }
 
@@ -253,6 +258,8 @@ func (mc *Cluster) AddLabelsStore(storeID uint64, regionCount int, labels map[st
         core.SetRegionSize(int64(regionCount)*10),
         core.SetLastHeartbeatTS(time.Now()),
     )
+    mc.SetStoreLimit(storeID, storelimit.AddPeer, 60)
+    mc.SetStoreLimit(storeID, storelimit.RemovePeer, 60)
     mc.PutStore(store)
 }
 
@@ -539,6 +546,11 @@ func (mc *Cluster) GetMaxReplicas() int {
     return mc.ScheduleOptions.GetMaxReplicas()
 }
 
+// GetStoreLimitByType mocks method.
+func (mc *Cluster) GetStoreLimitByType(storeID uint64, typ storelimit.Type) float64 {
+    return mc.ScheduleOptions.GetStoreLimitByType(storeID, typ)
+}
+
 // CheckLabelProperty checks label property.
 func (mc *Cluster) CheckLabelProperty(typ string, labels []*metapb.StoreLabel) bool {
     for _, cfg := range mc.LabelProperties[typ] {
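A minimal sketch of how tests can exercise the new mock surface; the mockcluster.NewCluster and mockoption.NewScheduleOptions constructors follow this repo's existing test conventions, so treat the snippet as illustrative rather than as code from this PR:

package mockcluster_test

import (
    "github.com/pingcap/pd/v4/pkg/mock/mockcluster"
    "github.com/pingcap/pd/v4/pkg/mock/mockoption"
    "github.com/pingcap/pd/v4/server/schedule/storelimit"
)

func ExampleStoreLimits() {
    mc := mockcluster.NewCluster(mockoption.NewScheduleOptions())

    // AddLeaderStore now seeds both directions at 60 per minute for store 1.
    mc.AddLeaderStore(1, 10)

    // Read one direction back, then tighten the other independently.
    _ = mc.GetStoreLimitByType(1, storelimit.AddPeer) // 60
    mc.SetStoreLimit(1, storelimit.RemovePeer, 30)
}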
pkg/mock/mockoption/mockoption.go (76 changes: 65 additions & 11 deletions)

@@ -18,6 +18,7 @@ import (
 
     "github.com/pingcap/kvproto/pkg/metapb"
     "github.com/pingcap/pd/v4/server/core"
+    "github.com/pingcap/pd/v4/server/schedule/storelimit"
 )
 
 const (
@@ -33,7 +34,6 @@
     defaultReplicaScheduleLimit = 64
     defaultMergeScheduleLimit = 8
     defaultHotRegionScheduleLimit = 4
-    defaultStoreBalanceRate = 60
     defaultTolerantSizeRatio = 2.5
     defaultLowSpaceRatio = 0.8
     defaultHighSpaceRatio = 0.6
@@ -43,9 +43,15 @@
     defaultLeaderSchedulePolicy = "count"
     defaultEnablePlacementRules = false
     defaultKeyType = "table"
-    defaultStoreLimitMode = "manual"
+    defaultStoreLimit = 60
 )
 
+// StoreLimitConfig is a mock of StoreLimitConfig.
+type StoreLimitConfig struct {
+    AddPeer float64 `toml:"add-peer" json:"add-peer"`
+    RemovePeer float64 `toml:"remove-peer" json:"remove-peer"`
+}
+
 // ScheduleOptions is a mock of ScheduleOptions
 // which implements Options interface
 type ScheduleOptions struct {
@@ -54,7 +60,7 @@ type ScheduleOptions struct {
     ReplicaScheduleLimit uint64
     MergeScheduleLimit uint64
     HotRegionScheduleLimit uint64
-    StoreBalanceRate float64
+    StoreLimit map[uint64]StoreLimitConfig
     MaxSnapshotCount uint64
     MaxPendingPeerCount uint64
     MaxMergeRegionSize uint64
@@ -97,7 +103,6 @@ func NewScheduleOptions() *ScheduleOptions {
     mso.ReplicaScheduleLimit = defaultReplicaScheduleLimit
     mso.MergeScheduleLimit = defaultMergeScheduleLimit
     mso.HotRegionScheduleLimit = defaultHotRegionScheduleLimit
-    mso.StoreBalanceRate = defaultStoreBalanceRate
     mso.MaxSnapshotCount = defaultMaxSnapshotCount
     mso.MaxMergeRegionSize = defaultMaxMergeRegionSize
     mso.MaxMergeRegionKeys = defaultMaxMergeRegionKeys
@@ -112,17 +117,55 @@
     mso.TolerantSizeRatio = defaultTolerantSizeRatio
     mso.LowSpaceRatio = defaultLowSpaceRatio
     mso.HighSpaceRatio = defaultHighSpaceRatio
-    mso.StoreLimitMode = defaultStoreLimitMode
     mso.EnableRemoveDownReplica = true
     mso.EnableReplaceOfflineReplica = true
     mso.EnableMakeUpReplica = true
     mso.EnableRemoveExtraReplica = true
     mso.EnableLocationReplacement = true
     mso.LeaderSchedulePolicy = defaultLeaderSchedulePolicy
     mso.KeyType = defaultKeyType
+    mso.StoreLimit = make(map[uint64]StoreLimitConfig)
     return mso
 }
 
+// SetStoreLimit mocks method
+func (mso *ScheduleOptions) SetStoreLimit(storeID uint64, typ storelimit.Type, ratePerMin float64) {
+    var sc StoreLimitConfig
+    if _, ok := mso.StoreLimit[storeID]; ok {
+        switch typ {
+        case storelimit.AddPeer:
+            sc = StoreLimitConfig{AddPeer: ratePerMin, RemovePeer: mso.StoreLimit[storeID].RemovePeer}
+        case storelimit.RemovePeer:
+            sc = StoreLimitConfig{AddPeer: mso.StoreLimit[storeID].AddPeer, RemovePeer: ratePerMin}
+        }
+    } else {
+        switch typ {
+        case storelimit.AddPeer:
+            sc = StoreLimitConfig{AddPeer: ratePerMin, RemovePeer: defaultStoreLimit}
+        case storelimit.RemovePeer:
+            sc = StoreLimitConfig{AddPeer: defaultStoreLimit, RemovePeer: ratePerMin}
+        }
+    }
+
+    mso.StoreLimit[storeID] = sc
+}
+
+// SetAllStoresLimit mocks method
+func (mso *ScheduleOptions) SetAllStoresLimit(typ storelimit.Type, ratePerMin float64) {
+    switch typ {
+    case storelimit.AddPeer:
+        for storeID := range mso.StoreLimit {
+            sc := StoreLimitConfig{AddPeer: ratePerMin, RemovePeer: mso.StoreLimit[storeID].RemovePeer}
+            mso.StoreLimit[storeID] = sc
+        }
+    case storelimit.RemovePeer:
+        for storeID := range mso.StoreLimit {
+            sc := StoreLimitConfig{AddPeer: mso.StoreLimit[storeID].AddPeer, RemovePeer: ratePerMin}
+            mso.StoreLimit[storeID] = sc
+        }
+    }
+}
+
 // GetLeaderScheduleLimit mocks method
 func (mso *ScheduleOptions) GetLeaderScheduleLimit() uint64 {
     return mso.LeaderScheduleLimit
@@ -148,9 +191,20 @@ func (mso *ScheduleOptions) GetHotRegionScheduleLimit() uint64 {
     return mso.HotRegionScheduleLimit
 }
 
-// GetStoreBalanceRate mocks method
-func (mso *ScheduleOptions) GetStoreBalanceRate() float64 {
-    return mso.StoreBalanceRate
+// GetStoreLimitByType mocks method
+func (mso *ScheduleOptions) GetStoreLimitByType(storeID uint64, typ storelimit.Type) float64 {
+    limit, ok := mso.StoreLimit[storeID]
+    if !ok {
+        return 0
+    }
+    switch typ {
+    case storelimit.AddPeer:
+        return limit.AddPeer
+    case storelimit.RemovePeer:
+        return limit.RemovePeer
+    default:
+        panic("no such limit type")
+    }
 }
 
 // GetMaxSnapshotCount mocks method
@@ -283,7 +337,7 @@ func (mso *ScheduleOptions) GetKeyType() core.KeyType {
     return core.StringToKeyType(mso.KeyType)
 }
 
-// GetStoreLimitMode returns the limit mode of store.
-func (mso *ScheduleOptions) GetStoreLimitMode() string {
-    return mso.StoreLimitMode
+// CheckLabelProperty mocks method
+func (mso *ScheduleOptions) CheckLabelProperty(typ string, labels []*metapb.StoreLabel) bool {
+    return true
 }
server/api/store.go (68 changes: 25 additions & 43 deletions)

@@ -27,7 +27,6 @@ import (
     "github.com/pingcap/pd/v4/server"
     "github.com/pingcap/pd/v4/server/config"
     "github.com/pingcap/pd/v4/server/core"
-    "github.com/pingcap/pd/v4/server/schedule"
     "github.com/pingcap/pd/v4/server/schedule/storelimit"
     "github.com/pkg/errors"
     "github.com/unrolled/render"
@@ -361,21 +360,23 @@ func (h *storeHandler) SetLimit(w http.ResponseWriter, r *http.Request) {
         h.rd.JSON(w, http.StatusBadRequest, "rate unset")
         return
     }
-    rate, ok := rateVal.(float64)
-    if !ok || rate < 0 {
+    ratePerMin, ok := rateVal.(float64)
+    if !ok || ratePerMin < 0 {
         h.rd.JSON(w, http.StatusBadRequest, "badformat rate")
         return
     }
 
-    typeValue, err := getStoreLimitType(input)
+    typeValues, err := getStoreLimitType(input)
     if err != nil {
         h.rd.JSON(w, http.StatusBadRequest, err.Error())
         return
     }
 
-    if err := h.SetStoreLimit(storeID, rate/schedule.StoreBalanceBaseTime, typeValue); err != nil {
-        h.rd.JSON(w, http.StatusInternalServerError, err.Error())
-        return
+    for _, typ := range typeValues {
+        if err := h.SetStoreLimit(storeID, ratePerMin, typ); err != nil {
+            h.rd.JSON(w, http.StatusInternalServerError, err.Error())
+            return
+        }
     }
 
     h.rd.JSON(w, http.StatusOK, nil)
@@ -431,21 +432,23 @@ func (h *storesHandler) SetAllLimit(w http.ResponseWriter, r *http.Request) {
         h.rd.JSON(w, http.StatusBadRequest, "rate unset")
         return
     }
-    rate, ok := rateVal.(float64)
-    if !ok || rate < 0 {
+    ratePerMin, ok := rateVal.(float64)
+    if !ok || ratePerMin < 0 {
         h.rd.JSON(w, http.StatusBadRequest, "badformat rate")
         return
     }
 
-    typeValue, err := getStoreLimitType(input)
+    typeValues, err := getStoreLimitType(input)
     if err != nil {
         h.rd.JSON(w, http.StatusBadRequest, err.Error())
         return
     }
 
-    if err := h.SetAllStoresLimit(rate/schedule.StoreBalanceBaseTime, typeValue); err != nil {
-        h.rd.JSON(w, http.StatusInternalServerError, err.Error())
-        return
+    for _, typ := range typeValues {
+        if err := h.SetAllStoresLimit(ratePerMin, typ); err != nil {
+            h.rd.JSON(w, http.StatusInternalServerError, err.Error())
+            return
+        }
     }
 
     h.rd.JSON(w, http.StatusOK, nil)
@@ -459,30 +462,8 @@
 // @Failure 500 {string} string "PD server failed to proceed the request."
 // @Router /stores/limit [get]
 func (h *storesHandler) GetAllLimit(w http.ResponseWriter, r *http.Request) {
-    typeName := r.URL.Query().Get("type")
-    typeValue, err := parseStoreLimitType(typeName)
-    if err != nil {
-        h.rd.JSON(w, http.StatusBadRequest, err.Error())
-        return
-    }
-    limits, err := h.GetAllStoresLimit(typeValue)
-    if err != nil {
-        h.rd.JSON(w, http.StatusInternalServerError, err.Error())
-        return
-    }
-    type LimitResp struct {
-        Rate float64 `json:"rate"`
-        Mode string `json:"mode"`
-    }
-    resp := make(map[uint64]*LimitResp)
-    for s, l := range limits {
-        resp[s] = &LimitResp{
-            Rate: l.Rate() * schedule.StoreBalanceBaseTime,
-            Mode: l.Mode().String(),
-        }
-    }
-
-    h.rd.JSON(w, http.StatusOK, resp)
+    limits := h.GetScheduleConfig().StoreLimit
+    h.rd.JSON(w, http.StatusOK, limits)
 }
 
 // @Tags store
@@ -607,23 +588,24 @@ func (filter *storeStateFilter) filter(stores []*metapb.Store) []*metapb.Store {
     return ret
 }
 
-func getStoreLimitType(input map[string]interface{}) (storelimit.Type, error) {
+func getStoreLimitType(input map[string]interface{}) ([]storelimit.Type, error) {
     typeNameIface, ok := input["type"]
-    typeValue := storelimit.RegionAdd
     var err error
     if ok {
         typeName, ok := typeNameIface.(string)
         if !ok {
             err = errors.New("bad format type")
-        } else {
-            return parseStoreLimitType(typeName)
+            return nil, err
         }
+        typ, err := parseStoreLimitType(typeName)
+        return []storelimit.Type{typ}, err
     }
-    return typeValue, err
+
+    return []storelimit.Type{storelimit.AddPeer, storelimit.RemovePeer}, err
 }
 
 func parseStoreLimitType(typeName string) (storelimit.Type, error) {
-    typeValue := storelimit.RegionAdd
+    typeValue := storelimit.AddPeer
    var err error
    if typeName != "" {
        if value, ok := storelimit.TypeNameValue[typeName]; ok {
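The handler changes above also adjust the HTTP contract: the rate in the request body is consumed as a per-minute value (the division by schedule.StoreBalanceBaseTime is gone), and omitting "type" now applies the rate to both add-peer and remove-peer via getStoreLimitType. A hypothetical client-side sketch; the /pd/api/v1 prefix and the 127.0.0.1:2379 address are assumptions about a typical deployment, not part of this diff:

package main

import (
    "fmt"
    "net/http"
    "strings"
)

func main() {
    // No "type" field: the handler applies 15/min to add-peer AND remove-peer.
    body := strings.NewReader(`{"rate": 15}`)
    resp, err := http.Post("http://127.0.0.1:2379/pd/api/v1/store/1/limit", "application/json", body)
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    fmt.Println(resp.Status)

    // GET /pd/api/v1/stores/limit now returns the StoreLimit section of the
    // schedule config as-is, e.g. {"1":{"add-peer":15,"remove-peer":15}}.
}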
server/api/trend_test.go (3 changes: 1 addition & 2 deletions)

@@ -20,7 +20,6 @@ import (
     . "github.com/pingcap/check"
     "github.com/pingcap/kvproto/pkg/metapb"
     "github.com/pingcap/pd/v4/server"
-    "github.com/pingcap/pd/v4/server/config"
     "github.com/pingcap/pd/v4/server/core"
     "github.com/pingcap/pd/v4/server/schedule/operator"
 )
@@ -30,7 +29,7 @@ var _ = Suite(&testTrendSuite{})
 
 type testTrendSuite struct{}
 
 func (s *testTrendSuite) TestTrend(c *C) {
-    svr, cleanup := mustNewServer(c, func(cfg *config.Config) { cfg.Schedule.StoreBalanceRate = 60 })
+    svr, cleanup := mustNewServer(c)
     defer cleanup()
     mustWaitLeader(c, []*server.Server{svr})
Expand Down