Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: fix rule sync when meet "no rule left" and concurrency #7481

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1330,5 +1330,5 @@ func checkRegionsReplicated(c *gin.Context) {
c.String(http.StatusBadRequest, err.Error())
return
}
c.String(http.StatusOK, state)
c.IndentedJSON(http.StatusOK, state)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the reason for this modification? Do we need to modify the corresponding API interface for the non-API mode?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid failure in JSON unmarshalling and maintain consistency with PD mode.

}
79 changes: 75 additions & 4 deletions pkg/mcs/scheduling/server/rule/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
Expand Down Expand Up @@ -62,6 +63,17 @@
ruleWatcher *etcdutil.LoopWatcher
rleungx marked this conversation as resolved.
Show resolved Hide resolved
groupWatcher *etcdutil.LoopWatcher
Copy link
Member

@rleungx rleungx Nov 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still worry about whether two watchers will get the event disordered and overwrite the same rule.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add a test about it. If necessary, I will merge these watchers.

labelWatcher *etcdutil.LoopWatcher

// pendingDeletion is a structure used to track the rules or rule groups that are marked for deletion.
// If a rule or rule group cannot be deleted immediately due to the absence of rules,
// it will be held here and removed later when a new rule or rule group put event allows for its deletion.
pendingDeletion struct {
syncutil.RWMutex
// key: path, value: [groupID, ruleID]
// The map 'kvs' holds the rules or rule groups that are pending deletion.
// If a rule group needs to be deleted, the ruleID will be an empty string.
kvs map[string][2]string
}
}

// NewWatcher creates a new watcher to watch the Placement Rule change from PD API server.
Expand All @@ -86,6 +98,12 @@
checkerController: checkerController,
ruleManager: ruleManager,
regionLabeler: regionLabeler,
pendingDeletion: struct {
syncutil.RWMutex
kvs map[string][2]string
}{
kvs: make(map[string][2]string),
},
}
err := rw.initializeRuleWatcher()
if err != nil {
Expand Down Expand Up @@ -115,7 +133,11 @@
if oldRule := rw.ruleManager.GetRule(rule.GroupID, rule.ID); oldRule != nil {
rw.checkerController.AddSuspectKeyRange(oldRule.StartKey, oldRule.EndKey)
}
return rw.ruleManager.SetRule(rule)
err = rw.ruleManager.SetRule(rule)
if err == nil && rw.hasPendingDeletion() {
rw.tryFinishPendingDeletion()
}
return err
}
deleteFn := func(kv *mvccpb.KeyValue) error {
key := string(kv.Key)
Expand All @@ -129,7 +151,11 @@
return err
}
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
return rw.ruleManager.DeleteRule(rule.GroupID, rule.ID)
err = rw.ruleManager.DeleteRule(rule.GroupID, rule.ID)
if err != nil && strings.Contains(err.Error(), "no rule left") {
rw.addPendingDeletion(key, rule.GroupID, rule.ID)
}
return err
}
postEventFn := func() error {
return nil
Expand Down Expand Up @@ -157,7 +183,11 @@
for _, rule := range rw.ruleManager.GetRulesByGroup(ruleGroup.ID) {
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
}
return rw.ruleManager.SetRuleGroup(ruleGroup)
err = rw.ruleManager.SetRuleGroup(ruleGroup)
if err == nil && rw.hasPendingDeletion() {
rw.tryFinishPendingDeletion()

Check warning on line 188 in pkg/mcs/scheduling/server/rule/watcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/rule/watcher.go#L188

Added line #L188 was not covered by tests
}
return err
}
deleteFn := func(kv *mvccpb.KeyValue) error {
key := string(kv.Key)
Expand All @@ -166,7 +196,11 @@
for _, rule := range rw.ruleManager.GetRulesByGroup(trimmedKey) {
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
}
return rw.ruleManager.DeleteRuleGroup(trimmedKey)
err := rw.ruleManager.DeleteRuleGroup(trimmedKey)
if err != nil && strings.Contains(err.Error(), "no rule left") {
rw.addPendingDeletion(key, trimmedKey, "")

Check warning on line 201 in pkg/mcs/scheduling/server/rule/watcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/rule/watcher.go#L201

Added line #L201 was not covered by tests
}
return err
}
postEventFn := func() error {
return nil
Expand Down Expand Up @@ -216,3 +250,40 @@
rw.cancel()
rw.wg.Wait()
}

func (rw *Watcher) hasPendingDeletion() bool {
rw.pendingDeletion.RLock()
defer rw.pendingDeletion.RUnlock()
return len(rw.pendingDeletion.kvs) > 0
}

func (rw *Watcher) addPendingDeletion(path, groupID, ruleID string) {
rw.pendingDeletion.Lock()
defer rw.pendingDeletion.Unlock()
rw.pendingDeletion.kvs[path] = [2]string{groupID, ruleID}
}

func (rw *Watcher) tryFinishPendingDeletion() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am worried about if the put and delete can be disordered. If so, the newly added rule might be deleted unexpectedly.

rw.pendingDeletion.Lock()
defer rw.pendingDeletion.Unlock()
originLen := len(rw.pendingDeletion.kvs)
for k, v := range rw.pendingDeletion.kvs {
groupID, ruleID := v[0], v[1]
var err error
if ruleID == "" {
err = rw.ruleManager.DeleteRuleGroup(groupID)

Check warning on line 274 in pkg/mcs/scheduling/server/rule/watcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/rule/watcher.go#L274

Added line #L274 was not covered by tests
} else {
err = rw.ruleManager.DeleteRule(groupID, ruleID)
}
if err == nil {
delete(rw.pendingDeletion.kvs, k)
}
}
// If the length of the map is changed, it means that some rules or rule groups have been deleted.
// We need to force load the rules and rule groups to make sure sync with etcd.
if len(rw.pendingDeletion.kvs) != originLen {
rw.ruleWatcher.ForceLoad()
rw.groupWatcher.ForceLoad()
log.Info("force load rules", zap.Int("pending deletion", len(rw.pendingDeletion.kvs)), zap.Int("origin", originLen))
}
}
8 changes: 4 additions & 4 deletions pkg/schedule/checker/rule_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (c *RuleChecker) fixRulePeer(region *core.RegionInfo, fit *placement.Region
if c.isDownPeer(region, peer) {
if c.isStoreDownTimeHitMaxDownTime(peer.GetStoreId()) {
ruleCheckerReplaceDownCounter.Inc()
return c.replaceUnexpectRulePeer(region, rf, fit, peer, downStatus)
return c.replaceUnexpectedRulePeer(region, rf, fit, peer, downStatus)
}
// When witness placement rule is enabled, promotes the witness to voter when region has down voter.
if c.isWitnessEnabled() && core.IsVoter(peer) {
Expand All @@ -211,7 +211,7 @@ func (c *RuleChecker) fixRulePeer(region *core.RegionInfo, fit *placement.Region
}
if c.isOfflinePeer(peer) {
ruleCheckerReplaceOfflineCounter.Inc()
return c.replaceUnexpectRulePeer(region, rf, fit, peer, offlineStatus)
return c.replaceUnexpectedRulePeer(region, rf, fit, peer, offlineStatus)
}
}
// fix loose matched peers.
Expand Down Expand Up @@ -246,7 +246,7 @@ func (c *RuleChecker) addRulePeer(region *core.RegionInfo, fit *placement.Region
continue
}
ruleCheckerNoStoreThenTryReplace.Inc()
op, err := c.replaceUnexpectRulePeer(region, oldPeerRuleFit, fit, p, "swap-fit")
op, err := c.replaceUnexpectedRulePeer(region, oldPeerRuleFit, fit, p, "swap-fit")
if err != nil {
return nil, err
}
Expand All @@ -267,7 +267,7 @@ func (c *RuleChecker) addRulePeer(region *core.RegionInfo, fit *placement.Region
}

// The peer's store may in Offline or Down, need to be replace.
func (c *RuleChecker) replaceUnexpectRulePeer(region *core.RegionInfo, rf *placement.RuleFit, fit *placement.RegionFit, peer *metapb.Peer, status string) (*operator.Operator, error) {
func (c *RuleChecker) replaceUnexpectedRulePeer(region *core.RegionInfo, rf *placement.RuleFit, fit *placement.RegionFit, peer *metapb.Peer, status string) (*operator.Operator, error) {
var fastFailover bool
// If the store to which the original peer belongs is TiFlash, the new peer cannot be set to witness, nor can it perform fast failover
if c.isWitnessEnabled() && !c.cluster.GetStore(peer.StoreId).IsTiFlash() {
Expand Down
46 changes: 42 additions & 4 deletions tests/server/api/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,11 @@ func (suite *regionTestSuite) TestCheckRegionsReplicated() {
func(conf *config.Config, serverName string) {
conf.Replication.EnablePlacementRules = true
})
env.RunTestInPDMode(suite.checkRegionsReplicated)
env.RunTestInTwoModes(suite.checkRegionsReplicated)
}

func (suite *regionTestSuite) checkRegionsReplicated(cluster *tests.TestCluster) {
suite.pauseRuleChecker(cluster)
leader := cluster.GetLeaderServer()
urlPrefix := leader.GetAddr() + "/pd/api/v1"
re := suite.Require()
Expand Down Expand Up @@ -271,6 +272,14 @@ func (suite *regionTestSuite) checkRegionsReplicated(cluster *tests.TestCluster)
err = tu.CheckPostJSON(testDialClient, urlPrefix+"/config/placement-rule", data, tu.StatusOK(re))
suite.NoError(err)

tu.Eventually(re, func() bool {
respBundle := make([]placement.GroupBundle, 0)
err = tu.CheckGetJSON(testDialClient, urlPrefix+"/config/placement-rule", nil,
tu.StatusOK(re), tu.ExtractJSON(re, &respBundle))
suite.NoError(err)
return len(respBundle) == 1 && respBundle[0].ID == "5"
})

tu.Eventually(re, func() bool {
err = tu.ReadGetJSON(re, testDialClient, url, &status)
suite.NoError(err)
Expand Down Expand Up @@ -314,9 +323,24 @@ func (suite *regionTestSuite) checkRegionsReplicated(cluster *tests.TestCluster)
err = tu.CheckPostJSON(testDialClient, urlPrefix+"/config/placement-rule", data, tu.StatusOK(re))
suite.NoError(err)

err = tu.ReadGetJSON(re, testDialClient, url, &status)
suite.NoError(err)
suite.Equal("INPROGRESS", status)
tu.Eventually(re, func() bool {
respBundle := make([]placement.GroupBundle, 0)
err = tu.CheckGetJSON(testDialClient, urlPrefix+"/config/placement-rule", nil,
tu.StatusOK(re), tu.ExtractJSON(re, &respBundle))
suite.NoError(err)
if len(respBundle) != 2 {
return false
}
s1 := respBundle[0].ID == "5" && respBundle[1].ID == "6"
s2 := respBundle[0].ID == "6" && respBundle[1].ID == "5"
return s1 || s2
})

tu.Eventually(re, func() bool {
err = tu.ReadGetJSON(re, testDialClient, url, &status)
suite.NoError(err)
return status == "INPROGRESS"
})

r1 = core.NewTestRegionInfo(2, 1, []byte("a"), []byte("b"))
r1.GetMeta().Peers = append(r1.GetMeta().Peers, &metapb.Peer{Id: 5, StoreId: 1}, &metapb.Peer{Id: 6, StoreId: 1}, &metapb.Peer{Id: 7, StoreId: 1})
Expand All @@ -338,3 +362,17 @@ func (suite *regionTestSuite) checkRegionCount(cluster *tests.TestCluster, count
})
}
}

// pauseRuleChecker will pause rule checker to avoid unexpected operator.
func (suite *regionTestSuite) pauseRuleChecker(cluster *tests.TestCluster) {
re := suite.Require()
checkerName := "rule"
addr := cluster.GetLeaderServer().GetAddr()
resp := make(map[string]interface{})
url := fmt.Sprintf("%s/pd/api/v1/checker/%s", addr, checkerName)
err := tu.CheckPostJSON(testDialClient, url, []byte(`{"delay":1000}`), tu.StatusOK(re))
re.NoError(err)
err = tu.ReadGetJSON(re, testDialClient, url, &resp)
re.NoError(err)
re.True(resp["paused"].(bool))
}
Loading
Loading