Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: watch rule change with txn #7550

Merged
merged 11 commits into from
Dec 20, 2023
21 changes: 21 additions & 0 deletions pkg/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,3 +309,24 @@
EndKey: []byte(endKey),
}
}

// KeyRanges is a slice of KeyRange.
type KeyRanges struct {
krs []*KeyRange
}

// Append appends a KeyRange.
func (rs *KeyRanges) Append(startKey, endKey []byte) {
rs.krs = append(rs.krs, &KeyRange{
StartKey: startKey,
EndKey: endKey,
})
}

// Ranges returns the slice of KeyRange.
func (rs *KeyRanges) Ranges() []*KeyRange {
if rs == nil {
return nil

Check warning on line 329 in pkg/core/basic_cluster.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/basic_cluster.go#L329

Added line #L329 was not covered by tests
}
return rs.krs
}
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1330,5 +1330,5 @@ func checkRegionsReplicated(c *gin.Context) {
c.String(http.StatusBadRequest, err.Error())
return
}
c.String(http.StatusOK, state)
c.IndentedJSON(http.StatusOK, state)
}
154 changes: 92 additions & 62 deletions pkg/mcs/scheduling/server/rule/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"sync"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/schedule/checker"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/placement"
Expand All @@ -36,6 +37,10 @@
cancel context.CancelFunc
wg sync.WaitGroup

// ruleCommonPathPrefix:
// - Key: /pd/{cluster_id}/rule
// - Value: placement.Rule or placement.RuleGroup
ruleCommonPathPrefix string
// rulesPathPrefix:
// - Key: /pd/{cluster_id}/rules/{group_id}-{rule_id}
// - Value: placement.Rule
Expand All @@ -60,8 +65,10 @@
regionLabeler *labeler.RegionLabeler

ruleWatcher *etcdutil.LoopWatcher
groupWatcher *etcdutil.LoopWatcher
labelWatcher *etcdutil.LoopWatcher

// patch is used to cache the placement rule changes.
patch *placement.RuleConfigPatch
}

// NewWatcher creates a new watcher to watch the Placement Rule change from PD API server.
Expand All @@ -79,6 +86,7 @@
ctx: ctx,
cancel: cancel,
rulesPathPrefix: endpoint.RulesPathPrefix(clusterID),
ruleCommonPathPrefix: endpoint.RuleCommonPathPrefix(clusterID),
ruleGroupPathPrefix: endpoint.RuleGroupPathPrefix(clusterID),
regionLabelPathPrefix: endpoint.RegionLabelPathPrefix(clusterID),
etcdClient: etcdClient,
Expand All @@ -91,10 +99,6 @@
if err != nil {
return nil, err
}
err = rw.initializeGroupWatcher()
if err != nil {
return nil, err
}
err = rw.initializeRegionLabelWatcher()
if err != nil {
return nil, err
Expand All @@ -103,83 +107,109 @@
}

func (rw *Watcher) initializeRuleWatcher() error {
prefixToTrim := rw.rulesPathPrefix + "/"
var suspectKeyRanges *core.KeyRanges

preEventsFn := func(events []*clientv3.Event) error {
// It will be locked until the postFn is finished.
rw.ruleManager.Lock()
rw.patch = rw.ruleManager.BeginPatch()
suspectKeyRanges = &core.KeyRanges{}
return nil
}

putFn := func(kv *mvccpb.KeyValue) error {
log.Info("update placement rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
rule, err := placement.NewRuleFromJSON(kv.Value)
if err != nil {
return err
}
// Update the suspect key ranges in the checker.
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
if oldRule := rw.ruleManager.GetRule(rule.GroupID, rule.ID); oldRule != nil {
rw.checkerController.AddSuspectKeyRange(oldRule.StartKey, oldRule.EndKey)
key := string(kv.Key)
if strings.HasPrefix(key, rw.rulesPathPrefix) {
log.Info("update placement rule", zap.String("key", key), zap.String("value", string(kv.Value)))
rule, err := placement.NewRuleFromJSON(kv.Value)
if err != nil {
return err

Check warning on line 126 in pkg/mcs/scheduling/server/rule/watcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/rule/watcher.go#L126

Added line #L126 was not covered by tests
}
// Try to add the rule change to the patch.
if err := rw.ruleManager.AdjustRule(rule, ""); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to parser the group ID and use it for this function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pr has used rule.GroupID in value

return err

Check warning on line 130 in pkg/mcs/scheduling/server/rule/watcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/rule/watcher.go#L130

Added line #L130 was not covered by tests
}
rw.patch.SetRule(rule)
// Update the suspect key ranges in lock.
suspectKeyRanges.Append(rule.StartKey, rule.EndKey)
if oldRule := rw.ruleManager.GetRuleLocked(rule.GroupID, rule.ID); oldRule != nil {
suspectKeyRanges.Append(oldRule.StartKey, oldRule.EndKey)
}
return nil
} else if strings.HasPrefix(key, rw.ruleGroupPathPrefix) {
log.Info("update placement rule group", zap.String("key", key), zap.String("value", string(kv.Value)))
ruleGroup, err := placement.NewRuleGroupFromJSON(kv.Value)
if err != nil {
return err

Check warning on line 143 in pkg/mcs/scheduling/server/rule/watcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/rule/watcher.go#L143

Added line #L143 was not covered by tests
}
// Try to add the rule group change to the patch.
rw.patch.SetGroup(ruleGroup)
// Update the suspect key ranges
for _, rule := range rw.ruleManager.GetRulesByGroupLocked(ruleGroup.ID) {
suspectKeyRanges.Append(rule.StartKey, rule.EndKey)
}
return nil
} else {
log.Warn("unknown key when updating placement rule", zap.String("key", key))
return nil

Check warning on line 154 in pkg/mcs/scheduling/server/rule/watcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/rule/watcher.go#L152-L154

Added lines #L152 - L154 were not covered by tests
}
return rw.ruleManager.SetRule(rule)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
key := string(kv.Key)
log.Info("delete placement rule", zap.String("key", key))
ruleJSON, err := rw.ruleStorage.LoadRule(strings.TrimPrefix(key, prefixToTrim))
if err != nil {
if strings.HasPrefix(key, rw.rulesPathPrefix) {
log.Info("delete placement rule", zap.String("key", key))
ruleJSON, err := rw.ruleStorage.LoadRule(strings.TrimPrefix(key, rw.rulesPathPrefix+"/"))
if err != nil {
return err

Check warning on line 163 in pkg/mcs/scheduling/server/rule/watcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/rule/watcher.go#L163

Added line #L163 was not covered by tests
}
rule, err := placement.NewRuleFromJSON([]byte(ruleJSON))
if err != nil {
return err

Check warning on line 167 in pkg/mcs/scheduling/server/rule/watcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/rule/watcher.go#L167

Added line #L167 was not covered by tests
}
// Try to add the rule change to the patch.
rw.patch.DeleteRule(rule.GroupID, rule.ID)
// Update the suspect key ranges
suspectKeyRanges.Append(rule.StartKey, rule.EndKey)
return err
} else if strings.HasPrefix(key, rw.ruleGroupPathPrefix) {
log.Info("delete placement rule group", zap.String("key", key))
trimmedKey := strings.TrimPrefix(key, rw.ruleGroupPathPrefix+"/")
// Try to add the rule group change to the patch.
rw.patch.DeleteGroup(trimmedKey)
// Update the suspect key ranges
for _, rule := range rw.ruleManager.GetRulesByGroupLocked(trimmedKey) {
suspectKeyRanges.Append(rule.StartKey, rule.EndKey)
}
return nil
} else {
log.Warn("unknown key when deleting placement rule", zap.String("key", key))
return nil

Check warning on line 186 in pkg/mcs/scheduling/server/rule/watcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/rule/watcher.go#L184-L186

Added lines #L184 - L186 were not covered by tests
}
rule, err := placement.NewRuleFromJSON([]byte(ruleJSON))
if err != nil {
}
postEventsFn := func(events []*clientv3.Event) error {
defer rw.ruleManager.Unlock()
if err := rw.ruleManager.TryCommitPatch(rw.patch); err != nil {
log.Error("failed to commit patch", zap.Error(err))
return err
}
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
return rw.ruleManager.DeleteRule(rule.GroupID, rule.ID)
for _, kr := range suspectKeyRanges.Ranges() {
rw.checkerController.AddSuspectKeyRange(kr.StartKey, kr.EndKey)
}
return nil
}
rw.ruleWatcher = etcdutil.NewLoopWatcher(
rw.ctx, &rw.wg,
rw.etcdClient,
"scheduling-rule-watcher", rw.rulesPathPrefix,
func([]*clientv3.Event) error { return nil },
"scheduling-rule-watcher", rw.ruleCommonPathPrefix,
preEventsFn,
putFn, deleteFn,
func([]*clientv3.Event) error { return nil },
postEventsFn,
clientv3.WithPrefix(),
)
rw.ruleWatcher.StartWatchLoop()
return rw.ruleWatcher.WaitLoad()
}

func (rw *Watcher) initializeGroupWatcher() error {
prefixToTrim := rw.ruleGroupPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
log.Info("update placement rule group", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
ruleGroup, err := placement.NewRuleGroupFromJSON(kv.Value)
if err != nil {
return err
}
// Add all rule key ranges within the group to the suspect key ranges.
for _, rule := range rw.ruleManager.GetRulesByGroup(ruleGroup.ID) {
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
}
return rw.ruleManager.SetRuleGroup(ruleGroup)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
key := string(kv.Key)
log.Info("delete placement rule group", zap.String("key", key))
trimmedKey := strings.TrimPrefix(key, prefixToTrim)
for _, rule := range rw.ruleManager.GetRulesByGroup(trimmedKey) {
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
}
return rw.ruleManager.DeleteRuleGroup(trimmedKey)
}
rw.groupWatcher = etcdutil.NewLoopWatcher(
rw.ctx, &rw.wg,
rw.etcdClient,
"scheduling-rule-group-watcher", rw.ruleGroupPathPrefix,
func([]*clientv3.Event) error { return nil },
putFn, deleteFn,
func([]*clientv3.Event) error { return nil },
clientv3.WithPrefix(),
)
rw.groupWatcher.StartWatchLoop()
return rw.groupWatcher.WaitLoad()
}

func (rw *Watcher) initializeRegionLabelWatcher() error {
prefixToTrim := rw.regionLabelPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
Expand Down
32 changes: 18 additions & 14 deletions pkg/schedule/placement/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,28 +79,30 @@ func (c *ruleConfig) getGroup(id string) *RuleGroup {
return &RuleGroup{ID: id}
}

func (c *ruleConfig) beginPatch() *ruleConfigPatch {
return &ruleConfigPatch{
func (c *ruleConfig) beginPatch() *RuleConfigPatch {
return &RuleConfigPatch{
c: c,
mut: newRuleConfig(),
}
}

// A helper data structure to update ruleConfig.
type ruleConfigPatch struct {
// RuleConfigPatch is a helper data structure to update ruleConfig.
type RuleConfigPatch struct {
c *ruleConfig // original configuration to be updated
mut *ruleConfig // record all to-commit rules and groups
}

func (p *ruleConfigPatch) setRule(r *Rule) {
// SetRule sets a rule to the patch.
func (p *RuleConfigPatch) SetRule(r *Rule) {
p.mut.rules[r.Key()] = r
}

func (p *ruleConfigPatch) deleteRule(group, id string) {
// DeleteRule deletes a rule from the patch.
func (p *RuleConfigPatch) DeleteRule(group, id string) {
p.mut.rules[[2]string{group, id}] = nil
}

func (p *ruleConfigPatch) getGroup(id string) *RuleGroup {
func (p *RuleConfigPatch) getGroup(id string) *RuleGroup {
if g, ok := p.mut.groups[id]; ok {
return g
}
Expand All @@ -110,15 +112,17 @@ func (p *ruleConfigPatch) getGroup(id string) *RuleGroup {
return &RuleGroup{ID: id}
}

func (p *ruleConfigPatch) setGroup(g *RuleGroup) {
// SetGroup sets a group to the patch.
func (p *RuleConfigPatch) SetGroup(g *RuleGroup) {
p.mut.groups[g.ID] = g
}

func (p *ruleConfigPatch) deleteGroup(id string) {
p.setGroup(&RuleGroup{ID: id})
// DeleteGroup deletes a group from the patch.
func (p *RuleConfigPatch) DeleteGroup(id string) {
p.SetGroup(&RuleGroup{ID: id})
}

func (p *ruleConfigPatch) iterateRules(f func(*Rule)) {
func (p *RuleConfigPatch) iterateRules(f func(*Rule)) {
for _, r := range p.mut.rules {
if r != nil { // nil means delete.
f(r)
Expand All @@ -131,13 +135,13 @@ func (p *ruleConfigPatch) iterateRules(f func(*Rule)) {
}
}

func (p *ruleConfigPatch) adjust() {
func (p *RuleConfigPatch) adjust() {
// setup rule.group for `buildRuleList` use.
p.iterateRules(func(r *Rule) { r.group = p.getGroup(r.GroupID) })
}

// trim unnecessary updates. For example, remove a rule then insert the same rule.
func (p *ruleConfigPatch) trim() {
func (p *RuleConfigPatch) trim() {
for key, rule := range p.mut.rules {
if jsonEquals(rule, p.c.getRule(key)) {
delete(p.mut.rules, key)
Expand All @@ -151,7 +155,7 @@ func (p *ruleConfigPatch) trim() {
}

// merge all mutations to ruleConfig.
func (p *ruleConfigPatch) commit() {
func (p *RuleConfigPatch) commit() {
for key, rule := range p.mut.rules {
if rule == nil {
delete(p.c.rules, key)
Expand Down
40 changes: 20 additions & 20 deletions pkg/schedule/placement/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,40 +30,40 @@ func TestTrim(t *testing.T) {
rc.setGroup(&RuleGroup{ID: "g2", Index: 2})

testCases := []struct {
ops func(p *ruleConfigPatch)
ops func(p *RuleConfigPatch)
mutRules map[[2]string]*Rule
mutGroups map[string]*RuleGroup
}{
{
func(p *ruleConfigPatch) {
p.setRule(&Rule{GroupID: "g1", ID: "id1", Index: 100})
p.setRule(&Rule{GroupID: "g1", ID: "id2"})
p.setGroup(&RuleGroup{ID: "g1", Index: 100})
p.setGroup(&RuleGroup{ID: "g2", Index: 2})
func(p *RuleConfigPatch) {
p.SetRule(&Rule{GroupID: "g1", ID: "id1", Index: 100})
p.SetRule(&Rule{GroupID: "g1", ID: "id2"})
p.SetGroup(&RuleGroup{ID: "g1", Index: 100})
p.SetGroup(&RuleGroup{ID: "g2", Index: 2})
},
map[[2]string]*Rule{{"g1", "id1"}: {GroupID: "g1", ID: "id1", Index: 100}},
map[string]*RuleGroup{"g1": {ID: "g1", Index: 100}},
},
{
func(p *ruleConfigPatch) {
p.deleteRule("g1", "id1")
p.deleteGroup("g2")
p.deleteRule("g3", "id3")
p.deleteGroup("g3")
func(p *RuleConfigPatch) {
p.DeleteRule("g1", "id1")
p.DeleteGroup("g2")
p.DeleteRule("g3", "id3")
p.DeleteGroup("g3")
},
map[[2]string]*Rule{{"g1", "id1"}: nil},
map[string]*RuleGroup{"g2": {ID: "g2"}},
},
{
func(p *ruleConfigPatch) {
p.setRule(&Rule{GroupID: "g1", ID: "id2", Index: 200})
p.setRule(&Rule{GroupID: "g1", ID: "id2"})
p.setRule(&Rule{GroupID: "g3", ID: "id3"})
p.deleteRule("g3", "id3")
p.setGroup(&RuleGroup{ID: "g1", Index: 100})
p.setGroup(&RuleGroup{ID: "g1", Index: 1})
p.setGroup(&RuleGroup{ID: "g3", Index: 3})
p.deleteGroup("g3")
func(p *RuleConfigPatch) {
p.SetRule(&Rule{GroupID: "g1", ID: "id2", Index: 200})
p.SetRule(&Rule{GroupID: "g1", ID: "id2"})
p.SetRule(&Rule{GroupID: "g3", ID: "id3"})
p.DeleteRule("g3", "id3")
p.SetGroup(&RuleGroup{ID: "g1", Index: 100})
p.SetGroup(&RuleGroup{ID: "g1", Index: 1})
p.SetGroup(&RuleGroup{ID: "g3", Index: 3})
p.DeleteGroup("g3")
},
map[[2]string]*Rule{},
map[string]*RuleGroup{},
Expand Down
Loading
Loading