Skip to content

Commit

Permalink
mcs, schedule: use txn in label rule manager (#7738)
Browse files Browse the repository at this point in the history
ref #5839

Signed-off-by: lhy1024 <admin@liudos.us>

Co-authored-by: Ryan Leung <rleungx@gmail.com>
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Jan 22, 2024
1 parent d9789dc commit 6929c15
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 52 deletions.
59 changes: 39 additions & 20 deletions pkg/schedule/labeler/labeler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/schedule/rangelist"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -88,11 +89,12 @@ func (l *RegionLabeler) checkAndClearExpiredLabels() {
continue
}
if len(rule.Labels) == 0 {
err = l.storage.DeleteRegionRule(key)
delete(l.labelRules, key)
deleted = true
err = l.DeleteLabelRuleLocked(key)
if err == nil {
deleted = true
}
} else {
err = l.storage.SaveRegionRule(key, rule)
err = l.SaveLabelRuleLocked(rule)
}
if err != nil {
log.Error("failed to save rule expired label rule", zap.String("rule-key", key), zap.Error(err))
Expand Down Expand Up @@ -123,8 +125,10 @@ func (l *RegionLabeler) loadRules() error {
if err != nil {
return err
}
for _, d := range toDelete {
if err = l.storage.DeleteRegionRule(d); err != nil {
for _, id := range toDelete {
if err := l.storage.RunInTxn(l.ctx, func(txn kv.Txn) error {
return l.storage.DeleteRegionRule(txn, id)
}); err != nil {
return err
}
}
Expand Down Expand Up @@ -197,11 +201,10 @@ func (l *RegionLabeler) getAndCheckRule(id string, now time.Time) *LabelRule {
return rule
}
if len(rule.Labels) == 0 {
l.storage.DeleteRegionRule(id)
delete(l.labelRules, id)
l.DeleteLabelRuleLocked(id)
return nil
}
l.storage.SaveRegionRule(id, rule)
l.SaveLabelRuleLocked(rule)
return rule
}

Expand All @@ -221,17 +224,28 @@ func (l *RegionLabeler) SetLabelRuleLocked(rule *LabelRule) error {
if err := rule.checkAndAdjust(); err != nil {
return err
}
if err := l.storage.SaveRegionRule(rule.ID, rule); err != nil {
if err := l.SaveLabelRuleLocked(rule); err != nil {
return err
}
l.labelRules[rule.ID] = rule
return nil
}

// SaveLabelRuleLocked inserts or updates a LabelRule but not buildRangeList.
// It only saves the rule to storage, and does not update the in-memory states.
func (l *RegionLabeler) SaveLabelRuleLocked(rule *LabelRule) error {
return l.storage.RunInTxn(l.ctx, func(txn kv.Txn) error {
return l.storage.SaveRegionRule(txn, rule.ID, rule)
})
}

// DeleteLabelRule removes a LabelRule.
func (l *RegionLabeler) DeleteLabelRule(id string) error {
l.Lock()
defer l.Unlock()
if _, ok := l.labelRules[id]; !ok {
return errs.ErrRegionRuleNotFound.FastGenByArgs(id)
}
if err := l.DeleteLabelRuleLocked(id); err != nil {
return err
}
Expand All @@ -241,10 +255,9 @@ func (l *RegionLabeler) DeleteLabelRule(id string) error {

// DeleteLabelRuleLocked removes a LabelRule but not buildRangeList.
func (l *RegionLabeler) DeleteLabelRuleLocked(id string) error {
if _, ok := l.labelRules[id]; !ok {
return errs.ErrRegionRuleNotFound.FastGenByArgs(id)
}
if err := l.storage.DeleteRegionRule(id); err != nil {
if err := l.storage.RunInTxn(l.ctx, func(txn kv.Txn) error {
return l.storage.DeleteRegionRule(txn, id)
}); err != nil {
return err
}
delete(l.labelRules, id)
Expand All @@ -260,15 +273,21 @@ func (l *RegionLabeler) Patch(patch LabelRulePatch) error {
}

// save to storage
var batch []func(kv.Txn) error
for _, key := range patch.DeleteRules {
if err := l.storage.DeleteRegionRule(key); err != nil {
return err
}
localKey := key
batch = append(batch, func(txn kv.Txn) error {
return l.storage.DeleteRegionRule(txn, localKey)
})
}
for _, rule := range patch.SetRules {
if err := l.storage.SaveRegionRule(rule.ID, rule); err != nil {
return err
}
localID, localRule := rule.ID, rule
batch = append(batch, func(txn kv.Txn) error {
return l.storage.SaveRegionRule(txn, localID, localRule)
})
}
if err := endpoint.RunBatchOpInTxn(l.ctx, l.storage, batch); err != nil {
return err
}

// update in-memory states.
Expand Down
44 changes: 44 additions & 0 deletions pkg/schedule/labeler/labeler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
"sort"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -132,6 +133,49 @@ func TestGetSetRule(t *testing.T) {
for id, rule := range allRules {
expectSameRules(re, rule, rules[id+1])
}

for _, r := range rules {
labeler.DeleteLabelRule(r.ID)
}
re.Empty(labeler.GetAllLabelRules())

// test patch rules in batch
rulesNum := 200
patch.SetRules = patch.SetRules[:0]
patch.DeleteRules = patch.DeleteRules[:0]
for i := 1; i <= rulesNum; i++ {
patch.SetRules = append(patch.SetRules, &LabelRule{
ID: fmt.Sprintf("rule_%d", i),
Labels: []RegionLabel{
{Key: fmt.Sprintf("k_%d", i), Value: fmt.Sprintf("v_%d", i)},
},
RuleType: "key-range",
Data: MakeKeyRanges("", ""),
})
}
err = labeler.Patch(patch)
re.NoError(err)
allRules = labeler.GetAllLabelRules()
re.Len(allRules, rulesNum)
sort.Slice(allRules, func(i, j int) bool {
i1, err := strconv.Atoi(allRules[i].ID[5:])
re.NoError(err)
j1, err := strconv.Atoi(allRules[j].ID[5:])
re.NoError(err)
return i1 < j1
})
for id, rule := range allRules {
expectSameRules(re, rule, patch.SetRules[id])
}
patch.SetRules = patch.SetRules[:0]
patch.DeleteRules = patch.DeleteRules[:0]
for i := 1; i <= rulesNum; i++ {
patch.DeleteRules = append(patch.DeleteRules, fmt.Sprintf("rule_%d", i))
}
err = labeler.Patch(patch)
re.NoError(err)
allRules = labeler.GetAllLabelRules()
re.Empty(allRules)
}

func TestIndex(t *testing.T) {
Expand Down
27 changes: 2 additions & 25 deletions pkg/schedule/placement/rule_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.uber.org/zap"
"golang.org/x/exp/slices"
Expand Down Expand Up @@ -202,7 +201,7 @@ func (m *RuleManager) loadRules() error {
return m.storage.DeleteRule(txn, localKey)
})
}
return m.runBatchOpInTxn(batch)
return endpoint.RunBatchOpInTxn(m.ctx, m.storage, batch)
}

func (m *RuleManager) loadGroups() error {
Expand Down Expand Up @@ -521,7 +520,7 @@ func (m *RuleManager) savePatch(p *ruleConfig) error {
})
}
}
return m.runBatchOpInTxn(batch)
return endpoint.RunBatchOpInTxn(m.ctx, m.storage, batch)
}

// SetRules inserts or updates lots of Rules at once.
Expand Down Expand Up @@ -814,28 +813,6 @@ func (m *RuleManager) IsInitialized() bool {
return m.initialized
}

func (m *RuleManager) runBatchOpInTxn(batch []func(kv.Txn) error) error {
// execute batch in transaction with limited operations per transaction
for start := 0; start < len(batch); start += etcdutil.MaxEtcdTxnOps {
end := start + etcdutil.MaxEtcdTxnOps
if end > len(batch) {
end = len(batch)
}
err := m.storage.RunInTxn(m.ctx, func(txn kv.Txn) (err error) {
for _, op := range batch[start:end] {
if err = op(txn); err != nil {
return err
}
}
return nil
})
if err != nil {
return err
}
}
return nil
}

// checkRule check the rule whether will have RuleFit after FitRegion
// in order to reduce the calculation.
func checkRule(rule *Rule, stores []*core.StoreInfo) bool {
Expand Down
15 changes: 8 additions & 7 deletions pkg/storage/endpoint/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,17 @@ type RuleStorage interface {
LoadRule(ruleKey string) (string, error)
LoadRules(f func(k, v string)) error
LoadRuleGroups(f func(k, v string)) error
LoadRegionRules(f func(k, v string)) error

// We need to use txn to avoid concurrent modification.
// And it is helpful for the scheduling server to watch the rule.
SaveRule(txn kv.Txn, ruleKey string, rule interface{}) error
DeleteRule(txn kv.Txn, ruleKey string) error
SaveRuleGroup(txn kv.Txn, groupID string, group interface{}) error
DeleteRuleGroup(txn kv.Txn, groupID string) error
SaveRegionRule(txn kv.Txn, ruleKey string, rule interface{}) error
DeleteRegionRule(txn kv.Txn, ruleKey string) error

LoadRegionRules(f func(k, v string)) error
SaveRegionRule(ruleKey string, rule interface{}) error
DeleteRegionRule(ruleKey string) error
RunInTxn(ctx context.Context, f func(txn kv.Txn) error) error
}

Expand Down Expand Up @@ -73,13 +74,13 @@ func (se *StorageEndpoint) LoadRegionRules(f func(k, v string)) error {
}

// SaveRegionRule saves a region rule to the storage.
func (se *StorageEndpoint) SaveRegionRule(ruleKey string, rule interface{}) error {
return se.saveJSON(regionLabelKeyPath(ruleKey), rule)
func (se *StorageEndpoint) SaveRegionRule(txn kv.Txn, ruleKey string, rule interface{}) error {
return saveJSONInTxn(txn, regionLabelKeyPath(ruleKey), rule)
}

// DeleteRegionRule removes a region rule from storage.
func (se *StorageEndpoint) DeleteRegionRule(ruleKey string) error {
return se.Remove(regionLabelKeyPath(ruleKey))
func (se *StorageEndpoint) DeleteRegionRule(txn kv.Txn, ruleKey string) error {
return txn.Remove(regionLabelKeyPath(ruleKey))
}

// LoadRule load a placement rule from storage.
Expand Down
30 changes: 30 additions & 0 deletions pkg/storage/endpoint/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
package endpoint

import (
"context"
"encoding/json"
"strings"

"github.com/gogo/protobuf/proto"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
)

Expand Down Expand Up @@ -74,3 +76,31 @@ func (se *StorageEndpoint) loadRangeByPrefix(prefix string, f func(k, v string))
nextKey = keys[len(keys)-1] + "\x00"
}
}

// TxnStorage is the interface with RunInTxn
type TxnStorage interface {
RunInTxn(ctx context.Context, f func(txn kv.Txn) error) error
}

// RunBatchOpInTxn runs a batch of operations in transaction.
// The batch is split into multiple transactions if it exceeds the maximum number of operations per transaction.
func RunBatchOpInTxn(ctx context.Context, storage TxnStorage, batch []func(kv.Txn) error) error {
for start := 0; start < len(batch); start += etcdutil.MaxEtcdTxnOps {
end := start + etcdutil.MaxEtcdTxnOps
if end > len(batch) {
end = len(batch)
}
err := storage.RunInTxn(ctx, func(txn kv.Txn) (err error) {
for _, op := range batch[start:end] {
if err = op(txn); err != nil {
return err
}
}
return nil
})
if err != nil {
return err
}
}
return nil
}

0 comments on commit 6929c15

Please sign in to comment.