From 7866780647a64e4756c048acf5c57e56d1482eef Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 29 Nov 2023 18:27:50 +0800 Subject: [PATCH 1/9] mcs: fix rule sync when meet "no rule left" Signed-off-by: lhy1024 --- pkg/mcs/scheduling/server/apis/v1/api.go | 2 +- pkg/mcs/scheduling/server/rule/watcher.go | 79 +++++++++++++++++++++-- pkg/schedule/checker/rule_checker.go | 8 +-- tests/server/api/region_test.go | 46 +++++++++++-- 4 files changed, 122 insertions(+), 13 deletions(-) diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go index b59780b7a61..e6881f2f85c 100644 --- a/pkg/mcs/scheduling/server/apis/v1/api.go +++ b/pkg/mcs/scheduling/server/apis/v1/api.go @@ -1330,5 +1330,5 @@ func checkRegionsReplicated(c *gin.Context) { c.String(http.StatusBadRequest, err.Error()) return } - c.String(http.StatusOK, state) + c.IndentedJSON(http.StatusOK, state) } diff --git a/pkg/mcs/scheduling/server/rule/watcher.go b/pkg/mcs/scheduling/server/rule/watcher.go index 912fb9c01e5..bf6e55f0983 100644 --- a/pkg/mcs/scheduling/server/rule/watcher.go +++ b/pkg/mcs/scheduling/server/rule/watcher.go @@ -25,6 +25,7 @@ import ( "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/syncutil" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/mvcc/mvccpb" "go.uber.org/zap" @@ -62,6 +63,17 @@ type Watcher struct { ruleWatcher *etcdutil.LoopWatcher groupWatcher *etcdutil.LoopWatcher labelWatcher *etcdutil.LoopWatcher + + // pendingDeletion is a structure used to track the rules or rule groups that are marked for deletion. + // If a rule or rule group cannot be deleted immediately due to the absence of rules, + // it will be held here and removed later when a new rule or rule group put event allows for its deletion. + pendingDeletion struct { + syncutil.RWMutex + // key: path, value: [groupID, ruleID] + // The map 'kvs' holds the rules or rule groups that are pending deletion. + // If a rule group needs to be deleted, the ruleID will be an empty string. + kvs map[string][2]string + } } // NewWatcher creates a new watcher to watch the Placement Rule change from PD API server. 
@@ -86,6 +98,12 @@ func NewWatcher( checkerController: checkerController, ruleManager: ruleManager, regionLabeler: regionLabeler, + pendingDeletion: struct { + syncutil.RWMutex + kvs map[string][2]string + }{ + kvs: make(map[string][2]string), + }, } err := rw.initializeRuleWatcher() if err != nil { @@ -115,7 +133,11 @@ func (rw *Watcher) initializeRuleWatcher() error { if oldRule := rw.ruleManager.GetRule(rule.GroupID, rule.ID); oldRule != nil { rw.checkerController.AddSuspectKeyRange(oldRule.StartKey, oldRule.EndKey) } - return rw.ruleManager.SetRule(rule) + err = rw.ruleManager.SetRule(rule) + if err == nil && rw.hasPendingDeletion() { + rw.tryFinishPendingDeletion() + } + return err } deleteFn := func(kv *mvccpb.KeyValue) error { key := string(kv.Key) @@ -129,7 +151,11 @@ func (rw *Watcher) initializeRuleWatcher() error { return err } rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) - return rw.ruleManager.DeleteRule(rule.GroupID, rule.ID) + err = rw.ruleManager.DeleteRule(rule.GroupID, rule.ID) + if err != nil && strings.Contains(err.Error(), "no rule left") { + rw.addPendingDeletion(key, rule.GroupID, rule.ID) + } + return err } postEventFn := func() error { return nil @@ -157,7 +183,11 @@ func (rw *Watcher) initializeGroupWatcher() error { for _, rule := range rw.ruleManager.GetRulesByGroup(ruleGroup.ID) { rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) } - return rw.ruleManager.SetRuleGroup(ruleGroup) + err = rw.ruleManager.SetRuleGroup(ruleGroup) + if err == nil && rw.hasPendingDeletion() { + rw.tryFinishPendingDeletion() + } + return err } deleteFn := func(kv *mvccpb.KeyValue) error { key := string(kv.Key) @@ -166,7 +196,11 @@ func (rw *Watcher) initializeGroupWatcher() error { for _, rule := range rw.ruleManager.GetRulesByGroup(trimmedKey) { rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) } - return rw.ruleManager.DeleteRuleGroup(trimmedKey) + err := rw.ruleManager.DeleteRuleGroup(trimmedKey) + if err != nil && strings.Contains(err.Error(), "no rule left") { + rw.addPendingDeletion(key, trimmedKey, "") + } + return err } postEventFn := func() error { return nil @@ -216,3 +250,40 @@ func (rw *Watcher) Close() { rw.cancel() rw.wg.Wait() } + +func (rw *Watcher) hasPendingDeletion() bool { + rw.pendingDeletion.RLock() + defer rw.pendingDeletion.RUnlock() + return len(rw.pendingDeletion.kvs) > 0 +} + +func (rw *Watcher) addPendingDeletion(path, groupID, ruleID string) { + rw.pendingDeletion.Lock() + defer rw.pendingDeletion.Unlock() + rw.pendingDeletion.kvs[path] = [2]string{groupID, ruleID} +} + +func (rw *Watcher) tryFinishPendingDeletion() { + rw.pendingDeletion.Lock() + defer rw.pendingDeletion.Unlock() + originLen := len(rw.pendingDeletion.kvs) + for k, v := range rw.pendingDeletion.kvs { + groupID, ruleID := v[0], v[1] + var err error + if ruleID == "" { + err = rw.ruleManager.DeleteRuleGroup(groupID) + } else { + err = rw.ruleManager.DeleteRule(groupID, ruleID) + } + if err == nil { + delete(rw.pendingDeletion.kvs, k) + } + } + // If the length of the map is changed, it means that some rules or rule groups have been deleted. + // We need to force load the rules and rule groups to make sure sync with etcd. 
+ if len(rw.pendingDeletion.kvs) != originLen { + rw.ruleWatcher.ForceLoad() + rw.groupWatcher.ForceLoad() + log.Info("force load rules", zap.Int("pending deletion", len(rw.pendingDeletion.kvs)), zap.Int("origin", originLen)) + } +} diff --git a/pkg/schedule/checker/rule_checker.go b/pkg/schedule/checker/rule_checker.go index 553ece09e65..95cc77ade5d 100644 --- a/pkg/schedule/checker/rule_checker.go +++ b/pkg/schedule/checker/rule_checker.go @@ -199,7 +199,7 @@ func (c *RuleChecker) fixRulePeer(region *core.RegionInfo, fit *placement.Region if c.isDownPeer(region, peer) { if c.isStoreDownTimeHitMaxDownTime(peer.GetStoreId()) { ruleCheckerReplaceDownCounter.Inc() - return c.replaceUnexpectRulePeer(region, rf, fit, peer, downStatus) + return c.replaceUnexpectedRulePeer(region, rf, fit, peer, downStatus) } // When witness placement rule is enabled, promotes the witness to voter when region has down voter. if c.isWitnessEnabled() && core.IsVoter(peer) { @@ -211,7 +211,7 @@ func (c *RuleChecker) fixRulePeer(region *core.RegionInfo, fit *placement.Region } if c.isOfflinePeer(peer) { ruleCheckerReplaceOfflineCounter.Inc() - return c.replaceUnexpectRulePeer(region, rf, fit, peer, offlineStatus) + return c.replaceUnexpectedRulePeer(region, rf, fit, peer, offlineStatus) } } // fix loose matched peers. @@ -246,7 +246,7 @@ func (c *RuleChecker) addRulePeer(region *core.RegionInfo, fit *placement.Region continue } ruleCheckerNoStoreThenTryReplace.Inc() - op, err := c.replaceUnexpectRulePeer(region, oldPeerRuleFit, fit, p, "swap-fit") + op, err := c.replaceUnexpectedRulePeer(region, oldPeerRuleFit, fit, p, "swap-fit") if err != nil { return nil, err } @@ -267,7 +267,7 @@ func (c *RuleChecker) addRulePeer(region *core.RegionInfo, fit *placement.Region } // The peer's store may in Offline or Down, need to be replace. 
-func (c *RuleChecker) replaceUnexpectRulePeer(region *core.RegionInfo, rf *placement.RuleFit, fit *placement.RegionFit, peer *metapb.Peer, status string) (*operator.Operator, error) { +func (c *RuleChecker) replaceUnexpectedRulePeer(region *core.RegionInfo, rf *placement.RuleFit, fit *placement.RegionFit, peer *metapb.Peer, status string) (*operator.Operator, error) { var fastFailover bool // If the store to which the original peer belongs is TiFlash, the new peer cannot be set to witness, nor can it perform fast failover if c.isWitnessEnabled() && !c.cluster.GetStore(peer.StoreId).IsTiFlash() { diff --git a/tests/server/api/region_test.go b/tests/server/api/region_test.go index dcd31d6462d..8d4406c8d14 100644 --- a/tests/server/api/region_test.go +++ b/tests/server/api/region_test.go @@ -217,10 +217,11 @@ func (suite *regionTestSuite) TestCheckRegionsReplicated() { func(conf *config.Config, serverName string) { conf.Replication.EnablePlacementRules = true }) - env.RunTestInPDMode(suite.checkRegionsReplicated) + env.RunTestInTwoModes(suite.checkRegionsReplicated) } func (suite *regionTestSuite) checkRegionsReplicated(cluster *tests.TestCluster) { + suite.pauseRuleChecker(cluster) leader := cluster.GetLeaderServer() urlPrefix := leader.GetAddr() + "/pd/api/v1" re := suite.Require() @@ -271,6 +272,14 @@ func (suite *regionTestSuite) checkRegionsReplicated(cluster *tests.TestCluster) err = tu.CheckPostJSON(testDialClient, urlPrefix+"/config/placement-rule", data, tu.StatusOK(re)) suite.NoError(err) + tu.Eventually(re, func() bool { + respBundle := make([]placement.GroupBundle, 0) + err = tu.CheckGetJSON(testDialClient, urlPrefix+"/config/placement-rule", nil, + tu.StatusOK(re), tu.ExtractJSON(re, &respBundle)) + suite.NoError(err) + return len(respBundle) == 1 && respBundle[0].ID == "5" + }) + tu.Eventually(re, func() bool { err = tu.ReadGetJSON(re, testDialClient, url, &status) suite.NoError(err) @@ -314,9 +323,24 @@ func (suite *regionTestSuite) checkRegionsReplicated(cluster *tests.TestCluster) err = tu.CheckPostJSON(testDialClient, urlPrefix+"/config/placement-rule", data, tu.StatusOK(re)) suite.NoError(err) - err = tu.ReadGetJSON(re, testDialClient, url, &status) - suite.NoError(err) - suite.Equal("INPROGRESS", status) + tu.Eventually(re, func() bool { + respBundle := make([]placement.GroupBundle, 0) + err = tu.CheckGetJSON(testDialClient, urlPrefix+"/config/placement-rule", nil, + tu.StatusOK(re), tu.ExtractJSON(re, &respBundle)) + suite.NoError(err) + if len(respBundle) != 2 { + return false + } + s1 := respBundle[0].ID == "5" && respBundle[1].ID == "6" + s2 := respBundle[0].ID == "6" && respBundle[1].ID == "5" + return s1 || s2 + }) + + tu.Eventually(re, func() bool { + err = tu.ReadGetJSON(re, testDialClient, url, &status) + suite.NoError(err) + return status == "INPROGRESS" + }) r1 = core.NewTestRegionInfo(2, 1, []byte("a"), []byte("b")) r1.GetMeta().Peers = append(r1.GetMeta().Peers, &metapb.Peer{Id: 5, StoreId: 1}, &metapb.Peer{Id: 6, StoreId: 1}, &metapb.Peer{Id: 7, StoreId: 1}) @@ -338,3 +362,17 @@ func (suite *regionTestSuite) checkRegionCount(cluster *tests.TestCluster, count }) } } + +// pauseRuleChecker will pause rule checker to avoid unexpected operator. 
+func (suite *regionTestSuite) pauseRuleChecker(cluster *tests.TestCluster) { + re := suite.Require() + checkerName := "rule" + addr := cluster.GetLeaderServer().GetAddr() + resp := make(map[string]interface{}) + url := fmt.Sprintf("%s/pd/api/v1/checker/%s", addr, checkerName) + err := tu.CheckPostJSON(testDialClient, url, []byte(`{"delay":1000}`), tu.StatusOK(re)) + re.NoError(err) + err = tu.ReadGetJSON(re, testDialClient, url, &resp) + re.NoError(err) + re.True(resp["paused"].(bool)) +} From 10af0333132d174873cb5941c07ebecb519ca676 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 29 Nov 2023 19:46:19 +0800 Subject: [PATCH 2/9] add some test Signed-off-by: lhy1024 --- tests/server/api/rule_test.go | 168 +++++++++++++++++++++++++++++----- 1 file changed, 144 insertions(+), 24 deletions(-) diff --git a/tests/server/api/rule_test.go b/tests/server/api/rule_test.go index ac52362df4e..49418d9705f 100644 --- a/tests/server/api/rule_test.go +++ b/tests/server/api/rule_test.go @@ -20,9 +20,11 @@ import ( "fmt" "net/http" "net/url" + "sort" "testing" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/log" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" @@ -31,6 +33,7 @@ import ( tu "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" + "go.uber.org/zap" ) type ruleTestSuite struct { @@ -817,7 +820,7 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) { err := tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles) suite.NoError(err) suite.Len(bundles, 1) - suite.compareBundle(bundles[0], b1) + suite.assertBundleEqual(bundles[0], b1) // Set b2 := placement.GroupBundle{ @@ -837,14 +840,14 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) { var bundle placement.GroupBundle err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule/foo", &bundle) suite.NoError(err) - suite.compareBundle(bundle, b2) + suite.assertBundleEqual(bundle, b2) // GetAll again err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles) suite.NoError(err) suite.Len(bundles, 2) - suite.compareBundle(bundles[0], b1) - suite.compareBundle(bundles[1], b2) + suite.assertBundleEqual(bundles[0], b1) + suite.assertBundleEqual(bundles[1], b2) // Delete err = tu.CheckDelete(testDialClient, urlPrefix+"/placement-rule/pd", tu.StatusOK(suite.Require())) @@ -854,7 +857,7 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) { err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles) suite.NoError(err) suite.Len(bundles, 1) - suite.compareBundle(bundles[0], b2) + suite.assertBundleEqual(bundles[0], b2) // SetAll b2.Rules = append(b2.Rules, &placement.Rule{GroupID: "foo", ID: "baz", Index: 2, Role: placement.Follower, Count: 1}) @@ -869,9 +872,9 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) { err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles) suite.NoError(err) suite.Len(bundles, 3) - suite.compareBundle(bundles[0], b2) - suite.compareBundle(bundles[1], b1) - suite.compareBundle(bundles[2], b3) + suite.assertBundleEqual(bundles[0], b2) + suite.assertBundleEqual(bundles[1], b1) + suite.assertBundleEqual(bundles[2], b3) // Delete using regexp err = tu.CheckDelete(testDialClient, urlPrefix+"/placement-rule/"+url.PathEscape("foo.*")+"?regexp", tu.StatusOK(suite.Require())) @@ -881,7 +884,7 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) { 
err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles) suite.NoError(err) suite.Len(bundles, 1) - suite.compareBundle(bundles[0], b1) + suite.assertBundleEqual(bundles[0], b1) // Set id := "rule-without-group-id" @@ -902,14 +905,14 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) { // Get err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule/"+id, &bundle) suite.NoError(err) - suite.compareBundle(bundle, b4) + suite.assertBundleEqual(bundle, b4) // GetAll again err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles) suite.NoError(err) suite.Len(bundles, 2) - suite.compareBundle(bundles[0], b1) - suite.compareBundle(bundles[1], b4) + suite.assertBundleEqual(bundles[0], b1) + suite.assertBundleEqual(bundles[1], b4) // SetAll b5 := placement.GroupBundle{ @@ -930,9 +933,9 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) { err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles) suite.NoError(err) suite.Len(bundles, 3) - suite.compareBundle(bundles[0], b1) - suite.compareBundle(bundles[1], b4) - suite.compareBundle(bundles[2], b5) + suite.assertBundleEqual(bundles[0], b1) + suite.assertBundleEqual(bundles[1], b4) + suite.assertBundleEqual(bundles[2], b5) } func (suite *ruleTestSuite) TestBundleBadRequest() { @@ -972,20 +975,137 @@ func (suite *ruleTestSuite) checkBundleBadRequest(cluster *tests.TestCluster) { } } -func (suite *ruleTestSuite) compareBundle(b1, b2 placement.GroupBundle) { - tu.Eventually(suite.Require(), func() bool { - if b2.ID != b1.ID || b2.Index != b1.Index || b2.Override != b1.Override || len(b2.Rules) != len(b1.Rules) { - return false - } - for i := range b1.Rules { - if !suite.compareRule(b1.Rules[i], b2.Rules[i]) { +func (suite *ruleTestSuite) TestDeleteAndUpdate() { + opts := []tests.ConfigOption{ + func(conf *config.Config, serverName string) { + conf.PDServerCfg.KeyType = "raw" + conf.Replication.EnablePlacementRules = true + }, + } + env := tests.NewSchedulingTestEnvironment(suite.T(), opts...) 
+ env.RunTestInTwoModes(suite.checkDeleteAndUpdate) +} + +func (suite *ruleTestSuite) checkDeleteAndUpdate(cluster *tests.TestCluster) { + re := suite.Require() + leaderServer := cluster.GetLeaderServer() + pdAddr := leaderServer.GetAddr() + urlPrefix := fmt.Sprintf("%s%s/api/v1", pdAddr, apiPrefix) + + bundles := [][]placement.GroupBundle{ + // 1 rule group with 1 rule + {{ + ID: "1", + Index: 1, + Rules: []*placement.Rule{ + { + ID: "foo", Index: 1, Role: placement.Voter, Count: 1, GroupID: "1", + }, + }, + }}, + // 2 rule groups with different range rules + {{ + ID: "1", + Index: 1, + Rules: []*placement.Rule{ + { + ID: "foo", Index: 1, Role: placement.Voter, Count: 1, GroupID: "1", + StartKey: []byte("a"), EndKey: []byte("b"), + }, + }, + }, { + ID: "2", + Index: 2, + Rules: []*placement.Rule{ + { + ID: "foo", Index: 2, Role: placement.Voter, Count: 1, GroupID: "2", + StartKey: []byte("b"), EndKey: []byte("c"), + }, + }, + }}, + // 2 rule groups with 1 rule and 2 rules + {{ + ID: "3", + Index: 3, + Rules: []*placement.Rule{ + { + ID: "foo", Index: 3, Role: placement.Voter, Count: 1, GroupID: "3", + }, + }, + }, { + ID: "4", + Index: 4, + Rules: []*placement.Rule{ + { + ID: "foo", Index: 4, Role: placement.Voter, Count: 1, GroupID: "4", + }, + { + ID: "bar", Index: 6, Role: placement.Voter, Count: 1, GroupID: "4", + }, + }, + }}, + // 1 rule group with 2 rules + {{ + ID: "5", + Index: 5, + Rules: []*placement.Rule{ + { + ID: "foo", Index: 5, Role: placement.Voter, Count: 1, GroupID: "5", + }, + { + ID: "bar", Index: 6, Role: placement.Voter, Count: 1, GroupID: "5", + }, + }, + }}, + } + + for _, bundle := range bundles { + data, err := json.Marshal(bundle) + suite.NoError(err) + err = tu.CheckPostJSON(testDialClient, urlPrefix+"/config/placement-rule", data, tu.StatusOK(re)) + suite.NoError(err) + + tu.Eventually(re, func() bool { + respBundle := make([]placement.GroupBundle, 0) + err = tu.CheckGetJSON(testDialClient, urlPrefix+"/config/placement-rule", nil, + tu.StatusOK(re), tu.ExtractJSON(re, &respBundle)) + suite.NoError(err) + if len(respBundle) != len(bundle) { return false } - } - return true + log.Info("respBundle", zap.Any("respBundle", respBundle), zap.Any("bundle", bundle)) + sort.Slice(respBundle, func(i, j int) bool { return respBundle[i].ID < respBundle[j].ID }) + sort.Slice(bundle, func(i, j int) bool { return bundle[i].ID < bundle[j].ID }) + for i := range respBundle { + if !suite.compareBundle(respBundle[i], bundle[i]) { + return false + } + } + return true + }) + } +} + +func (suite *ruleTestSuite) assertBundleEqual(b1, b2 placement.GroupBundle) { + tu.Eventually(suite.Require(), func() bool { + return suite.compareBundle(b1, b2) }) } +func (suite *ruleTestSuite) compareBundle(b1, b2 placement.GroupBundle) bool { + if b2.ID != b1.ID || b2.Index != b1.Index || b2.Override != b1.Override || len(b2.Rules) != len(b1.Rules) { + return false + } + sort.Slice(b1.Rules, func(i, j int) bool { return b1.Rules[i].ID < b1.Rules[j].ID }) + sort.Slice(b2.Rules, func(i, j int) bool { return b2.Rules[i].ID < b2.Rules[j].ID }) + for i := range b1.Rules { + if !suite.compareRule(b1.Rules[i], b2.Rules[i]) { + return false + } + } + return true +} + func (suite *ruleTestSuite) compareRule(r1 *placement.Rule, r2 *placement.Rule) bool { return r2.GroupID == r1.GroupID && r2.ID == r1.ID && From 9e006d31450f178f3b764b7e64308bd87ca79ffe Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Thu, 30 Nov 2023 18:44:10 +0800 Subject: [PATCH 3/9] add TestConcurrency Signed-off-by: lhy1024 --- 
tests/server/api/rule_test.go | 101 ++++++++++++++++++++++++++++++++++ 1 file changed, 101 insertions(+) diff --git a/tests/server/api/rule_test.go b/tests/server/api/rule_test.go index 49418d9705f..70f2d632528 100644 --- a/tests/server/api/rule_test.go +++ b/tests/server/api/rule_test.go @@ -21,6 +21,8 @@ import ( "net/http" "net/url" "sort" + "strconv" + "sync" "testing" "github.com/pingcap/kvproto/pkg/metapb" @@ -30,6 +32,7 @@ import ( "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/utils/syncutil" tu "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" @@ -1086,6 +1089,104 @@ func (suite *ruleTestSuite) checkDeleteAndUpdate(cluster *tests.TestCluster) { } } +func (suite *ruleTestSuite) TestConcurrency() { + opts := []tests.ConfigOption{ + func(conf *config.Config, serverName string) { + conf.PDServerCfg.KeyType = "raw" + conf.Replication.EnablePlacementRules = true + }, + } + env := tests.NewSchedulingTestEnvironment(suite.T(), opts...) + // FIXME: enable this test in api mode + env.RunTestInPDMode(suite.checkConcurrency) +} + +func (suite *ruleTestSuite) checkConcurrency(cluster *tests.TestCluster) { + // test concurrency of set rule group with different group id + suite.checkConcurrencyWith(cluster, + func(i int) []placement.GroupBundle { + return []placement.GroupBundle{ + { + ID: strconv.Itoa(i), + Index: i, + Rules: []*placement.Rule{ + { + ID: "foo", Index: i, Role: placement.Voter, Count: 1, GroupID: strconv.Itoa(i), + }, + }, + }, + } + }, + func(resp []placement.GroupBundle, i int) bool { + return len(resp) == 1 && resp[0].ID == strconv.Itoa(i) + }, + ) + // test concurrency of set rule with different id + // TODO: this part cannot run in api mode + suite.checkConcurrencyWith(cluster, + func(i int) []placement.GroupBundle { + return []placement.GroupBundle{ + { + ID: "pd", + Index: 1, + Rules: []*placement.Rule{ + { + ID: strconv.Itoa(i), Index: i, Role: placement.Voter, Count: 1, GroupID: "pd", + }, + }, + }, + } + }, + func(resp []placement.GroupBundle, i int) bool { + return len(resp) == 1 && resp[0].ID == "pd" && resp[0].Rules[0].ID == strconv.Itoa(i) + }, + ) +} + +func (suite *ruleTestSuite) checkConcurrencyWith(cluster *tests.TestCluster, + genBundle func(int) []placement.GroupBundle, + checkBundle func([]placement.GroupBundle, int) bool) { + re := suite.Require() + leaderServer := cluster.GetLeaderServer() + pdAddr := leaderServer.GetAddr() + urlPrefix := fmt.Sprintf("%s%s/api/v1", pdAddr, apiPrefix) + expectResult := struct { + syncutil.RWMutex + val int + }{} + wg := sync.WaitGroup{} + + for i := 1; i <= 10; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + bundle := genBundle(i) + data, err := json.Marshal(bundle) + suite.NoError(err) + for j := 0; j < 10; j++ { + expectResult.Lock() + err = tu.CheckPostJSON(testDialClient, urlPrefix+"/config/placement-rule", data, tu.StatusOK(re)) + suite.NoError(err) + expectResult.val = i + expectResult.Unlock() + } + }(i) + } + + wg.Wait() + expectResult.RLock() + defer expectResult.RUnlock() + suite.NotZero(expectResult.val) + tu.Eventually(re, func() bool { + respBundle := make([]placement.GroupBundle, 0) + err := tu.CheckGetJSON(testDialClient, urlPrefix+"/config/placement-rule", nil, + tu.StatusOK(re), tu.ExtractJSON(re, &respBundle)) + suite.NoError(err) + suite.Len(respBundle, 1) + return checkBundle(respBundle, expectResult.val) + }) +} + func (suite *ruleTestSuite) 
assertBundleEqual(b1, b2 placement.GroupBundle) { tu.Eventually(suite.Require(), func() bool { return suite.compareBundle(b1, b2) From d785df3cfea1cf5782cbd916b2a3fb6870b52c0a Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 5 Dec 2023 02:26:32 +0800 Subject: [PATCH 4/9] merge two watcher Signed-off-by: lhy1024 --- pkg/mcs/scheduling/server/rule/watcher.go | 140 ++++++++++------------ pkg/storage/endpoint/key_path.go | 6 + tests/server/api/rule_test.go | 4 +- 3 files changed, 69 insertions(+), 81 deletions(-) diff --git a/pkg/mcs/scheduling/server/rule/watcher.go b/pkg/mcs/scheduling/server/rule/watcher.go index bf6e55f0983..51735df39c9 100644 --- a/pkg/mcs/scheduling/server/rule/watcher.go +++ b/pkg/mcs/scheduling/server/rule/watcher.go @@ -37,6 +37,10 @@ type Watcher struct { cancel context.CancelFunc wg sync.WaitGroup + // ruleCommonPathPrefix: + // - Key: /pd/{cluster_id}/rule + // - Value: placement.Rule or placement.RuleGroup + ruleCommonPathPrefix string // rulesPathPrefix: // - Key: /pd/{cluster_id}/rules/{group_id}-{rule_id} // - Value: placement.Rule @@ -61,7 +65,6 @@ type Watcher struct { regionLabeler *labeler.RegionLabeler ruleWatcher *etcdutil.LoopWatcher - groupWatcher *etcdutil.LoopWatcher labelWatcher *etcdutil.LoopWatcher // pendingDeletion is a structure used to track the rules or rule groups that are marked for deletion. @@ -109,10 +112,6 @@ func NewWatcher( if err != nil { return nil, err } - err = rw.initializeGroupWatcher() - if err != nil { - return nil, err - } err = rw.initializeRegionLabelWatcher() if err != nil { return nil, err @@ -121,19 +120,36 @@ func NewWatcher( } func (rw *Watcher) initializeRuleWatcher() error { - prefixToTrim := rw.rulesPathPrefix + "/" putFn := func(kv *mvccpb.KeyValue) error { - log.Info("update placement rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value))) - rule, err := placement.NewRuleFromJSON(kv.Value) - if err != nil { - return err - } - // Update the suspect key ranges in the checker. - rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) - if oldRule := rw.ruleManager.GetRule(rule.GroupID, rule.ID); oldRule != nil { - rw.checkerController.AddSuspectKeyRange(oldRule.StartKey, oldRule.EndKey) - } - err = rw.ruleManager.SetRule(rule) + err := func() error { + if strings.HasPrefix(string(kv.Key), rw.rulesPathPrefix) { + log.Info("update placement rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value))) + rule, err := placement.NewRuleFromJSON(kv.Value) + if err != nil { + return err + } + // Update the suspect key ranges in the checker. + rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) + if oldRule := rw.ruleManager.GetRule(rule.GroupID, rule.ID); oldRule != nil { + rw.checkerController.AddSuspectKeyRange(oldRule.StartKey, oldRule.EndKey) + } + return rw.ruleManager.SetRule(rule) + } else if strings.HasPrefix(string(kv.Key), rw.ruleGroupPathPrefix) { + log.Info("update placement rule group", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value))) + ruleGroup, err := placement.NewRuleGroupFromJSON(kv.Value) + if err != nil { + return err + } + // Add all rule key ranges within the group to the suspect key ranges. 
+ for _, rule := range rw.ruleManager.GetRulesByGroup(ruleGroup.ID) { + rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) + } + return rw.ruleManager.SetRuleGroup(ruleGroup) + } else { + log.Warn("unknown key when update placement rule", zap.String("key", string(kv.Key))) + return nil + } + }() if err == nil && rw.hasPendingDeletion() { rw.tryFinishPendingDeletion() } @@ -141,21 +157,35 @@ func (rw *Watcher) initializeRuleWatcher() error { } deleteFn := func(kv *mvccpb.KeyValue) error { key := string(kv.Key) - log.Info("delete placement rule", zap.String("key", key)) - ruleJSON, err := rw.ruleStorage.LoadRule(strings.TrimPrefix(key, prefixToTrim)) - if err != nil { - return err + groupID, ruleID, err := func() (string, string, error) { + if strings.HasPrefix(string(kv.Key), rw.rulesPathPrefix) { + log.Info("delete placement rule", zap.String("key", key)) + ruleJSON, err := rw.ruleStorage.LoadRule(strings.TrimPrefix(key, rw.rulesPathPrefix+"/")) + if err != nil { + return "", "", err + } + rule, err := placement.NewRuleFromJSON([]byte(ruleJSON)) + if err != nil { + return "", "", err + } + rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) + return rule.GroupID, rule.ID, rw.ruleManager.DeleteRule(rule.GroupID, rule.ID) + } else if strings.HasPrefix(string(kv.Key), rw.ruleGroupPathPrefix) { + log.Info("delete placement rule group", zap.String("key", key)) + trimmedKey := strings.TrimPrefix(key, rw.ruleGroupPathPrefix+"/") + for _, rule := range rw.ruleManager.GetRulesByGroup(trimmedKey) { + rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) + } + return trimmedKey, "", rw.ruleManager.DeleteRuleGroup(trimmedKey) + } else { + log.Warn("unknown key when delete placement rule", zap.String("key", string(kv.Key))) + return "", "", nil + } + }() + if err != nil && strings.Contains(err.Error(), "no rule left") && groupID != "" { + rw.addPendingDeletion(key, groupID, ruleID) } - rule, err := placement.NewRuleFromJSON([]byte(ruleJSON)) - if err != nil { - return err - } - rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) - err = rw.ruleManager.DeleteRule(rule.GroupID, rule.ID) - if err != nil && strings.Contains(err.Error(), "no rule left") { - rw.addPendingDeletion(key, rule.GroupID, rule.ID) - } - return err + return nil } postEventFn := func() error { return nil @@ -163,7 +193,7 @@ func (rw *Watcher) initializeRuleWatcher() error { rw.ruleWatcher = etcdutil.NewLoopWatcher( rw.ctx, &rw.wg, rw.etcdClient, - "scheduling-rule-watcher", rw.rulesPathPrefix, + "scheduling-rule-watcher", rw.ruleCommonPathPrefix, putFn, deleteFn, postEventFn, clientv3.WithPrefix(), ) @@ -171,51 +201,6 @@ func (rw *Watcher) initializeRuleWatcher() error { return rw.ruleWatcher.WaitLoad() } -func (rw *Watcher) initializeGroupWatcher() error { - prefixToTrim := rw.ruleGroupPathPrefix + "/" - putFn := func(kv *mvccpb.KeyValue) error { - log.Info("update placement rule group", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value))) - ruleGroup, err := placement.NewRuleGroupFromJSON(kv.Value) - if err != nil { - return err - } - // Add all rule key ranges within the group to the suspect key ranges. 
- for _, rule := range rw.ruleManager.GetRulesByGroup(ruleGroup.ID) { - rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) - } - err = rw.ruleManager.SetRuleGroup(ruleGroup) - if err == nil && rw.hasPendingDeletion() { - rw.tryFinishPendingDeletion() - } - return err - } - deleteFn := func(kv *mvccpb.KeyValue) error { - key := string(kv.Key) - log.Info("delete placement rule group", zap.String("key", key)) - trimmedKey := strings.TrimPrefix(key, prefixToTrim) - for _, rule := range rw.ruleManager.GetRulesByGroup(trimmedKey) { - rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) - } - err := rw.ruleManager.DeleteRuleGroup(trimmedKey) - if err != nil && strings.Contains(err.Error(), "no rule left") { - rw.addPendingDeletion(key, trimmedKey, "") - } - return err - } - postEventFn := func() error { - return nil - } - rw.groupWatcher = etcdutil.NewLoopWatcher( - rw.ctx, &rw.wg, - rw.etcdClient, - "scheduling-rule-group-watcher", rw.ruleGroupPathPrefix, - putFn, deleteFn, postEventFn, - clientv3.WithPrefix(), - ) - rw.groupWatcher.StartWatchLoop() - return rw.groupWatcher.WaitLoad() -} - func (rw *Watcher) initializeRegionLabelWatcher() error { prefixToTrim := rw.regionLabelPathPrefix + "/" putFn := func(kv *mvccpb.KeyValue) error { @@ -283,7 +268,6 @@ func (rw *Watcher) tryFinishPendingDeletion() { // We need to force load the rules and rule groups to make sure sync with etcd. if len(rw.pendingDeletion.kvs) != originLen { rw.ruleWatcher.ForceLoad() - rw.groupWatcher.ForceLoad() log.Info("force load rules", zap.Int("pending deletion", len(rw.pendingDeletion.kvs)), zap.Int("origin", originLen)) } } diff --git a/pkg/storage/endpoint/key_path.go b/pkg/storage/endpoint/key_path.go index cac40db29c5..69b8d0f2f8e 100644 --- a/pkg/storage/endpoint/key_path.go +++ b/pkg/storage/endpoint/key_path.go @@ -31,6 +31,7 @@ const ( serviceMiddlewarePath = "service_middleware" schedulePath = "schedule" gcPath = "gc" + ruleCommonPath = "rule" rulesPath = "rules" ruleGroupPath = "rule_group" regionLabelPath = "region_label" @@ -102,6 +103,11 @@ func RulesPathPrefix(clusterID uint64) string { return path.Join(PDRootPath(clusterID), rulesPath) } +// RuleCommonPathPrefix returns the path prefix to save the placement rule common config. +func RuleCommonPathPrefix(clusterID uint64) string { + return path.Join(PDRootPath(clusterID), ruleCommonPath) +} + // RuleGroupPathPrefix returns the path prefix to save the placement rule groups. func RuleGroupPathPrefix(clusterID uint64) string { return path.Join(PDRootPath(clusterID), ruleGroupPath) diff --git a/tests/server/api/rule_test.go b/tests/server/api/rule_test.go index 70f2d632528..5e71c158028 100644 --- a/tests/server/api/rule_test.go +++ b/tests/server/api/rule_test.go @@ -1097,8 +1097,7 @@ func (suite *ruleTestSuite) TestConcurrency() { }, } env := tests.NewSchedulingTestEnvironment(suite.T(), opts...) 
- // FIXME: enable this test in api mode - env.RunTestInPDMode(suite.checkConcurrency) + env.RunTestInTwoModes(suite.checkConcurrency) } func (suite *ruleTestSuite) checkConcurrency(cluster *tests.TestCluster) { @@ -1122,7 +1121,6 @@ func (suite *ruleTestSuite) checkConcurrency(cluster *tests.TestCluster) { }, ) // test concurrency of set rule with different id - // TODO: this part cannot run in api mode suite.checkConcurrencyWith(cluster, func(i int) []placement.GroupBundle { return []placement.GroupBundle{ From bd5acaacb2209f2f9aaab7788e56debf60f459ca Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 5 Dec 2023 17:19:13 +0800 Subject: [PATCH 5/9] fix init Signed-off-by: lhy1024 --- pkg/mcs/scheduling/server/rule/watcher.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/mcs/scheduling/server/rule/watcher.go b/pkg/mcs/scheduling/server/rule/watcher.go index 51735df39c9..433807ed14b 100644 --- a/pkg/mcs/scheduling/server/rule/watcher.go +++ b/pkg/mcs/scheduling/server/rule/watcher.go @@ -94,6 +94,7 @@ func NewWatcher( ctx: ctx, cancel: cancel, rulesPathPrefix: endpoint.RulesPathPrefix(clusterID), + ruleCommonPathPrefix: endpoint.RuleCommonPathPrefix(clusterID), ruleGroupPathPrefix: endpoint.RuleGroupPathPrefix(clusterID), regionLabelPathPrefix: endpoint.RegionLabelPathPrefix(clusterID), etcdClient: etcdClient, From 48afd515bc65360f3a78ad0d26369204643b2408 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 6 Dec 2023 22:57:00 +0800 Subject: [PATCH 6/9] address comments Signed-off-by: lhy1024 --- pkg/mcs/scheduling/server/config/watcher.go | 14 ++++++++------ pkg/mcs/scheduling/server/meta/watcher.go | 3 ++- pkg/mcs/scheduling/server/rule/watcher.go | 20 +++++++++++--------- 3 files changed, 21 insertions(+), 16 deletions(-) diff --git a/pkg/mcs/scheduling/server/config/watcher.go b/pkg/mcs/scheduling/server/config/watcher.go index 4ded93ceb1b..c97306d50ba 100644 --- a/pkg/mcs/scheduling/server/config/watcher.go +++ b/pkg/mcs/scheduling/server/config/watcher.go @@ -154,7 +154,7 @@ func (cw *Watcher) initializeConfigWatcher() error { func (cw *Watcher) initializeTTLConfigWatcher() error { putFn := func(kv *mvccpb.KeyValue) error { - key := string(kv.Key)[len(sc.TTLConfigPrefix)+1:] + key := strings.TrimPrefix(string(kv.Key), sc.TTLConfigPrefix+"/") value := string(kv.Value) leaseID := kv.Lease resp, err := cw.etcdClient.TimeToLive(cw.ctx, clientv3.LeaseID(leaseID)) @@ -166,7 +166,7 @@ func (cw *Watcher) initializeTTLConfigWatcher() error { return nil } deleteFn := func(kv *mvccpb.KeyValue) error { - key := string(kv.Key)[len(sc.TTLConfigPrefix)+1:] + key := strings.TrimPrefix(string(kv.Key), sc.TTLConfigPrefix+"/") cw.ttl.PutWithTTL(key, nil, 0) return nil } @@ -186,13 +186,14 @@ func (cw *Watcher) initializeTTLConfigWatcher() error { func (cw *Watcher) initializeSchedulerConfigWatcher() error { prefixToTrim := cw.schedulerConfigPathPrefix + "/" putFn := func(kv *mvccpb.KeyValue) error { - name := strings.TrimPrefix(string(kv.Key), prefixToTrim) + key := string(kv.Key) + name := strings.TrimPrefix(key, prefixToTrim) log.Info("update scheduler config", zap.String("name", name), zap.String("value", string(kv.Value))) err := cw.storage.SaveSchedulerConfig(name, kv.Value) if err != nil { log.Warn("failed to save scheduler config", - zap.String("event-kv-key", string(kv.Key)), + zap.String("event-kv-key", key), zap.String("trimmed-key", name), zap.Error(err)) return err @@ -204,9 +205,10 @@ func (cw *Watcher) initializeSchedulerConfigWatcher() error { return nil } deleteFn := func(kv 
*mvccpb.KeyValue) error { - log.Info("remove scheduler config", zap.String("key", string(kv.Key))) + key := string(kv.Key) + log.Info("remove scheduler config", zap.String("key", key)) return cw.storage.RemoveSchedulerConfig( - strings.TrimPrefix(string(kv.Key), prefixToTrim), + strings.TrimPrefix(key, prefixToTrim), ) } postEventFn := func() error { diff --git a/pkg/mcs/scheduling/server/meta/watcher.go b/pkg/mcs/scheduling/server/meta/watcher.go index 3a04c261163..ce0abfad950 100644 --- a/pkg/mcs/scheduling/server/meta/watcher.go +++ b/pkg/mcs/scheduling/server/meta/watcher.go @@ -71,9 +71,10 @@ func NewWatcher( func (w *Watcher) initializeStoreWatcher() error { putFn := func(kv *mvccpb.KeyValue) error { store := &metapb.Store{} + key := string(kv.Key) if err := proto.Unmarshal(kv.Value, store); err != nil { log.Warn("failed to unmarshal store entry", - zap.String("event-kv-key", string(kv.Key)), zap.Error(err)) + zap.String("event-kv-key", key), zap.Error(err)) return err } origin := w.basicCluster.GetStore(store.GetId()) diff --git a/pkg/mcs/scheduling/server/rule/watcher.go b/pkg/mcs/scheduling/server/rule/watcher.go index 433807ed14b..5b8e0d9260a 100644 --- a/pkg/mcs/scheduling/server/rule/watcher.go +++ b/pkg/mcs/scheduling/server/rule/watcher.go @@ -123,8 +123,9 @@ func NewWatcher( func (rw *Watcher) initializeRuleWatcher() error { putFn := func(kv *mvccpb.KeyValue) error { err := func() error { - if strings.HasPrefix(string(kv.Key), rw.rulesPathPrefix) { - log.Info("update placement rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value))) + key := string(kv.Key) + if strings.HasPrefix(key, rw.rulesPathPrefix) { + log.Info("update placement rule", zap.String("key", key), zap.String("value", string(kv.Value))) rule, err := placement.NewRuleFromJSON(kv.Value) if err != nil { return err @@ -135,8 +136,8 @@ func (rw *Watcher) initializeRuleWatcher() error { rw.checkerController.AddSuspectKeyRange(oldRule.StartKey, oldRule.EndKey) } return rw.ruleManager.SetRule(rule) - } else if strings.HasPrefix(string(kv.Key), rw.ruleGroupPathPrefix) { - log.Info("update placement rule group", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value))) + } else if strings.HasPrefix(key, rw.ruleGroupPathPrefix) { + log.Info("update placement rule group", zap.String("key", key), zap.String("value", string(kv.Value))) ruleGroup, err := placement.NewRuleGroupFromJSON(kv.Value) if err != nil { return err @@ -147,7 +148,7 @@ func (rw *Watcher) initializeRuleWatcher() error { } return rw.ruleManager.SetRuleGroup(ruleGroup) } else { - log.Warn("unknown key when update placement rule", zap.String("key", string(kv.Key))) + log.Warn("unknown key when update placement rule", zap.String("key", key)) return nil } }() @@ -159,7 +160,7 @@ func (rw *Watcher) initializeRuleWatcher() error { deleteFn := func(kv *mvccpb.KeyValue) error { key := string(kv.Key) groupID, ruleID, err := func() (string, string, error) { - if strings.HasPrefix(string(kv.Key), rw.rulesPathPrefix) { + if strings.HasPrefix(key, rw.rulesPathPrefix) { log.Info("delete placement rule", zap.String("key", key)) ruleJSON, err := rw.ruleStorage.LoadRule(strings.TrimPrefix(key, rw.rulesPathPrefix+"/")) if err != nil { @@ -171,7 +172,7 @@ func (rw *Watcher) initializeRuleWatcher() error { } rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) return rule.GroupID, rule.ID, rw.ruleManager.DeleteRule(rule.GroupID, rule.ID) - } else if strings.HasPrefix(string(kv.Key), rw.ruleGroupPathPrefix) { + } else if 
strings.HasPrefix(key, rw.ruleGroupPathPrefix) { log.Info("delete placement rule group", zap.String("key", key)) trimmedKey := strings.TrimPrefix(key, rw.ruleGroupPathPrefix+"/") for _, rule := range rw.ruleManager.GetRulesByGroup(trimmedKey) { @@ -179,7 +180,7 @@ func (rw *Watcher) initializeRuleWatcher() error { } return trimmedKey, "", rw.ruleManager.DeleteRuleGroup(trimmedKey) } else { - log.Warn("unknown key when delete placement rule", zap.String("key", string(kv.Key))) + log.Warn("unknown key when delete placement rule", zap.String("key", key)) return "", "", nil } }() @@ -205,7 +206,8 @@ func (rw *Watcher) initializeRuleWatcher() error { func (rw *Watcher) initializeRegionLabelWatcher() error { prefixToTrim := rw.regionLabelPathPrefix + "/" putFn := func(kv *mvccpb.KeyValue) error { - log.Info("update region label rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value))) + key := string(kv.Key) + log.Info("update region label rule", zap.String("key", key), zap.String("value", string(kv.Value))) rule, err := labeler.NewLabelRuleFromJSON(kv.Value) if err != nil { return err From ac72124fbbf72c61410304c83193416ae4d43918 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Thu, 7 Dec 2023 12:18:39 +0800 Subject: [PATCH 7/9] fix test Signed-off-by: lhy1024 --- pkg/mcs/scheduling/server/rule/watcher.go | 14 ++++++-------- pkg/utils/etcdutil/etcdutil.go | 10 ++++++---- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/pkg/mcs/scheduling/server/rule/watcher.go b/pkg/mcs/scheduling/server/rule/watcher.go index 5b8e0d9260a..8ccfd02e07c 100644 --- a/pkg/mcs/scheduling/server/rule/watcher.go +++ b/pkg/mcs/scheduling/server/rule/watcher.go @@ -187,7 +187,7 @@ func (rw *Watcher) initializeRuleWatcher() error { if err != nil && strings.Contains(err.Error(), "no rule left") && groupID != "" { rw.addPendingDeletion(key, groupID, ruleID) } - return nil + return err } postEventFn := func() error { return nil @@ -254,7 +254,7 @@ func (rw *Watcher) addPendingDeletion(path, groupID, ruleID string) { func (rw *Watcher) tryFinishPendingDeletion() { rw.pendingDeletion.Lock() defer rw.pendingDeletion.Unlock() - originLen := len(rw.pendingDeletion.kvs) + previousLen := len(rw.pendingDeletion.kvs) for k, v := range rw.pendingDeletion.kvs { groupID, ruleID := v[0], v[1] var err error @@ -267,10 +267,8 @@ func (rw *Watcher) tryFinishPendingDeletion() { delete(rw.pendingDeletion.kvs, k) } } - // If the length of the map is changed, it means that some rules or rule groups have been deleted. - // We need to force load the rules and rule groups to make sure sync with etcd. - if len(rw.pendingDeletion.kvs) != originLen { - rw.ruleWatcher.ForceLoad() - log.Info("force load rules", zap.Int("pending deletion", len(rw.pendingDeletion.kvs)), zap.Int("origin", originLen)) - } + // TODO: If the length of the map is changed, it means that some rules or rule groups have been deleted. + // We need to compare the rules and rule groups to make sure sync with etcd, + // rather than just force load all the rules and rule groups. 
+ log.Info("clean pending deletion", zap.Int("current", len(rw.pendingDeletion.kvs)), zap.Int("previous", previousLen)) } diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go index 03c2374efc6..d24c5ded98a 100644 --- a/pkg/utils/etcdutil/etcdutil.go +++ b/pkg/utils/etcdutil/etcdutil.go @@ -818,18 +818,20 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision case clientv3.EventTypePut: if err := lw.putFn(event.Kv); err != nil { log.Error("put failed in watch loop", zap.Error(err), - zap.Int64("revision", revision), zap.String("name", lw.name), zap.String("key", lw.key)) + zap.Int64("revision", revision), zap.String("name", lw.name), + zap.String("watch-key", lw.key), zap.ByteString("event-kv-key", event.Kv.Key)) } else { - log.Debug("put in watch loop", zap.String("name", lw.name), + log.Debug("put successfully in watch loop", zap.String("name", lw.name), zap.ByteString("key", event.Kv.Key), zap.ByteString("value", event.Kv.Value)) } case clientv3.EventTypeDelete: if err := lw.deleteFn(event.Kv); err != nil { log.Error("delete failed in watch loop", zap.Error(err), - zap.Int64("revision", revision), zap.String("name", lw.name), zap.String("key", lw.key)) + zap.Int64("revision", revision), zap.String("name", lw.name), + zap.String("watch-key", lw.key), zap.ByteString("event-kv-key", event.Kv.Key)) } else { - log.Debug("delete in watch loop", zap.String("name", lw.name), + log.Debug("delete successfully in watch loop", zap.String("name", lw.name), zap.ByteString("key", event.Kv.Key)) } } From 0c70123349d4655a79e3e2025494d29a947960e2 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 8 Dec 2023 18:39:40 +0800 Subject: [PATCH 8/9] make test stable Signed-off-by: lhy1024 --- tests/server/api/region_test.go | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/tests/server/api/region_test.go b/tests/server/api/region_test.go index 1dd0066cd7b..452cdb63b39 100644 --- a/tests/server/api/region_test.go +++ b/tests/server/api/region_test.go @@ -24,11 +24,13 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/log" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/schedule/placement" tu "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/tests" + "go.uber.org/zap" ) type regionTestSuite struct { @@ -335,9 +337,20 @@ func (suite *regionTestSuite) checkRegionsReplicated(cluster *tests.TestCluster) err = tu.CheckPostJSON(testDialClient, urlPrefix+"/config/placement-rule", data, tu.StatusOK(re)) suite.NoError(err) - err = tu.ReadGetJSON(re, testDialClient, url, &status) - suite.NoError(err) - suite.Equal("REPLICATED", status) + tu.Eventually(re, func() bool { + respBundle := make([]placement.GroupBundle, 0) + err = tu.CheckGetJSON(testDialClient, urlPrefix+"/config/placement-rule", nil, + tu.StatusOK(re), tu.ExtractJSON(re, &respBundle)) + suite.NoError(err) + log.Info("respBundle", zap.Any("respBundle", respBundle)) + return len(respBundle) == 1 && len(respBundle[0].Rules) == 2 + }) + + tu.Eventually(re, func() bool { + err = tu.ReadGetJSON(re, testDialClient, url, &status) + suite.NoError(err) + return status == "REPLICATED" + }) // test multiple bundles bundle = append(bundle, placement.GroupBundle{ @@ -377,9 +390,11 @@ func (suite *regionTestSuite) checkRegionsReplicated(cluster *tests.TestCluster) r1.GetMeta().Peers = append(r1.GetMeta().Peers, &metapb.Peer{Id: 5, StoreId: 1}, &metapb.Peer{Id: 6, StoreId: 1}, 
&metapb.Peer{Id: 7, StoreId: 1}) tests.MustPutRegionInfo(re, cluster, r1) - err = tu.ReadGetJSON(re, testDialClient, url, &status) - suite.NoError(err) - suite.Equal("REPLICATED", status) + tu.Eventually(re, func() bool { + err = tu.ReadGetJSON(re, testDialClient, url, &status) + suite.NoError(err) + return status == "REPLICATED" + }) } func (suite *regionTestSuite) checkRegionCount(cluster *tests.TestCluster, count uint64) { From ff234905ed9c9f5e00f2b030042d670ff913a7ef Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Sat, 9 Dec 2023 20:12:26 +0800 Subject: [PATCH 9/9] fix data race in test Signed-off-by: lhy1024 --- pkg/schedule/schedulers/shuffle_region.go | 7 ++++--- pkg/schedule/schedulers/shuffle_region_config.go | 4 +++- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/pkg/schedule/schedulers/shuffle_region.go b/pkg/schedule/schedulers/shuffle_region.go index f1d35e80925..f9bed18d3fa 100644 --- a/pkg/schedule/schedulers/shuffle_region.go +++ b/pkg/schedule/schedulers/shuffle_region.go @@ -139,18 +139,19 @@ func (s *shuffleRegionScheduler) scheduleRemovePeer(cluster sche.SchedulerCluste pendingFilter := filter.NewRegionPendingFilter() downFilter := filter.NewRegionDownFilter() replicaFilter := filter.NewRegionReplicatedFilter(cluster) + ranges := s.conf.GetRanges() for _, source := range candidates.Stores { var region *core.RegionInfo if s.conf.IsRoleAllow(roleFollower) { - region = filter.SelectOneRegion(cluster.RandFollowerRegions(source.GetID(), s.conf.Ranges), nil, + region = filter.SelectOneRegion(cluster.RandFollowerRegions(source.GetID(), ranges), nil, pendingFilter, downFilter, replicaFilter) } if region == nil && s.conf.IsRoleAllow(roleLeader) { - region = filter.SelectOneRegion(cluster.RandLeaderRegions(source.GetID(), s.conf.Ranges), nil, + region = filter.SelectOneRegion(cluster.RandLeaderRegions(source.GetID(), ranges), nil, pendingFilter, downFilter, replicaFilter) } if region == nil && s.conf.IsRoleAllow(roleLearner) { - region = filter.SelectOneRegion(cluster.RandLearnerRegions(source.GetID(), s.conf.Ranges), nil, + region = filter.SelectOneRegion(cluster.RandLearnerRegions(source.GetID(), ranges), nil, pendingFilter, downFilter, replicaFilter) } if region != nil { diff --git a/pkg/schedule/schedulers/shuffle_region_config.go b/pkg/schedule/schedulers/shuffle_region_config.go index 7d04879c992..552d7ea8bce 100644 --- a/pkg/schedule/schedulers/shuffle_region_config.go +++ b/pkg/schedule/schedulers/shuffle_region_config.go @@ -58,7 +58,9 @@ func (conf *shuffleRegionSchedulerConfig) GetRoles() []string { func (conf *shuffleRegionSchedulerConfig) GetRanges() []core.KeyRange { conf.RLock() defer conf.RUnlock() - return conf.Ranges + ranges := make([]core.KeyRange, len(conf.Ranges)) + copy(ranges, conf.Ranges) + return ranges } func (conf *shuffleRegionSchedulerConfig) IsRoleAllow(role string) bool {
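
For illustration, here is a minimal, self-contained sketch of the retry-after-put idea behind the pendingDeletion field added to pkg/mcs/scheduling/server/rule/watcher.go in patch 1: a delete event that fails with a "no rule left" error is parked, and the parked deletions are retried after the next successful put. The fakeRuleManager, watcher, onPut and onDelete names below are invented for this sketch and are not PD APIs; the real watcher records [groupID, ruleID] pairs and calls RuleManager.DeleteRule / DeleteRuleGroup, and its locking lives in a syncutil.RWMutex embedded in the pendingDeletion struct.

package main

import (
	"errors"
	"fmt"
	"strings"
	"sync"
)

// fakeRuleManager stands in for placement.RuleManager in this sketch. Deleting a rule
// while it is the only one left is rejected, which is the condition the watcher has to
// tolerate when etcd events arrive in an inconvenient order.
type fakeRuleManager struct {
	rules map[string]struct{}
}

func (m *fakeRuleManager) setRule(key string) { m.rules[key] = struct{}{} }

func (m *fakeRuleManager) deleteRule(key string) error {
	if len(m.rules) <= 1 {
		return errors.New("no rule left")
	}
	delete(m.rules, key)
	return nil
}

// watcher is a stripped-down stand-in for rule.Watcher: deletions that hit
// "no rule left" are parked in pending and retried after the next successful put.
type watcher struct {
	manager *fakeRuleManager

	mu      sync.Mutex
	pending map[string]struct{} // keys whose deletion is deferred
}

func (w *watcher) onDelete(key string) {
	// Mirror the patch's check: only a "no rule left" failure is parked for later.
	if err := w.manager.deleteRule(key); err != nil && strings.Contains(err.Error(), "no rule left") {
		w.mu.Lock()
		w.pending[key] = struct{}{}
		w.mu.Unlock()
	}
}

func (w *watcher) onPut(key string) {
	w.manager.setRule(key)
	// A successful put may have unblocked earlier deletions, so retry them now.
	w.mu.Lock()
	defer w.mu.Unlock()
	for k := range w.pending {
		if err := w.manager.deleteRule(k); err == nil {
			delete(w.pending, k)
		}
	}
}

func main() {
	w := &watcher{
		manager: &fakeRuleManager{rules: map[string]struct{}{"pd/default": {}}},
		pending: map[string]struct{}{},
	}
	// Events can arrive with the delete before the put that makes it legal;
	// the pending set bridges that gap.
	w.onDelete("pd/default") // parked: deleting the only remaining rule is rejected
	w.onPut("group1/foo")    // a new rule arrives, the parked deletion is retried
	fmt.Println("remaining rules:", len(w.manager.rules), "pending:", len(w.pending))
}

The same out-of-order synchronization is why the tests in this series wrap their assertions in tu.Eventually: the scheduling service applies rule changes asynchronously from etcd, so the HTTP responses only converge to the expected bundles after the watcher has caught up.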