From 1e1817d0bcec05fe3349dbeb7a964663cb7fd41a Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 20 Nov 2023 15:50:41 +0800 Subject: [PATCH 1/3] mcs: support region label http interface in scheduling server (#7283) ref tikv/pd#5839 Signed-off-by: lhy1024 Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/mcs/scheduling/server/apis/v1/api.go | 167 +++++++++++++++++- pkg/schedule/handler/handler.go | 19 ++ pkg/utils/apiutil/serverapi/middleware.go | 30 +++- pkg/utils/testutil/api_check.go | 2 +- server/api/region_label.go | 2 +- server/api/server.go | 21 +++ tests/integrations/mcs/scheduling/api_test.go | 23 +++ tests/server/api/rule_test.go | 73 ++++++-- 8 files changed, 310 insertions(+), 27 deletions(-) diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go index 172515d86208..822b4164da1f 100644 --- a/pkg/mcs/scheduling/server/apis/v1/api.go +++ b/pkg/mcs/scheduling/server/apis/v1/api.go @@ -17,6 +17,7 @@ package apis import ( "encoding/hex" "net/http" + "net/url" "strconv" "sync" @@ -191,12 +192,22 @@ func (s *Service) RegisterConfigRouter() { placementRule := router.Group("placement-rule") placementRule.GET("", getPlacementRules) placementRule.GET("/:group", getPlacementRuleByGroup) + + regionLabel := router.Group("region-label") + regionLabel.GET("/rules", getAllRegionLabelRules) + regionLabel.GET("/rules/ids", getRegionLabelRulesByIDs) + regionLabel.GET("/rules/:id", getRegionLabelRuleByID) + + regions := router.Group("regions") + regions.GET("/:id/label/:key", getRegionLabelByKey) + regions.GET("/:id/labels", getRegionLabels) } // @Tags admin // @Summary Change the log level. // @Produce json // @Success 200 {string} string "The log level is updated." +// @Failure 400 {string} string "The input is invalid." // @Router /admin/log [put] func changeLogLevel(c *gin.Context) { svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) @@ -230,6 +241,7 @@ func getConfig(c *gin.Context) { // @Summary Drop all regions from cache. // @Produce json // @Success 200 {string} string "All regions are removed from server cache." +// @Failure 500 {string} string "PD server failed to proceed the request." // @Router /admin/cache/regions [delete] func deleteAllRegionCache(c *gin.Context) { svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) @@ -248,6 +260,7 @@ func deleteAllRegionCache(c *gin.Context) { // @Produce json // @Success 200 {string} string "The region is removed from server cache." // @Failure 400 {string} string "The input is invalid." +// @Failure 500 {string} string "PD server failed to proceed the request." // @Router /admin/cache/regions/{id} [delete] func deleteRegionCacheByID(c *gin.Context) { svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) @@ -683,8 +696,6 @@ func getHotBuckets(c *gin.Context) { // @Accept json // @Produce json // @Success 200 {object} storage.HistoryHotRegions -// @Failure 400 {string} string "The input is invalid." -// @Failure 500 {string} string "PD server failed to proceed the request." // @Router /hotspot/regions/history [get] func getHistoryHotRegions(c *gin.Context) { // TODO: support history hotspot in scheduling server with stateless in the future. @@ -955,3 +966,155 @@ func getPlacementRuleByGroup(c *gin.Context) { group := manager.GetGroupBundle(g) c.IndentedJSON(http.StatusOK, group) } + +// @Tags region_label +// @Summary Get label of a region. 
+// @Param id path integer true "Region Id" +// @Param key path string true "Label key" +// @Produce json +// @Success 200 {string} string +// @Failure 400 {string} string "The input is invalid." +// @Failure 404 {string} string "The region does not exist." +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /config/regions/{id}/label/{key} [get] +func getRegionLabelByKey(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + + idStr := c.Param("id") + labelKey := c.Param("key") // TODO: test https://github.com/tikv/pd/pull/4004 + + id, err := strconv.ParseUint(idStr, 10, 64) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + + region, err := handler.GetRegion(id) + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + if region == nil { + c.String(http.StatusNotFound, errs.ErrRegionNotFound.FastGenByArgs().Error()) + return + } + + l, err := handler.GetRegionLabeler() + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + labelValue := l.GetRegionLabel(region, labelKey) + c.IndentedJSON(http.StatusOK, labelValue) +} + +// @Tags region_label +// @Summary Get labels of a region. +// @Param id path integer true "Region Id" +// @Produce json +// @Success 200 {string} string +// @Failure 400 {string} string "The input is invalid." +// @Failure 404 {string} string "The region does not exist." +// @Router /config/regions/{id}/labels [get] +func getRegionLabels(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + + idStr := c.Param("id") + id, err := strconv.ParseUint(idStr, 10, 64) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + + region, err := handler.GetRegion(id) + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + if region == nil { + c.String(http.StatusNotFound, errs.ErrRegionNotFound.FastGenByArgs().Error()) + return + } + l, err := handler.GetRegionLabeler() + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + labels := l.GetRegionLabels(region) + c.IndentedJSON(http.StatusOK, labels) +} + +// @Tags region_label +// @Summary List all label rules of cluster. +// @Produce json +// @Success 200 {array} labeler.LabelRule +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /config/region-label/rules [get] +func getAllRegionLabelRules(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + l, err := handler.GetRegionLabeler() + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + rules := l.GetAllLabelRules() + c.IndentedJSON(http.StatusOK, rules) +} + +// @Tags region_label +// @Summary Get label rules of cluster by ids. +// @Param body body []string true "IDs of query rules" +// @Produce json +// @Success 200 {array} labeler.LabelRule +// @Failure 400 {string} string "The input is invalid." +// @Failure 500 {string} string "PD server failed to proceed the request." 
+// @Router /config/region-label/rules/ids [get] +func getRegionLabelRulesByIDs(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + l, err := handler.GetRegionLabeler() + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + var ids []string + if err := c.BindJSON(&ids); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + rules, err := l.GetLabelRules(ids) + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + c.IndentedJSON(http.StatusOK, rules) +} + +// @Tags region_label +// @Summary Get label rule of cluster by id. +// @Param id path string true "Rule Id" +// @Produce json +// @Success 200 {object} labeler.LabelRule +// @Failure 404 {string} string "The rule does not exist." +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /config/region-label/rules/{id} [get] +func getRegionLabelRuleByID(c *gin.Context) { + handler := c.MustGet(handlerKey).(*handler.Handler) + + id, err := url.PathUnescape(c.Param("id")) + if err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + + l, err := handler.GetRegionLabeler() + if err != nil { + c.String(http.StatusInternalServerError, err.Error()) + return + } + rule := l.GetLabelRule(id) + if rule == nil { + c.String(http.StatusNotFound, errs.ErrRegionRuleNotFound.FastGenByArgs().Error()) + return + } + c.IndentedJSON(http.StatusOK, rule) +} diff --git a/pkg/schedule/handler/handler.go b/pkg/schedule/handler/handler.go index 3f9f4f966223..8da84647f770 100644 --- a/pkg/schedule/handler/handler.go +++ b/pkg/schedule/handler/handler.go @@ -31,6 +31,7 @@ import ( "github.com/tikv/pd/pkg/schedule" sche "github.com/tikv/pd/pkg/schedule/core" "github.com/tikv/pd/pkg/schedule/filter" + "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/schedule/scatter" @@ -1063,6 +1064,24 @@ func (h *Handler) GetHotBuckets(regionIDs ...uint64) (HotBucketsResponse, error) return ret, nil } +// GetRegion returns the region labeler. +func (h *Handler) GetRegion(id uint64) (*core.RegionInfo, error) { + c := h.GetCluster() + if c == nil { + return nil, errs.ErrNotBootstrapped.GenWithStackByArgs() + } + return c.GetRegion(id), nil +} + +// GetRegionLabeler returns the region labeler. +func (h *Handler) GetRegionLabeler() (*labeler.RegionLabeler, error) { + c := h.GetCluster() + if c == nil || c.GetRegionLabeler() == nil { + return nil, errs.ErrNotBootstrapped + } + return c.GetRegionLabeler(), nil +} + // GetRuleManager returns the rule manager. func (h *Handler) GetRuleManager() (*placement.RuleManager, error) { c := h.GetCluster() diff --git a/pkg/utils/apiutil/serverapi/middleware.go b/pkg/utils/apiutil/serverapi/middleware.go index 2bb742ccbba6..e26327cb3ffb 100644 --- a/pkg/utils/apiutil/serverapi/middleware.go +++ b/pkg/utils/apiutil/serverapi/middleware.go @@ -79,6 +79,7 @@ type microserviceRedirectRule struct { targetPath string targetServiceName string matchMethods []string + filter func(*http.Request) bool } // NewRedirector redirects request to the leader if needs to be handled in the leader. 
@@ -94,14 +95,19 @@ func NewRedirector(s *server.Server, opts ...RedirectorOption) negroni.Handler { type RedirectorOption func(*redirector) // MicroserviceRedirectRule new a microservice redirect rule option -func MicroserviceRedirectRule(matchPath, targetPath, targetServiceName string, methods []string) RedirectorOption { +func MicroserviceRedirectRule(matchPath, targetPath, targetServiceName string, + methods []string, filters ...func(*http.Request) bool) RedirectorOption { return func(s *redirector) { - s.microserviceRedirectRules = append(s.microserviceRedirectRules, µserviceRedirectRule{ - matchPath, - targetPath, - targetServiceName, - methods, - }) + rule := µserviceRedirectRule{ + matchPath: matchPath, + targetPath: targetPath, + targetServiceName: targetServiceName, + matchMethods: methods, + } + if len(filters) > 0 { + rule.filter = filters[0] + } + s.microserviceRedirectRules = append(s.microserviceRedirectRules, rule) } } @@ -117,18 +123,26 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri r.URL.Path = strings.TrimRight(r.URL.Path, "/") for _, rule := range h.microserviceRedirectRules { if strings.HasPrefix(r.URL.Path, rule.matchPath) && slice.Contains(rule.matchMethods, r.Method) { + if rule.filter != nil && !rule.filter(r) { + continue + } origin := r.URL.Path addr, ok := h.s.GetServicePrimaryAddr(r.Context(), rule.targetServiceName) if !ok || addr == "" { log.Warn("failed to get the service primary addr when trying to match redirect rules", zap.String("path", r.URL.Path)) } + // If the URL contains escaped characters, use RawPath instead of Path + path := r.URL.Path + if r.URL.RawPath != "" { + path = r.URL.RawPath + } // Extract parameters from the URL path // e.g. r.URL.Path = /pd/api/v1/operators/1 (before redirect) // matchPath = /pd/api/v1/operators // targetPath = /scheduling/api/v1/operators // r.URL.Path = /scheduling/api/v1/operator/1 (after redirect) - pathParams := strings.TrimPrefix(r.URL.Path, rule.matchPath) + pathParams := strings.TrimPrefix(path, rule.matchPath) pathParams = strings.Trim(pathParams, "/") // Remove leading and trailing '/' if len(pathParams) > 0 { r.URL.Path = rule.targetPath + "/" + pathParams diff --git a/pkg/utils/testutil/api_check.go b/pkg/utils/testutil/api_check.go index ea91654b1496..58934bf08f6a 100644 --- a/pkg/utils/testutil/api_check.go +++ b/pkg/utils/testutil/api_check.go @@ -88,7 +88,7 @@ func ReadGetJSON(re *require.Assertions, client *http.Client, url string, data i } // ReadGetJSONWithBody is used to do get request with input and check whether given data can be extracted successfully. -func ReadGetJSONWithBody(re *require.Assertions, client *http.Client, url string, input []byte, data interface{}) error { +func ReadGetJSONWithBody(re *require.Assertions, client *http.Client, url string, input []byte, data interface{}, checkOpts ...func([]byte, int, http.Header)) error { resp, err := apiutil.GetJSON(client, url, input) if err != nil { return err diff --git a/server/api/region_label.go b/server/api/region_label.go index 003dfb1132f4..7958bacd3711 100644 --- a/server/api/region_label.go +++ b/server/api/region_label.go @@ -83,7 +83,7 @@ func (h *regionLabelHandler) PatchRegionLabelRules(w http.ResponseWriter, r *htt // @Success 200 {array} labeler.LabelRule // @Failure 400 {string} string "The input is invalid." // @Failure 500 {string} string "PD server failed to proceed the request." 
-// @Router /config/region-label/rule/ids [get] +// @Router /config/region-label/rules/ids [get] func (h *regionLabelHandler) GetRegionLabelRulesByIDs(w http.ResponseWriter, r *http.Request) { cluster := getCluster(r) var ids []string diff --git a/server/api/server.go b/server/api/server.go index 77a51eb04e54..2c015bec7ac2 100644 --- a/server/api/server.go +++ b/server/api/server.go @@ -17,6 +17,7 @@ package api import ( "context" "net/http" + "strings" "github.com/gorilla/mux" scheapi "github.com/tikv/pd/pkg/mcs/scheduling/server/apis/v1" @@ -79,6 +80,26 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP scheapi.APIPathPrefix+"/checkers", mcs.SchedulingServiceName, []string{http.MethodPost, http.MethodGet}), + serverapi.MicroserviceRedirectRule( + prefix+"/region/id", + scheapi.APIPathPrefix+"/config/regions", + mcs.SchedulingServiceName, + []string{http.MethodGet}, + func(r *http.Request) bool { + // The original code uses the path "/region/id" to get the region id. + // However, the path "/region/id" is used to get the region by id, which is not what we want. + return strings.Contains(r.URL.Path, "label") + }), + serverapi.MicroserviceRedirectRule( + prefix+"/config/region-label/rules", + scheapi.APIPathPrefix+"/config/region-label/rules", + mcs.SchedulingServiceName, + []string{http.MethodGet}), + serverapi.MicroserviceRedirectRule( + prefix+"/config/region-label/rule/", // Note: this is a typo in the original code + scheapi.APIPathPrefix+"/config/region-label/rules", + mcs.SchedulingServiceName, + []string{http.MethodGet}), serverapi.MicroserviceRedirectRule( prefix+"/hotspot", scheapi.APIPathPrefix+"/hotspot", diff --git a/tests/integrations/mcs/scheduling/api_test.go b/tests/integrations/mcs/scheduling/api_test.go index f6a7f66a66fa..b07568580d7b 100644 --- a/tests/integrations/mcs/scheduling/api_test.go +++ b/tests/integrations/mcs/scheduling/api_test.go @@ -15,6 +15,7 @@ import ( _ "github.com/tikv/pd/pkg/mcs/scheduling/server/apis/v1" "github.com/tikv/pd/pkg/mcs/scheduling/server/config" "github.com/tikv/pd/pkg/schedule/handler" + "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/storage" @@ -236,6 +237,28 @@ func (suite *apiTestSuite) TestAPIForward() { testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) re.NoError(err) + // Test region label + var labelRules []*labeler.LabelRule + err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/region-label/rules"), &labelRules, + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + err = testutil.ReadGetJSONWithBody(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/region-label/rules/ids"), []byte(`["rule1", "rule3"]`), + &labelRules, testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + err = testutil.CheckGetJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/region-label/rule/rule1"), nil, + testutil.StatusNotOK(re), testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + + err = testutil.CheckGetJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "region/id/1"), nil, + testutil.WithoutHeader(re, apiutil.ForwardToMicroServiceHeader)) + re.NoError(err) + err = testutil.CheckGetJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "region/id/1/label/key"), nil, + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + 
re.NoError(err) + err = testutil.CheckGetJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "region/id/1/labels"), nil, + testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true")) + re.NoError(err) + // Test rules: only forward `GET` request var rules []*placement.Rule tests.MustPutRegion(re, suite.cluster, 2, 1, []byte("a"), []byte("b"), core.SetApproximateSize(60)) diff --git a/tests/server/api/rule_test.go b/tests/server/api/rule_test.go index 861fbe5cf321..9176a00e66d3 100644 --- a/tests/server/api/rule_test.go +++ b/tests/server/api/rule_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/schedule/placement" tu "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server/config" @@ -1008,8 +1009,7 @@ func (suite *regionRuleTestSuite) TestRegionPlacementRule() { }, } env := tests.NewSchedulingTestEnvironment(suite.T(), opts...) - // FIXME: enable this test in two modes after we support region label forward. - env.RunTestInPDMode(suite.checkRegionPlacementRule) + env.RunTestInTwoModes(suite.checkRegionPlacementRule) } func (suite *regionRuleTestSuite) checkRegionPlacementRule(cluster *tests.TestCluster) { @@ -1076,37 +1076,80 @@ func (suite *regionRuleTestSuite) checkRegionPlacementRule(cluster *tests.TestCl }) fit := &placement.RegionFit{} - url := fmt.Sprintf("%s/config/rules/region/%d/detail", urlPrefix, 1) - err := tu.ReadGetJSON(re, testDialClient, url, fit) + u := fmt.Sprintf("%s/config/rules/region/%d/detail", urlPrefix, 1) + err := tu.ReadGetJSON(re, testDialClient, u, fit) suite.NoError(err) suite.Equal(len(fit.RuleFits), 1) suite.Equal(len(fit.OrphanPeers), 1) - url = fmt.Sprintf("%s/config/rules/region/%d/detail", urlPrefix, 2) + u = fmt.Sprintf("%s/config/rules/region/%d/detail", urlPrefix, 2) fit = &placement.RegionFit{} - err = tu.ReadGetJSON(re, testDialClient, url, fit) + err = tu.ReadGetJSON(re, testDialClient, u, fit) suite.NoError(err) suite.Equal(len(fit.RuleFits), 2) suite.Equal(len(fit.OrphanPeers), 0) - url = fmt.Sprintf("%s/config/rules/region/%d/detail", urlPrefix, 3) + u = fmt.Sprintf("%s/config/rules/region/%d/detail", urlPrefix, 3) fit = &placement.RegionFit{} - err = tu.ReadGetJSON(re, testDialClient, url, fit) + err = tu.ReadGetJSON(re, testDialClient, u, fit) suite.NoError(err) suite.Equal(len(fit.RuleFits), 0) suite.Equal(len(fit.OrphanPeers), 2) - url = fmt.Sprintf("%s/config/rules/region/%d/detail", urlPrefix, 4) - err = tu.CheckGetJSON(testDialClient, url, nil, tu.Status(re, http.StatusNotFound), tu.StringContain( + var label labeler.LabelRule + escapedID := url.PathEscape("keyspaces/0") + u = fmt.Sprintf("%s/config/region-label/rule/%s", urlPrefix, escapedID) + fmt.Println("u====", u) + err = tu.ReadGetJSON(re, testDialClient, u, &label) + suite.NoError(err) + suite.Equal(label.ID, "keyspaces/0") + + var labels []labeler.LabelRule + u = fmt.Sprintf("%s/config/region-label/rules", urlPrefix) + err = tu.ReadGetJSON(re, testDialClient, u, &labels) + suite.NoError(err) + suite.Len(labels, 1) + suite.Equal(labels[0].ID, "keyspaces/0") + + u = fmt.Sprintf("%s/config/region-label/rules/ids", urlPrefix) + err = tu.CheckGetJSON(testDialClient, u, []byte(`["rule1", "rule3"]`), func(resp []byte, statusCode int, _ http.Header) { + err := json.Unmarshal(resp, &labels) + suite.NoError(err) + suite.Len(labels, 0) + }) + suite.NoError(err) + + err = tu.CheckGetJSON(testDialClient, u, 
[]byte(`["keyspaces/0"]`), func(resp []byte, statusCode int, _ http.Header) { + err := json.Unmarshal(resp, &labels) + suite.NoError(err) + suite.Len(labels, 1) + suite.Equal(labels[0].ID, "keyspaces/0") + }) + suite.NoError(err) + + u = fmt.Sprintf("%s/config/rules/region/%d/detail", urlPrefix, 4) + err = tu.CheckGetJSON(testDialClient, u, nil, tu.Status(re, http.StatusNotFound), tu.StringContain( re, "region 4 not found")) suite.NoError(err) - url = fmt.Sprintf("%s/config/rules/region/%s/detail", urlPrefix, "id") - err = tu.CheckGetJSON(testDialClient, url, nil, tu.Status(re, http.StatusBadRequest), tu.StringContain( + u = fmt.Sprintf("%s/config/rules/region/%s/detail", urlPrefix, "id") + err = tu.CheckGetJSON(testDialClient, u, nil, tu.Status(re, http.StatusBadRequest), tu.StringContain( re, errs.ErrRegionInvalidID.Error())) suite.NoError(err) - leaderServer.GetRaftCluster().GetReplicationConfig().EnablePlacementRules = false - url = fmt.Sprintf("%s/config/rules/region/%d/detail", urlPrefix, 1) - err = tu.CheckGetJSON(testDialClient, url, nil, tu.Status(re, http.StatusPreconditionFailed), tu.StringContain( + data := make(map[string]interface{}) + data["enable-placement-rules"] = "false" + reqData, e := json.Marshal(data) + re.NoError(e) + u = fmt.Sprintf("%s/config", urlPrefix) + err = tu.CheckPostJSON(testDialClient, u, reqData, tu.StatusOK(re)) + re.NoError(err) + if sche := cluster.GetSchedulingPrimaryServer(); sche != nil { + // wait for the scheduler server to update the config + tu.Eventually(re, func() bool { + return !sche.GetCluster().GetCheckerConfig().IsPlacementRulesEnabled() + }) + } + u = fmt.Sprintf("%s/config/rules/region/%d/detail", urlPrefix, 1) + err = tu.CheckGetJSON(testDialClient, u, nil, tu.Status(re, http.StatusPreconditionFailed), tu.StringContain( re, "placement rules feature is disabled")) suite.NoError(err) } From 5a4e9efd846e0ee0d15ae4f9f91aab81f4382da3 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 20 Nov 2023 17:46:10 +0800 Subject: [PATCH 2/3] mcs: fix scheduler memory sync in api server (#7389) close tikv/pd#7388 Signed-off-by: lhy1024 Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/mcs/scheduling/server/config/watcher.go | 3 +- pkg/schedule/schedulers/base_scheduler.go | 8 +- pkg/schedule/schedulers/evict_leader.go | 4 +- pkg/schedule/schedulers/evict_slow_store.go | 4 +- .../schedulers/evict_slow_store_test.go | 4 +- pkg/schedule/schedulers/evict_slow_trend.go | 4 +- .../schedulers/evict_slow_trend_test.go | 4 +- pkg/schedule/schedulers/grant_leader.go | 4 +- pkg/schedule/schedulers/scheduler.go | 4 +- .../schedulers/scheduler_controller.go | 8 +- plugin/scheduler_example/evict_leader.go | 4 +- tests/pdctl/scheduler/scheduler_test.go | 126 +++++++++++++----- 12 files changed, 121 insertions(+), 56 deletions(-) diff --git a/pkg/mcs/scheduling/server/config/watcher.go b/pkg/mcs/scheduling/server/config/watcher.go index 6ad370450007..433933674ea4 100644 --- a/pkg/mcs/scheduling/server/config/watcher.go +++ b/pkg/mcs/scheduling/server/config/watcher.go @@ -147,7 +147,8 @@ func (cw *Watcher) initializeSchedulerConfigWatcher() error { prefixToTrim := cw.schedulerConfigPathPrefix + "/" putFn := func(kv *mvccpb.KeyValue) error { name := strings.TrimPrefix(string(kv.Key), prefixToTrim) - log.Info("update scheduler config", zap.String("name", string(kv.Value))) + log.Info("update scheduler config", zap.String("name", name), + zap.String("value", string(kv.Value))) err := cw.storage.SaveSchedulerConfig(name, 
kv.Value) if err != nil { log.Warn("failed to save scheduler config", diff --git a/pkg/schedule/schedulers/base_scheduler.go b/pkg/schedule/schedulers/base_scheduler.go index 6e712c18fe35..f4c8c5777670 100644 --- a/pkg/schedule/schedulers/base_scheduler.go +++ b/pkg/schedule/schedulers/base_scheduler.go @@ -92,8 +92,8 @@ func (s *BaseScheduler) GetNextInterval(interval time.Duration) time.Duration { return intervalGrow(interval, MaxScheduleInterval, exponentialGrowth) } -// Prepare does some prepare work -func (s *BaseScheduler) Prepare(cluster sche.SchedulerCluster) error { return nil } +// PrepareConfig does some prepare work about config. +func (s *BaseScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { return nil } -// Cleanup does some cleanup work -func (s *BaseScheduler) Cleanup(cluster sche.SchedulerCluster) {} +// CleanConfig does some cleanup work about config. +func (s *BaseScheduler) CleanConfig(cluster sche.SchedulerCluster) {} diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go index a5c67856df8f..332002043a32 100644 --- a/pkg/schedule/schedulers/evict_leader.go +++ b/pkg/schedule/schedulers/evict_leader.go @@ -239,7 +239,7 @@ func pauseAndResumeLeaderTransfer(cluster *core.BasicCluster, old, new map[uint6 } } -func (s *evictLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error { +func (s *evictLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { s.conf.mu.RLock() defer s.conf.mu.RUnlock() var res error @@ -251,7 +251,7 @@ func (s *evictLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error { return res } -func (s *evictLeaderScheduler) Cleanup(cluster sche.SchedulerCluster) { +func (s *evictLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) { s.conf.mu.RLock() defer s.conf.mu.RUnlock() for id := range s.conf.StoreIDWithRanges { diff --git a/pkg/schedule/schedulers/evict_slow_store.go b/pkg/schedule/schedulers/evict_slow_store.go index cc1b16300c5d..563f9f68c459 100644 --- a/pkg/schedule/schedulers/evict_slow_store.go +++ b/pkg/schedule/schedulers/evict_slow_store.go @@ -189,7 +189,7 @@ func (s *evictSlowStoreScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(s.conf) } -func (s *evictSlowStoreScheduler) Prepare(cluster sche.SchedulerCluster) error { +func (s *evictSlowStoreScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { evictStore := s.conf.evictStore() if evictStore != 0 { return cluster.SlowStoreEvicted(evictStore) @@ -197,7 +197,7 @@ func (s *evictSlowStoreScheduler) Prepare(cluster sche.SchedulerCluster) error { return nil } -func (s *evictSlowStoreScheduler) Cleanup(cluster sche.SchedulerCluster) { +func (s *evictSlowStoreScheduler) CleanConfig(cluster sche.SchedulerCluster) { s.cleanupEvictLeader(cluster) } diff --git a/pkg/schedule/schedulers/evict_slow_store_test.go b/pkg/schedule/schedulers/evict_slow_store_test.go index 813d17ae5414..11cd69e60f75 100644 --- a/pkg/schedule/schedulers/evict_slow_store_test.go +++ b/pkg/schedule/schedulers/evict_slow_store_test.go @@ -123,13 +123,13 @@ func (suite *evictSlowStoreTestSuite) TestEvictSlowStorePrepare() { suite.True(ok) suite.Zero(es2.conf.evictStore()) // prepare with no evict store. - suite.es.Prepare(suite.tc) + suite.es.PrepareConfig(suite.tc) es2.conf.setStoreAndPersist(1) suite.Equal(uint64(1), es2.conf.evictStore()) suite.False(es2.conf.readyForRecovery()) // prepare with evict store. 
- suite.es.Prepare(suite.tc) + suite.es.PrepareConfig(suite.tc) } func (suite *evictSlowStoreTestSuite) TestEvictSlowStorePersistFail() { diff --git a/pkg/schedule/schedulers/evict_slow_trend.go b/pkg/schedule/schedulers/evict_slow_trend.go index f31ba420c97f..0d2c10e2bfe3 100644 --- a/pkg/schedule/schedulers/evict_slow_trend.go +++ b/pkg/schedule/schedulers/evict_slow_trend.go @@ -270,7 +270,7 @@ func (s *evictSlowTrendScheduler) EncodeConfig() ([]byte, error) { return EncodeConfig(s.conf) } -func (s *evictSlowTrendScheduler) Prepare(cluster sche.SchedulerCluster) error { +func (s *evictSlowTrendScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { evictedStoreID := s.conf.evictedStore() if evictedStoreID == 0 { return nil @@ -278,7 +278,7 @@ func (s *evictSlowTrendScheduler) Prepare(cluster sche.SchedulerCluster) error { return cluster.SlowTrendEvicted(evictedStoreID) } -func (s *evictSlowTrendScheduler) Cleanup(cluster sche.SchedulerCluster) { +func (s *evictSlowTrendScheduler) CleanConfig(cluster sche.SchedulerCluster) { s.cleanupEvictLeader(cluster) } diff --git a/pkg/schedule/schedulers/evict_slow_trend_test.go b/pkg/schedule/schedulers/evict_slow_trend_test.go index c6ad058455fe..75ea50d73b45 100644 --- a/pkg/schedule/schedulers/evict_slow_trend_test.go +++ b/pkg/schedule/schedulers/evict_slow_trend_test.go @@ -255,10 +255,10 @@ func (suite *evictSlowTrendTestSuite) TestEvictSlowTrendPrepare() { suite.True(ok) suite.Zero(es2.conf.evictedStore()) // prepare with no evict store. - suite.es.Prepare(suite.tc) + suite.es.PrepareConfig(suite.tc) es2.conf.setStoreAndPersist(1) suite.Equal(uint64(1), es2.conf.evictedStore()) // prepare with evict store. - suite.es.Prepare(suite.tc) + suite.es.PrepareConfig(suite.tc) } diff --git a/pkg/schedule/schedulers/grant_leader.go b/pkg/schedule/schedulers/grant_leader.go index f244228a10f0..84f830f368b8 100644 --- a/pkg/schedule/schedulers/grant_leader.go +++ b/pkg/schedule/schedulers/grant_leader.go @@ -197,7 +197,7 @@ func (s *grantLeaderScheduler) ReloadConfig() error { return nil } -func (s *grantLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error { +func (s *grantLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { s.conf.mu.RLock() defer s.conf.mu.RUnlock() var res error @@ -209,7 +209,7 @@ func (s *grantLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error { return res } -func (s *grantLeaderScheduler) Cleanup(cluster sche.SchedulerCluster) { +func (s *grantLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) { s.conf.mu.RLock() defer s.conf.mu.RUnlock() for id := range s.conf.StoreIDWithRanges { diff --git a/pkg/schedule/schedulers/scheduler.go b/pkg/schedule/schedulers/scheduler.go index 9262f7d0a65c..1c7889894541 100644 --- a/pkg/schedule/schedulers/scheduler.go +++ b/pkg/schedule/schedulers/scheduler.go @@ -42,8 +42,8 @@ type Scheduler interface { ReloadConfig() error GetMinInterval() time.Duration GetNextInterval(interval time.Duration) time.Duration - Prepare(cluster sche.SchedulerCluster) error - Cleanup(cluster sche.SchedulerCluster) + PrepareConfig(cluster sche.SchedulerCluster) error + CleanConfig(cluster sche.SchedulerCluster) Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan) IsScheduleAllowed(cluster sche.SchedulerCluster) bool } diff --git a/pkg/schedule/schedulers/scheduler_controller.go b/pkg/schedule/schedulers/scheduler_controller.go index 5097a5f3f1c6..b65173c1f5b2 100644 --- a/pkg/schedule/schedulers/scheduler_controller.go +++ 
b/pkg/schedule/schedulers/scheduler_controller.go @@ -156,7 +156,8 @@ func (c *Controller) AddSchedulerHandler(scheduler Scheduler, args ...string) er return err } c.cluster.GetSchedulerConfig().AddSchedulerCfg(scheduler.GetType(), args) - return nil + err := scheduler.PrepareConfig(c.cluster) + return err } // RemoveSchedulerHandler removes the HTTP handler for a scheduler. @@ -183,6 +184,7 @@ func (c *Controller) RemoveSchedulerHandler(name string) error { return err } + s.(Scheduler).CleanConfig(c.cluster) delete(c.schedulerHandlers, name) return nil @@ -198,7 +200,7 @@ func (c *Controller) AddScheduler(scheduler Scheduler, args ...string) error { } s := NewScheduleController(c.ctx, c.cluster, c.opController, scheduler) - if err := s.Scheduler.Prepare(c.cluster); err != nil { + if err := s.Scheduler.PrepareConfig(c.cluster); err != nil { return err } @@ -343,7 +345,7 @@ func (c *Controller) IsSchedulerExisted(name string) (bool, error) { func (c *Controller) runScheduler(s *ScheduleController) { defer logutil.LogPanic() defer c.wg.Done() - defer s.Scheduler.Cleanup(c.cluster) + defer s.Scheduler.CleanConfig(c.cluster) ticker := time.NewTicker(s.GetInterval()) defer ticker.Stop() diff --git a/plugin/scheduler_example/evict_leader.go b/plugin/scheduler_example/evict_leader.go index 8919d1bdb4b1..063ae9eb150a 100644 --- a/plugin/scheduler_example/evict_leader.go +++ b/plugin/scheduler_example/evict_leader.go @@ -186,7 +186,7 @@ func (s *evictLeaderScheduler) EncodeConfig() ([]byte, error) { return schedulers.EncodeConfig(s.conf) } -func (s *evictLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error { +func (s *evictLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { s.conf.mu.RLock() defer s.conf.mu.RUnlock() var res error @@ -198,7 +198,7 @@ func (s *evictLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error { return res } -func (s *evictLeaderScheduler) Cleanup(cluster sche.SchedulerCluster) { +func (s *evictLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) { s.conf.mu.RLock() defer s.conf.mu.RUnlock() for id := range s.conf.StoreIDWitRanges { diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go index d0fac2c11372..7098637c84a8 100644 --- a/tests/pdctl/scheduler/scheduler_test.go +++ b/tests/pdctl/scheduler/scheduler_test.go @@ -17,6 +17,7 @@ package scheduler_test import ( "context" "encoding/json" + "fmt" "reflect" "strings" "testing" @@ -28,6 +29,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/core" sc "github.com/tikv/pd/pkg/schedule/config" + "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/versioninfo" "github.com/tikv/pd/tests" @@ -84,7 +86,8 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { checkSchedulerCommand := func(args []string, expected map[string]bool) { if args != nil { - mustExec(re, cmd, args, nil) + echo := mustExec(re, cmd, args, nil) + re.Contains(echo, "Success!") } testutil.Eventually(re, func() bool { var schedulers []string @@ -137,9 +140,40 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { } checkSchedulerCommand(args, expected) - schedulers := []string{"evict-leader-scheduler", "grant-leader-scheduler"} + // avoid the influence of the scheduler order + schedulers := []string{"evict-leader-scheduler", "grant-leader-scheduler", "evict-leader-scheduler", "grant-leader-scheduler"} + + checkStorePause := func(changedStores []uint64, schedulerName 
string) { + status := func() string { + switch schedulerName { + case "evict-leader-scheduler": + return "paused" + case "grant-leader-scheduler": + return "resumed" + default: + re.Fail(fmt.Sprintf("unknown scheduler %s", schedulerName)) + return "" + } + }() + for _, store := range stores { + isStorePaused := !cluster.GetLeaderServer().GetRaftCluster().GetStore(store.GetId()).AllowLeaderTransfer() + if slice.AnyOf(changedStores, func(i int) bool { + return store.GetId() == changedStores[i] + }) { + re.True(isStorePaused, + fmt.Sprintf("store %d should be %s with %s", store.GetId(), status, schedulerName)) + } else { + re.False(isStorePaused, + fmt.Sprintf("store %d should not be %s with %s", store.GetId(), status, schedulerName)) + } + if sche := cluster.GetSchedulingPrimaryServer(); sche != nil { + re.Equal(isStorePaused, !sche.GetCluster().GetStore(store.GetId()).AllowLeaderTransfer()) + } + } + } for idx := range schedulers { + checkStorePause([]uint64{}, schedulers[idx]) // scheduler add command args = []string{"-u", pdAddr, "scheduler", "add", schedulers[idx], "2"} expected = map[string]bool{ @@ -155,6 +189,7 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { expectedConfig := make(map[string]interface{}) expectedConfig["store-id-ranges"] = map[string]interface{}{"2": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}} checkSchedulerConfigCommand(expectedConfig, schedulers[idx]) + checkStorePause([]uint64{2}, schedulers[idx]) // scheduler config update command args = []string{"-u", pdAddr, "scheduler", "config", schedulers[idx], "add-store", "3"} @@ -165,14 +200,12 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { "transfer-witness-leader-scheduler": true, "balance-witness-scheduler": true, } - checkSchedulerCommand(args, expected) // check update success - // FIXME: remove this check after scheduler config is updated - if cluster.GetSchedulingPrimaryServer() == nil && schedulers[idx] == "grant-leader-scheduler" { - expectedConfig["store-id-ranges"] = map[string]interface{}{"2": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}, "3": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}} - checkSchedulerConfigCommand(expectedConfig, schedulers[idx]) - } + checkSchedulerCommand(args, expected) + expectedConfig["store-id-ranges"] = map[string]interface{}{"2": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}, "3": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}} + checkSchedulerConfigCommand(expectedConfig, schedulers[idx]) + checkStorePause([]uint64{2, 3}, schedulers[idx]) // scheduler delete command args = []string{"-u", pdAddr, "scheduler", "remove", schedulers[idx]} @@ -183,6 +216,7 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { "balance-witness-scheduler": true, } checkSchedulerCommand(args, expected) + checkStorePause([]uint64{}, schedulers[idx]) // scheduler add command args = []string{"-u", pdAddr, "scheduler", "add", schedulers[idx], "2"} @@ -194,6 +228,7 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { "balance-witness-scheduler": true, } checkSchedulerCommand(args, expected) + checkStorePause([]uint64{2}, schedulers[idx]) // scheduler add command twice args = []string{"-u", pdAddr, "scheduler", "add", schedulers[idx], "4"} @@ -209,6 +244,7 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { // check add success 
expectedConfig["store-id-ranges"] = map[string]interface{}{"2": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}, "4": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}} checkSchedulerConfigCommand(expectedConfig, schedulers[idx]) + checkStorePause([]uint64{2, 4}, schedulers[idx]) // scheduler remove command [old] args = []string{"-u", pdAddr, "scheduler", "remove", schedulers[idx] + "-4"} @@ -224,6 +260,7 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { // check remove success expectedConfig["store-id-ranges"] = map[string]interface{}{"2": []interface{}{map[string]interface{}{"end-key": "", "start-key": ""}}} checkSchedulerConfigCommand(expectedConfig, schedulers[idx]) + checkStorePause([]uint64{2}, schedulers[idx]) // scheduler remove command, when remove the last store, it should remove whole scheduler args = []string{"-u", pdAddr, "scheduler", "remove", schedulers[idx] + "-2"} @@ -234,6 +271,7 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { "balance-witness-scheduler": true, } checkSchedulerCommand(args, expected) + checkStorePause([]uint64{}, schedulers[idx]) } // test shuffle region config @@ -247,7 +285,8 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { var roles []string mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler", "show-roles"}, &roles) re.Equal([]string{"leader", "follower", "learner"}, roles) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler", "set-roles", "learner"}, nil) + echo := mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler", "set-roles", "learner"}, nil) // todo:add check output + re.Contains(echo, "Success!") mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler", "show-roles"}, &roles) re.Equal([]string{"learner"}, roles) mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler"}, &roles) @@ -270,7 +309,8 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler"}, &conf3) re.Equal(expected3, conf3) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler", "set", "2", "1,2,3"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "grant-hot-region-scheduler", "set", "2", "1,2,3"}, nil) + re.Contains(echo, "Success!") expected3["store-leader-id"] = float64(2) // FIXME: remove this check after scheduler config is updated if cluster.GetSchedulingPrimaryServer() == nil { // "grant-hot-region-scheduler" @@ -279,7 +319,7 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { } // test remove and add scheduler - echo := mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-region-scheduler"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-region-scheduler"}, nil) re.Contains(echo, "Success!") echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil) re.Contains(echo, "Success!") @@ -326,7 +366,8 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { re.Equal(expected1, conf) mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "show"}, &conf) re.Equal(expected1, conf) - 
mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "src-tolerance-ratio", "1.02"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "src-tolerance-ratio", "1.02"}, nil) + re.Contains(echo, "Success!") expected1["src-tolerance-ratio"] = 1.02 var conf1 map[string]interface{} // FIXME: remove this check after scheduler config is updated @@ -334,52 +375,66 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "byte,key"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "byte,key"}, nil) + re.Contains(echo, "Success!") expected1["read-priorities"] = []interface{}{"byte", "key"} mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key"}, nil) + re.Contains(echo, "Failed!") mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,byte"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,byte"}, nil) + re.Contains(echo, "Success!") expected1["read-priorities"] = []interface{}{"key", "byte"} mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "foo,bar"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "foo,bar"}, nil) + re.Contains(echo, "Failed!") mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", ""}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", ""}, nil) + re.Contains(echo, "Failed!") mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,key"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,key"}, nil) + re.Contains(echo, "Failed!") mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) - mustExec(re, cmd, []string{"-u", pdAddr, 
"scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "byte,byte"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "byte,byte"}, nil) + re.Contains(echo, "Failed!") mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,key,byte"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "read-priorities", "key,key,byte"}, nil) + re.Contains(echo, "Failed!") mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) // write-priorities is divided into write-leader-priorities and write-peer-priorities - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-priorities", "key,byte"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-priorities", "key,byte"}, nil) + re.Contains(echo, "Failed!") + re.Contains(echo, "Config item is not found.") mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v0"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v0"}, nil) + re.Contains(echo, "Failed!") mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) expected1["rank-formula-version"] = "v2" - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v2"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v2"}, nil) + re.Contains(echo, "Success!") mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) expected1["rank-formula-version"] = "v1" - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v1"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "rank-formula-version", "v1"}, nil) + re.Contains(echo, "Success!") mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) expected1["forbid-rw-type"] = "read" - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "forbid-rw-type", "read"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "forbid-rw-type", "read"}, nil) + re.Contains(echo, "Success!") mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1) re.Equal(expected1, conf1) @@ -412,7 +467,8 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { conf1 = make(map[string]interface{}) mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", 
"config", "balance-leader-scheduler", "show"}, &conf) re.Equal(4., conf["batch"]) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler", "set", "batch", "3"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler", "set", "batch", "3"}, nil) + re.Contains(echo, "Success!") testutil.Eventually(re, func() bool { mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "balance-leader-scheduler"}, &conf1) return conf1["batch"] == 3. @@ -465,7 +521,8 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { } mustUsage([]string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler"}) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler", "60"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler", "60"}, nil) + re.Contains(echo, "Success!") checkSchedulerWithStatusCommand("paused", []string{ "balance-leader-scheduler", }) @@ -476,7 +533,8 @@ func (suite *schedulerTestSuite) checkScheduler(cluster *tests.TestCluster) { }, testutil.WithWaitFor(30*time.Second)) mustUsage([]string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler", "60"}) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler"}, nil) + re.Contains(echo, "Success!") checkSchedulerWithStatusCommand("paused", nil) // set label scheduler to disabled manually. @@ -547,11 +605,14 @@ func (suite *schedulerTestSuite) checkSchedulerDiagnostic(cluster *tests.TestClu checkSchedulerDescribeCommand("balance-region-scheduler", "pending", "1 store(s) RegionNotMatchRule; ") // scheduler delete command - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil) + re.Contains(echo, "Success!") checkSchedulerDescribeCommand("balance-region-scheduler", "disabled", "") - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler", "60"}, nil) - mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler"}, nil) + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "pause", "balance-leader-scheduler", "60"}, nil) + re.Contains(echo, "Success!") + echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler"}, nil) + re.Contains(echo, "Success!") checkSchedulerDescribeCommand("balance-leader-scheduler", "normal", "") } @@ -604,7 +665,8 @@ func TestForwardSchedulerRequest(t *testing.T) { re.Contains(string(output), "Usage") } mustUsage([]string{"-u", backendEndpoints, "scheduler", "pause", "balance-leader-scheduler"}) - mustExec(re, cmd, []string{"-u", backendEndpoints, "scheduler", "pause", "balance-leader-scheduler", "60"}, nil) + echo := mustExec(re, cmd, []string{"-u", backendEndpoints, "scheduler", "pause", "balance-leader-scheduler", "60"}, nil) + re.Contains(echo, "Success!") checkSchedulerWithStatusCommand := func(status string, expected []string) { var schedulers []string mustExec(re, cmd, []string{"-u", backendEndpoints, "scheduler", "show", "--status", status}, &schedulers) From 89c83748a253df8effd6d8b77c5a8a5bec4e0f74 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 20 Nov 2023 18:01:12 +0800 Subject: [PATCH 3/3] mcs: fix 
sync store label (#7396) close tikv/pd#7391, close tikv/pd#7393, close tikv/pd#7394 Signed-off-by: Ryan Leung --- pkg/core/store_option.go | 20 +++++++++++++++++++ pkg/mcs/scheduling/server/cluster.go | 3 +-- pkg/mcs/scheduling/server/meta/watcher.go | 2 +- .../integrations/mcs/scheduling/meta_test.go | 11 ++++++++++ .../mcs/scheduling/server_test.go | 3 ++- tests/server/cluster/cluster_test.go | 1 + 6 files changed, 36 insertions(+), 4 deletions(-) diff --git a/pkg/core/store_option.go b/pkg/core/store_option.go index 8a2aa1ef089f..0bdfaffa44fa 100644 --- a/pkg/core/store_option.go +++ b/pkg/core/store_option.go @@ -274,3 +274,23 @@ func SetLastAwakenTime(lastAwaken time.Time) StoreCreateOption { store.lastAwakenTime = lastAwaken } } + +// SetStoreMeta sets the meta for the store. +func SetStoreMeta(newMeta *metapb.Store) StoreCreateOption { + return func(store *StoreInfo) { + meta := typeutil.DeepClone(store.meta, StoreFactory) + meta.Version = newMeta.GetVersion() + meta.GitHash = newMeta.GetGitHash() + meta.Address = newMeta.GetAddress() + meta.StatusAddress = newMeta.GetStatusAddress() + meta.PeerAddress = newMeta.GetPeerAddress() + meta.StartTimestamp = newMeta.GetStartTimestamp() + meta.DeployPath = newMeta.GetDeployPath() + meta.LastHeartbeat = newMeta.GetLastHeartbeat() + meta.State = newMeta.GetState() + meta.Labels = newMeta.GetLabels() + meta.NodeState = newMeta.GetNodeState() + meta.PhysicallyDestroyed = newMeta.GetPhysicallyDestroyed() + store.meta = meta + } +} diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index ac15212553bd..96ee88259da1 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -381,8 +381,7 @@ func (c *Cluster) HandleStoreHeartbeat(heartbeat *schedulingpb.StoreHeartbeatReq return errors.Errorf("store %v not found", storeID) } - nowTime := time.Now() - newStore := store.Clone(core.SetStoreStats(stats), core.SetLastHeartbeatTS(nowTime)) + newStore := store.Clone(core.SetStoreStats(stats)) if store := c.GetStore(storeID); store != nil { statistics.UpdateStoreHeartbeatMetrics(store) diff --git a/pkg/mcs/scheduling/server/meta/watcher.go b/pkg/mcs/scheduling/server/meta/watcher.go index 3dbd0fc8c92f..3a04c2611637 100644 --- a/pkg/mcs/scheduling/server/meta/watcher.go +++ b/pkg/mcs/scheduling/server/meta/watcher.go @@ -81,7 +81,7 @@ func (w *Watcher) initializeStoreWatcher() error { w.basicCluster.PutStore(core.NewStoreInfo(store)) return nil } - w.basicCluster.PutStore(origin.Clone(core.SetStoreState(store.GetState(), store.GetPhysicallyDestroyed()))) + w.basicCluster.PutStore(origin.Clone(core.SetStoreMeta(store))) return nil } deleteFn := func(kv *mvccpb.KeyValue) error { diff --git a/tests/integrations/mcs/scheduling/meta_test.go b/tests/integrations/mcs/scheduling/meta_test.go index 74497e0b5523..ce0dc620aeff 100644 --- a/tests/integrations/mcs/scheduling/meta_test.go +++ b/tests/integrations/mcs/scheduling/meta_test.go @@ -99,4 +99,15 @@ func (suite *metaTestSuite) TestStoreWatch() { testutil.Eventually(re, func() bool { return cluster.GetStore(2) == nil }) + + // test synchronized store labels + suite.pdLeaderServer.GetServer().GetRaftCluster().PutStore( + &metapb.Store{Id: 5, Address: "mock-5", State: metapb.StoreState_Up, NodeState: metapb.NodeState_Serving, LastHeartbeat: time.Now().UnixNano(), Labels: []*metapb.StoreLabel{{Key: "zone", Value: "z1"}}}, + ) + testutil.Eventually(re, func() bool { + if len(cluster.GetStore(5).GetLabels()) == 0 { + return false + } + return 
cluster.GetStore(5).GetLabels()[0].GetValue() == "z1" + }) } diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go index 41c00b8e9b47..eb99411d27e5 100644 --- a/tests/integrations/mcs/scheduling/server_test.go +++ b/tests/integrations/mcs/scheduling/server_test.go @@ -59,7 +59,7 @@ func TestServerTestSuite(t *testing.T) { func (suite *serverTestSuite) SetupSuite() { var err error re := suite.Require() - + re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`)) suite.ctx, suite.cancel = context.WithCancel(context.Background()) suite.cluster, err = tests.NewTestAPICluster(suite.ctx, 3) re.NoError(err) @@ -76,6 +76,7 @@ func (suite *serverTestSuite) SetupSuite() { func (suite *serverTestSuite) TearDownSuite() { suite.cluster.Destroy() suite.cancel() + suite.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs")) } func (suite *serverTestSuite) TestAllocID() { diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index ccb469c04cb4..6d233a8c8ab7 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -510,6 +510,7 @@ func TestRaftClusterMultipleRestart(t *testing.T) { err = rc.PutStore(store) re.NoError(err) re.NotNil(tc) + rc.Stop() // let the job run at small interval re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`))
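
The middleware change in the first patch is the piece that makes the new region-label endpoints reachable through the API server: a MicroserviceRedirectRule can now carry an optional filter, so GET /pd/api/v1/region/id/{id}/label/{key} and .../labels are forwarded to the scheduling service while a plain GET /pd/api/v1/region/id/{id} is still served locally, and RawPath is preferred over Path so escaped rule IDs such as keyspaces%2F0 survive the rewrite. The standalone Go sketch below illustrates only that matching logic; the type and function names are hypothetical and simplified, not PD's actual middleware API.

// Minimal sketch of prefix + method + filter matching for a redirect rule.
// Hypothetical types; not the real serverapi middleware.
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

type redirectRule struct {
	matchPath    string
	targetPath   string
	matchMethods []string
	// filter lets a rule opt out of forwarding even when the prefix matches,
	// e.g. forward /region/id/{id}/label/{key} but not plain /region/id/{id}.
	filter func(*http.Request) bool
}

// redirectTo returns the rewritten target path and whether the request
// should be forwarded to the scheduling service.
func (rule redirectRule) redirectTo(r *http.Request) (string, bool) {
	if !strings.HasPrefix(r.URL.Path, rule.matchPath) {
		return "", false
	}
	methodOK := false
	for _, m := range rule.matchMethods {
		if m == r.Method {
			methodOK = true
			break
		}
	}
	if !methodOK {
		return "", false
	}
	if rule.filter != nil && !rule.filter(r) {
		return "", false
	}
	// Prefer RawPath so escaped path segments (e.g. keyspaces%2F0) are kept intact.
	path := r.URL.Path
	if r.URL.RawPath != "" {
		path = r.URL.RawPath
	}
	params := strings.Trim(strings.TrimPrefix(path, rule.matchPath), "/")
	if params == "" {
		return rule.targetPath, true
	}
	return rule.targetPath + "/" + params, true
}

func main() {
	rule := redirectRule{
		matchPath:    "/pd/api/v1/region/id",
		targetPath:   "/scheduling/api/v1/config/regions",
		matchMethods: []string{http.MethodGet},
		filter: func(r *http.Request) bool {
			return strings.Contains(r.URL.Path, "label")
		},
	}
	for _, p := range []string{
		"/pd/api/v1/region/id/1",           // not forwarded: filter rejects, no "label" in path
		"/pd/api/v1/region/id/1/labels",    // forwarded
		"/pd/api/v1/region/id/1/label/key", // forwarded
	} {
		r := httptest.NewRequest(http.MethodGet, p, nil)
		target, ok := rule.redirectTo(r)
		fmt.Println(p, "->", ok, target)
	}
}

Running the sketch reports the two label paths as forwarded to /scheduling/api/v1/config/regions/... and the bare region path as not forwarded, which mirrors the WithoutHeader/WithHeader assertions in TestAPIForward in the first patch.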