mcs: fix rule sync when meet "no rule left" and concurrency #7481

Closed · wants to merge 18 commits
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/apis/v1/api.go
@@ -1330,5 +1330,5 @@ func checkRegionsReplicated(c *gin.Context) {
c.String(http.StatusBadRequest, err.Error())
return
}
c.String(http.StatusOK, state)
c.IndentedJSON(http.StatusOK, state)
Member:
What is the reason for this modification? Do we need to modify the corresponding API interface for the non-API mode?

Contributor Author:
Avoid failure in JSON unmarshalling and maintain consistency with PD mode.
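A minimal sketch of the difference on the client side, assuming the replication state is a plain string such as "REPLICATED" (the value is only illustrative): c.String writes the bare text, which is not valid JSON, while c.IndentedJSON writes a quoted JSON string that json.Unmarshal accepts.

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	var state string

	// What c.String(http.StatusOK, state) would send: bare text, not valid JSON.
	plain := []byte("REPLICATED")
	fmt.Println(json.Unmarshal(plain, &state)) // invalid character 'R' looking for beginning of value

	// What c.IndentedJSON(http.StatusOK, state) sends: a quoted JSON string.
	quoted := []byte(`"REPLICATED"`)
	fmt.Println(json.Unmarshal(quoted, &state), state) // <nil> REPLICATED
}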

}
14 changes: 8 additions & 6 deletions pkg/mcs/scheduling/server/config/watcher.go
@@ -154,7 +154,7 @@

func (cw *Watcher) initializeTTLConfigWatcher() error {
putFn := func(kv *mvccpb.KeyValue) error {
key := string(kv.Key)[len(sc.TTLConfigPrefix)+1:]
key := strings.TrimPrefix(string(kv.Key), sc.TTLConfigPrefix+"/")
value := string(kv.Value)
leaseID := kv.Lease
resp, err := cw.etcdClient.TimeToLive(cw.ctx, clientv3.LeaseID(leaseID))
@@ -166,7 +166,7 @@
return nil
}
deleteFn := func(kv *mvccpb.KeyValue) error {
key := string(kv.Key)[len(sc.TTLConfigPrefix)+1:]
key := strings.TrimPrefix(string(kv.Key), sc.TTLConfigPrefix+"/")
cw.ttl.PutWithTTL(key, nil, 0)
return nil
}
@@ -186,13 +186,14 @@
func (cw *Watcher) initializeSchedulerConfigWatcher() error {
prefixToTrim := cw.schedulerConfigPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
name := strings.TrimPrefix(string(kv.Key), prefixToTrim)
key := string(kv.Key)
name := strings.TrimPrefix(key, prefixToTrim)
log.Info("update scheduler config", zap.String("name", name),
zap.String("value", string(kv.Value)))
err := cw.storage.SaveSchedulerConfig(name, kv.Value)
if err != nil {
log.Warn("failed to save scheduler config",
zap.String("event-kv-key", string(kv.Key)),
zap.String("event-kv-key", key),
zap.String("trimmed-key", name),
zap.Error(err))
return err
@@ -204,9 +205,10 @@
return nil
}
deleteFn := func(kv *mvccpb.KeyValue) error {
log.Info("remove scheduler config", zap.String("key", string(kv.Key)))
key := string(kv.Key)
log.Info("remove scheduler config", zap.String("key", key))
return cw.storage.RemoveSchedulerConfig(
strings.TrimPrefix(string(kv.Key), prefixToTrim),
strings.TrimPrefix(key, prefixToTrim),
)
}
postEventFn := func() error {
3 changes: 2 additions & 1 deletion pkg/mcs/scheduling/server/meta/watcher.go
@@ -73,9 +73,10 @@
func (w *Watcher) initializeStoreWatcher() error {
putFn := func(kv *mvccpb.KeyValue) error {
store := &metapb.Store{}
key := string(kv.Key)
if err := proto.Unmarshal(kv.Value, store); err != nil {
log.Warn("failed to unmarshal store entry",
zap.String("event-kv-key", string(kv.Key)), zap.Error(err))
zap.String("event-kv-key", key), zap.Error(err))
return err
}
origin := w.basicCluster.GetStore(store.GetId())
186 changes: 121 additions & 65 deletions pkg/mcs/scheduling/server/rule/watcher.go
@@ -25,6 +25,7 @@
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
@@ -36,6 +37,10 @@
cancel context.CancelFunc
wg sync.WaitGroup

// ruleCommonPathPrefix:
// - Key: /pd/{cluster_id}/rule
// - Value: placement.Rule or placement.RuleGroup
ruleCommonPathPrefix string
// rulesPathPrefix:
// - Key: /pd/{cluster_id}/rules/{group_id}-{rule_id}
// - Value: placement.Rule
@@ -60,8 +65,18 @@
regionLabeler *labeler.RegionLabeler

ruleWatcher *etcdutil.LoopWatcher
groupWatcher *etcdutil.LoopWatcher
labelWatcher *etcdutil.LoopWatcher

// pendingDeletion is a structure used to track the rules or rule groups that are marked for deletion.
// If a rule or rule group cannot be deleted immediately because the deletion would leave no rule,
// it is held here and removed later, once a subsequent rule or rule group put event makes the deletion possible.
pendingDeletion struct {
syncutil.RWMutex
// key: path, value: [groupID, ruleID]
// The map 'kvs' holds the rules or rule groups that are pending deletion.
// If a rule group needs to be deleted, the ruleID will be an empty string.
kvs map[string][2]string
}
}

// NewWatcher creates a new watcher to watch the Placement Rule change from PD API server.
@@ -79,22 +94,25 @@
ctx: ctx,
cancel: cancel,
rulesPathPrefix: endpoint.RulesPathPrefix(clusterID),
ruleCommonPathPrefix: endpoint.RuleCommonPathPrefix(clusterID),
ruleGroupPathPrefix: endpoint.RuleGroupPathPrefix(clusterID),
regionLabelPathPrefix: endpoint.RegionLabelPathPrefix(clusterID),
etcdClient: etcdClient,
ruleStorage: ruleStorage,
checkerController: checkerController,
ruleManager: ruleManager,
regionLabeler: regionLabeler,
pendingDeletion: struct {
syncutil.RWMutex
kvs map[string][2]string
}{
kvs: make(map[string][2]string),
},
}
err := rw.initializeRuleWatcher()
if err != nil {
return nil, err
}
err = rw.initializeGroupWatcher()
if err != nil {
return nil, err
}
err = rw.initializeRegionLabelWatcher()
if err != nil {
return nil, err
@@ -103,89 +121,93 @@
}

func (rw *Watcher) initializeRuleWatcher() error {
prefixToTrim := rw.rulesPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
log.Info("update placement rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
rule, err := placement.NewRuleFromJSON(kv.Value)
if err != nil {
return err
}
// Update the suspect key ranges in the checker.
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
if oldRule := rw.ruleManager.GetRule(rule.GroupID, rule.ID); oldRule != nil {
rw.checkerController.AddSuspectKeyRange(oldRule.StartKey, oldRule.EndKey)
err := func() error {
key := string(kv.Key)
if strings.HasPrefix(key, rw.rulesPathPrefix) {
log.Info("update placement rule", zap.String("key", key), zap.String("value", string(kv.Value)))
rule, err := placement.NewRuleFromJSON(kv.Value)
if err != nil {
return err
}
// Update the suspect key ranges in the checker.
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
if oldRule := rw.ruleManager.GetRule(rule.GroupID, rule.ID); oldRule != nil {
rw.checkerController.AddSuspectKeyRange(oldRule.StartKey, oldRule.EndKey)
}
return rw.ruleManager.SetRule(rule)
} else if strings.HasPrefix(key, rw.ruleGroupPathPrefix) {
log.Info("update placement rule group", zap.String("key", key), zap.String("value", string(kv.Value)))
ruleGroup, err := placement.NewRuleGroupFromJSON(kv.Value)
if err != nil {
return err
}
// Add all rule key ranges within the group to the suspect key ranges.
for _, rule := range rw.ruleManager.GetRulesByGroup(ruleGroup.ID) {
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
}
return rw.ruleManager.SetRuleGroup(ruleGroup)
} else {
log.Warn("unknown key when update placement rule", zap.String("key", key))
return nil
}
}()
if err == nil && rw.hasPendingDeletion() {
rw.tryFinishPendingDeletion()
}
return rw.ruleManager.SetRule(rule)
return err
}
deleteFn := func(kv *mvccpb.KeyValue) error {
key := string(kv.Key)
log.Info("delete placement rule", zap.String("key", key))
ruleJSON, err := rw.ruleStorage.LoadRule(strings.TrimPrefix(key, prefixToTrim))
if err != nil {
return err
}
rule, err := placement.NewRuleFromJSON([]byte(ruleJSON))
if err != nil {
return err
groupID, ruleID, err := func() (string, string, error) {
if strings.HasPrefix(key, rw.rulesPathPrefix) {
log.Info("delete placement rule", zap.String("key", key))
ruleJSON, err := rw.ruleStorage.LoadRule(strings.TrimPrefix(key, rw.rulesPathPrefix+"/"))
if err != nil {
return "", "", err
}
rule, err := placement.NewRuleFromJSON([]byte(ruleJSON))
if err != nil {
return "", "", err
}
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
return rule.GroupID, rule.ID, rw.ruleManager.DeleteRule(rule.GroupID, rule.ID)
} else if strings.HasPrefix(key, rw.ruleGroupPathPrefix) {
log.Info("delete placement rule group", zap.String("key", key))
trimmedKey := strings.TrimPrefix(key, rw.ruleGroupPathPrefix+"/")
for _, rule := range rw.ruleManager.GetRulesByGroup(trimmedKey) {
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
}
return trimmedKey, "", rw.ruleManager.DeleteRuleGroup(trimmedKey)
} else {
log.Warn("unknown key when delete placement rule", zap.String("key", key))
return "", "", nil
}
}()
if err != nil && strings.Contains(err.Error(), "no rule left") && groupID != "" {
rw.addPendingDeletion(key, groupID, ruleID)
}
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
return rw.ruleManager.DeleteRule(rule.GroupID, rule.ID)
return err
}
postEventFn := func() error {
return nil
}
rw.ruleWatcher = etcdutil.NewLoopWatcher(
rw.ctx, &rw.wg,
rw.etcdClient,
"scheduling-rule-watcher", rw.rulesPathPrefix,
"scheduling-rule-watcher", rw.ruleCommonPathPrefix,
putFn, deleteFn, postEventFn,
clientv3.WithPrefix(),
)
rw.ruleWatcher.StartWatchLoop()
return rw.ruleWatcher.WaitLoad()
}

func (rw *Watcher) initializeGroupWatcher() error {
prefixToTrim := rw.ruleGroupPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
log.Info("update placement rule group", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
ruleGroup, err := placement.NewRuleGroupFromJSON(kv.Value)
if err != nil {
return err
}
// Add all rule key ranges within the group to the suspect key ranges.
for _, rule := range rw.ruleManager.GetRulesByGroup(ruleGroup.ID) {
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
}
return rw.ruleManager.SetRuleGroup(ruleGroup)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
key := string(kv.Key)
log.Info("delete placement rule group", zap.String("key", key))
trimmedKey := strings.TrimPrefix(key, prefixToTrim)
for _, rule := range rw.ruleManager.GetRulesByGroup(trimmedKey) {
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
}
return rw.ruleManager.DeleteRuleGroup(trimmedKey)
}
postEventFn := func() error {
return nil
}
rw.groupWatcher = etcdutil.NewLoopWatcher(
rw.ctx, &rw.wg,
rw.etcdClient,
"scheduling-rule-group-watcher", rw.ruleGroupPathPrefix,
putFn, deleteFn, postEventFn,
clientv3.WithPrefix(),
)
rw.groupWatcher.StartWatchLoop()
return rw.groupWatcher.WaitLoad()
}

func (rw *Watcher) initializeRegionLabelWatcher() error {
prefixToTrim := rw.regionLabelPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
log.Info("update region label rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
key := string(kv.Key)
log.Info("update region label rule", zap.String("key", key), zap.String("value", string(kv.Value)))
rule, err := labeler.NewLabelRuleFromJSON(kv.Value)
if err != nil {
return err
@@ -216,3 +238,37 @@
rw.cancel()
rw.wg.Wait()
}

func (rw *Watcher) hasPendingDeletion() bool {
rw.pendingDeletion.RLock()
defer rw.pendingDeletion.RUnlock()
return len(rw.pendingDeletion.kvs) > 0
}

func (rw *Watcher) addPendingDeletion(path, groupID, ruleID string) {
rw.pendingDeletion.Lock()
defer rw.pendingDeletion.Unlock()
rw.pendingDeletion.kvs[path] = [2]string{groupID, ruleID}
}

func (rw *Watcher) tryFinishPendingDeletion() {
Member:
I am worried that the put and delete events could be processed out of order. If so, the newly added rule might be deleted unexpectedly.
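A toy sketch of that concern (not the real Watcher; the etcd path, group/rule IDs, and map shapes below are illustrative): if a parked "no rule left" deletion is retried on the next successful put, it can wipe out a rule that a newer put has just re-created.

package main

import "fmt"

func main() {
	// Rules currently known to the rule manager, keyed by "group/rule".
	rules := map[string]bool{"g1/r1": true}
	// Deletions that failed with "no rule left", keyed by etcd path.
	pending := map[string][2]string{}

	// t1: the delete event for g1/r1 cannot be applied, so it is parked.
	pending["/pd/1/rules/g1-r1"] = [2]string{"g1", "r1"}

	// t2: a newer put event re-creates the same rule.
	rules["g1/r1"] = true

	// t3: the next successful put retries the parked deletions and
	// removes the rule that the newer put intended to keep.
	for path, id := range pending {
		delete(rules, id[0]+"/"+id[1])
		delete(pending, path)
	}
	fmt.Println(rules) // map[] -- the freshly re-added rule is gone
}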

rw.pendingDeletion.Lock()
defer rw.pendingDeletion.Unlock()
previousLen := len(rw.pendingDeletion.kvs)
for k, v := range rw.pendingDeletion.kvs {
groupID, ruleID := v[0], v[1]
var err error
if ruleID == "" {
err = rw.ruleManager.DeleteRuleGroup(groupID)
} else {
err = rw.ruleManager.DeleteRule(groupID, ruleID)
}
if err == nil {
delete(rw.pendingDeletion.kvs, k)
}
}
// TODO: If the length of the map has changed, some rules or rule groups have been deleted.
// We need to compare the rules and rule groups to make sure they stay in sync with etcd,
// rather than just force-loading all the rules and rule groups.
log.Info("clean pending deletion", zap.Int("current", len(rw.pendingDeletion.kvs)), zap.Int("previous", previousLen))
}
8 changes: 4 additions & 4 deletions pkg/schedule/checker/rule_checker.go
@@ -199,7 +199,7 @@ func (c *RuleChecker) fixRulePeer(region *core.RegionInfo, fit *placement.Region
if c.isDownPeer(region, peer) {
if c.isStoreDownTimeHitMaxDownTime(peer.GetStoreId()) {
ruleCheckerReplaceDownCounter.Inc()
return c.replaceUnexpectRulePeer(region, rf, fit, peer, downStatus)
return c.replaceUnexpectedRulePeer(region, rf, fit, peer, downStatus)
}
// When the witness placement rule is enabled, promote the witness to a voter when the region has a down voter.
if c.isWitnessEnabled() && core.IsVoter(peer) {
@@ -211,7 +211,7 @@
}
if c.isOfflinePeer(peer) {
ruleCheckerReplaceOfflineCounter.Inc()
return c.replaceUnexpectRulePeer(region, rf, fit, peer, offlineStatus)
return c.replaceUnexpectedRulePeer(region, rf, fit, peer, offlineStatus)
}
}
// fix loose matched peers.
@@ -246,7 +246,7 @@ func (c *RuleChecker) addRulePeer(region *core.RegionInfo, fit *placement.Region
continue
}
ruleCheckerNoStoreThenTryReplace.Inc()
op, err := c.replaceUnexpectRulePeer(region, oldPeerRuleFit, fit, p, "swap-fit")
op, err := c.replaceUnexpectedRulePeer(region, oldPeerRuleFit, fit, p, "swap-fit")
if err != nil {
return nil, err
}
@@ -267,7 +267,7 @@ func (c *RuleChecker) addRulePeer(region *core.RegionInfo, fit *placement.Region
}

// The peer's store may be Offline or Down, so the peer needs to be replaced.
func (c *RuleChecker) replaceUnexpectRulePeer(region *core.RegionInfo, rf *placement.RuleFit, fit *placement.RegionFit, peer *metapb.Peer, status string) (*operator.Operator, error) {
func (c *RuleChecker) replaceUnexpectedRulePeer(region *core.RegionInfo, rf *placement.RuleFit, fit *placement.RegionFit, peer *metapb.Peer, status string) (*operator.Operator, error) {
var fastFailover bool
// If the store to which the original peer belongs is TiFlash, the new peer cannot be set to witness, nor can it perform fast failover
if c.isWitnessEnabled() && !c.cluster.GetStore(peer.StoreId).IsTiFlash() {
7 changes: 4 additions & 3 deletions pkg/schedule/schedulers/shuffle_region.go
@@ -139,18 +139,19 @@ func (s *shuffleRegionScheduler) scheduleRemovePeer(cluster sche.SchedulerCluste
pendingFilter := filter.NewRegionPendingFilter()
downFilter := filter.NewRegionDownFilter()
replicaFilter := filter.NewRegionReplicatedFilter(cluster)
ranges := s.conf.GetRanges()
for _, source := range candidates.Stores {
var region *core.RegionInfo
if s.conf.IsRoleAllow(roleFollower) {
region = filter.SelectOneRegion(cluster.RandFollowerRegions(source.GetID(), s.conf.Ranges), nil,
region = filter.SelectOneRegion(cluster.RandFollowerRegions(source.GetID(), ranges), nil,
pendingFilter, downFilter, replicaFilter)
}
if region == nil && s.conf.IsRoleAllow(roleLeader) {
region = filter.SelectOneRegion(cluster.RandLeaderRegions(source.GetID(), s.conf.Ranges), nil,
region = filter.SelectOneRegion(cluster.RandLeaderRegions(source.GetID(), ranges), nil,
pendingFilter, downFilter, replicaFilter)
}
if region == nil && s.conf.IsRoleAllow(roleLearner) {
region = filter.SelectOneRegion(cluster.RandLearnerRegions(source.GetID(), s.conf.Ranges), nil,
region = filter.SelectOneRegion(cluster.RandLearnerRegions(source.GetID(), ranges), nil,
pendingFilter, downFilter, replicaFilter)
}
if region != nil {
4 changes: 3 additions & 1 deletion pkg/schedule/schedulers/shuffle_region_config.go
@@ -58,7 +58,9 @@ func (conf *shuffleRegionSchedulerConfig) GetRoles() []string {
func (conf *shuffleRegionSchedulerConfig) GetRanges() []core.KeyRange {
conf.RLock()
defer conf.RUnlock()
return conf.Ranges
ranges := make([]core.KeyRange, len(conf.Ranges))
copy(ranges, conf.Ranges)
return ranges
}

func (conf *shuffleRegionSchedulerConfig) IsRoleAllow(role string) bool {