Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: fix rule sync when meet "no rule left" and concurrency #7481

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 62 additions & 78 deletions pkg/mcs/scheduling/server/rule/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ type Watcher struct {
cancel context.CancelFunc
wg sync.WaitGroup

// ruleCommonPathPrefix:
// - Key: /pd/{cluster_id}/rule (a string prefix matched by both the "rules" and "rule_group" key spaces, not an exact key)
// - Value: placement.Rule or placement.RuleGroup
ruleCommonPathPrefix string
// rulesPathPrefix:
// - Key: /pd/{cluster_id}/rules/{group_id}-{rule_id}
// - Value: placement.Rule
Expand All @@ -61,7 +65,6 @@ type Watcher struct {
regionLabeler *labeler.RegionLabeler

ruleWatcher *etcdutil.LoopWatcher
rleungx marked this conversation as resolved.
Show resolved Hide resolved
groupWatcher *etcdutil.LoopWatcher
labelWatcher *etcdutil.LoopWatcher

// pendingDeletion is a structure used to track the rules or rule groups that are marked for deletion.
Expand Down Expand Up @@ -109,10 +112,6 @@ func NewWatcher(
if err != nil {
return nil, err
}
err = rw.initializeGroupWatcher()
if err != nil {
return nil, err
}
err = rw.initializeRegionLabelWatcher()
if err != nil {
return nil, err
Expand All @@ -121,101 +120,87 @@ func NewWatcher(
}

func (rw *Watcher) initializeRuleWatcher() error {
prefixToTrim := rw.rulesPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
log.Info("update placement rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
rule, err := placement.NewRuleFromJSON(kv.Value)
if err != nil {
return err
}
// Update the suspect key ranges in the checker.
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
if oldRule := rw.ruleManager.GetRule(rule.GroupID, rule.ID); oldRule != nil {
rw.checkerController.AddSuspectKeyRange(oldRule.StartKey, oldRule.EndKey)
}
err = rw.ruleManager.SetRule(rule)
err := func() error {
if strings.HasPrefix(string(kv.Key), rw.rulesPathPrefix) {
log.Info("update placement rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
rule, err := placement.NewRuleFromJSON(kv.Value)
if err != nil {
return err
}
// Update the suspect key ranges in the checker.
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
if oldRule := rw.ruleManager.GetRule(rule.GroupID, rule.ID); oldRule != nil {
rw.checkerController.AddSuspectKeyRange(oldRule.StartKey, oldRule.EndKey)
}
return rw.ruleManager.SetRule(rule)
} else if strings.HasPrefix(string(kv.Key), rw.ruleGroupPathPrefix) {
log.Info("update placement rule group", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
ruleGroup, err := placement.NewRuleGroupFromJSON(kv.Value)
if err != nil {
return err
}
// Add all rule key ranges within the group to the suspect key ranges.
for _, rule := range rw.ruleManager.GetRulesByGroup(ruleGroup.ID) {
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
}
return rw.ruleManager.SetRuleGroup(ruleGroup)
} else {
log.Warn("unknown key when update placement rule", zap.String("key", string(kv.Key)))
return nil
}
}()
if err == nil && rw.hasPendingDeletion() {
rw.tryFinishPendingDeletion()
}
return err
}
deleteFn := func(kv *mvccpb.KeyValue) error {
key := string(kv.Key)
log.Info("delete placement rule", zap.String("key", key))
ruleJSON, err := rw.ruleStorage.LoadRule(strings.TrimPrefix(key, prefixToTrim))
if err != nil {
return err
groupID, ruleID, err := func() (string, string, error) {
if strings.HasPrefix(string(kv.Key), rw.rulesPathPrefix) {
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
log.Info("delete placement rule", zap.String("key", key))
ruleJSON, err := rw.ruleStorage.LoadRule(strings.TrimPrefix(key, rw.rulesPathPrefix+"/"))
if err != nil {
return "", "", err
}
rule, err := placement.NewRuleFromJSON([]byte(ruleJSON))
if err != nil {
return "", "", err
}
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
return rule.GroupID, rule.ID, rw.ruleManager.DeleteRule(rule.GroupID, rule.ID)
} else if strings.HasPrefix(string(kv.Key), rw.ruleGroupPathPrefix) {
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
log.Info("delete placement rule group", zap.String("key", key))
trimmedKey := strings.TrimPrefix(key, rw.ruleGroupPathPrefix+"/")
for _, rule := range rw.ruleManager.GetRulesByGroup(trimmedKey) {
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
}
return trimmedKey, "", rw.ruleManager.DeleteRuleGroup(trimmedKey)
} else {
log.Warn("unknown key when delete placement rule", zap.String("key", string(kv.Key)))
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
return "", "", nil
}
}()
if err != nil && strings.Contains(err.Error(), "no rule left") && groupID != "" {
rw.addPendingDeletion(key, groupID, ruleID)
}
rule, err := placement.NewRuleFromJSON([]byte(ruleJSON))
if err != nil {
return err
}
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
err = rw.ruleManager.DeleteRule(rule.GroupID, rule.ID)
if err != nil && strings.Contains(err.Error(), "no rule left") {
rw.addPendingDeletion(key, rule.GroupID, rule.ID)
}
return err
return nil
}
postEventFn := func() error {
return nil
}
rw.ruleWatcher = etcdutil.NewLoopWatcher(
rw.ctx, &rw.wg,
rw.etcdClient,
"scheduling-rule-watcher", rw.rulesPathPrefix,
"scheduling-rule-watcher", rw.ruleCommonPathPrefix,
putFn, deleteFn, postEventFn,
clientv3.WithPrefix(),
)
rw.ruleWatcher.StartWatchLoop()
return rw.ruleWatcher.WaitLoad()
}

// initializeGroupWatcher starts an etcd loop watcher over the rule group key
// prefix and blocks until its initial load has finished.
//
// On a put event the rule group is decoded from the value and stored in the
// rule manager; on a delete event the group ID is taken from the key (the part
// after the rule group prefix) and the group is removed from the rule manager.
// In both cases the key ranges of every rule currently in the group are added
// to the checker's suspect key ranges first.
func (rw *Watcher) initializeGroupWatcher() error {
	groupKeyPrefix := rw.ruleGroupPathPrefix + "/"

	// markGroupSuspect adds the key range of every rule in the given group to
	// the suspect key ranges.
	markGroupSuspect := func(groupID string) {
		for _, r := range rw.ruleManager.GetRulesByGroup(groupID) {
			rw.checkerController.AddSuspectKeyRange(r.StartKey, r.EndKey)
		}
	}

	onPut := func(kv *mvccpb.KeyValue) error {
		log.Info("update placement rule group", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
		group, err := placement.NewRuleGroupFromJSON(kv.Value)
		if err != nil {
			return err
		}
		markGroupSuspect(group.ID)
		err = rw.ruleManager.SetRuleGroup(group)
		if err != nil {
			return err
		}
		// A successful update is the point at which deletions that were
		// recorded as pending get another attempt.
		if rw.hasPendingDeletion() {
			rw.tryFinishPendingDeletion()
		}
		return nil
	}

	onDelete := func(kv *mvccpb.KeyValue) error {
		key := string(kv.Key)
		log.Info("delete placement rule group", zap.String("key", key))
		groupID := strings.TrimPrefix(key, groupKeyPrefix)
		markGroupSuspect(groupID)
		err := rw.ruleManager.DeleteRuleGroup(groupID)
		if err != nil {
			// Record the deletion as pending when the rule manager reports
			// "no rule left"; the error is still returned to the watcher.
			if strings.Contains(err.Error(), "no rule left") {
				rw.addPendingDeletion(key, groupID, "")
			}
		}
		return err
	}

	// Nothing to do after a batch of events.
	afterEvents := func() error { return nil }

	rw.groupWatcher = etcdutil.NewLoopWatcher(
		rw.ctx, &rw.wg,
		rw.etcdClient,
		"scheduling-rule-group-watcher", rw.ruleGroupPathPrefix,
		onPut, onDelete, afterEvents,
		clientv3.WithPrefix(),
	)
	rw.groupWatcher.StartWatchLoop()
	return rw.groupWatcher.WaitLoad()
}

func (rw *Watcher) initializeRegionLabelWatcher() error {
prefixToTrim := rw.regionLabelPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
Expand Down Expand Up @@ -283,7 +268,6 @@ func (rw *Watcher) tryFinishPendingDeletion() {
// We need to force load the rules and rule groups to make sure sync with etcd.
if len(rw.pendingDeletion.kvs) != originLen {
rw.ruleWatcher.ForceLoad()
rw.groupWatcher.ForceLoad()
log.Info("force load rules", zap.Int("pending deletion", len(rw.pendingDeletion.kvs)), zap.Int("origin", originLen))
}
}
6 changes: 6 additions & 0 deletions pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
serviceMiddlewarePath = "service_middleware"
schedulePath = "schedule"
gcPath = "gc"
ruleCommonPath = "rule"
rulesPath = "rules"
ruleGroupPath = "rule_group"
regionLabelPath = "region_label"
Expand Down Expand Up @@ -102,6 +103,11 @@ func RulesPathPrefix(clusterID uint64) string {
return path.Join(PDRootPath(clusterID), rulesPath)
}

// RuleCommonPathPrefix returns the PD root path of the given cluster joined
// with "rule". Since "rule" is a string prefix of both "rules" and
// "rule_group", a prefix watch on the returned path covers the keys under
// RulesPathPrefix as well as those under RuleGroupPathPrefix.
func RuleCommonPathPrefix(clusterID uint64) string {
	root := PDRootPath(clusterID)
	return path.Join(root, ruleCommonPath)
}

// RuleGroupPathPrefix returns the path prefix to save the placement rule groups.
func RuleGroupPathPrefix(clusterID uint64) string {
return path.Join(PDRootPath(clusterID), ruleGroupPath)
Expand Down
4 changes: 1 addition & 3 deletions tests/server/api/rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1097,8 +1097,7 @@ func (suite *ruleTestSuite) TestConcurrency() {
},
}
env := tests.NewSchedulingTestEnvironment(suite.T(), opts...)
// FIXME: enable this test in api mode
env.RunTestInPDMode(suite.checkConcurrency)
env.RunTestInTwoModes(suite.checkConcurrency)
}

func (suite *ruleTestSuite) checkConcurrency(cluster *tests.TestCluster) {
Expand All @@ -1122,7 +1121,6 @@ func (suite *ruleTestSuite) checkConcurrency(cluster *tests.TestCluster) {
},
)
// test concurrency of set rule with different id
// TODO: this part cannot run in api mode
suite.checkConcurrencyWith(cluster,
func(i int) []placement.GroupBundle {
return []placement.GroupBundle{
Expand Down
Loading