Skip to content

Commit

Permalink
support ttl config
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Nov 22, 2023
1 parent f9f9be6 commit 709b68c
Show file tree
Hide file tree
Showing 5 changed files with 316 additions and 145 deletions.
267 changes: 191 additions & 76 deletions pkg/mcs/scheduling/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"os"
"path/filepath"
"reflect"
"strconv"
"strings"
"sync/atomic"
"time"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/spf13/pflag"
"github.com/tikv/pd/pkg/cache"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/core/storelimit"
"github.com/tikv/pd/pkg/mcs/utils"
Expand Down Expand Up @@ -220,6 +222,7 @@ func (c *Config) Clone() *Config {
// PersistConfig wraps all configurations that need to persist to storage and
// allows to access them safely.
type PersistConfig struct {
ttl *cache.TTLString
// Store the global configuration that is related to the scheduling.
clusterVersion unsafe.Pointer
schedule atomic.Value
Expand All @@ -239,6 +242,7 @@ func NewPersistConfig(cfg *Config) *PersistConfig {
// storeConfig will be fetched from TiKV by PD API server,
// so we just set an empty value here first.
o.storeConfig.Store(&sc.StoreConfig{})
o.ttl = nil
return o
}

Expand Down Expand Up @@ -329,16 +333,6 @@ func (o *PersistConfig) GetMaxReplicas() int {
return int(o.GetReplicationConfig().MaxReplicas)
}

// GetMaxSnapshotCount returns the max snapshot count.
func (o *PersistConfig) GetMaxSnapshotCount() uint64 {
return o.GetScheduleConfig().MaxSnapshotCount
}

// GetMaxPendingPeerCount returns the max pending peer count.
func (o *PersistConfig) GetMaxPendingPeerCount() uint64 {
return o.GetScheduleConfig().MaxPendingPeerCount
}

// IsPlacementRulesEnabled returns if the placement rules is enabled.
func (o *PersistConfig) IsPlacementRulesEnabled() bool {
return o.GetReplicationConfig().EnablePlacementRules
Expand All @@ -354,31 +348,6 @@ func (o *PersistConfig) GetHighSpaceRatio() float64 {
return o.GetScheduleConfig().HighSpaceRatio
}

// GetHotRegionScheduleLimit returns the limit for hot region schedule.
func (o *PersistConfig) GetHotRegionScheduleLimit() uint64 {
return o.GetScheduleConfig().HotRegionScheduleLimit
}

// GetRegionScheduleLimit returns the limit for region schedule.
func (o *PersistConfig) GetRegionScheduleLimit() uint64 {
return o.GetScheduleConfig().RegionScheduleLimit
}

// GetLeaderScheduleLimit returns the limit for leader schedule.
func (o *PersistConfig) GetLeaderScheduleLimit() uint64 {
return o.GetScheduleConfig().LeaderScheduleLimit
}

// GetReplicaScheduleLimit returns the limit for replica schedule.
func (o *PersistConfig) GetReplicaScheduleLimit() uint64 {
return o.GetScheduleConfig().ReplicaScheduleLimit
}

// GetMergeScheduleLimit returns the limit for merge schedule.
func (o *PersistConfig) GetMergeScheduleLimit() uint64 {
return o.GetScheduleConfig().MergeScheduleLimit
}

// GetLeaderSchedulePolicy is to get leader schedule policy.
func (o *PersistConfig) GetLeaderSchedulePolicy() constant.SchedulePolicy {
return constant.StringToSchedulePolicy(o.GetScheduleConfig().LeaderSchedulePolicy)
Expand Down Expand Up @@ -419,26 +388,11 @@ func (o *PersistConfig) IsOneWayMergeEnabled() bool {
return o.GetScheduleConfig().EnableOneWayMerge
}

// GetMaxMergeRegionSize returns the max region size.
func (o *PersistConfig) GetMaxMergeRegionSize() uint64 {
return o.GetScheduleConfig().MaxMergeRegionSize
}

// GetMaxMergeRegionKeys returns the max region keys.
func (o *PersistConfig) GetMaxMergeRegionKeys() uint64 {
return o.GetScheduleConfig().MaxMergeRegionKeys
}

// GetRegionScoreFormulaVersion returns the region score formula version.
func (o *PersistConfig) GetRegionScoreFormulaVersion() string {
return o.GetScheduleConfig().RegionScoreFormulaVersion
}

// GetSchedulerMaxWaitingOperator returns the scheduler max waiting operator.
func (o *PersistConfig) GetSchedulerMaxWaitingOperator() uint64 {
return o.GetScheduleConfig().SchedulerMaxWaitingOperator
}

// GetHotRegionCacheHitsThreshold returns the hot region cache hits threshold.
func (o *PersistConfig) GetHotRegionCacheHitsThreshold() int {
return int(o.GetScheduleConfig().HotRegionCacheHitsThreshold)
Expand Down Expand Up @@ -474,11 +428,6 @@ func (o *PersistConfig) GetTolerantSizeRatio() float64 {
return o.GetScheduleConfig().TolerantSizeRatio
}

// GetWitnessScheduleLimit returns the limit for region schedule.
func (o *PersistConfig) GetWitnessScheduleLimit() uint64 {
return o.GetScheduleConfig().WitnessScheduleLimit
}

// IsDebugMetricsEnabled returns if debug metrics is enabled.
func (o *PersistConfig) IsDebugMetricsEnabled() bool {
return o.GetScheduleConfig().EnableDebugMetrics
Expand Down Expand Up @@ -509,11 +458,6 @@ func (o *PersistConfig) IsRemoveExtraReplicaEnabled() bool {
return o.GetScheduleConfig().EnableRemoveExtraReplica
}

// IsLocationReplacementEnabled returns if location replace is enabled.
func (o *PersistConfig) IsLocationReplacementEnabled() bool {
return o.GetScheduleConfig().EnableLocationReplacement
}

// IsWitnessAllowed returns if the witness is allowed.
func (o *PersistConfig) IsWitnessAllowed() bool {
return o.GetScheduleConfig().EnableWitness
Expand All @@ -534,8 +478,87 @@ func (o *PersistConfig) GetStoresLimit() map[uint64]sc.StoreLimitConfig {
return o.GetScheduleConfig().StoreLimit
}

// TTL related methods.

// GetLeaderScheduleLimit returns the limit for leader schedule.
func (o *PersistConfig) GetLeaderScheduleLimit() uint64 {
return o.getTTLUintOr(sc.LeaderScheduleLimitKey, o.GetScheduleConfig().LeaderScheduleLimit)
}

// GetRegionScheduleLimit returns the limit for region schedule.
func (o *PersistConfig) GetRegionScheduleLimit() uint64 {
return o.getTTLUintOr(sc.RegionScheduleLimitKey, o.GetScheduleConfig().RegionScheduleLimit)
}

// GetWitnessScheduleLimit returns the limit for region schedule.
func (o *PersistConfig) GetWitnessScheduleLimit() uint64 {
return o.getTTLUintOr(sc.WitnessScheduleLimitKey, o.GetScheduleConfig().WitnessScheduleLimit)
}

// GetReplicaScheduleLimit returns the limit for replica schedule.
func (o *PersistConfig) GetReplicaScheduleLimit() uint64 {
return o.getTTLUintOr(sc.ReplicaRescheduleLimitKey, o.GetScheduleConfig().ReplicaScheduleLimit)
}

// GetMergeScheduleLimit returns the limit for merge schedule.
func (o *PersistConfig) GetMergeScheduleLimit() uint64 {
return o.getTTLUintOr(sc.MergeScheduleLimitKey, o.GetScheduleConfig().MergeScheduleLimit)
}

// GetHotRegionScheduleLimit returns the limit for hot region schedule.
func (o *PersistConfig) GetHotRegionScheduleLimit() uint64 {
return o.getTTLUintOr(sc.HotRegionScheduleLimitKey, o.GetScheduleConfig().HotRegionScheduleLimit)
}

// GetStoreLimit returns the limit of a store.
func (o *PersistConfig) GetStoreLimit(storeID uint64) (returnSC sc.StoreLimitConfig) {
defer func() {
returnSC.RemovePeer = o.getTTLFloatOr(fmt.Sprintf("remove-peer-%v", storeID), returnSC.RemovePeer)
returnSC.AddPeer = o.getTTLFloatOr(fmt.Sprintf("add-peer-%v", storeID), returnSC.AddPeer)
}()
if limit, ok := o.GetScheduleConfig().StoreLimit[storeID]; ok {
return limit
}
cfg := o.GetScheduleConfig().Clone()
sc := sc.StoreLimitConfig{
AddPeer: sc.DefaultStoreLimit.GetDefaultStoreLimit(storelimit.AddPeer),
RemovePeer: sc.DefaultStoreLimit.GetDefaultStoreLimit(storelimit.RemovePeer),
}
v, ok1, err := o.getTTLFloat("default-add-peer")
if err != nil {
log.Warn("failed to parse default-add-peer from PersistOptions's ttl storage", zap.Error(err))
}
canSetAddPeer := ok1 && err == nil
if canSetAddPeer {
returnSC.AddPeer = v
}

v, ok2, err := o.getTTLFloat("default-remove-peer")
if err != nil {
log.Warn("failed to parse default-remove-peer from PersistOptions's ttl storage", zap.Error(err))
}
canSetRemovePeer := ok2 && err == nil
if canSetRemovePeer {
returnSC.RemovePeer = v
}

if canSetAddPeer || canSetRemovePeer {
return returnSC
}
cfg.StoreLimit[storeID] = sc
o.SetScheduleConfig(cfg)
return o.GetScheduleConfig().StoreLimit[storeID]
}

// GetStoreLimitByType returns the limit of a store with a given type.
func (o *PersistConfig) GetStoreLimitByType(storeID uint64, typ storelimit.Type) (returned float64) {
defer func() {
if typ == storelimit.RemovePeer {
returned = o.getTTLFloatOr(fmt.Sprintf("remove-peer-%v", storeID), returned)
} else if typ == storelimit.AddPeer {
returned = o.getTTLFloatOr(fmt.Sprintf("add-peer-%v", storeID), returned)
}
}()
limit := o.GetStoreLimit(storeID)
switch typ {
case storelimit.AddPeer:
Expand All @@ -550,20 +573,48 @@ func (o *PersistConfig) GetStoreLimitByType(storeID uint64, typ storelimit.Type)
}
}

// GetStoreLimit returns the limit of a store.
func (o *PersistConfig) GetStoreLimit(storeID uint64) (returnSC sc.StoreLimitConfig) {
if limit, ok := o.GetScheduleConfig().StoreLimit[storeID]; ok {
return limit
// GetMaxSnapshotCount returns the number of the max snapshot which is allowed to send.
func (o *PersistConfig) GetMaxSnapshotCount() uint64 {
return o.getTTLUintOr(sc.MaxSnapshotCountKey, o.GetScheduleConfig().MaxSnapshotCount)
}

// GetMaxPendingPeerCount returns the number of the max pending peers.
func (o *PersistConfig) GetMaxPendingPeerCount() uint64 {
return o.getTTLUintOr(sc.MaxPendingPeerCountKey, o.GetScheduleConfig().MaxPendingPeerCount)
}

// GetMaxMergeRegionSize returns the max region size.
func (o *PersistConfig) GetMaxMergeRegionSize() uint64 {
return o.getTTLUintOr(sc.MaxMergeRegionSizeKey, o.GetScheduleConfig().MaxMergeRegionSize)
}

// GetMaxMergeRegionKeys returns the max number of keys.
// It returns size * 10000 if the key of max-merge-region-Keys doesn't exist.
func (o *PersistConfig) GetMaxMergeRegionKeys() uint64 {
keys, exist, err := o.getTTLUint(sc.MaxMergeRegionKeysKey)
if exist && err == nil {
return keys
}
cfg := o.GetScheduleConfig().Clone()
sc := sc.StoreLimitConfig{
AddPeer: sc.DefaultStoreLimit.GetDefaultStoreLimit(storelimit.AddPeer),
RemovePeer: sc.DefaultStoreLimit.GetDefaultStoreLimit(storelimit.RemovePeer),
size, exist, err := o.getTTLUint(sc.MaxMergeRegionSizeKey)
if exist && err == nil {
return size * 10000
}
return o.GetScheduleConfig().GetMaxMergeRegionKeys()
}

cfg.StoreLimit[storeID] = sc
o.SetScheduleConfig(cfg)
return o.GetScheduleConfig().StoreLimit[storeID]
// GetSchedulerMaxWaitingOperator returns the number of the max waiting operators.
func (o *PersistConfig) GetSchedulerMaxWaitingOperator() uint64 {
return o.getTTLUintOr(sc.SchedulerMaxWaitingOperatorKey, o.GetScheduleConfig().SchedulerMaxWaitingOperator)
}

// IsLocationReplacementEnabled returns if location replace is enabled.
func (o *PersistConfig) IsLocationReplacementEnabled() bool {
return o.getTTLBoolOr(sc.EnableLocationReplacement, o.GetScheduleConfig().EnableLocationReplacement)
}

// IsTikvRegionSplitEnabled returns whether tikv split region is disabled.
func (o *PersistConfig) IsTikvRegionSplitEnabled() bool {
return o.getTTLBoolOr(sc.EnableTiKVSplitRegion, o.GetScheduleConfig().EnableTiKVSplitRegion)
}

// SetAllStoresLimit sets all store limit for a given type and rate.
Expand Down Expand Up @@ -680,11 +731,6 @@ func (o *PersistConfig) IsRaftKV2() bool {
return o.GetStoreConfig().IsRaftKV2()
}

// IsTikvRegionSplitEnabled returns whether tikv split region is disabled.
func (o *PersistConfig) IsTikvRegionSplitEnabled() bool {
return o.GetScheduleConfig().EnableTiKVSplitRegion
}

// TODO: implement the following methods

// AddSchedulerCfg adds the scheduler configurations.
Expand All @@ -710,3 +756,72 @@ func (o *PersistConfig) IsTraceRegionFlow() bool {
func (o *PersistConfig) Persist(storage endpoint.ConfigStorage) error {
return nil
}

func (o *PersistConfig) getTTLUint(key string) (uint64, bool, error) {
stringForm, ok := o.GetTTLData(key)
if !ok {
return 0, false, nil
}
r, err := strconv.ParseUint(stringForm, 10, 64)
return r, true, err
}

func (o *PersistConfig) getTTLUintOr(key string, defaultValue uint64) uint64 {
if v, ok, err := o.getTTLUint(key); ok {
if err == nil {
return v
}
log.Warn("failed to parse "+key+" from PersistOptions's ttl storage", zap.Error(err))
}
return defaultValue
}

func (o *PersistConfig) getTTLBool(key string) (result bool, contains bool, err error) {
stringForm, ok := o.GetTTLData(key)
if !ok {
return
}
result, err = strconv.ParseBool(stringForm)
contains = true
return
}

func (o *PersistConfig) getTTLBoolOr(key string, defaultValue bool) bool {
if v, ok, err := o.getTTLBool(key); ok {
if err == nil {
return v
}
log.Warn("failed to parse "+key+" from PersistOptions's ttl storage", zap.Error(err))
}
return defaultValue
}

func (o *PersistConfig) getTTLFloat(key string) (float64, bool, error) {
stringForm, ok := o.GetTTLData(key)
if !ok {
return 0, false, nil
}
r, err := strconv.ParseFloat(stringForm, 64)
return r, true, err
}

func (o *PersistConfig) getTTLFloatOr(key string, defaultValue float64) float64 {
if v, ok, err := o.getTTLFloat(key); ok {
if err == nil {
return v
}
log.Warn("failed to parse "+key+" from PersistOptions's ttl storage", zap.Error(err))
}
return defaultValue
}

// GetTTLData returns if there is a TTL data for a given key.
func (o *PersistConfig) GetTTLData(key string) (string, bool) {
if o.ttl == nil {
return "", false
}
if result, ok := o.ttl.Get(key); ok {
return result.(string), ok
}
return "", false
}
Loading

0 comments on commit 709b68c

Please sign in to comment.