Skip to content

Commit

Permalink
combine storelimit and store balance rate
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed May 21, 2020
1 parent 773e262 commit 284e68f
Show file tree
Hide file tree
Showing 31 changed files with 437 additions and 386 deletions.
1 change: 0 additions & 1 deletion conf/simconfig.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,3 @@ leader-schedule-limit = 32
region-schedule-limit = 128
replica-schedule-limit = 32
merge-schedule-limit = 32
store-balance-rate = 512.0
13 changes: 13 additions & 0 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/pd/v4/server/core"
"github.com/pingcap/pd/v4/server/kv"
"github.com/pingcap/pd/v4/server/schedule/placement"
"github.com/pingcap/pd/v4/server/schedule/storelimit"
"github.com/pingcap/pd/v4/server/statistics"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -203,6 +204,8 @@ func (mc *Cluster) AddLeaderStore(storeID uint64, leaderCount int, leaderSizes .
core.SetLeaderSize(leaderSize),
core.SetLastHeartbeatTS(time.Now()),
)
mc.SetStoreLimit(storeID, storelimit.AddPeer, 60)
mc.SetStoreLimit(storeID, storelimit.RemovePeer, 60)
mc.PutStore(store)
}

Expand All @@ -218,6 +221,9 @@ func (mc *Cluster) AddRegionStore(storeID uint64, regionCount int) {
core.SetRegionSize(int64(regionCount)*10),
core.SetLastHeartbeatTS(time.Now()),
)

mc.SetStoreLimit(storeID, storelimit.AddPeer, 60)
mc.SetStoreLimit(storeID, storelimit.RemovePeer, 60)
mc.PutStore(store)
}

Expand Down Expand Up @@ -253,6 +259,8 @@ func (mc *Cluster) AddLabelsStore(storeID uint64, regionCount int, labels map[st
core.SetRegionSize(int64(regionCount)*10),
core.SetLastHeartbeatTS(time.Now()),
)
mc.SetStoreLimit(storeID, storelimit.AddPeer, 60)
mc.SetStoreLimit(storeID, storelimit.RemovePeer, 60)
mc.PutStore(store)
}

Expand Down Expand Up @@ -539,6 +547,11 @@ func (mc *Cluster) GetMaxReplicas() int {
return mc.ScheduleOptions.GetMaxReplicas()
}

// GetStoreLimitByType mocks method.
func (mc *Cluster) GetStoreLimitByType(storeID uint64, typ storelimit.Type) float64 {
return mc.ScheduleOptions.GetStoreLimitByType(storeID, typ)
}

// CheckLabelProperty checks label property.
func (mc *Cluster) CheckLabelProperty(typ string, labels []*metapb.StoreLabel) bool {
for _, cfg := range mc.LabelProperties[typ] {
Expand Down
73 changes: 67 additions & 6 deletions pkg/mock/mockoption/mockoption.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/pd/v4/server/core"
"github.com/pingcap/pd/v4/server/schedule/storelimit"
)

const (
Expand All @@ -33,7 +34,6 @@ const (
defaultReplicaScheduleLimit = 64
defaultMergeScheduleLimit = 8
defaultHotRegionScheduleLimit = 4
defaultStoreBalanceRate = 60
defaultTolerantSizeRatio = 2.5
defaultLowSpaceRatio = 0.8
defaultHighSpaceRatio = 0.6
Expand All @@ -43,8 +43,15 @@ const (
defaultLeaderSchedulePolicy = "count"
defaultEnablePlacementRules = false
defaultKeyType = "table"
defaultStoreLimit = 60
)

// StoreLimitConfig is a mock of StoreLimitConfig.
type StoreLimitConfig struct {
AddPeer float64 `toml:"add-peer" json:"add-peer"`
RemovePeer float64 `toml:"remove-peer" json:"remove-peer"`
}

// ScheduleOptions is a mock of ScheduleOptions
// which implements Options interface
type ScheduleOptions struct {
Expand All @@ -53,7 +60,7 @@ type ScheduleOptions struct {
ReplicaScheduleLimit uint64
MergeScheduleLimit uint64
HotRegionScheduleLimit uint64
StoreBalanceRate float64
StoreLimit map[uint64]StoreLimitConfig
MaxSnapshotCount uint64
MaxPendingPeerCount uint64
MaxMergeRegionSize uint64
Expand Down Expand Up @@ -95,7 +102,6 @@ func NewScheduleOptions() *ScheduleOptions {
mso.ReplicaScheduleLimit = defaultReplicaScheduleLimit
mso.MergeScheduleLimit = defaultMergeScheduleLimit
mso.HotRegionScheduleLimit = defaultHotRegionScheduleLimit
mso.StoreBalanceRate = defaultStoreBalanceRate
mso.MaxSnapshotCount = defaultMaxSnapshotCount
mso.MaxMergeRegionSize = defaultMaxMergeRegionSize
mso.MaxMergeRegionKeys = defaultMaxMergeRegionKeys
Expand All @@ -117,9 +123,48 @@ func NewScheduleOptions() *ScheduleOptions {
mso.EnableLocationReplacement = true
mso.LeaderSchedulePolicy = defaultLeaderSchedulePolicy
mso.KeyType = defaultKeyType
mso.StoreLimit = make(map[uint64]StoreLimitConfig)
return mso
}

// SetStoreLimit is's only used to test.
func (mso *ScheduleOptions) SetStoreLimit(storeID uint64, typ storelimit.Type, ratePerMin float64) {
var sc StoreLimitConfig
if _, ok := mso.StoreLimit[storeID]; ok {
switch typ {
case storelimit.AddPeer:
sc = StoreLimitConfig{AddPeer: ratePerMin, RemovePeer: mso.StoreLimit[storeID].RemovePeer}
case storelimit.RemovePeer:
sc = StoreLimitConfig{AddPeer: mso.StoreLimit[storeID].AddPeer, RemovePeer: ratePerMin}
}
} else {
switch typ {
case storelimit.AddPeer:
sc = StoreLimitConfig{AddPeer: ratePerMin, RemovePeer: defaultStoreLimit}
case storelimit.RemovePeer:
sc = StoreLimitConfig{AddPeer: defaultStoreLimit, RemovePeer: ratePerMin}
}
}

mso.StoreLimit[storeID] = sc
}

// SetAllStoresLimit is's only used to test.
func (mso *ScheduleOptions) SetAllStoresLimit(typ storelimit.Type, ratePerMin float64) {
switch typ {
case storelimit.AddPeer:
for storeID := range mso.StoreLimit {
sc := StoreLimitConfig{AddPeer: ratePerMin, RemovePeer: mso.StoreLimit[storeID].RemovePeer}
mso.StoreLimit[storeID] = sc
}
case storelimit.RemovePeer:
for storeID := range mso.StoreLimit {
sc := StoreLimitConfig{AddPeer: mso.StoreLimit[storeID].AddPeer, RemovePeer: ratePerMin}
mso.StoreLimit[storeID] = sc
}
}
}

// GetLeaderScheduleLimit mocks method
func (mso *ScheduleOptions) GetLeaderScheduleLimit() uint64 {
return mso.LeaderScheduleLimit
Expand All @@ -145,9 +190,20 @@ func (mso *ScheduleOptions) GetHotRegionScheduleLimit() uint64 {
return mso.HotRegionScheduleLimit
}

// GetStoreBalanceRate mocks method
func (mso *ScheduleOptions) GetStoreBalanceRate() float64 {
return mso.StoreBalanceRate
// GetStoreLimitByType mocks method
func (mso *ScheduleOptions) GetStoreLimitByType(storeID uint64, typ storelimit.Type) float64 {
limit, ok := mso.StoreLimit[storeID]
if !ok {
return 0
}
switch typ {
case storelimit.AddPeer:
return limit.AddPeer
case storelimit.RemovePeer:
return limit.RemovePeer
default:
panic("no such limit type")
}
}

// GetMaxSnapshotCount mocks method
Expand Down Expand Up @@ -279,3 +335,8 @@ func (mso *ScheduleOptions) GetLeaderSchedulePolicy() core.SchedulePolicy {
func (mso *ScheduleOptions) GetKeyType() core.KeyType {
return core.StringToKeyType(mso.KeyType)
}

// CheckLabelProperty mocks method
func (mso *ScheduleOptions) CheckLabelProperty(typ string, labels []*metapb.StoreLabel) bool {
return true
}
43 changes: 10 additions & 33 deletions server/api/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/pingcap/pd/v4/server"
"github.com/pingcap/pd/v4/server/config"
"github.com/pingcap/pd/v4/server/core"
"github.com/pingcap/pd/v4/server/schedule"
"github.com/pingcap/pd/v4/server/schedule/storelimit"
"github.com/pkg/errors"
"github.com/unrolled/render"
Expand Down Expand Up @@ -361,8 +360,8 @@ func (h *storeHandler) SetLimit(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusBadRequest, "rate unset")
return
}
rate, ok := rateVal.(float64)
if !ok || rate < 0 {
ratePerMin, ok := rateVal.(float64)
if !ok || ratePerMin < 0 {
h.rd.JSON(w, http.StatusBadRequest, "badformat rate")
return
}
Expand All @@ -373,7 +372,7 @@ func (h *storeHandler) SetLimit(w http.ResponseWriter, r *http.Request) {
return
}

if err := h.SetStoreLimit(storeID, rate/schedule.StoreBalanceBaseTime, typeValue); err != nil {
if err := h.SetStoreLimit(storeID, ratePerMin, typeValue); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
Expand Down Expand Up @@ -431,8 +430,8 @@ func (h *storesHandler) SetAllLimit(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusBadRequest, "rate unset")
return
}
rate, ok := rateVal.(float64)
if !ok || rate < 0 {
ratePerMin, ok := rateVal.(float64)
if !ok || ratePerMin < 0 {
h.rd.JSON(w, http.StatusBadRequest, "badformat rate")
return
}
Expand All @@ -443,7 +442,7 @@ func (h *storesHandler) SetAllLimit(w http.ResponseWriter, r *http.Request) {
return
}

if err := h.SetAllStoresLimit(rate/schedule.StoreBalanceBaseTime, typeValue); err != nil {
if err := h.SetAllStoresLimit(ratePerMin, typeValue); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
Expand All @@ -459,30 +458,8 @@ func (h *storesHandler) SetAllLimit(w http.ResponseWriter, r *http.Request) {
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /stores/limit [get]
func (h *storesHandler) GetAllLimit(w http.ResponseWriter, r *http.Request) {
typeName := r.URL.Query().Get("type")
typeValue, err := parseStoreLimitType(typeName)
if err != nil {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
limits, err := h.GetAllStoresLimit(typeValue)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
type LimitResp struct {
Rate float64 `json:"rate"`
Mode string `json:"mode"`
}
resp := make(map[uint64]*LimitResp)
for s, l := range limits {
resp[s] = &LimitResp{
Rate: l.Rate() * schedule.StoreBalanceBaseTime,
Mode: l.Mode().String(),
}
}

h.rd.JSON(w, http.StatusOK, resp)
limits := h.GetScheduleConfig().StoreLimit
h.rd.JSON(w, http.StatusOK, limits)
}

// @Tags store
Expand Down Expand Up @@ -609,7 +586,7 @@ func (filter *storeStateFilter) filter(stores []*metapb.Store) []*metapb.Store {

func getStoreLimitType(input map[string]interface{}) (storelimit.Type, error) {
typeNameIface, ok := input["type"]
typeValue := storelimit.RegionAdd
typeValue := storelimit.AddPeer
var err error
if ok {
typeName, ok := typeNameIface.(string)
Expand All @@ -623,7 +600,7 @@ func getStoreLimitType(input map[string]interface{}) (storelimit.Type, error) {
}

func parseStoreLimitType(typeName string) (storelimit.Type, error) {
typeValue := storelimit.RegionAdd
typeValue := storelimit.AddPeer
var err error
if typeName != "" {
if value, ok := storelimit.TypeNameValue[typeName]; ok {
Expand Down
3 changes: 1 addition & 2 deletions server/api/trend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/pd/v4/server"
"github.com/pingcap/pd/v4/server/config"
"github.com/pingcap/pd/v4/server/core"
"github.com/pingcap/pd/v4/server/schedule/operator"
)
Expand All @@ -30,7 +29,7 @@ var _ = Suite(&testTrendSuite{})
type testTrendSuite struct{}

func (s *testTrendSuite) TestTrend(c *C) {
svr, cleanup := mustNewServer(c, func(cfg *config.Config) { cfg.Schedule.StoreBalanceRate = 60 })
svr, cleanup := mustNewServer(c)
defer cleanup()
mustWaitLeader(c, []*server.Server{svr})

Expand Down
Loading

0 comments on commit 284e68f

Please sign in to comment.