diff --git a/conf/simconfig.toml b/conf/simconfig.toml index 68ab11e6321..428ee61e508 100644 --- a/conf/simconfig.toml +++ b/conf/simconfig.toml @@ -29,4 +29,3 @@ leader-schedule-limit = 32 region-schedule-limit = 128 replica-schedule-limit = 32 merge-schedule-limit = 32 -store-balance-rate = 512.0 diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index aa58cba8c6e..b26f49df4fd 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/pd/v4/server/core" "github.com/pingcap/pd/v4/server/kv" "github.com/pingcap/pd/v4/server/schedule/placement" + "github.com/pingcap/pd/v4/server/schedule/storelimit" "github.com/pingcap/pd/v4/server/statistics" "go.uber.org/zap" ) @@ -203,6 +204,8 @@ func (mc *Cluster) AddLeaderStore(storeID uint64, leaderCount int, leaderSizes . core.SetLeaderSize(leaderSize), core.SetLastHeartbeatTS(time.Now()), ) + mc.SetStoreLimit(storeID, storelimit.AddPeer, 60) + mc.SetStoreLimit(storeID, storelimit.RemovePeer, 60) mc.PutStore(store) } @@ -218,6 +221,8 @@ func (mc *Cluster) AddRegionStore(storeID uint64, regionCount int) { core.SetRegionSize(int64(regionCount)*10), core.SetLastHeartbeatTS(time.Now()), ) + mc.SetStoreLimit(storeID, storelimit.AddPeer, 60) + mc.SetStoreLimit(storeID, storelimit.RemovePeer, 60) mc.PutStore(store) } @@ -253,6 +258,8 @@ func (mc *Cluster) AddLabelsStore(storeID uint64, regionCount int, labels map[st core.SetRegionSize(int64(regionCount)*10), core.SetLastHeartbeatTS(time.Now()), ) + mc.SetStoreLimit(storeID, storelimit.AddPeer, 60) + mc.SetStoreLimit(storeID, storelimit.RemovePeer, 60) mc.PutStore(store) } @@ -539,6 +546,11 @@ func (mc *Cluster) GetMaxReplicas() int { return mc.ScheduleOptions.GetMaxReplicas() } +// GetStoreLimitByType mocks method. +func (mc *Cluster) GetStoreLimitByType(storeID uint64, typ storelimit.Type) float64 { + return mc.ScheduleOptions.GetStoreLimitByType(storeID, typ) +} + // CheckLabelProperty checks label property. func (mc *Cluster) CheckLabelProperty(typ string, labels []*metapb.StoreLabel) bool { for _, cfg := range mc.LabelProperties[typ] { diff --git a/pkg/mock/mockoption/mockoption.go b/pkg/mock/mockoption/mockoption.go index feaff33215b..28d33655495 100644 --- a/pkg/mock/mockoption/mockoption.go +++ b/pkg/mock/mockoption/mockoption.go @@ -18,6 +18,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/pd/v4/server/core" + "github.com/pingcap/pd/v4/server/schedule/storelimit" ) const ( @@ -33,7 +34,6 @@ const ( defaultReplicaScheduleLimit = 64 defaultMergeScheduleLimit = 8 defaultHotRegionScheduleLimit = 4 - defaultStoreBalanceRate = 60 defaultTolerantSizeRatio = 2.5 defaultLowSpaceRatio = 0.8 defaultHighSpaceRatio = 0.6 @@ -43,9 +43,15 @@ const ( defaultLeaderSchedulePolicy = "count" defaultEnablePlacementRules = false defaultKeyType = "table" - defaultStoreLimitMode = "manual" + defaultStoreLimit = 60 ) +// StoreLimitConfig is a mock of StoreLimitConfig. 
+type StoreLimitConfig struct { + AddPeer float64 `toml:"add-peer" json:"add-peer"` + RemovePeer float64 `toml:"remove-peer" json:"remove-peer"` +} + // ScheduleOptions is a mock of ScheduleOptions // which implements Options interface type ScheduleOptions struct { @@ -54,7 +60,7 @@ type ScheduleOptions struct { ReplicaScheduleLimit uint64 MergeScheduleLimit uint64 HotRegionScheduleLimit uint64 - StoreBalanceRate float64 + StoreLimit map[uint64]StoreLimitConfig MaxSnapshotCount uint64 MaxPendingPeerCount uint64 MaxMergeRegionSize uint64 @@ -97,7 +103,6 @@ func NewScheduleOptions() *ScheduleOptions { mso.ReplicaScheduleLimit = defaultReplicaScheduleLimit mso.MergeScheduleLimit = defaultMergeScheduleLimit mso.HotRegionScheduleLimit = defaultHotRegionScheduleLimit - mso.StoreBalanceRate = defaultStoreBalanceRate mso.MaxSnapshotCount = defaultMaxSnapshotCount mso.MaxMergeRegionSize = defaultMaxMergeRegionSize mso.MaxMergeRegionKeys = defaultMaxMergeRegionKeys @@ -112,7 +117,6 @@ func NewScheduleOptions() *ScheduleOptions { mso.TolerantSizeRatio = defaultTolerantSizeRatio mso.LowSpaceRatio = defaultLowSpaceRatio mso.HighSpaceRatio = defaultHighSpaceRatio - mso.StoreLimitMode = defaultStoreLimitMode mso.EnableRemoveDownReplica = true mso.EnableReplaceOfflineReplica = true mso.EnableMakeUpReplica = true @@ -120,9 +124,48 @@ func NewScheduleOptions() *ScheduleOptions { mso.EnableLocationReplacement = true mso.LeaderSchedulePolicy = defaultLeaderSchedulePolicy mso.KeyType = defaultKeyType + mso.StoreLimit = make(map[uint64]StoreLimitConfig) return mso } +// SetStoreLimit mocks method +func (mso *ScheduleOptions) SetStoreLimit(storeID uint64, typ storelimit.Type, ratePerMin float64) { + var sc StoreLimitConfig + if _, ok := mso.StoreLimit[storeID]; ok { + switch typ { + case storelimit.AddPeer: + sc = StoreLimitConfig{AddPeer: ratePerMin, RemovePeer: mso.StoreLimit[storeID].RemovePeer} + case storelimit.RemovePeer: + sc = StoreLimitConfig{AddPeer: mso.StoreLimit[storeID].AddPeer, RemovePeer: ratePerMin} + } + } else { + switch typ { + case storelimit.AddPeer: + sc = StoreLimitConfig{AddPeer: ratePerMin, RemovePeer: defaultStoreLimit} + case storelimit.RemovePeer: + sc = StoreLimitConfig{AddPeer: defaultStoreLimit, RemovePeer: ratePerMin} + } + } + + mso.StoreLimit[storeID] = sc +} + +// SetAllStoresLimit mocks method +func (mso *ScheduleOptions) SetAllStoresLimit(typ storelimit.Type, ratePerMin float64) { + switch typ { + case storelimit.AddPeer: + for storeID := range mso.StoreLimit { + sc := StoreLimitConfig{AddPeer: ratePerMin, RemovePeer: mso.StoreLimit[storeID].RemovePeer} + mso.StoreLimit[storeID] = sc + } + case storelimit.RemovePeer: + for storeID := range mso.StoreLimit { + sc := StoreLimitConfig{AddPeer: mso.StoreLimit[storeID].AddPeer, RemovePeer: ratePerMin} + mso.StoreLimit[storeID] = sc + } + } +} + // GetLeaderScheduleLimit mocks method func (mso *ScheduleOptions) GetLeaderScheduleLimit() uint64 { return mso.LeaderScheduleLimit @@ -148,9 +191,20 @@ func (mso *ScheduleOptions) GetHotRegionScheduleLimit() uint64 { return mso.HotRegionScheduleLimit } -// GetStoreBalanceRate mocks method -func (mso *ScheduleOptions) GetStoreBalanceRate() float64 { - return mso.StoreBalanceRate +// GetStoreLimitByType mocks method +func (mso *ScheduleOptions) GetStoreLimitByType(storeID uint64, typ storelimit.Type) float64 { + limit, ok := mso.StoreLimit[storeID] + if !ok { + return 0 + } + switch typ { + case storelimit.AddPeer: + return limit.AddPeer + case storelimit.RemovePeer: + return 
limit.RemovePeer + default: + panic("no such limit type") + } } // GetMaxSnapshotCount mocks method @@ -283,7 +337,7 @@ func (mso *ScheduleOptions) GetKeyType() core.KeyType { return core.StringToKeyType(mso.KeyType) } -// GetStoreLimitMode returns the limit mode of store. -func (mso *ScheduleOptions) GetStoreLimitMode() string { - return mso.StoreLimitMode +// CheckLabelProperty mocks method +func (mso *ScheduleOptions) CheckLabelProperty(typ string, labels []*metapb.StoreLabel) bool { + return true } diff --git a/server/api/store.go b/server/api/store.go index 3c9f198190b..65e996abb3e 100644 --- a/server/api/store.go +++ b/server/api/store.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/pd/v4/server" "github.com/pingcap/pd/v4/server/config" "github.com/pingcap/pd/v4/server/core" - "github.com/pingcap/pd/v4/server/schedule" "github.com/pingcap/pd/v4/server/schedule/storelimit" "github.com/pkg/errors" "github.com/unrolled/render" @@ -361,21 +360,23 @@ func (h *storeHandler) SetLimit(w http.ResponseWriter, r *http.Request) { h.rd.JSON(w, http.StatusBadRequest, "rate unset") return } - rate, ok := rateVal.(float64) - if !ok || rate < 0 { + ratePerMin, ok := rateVal.(float64) + if !ok || ratePerMin < 0 { h.rd.JSON(w, http.StatusBadRequest, "badformat rate") return } - typeValue, err := getStoreLimitType(input) + typeValues, err := getStoreLimitType(input) if err != nil { h.rd.JSON(w, http.StatusBadRequest, err.Error()) return } - if err := h.SetStoreLimit(storeID, rate/schedule.StoreBalanceBaseTime, typeValue); err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return + for _, typ := range typeValues { + if err := h.SetStoreLimit(storeID, ratePerMin, typ); err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } } h.rd.JSON(w, http.StatusOK, nil) @@ -431,21 +432,23 @@ func (h *storesHandler) SetAllLimit(w http.ResponseWriter, r *http.Request) { h.rd.JSON(w, http.StatusBadRequest, "rate unset") return } - rate, ok := rateVal.(float64) - if !ok || rate < 0 { + ratePerMin, ok := rateVal.(float64) + if !ok || ratePerMin < 0 { h.rd.JSON(w, http.StatusBadRequest, "badformat rate") return } - typeValue, err := getStoreLimitType(input) + typeValues, err := getStoreLimitType(input) if err != nil { h.rd.JSON(w, http.StatusBadRequest, err.Error()) return } - if err := h.SetAllStoresLimit(rate/schedule.StoreBalanceBaseTime, typeValue); err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return + for _, typ := range typeValues { + if err := h.SetAllStoresLimit(ratePerMin, typ); err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } } h.rd.JSON(w, http.StatusOK, nil) @@ -459,30 +462,8 @@ func (h *storesHandler) SetAllLimit(w http.ResponseWriter, r *http.Request) { // @Failure 500 {string} string "PD server failed to proceed the request." 
// @Router /stores/limit [get] func (h *storesHandler) GetAllLimit(w http.ResponseWriter, r *http.Request) { - typeName := r.URL.Query().Get("type") - typeValue, err := parseStoreLimitType(typeName) - if err != nil { - h.rd.JSON(w, http.StatusBadRequest, err.Error()) - return - } - limits, err := h.GetAllStoresLimit(typeValue) - if err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - type LimitResp struct { - Rate float64 `json:"rate"` - Mode string `json:"mode"` - } - resp := make(map[uint64]*LimitResp) - for s, l := range limits { - resp[s] = &LimitResp{ - Rate: l.Rate() * schedule.StoreBalanceBaseTime, - Mode: l.Mode().String(), - } - } - - h.rd.JSON(w, http.StatusOK, resp) + limits := h.GetScheduleConfig().StoreLimit + h.rd.JSON(w, http.StatusOK, limits) } // @Tags store @@ -607,23 +588,24 @@ func (filter *storeStateFilter) filter(stores []*metapb.Store) []*metapb.Store { return ret } -func getStoreLimitType(input map[string]interface{}) (storelimit.Type, error) { +func getStoreLimitType(input map[string]interface{}) ([]storelimit.Type, error) { typeNameIface, ok := input["type"] - typeValue := storelimit.RegionAdd var err error if ok { typeName, ok := typeNameIface.(string) if !ok { err = errors.New("bad format type") - } else { - return parseStoreLimitType(typeName) + return nil, err } + typ, err := parseStoreLimitType(typeName) + return []storelimit.Type{typ}, err } - return typeValue, err + + return []storelimit.Type{storelimit.AddPeer, storelimit.RemovePeer}, err } func parseStoreLimitType(typeName string) (storelimit.Type, error) { - typeValue := storelimit.RegionAdd + typeValue := storelimit.AddPeer var err error if typeName != "" { if value, ok := storelimit.TypeNameValue[typeName]; ok { diff --git a/server/api/trend_test.go b/server/api/trend_test.go index ace37a08690..3c831577a8f 100644 --- a/server/api/trend_test.go +++ b/server/api/trend_test.go @@ -20,7 +20,6 @@ import ( . 
"github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/pd/v4/server" - "github.com/pingcap/pd/v4/server/config" "github.com/pingcap/pd/v4/server/core" "github.com/pingcap/pd/v4/server/schedule/operator" ) @@ -30,7 +29,7 @@ var _ = Suite(&testTrendSuite{}) type testTrendSuite struct{} func (s *testTrendSuite) TestTrend(c *C) { - svr, cleanup := mustNewServer(c, func(cfg *config.Config) { cfg.Schedule.StoreBalanceRate = 60 }) + svr, cleanup := mustNewServer(c) defer cleanup() mustWaitLeader(c, []*server.Server{svr}) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index b614087defc..87aa4942cfc 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -235,7 +235,7 @@ func (c *RaftCluster) Start(s Server) error { c.coordinator = newCoordinator(c.ctx, cluster, s.GetHBStreams()) c.regionStats = statistics.NewRegionStatistics(c.opt) - c.limiter = NewStoreLimiter(c.coordinator.opController) + c.limiter = NewStoreLimiter(s.GetPersistOptions()) c.quit = make(chan struct{}) c.wg.Add(4) @@ -935,8 +935,7 @@ func (c *RaftCluster) RemoveStore(storeID uint64) error { zap.String("store-address", newStore.GetAddress())) err := c.putStoreLocked(newStore) if err == nil { - // set the remove peer limit of the store to unlimited - c.coordinator.opController.SetStoreLimit(store.GetID(), storelimit.Unlimited, storelimit.Manual, storelimit.RegionRemove) + c.SetStoreLimit(storeID, storelimit.RemovePeer, storelimit.Unlimited) } return err } @@ -972,7 +971,7 @@ func (c *RaftCluster) BuryStore(storeID uint64, force bool) error { zap.String("store-address", newStore.GetAddress())) err := c.putStoreLocked(newStore) if err == nil { - c.coordinator.opController.RemoveStoreLimit(store.GetID()) + c.RemoveStoreLimit(storeID) } return err } @@ -1100,7 +1099,7 @@ func (c *RaftCluster) RemoveTombStoneRecords() error { zap.Error(err)) return err } - c.coordinator.opController.RemoveStoreLimit(store.GetID()) + c.RemoveStoreLimit(store.GetID()) log.Info("delete store succeeded", zap.Stringer("store", store.GetMeta())) } @@ -1329,11 +1328,6 @@ func (c *RaftCluster) GetHotRegionScheduleLimit() uint64 { return c.opt.GetHotRegionScheduleLimit() } -// GetStoreBalanceRate returns the balance rate of a store. -func (c *RaftCluster) GetStoreBalanceRate() float64 { - return c.opt.GetStoreBalanceRate() -} - // GetTolerantSizeRatio gets the tolerant size ratio. func (c *RaftCluster) GetTolerantSizeRatio() float64 { return c.opt.GetTolerantSizeRatio() @@ -1646,6 +1640,47 @@ func (c *RaftCluster) GetStoreLimiter() *StoreLimiter { return c.limiter } +// GetStoreLimitByType returns the store limit for a given store ID and type. +func (c *RaftCluster) GetStoreLimitByType(storeID uint64, typ storelimit.Type) float64 { + return c.opt.GetStoreLimitByType(storeID, typ) +} + +// GetAllStoresLimit returns all store limit +func (c *RaftCluster) GetAllStoresLimit() map[uint64]config.StoreLimitConfig { + return c.opt.GetAllStoresLimit() +} + +// AddStoreLimit add a store limit for a given store ID. +func (c *RaftCluster) AddStoreLimit(storeID uint64) { + cfg := c.opt.GetScheduleConfig().Clone() + sc := config.StoreLimitConfig{ + AddPeer: config.DefaultStoreLimit.GetDefaultStoreLimit(storelimit.AddPeer), + RemovePeer: config.DefaultStoreLimit.GetDefaultStoreLimit(storelimit.RemovePeer), + } + cfg.StoreLimit[storeID] = sc + c.opt.SetScheduleConfig(cfg) +} + +// RemoveStoreLimit remove a store limit for a given store ID. 
+func (c *RaftCluster) RemoveStoreLimit(storeID uint64) { + cfg := c.opt.GetScheduleConfig().Clone() + for _, limitType := range storelimit.TypeNameValue { + c.AttachAvailableFunc(storeID, limitType, nil) + } + delete(cfg.StoreLimit, storeID) + c.opt.SetScheduleConfig(cfg) +} + +// SetStoreLimit sets a store limit for a given type and rate. +func (c *RaftCluster) SetStoreLimit(storeID uint64, typ storelimit.Type, ratePerMin float64) { + c.opt.SetStoreLimit(storeID, typ, ratePerMin) +} + +// SetAllStoresLimit sets all store limit for a given type and rate. +func (c *RaftCluster) SetAllStoresLimit(typ storelimit.Type, ratePerMin float64) { + c.opt.SetAllStoresLimit(typ, ratePerMin) +} + var healthURL = "/pd/api/v1/ping" // CheckHealth checks if members are healthy. diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index fbeb39d4b4e..c4e4e955176 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -655,7 +655,6 @@ type testCluster struct { func newTestScheduleConfig() (*config.ScheduleConfig, *config.PersistOptions, error) { cfg := config.NewConfig() cfg.Schedule.TolerantSizeRatio = 5 - cfg.Schedule.StoreBalanceRate = 60 if err := cfg.Adjust(nil); err != nil { return nil, nil, err } diff --git a/server/cluster/coordinator.go b/server/cluster/coordinator.go index f05aba93615..5afb0f211f2 100644 --- a/server/cluster/coordinator.go +++ b/server/cluster/coordinator.go @@ -525,7 +525,7 @@ func (c *coordinator) pauseOrResumeScheduler(name string, t int64) error { } var err error for _, sc := range s { - var delayUntil int64 = 0 + var delayUntil int64 if t > 0 { delayUntil = time.Now().Unix() + t } diff --git a/server/cluster/coordinator_test.go b/server/cluster/coordinator_test.go index 28833d8675a..54bb7bd078b 100644 --- a/server/cluster/coordinator_test.go +++ b/server/cluster/coordinator_test.go @@ -67,6 +67,9 @@ func (c *testCluster) addRegionStore(storeID uint64, regionCount int, regionSize core.SetRegionSize(int64(regionSize)), core.SetLastHeartbeatTS(time.Now()), ) + + c.SetStoreLimit(storeID, storelimit.AddPeer, 60) + c.SetStoreLimit(storeID, storelimit.RemovePeer, 60) c.Lock() defer c.Unlock() return c.putStoreLocked(newStore) @@ -103,6 +106,9 @@ func (c *testCluster) addLeaderStore(storeID uint64, leaderCount int) error { core.SetLeaderSize(int64(leaderCount)*10), core.SetLastHeartbeatTS(time.Now()), ) + + c.SetStoreLimit(storeID, storelimit.AddPeer, 60) + c.SetStoreLimit(storeID, storelimit.RemovePeer, 60) c.Lock() defer c.Unlock() return c.putStoreLocked(newStore) @@ -907,15 +913,12 @@ func (s *testOperatorControllerSuite) TestOperatorCount(c *C) { } func (s *testOperatorControllerSuite) TestStoreOverloaded(c *C) { - tc, co, cleanup := prepare(func(cfg *config.ScheduleConfig) { - // scheduling one time needs 60 seconds - // and thus it's large enough to make sure that only schedule one time - cfg.StoreBalanceRate = 1 - }, nil, nil, c) + tc, co, cleanup := prepare(nil, nil, nil, c) defer cleanup() oc := co.opController lb, err := schedule.CreateScheduler(schedulers.BalanceRegionType, oc, tc.storage, schedule.ConfigSliceDecoder(schedulers.BalanceRegionType, []string{"", ""})) c.Assert(err, IsNil) + opt := tc.GetOpt() c.Assert(tc.addRegionStore(4, 100), IsNil) c.Assert(tc.addRegionStore(3, 100), IsNil) c.Assert(tc.addRegionStore(2, 100), IsNil) @@ -935,9 +938,9 @@ func (s *testOperatorControllerSuite) TestStoreOverloaded(c *C) { // reset all stores' limit // scheduling one time needs 1/10 seconds - oc.SetAllStoresLimit(10, 
storelimit.Manual, storelimit.RegionAdd) - oc.SetAllStoresLimit(10, storelimit.Manual, storelimit.RegionRemove) - + opt.SetAllStoresLimit(storelimit.AddPeer, 600) + opt.SetAllStoresLimit(storelimit.RemovePeer, 600) + time.Sleep(1 * time.Second) for i := 0; i < 10; i++ { op1 := lb.Schedule(tc)[0] c.Assert(op1, NotNil) @@ -952,10 +955,7 @@ func (s *testOperatorControllerSuite) TestStoreOverloaded(c *C) { } func (s *testOperatorControllerSuite) TestStoreOverloadedWithReplace(c *C) { - tc, co, cleanup := prepare(func(cfg *config.ScheduleConfig) { - // scheduling one time needs 2 seconds - cfg.StoreBalanceRate = 30 - }, nil, nil, c) + tc, co, cleanup := prepare(nil, nil, nil, c) defer cleanup() oc := co.opController lb, err := schedule.CreateScheduler(schedulers.BalanceRegionType, oc, tc.storage, schedule.ConfigSliceDecoder(schedulers.BalanceRegionType, []string{"", ""})) diff --git a/server/cluster/store_limiter.go b/server/cluster/store_limiter.go index ac6998b0f5d..2f9c91f6d5d 100644 --- a/server/cluster/store_limiter.go +++ b/server/cluster/store_limiter.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/pingcap/pd/v4/server/schedule" + "github.com/pingcap/pd/v4/server/schedule/opt" "github.com/pingcap/pd/v4/server/schedule/storelimit" "go.uber.org/zap" ) @@ -26,21 +27,21 @@ import ( // StoreLimiter adjust the store limit dynamically type StoreLimiter struct { m sync.RWMutex - oc *schedule.OperatorController + opt opt.Options scene map[storelimit.Type]*storelimit.Scene state *State current LoadState } // NewStoreLimiter builds a store limiter object using the operator controller -func NewStoreLimiter(c *schedule.OperatorController) *StoreLimiter { +func NewStoreLimiter(opt opt.Options) *StoreLimiter { defaultScene := map[storelimit.Type]*storelimit.Scene{ - storelimit.RegionAdd: storelimit.DefaultScene(storelimit.RegionAdd), - storelimit.RegionRemove: storelimit.DefaultScene(storelimit.RegionRemove), + storelimit.AddPeer: storelimit.DefaultScene(storelimit.AddPeer), + storelimit.RemovePeer: storelimit.DefaultScene(storelimit.RemovePeer), } return &StoreLimiter{ - oc: c, + opt: opt, state: NewState(), scene: defaultScene, current: LoadStateNone, @@ -56,17 +57,17 @@ func (s *StoreLimiter) Collect(stats *pdpb.StoreStats) { s.state.Collect((*StatEntry)(stats)) state := s.state.State() - rateRegionAdd := s.calculateRate(storelimit.RegionAdd, state) - rateRegionRemove := s.calculateRate(storelimit.RegionRemove, state) + ratePeerAdd := s.calculateRate(storelimit.AddPeer, state) + ratePeerRemove := s.calculateRate(storelimit.RemovePeer, state) - if rateRegionAdd > 0 || rateRegionRemove > 0 { - if rateRegionAdd > 0 { - s.oc.SetAllStoresLimitAuto(rateRegionAdd, storelimit.RegionAdd) - log.Info("change store region add limit for cluster", zap.Stringer("state", state), zap.Float64("rate", rateRegionAdd)) + if ratePeerAdd > 0 || ratePeerRemove > 0 { + if ratePeerAdd > 0 { + s.opt.SetAllStoresLimit(storelimit.AddPeer, ratePeerAdd) + log.Info("change store region add limit for cluster", zap.Stringer("state", state), zap.Float64("rate", ratePeerAdd)) } - if rateRegionRemove > 0 { - s.oc.SetAllStoresLimitAuto(rateRegionAdd, storelimit.RegionRemove) - log.Info("change store region remove limit for cluster", zap.Stringer("state", state), zap.Float64("rate", rateRegionRemove)) + if ratePeerRemove > 0 { + s.opt.SetAllStoresLimit(storelimit.RemovePeer, ratePeerRemove) + log.Info("change store region remove limit for cluster", zap.Stringer("state", state), 
zap.Float64("rate", ratePeerRemove)) } s.current = state collectClusterStateCurrent(state) diff --git a/server/cluster/store_limiter_test.go b/server/cluster/store_limiter_test.go index a4c4d6f5cec..35177a97496 100644 --- a/server/cluster/store_limiter_test.go +++ b/server/cluster/store_limiter_test.go @@ -14,57 +14,44 @@ package cluster import ( - "context" - . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/pdpb" - "github.com/pingcap/pd/v4/pkg/mock/mockcluster" "github.com/pingcap/pd/v4/pkg/mock/mockoption" - "github.com/pingcap/pd/v4/server/schedule" "github.com/pingcap/pd/v4/server/schedule/storelimit" ) var _ = Suite(&testStoreLimiterSuite{}) type testStoreLimiterSuite struct { - oc *schedule.OperatorController - cancel context.CancelFunc + opt *mockoption.ScheduleOptions } func (s *testStoreLimiterSuite) SetUpSuite(c *C) { - ctx, cancel := context.WithCancel(context.Background()) - s.cancel = cancel - // Create a server for testing - opt := mockoption.NewScheduleOptions() - cluster := mockcluster.NewCluster(opt) - s.oc = schedule.NewOperatorController(ctx, cluster, nil) -} -func (s *testStoreLimiterSuite) TearDownSuite(c *C) { - s.cancel() + s.opt = mockoption.NewScheduleOptions() } func (s *testStoreLimiterSuite) TestCollect(c *C) { - limiter := NewStoreLimiter(s.oc) + limiter := NewStoreLimiter(s.opt) limiter.Collect(&pdpb.StoreStats{}) c.Assert(limiter.state.cst.total, Equals, int64(1)) } func (s *testStoreLimiterSuite) TestStoreLimitScene(c *C) { - limiter := NewStoreLimiter(s.oc) - c.Assert(limiter.scene[storelimit.RegionAdd], DeepEquals, storelimit.DefaultScene(storelimit.RegionAdd)) - c.Assert(limiter.scene[storelimit.RegionRemove], DeepEquals, storelimit.DefaultScene(storelimit.RegionRemove)) + limiter := NewStoreLimiter(s.opt) + c.Assert(limiter.scene[storelimit.AddPeer], DeepEquals, storelimit.DefaultScene(storelimit.AddPeer)) + c.Assert(limiter.scene[storelimit.RemovePeer], DeepEquals, storelimit.DefaultScene(storelimit.RemovePeer)) } func (s *testStoreLimiterSuite) TestReplaceStoreLimitScene(c *C) { - limiter := NewStoreLimiter(s.oc) + limiter := NewStoreLimiter(s.opt) - sceneRegionAdd := &storelimit.Scene{Idle: 4, Low: 3, Normal: 2, High: 1} - limiter.ReplaceStoreLimitScene(sceneRegionAdd, storelimit.RegionAdd) + sceneAddPeer := &storelimit.Scene{Idle: 4, Low: 3, Normal: 2, High: 1} + limiter.ReplaceStoreLimitScene(sceneAddPeer, storelimit.AddPeer) - c.Assert(limiter.scene[storelimit.RegionAdd], DeepEquals, sceneRegionAdd) + c.Assert(limiter.scene[storelimit.AddPeer], DeepEquals, sceneAddPeer) - sceneRegionRemove := &storelimit.Scene{Idle: 5, Low: 4, Normal: 3, High: 2} - limiter.ReplaceStoreLimitScene(sceneRegionRemove, storelimit.RegionRemove) + sceneRemovePeer := &storelimit.Scene{Idle: 5, Low: 4, Normal: 3, High: 2} + limiter.ReplaceStoreLimitScene(sceneRemovePeer, storelimit.RemovePeer) } diff --git a/server/config/config.go b/server/config/config.go index 433dcfa25d9..bad9457cfb2 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -23,12 +23,14 @@ import ( "os" "path/filepath" "strings" + "sync" "time" "github.com/BurntSushi/toml" "github.com/coreos/go-semver/semver" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" + "github.com/pingcap/pd/v4/server/schedule/storelimit" "github.com/pkg/errors" "go.etcd.io/etcd/embed" "go.etcd.io/etcd/pkg/transport" @@ -216,8 +218,45 @@ const ( var ( defaultRuntimeServices = []string{} defaultLocationLabels = []string{} + // DefaultStoreLimit is the default limit of add peer and remove 
peer. + DefaultStoreLimit StoreLimit = StoreLimit{AddPeer: 15, RemovePeer: 15} ) +// StoreLimit is the default limit of adding peer and removing peer when putting stores. +type StoreLimit struct { + mu sync.Mutex + // AddPeer is the default rate of adding peers for store limit (per minute). + AddPeer float64 + // RemovePeer is the default rate of removing peers for store limit (per minute). + RemovePeer float64 +} + +// SetDefaultStoreLimit sets the default store limit for a given type. +func (sl *StoreLimit) SetDefaultStoreLimit(typ storelimit.Type, ratePerMin float64) { + sl.mu.Lock() + defer sl.mu.Unlock() + switch typ { + case storelimit.AddPeer: + sl.AddPeer = ratePerMin + case storelimit.RemovePeer: + sl.RemovePeer = ratePerMin + } +} + +// GetDefaultStoreLimit gets the default store limit for a given type. +func (sl *StoreLimit) GetDefaultStoreLimit(typ storelimit.Type) float64 { + sl.mu.Lock() + defer sl.mu.Unlock() + switch typ { + case storelimit.AddPeer: + return sl.AddPeer + case storelimit.RemovePeer: + return sl.RemovePeer + default: + panic("invalid type") + } +} + func adjustString(v *string, defValue string) { if len(*v) == 0 { *v = defValue @@ -535,7 +574,10 @@ type ScheduleConfig struct { // threshold, it is considered a hot region. HotRegionCacheHitsThreshold uint64 `toml:"hot-region-cache-hits-threshold" json:"hot-region-cache-hits-threshold"` // StoreBalanceRate is the maximum of balance rate for each store. - StoreBalanceRate float64 `toml:"store-balance-rate" json:"store-balance-rate"` + // WARN: StoreBalanceRate is deprecated. + StoreBalanceRate float64 `toml:"store-balance-rate" json:"store-balance-rate,omitempty"` + // StoreLimit is the limit of scheduling for stores. + StoreLimit map[uint64]StoreLimitConfig `toml:"store-limit" json:"store-limit"` // TolerantSizeRatio is the ratio of buffer size for balance scheduler. 
TolerantSizeRatio float64 `toml:"tolerant-size-ratio" json:"tolerant-size-ratio"` // @@ -608,6 +650,10 @@ type ScheduleConfig struct { func (c *ScheduleConfig) Clone() *ScheduleConfig { schedulers := make(SchedulerConfigs, len(c.Schedulers)) copy(schedulers, c.Schedulers) + storeLimit := make(map[uint64]StoreLimitConfig, len(c.StoreLimit)) + for k, v := range c.StoreLimit { + storeLimit[k] = v + } return &ScheduleConfig{ MaxSnapshotCount: c.MaxSnapshotCount, MaxPendingPeerCount: c.MaxPendingPeerCount, @@ -625,7 +671,7 @@ func (c *ScheduleConfig) Clone() *ScheduleConfig { EnableCrossTableMerge: c.EnableCrossTableMerge, HotRegionScheduleLimit: c.HotRegionScheduleLimit, HotRegionCacheHitsThreshold: c.HotRegionCacheHitsThreshold, - StoreBalanceRate: c.StoreBalanceRate, + StoreLimit: storeLimit, TolerantSizeRatio: c.TolerantSizeRatio, LowSpaceRatio: c.LowSpaceRatio, HighSpaceRatio: c.HighSpaceRatio, @@ -661,7 +707,6 @@ const ( defaultReplicaScheduleLimit = 64 defaultMergeScheduleLimit = 8 defaultHotRegionScheduleLimit = 4 - defaultStoreBalanceRate = 15 defaultTolerantSizeRatio = 0 defaultLowSpaceRatio = 0.8 defaultHighSpaceRatio = 0.7 @@ -719,7 +764,6 @@ func (c *ScheduleConfig) adjust(meta *configMetaData) error { if !meta.IsDefined("store-limit-mode") { adjustString(&c.StoreLimitMode, defaultStoreLimitMode) } - adjustFloat64(&c.StoreBalanceRate, defaultStoreBalanceRate) adjustFloat64(&c.LowSpaceRatio, defaultLowSpaceRatio) adjustFloat64(&c.HighSpaceRatio, defaultHighSpaceRatio) adjustSchedulers(&c.Schedulers, defaultSchedulers) @@ -732,6 +776,11 @@ func (c *ScheduleConfig) adjust(meta *configMetaData) error { *b[0], *b[1] = false, v // reset old flag false to make it ignored when marshal to JSON } + if c.StoreBalanceRate != 0 { + DefaultStoreLimit = StoreLimit{AddPeer: c.StoreBalanceRate, RemovePeer: c.StoreBalanceRate} + c.StoreBalanceRate = 0 + } + return c.Validate() } @@ -767,6 +816,10 @@ func (c *ScheduleConfig) parseDeprecatedFlag(meta *configMetaData, name string, // MigrateDeprecatedFlags updates new flags according to deprecated flags. func (c *ScheduleConfig) MigrateDeprecatedFlags() { c.DisableLearner = false + if c.StoreBalanceRate != 0 { + DefaultStoreLimit = StoreLimit{AddPeer: c.StoreBalanceRate, RemovePeer: c.StoreBalanceRate} + c.StoreBalanceRate = 0 + } for _, b := range c.migrateConfigurationMap() { // If old=false (previously disabled), set both old and new to false. if *b[0] { @@ -817,25 +870,16 @@ func (c *ScheduleConfig) Deprecated() error { if c.DisableLocationReplacement { return errors.New("disable-location-replacement has already been deprecated") } + if c.StoreBalanceRate != 0 { + return errors.New("store-balance-rate has already been deprecated") + } return nil } -var deprecateConfigs = []string{ - "disable-remove-down-replica", - "disable-replace-offline-replica", - "disable-make-up-replica", - "disable-remove-extra-replica", - "disable-location-replacement", -} - -// IsDeprecated returns if a config is deprecated. -func IsDeprecated(config string) bool { - for _, t := range deprecateConfigs { - if t == config { - return true - } - } - return false +// StoreLimitConfig is a config about scheduling rate limit of different types for a store. +type StoreLimitConfig struct { + AddPeer float64 `toml:"add-peer" json:"add-peer"` + RemovePeer float64 `toml:"remove-peer" json:"remove-peer"` } // SchedulerConfigs is a slice of customized scheduler configuration. 
diff --git a/server/config/persist_options.go b/server/config/persist_options.go index 64d0d154977..19d83294e73 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/pd/v4/server/core" "github.com/pingcap/pd/v4/server/kv" "github.com/pingcap/pd/v4/server/schedule" + "github.com/pingcap/pd/v4/server/schedule/storelimit" ) // PersistOptions wraps all configurations that need to persist to storage and @@ -175,6 +176,50 @@ func (o *PersistOptions) SetSplitMergeInterval(splitMergeInterval time.Duration) o.SetScheduleConfig(v) } +// SetStoreLimit sets a store limit for a given type and rate. +func (o *PersistOptions) SetStoreLimit(storeID uint64, typ storelimit.Type, ratePerMin float64) { + v := o.GetScheduleConfig().Clone() + var sc StoreLimitConfig + var rate float64 + switch typ { + case storelimit.AddPeer: + if _, ok := v.StoreLimit[storeID]; !ok { + rate = DefaultStoreLimit.GetDefaultStoreLimit(storelimit.AddPeer) + } else { + rate = v.StoreLimit[storeID].RemovePeer + } + sc = StoreLimitConfig{AddPeer: ratePerMin, RemovePeer: rate} + case storelimit.RemovePeer: + if _, ok := v.StoreLimit[storeID]; !ok { + rate = DefaultStoreLimit.GetDefaultStoreLimit(storelimit.RemovePeer) + } else { + rate = v.StoreLimit[storeID].AddPeer + } + sc = StoreLimitConfig{AddPeer: rate, RemovePeer: ratePerMin} + } + v.StoreLimit[storeID] = sc + o.SetScheduleConfig(v) +} + +// SetAllStoresLimit sets all store limit for a given type and rate. +func (o *PersistOptions) SetAllStoresLimit(typ storelimit.Type, ratePerMin float64) { + v := o.GetScheduleConfig().Clone() + switch typ { + case storelimit.AddPeer: + for storeID := range v.StoreLimit { + sc := StoreLimitConfig{AddPeer: ratePerMin, RemovePeer: v.StoreLimit[storeID].RemovePeer} + v.StoreLimit[storeID] = sc + } + case storelimit.RemovePeer: + for storeID := range v.StoreLimit { + sc := StoreLimitConfig{AddPeer: v.StoreLimit[storeID].AddPeer, RemovePeer: ratePerMin} + v.StoreLimit[storeID] = sc + } + } + + o.SetScheduleConfig(v) +} + // IsOneWayMergeEnabled returns if a region can only be merged into the next region of it. func (o *PersistOptions) IsOneWayMergeEnabled() bool { return o.GetScheduleConfig().EnableOneWayMerge @@ -220,9 +265,30 @@ func (o *PersistOptions) GetHotRegionScheduleLimit() uint64 { return o.GetScheduleConfig().HotRegionScheduleLimit } -// GetStoreBalanceRate returns the balance rate of a store. -func (o *PersistOptions) GetStoreBalanceRate() float64 { - return o.GetScheduleConfig().StoreBalanceRate +// GetStoreLimit returns the limit of a store. +func (o *PersistOptions) GetStoreLimit(storeID uint64) StoreLimitConfig { + if limit, ok := o.GetScheduleConfig().StoreLimit[storeID]; ok { + return limit + } + return StoreLimitConfig{0, 0} +} + +// GetStoreLimitByType returns the limit of a store with a given type. +func (o *PersistOptions) GetStoreLimitByType(storeID uint64, typ storelimit.Type) float64 { + limit := o.GetStoreLimit(storeID) + switch typ { + case storelimit.AddPeer: + return limit.AddPeer + case storelimit.RemovePeer: + return limit.RemovePeer + default: + panic("no such limit type") + } +} + +// GetAllStoresLimit returns the limit of all stores. +func (o *PersistOptions) GetAllStoresLimit() map[uint64]StoreLimitConfig { + return o.GetScheduleConfig().StoreLimit } // GetStoreLimitMode returns the limit mode of store. 
diff --git a/server/grpc_service.go b/server/grpc_service.go index 0a12b30d131..61c3dd979e0 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -235,6 +235,7 @@ func (s *Server) PutStore(ctx context.Context, request *pdpb.PutStoreRequest) (* log.Info("put store ok", zap.Stringer("store", store)) rc.OnStoreVersionChange() CheckPDVersion(s.persistOptions) + rc.AddStoreLimit(store.GetId()) return &pdpb.PutStoreResponse{ Header: s.header(), diff --git a/server/handler.go b/server/handler.go index 4091807ef0f..9e3a642d1e3 100644 --- a/server/handler.go +++ b/server/handler.go @@ -418,32 +418,58 @@ func (h *Handler) GetHistory(start time.Time) ([]operator.OpHistory, error) { } // SetAllStoresLimit is used to set limit of all stores. -func (h *Handler) SetAllStoresLimit(rate float64, limitType storelimit.Type) error { - c, err := h.GetOperatorController() - if err != nil { - return err +func (h *Handler) SetAllStoresLimit(ratePerMin float64, limitType storelimit.Type) error { + cfg := h.GetScheduleConfig().Clone() + switch limitType { + case storelimit.AddPeer: + config.DefaultStoreLimit.SetDefaultStoreLimit(storelimit.AddPeer, ratePerMin) + for storeID := range cfg.StoreLimit { + sc := config.StoreLimitConfig{ + AddPeer: ratePerMin, + RemovePeer: cfg.StoreLimit[storeID].RemovePeer, + } + cfg.StoreLimit[storeID] = sc + } + case storelimit.RemovePeer: + config.DefaultStoreLimit.SetDefaultStoreLimit(storelimit.RemovePeer, ratePerMin) + for storeID := range cfg.StoreLimit { + sc := config.StoreLimitConfig{ + AddPeer: cfg.StoreLimit[storeID].AddPeer, + RemovePeer: ratePerMin, + } + cfg.StoreLimit[storeID] = sc + } } - c.SetAllStoresLimit(rate, storelimit.Manual, limitType) - return nil + return h.s.SetScheduleConfig(*cfg) } // GetAllStoresLimit is used to get limit of all stores. -func (h *Handler) GetAllStoresLimit(limitType storelimit.Type) (map[uint64]*storelimit.StoreLimit, error) { - c, err := h.GetOperatorController() +func (h *Handler) GetAllStoresLimit(limitType storelimit.Type) (map[uint64]config.StoreLimitConfig, error) { + c, err := h.GetRaftCluster() if err != nil { return nil, err } - return c.GetAllStoresLimit(limitType), nil + return c.GetAllStoresLimit(), nil } // SetStoreLimit is used to set the limit of a store. func (h *Handler) SetStoreLimit(storeID uint64, rate float64, limitType storelimit.Type) error { - c, err := h.GetOperatorController() - if err != nil { - return err + cfg := h.GetScheduleConfig() + switch limitType { + case storelimit.AddPeer: + sc := config.StoreLimitConfig{ + AddPeer: rate, + RemovePeer: cfg.StoreLimit[storeID].RemovePeer, + } + cfg.StoreLimit[storeID] = sc + case storelimit.RemovePeer: + sc := config.StoreLimitConfig{ + AddPeer: cfg.StoreLimit[storeID].AddPeer, + RemovePeer: rate, + } + cfg.StoreLimit[storeID] = sc } - c.SetStoreLimit(storeID, rate, storelimit.Manual, limitType) - return nil + return h.s.SetScheduleConfig(*cfg) } // AddTransferLeaderOperator adds an operator to transfer leader to the store. 
diff --git a/server/schedule/filter/filters.go b/server/schedule/filter/filters.go index 631b1338ef9..9e97bcf8c1a 100644 --- a/server/schedule/filter/filters.go +++ b/server/schedule/filter/filters.go @@ -134,11 +134,11 @@ func (f *storeLimitFilter) Type() string { } func (f *storeLimitFilter) Source(opt opt.Options, store *core.StoreInfo) bool { - return store.IsAvailable(storelimit.RegionRemove) + return store.IsAvailable(storelimit.RemovePeer) } func (f *storeLimitFilter) Target(opt opt.Options, store *core.StoreInfo) bool { - return store.IsAvailable(storelimit.RegionAdd) + return store.IsAvailable(storelimit.AddPeer) } type stateFilter struct{ scope string } @@ -393,7 +393,7 @@ func (f StoreStateFilter) filterMoveRegion(opt opt.Options, isSource bool, store return false } - if (isSource && !store.IsAvailable(storelimit.RegionRemove)) || (!isSource && !store.IsAvailable(storelimit.RegionAdd)) { + if (isSource && !store.IsAvailable(storelimit.RemovePeer)) || (!isSource && !store.IsAvailable(storelimit.AddPeer)) { return false } diff --git a/server/schedule/metrics.go b/server/schedule/metrics.go index ed40342653b..0821c95a8f5 100644 --- a/server/schedule/metrics.go +++ b/server/schedule/metrics.go @@ -50,19 +50,37 @@ var ( Buckets: prometheus.ExponentialBuckets(0.01, 2, 16), }, []string{"type"}) - storeLimitGauge = prometheus.NewGaugeVec( + storeLimitAvailableGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "pd", Subsystem: "schedule", - Name: "store_limit", - Help: "Limit of store.", - }, []string{"store", "type", "limit_type"}) + Name: "store_limit_available", + Help: "available limit rate of store.", + }, []string{"store", "limit_type"}) + + storeLimitRateGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "pd", + Subsystem: "schedule", + Name: "store_limit_rate", + Help: "the limit rate of store.", + }, []string{"store", "limit_type"}) + + storeLimitCostCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "pd", + Subsystem: "schedule", + Name: "store_limit_cost", + Help: "limit rate cost of store.", + }, []string{"store", "limit_type"}) ) func init() { prometheus.MustRegister(operatorCounter) prometheus.MustRegister(operatorDuration) prometheus.MustRegister(operatorWaitDuration) - prometheus.MustRegister(storeLimitGauge) + prometheus.MustRegister(storeLimitAvailableGauge) + prometheus.MustRegister(storeLimitRateGauge) + prometheus.MustRegister(storeLimitCostCounter) prometheus.MustRegister(operatorWaitCounter) } diff --git a/server/schedule/operator/operator_test.go b/server/schedule/operator/operator_test.go index 6b1a7b14ffe..1f7f37d8f1b 100644 --- a/server/schedule/operator/operator_test.go +++ b/server/schedule/operator/operator_test.go @@ -156,7 +156,7 @@ func (s *testOperatorSuite) TestInfluence(c *C) { LeaderCount: 0, RegionSize: 50, RegionCount: 1, - StepCost: map[storelimit.Type]int64{storelimit.RegionAdd: 1000}, + StepCost: map[storelimit.Type]int64{storelimit.AddPeer: 1000}, }) TransferLeader{FromStore: 1, ToStore: 2}.Influence(opInfluence, region) @@ -172,7 +172,7 @@ func (s *testOperatorSuite) TestInfluence(c *C) { LeaderCount: 1, RegionSize: 50, RegionCount: 1, - StepCost: map[storelimit.Type]int64{storelimit.RegionAdd: 1000}, + StepCost: map[storelimit.Type]int64{storelimit.AddPeer: 1000}, }) RemovePeer{FromStore: 1}.Influence(opInfluence, region) @@ -181,14 +181,14 @@ func (s *testOperatorSuite) TestInfluence(c *C) { LeaderCount: -1, RegionSize: -50, RegionCount: -1, - StepCost: 
map[storelimit.Type]int64{storelimit.RegionRemove: 1000}, + StepCost: map[storelimit.Type]int64{storelimit.RemovePeer: 1000}, }) c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{ LeaderSize: 50, LeaderCount: 1, RegionSize: 50, RegionCount: 1, - StepCost: map[storelimit.Type]int64{storelimit.RegionAdd: 1000}, + StepCost: map[storelimit.Type]int64{storelimit.AddPeer: 1000}, }) MergeRegion{IsPassive: false}.Influence(opInfluence, region) @@ -197,14 +197,14 @@ func (s *testOperatorSuite) TestInfluence(c *C) { LeaderCount: -1, RegionSize: -50, RegionCount: -1, - StepCost: map[storelimit.Type]int64{storelimit.RegionRemove: 1000}, + StepCost: map[storelimit.Type]int64{storelimit.RemovePeer: 1000}, }) c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{ LeaderSize: 50, LeaderCount: 1, RegionSize: 50, RegionCount: 1, - StepCost: map[storelimit.Type]int64{storelimit.RegionAdd: 1000}, + StepCost: map[storelimit.Type]int64{storelimit.AddPeer: 1000}, }) MergeRegion{IsPassive: true}.Influence(opInfluence, region) @@ -213,14 +213,14 @@ func (s *testOperatorSuite) TestInfluence(c *C) { LeaderCount: -2, RegionSize: -50, RegionCount: -2, - StepCost: map[storelimit.Type]int64{storelimit.RegionRemove: 1000}, + StepCost: map[storelimit.Type]int64{storelimit.RemovePeer: 1000}, }) c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{ LeaderSize: 50, LeaderCount: 1, RegionSize: 50, RegionCount: 0, - StepCost: map[storelimit.Type]int64{storelimit.RegionAdd: 1000}, + StepCost: map[storelimit.Type]int64{storelimit.AddPeer: 1000}, }) } diff --git a/server/schedule/operator/step.go b/server/schedule/operator/step.go index 7f2ff956bea..980ae1f00f8 100644 --- a/server/schedule/operator/step.go +++ b/server/schedule/operator/step.go @@ -112,7 +112,7 @@ func (ap AddPeer) Influence(opInfluence OpInfluence, region *core.RegionInfo) { regionSize := region.GetApproximateSize() to.RegionSize += regionSize to.RegionCount++ - to.AdjustStepCost(storelimit.RegionAdd, regionSize) + to.AdjustStepCost(storelimit.AddPeer, regionSize) } // CheckSafety checks if the step meets the safety properties. @@ -175,7 +175,7 @@ func (al AddLearner) Influence(opInfluence OpInfluence, region *core.RegionInfo) regionSize := region.GetApproximateSize() to.RegionSize += regionSize to.RegionCount++ - to.AdjustStepCost(storelimit.RegionAdd, regionSize) + to.AdjustStepCost(storelimit.AddPeer, regionSize) } // PromoteLearner is an OpStep that promotes a region learner peer to normal voter. @@ -252,7 +252,7 @@ func (rp RemovePeer) Influence(opInfluence OpInfluence, region *core.RegionInfo) regionSize := region.GetApproximateSize() from.RegionSize -= regionSize from.RegionCount-- - from.AdjustStepCost(storelimit.RegionRemove, regionSize) + from.AdjustStepCost(storelimit.RemovePeer, regionSize) } // MergeRegion is an OpStep that merge two regions. 
diff --git a/server/schedule/operator_controller.go b/server/schedule/operator_controller.go index 1932edec847..b377fb40ac5 100644 --- a/server/schedule/operator_controller.go +++ b/server/schedule/operator_controller.go @@ -452,15 +452,16 @@ func (oc *OperatorController) addOperatorLocked(op *operator.Operator) bool { continue } for n, v := range storelimit.TypeNameValue { - if oc.storesLimit[storeID][v] == nil { + storeLimit := oc.storesLimit[storeID][v] + if storeLimit == nil { continue } stepCost := opInfluence.GetStoreInfluence(storeID).GetStepCost(v) if stepCost == 0 { continue } - storeLimitGauge.WithLabelValues(strconv.FormatUint(storeID, 10), "take", n).Set(float64(stepCost) / float64(storelimit.RegionInfluence[v])) - oc.storesLimit[storeID][v].Take(stepCost) + storeLimit.Take(stepCost) + storeLimitCostCounter.WithLabelValues(strconv.FormatUint(storeID, 10), n).Add(float64(stepCost) / float64(storelimit.RegionInfluence[v])) } } oc.updateCounts(oc.operators) @@ -824,6 +825,7 @@ func (oc *OperatorController) SetOperator(op *operator.Operator) { oc.Lock() defer oc.Unlock() oc.operators[op.RegionID()] = op + oc.updateCounts(oc.operators) } // OperatorWithStatus records the operator and its status. @@ -879,14 +881,12 @@ func (o *OperatorRecords) Put(op *operator.Operator) { func (oc *OperatorController) exceedStoreLimit(ops ...*operator.Operator) bool { opInfluence := NewTotalOpInfluence(ops, oc.cluster) for storeID := range opInfluence.StoresInfluence { - for n, v := range storelimit.TypeNameValue { + for _, v := range storelimit.TypeNameValue { stepCost := opInfluence.GetStoreInfluence(storeID).GetStepCost(v) if stepCost == 0 { continue } - available := oc.getOrCreateStoreLimit(storeID, v).Available() - storeLimitGauge.WithLabelValues(strconv.FormatUint(storeID, 10), "available", n).Set(float64(available) / float64(storelimit.RegionInfluence[v])) - if available < stepCost { + if oc.getOrCreateStoreLimit(storeID, v).Available() < stepCost { return true } } @@ -894,56 +894,20 @@ func (oc *OperatorController) exceedStoreLimit(ops ...*operator.Operator) bool { return false } -// SetAllStoresLimit is used to set limit of all stores. -func (oc *OperatorController) SetAllStoresLimit(rate float64, mode storelimit.Mode, limitType storelimit.Type) { - oc.Lock() - defer oc.Unlock() - stores := oc.cluster.GetStores() - for _, s := range stores { - oc.newStoreLimit(s.GetID(), rate, mode, limitType) - } -} - -// SetAllStoresLimitAuto updates the store limit in Auto mode -func (oc *OperatorController) SetAllStoresLimitAuto(rate float64, limitType storelimit.Type) { - oc.Lock() - defer oc.Unlock() - stores := oc.cluster.GetStores() - for _, s := range stores { - sid := s.GetID() - if old, ok := oc.storesLimit[sid]; ok { - limit, ok1 := old[limitType] - if ok1 && limit.Mode() == storelimit.Manual { - continue - } - } - if oc.storesLimit[sid] == nil { - oc.storesLimit[sid] = make(map[storelimit.Type]*storelimit.StoreLimit) - } - oc.storesLimit[sid][limitType] = storelimit.NewStoreLimit(rate, storelimit.Auto, storelimit.RegionInfluence[limitType]) - } -} - -// SetStoreLimit is used to set the limit of a store. -func (oc *OperatorController) SetStoreLimit(storeID uint64, rate float64, mode storelimit.Mode, limitType storelimit.Type) { - oc.Lock() - defer oc.Unlock() - oc.newStoreLimit(storeID, rate, mode, limitType) -} - // newStoreLimit is used to create the limit of a store. 
-func (oc *OperatorController) newStoreLimit(storeID uint64, rate float64, mode storelimit.Mode, limitType storelimit.Type) { +func (oc *OperatorController) newStoreLimit(storeID uint64, ratePerSec float64, limitType storelimit.Type) { + log.Info("create or update a store limit", zap.Uint64("store-id", storeID), zap.String("type", limitType.String()), zap.Float64("rate", ratePerSec)) if oc.storesLimit[storeID] == nil { oc.storesLimit[storeID] = make(map[storelimit.Type]*storelimit.StoreLimit) } - oc.storesLimit[storeID][limitType] = storelimit.NewStoreLimit(rate, mode, storelimit.RegionInfluence[limitType]) + oc.storesLimit[storeID][limitType] = storelimit.NewStoreLimit(ratePerSec, storelimit.RegionInfluence[limitType]) } // getOrCreateStoreLimit is used to get or create the limit of a store. func (oc *OperatorController) getOrCreateStoreLimit(storeID uint64, limitType storelimit.Type) *storelimit.StoreLimit { if oc.storesLimit[storeID][limitType] == nil { - rate := oc.cluster.GetStoreBalanceRate() / StoreBalanceBaseTime - oc.newStoreLimit(storeID, rate, storelimit.Auto, limitType) + ratePerSec := oc.cluster.GetStoreLimitByType(storeID, limitType) / StoreBalanceBaseTime + oc.newStoreLimit(storeID, ratePerSec, limitType) oc.cluster.AttachAvailableFunc(storeID, limitType, func() bool { oc.RLock() defer oc.RUnlock() @@ -953,23 +917,11 @@ func (oc *OperatorController) getOrCreateStoreLimit(storeID uint64, limitType st return oc.storesLimit[storeID][limitType].Available() >= storelimit.RegionInfluence[limitType] }) } - return oc.storesLimit[storeID][limitType] -} - -// GetAllStoresLimit is used to get limit of all stores. -func (oc *OperatorController) GetAllStoresLimit(limitType storelimit.Type) map[uint64]*storelimit.StoreLimit { - oc.RLock() - defer oc.RUnlock() - limits := make(map[uint64]*storelimit.StoreLimit) - for storeID, limit := range oc.storesLimit { - store := oc.cluster.GetStore(storeID) - if !store.IsTombstone() { - if limit[limitType] != nil { - limits[storeID] = limit[limitType] - } - } + ratePerSec := oc.cluster.GetStoreLimitByType(storeID, limitType) / StoreBalanceBaseTime + if ratePerSec != oc.storesLimit[storeID][limitType].Rate() { + oc.newStoreLimit(storeID, ratePerSec, limitType) } - return limits + return oc.storesLimit[storeID][limitType] } // GetLeaderSchedulePolicy is to get leader schedule policy. @@ -980,12 +932,29 @@ func (oc *OperatorController) GetLeaderSchedulePolicy() core.SchedulePolicy { return oc.cluster.GetLeaderSchedulePolicy() } -// RemoveStoreLimit removes the store limit for a given store ID. -func (oc *OperatorController) RemoveStoreLimit(storeID uint64) { - oc.Lock() - defer oc.Unlock() - for _, limitType := range storelimit.TypeNameValue { - oc.cluster.AttachAvailableFunc(storeID, limitType, nil) +// CollectStoreLimitMetrics collects the metrics about store limit +func (oc *OperatorController) CollectStoreLimitMetrics() { + oc.RLock() + defer oc.RUnlock() + if oc.storesLimit == nil { + return + } + stores := oc.cluster.GetStores() + for _, store := range stores { + if store != nil { + storeID := store.GetID() + storeIDStr := strconv.FormatUint(storeID, 10) + for n, v := range storelimit.TypeNameValue { + var storeLimit *storelimit.StoreLimit + if oc.storesLimit[storeID] == nil || oc.storesLimit[storeID][v] == nil { + // Set to 0 to represent the store limit of the specific type is not initialized. 
+ storeLimitRateGauge.WithLabelValues(storeIDStr, n).Set(0) + continue + } + storeLimit = oc.storesLimit[storeID][v] + storeLimitAvailableGauge.WithLabelValues(storeIDStr, n).Set(float64(storeLimit.Available()) / float64(storelimit.RegionInfluence[v])) + storeLimitRateGauge.WithLabelValues(storeIDStr, n).Set(storeLimit.Rate() * StoreBalanceBaseTime) + } + } } - delete(oc.storesLimit, storeID) } diff --git a/server/schedule/operator_controller_test.go b/server/schedule/operator_controller_test.go index c8e7649765a..cfbd1a71c89 100644 --- a/server/schedule/operator_controller_test.go +++ b/server/schedule/operator_controller_test.go @@ -347,7 +347,8 @@ func (t *testOperatorControllerSuite) TestStoreLimit(c *C) { for i := uint64(1); i <= 1000; i++ { tc.AddLeaderRegion(i, i) } - oc.SetStoreLimit(2, 1, storelimit.Manual, storelimit.RegionAdd) + + tc.SetStoreLimit(2, storelimit.AddPeer, 60) for i := uint64(1); i <= 5; i++ { op := operator.NewOperator("test", "test", 1, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: i}) c.Assert(oc.AddOperator(op), IsTrue) @@ -357,13 +358,13 @@ func (t *testOperatorControllerSuite) TestStoreLimit(c *C) { c.Assert(oc.AddOperator(op), IsFalse) c.Assert(oc.RemoveOperator(op), IsFalse) - oc.SetStoreLimit(2, 2, storelimit.Manual, storelimit.RegionAdd) + tc.SetStoreLimit(2, storelimit.AddPeer, 120) for i := uint64(1); i <= 10; i++ { op = operator.NewOperator("test", "test", i, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: i}) c.Assert(oc.AddOperator(op), IsTrue) checkRemoveOperatorSuccess(c, oc, op) } - oc.SetAllStoresLimit(1, storelimit.Manual, storelimit.RegionAdd) + tc.SetAllStoresLimit(storelimit.AddPeer, 60) for i := uint64(1); i <= 5; i++ { op = operator.NewOperator("test", "test", i, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: i}) c.Assert(oc.AddOperator(op), IsTrue) @@ -373,7 +374,7 @@ func (t *testOperatorControllerSuite) TestStoreLimit(c *C) { c.Assert(oc.AddOperator(op), IsFalse) c.Assert(oc.RemoveOperator(op), IsFalse) - oc.SetStoreLimit(2, 1, storelimit.Manual, storelimit.RegionRemove) + tc.SetStoreLimit(2, storelimit.RemovePeer, 60) for i := uint64(1); i <= 5; i++ { op := operator.NewOperator("test", "test", 1, &metapb.RegionEpoch{}, operator.OpRegion, operator.RemovePeer{FromStore: 2}) c.Assert(oc.AddOperator(op), IsTrue) @@ -383,13 +384,13 @@ func (t *testOperatorControllerSuite) TestStoreLimit(c *C) { c.Assert(oc.AddOperator(op), IsFalse) c.Assert(oc.RemoveOperator(op), IsFalse) - oc.SetStoreLimit(2, 2, storelimit.Manual, storelimit.RegionRemove) + tc.SetStoreLimit(2, storelimit.RemovePeer, 120) for i := uint64(1); i <= 10; i++ { op = operator.NewOperator("test", "test", i, &metapb.RegionEpoch{}, operator.OpRegion, operator.RemovePeer{FromStore: 2}) c.Assert(oc.AddOperator(op), IsTrue) checkRemoveOperatorSuccess(c, oc, op) } - oc.SetAllStoresLimit(1, storelimit.Manual, storelimit.RegionRemove) + tc.SetAllStoresLimit(storelimit.RemovePeer, 60) for i := uint64(1); i <= 5; i++ { op = operator.NewOperator("test", "test", i, &metapb.RegionEpoch{}, operator.OpRegion, operator.RemovePeer{FromStore: 2}) c.Assert(oc.AddOperator(op), IsTrue) @@ -574,9 +575,10 @@ func (t *testOperatorControllerSuite) TestStoreLimitWithMerge(c *C) { newRegionInfo(4, "x", "", 10, 10, []uint64{109, 4}, []uint64{109, 4}), } - tc.AddLeaderStore(1, 10) - tc.AddLeaderStore(4, 10) - tc.AddLeaderStore(5, 10) + for i := uint64(1); i <= 6; i++ { + tc.AddLeaderStore(i, 10) + } + for _, region 
:= range regions { tc.PutRegion(region) } @@ -584,8 +586,6 @@ func (t *testOperatorControllerSuite) TestStoreLimitWithMerge(c *C) { mc := checker.NewMergeChecker(t.ctx, tc) oc := NewOperatorController(t.ctx, tc, mockhbstream.NewHeartbeatStream()) - cfg.StoreBalanceRate = 60 - regions[2] = regions[2].Clone( core.SetPeers([]*metapb.Peer{ {Id: 109, StoreId: 2}, @@ -625,54 +625,6 @@ func (t *testOperatorControllerSuite) TestStoreLimitWithMerge(c *C) { } } -func (t *testOperatorControllerSuite) TestRemoveTombstone(c *C) { - var mu sync.Mutex - cfg := mockoption.NewScheduleOptions() - cfg.StoreBalanceRate = 1000 - cfg.LocationLabels = []string{"zone", "rack"} - tc := mockcluster.NewCluster(cfg) - rc := checker.NewReplicaChecker(tc) - oc := NewOperatorController(t.ctx, tc, mockhbstream.NewHeartbeatStream()) - - tc.AddLabelsStore(1, 100, map[string]string{"zone": "zone1", "rack": "rack1"}) - tc.AddLabelsStore(2, 100, map[string]string{"zone": "zone1", "rack": "rack1"}) - tc.AddLabelsStore(3, 100, map[string]string{"zone": "zone2", "rack": "rack1"}) - tc.AddLabelsStore(4, 10, map[string]string{"zone": "zone3", "rack": "rack1"}) - peers := []*metapb.Peer{ - {Id: 4, StoreId: 1}, - {Id: 5, StoreId: 2}, - {Id: 6, StoreId: 3}, - } - regions := make([]*core.RegionInfo, 100) - for i := 2; i < 20; i++ { - r := core.NewRegionInfo(&metapb.Region{ - Id: uint64(i), - StartKey: []byte(fmt.Sprintf("%20d", i)), - EndKey: []byte(fmt.Sprintf("%20d", i+1)), - Peers: peers}, peers[0], core.SetApproximateSize(50*(1<<20))) - regions[i] = r - tc.PutRegion(r) - } - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - time.Sleep(100 * time.Millisecond) - mu.Lock() - defer mu.Unlock() - oc.RemoveStoreLimit(4) - }() - for i := 2; i < 20; i++ { - time.Sleep(10 * time.Millisecond) - mu.Lock() - op := rc.Check(regions[i]) - mu.Unlock() - oc.AddOperator(op) - oc.RemoveOperator(op) - } - wg.Wait() -} - func newRegionInfo(id uint64, startKey, endKey string, size, keys int64, leader []uint64, peers ...[]uint64) *core.RegionInfo { prs := make([]*metapb.Peer, 0, len(peers)) for _, peer := range peers { @@ -736,15 +688,3 @@ func (t *testOperatorControllerSuite) TestAddWaitingOperator(c *C) { // no space left, new operator can not be added. 
c.Assert(controller.AddWaitingOperator(addPeerOp(0)), Equals, 0) } - -func (t *testOperatorControllerSuite) TestAutoStoreLimitMode(c *C) { - opt := mockoption.NewScheduleOptions() - opt.StoreLimitMode = "auto" - tc := mockcluster.NewCluster(opt) - stream := mockhbstream.NewHeartbeatStreams(tc.ID, true /* no need to run */) - oc := NewOperatorController(t.ctx, tc, stream) - - tc.AddLeaderStore(1, 10) - oc.SetStoreLimit(1, 10, storelimit.Auto, storelimit.RegionAdd) - oc.SetAllStoresLimitAuto(10, storelimit.RegionRemove) -} diff --git a/server/schedule/opt/opts.go b/server/schedule/opt/opts.go index 5215f3feb0d..575b2bea5ad 100644 --- a/server/schedule/opt/opts.go +++ b/server/schedule/opt/opts.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/pd/v4/server/core" "github.com/pingcap/pd/v4/server/schedule/placement" + "github.com/pingcap/pd/v4/server/schedule/storelimit" "github.com/pingcap/pd/v4/server/statistics" ) @@ -32,7 +33,8 @@ type Options interface { GetHotRegionScheduleLimit() uint64 // store limit - GetStoreBalanceRate() float64 + GetStoreLimitByType(storeID uint64, typ storelimit.Type) float64 + SetAllStoresLimit(typ storelimit.Type, ratePerMin float64) GetMaxSnapshotCount() uint64 GetMaxPendingPeerCount() uint64 @@ -63,8 +65,6 @@ type Options interface { GetLeaderSchedulePolicy() core.SchedulePolicy GetKeyType() core.KeyType - RemoveScheduler(name string) error - CheckLabelProperty(typ string, labels []*metapb.StoreLabel) bool } @@ -87,6 +87,7 @@ type Cluster interface { AllocID() (uint64, error) FitRegion(*core.RegionInfo) *placement.RegionFit + RemoveScheduler(name string) error } // HeartbeatStream is an interface. diff --git a/server/schedule/storelimit/store_limit.go b/server/schedule/storelimit/store_limit.go index 1e0cf789b0e..fcc3b9bb723 100644 --- a/server/schedule/storelimit/store_limit.go +++ b/server/schedule/storelimit/store_limit.go @@ -28,54 +28,31 @@ const ( // RegionInfluence represents the influence of a operator step, which is used by store limit. var RegionInfluence = map[Type]int64{ - RegionAdd: 1000, - RegionRemove: 1000, + AddPeer: 1000, + RemovePeer: 1000, } // SmallRegionInfluence represents the influence of a operator step // when the region size is smaller than smallRegionThreshold, which is used by store limit. var SmallRegionInfluence = map[Type]int64{ - RegionAdd: 200, - RegionRemove: 200, + AddPeer: 200, + RemovePeer: 200, } -// Mode indicates the strategy to set store limit -type Mode int - -// There are two modes supported now, "auto" indicates the value -// is set by PD itself. "manual" means it is set by the user. -// An auto set value can be overwrite by a manual set value. 
-const ( - Auto Mode = iota - Manual -) - // Type indicates the type of store limit type Type int const ( - // RegionAdd indicates the type of store limit that limits the adding region rate - RegionAdd Type = iota - // RegionRemove indicates the type of store limit that limits the removing region rate - RegionRemove + // AddPeer indicates the type of store limit that limits the adding peer rate + AddPeer Type = iota + // RemovePeer indicates the type of store limit that limits the removing peer rate + RemovePeer ) // TypeNameValue indicates the name of store limit type and the enum value var TypeNameValue = map[string]Type{ - "region-add": RegionAdd, - "region-remove": RegionRemove, -} - -// String returns the representation of the store limit mode -func (m Mode) String() string { - switch m { - case Auto: - return "auto" - case Manual: - return "manual" - } - // default to be auto - return "auto" + "add-peer": AddPeer, + "remove-peer": RemovePeer, } // String returns the representation of the Type @@ -91,26 +68,27 @@ func (t Type) String() string { // StoreLimit limits the operators of a store type StoreLimit struct { bucket *ratelimit.Bucket - mode Mode regionInfluence int64 + ratePerSec float64 } // NewStoreLimit returns a StoreLimit object -func NewStoreLimit(rate float64, mode Mode, regionInfluence int64) *StoreLimit { +func NewStoreLimit(ratePerSec float64, regionInfluence int64) *StoreLimit { capacity := regionInfluence + rate := ratePerSec // unlimited if rate >= Unlimited { capacity = int64(Unlimited) - } else if rate > 1 { - capacity = int64(rate * float64(regionInfluence)) - rate *= float64(regionInfluence) + } else if ratePerSec > 1 { + capacity = int64(ratePerSec * float64(regionInfluence)) + ratePerSec *= float64(regionInfluence) } else { - rate *= float64(regionInfluence) + ratePerSec *= float64(regionInfluence) } return &StoreLimit{ - bucket: ratelimit.NewBucketWithRate(rate, capacity), - mode: mode, + bucket: ratelimit.NewBucketWithRate(ratePerSec, capacity), regionInfluence: regionInfluence, + ratePerSec: rate, } } @@ -121,15 +99,10 @@ func (l *StoreLimit) Available() int64 { // Rate returns the fill rate of the bucket, in tokens per second. func (l *StoreLimit) Rate() float64 { - return l.bucket.Rate() / float64(l.regionInfluence) + return l.ratePerSec } // Take takes count tokens from the bucket without blocking. 
func (l *StoreLimit) Take(count int64) time.Duration { return l.bucket.Take(count) } - -// Mode returns the store limit mode -func (l *StoreLimit) Mode() Mode { - return l.mode -} diff --git a/server/schedule/storelimit/store_limit_scenes.go b/server/schedule/storelimit/store_limit_scenes.go index b0581f54ac5..c855353cd8b 100644 --- a/server/schedule/storelimit/store_limit_scenes.go +++ b/server/schedule/storelimit/store_limit_scenes.go @@ -39,9 +39,9 @@ func DefaultScene(limitType Type) *Scene { // change this if different type rate limit has different default scene switch limitType { - case RegionAdd: + case AddPeer: return defaultScene - case RegionRemove: + case RemovePeer: return defaultScene default: return nil diff --git a/server/schedulers/balance_test.go b/server/schedulers/balance_test.go index 6a8f38b8a7d..e7b8165a814 100644 --- a/server/schedulers/balance_test.go +++ b/server/schedulers/balance_test.go @@ -343,10 +343,9 @@ func (s *testBalanceLeaderSchedulerSuite) TestLeaderWeight(c *C) { // Weight: 0.5 0.9 1 2 // Region1: L F F F - s.tc.AddLeaderStore(1, 10) - s.tc.AddLeaderStore(2, 10) - s.tc.AddLeaderStore(3, 10) - s.tc.AddLeaderStore(4, 10) + for i := uint64(1); i <= 4; i++ { + s.tc.AddLeaderStore(i, 10) + } s.tc.UpdateStoreLeaderWeight(1, 0.5) s.tc.UpdateStoreLeaderWeight(2, 0.9) s.tc.UpdateStoreLeaderWeight(3, 1) @@ -395,10 +394,9 @@ func (s *testBalanceLeaderSchedulerSuite) TestBalanceSelector(c *C) { // Leaders: 9 10 10 11 // Region1: - F F L // Region2: L F F - - s.tc.AddLeaderStore(1, 10) - s.tc.AddLeaderStore(2, 10) - s.tc.AddLeaderStore(3, 10) - s.tc.AddLeaderStore(4, 10) + for i := uint64(1); i <= 4; i++ { + s.tc.AddLeaderStore(i, 10) + } s.tc.AddLeaderRegion(1, 4, 2, 3) s.tc.AddLeaderRegion(2, 1, 2, 3) // The cluster is balanced. 
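Aside (not part of the patch): with `Mode` removed, the reworked `storelimit` API above boils a limit down to a per-second rate plus an influence constant. Below is a minimal sketch of how the new constructor and accessors fit together; the division by 60 assumes the caller converts the per-minute value set via pd-ctl into a per-second rate, which is not shown in these hunks.

```go
package main

import (
	"fmt"

	"github.com/pingcap/pd/v4/server/schedule/storelimit"
)

func main() {
	// Hypothetical input: "store limit 1 10", i.e. 10 peer operations per minute.
	ratePerMin := 10.0

	// NewStoreLimit takes a per-second rate; the influence constant scales the
	// token bucket so that one add-peer step consumes RegionInfluence[AddPeer] tokens.
	limit := storelimit.NewStoreLimit(ratePerMin/60, storelimit.RegionInfluence[storelimit.AddPeer])

	fmt.Println(limit.Rate())      // the unscaled per-second rate that was passed in
	fmt.Println(limit.Available()) // initial bucket capacity, in influence units

	// Taking one add-peer step's worth of tokens drains the bucket accordingly.
	limit.Take(storelimit.RegionInfluence[storelimit.AddPeer])
	fmt.Println(limit.Available())
}
```

Compared with the old signature, the mode argument is gone and `Rate()` now reports the raw per-second rate directly instead of dividing the bucket rate by the influence constant.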
@@ -443,10 +441,9 @@ func (s *testBalanceLeaderRangeSchedulerSuite) TestSingleRangeBalance(c *C) { // Weight: 0.5 0.9 1 2 // Region1: L F F F - s.tc.AddLeaderStore(1, 10) - s.tc.AddLeaderStore(2, 10) - s.tc.AddLeaderStore(3, 10) - s.tc.AddLeaderStore(4, 10) + for i := uint64(1); i <= 4; i++ { + s.tc.AddLeaderStore(i, 10) + } s.tc.UpdateStoreLeaderWeight(1, 0.5) s.tc.UpdateStoreLeaderWeight(2, 0.9) s.tc.UpdateStoreLeaderWeight(3, 1) @@ -484,10 +481,9 @@ func (s *testBalanceLeaderRangeSchedulerSuite) TestMultiRangeBalance(c *C) { // Weight: 0.5 0.9 1 2 // Region1: L F F F - s.tc.AddLeaderStore(1, 10) - s.tc.AddLeaderStore(2, 10) - s.tc.AddLeaderStore(3, 10) - s.tc.AddLeaderStore(4, 10) + for i := uint64(1); i <= 4; i++ { + s.tc.AddLeaderStore(i, 10) + } s.tc.UpdateStoreLeaderWeight(1, 0.5) s.tc.UpdateStoreLeaderWeight(2, 0.9) s.tc.UpdateStoreLeaderWeight(3, 1) @@ -759,7 +755,6 @@ func (s *testBalanceRegionSchedulerSuite) TestReplacePendingRegion(c *C) { func (s *testBalanceRegionSchedulerSuite) TestOpInfluence(c *C) { opt := mockoption.NewScheduleOptions() - opt.StoreBalanceRate = 65536 tc := mockcluster.NewCluster(opt) oc := schedule.NewOperatorController(s.ctx, tc, mockhbstream.NewHeartbeatStream()) sb, err := schedule.CreateScheduler(BalanceRegionType, oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder(BalanceRegionType, []string{"", ""})) @@ -770,6 +765,7 @@ func (s *testBalanceRegionSchedulerSuite) TestOpInfluence(c *C) { tc.AddRegionStoreWithLeader(2, 8) tc.AddRegionStoreWithLeader(3, 8) tc.AddRegionStoreWithLeader(4, 16, 8) + // add 8 leader regions to store 4 and move them to store 3 // ensure store score without operator influence : store 4 > store 3 // and store score with operator influence : store 3 > store 4 diff --git a/server/statistics/schedule_options.go b/server/statistics/schedule_options.go index c14c28b6aea..240670449a1 100644 --- a/server/statistics/schedule_options.go +++ b/server/statistics/schedule_options.go @@ -17,6 +17,7 @@ import ( "time" "github.com/pingcap/pd/v4/server/core" + "github.com/pingcap/pd/v4/server/schedule/storelimit" ) // ScheduleOptions is an interface to access configurations. 
@@ -27,7 +28,7 @@ type ScheduleOptions interface { GetLowSpaceRatio() float64 GetHighSpaceRatio() float64 GetTolerantSizeRatio() float64 - GetStoreBalanceRate() float64 + GetStoreLimitByType(storeID uint64, typ storelimit.Type) float64 GetSchedulerMaxWaitingOperator() uint64 GetLeaderScheduleLimit() uint64 diff --git a/server/statistics/store_collection.go b/server/statistics/store_collection.go index deaeab8f9f7..7018a459130 100644 --- a/server/statistics/store_collection.go +++ b/server/statistics/store_collection.go @@ -148,7 +148,6 @@ func (s *storeStatistics) Collect() { configs["high-space-ratio"] = s.opt.GetHighSpaceRatio() configs["low-space-ratio"] = s.opt.GetLowSpaceRatio() configs["tolerant-size-ratio"] = s.opt.GetTolerantSizeRatio() - configs["store-balance-rate"] = s.opt.GetStoreBalanceRate() configs["hot-region-schedule-limit"] = float64(s.opt.GetHotRegionScheduleLimit()) configs["hot-region-cache-hits-threshold"] = float64(s.opt.GetHotRegionCacheHitsThreshold()) configs["max-pending-peer-count"] = float64(s.opt.GetMaxPendingPeerCount()) diff --git a/tests/pdctl/config/config_test.go b/tests/pdctl/config/config_test.go index 3f5229f5c08..ed541e66cf8 100644 --- a/tests/pdctl/config/config_test.go +++ b/tests/pdctl/config/config_test.go @@ -89,6 +89,7 @@ func (s *configTestSuite) TestConfig(c *C) { scheduleConfig := svr.GetScheduleConfig() scheduleConfig.Schedulers = nil scheduleConfig.SchedulersPayload = nil + scheduleConfig.StoreLimit = nil c.Assert(&cfg.Schedule, DeepEquals, scheduleConfig) c.Assert(&cfg.Replication, DeepEquals, svr.GetReplicationConfig()) diff --git a/tests/pdctl/operator/operator_test.go b/tests/pdctl/operator/operator_test.go index 6560c0053ed..5c303c71471 100644 --- a/tests/pdctl/operator/operator_test.go +++ b/tests/pdctl/operator/operator_test.go @@ -49,7 +49,6 @@ func (s *operatorTestSuite) TestOperator(c *C) { cluster, err := tests.NewTestCluster(ctx, 1, func(conf *config.Config) { conf.Replication.MaxReplicas = 2 }, func(conf *config.Config) { conf.Schedule.MaxStoreDownTime.Duration = time.Since(t) }, - func(conf *config.Config) { conf.Schedule.StoreBalanceRate = 240 }, ) c.Assert(err, IsNil) err = cluster.RunInitialServers() diff --git a/tests/pdctl/store/store_test.go b/tests/pdctl/store/store_test.go index fc3948e8b57..19d79ef9c4a 100644 --- a/tests/pdctl/store/store_test.go +++ b/tests/pdctl/store/store_test.go @@ -154,56 +154,62 @@ func (s *storeTestSuite) TestStore(c *C) { args = []string{"-u", pdAddr, "store", "limit", "1", "10"} _, _, err = pdctl.ExecuteCommandC(cmd, args...) c.Assert(err, IsNil) - limits := leaderServer.GetRaftCluster().GetOperatorController().GetAllStoresLimit(storelimit.RegionAdd) - c.Assert(limits[1].Rate()*60, Equals, float64(10)) + limit := leaderServer.GetRaftCluster().GetStoreLimitByType(1, storelimit.AddPeer) + c.Assert(limit, Equals, float64(10)) + limit = leaderServer.GetRaftCluster().GetStoreLimitByType(1, storelimit.RemovePeer) + c.Assert(limit, Equals, float64(10)) // store limit - args = []string{"-u", pdAddr, "store", "limit", "1", "5", "region-remove"} + args = []string{"-u", pdAddr, "store", "limit", "1", "5", "remove-peer"} _, _, err = pdctl.ExecuteCommandC(cmd, args...) 
c.Assert(err, IsNil) - limits = leaderServer.GetRaftCluster().GetOperatorController().GetAllStoresLimit(storelimit.RegionRemove) - c.Assert(limits[1].Rate()*60, Equals, float64(5)) - limits = leaderServer.GetRaftCluster().GetOperatorController().GetAllStoresLimit(storelimit.RegionAdd) - c.Assert(limits[1].Rate()*60, Equals, float64(10)) + limit = leaderServer.GetRaftCluster().GetStoreLimitByType(1, storelimit.RemovePeer) + c.Assert(limit, Equals, float64(5)) + limit = leaderServer.GetRaftCluster().GetStoreLimitByType(1, storelimit.AddPeer) + c.Assert(limit, Equals, float64(10)) // store limit all args = []string{"-u", pdAddr, "store", "limit", "all", "20"} _, _, err = pdctl.ExecuteCommandC(cmd, args...) c.Assert(err, IsNil) - limits = leaderServer.GetRaftCluster().GetOperatorController().GetAllStoresLimit(storelimit.RegionAdd) - c.Assert(limits[3].Rate()*60, Equals, float64(20)) - c.Assert(limits[1].Rate()*60, Equals, float64(20)) - _, ok := limits[2] - c.Assert(ok, IsFalse) + limit1 := leaderServer.GetRaftCluster().GetStoreLimitByType(1, storelimit.AddPeer) + limit2 := leaderServer.GetRaftCluster().GetStoreLimitByType(2, storelimit.AddPeer) + limit3 := leaderServer.GetRaftCluster().GetStoreLimitByType(3, storelimit.AddPeer) + c.Assert(limit1, Equals, float64(20)) + c.Assert(limit2, Equals, float64(20)) + c.Assert(limit3, Equals, float64(20)) + limit1 = leaderServer.GetRaftCluster().GetStoreLimitByType(1, storelimit.RemovePeer) + limit2 = leaderServer.GetRaftCluster().GetStoreLimitByType(2, storelimit.RemovePeer) + limit3 = leaderServer.GetRaftCluster().GetStoreLimitByType(3, storelimit.RemovePeer) + c.Assert(limit1, Equals, float64(20)) + c.Assert(limit2, Equals, float64(20)) + c.Assert(limit3, Equals, float64(20)) // store limit all - args = []string{"-u", pdAddr, "store", "limit", "all", "25", "region-remove"} + args = []string{"-u", pdAddr, "store", "limit", "all", "25", "remove-peer"} _, _, err = pdctl.ExecuteCommandC(cmd, args...) c.Assert(err, IsNil) - limits = leaderServer.GetRaftCluster().GetOperatorController().GetAllStoresLimit(storelimit.RegionRemove) - c.Assert(limits[3].Rate()*60, Equals, float64(25)) - c.Assert(limits[1].Rate()*60, Equals, float64(25)) - _, ok = limits[2] - c.Assert(ok, IsFalse) + limit1 = leaderServer.GetRaftCluster().GetStoreLimitByType(1, storelimit.RemovePeer) + limit3 = leaderServer.GetRaftCluster().GetStoreLimitByType(3, storelimit.RemovePeer) + c.Assert(limit1, Equals, float64(25)) + c.Assert(limit3, Equals, float64(25)) + limit2 = leaderServer.GetRaftCluster().GetStoreLimitByType(2, storelimit.RemovePeer) + c.Assert(limit2, Equals, float64(25)) + // store limit - echo := pdctl.GetEcho([]string{"-u", pdAddr, "store", "limit", "region-remove"}) - allRegionAddLimit := make(map[string]map[string]interface{}) - json.Unmarshal([]byte(echo), &allRegionAddLimit) - c.Assert(allRegionAddLimit["1"]["rate"].(float64), Equals, float64(25)) - c.Assert(allRegionAddLimit["1"]["mode"].(string), Equals, "manual") - c.Assert(allRegionAddLimit["3"]["rate"].(float64), Equals, float64(25)) - c.Assert(allRegionAddLimit["3"]["mode"].(string), Equals, "manual") - _, ok = allRegionAddLimit["2"] - c.Assert(ok, IsFalse) - // store limit - args = []string{"-u", pdAddr, "store", "limit"} - _, _, err = pdctl.ExecuteCommandC(cmd, args...) 
- c.Assert(err, IsNil) - limits = leaderServer.GetRaftCluster().GetOperatorController().GetAllStoresLimit(storelimit.RegionAdd) - c.Assert(limits[1].Rate()*60, Equals, float64(20)) - c.Assert(limits[3].Rate()*60, Equals, float64(20)) - _, ok = limits[2] - c.Assert(ok, IsFalse) + echo := pdctl.GetEcho([]string{"-u", pdAddr, "store", "limit"}) + allAddPeerLimit := make(map[string]map[string]interface{}) + json.Unmarshal([]byte(echo), &allAddPeerLimit) + c.Assert(allAddPeerLimit["1"]["add-peer"].(float64), Equals, float64(20)) + c.Assert(allAddPeerLimit["3"]["add-peer"].(float64), Equals, float64(20)) + c.Assert(allAddPeerLimit["2"]["add-peer"].(float64), Equals, float64(20)) + + echo = pdctl.GetEcho([]string{"-u", pdAddr, "store", "limit", "remove-peer"}) + allRemovePeerLimit := make(map[string]map[string]interface{}) + json.Unmarshal([]byte(echo), &allRemovePeerLimit) + c.Assert(allRemovePeerLimit["1"]["remove-peer"].(float64), Equals, float64(25)) + c.Assert(allRemovePeerLimit["3"]["remove-peer"].(float64), Equals, float64(25)) + c.Assert(allRemovePeerLimit["2"]["remove-peer"].(float64), Equals, float64(25)) // store delete command c.Assert(storeInfo.Store.State, Equals, metapb.StoreState_Up) @@ -241,9 +247,9 @@ func (s *storeTestSuite) TestStore(c *C) { // It should be called after stores remove-tombstone. echo = pdctl.GetEcho([]string{"-u", pdAddr, "stores", "show", "limit"}) c.Assert(strings.Contains(echo, "PANIC"), IsFalse) - echo = pdctl.GetEcho([]string{"-u", pdAddr, "stores", "show", "limit", "region-remove"}) + echo = pdctl.GetEcho([]string{"-u", pdAddr, "stores", "show", "limit", "remove-peer"}) c.Assert(strings.Contains(echo, "PANIC"), IsFalse) - echo = pdctl.GetEcho([]string{"-u", pdAddr, "stores", "show", "limit", "region-add"}) + echo = pdctl.GetEcho([]string{"-u", pdAddr, "stores", "show", "limit", "add-peer"}) c.Assert(strings.Contains(echo, "PANIC"), IsFalse) // store limit-scene args = []string{"-u", pdAddr, "store", "limit-scene"} @@ -252,7 +258,7 @@ func (s *storeTestSuite) TestStore(c *C) { scene := &storelimit.Scene{} err = json.Unmarshal(output, scene) c.Assert(err, IsNil) - c.Assert(scene, DeepEquals, storelimit.DefaultScene(storelimit.RegionAdd)) + c.Assert(scene, DeepEquals, storelimit.DefaultScene(storelimit.AddPeer)) // store limit-scene args = []string{"-u", pdAddr, "store", "limit-scene", "idle", "200"} @@ -267,10 +273,10 @@ func (s *storeTestSuite) TestStore(c *C) { c.Assert(scene.Idle, Equals, 200) // store limit-scene - args = []string{"-u", pdAddr, "store", "limit-scene", "idle", "100", "region-remove"} + args = []string{"-u", pdAddr, "store", "limit-scene", "idle", "100", "remove-peer"} _, _, err = pdctl.ExecuteCommandC(cmd, args...) c.Assert(err, IsNil) - args = []string{"-u", pdAddr, "store", "limit-scene", "region-remove"} + args = []string{"-u", pdAddr, "store", "limit-scene", "remove-peer"} scene = &storelimit.Scene{} _, output, err = pdctl.ExecuteCommandC(cmd, args...) c.Assert(err, IsNil) diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index 7eddbe85783..3299cf65966 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -138,6 +138,10 @@ func (s *clusterTestSuite) TestGetPutConfig(c *C) { regionByID := getRegionByID(c, clusterID, grpcPDClient, region.GetId()) c.Assert(region, DeepEquals, regionByID) + r := core.NewRegionInfo(region, region.Peers[0], core.SetApproximateSize(30)) + err = tc.HandleRegionHeartbeat(r) + c.Assert(err, IsNil) + // Get store. 
storeID := peer.GetStoreId() store := getStore(c, clusterID, grpcPDClient, storeID) @@ -214,35 +218,42 @@ func resetStoreState(c *C, rc *cluster.RaftCluster, storeID uint64, state metapb c.Assert(store, NotNil) newStore := store.Clone(core.SetStoreState(state)) rc.GetCacheCluster().PutStore(newStore) + if state == metapb.StoreState_Offline { + rc.SetStoreLimit(storeID, storelimit.RemovePeer, storelimit.Unlimited) + } else if state == metapb.StoreState_Tombstone { + rc.RemoveStoreLimit(storeID) + } } func testStateAndLimit(c *C, clusterID uint64, rc *cluster.RaftCluster, grpcPDClient pdpb.PDClient, store *metapb.Store, beforeState metapb.StoreState, run func(*cluster.RaftCluster) error, expectStates ...metapb.StoreState) { // prepare storeID := store.GetId() oc := rc.GetOperatorController() - oc.SetAllStoresLimit(1.0, storelimit.Manual, storelimit.RegionAdd) - oc.SetAllStoresLimit(1.0, storelimit.Manual, storelimit.RegionRemove) + rc.SetStoreLimit(storeID, storelimit.AddPeer, 60) + rc.SetStoreLimit(storeID, storelimit.RemovePeer, 60) + op := operator.NewOperator("test", "test", 2, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddPeer{ToStore: storeID, PeerID: 3}) + oc.AddOperator(op) + op = operator.NewOperator("test", "test", 2, &metapb.RegionEpoch{}, operator.OpRegion, operator.RemovePeer{FromStore: storeID}) + oc.AddOperator(op) + resetStoreState(c, rc, store.GetId(), beforeState) - _, isRegionAddLimitOKBefore := oc.GetAllStoresLimit(storelimit.RegionAdd)[storeID] - _, isRegionRemoveLimitOKBefore := oc.GetAllStoresLimit(storelimit.RegionRemove)[storeID] + _, isOKBefore := rc.GetAllStoresLimit()[storeID] // run err := run(rc) // judge - _, isRegionAddLimitOKAfter := oc.GetAllStoresLimit(storelimit.RegionAdd)[storeID] - _, isRegionRemoveLimitOKAfter := oc.GetAllStoresLimit(storelimit.RegionRemove)[storeID] + _, isOKAfter := rc.GetAllStoresLimit()[storeID] if len(expectStates) != 0 { c.Assert(err, IsNil) expectState := expectStates[0] c.Assert(getStore(c, clusterID, grpcPDClient, storeID).GetState(), Equals, expectState) if expectState == metapb.StoreState_Offline { - c.Assert(isRegionAddLimitOKAfter && isRegionRemoveLimitOKAfter, IsTrue) + c.Assert(isOKAfter, IsTrue) } else if expectState == metapb.StoreState_Tombstone { - c.Assert(isRegionAddLimitOKAfter || isRegionRemoveLimitOKAfter, IsFalse) + c.Assert(isOKAfter, IsFalse) } } else { c.Assert(err, NotNil) - c.Assert(isRegionAddLimitOKAfter, Equals, isRegionAddLimitOKBefore) - c.Assert(isRegionRemoveLimitOKAfter, Equals, isRegionRemoveLimitOKBefore) + c.Assert(isOKBefore, Equals, isOKAfter) } } @@ -555,7 +566,6 @@ func (s *clusterTestSuite) TestSetScheduleOpt(c *C) { cfg := config.NewConfig() cfg.Schedule.TolerantSizeRatio = 5 - cfg.Schedule.StoreBalanceRate = 60 err = cfg.Adjust(nil) c.Assert(err, IsNil) opt := config.NewPersistOptions(cfg) @@ -899,7 +909,8 @@ func (s *clusterTestSuite) TestOfflineStoreLimit(c *C) { } oc := rc.GetOperatorController() - oc.SetAllStoresLimit(1, storelimit.Manual, storelimit.RegionRemove) + opt := rc.GetOpt() + opt.SetAllStoresLimit(storelimit.RemovePeer, 1) // only can add 5 remove peer operators on store 1 for i := uint64(1); i <= 5; i++ { op := operator.NewOperator("test", "test", 1, &metapb.RegionEpoch{ConfVer: 1, Version: 1}, operator.OpRegion, operator.RemovePeer{FromStore: 1}) @@ -921,7 +932,7 @@ func (s *clusterTestSuite) TestOfflineStoreLimit(c *C) { c.Assert(oc.RemoveOperator(op), IsFalse) // reset all store limit - oc.SetAllStoresLimit(1, storelimit.Manual, storelimit.RegionRemove) + 
opt.SetAllStoresLimit(storelimit.RemovePeer, 2) // only can add 5 remove peer operators on store 2 for i := uint64(1); i <= 5; i++ { @@ -934,7 +945,9 @@ func (s *clusterTestSuite) TestOfflineStoreLimit(c *C) { c.Assert(oc.RemoveOperator(op), IsFalse) // offline store 1 + rc.SetStoreLimit(1, storelimit.RemovePeer, storelimit.Unlimited) rc.RemoveStore(1) + // can add unlimited remove peer operators on store 1 for i := uint64(1); i <= 30; i++ { op := operator.NewOperator("test", "test", 1, &metapb.RegionEpoch{ConfVer: 1, Version: 1}, operator.OpRegion, operator.RemovePeer{FromStore: 1}) diff --git a/tools/pd-ctl/README.md b/tools/pd-ctl/README.md index c6950021c49..2aecb8d62de 100644 --- a/tools/pd-ctl/README.md +++ b/tools/pd-ctl/README.md @@ -127,7 +127,6 @@ Usage: "replica-schedule-limit": 64, "scheduler-max-waiting-operator": 5, "split-merge-interval": "1h0m0s", - "store-balance-rate": 15, "store-limit-mode": "manual", "tolerant-size-ratio": 0 } @@ -638,16 +637,16 @@ Usage: >> store label 1 zone cn // Set the value of the label with the "zone" key to "cn" for the store with the store id of 1 >> store weight 1 5 10 // Set the leader weight to 5 and region weight to 10 for the store with the store id of 1 >> store remove-tombstone // Remove stores that are in tombstone state ->> store limit // Show limits of adding region operation for all stores ->> store limit region-add // Show limits of adding region operation for all stores ->> store limit region-remove // Show limits of removing region operation for all stores ->> store limit all 5 // Limit 5 adding region operations per minute for all stores ->> store limit 1 5 // Limit 5 adding region operations per minute for store 1 ->> store limit all 5 region-add // Limit 5 adding region operations per minute for all stores ->> store limit 1 5 region-add // Limit 5 adding region operations per minute for store 1 ->> store limit 1 5 region-remove // Limit 5 removing region operations per minute for store 1 ->> store limit all 5 region-remove // Limit 5 removing region operations per minute for all stores ->> store limit-scene // Show all limit scene +>> store limit // Show limits of adding peer and removing peer operations for all stores +>> store limit add-peer // Show limits of adding peer operations for all stores +>> store limit remove-peer // Show limits of removing peer operations for all stores +>> store limit all 5 // Limit 5 adding peer operations and 5 removing peer operations per minute for all stores +>> store limit 1 5 // Limit 5 adding peer operations and 5 removing peer operations per minute for store 1 +>> store limit all 5 add-peer // Limit 5 adding peer operations per minute for all stores +>> store limit 1 5 add-peer // Limit 5 adding peer operations per minute for store 1 +>> store limit 1 5 remove-peer // Limit 5 removing peer operations per minute for store 1 +>> store limit all 5 remove-peer // Limit 5 removing peer operations per minute for all stores +>> store limit-scene // Show all limit scenes { "Idle": 100, "Low": 50, @@ -657,6 +656,10 @@ Usage: >> store limit-scene idle 100 // set rate to 100 in the idle scene ``` +> **Notice** +> +> When using the `store limit` command, the original `region-add` and `region-remove` types are deprecated; please use `add-peer` and `remove-peer` instead. + ### `tso` Use this command to parse the physical and logical time of TSO.
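Reviewer note, not part of the patch: judging from the new assertions in `tests/pdctl/store/store_test.go` above, the JSON printed by `store limit` is now keyed by store ID with one field per limit type, replacing the old `rate`/`mode` pair. The exact shape is an assumption; the values below mirror the test's state after the `store limit all` commands.

```
{
  "1": {"add-peer": 20, "remove-peer": 25},
  "2": {"add-peer": 20, "remove-peer": 25},
  "3": {"add-peer": 20, "remove-peer": 25}
}
```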
@@ -669,7 +672,6 @@ system: 2017-10-09 05:50:59 +0800 CST logic: 120102 ``` - ## Jq formatted JSON output usage ### Simplify the output of `store` diff --git a/tools/pd-ctl/pdctl/command/config_command.go b/tools/pd-ctl/pdctl/command/config_command.go index 6514d344284..d4d77e5a6b4 100644 --- a/tools/pd-ctl/pdctl/command/config_command.go +++ b/tools/pd-ctl/pdctl/command/config_command.go @@ -217,6 +217,7 @@ func showConfigCommandFunc(cmd *cobra.Command, args []string) { delete(scheduleConfig, "schedulers-v2") delete(scheduleConfig, "schedulers-payload") + delete(scheduleConfig, "store-limit") data["schedule"] = scheduleConfig r, err := json.MarshalIndent(data, "", " ") if err != nil { diff --git a/tools/pd-ctl/pdctl/command/store_command.go b/tools/pd-ctl/pdctl/command/store_command.go index 8cf991d6e94..148287cc7f7 100644 --- a/tools/pd-ctl/pdctl/command/store_command.go +++ b/tools/pd-ctl/pdctl/command/store_command.go @@ -91,7 +91,7 @@ func NewStoreLimitCommand() *cobra.Command { c := &cobra.Command{ Use: "limit []|[| ]", Short: "show or set a store's rate limit", - Long: "show or set a store's rate limit, can be 'region-add'(default) or 'region-remove'", + Long: "show or set a store's rate limit, can be 'add-peer'(default) or 'remove-peer'", Run: storeLimitCommandFunc, } return c @@ -146,7 +146,7 @@ func NewShowStoresCommand() *cobra.Command { func NewShowAllStoresLimitCommand() *cobra.Command { sc := &cobra.Command{ Use: "limit ", - Short: "show all stores' limit, can be 'region-add'(default) or 'region-remove'", + Short: "show all stores' limit, can be 'add-peer'(default) or 'remove-peer'", Deprecated: "use store limit instead", Run: showAllStoresLimitCommandFunc, } @@ -169,7 +169,7 @@ func NewSetAllLimitCommand() *cobra.Command { return &cobra.Command{ Use: "limit ", Short: "set all store's rate limit", - Long: "set all store's rate limit, can be 'region-add'(default) or 'region-remove'", + Long: "set all store's rate limit, can be 'add-peer'(default) or 'remove-peer'", Deprecated: "use store limit all instead", Run: setAllLimitCommandFunc, } @@ -180,7 +180,7 @@ func NewStoreLimitSceneCommand() *cobra.Command { return &cobra.Command{ Use: "limit-scene []|[ ]", Short: "show or set the limit value for a scene", - Long: "show or set the limit value for a scene, can be 'region-add'(default) or 'region-remove'", + Long: "show or set the limit value for a scene, can be 'add-peer'(default) or 'remove-peer'", Run: storeLimitSceneCommandFunc, } }