Skip to content

Commit

Permalink
Refine the code and comments
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato committed Aug 14, 2023
1 parent 42fb553 commit 2f49527
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 42 deletions.
23 changes: 12 additions & 11 deletions pkg/mcs/scheduling/server/config/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ type Watcher struct {
// - Key: /pd/{cluster_id}/config
// - Value: configuration JSON.
configPath string
// schedulerConfigPath is the path of the scheduler configuration in etcd:
// schedulerConfigPathPrefix is the path prefix of the scheduler configuration in etcd:
// - Key: /pd/{cluster_id}/scheduler_config/{scheduler_name}
// - Value: configuration JSON.
schedulerConfigPath string
schedulerConfigPathPrefix string

etcdClient *clientv3.Client
configWatcher *etcdutil.LoopWatcher
Expand All @@ -68,12 +68,12 @@ func NewWatcher(
) (*Watcher, error) {
ctx, cancel := context.WithCancel(ctx)
cw := &Watcher{
ctx: ctx,
cancel: cancel,
configPath: endpoint.ConfigPath(clusterID),
schedulerConfigPath: endpoint.SchedulerConfigPath(clusterID) + "/",
etcdClient: etcdClient,
PersistConfig: persistConfig,
ctx: ctx,
cancel: cancel,
configPath: endpoint.ConfigPath(clusterID),
schedulerConfigPathPrefix: endpoint.SchedulerConfigPathPrefix(clusterID),
etcdClient: etcdClient,
PersistConfig: persistConfig,
}
err := cw.initializeConfigWatcher()
if err != nil {
Expand Down Expand Up @@ -117,15 +117,16 @@ func (cw *Watcher) initializeConfigWatcher() error {
}

func (cw *Watcher) initializeSchedulerConfigWatcher() error {
prefixToTrim := cw.schedulerConfigPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
cw.SetSchedulerConfig(
strings.TrimPrefix(string(kv.Key), cw.schedulerConfigPath),
strings.TrimPrefix(string(kv.Key), prefixToTrim),
string(kv.Value),
)
return nil
}
deleteFn := func(kv *mvccpb.KeyValue) error {
cw.RemoveSchedulerConfig(strings.TrimPrefix(string(kv.Key), cw.schedulerConfigPath))
cw.RemoveSchedulerConfig(strings.TrimPrefix(string(kv.Key), prefixToTrim))
return nil
}
postEventFn := func() error {
Expand All @@ -134,7 +135,7 @@ func (cw *Watcher) initializeSchedulerConfigWatcher() error {
cw.schedulerConfigWatcher = etcdutil.NewLoopWatcher(
cw.ctx, &cw.wg,
cw.etcdClient,
"scheduling-scheduler-config-watcher", cw.schedulerConfigPath,
"scheduling-scheduler-config-watcher", cw.schedulerConfigPathPrefix,
putFn, deleteFn, postEventFn,
clientv3.WithPrefix(),
)
Expand Down
49 changes: 26 additions & 23 deletions pkg/mcs/scheduling/server/rule/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (rs *ruleStorage) LoadRules(f func(k, v string)) error {
return nil
}

// SaveRule stores a rule cfg to the rulesPath.
// SaveRule stores a rule cfg to the rulesPathPrefix.
func (rs *ruleStorage) SaveRule(ruleKey string, rule interface{}) error {
rs.rules.Store(ruleKey, rule)
return nil
Expand Down Expand Up @@ -105,18 +105,18 @@ type Watcher struct {
cancel context.CancelFunc
wg sync.WaitGroup

// rulePath:
// rulesPathPrefix:
// - Key: /pd/{cluster_id}/rules/{group_id}-{rule_id}
// - Value: placement.Rule
rulesPath string
// ruleGroupPath:
rulesPathPrefix string
// ruleGroupPathPrefix:
// - Key: /pd/{cluster_id}/rule_group/{group_id}
// - Value: placement.RuleGroup
ruleGroupPath string
// regionLabelPath:
ruleGroupPathPrefix string
// regionLabelPathPrefix:
// - Key: /pd/{cluster_id}/region_label/{rule_id}
// - Value: labeler.LabelRule
regionLabelPath string
regionLabelPathPrefix string

etcdClient *clientv3.Client
ruleStore *ruleStorage
Expand All @@ -135,13 +135,13 @@ func NewWatcher(
) (*Watcher, error) {
ctx, cancel := context.WithCancel(ctx)
rw := &Watcher{
ctx: ctx,
cancel: cancel,
rulesPath: endpoint.RulesPath(clusterID) + "/",
ruleGroupPath: endpoint.RuleGroupPath(clusterID) + "/",
regionLabelPath: endpoint.RegionLabelPath(clusterID) + "/",
etcdClient: etcdClient,
ruleStore: &ruleStorage{},
ctx: ctx,
cancel: cancel,
rulesPathPrefix: endpoint.RulesPathPrefix(clusterID),
ruleGroupPathPrefix: endpoint.RuleGroupPathPrefix(clusterID),
regionLabelPathPrefix: endpoint.RegionLabelPathPrefix(clusterID),
etcdClient: etcdClient,
ruleStore: &ruleStorage{},
}
err := rw.initializeRuleWatcher()
if err != nil {
Expand All @@ -159,24 +159,25 @@ func NewWatcher(
}

func (rw *Watcher) initializeRuleWatcher() error {
prefixToTrim := rw.rulesPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
// Since the PD API server will validate the rule before saving it to etcd,
// so we could directly save the string rule in JSON to the storage here.
return rw.ruleStore.SaveRule(
strings.TrimPrefix(string(kv.Key), rw.rulesPath),
strings.TrimPrefix(string(kv.Key), prefixToTrim),
string(kv.Value),
)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
return rw.ruleStore.DeleteRule(strings.TrimPrefix(string(kv.Key), rw.rulesPath))
return rw.ruleStore.DeleteRule(strings.TrimPrefix(string(kv.Key), prefixToTrim))
}
postEventFn := func() error {
return nil
}
rw.ruleWatcher = etcdutil.NewLoopWatcher(
rw.ctx, &rw.wg,
rw.etcdClient,
"scheduling-rule-watcher", rw.rulesPath,
"scheduling-rule-watcher", rw.rulesPathPrefix,
putFn, deleteFn, postEventFn,
clientv3.WithPrefix(),
)
Expand All @@ -185,22 +186,23 @@ func (rw *Watcher) initializeRuleWatcher() error {
}

func (rw *Watcher) initializeGroupWatcher() error {
prefixToTrim := rw.ruleGroupPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
return rw.ruleStore.SaveRuleGroup(
strings.TrimPrefix(string(kv.Key), rw.ruleGroupPath),
strings.TrimPrefix(string(kv.Key), prefixToTrim),
string(kv.Value),
)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
return rw.ruleStore.DeleteRuleGroup(strings.TrimPrefix(string(kv.Key), rw.ruleGroupPath))
return rw.ruleStore.DeleteRuleGroup(strings.TrimPrefix(string(kv.Key), prefixToTrim))
}
postEventFn := func() error {
return nil
}
rw.groupWatcher = etcdutil.NewLoopWatcher(
rw.ctx, &rw.wg,
rw.etcdClient,
"scheduling-rule-group-watcher", rw.ruleGroupPath,
"scheduling-rule-group-watcher", rw.ruleGroupPathPrefix,
putFn, deleteFn, postEventFn,
clientv3.WithPrefix(),
)
Expand All @@ -209,22 +211,23 @@ func (rw *Watcher) initializeGroupWatcher() error {
}

func (rw *Watcher) initializeRegionLabelWatcher() error {
prefixToTrim := rw.regionLabelPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
return rw.ruleStore.SaveRegionRule(
strings.TrimPrefix(string(kv.Key), rw.regionLabelPath),
strings.TrimPrefix(string(kv.Key), prefixToTrim),
string(kv.Value),
)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
return rw.ruleStore.DeleteRegionRule(strings.TrimPrefix(string(kv.Key), rw.regionLabelPath))
return rw.ruleStore.DeleteRegionRule(strings.TrimPrefix(string(kv.Key), prefixToTrim))
}
postEventFn := func() error {
return nil
}
rw.labelWatcher = etcdutil.NewLoopWatcher(
rw.ctx, &rw.wg,
rw.etcdClient,
"scheduling-region-label-watcher", rw.regionLabelPath,
"scheduling-region-label-watcher", rw.regionLabelPathPrefix,
putFn, deleteFn, postEventFn,
clientv3.WithPrefix(),
)
Expand Down
16 changes: 8 additions & 8 deletions pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,23 +92,23 @@ func ConfigPath(clusterID uint64) string {
return path.Join(PDRootPath(clusterID), configPath)
}

// SchedulerConfigPath returns the path to save the scheduler config.
func SchedulerConfigPath(clusterID uint64) string {
// SchedulerConfigPathPrefix returns the path prefix to save the scheduler config.
func SchedulerConfigPathPrefix(clusterID uint64) string {
return path.Join(PDRootPath(clusterID), customScheduleConfigPath)
}

// RulesPath returns the path to save the placement rules.
func RulesPath(clusterID uint64) string {
// RulesPathPrefix returns the path prefix to save the placement rules.
func RulesPathPrefix(clusterID uint64) string {
return path.Join(PDRootPath(clusterID), rulesPath)
}

// RuleGroupPath returns the path to save the placement rule groups.
func RuleGroupPath(clusterID uint64) string {
// RuleGroupPathPrefix returns the path prefix to save the placement rule groups.
func RuleGroupPathPrefix(clusterID uint64) string {
return path.Join(PDRootPath(clusterID), ruleGroupPath)
}

// RegionLabelPath returns the path to save the region label.
func RegionLabelPath(clusterID uint64) string {
// RegionLabelPathPrefix returns the path prefix to save the region label.
func RegionLabelPathPrefix(clusterID uint64) string {
return path.Join(PDRootPath(clusterID), regionLabelPath)
}

Expand Down

0 comments on commit 2f49527

Please sign in to comment.