Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduling/watcher, storage: integrate rule watcher with the managers #7213

Merged
merged 6 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 62 additions & 21 deletions pkg/mcs/scheduling/server/rule/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"sync"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/schedule/checker"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
Expand Down Expand Up @@ -49,18 +52,27 @@ type Watcher struct {
etcdClient *clientv3.Client
ruleStorage endpoint.RuleStorage

// checkerController is used to add the suspect key ranges to the checker when the rule changed.
checkerController *checker.Controller
// ruleManager is used to manage the placement rules.
ruleManager *placement.RuleManager
// regionLabeler is used to manage the region label rules.
regionLabeler *labeler.RegionLabeler

ruleWatcher *etcdutil.LoopWatcher
groupWatcher *etcdutil.LoopWatcher
labelWatcher *etcdutil.LoopWatcher
}

// NewWatcher creates a new watcher to watch the Placement Rule change from PD API server.
// Please use `GetRuleStorage` to get the underlying storage to access the Placement Rules.
func NewWatcher(
ctx context.Context,
etcdClient *clientv3.Client,
clusterID uint64,
ruleStorage endpoint.RuleStorage,
checkerController *checker.Controller,
ruleManager *placement.RuleManager,
regionLabeler *labeler.RegionLabeler,
) (*Watcher, error) {
ctx, cancel := context.WithCancel(ctx)
rw := &Watcher{
Expand All @@ -71,6 +83,9 @@ func NewWatcher(
regionLabelPathPrefix: endpoint.RegionLabelPathPrefix(clusterID),
etcdClient: etcdClient,
ruleStorage: ruleStorage,
checkerController: checkerController,
ruleManager: ruleManager,
regionLabeler: regionLabeler,
}
err := rw.initializeRuleWatcher()
if err != nil {
Expand All @@ -90,17 +105,31 @@ func NewWatcher(
func (rw *Watcher) initializeRuleWatcher() error {
prefixToTrim := rw.rulesPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
// Since the PD API server will validate the rule before saving it to etcd,
// so we could directly save the string rule in JSON to the storage here.
log.Info("update placement rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
return rw.ruleStorage.SaveRuleJSON(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
string(kv.Value),
)
rule, err := placement.NewRuleFromJSON(kv.Value)
if err != nil {
return err
}
// Update the suspect key ranges in the checker.
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
if oldRule := rw.ruleManager.GetRule(rule.GroupID, rule.ID); oldRule != nil {
rw.checkerController.AddSuspectKeyRange(oldRule.StartKey, oldRule.EndKey)
}
return rw.ruleManager.SetRule(rule)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
log.Info("delete placement rule", zap.String("key", string(kv.Key)))
return rw.ruleStorage.DeleteRule(strings.TrimPrefix(string(kv.Key), prefixToTrim))
key := string(kv.Key)
log.Info("delete placement rule", zap.String("key", key))
ruleJSON, err := rw.ruleStorage.LoadRule(strings.TrimPrefix(key, prefixToTrim))
if err != nil {
return err
}
rule, err := placement.NewRuleFromJSON([]byte(ruleJSON))
if err != nil {
return err
}
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
return rw.ruleManager.DeleteRule(rule.GroupID, rule.ID)
}
postEventFn := func() error {
return nil
Expand All @@ -120,14 +149,24 @@ func (rw *Watcher) initializeGroupWatcher() error {
prefixToTrim := rw.ruleGroupPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
log.Info("update placement rule group", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
return rw.ruleStorage.SaveRuleGroupJSON(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
string(kv.Value),
)
ruleGroup, err := placement.NewRuleGroupFromJSON(kv.Value)
if err != nil {
return err
}
// Add all rule key ranges within the group to the suspect key ranges.
for _, rule := range rw.ruleManager.GetRulesByGroup(ruleGroup.ID) {
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
}
return rw.ruleManager.SetRuleGroup(ruleGroup)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
log.Info("delete placement rule group", zap.String("key", string(kv.Key)))
return rw.ruleStorage.DeleteRuleGroup(strings.TrimPrefix(string(kv.Key), prefixToTrim))
key := string(kv.Key)
log.Info("delete placement rule group", zap.String("key", key))
trimmedKey := strings.TrimPrefix(key, prefixToTrim)
for _, rule := range rw.ruleManager.GetRulesByGroup(trimmedKey) {
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
}
return rw.ruleManager.DeleteRuleGroup(trimmedKey)
}
postEventFn := func() error {
return nil
Expand All @@ -147,14 +186,16 @@ func (rw *Watcher) initializeRegionLabelWatcher() error {
prefixToTrim := rw.regionLabelPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
log.Info("update region label rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
return rw.ruleStorage.SaveRegionRuleJSON(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
string(kv.Value),
)
rule, err := labeler.NewLabelRuleFromJSON(kv.Value)
if err != nil {
return err
}
return rw.regionLabeler.SetLabelRule(rule)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
log.Info("delete region label rule", zap.String("key", string(kv.Key)))
return rw.ruleStorage.DeleteRegionRule(strings.TrimPrefix(string(kv.Key), prefixToTrim))
key := string(kv.Key)
log.Info("delete region label rule", zap.String("key", key))
return rw.regionLabeler.DeleteLabelRule(strings.TrimPrefix(key, prefixToTrim))
}
postEventFn := func() error {
return nil
Expand Down
17 changes: 14 additions & 3 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ func (s *Server) startServer() (err error) {
func (s *Server) startCluster(context.Context) error {
s.basicCluster = core.NewBasicCluster()
s.storage = endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
err := s.startWatcher()
err := s.startMetaConfWatcher()
if err != nil {
return err
}
Expand All @@ -464,7 +464,13 @@ func (s *Server) startCluster(context.Context) error {
if err != nil {
return err
}
// Inject the cluster components into the config watcher after the scheduler controller is created.
s.configWatcher.SetSchedulersController(s.cluster.GetCoordinator().GetSchedulersController())
// Start the rule watcher after the cluster is created.
err = s.startRuleWatcher()
if err != nil {
return err
}
s.cluster.StartBackgroundJobs()
return nil
}
Expand All @@ -474,7 +480,7 @@ func (s *Server) stopCluster() {
s.stopWatcher()
}

func (s *Server) startWatcher() (err error) {
func (s *Server) startMetaConfWatcher() (err error) {
s.metaWatcher, err = meta.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.basicCluster)
if err != nil {
return err
Expand All @@ -483,7 +489,12 @@ func (s *Server) startWatcher() (err error) {
if err != nil {
return err
}
s.ruleWatcher, err = rule.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.storage)
return err
}

func (s *Server) startRuleWatcher() (err error) {
s.ruleWatcher, err = rule.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.storage,
s.cluster.GetCoordinator().GetCheckerController(), s.cluster.GetRuleManager(), s.cluster.GetRegionLabeler())
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/labeler/labeler.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (l *RegionLabeler) Patch(patch LabelRulePatch) error {
}
}

// update inmemory states.
// update in-memory states.
l.Lock()
defer l.Unlock()

Expand Down
6 changes: 4 additions & 2 deletions pkg/schedule/placement/rule_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,10 @@ func (m *RuleManager) Initialize(maxReplica int, locationLabels []string, isolat
}

func (m *RuleManager) loadRules() error {
var toSave []*Rule
var toDelete []string
var (
toSave []*Rule
toDelete []string
)
err := m.storage.LoadRules(func(k, v string) {
r, err := NewRuleFromJSON([]byte(v))
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions pkg/storage/endpoint/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

// RuleStorage defines the storage operations on the rule.
type RuleStorage interface {
LoadRule(ruleKey string) (string, error)
LoadRules(f func(k, v string)) error
SaveRule(ruleKey string, rule interface{}) error
SaveRuleJSON(ruleKey, rule string) error
Expand Down Expand Up @@ -93,6 +94,11 @@ func (se *StorageEndpoint) DeleteRegionRule(ruleKey string) error {
return se.Remove(regionLabelKeyPath(ruleKey))
}

// LoadRule load a placement rule from storage.
func (se *StorageEndpoint) LoadRule(ruleKey string) (string, error) {
return se.Load(ruleKeyPath(ruleKey))
}

// LoadRules loads placement rules from storage.
func (se *StorageEndpoint) LoadRules(f func(k, v string)) error {
return se.loadRangeByPrefix(rulesPath+"/", f)
Expand Down
Loading
Loading