Skip to content

Commit

Permalink
mcs: support region label http interface in scheduling server (#7283)
Browse files Browse the repository at this point in the history
ref #5839

Signed-off-by: lhy1024 <admin@liudos.us>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
lhy1024 and ti-chi-bot[bot] authored Nov 20, 2023
1 parent 49b3251 commit 1e1817d
Show file tree
Hide file tree
Showing 8 changed files with 310 additions and 27 deletions.
167 changes: 165 additions & 2 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package apis
import (
"encoding/hex"
"net/http"
"net/url"
"strconv"
"sync"

Expand Down Expand Up @@ -191,12 +192,22 @@ func (s *Service) RegisterConfigRouter() {
placementRule := router.Group("placement-rule")
placementRule.GET("", getPlacementRules)
placementRule.GET("/:group", getPlacementRuleByGroup)

regionLabel := router.Group("region-label")
regionLabel.GET("/rules", getAllRegionLabelRules)
regionLabel.GET("/rules/ids", getRegionLabelRulesByIDs)
regionLabel.GET("/rules/:id", getRegionLabelRuleByID)

regions := router.Group("regions")
regions.GET("/:id/label/:key", getRegionLabelByKey)
regions.GET("/:id/labels", getRegionLabels)
}

// @Tags admin
// @Summary Change the log level.
// @Produce json
// @Success 200 {string} string "The log level is updated."
// @Failure 400 {string} string "The input is invalid."
// @Router /admin/log [put]
func changeLogLevel(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
Expand Down Expand Up @@ -230,6 +241,7 @@ func getConfig(c *gin.Context) {
// @Summary Drop all regions from cache.
// @Produce json
// @Success 200 {string} string "All regions are removed from server cache."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /admin/cache/regions [delete]
func deleteAllRegionCache(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
Expand All @@ -248,6 +260,7 @@ func deleteAllRegionCache(c *gin.Context) {
// @Produce json
// @Success 200 {string} string "The region is removed from server cache."
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /admin/cache/regions/{id} [delete]
func deleteRegionCacheByID(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
Expand Down Expand Up @@ -683,8 +696,6 @@ func getHotBuckets(c *gin.Context) {
// @Accept json
// @Produce json
// @Success 200 {object} storage.HistoryHotRegions
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /hotspot/regions/history [get]
func getHistoryHotRegions(c *gin.Context) {
// TODO: support history hotspot in scheduling server with stateless in the future.
Expand Down Expand Up @@ -955,3 +966,155 @@ func getPlacementRuleByGroup(c *gin.Context) {
group := manager.GetGroupBundle(g)
c.IndentedJSON(http.StatusOK, group)
}

// @Tags region_label
// @Summary Get label of a region.
// @Param id path integer true "Region Id"
// @Param key path string true "Label key"
// @Produce json
// @Success 200 {string} string
// @Failure 400 {string} string "The input is invalid."
// @Failure 404 {string} string "The region does not exist."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /config/regions/{id}/label/{key} [get]
func getRegionLabelByKey(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)

idStr := c.Param("id")
labelKey := c.Param("key") // TODO: test https://github.com/tikv/pd/pull/4004

id, err := strconv.ParseUint(idStr, 10, 64)
if err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}

region, err := handler.GetRegion(id)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
if region == nil {
c.String(http.StatusNotFound, errs.ErrRegionNotFound.FastGenByArgs().Error())
return
}

l, err := handler.GetRegionLabeler()
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
labelValue := l.GetRegionLabel(region, labelKey)
c.IndentedJSON(http.StatusOK, labelValue)
}

// @Tags region_label
// @Summary Get labels of a region.
// @Param id path integer true "Region Id"
// @Produce json
// @Success 200 {string} string
// @Failure 400 {string} string "The input is invalid."
// @Failure 404 {string} string "The region does not exist."
// @Router /config/regions/{id}/labels [get]
func getRegionLabels(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)

idStr := c.Param("id")
id, err := strconv.ParseUint(idStr, 10, 64)
if err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}

region, err := handler.GetRegion(id)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
if region == nil {
c.String(http.StatusNotFound, errs.ErrRegionNotFound.FastGenByArgs().Error())
return
}
l, err := handler.GetRegionLabeler()
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
labels := l.GetRegionLabels(region)
c.IndentedJSON(http.StatusOK, labels)
}

// @Tags region_label
// @Summary List all label rules of cluster.
// @Produce json
// @Success 200 {array} labeler.LabelRule
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /config/region-label/rules [get]
func getAllRegionLabelRules(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
l, err := handler.GetRegionLabeler()
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
rules := l.GetAllLabelRules()
c.IndentedJSON(http.StatusOK, rules)
}

// @Tags region_label
// @Summary Get label rules of cluster by ids.
// @Param body body []string true "IDs of query rules"
// @Produce json
// @Success 200 {array} labeler.LabelRule
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /config/region-label/rules/ids [get]
func getRegionLabelRulesByIDs(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
l, err := handler.GetRegionLabeler()
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
var ids []string
if err := c.BindJSON(&ids); err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}
rules, err := l.GetLabelRules(ids)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
c.IndentedJSON(http.StatusOK, rules)
}

// @Tags region_label
// @Summary Get label rule of cluster by id.
// @Param id path string true "Rule Id"
// @Produce json
// @Success 200 {object} labeler.LabelRule
// @Failure 404 {string} string "The rule does not exist."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /config/region-label/rules/{id} [get]
func getRegionLabelRuleByID(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)

id, err := url.PathUnescape(c.Param("id"))
if err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}

l, err := handler.GetRegionLabeler()
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
rule := l.GetLabelRule(id)
if rule == nil {
c.String(http.StatusNotFound, errs.ErrRegionRuleNotFound.FastGenByArgs().Error())
return
}
c.IndentedJSON(http.StatusOK, rule)
}
19 changes: 19 additions & 0 deletions pkg/schedule/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/tikv/pd/pkg/schedule"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/filter"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/schedule/scatter"
Expand Down Expand Up @@ -1063,6 +1064,24 @@ func (h *Handler) GetHotBuckets(regionIDs ...uint64) (HotBucketsResponse, error)
return ret, nil
}

// GetRegion returns the region labeler.
func (h *Handler) GetRegion(id uint64) (*core.RegionInfo, error) {
c := h.GetCluster()
if c == nil {
return nil, errs.ErrNotBootstrapped.GenWithStackByArgs()
}
return c.GetRegion(id), nil
}

// GetRegionLabeler returns the region labeler.
func (h *Handler) GetRegionLabeler() (*labeler.RegionLabeler, error) {
c := h.GetCluster()
if c == nil || c.GetRegionLabeler() == nil {
return nil, errs.ErrNotBootstrapped
}
return c.GetRegionLabeler(), nil
}

// GetRuleManager returns the rule manager.
func (h *Handler) GetRuleManager() (*placement.RuleManager, error) {
c := h.GetCluster()
Expand Down
30 changes: 22 additions & 8 deletions pkg/utils/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type microserviceRedirectRule struct {
targetPath string
targetServiceName string
matchMethods []string
filter func(*http.Request) bool
}

// NewRedirector redirects request to the leader if needs to be handled in the leader.
Expand All @@ -94,14 +95,19 @@ func NewRedirector(s *server.Server, opts ...RedirectorOption) negroni.Handler {
type RedirectorOption func(*redirector)

// MicroserviceRedirectRule new a microservice redirect rule option
func MicroserviceRedirectRule(matchPath, targetPath, targetServiceName string, methods []string) RedirectorOption {
func MicroserviceRedirectRule(matchPath, targetPath, targetServiceName string,
methods []string, filters ...func(*http.Request) bool) RedirectorOption {
return func(s *redirector) {
s.microserviceRedirectRules = append(s.microserviceRedirectRules, &microserviceRedirectRule{
matchPath,
targetPath,
targetServiceName,
methods,
})
rule := &microserviceRedirectRule{
matchPath: matchPath,
targetPath: targetPath,
targetServiceName: targetServiceName,
matchMethods: methods,
}
if len(filters) > 0 {
rule.filter = filters[0]
}
s.microserviceRedirectRules = append(s.microserviceRedirectRules, rule)
}
}

Expand All @@ -117,18 +123,26 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri
r.URL.Path = strings.TrimRight(r.URL.Path, "/")
for _, rule := range h.microserviceRedirectRules {
if strings.HasPrefix(r.URL.Path, rule.matchPath) && slice.Contains(rule.matchMethods, r.Method) {
if rule.filter != nil && !rule.filter(r) {
continue
}
origin := r.URL.Path
addr, ok := h.s.GetServicePrimaryAddr(r.Context(), rule.targetServiceName)
if !ok || addr == "" {
log.Warn("failed to get the service primary addr when trying to match redirect rules",
zap.String("path", r.URL.Path))
}
// If the URL contains escaped characters, use RawPath instead of Path
path := r.URL.Path
if r.URL.RawPath != "" {
path = r.URL.RawPath
}
// Extract parameters from the URL path
// e.g. r.URL.Path = /pd/api/v1/operators/1 (before redirect)
// matchPath = /pd/api/v1/operators
// targetPath = /scheduling/api/v1/operators
// r.URL.Path = /scheduling/api/v1/operator/1 (after redirect)
pathParams := strings.TrimPrefix(r.URL.Path, rule.matchPath)
pathParams := strings.TrimPrefix(path, rule.matchPath)
pathParams = strings.Trim(pathParams, "/") // Remove leading and trailing '/'
if len(pathParams) > 0 {
r.URL.Path = rule.targetPath + "/" + pathParams
Expand Down
2 changes: 1 addition & 1 deletion pkg/utils/testutil/api_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func ReadGetJSON(re *require.Assertions, client *http.Client, url string, data i
}

// ReadGetJSONWithBody is used to do get request with input and check whether given data can be extracted successfully.
func ReadGetJSONWithBody(re *require.Assertions, client *http.Client, url string, input []byte, data interface{}) error {
func ReadGetJSONWithBody(re *require.Assertions, client *http.Client, url string, input []byte, data interface{}, checkOpts ...func([]byte, int, http.Header)) error {
resp, err := apiutil.GetJSON(client, url, input)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion server/api/region_label.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (h *regionLabelHandler) PatchRegionLabelRules(w http.ResponseWriter, r *htt
// @Success 200 {array} labeler.LabelRule
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /config/region-label/rule/ids [get]
// @Router /config/region-label/rules/ids [get]
func (h *regionLabelHandler) GetRegionLabelRulesByIDs(w http.ResponseWriter, r *http.Request) {
cluster := getCluster(r)
var ids []string
Expand Down
21 changes: 21 additions & 0 deletions server/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package api
import (
"context"
"net/http"
"strings"

"github.com/gorilla/mux"
scheapi "github.com/tikv/pd/pkg/mcs/scheduling/server/apis/v1"
Expand Down Expand Up @@ -79,6 +80,26 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP
scheapi.APIPathPrefix+"/checkers",
mcs.SchedulingServiceName,
[]string{http.MethodPost, http.MethodGet}),
serverapi.MicroserviceRedirectRule(
prefix+"/region/id",
scheapi.APIPathPrefix+"/config/regions",
mcs.SchedulingServiceName,
[]string{http.MethodGet},
func(r *http.Request) bool {
// The original code uses the path "/region/id" to get the region id.
// However, the path "/region/id" is used to get the region by id, which is not what we want.
return strings.Contains(r.URL.Path, "label")
}),
serverapi.MicroserviceRedirectRule(
prefix+"/config/region-label/rules",
scheapi.APIPathPrefix+"/config/region-label/rules",
mcs.SchedulingServiceName,
[]string{http.MethodGet}),
serverapi.MicroserviceRedirectRule(
prefix+"/config/region-label/rule/", // Note: this is a typo in the original code
scheapi.APIPathPrefix+"/config/region-label/rules",
mcs.SchedulingServiceName,
[]string{http.MethodGet}),
serverapi.MicroserviceRedirectRule(
prefix+"/hotspot",
scheapi.APIPathPrefix+"/hotspot",
Expand Down
23 changes: 23 additions & 0 deletions tests/integrations/mcs/scheduling/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
_ "github.com/tikv/pd/pkg/mcs/scheduling/server/apis/v1"
"github.com/tikv/pd/pkg/mcs/scheduling/server/config"
"github.com/tikv/pd/pkg/schedule/handler"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/storage"
Expand Down Expand Up @@ -236,6 +237,28 @@ func (suite *apiTestSuite) TestAPIForward() {
testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true"))
re.NoError(err)

// Test region label
var labelRules []*labeler.LabelRule
err = testutil.ReadGetJSON(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/region-label/rules"), &labelRules,
testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true"))
re.NoError(err)
err = testutil.ReadGetJSONWithBody(re, testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/region-label/rules/ids"), []byte(`["rule1", "rule3"]`),
&labelRules, testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true"))
re.NoError(err)
err = testutil.CheckGetJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "config/region-label/rule/rule1"), nil,
testutil.StatusNotOK(re), testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true"))
re.NoError(err)

err = testutil.CheckGetJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "region/id/1"), nil,
testutil.WithoutHeader(re, apiutil.ForwardToMicroServiceHeader))
re.NoError(err)
err = testutil.CheckGetJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "region/id/1/label/key"), nil,
testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true"))
re.NoError(err)
err = testutil.CheckGetJSON(testDialClient, fmt.Sprintf("%s/%s", urlPrefix, "region/id/1/labels"), nil,
testutil.WithHeader(re, apiutil.ForwardToMicroServiceHeader, "true"))
re.NoError(err)

// Test rules: only forward `GET` request
var rules []*placement.Rule
tests.MustPutRegion(re, suite.cluster, 2, 1, []byte("a"), []byte("b"), core.SetApproximateSize(60))
Expand Down
Loading

0 comments on commit 1e1817d

Please sign in to comment.