Skip to content

Commit

Permalink
server: add gRPC rate limit (#6834)
Browse files Browse the repository at this point in the history
close #5739, ref #6556

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
rleungx and ti-chi-bot[bot] authored Aug 24, 2023
1 parent 1743552 commit fbd386a
Show file tree
Hide file tree
Showing 10 changed files with 448 additions and 42 deletions.
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,11 @@ error = '''
leader is nil
'''

["PD:server:ErrRateLimitExceeded"]
error = '''
rate limit exceeded
'''

["PD:server:ErrServerNotStarted"]
error = '''
server not started
Expand Down
1 change: 1 addition & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ var (
ErrCancelStartEtcd = errors.Normalize("etcd start canceled", errors.RFCCodeText("PD:server:ErrCancelStartEtcd"))
ErrConfigItem = errors.Normalize("cannot set invalid configuration", errors.RFCCodeText("PD:server:ErrConfiguration"))
ErrServerNotStarted = errors.Normalize("server not started", errors.RFCCodeText("PD:server:ErrServerNotStarted"))
ErrRateLimitExceeded = errors.Normalize("rate limit exceeded", errors.RFCCodeText("PD:server:ErrRateLimitExceeded"))
)

// logutil errors
Expand Down
5 changes: 3 additions & 2 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
registerFunc(clusterRouter, "/store/{id}/limit", storeHandler.SetStoreLimit, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))

storesHandler := newStoresHandler(handler, rd)
registerFunc(clusterRouter, "/stores", storesHandler.GetStores, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(clusterRouter, "/stores", storesHandler.GetAllStores, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(clusterRouter, "/stores/remove-tombstone", storesHandler.RemoveTombStone, setMethods(http.MethodDelete), setAuditBackend(localLog, prometheus))
registerFunc(clusterRouter, "/stores/limit", storesHandler.GetAllStoresLimit, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(clusterRouter, "/stores/limit", storesHandler.SetAllStoresLimit, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))
Expand Down Expand Up @@ -311,7 +311,8 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
serviceMiddlewareHandler := newServiceMiddlewareHandler(svr, rd)
registerFunc(apiRouter, "/service-middleware/config", serviceMiddlewareHandler.GetServiceMiddlewareConfig, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(apiRouter, "/service-middleware/config", serviceMiddlewareHandler.SetServiceMiddlewareConfig, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))
registerFunc(apiRouter, "/service-middleware/config/rate-limit", serviceMiddlewareHandler.SetRatelimitConfig, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus), setRateLimitAllowList())
registerFunc(apiRouter, "/service-middleware/config/rate-limit", serviceMiddlewareHandler.SetRateLimitConfig, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus), setRateLimitAllowList())
registerFunc(apiRouter, "/service-middleware/config/grpc-rate-limit", serviceMiddlewareHandler.SetGRPCRateLimitConfig, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus), setRateLimitAllowList())

logHandler := newLogHandler(svr, rd)
registerFunc(apiRouter, "/admin/log", logHandler.SetLogLevel, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))
Expand Down
82 changes: 77 additions & 5 deletions server/api/service_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ func (h *serviceMiddlewareHandler) updateServiceMiddlewareConfig(cfg *config.Ser
return h.updateAudit(cfg, kp[len(kp)-1], value)
case "rate-limit":
return h.svr.UpdateRateLimit(&cfg.RateLimitConfig, kp[len(kp)-1], value)
case "grpc-rate-limit":
return h.svr.UpdateGRPCRateLimit(&cfg.GRPCRateLimitConfig, kp[len(kp)-1], value)
}
return errors.Errorf("config prefix %s not found", kp[0])
}
Expand Down Expand Up @@ -139,7 +141,7 @@ func (h *serviceMiddlewareHandler) updateAudit(config *config.ServiceMiddlewareC
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "config item not found"
// @Router /service-middleware/config/rate-limit [POST]
func (h *serviceMiddlewareHandler) SetRatelimitConfig(w http.ResponseWriter, r *http.Request) {
func (h *serviceMiddlewareHandler) SetRateLimitConfig(w http.ResponseWriter, r *http.Request) {
var input map[string]interface{}
if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil {
return
Expand Down Expand Up @@ -192,14 +194,14 @@ func (h *serviceMiddlewareHandler) SetRatelimitConfig(w http.ResponseWriter, r *
qpsRateUpdatedFlag := "QPS rate limiter is not changed."
qps, okq := input["qps"].(float64)
if okq {
brust := 0
burst := 0
if int(qps) > 1 {
brust = int(qps)
burst = int(qps)
} else if qps > 0 {
brust = 1
burst = 1
}
cfg.QPS = qps
cfg.QPSBurst = brust
cfg.QPSBurst = burst
}
if !okc && !okq {
h.rd.JSON(w, http.StatusOK, "No changed.")
Expand Down Expand Up @@ -227,6 +229,76 @@ func (h *serviceMiddlewareHandler) SetRatelimitConfig(w http.ResponseWriter, r *
}
}

// SetGRPCRateLimitConfig updates the concurrency and/or QPS limiter of a single
// gRPC service, identified by the "label" field of the JSON request body.
//
// Recognised body fields (all read from a generic JSON object):
//   - "label":       required, non-empty string naming an existing gRPC service label.
//   - "concurrency": optional number; the new concurrency limit.
//   - "qps":         optional number; the new QPS limit. The burst is derived from it.
//
// When neither "concurrency" nor "qps" is present the handler answers 200 with
// "No changed." and touches nothing.
//
// @Tags service_middleware
// @Summary update gRPC ratelimit config
// @Param body body object string "json params"
// @Produce json
// @Success 200 {string} string
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "config item not found"
// @Router /service-middleware/config/grpc-rate-limit [POST]
func (h *serviceMiddlewareHandler) SetGRPCRateLimitConfig(w http.ResponseWriter, r *http.Request) {
	var input map[string]interface{}
	if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil {
		// ReadJSONRespondError is expected to have written the error response already.
		return
	}

	serviceLabel, ok := input["label"].(string)
	if !ok || len(serviceLabel) == 0 {
		h.rd.JSON(w, http.StatusBadRequest, "The label is empty.")
		return
	}
	if !h.svr.IsGRPCServiceLabelExist(serviceLabel) {
		h.rd.JSON(w, http.StatusBadRequest, "There is no label matched.")
		return
	}

	// Start from the currently stored settings for this label (zero value if none)
	// so that a request carrying only one dimension leaves the other untouched.
	cfg := h.svr.GetGRPCRateLimitConfig().LimiterConfig[serviceLabel]

	// update concurrency limiter
	concurrencyUpdatedFlag := "Concurrency limiter is not changed."
	concurrencyFloat, okc := input["concurrency"].(float64)
	if okc {
		// Converting a float64 that uint64 cannot represent (negative or >= 2^64)
		// yields an implementation-dependent value in Go, so clamp explicitly
		// instead of relying on what the target architecture happens to produce.
		switch {
		case concurrencyFloat >= 1<<64:
			cfg.ConcurrencyLimit = ^uint64(0)
		case concurrencyFloat > 0:
			cfg.ConcurrencyLimit = uint64(concurrencyFloat)
		default:
			// Zero, negative (and NaN) all mean "no concurrency limit".
			cfg.ConcurrencyLimit = 0
		}
	}

	// update qps rate limiter
	qpsRateUpdatedFlag := "QPS rate limiter is not changed."
	qps, okq := input["qps"].(float64)
	if okq {
		// The burst follows the integer part of the QPS, but a positive
		// fractional QPS (e.g. 0.3) still needs a burst of at least 1.
		burst := 0
		if int(qps) > 1 {
			burst = int(qps)
		} else if qps > 0 {
			burst = 1
		}
		cfg.QPS = qps
		cfg.QPSBurst = burst
	}

	if !okc && !okq {
		h.rd.JSON(w, http.StatusOK, "No changed.")
		return
	}

	// Apply to the live limiter first; the returned bit set reports what changed.
	status := h.svr.UpdateGRPCServiceRateLimiter(serviceLabel, ratelimit.UpdateDimensionConfig(&cfg))
	switch {
	case status&ratelimit.QPSChanged != 0:
		qpsRateUpdatedFlag = "QPS rate limiter is changed."
	case status&ratelimit.QPSDeleted != 0:
		qpsRateUpdatedFlag = "QPS rate limiter is deleted."
	}
	switch {
	case status&ratelimit.ConcurrencyChanged != 0:
		concurrencyUpdatedFlag = "Concurrency limiter is changed."
	case status&ratelimit.ConcurrencyDeleted != 0:
		concurrencyUpdatedFlag = "Concurrency limiter is deleted."
	}

	// Then record the new settings in the service middleware configuration.
	if err := h.svr.UpdateGRPCRateLimitConfig("grpc-limiter-config", serviceLabel, cfg); err != nil {
		h.rd.JSON(w, http.StatusInternalServerError, err.Error())
		return
	}
	result := rateLimitResult{concurrencyUpdatedFlag, qpsRateUpdatedFlag, h.svr.GetServiceMiddlewareConfig().GRPCRateLimitConfig.LimiterConfig}
	h.rd.JSON(w, http.StatusOK, result)
}

type rateLimitResult struct {
ConcurrencyUpdatedFlag string `json:"concurrency"`
QPSRateUpdatedFlag string `json:"qps"`
Expand Down
132 changes: 117 additions & 15 deletions server/api/service_middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,27 +62,31 @@ func (suite *auditMiddlewareTestSuite) TestConfigAuditSwitch() {
suite.True(sc.EnableAudit)

ms := map[string]interface{}{
"enable-audit": "true",
"enable-rate-limit": "true",
"enable-audit": "true",
"enable-rate-limit": "true",
"enable-grpc-rate-limit": "true",
}
postData, err := json.Marshal(ms)
suite.NoError(err)
suite.NoError(tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re)))
sc = &config.ServiceMiddlewareConfig{}
suite.NoError(tu.ReadGetJSON(re, testDialClient, addr, sc))
suite.True(sc.EnableAudit)
suite.True(sc.EnableRateLimit)
suite.True(sc.RateLimitConfig.EnableRateLimit)
suite.True(sc.GRPCRateLimitConfig.EnableRateLimit)
ms = map[string]interface{}{
"audit.enable-audit": "false",
"enable-rate-limit": "false",
"audit.enable-audit": "false",
"enable-rate-limit": "false",
"enable-grpc-rate-limit": "false",
}
postData, err = json.Marshal(ms)
suite.NoError(err)
suite.NoError(tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re)))
sc = &config.ServiceMiddlewareConfig{}
suite.NoError(tu.ReadGetJSON(re, testDialClient, addr, sc))
suite.False(sc.EnableAudit)
suite.False(sc.EnableRateLimit)
suite.False(sc.RateLimitConfig.EnableRateLimit)
suite.False(sc.GRPCRateLimitConfig.EnableRateLimit)

// test empty
ms = map[string]interface{}{}
Expand Down Expand Up @@ -273,12 +277,12 @@ func (suite *rateLimitConfigTestSuite) TestUpdateRateLimitConfig() {
suite.NoError(err)

limiter := suite.svr.GetServiceRateLimiter()
limiter.Update("SetRatelimitConfig", ratelimit.AddLabelAllowList())
limiter.Update("SetRateLimitConfig", ratelimit.AddLabelAllowList())

// Allow list
input = make(map[string]interface{})
input["type"] = "label"
input["label"] = "SetRatelimitConfig"
input["label"] = "SetRateLimitConfig"
input["qps"] = 100
input["concurrency"] = 100
jsonBody, err = json.Marshal(input)
Expand All @@ -288,31 +292,128 @@ func (suite *rateLimitConfigTestSuite) TestUpdateRateLimitConfig() {
suite.NoError(err)
}

// TestUpdateGRPCRateLimitConfig exercises the grpc-rate-limit config endpoint:
// label validation, the no-op request, and changing/deleting the concurrency
// and QPS limiters individually and together. The steps share server state,
// so their order matters.
func (suite *rateLimitConfigTestSuite) TestUpdateGRPCRateLimitConfig() {
	urlPrefix := fmt.Sprintf("%s%s/api/v1/service-middleware/config/grpc-rate-limit", suite.svr.GetAddr(), apiPrefix)
	re := suite.Require()

	// test empty label
	input := make(map[string]interface{})
	input["label"] = ""
	jsonBody, err := json.Marshal(input)
	suite.NoError(err)
	err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody,
		tu.Status(re, http.StatusBadRequest), tu.StringEqual(re, "\"The label is empty.\"\n"))
	suite.NoError(err)
	// test no label matched
	input = make(map[string]interface{})
	input["label"] = "TestLabel"
	jsonBody, err = json.Marshal(input)
	suite.NoError(err)
	err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody,
		tu.Status(re, http.StatusBadRequest), tu.StringEqual(re, "\"There is no label matched.\"\n"))
	suite.NoError(err)

	// no change
	input = make(map[string]interface{})
	input["label"] = "StoreHeartbeat"
	jsonBody, err = json.Marshal(input)
	suite.NoError(err)
	err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody,
		tu.StatusOK(re), tu.StringEqual(re, "\"No changed.\"\n"))
	suite.NoError(err)

	// change concurrency
	input = make(map[string]interface{})
	input["label"] = "StoreHeartbeat"
	input["concurrency"] = 100
	jsonBody, err = json.Marshal(input)
	suite.NoError(err)
	err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody,
		tu.StatusOK(re), tu.StringContain(re, "Concurrency limiter is changed."))
	suite.NoError(err)
	// a concurrency of 0 removes the limiter again
	input["concurrency"] = 0
	jsonBody, err = json.Marshal(input)
	suite.NoError(err)
	err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody,
		tu.StatusOK(re), tu.StringContain(re, "Concurrency limiter is deleted."))
	suite.NoError(err)

	// change qps
	input = make(map[string]interface{})
	input["label"] = "StoreHeartbeat"
	input["qps"] = 100
	jsonBody, err = json.Marshal(input)
	suite.NoError(err)
	err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody,
		tu.StatusOK(re), tu.StringContain(re, "QPS rate limiter is changed."))
	suite.NoError(err)

	// a fractional qps below 1 must still yield a burst of 1
	input = make(map[string]interface{})
	input["label"] = "StoreHeartbeat"
	input["qps"] = 0.3
	jsonBody, err = json.Marshal(input)
	suite.NoError(err)
	err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody,
		tu.StatusOK(re), tu.StringContain(re, "QPS rate limiter is changed."))
	suite.NoError(err)
	suite.Equal(1, suite.svr.GetGRPCRateLimitConfig().LimiterConfig["StoreHeartbeat"].QPSBurst)

	// a negative qps removes the limiter
	input["qps"] = -1
	jsonBody, err = json.Marshal(input)
	suite.NoError(err)
	err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody,
		tu.StatusOK(re), tu.StringContain(re, "QPS rate limiter is deleted."))
	suite.NoError(err)

	// change both
	input = make(map[string]interface{})
	input["label"] = "GetStore"
	input["qps"] = 100
	input["concurrency"] = 100
	jsonBody, err = json.Marshal(input)
	suite.NoError(err)
	result := rateLimitResult{}
	err = tu.CheckPostJSON(testDialClient, urlPrefix, jsonBody,
		tu.StatusOK(re), tu.StringContain(re, "Concurrency limiter is changed."),
		tu.StringContain(re, "QPS rate limiter is changed."),
		tu.ExtractJSON(re, &result),
	)
	// Check the request error before inspecting the decoded body, so a failed
	// request is reported as such rather than as zero-value mismatches.
	suite.NoError(err)
	suite.Equal(100., result.LimiterConfig["GetStore"].QPS)
	suite.Equal(100, result.LimiterConfig["GetStore"].QPSBurst)
	suite.Equal(uint64(100), result.LimiterConfig["GetStore"].ConcurrencyLimit)
}

func (suite *rateLimitConfigTestSuite) TestConfigRateLimitSwitch() {
addr := fmt.Sprintf("%s/service-middleware/config", suite.urlPrefix)
sc := &config.ServiceMiddlewareConfig{}
re := suite.Require()
suite.NoError(tu.ReadGetJSON(re, testDialClient, addr, sc))
suite.False(sc.EnableRateLimit)
suite.False(sc.RateLimitConfig.EnableRateLimit)
suite.False(sc.GRPCRateLimitConfig.EnableRateLimit)

ms := map[string]interface{}{
"enable-rate-limit": "true",
"enable-rate-limit": "true",
"enable-grpc-rate-limit": "true",
}
postData, err := json.Marshal(ms)
suite.NoError(err)
suite.NoError(tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re)))
sc = &config.ServiceMiddlewareConfig{}
suite.NoError(tu.ReadGetJSON(re, testDialClient, addr, sc))
suite.True(sc.EnableRateLimit)
suite.True(sc.RateLimitConfig.EnableRateLimit)
suite.True(sc.GRPCRateLimitConfig.EnableRateLimit)
ms = map[string]interface{}{
"enable-rate-limit": "false",
"enable-rate-limit": "false",
"enable-grpc-rate-limit": "false",
}
postData, err = json.Marshal(ms)
suite.NoError(err)
suite.NoError(tu.CheckPostJSON(testDialClient, addr, postData, tu.StatusOK(re)))
sc = &config.ServiceMiddlewareConfig{}
suite.NoError(tu.ReadGetJSON(re, testDialClient, addr, sc))
suite.False(sc.EnableRateLimit)
suite.False(sc.RateLimitConfig.EnableRateLimit)
suite.False(sc.GRPCRateLimitConfig.EnableRateLimit)

// test empty
ms = map[string]interface{}{}
Expand All @@ -327,7 +428,8 @@ func (suite *rateLimitConfigTestSuite) TestConfigRateLimitSwitch() {
suite.NoError(tu.CheckPostJSON(testDialClient, addr, postData, tu.Status(re, http.StatusBadRequest), tu.StringEqual(re, "config item rate-limit not found")))
suite.NoError(failpoint.Enable("github.com/tikv/pd/server/config/persistServiceMiddlewareFail", "return(true)"))
ms = map[string]interface{}{
"rate-limit.enable-rate-limit": "true",
"rate-limit.enable-rate-limit": "true",
"grpc-rate-limit.enable-grpc-rate-limit": "true",
}
postData, err = json.Marshal(ms)
suite.NoError(err)
Expand All @@ -341,7 +443,7 @@ func (suite *rateLimitConfigTestSuite) TestConfigRateLimitSwitch() {
suite.NoError(tu.CheckPostJSON(testDialClient, addr, postData, tu.Status(re, http.StatusBadRequest), tu.StringEqual(re, "config item rate-limit not found")))
}

func (suite *rateLimitConfigTestSuite) TestConfigLimiterConifgByOriginAPI() {
func (suite *rateLimitConfigTestSuite) TestConfigLimiterConfigByOriginAPI() {
// this test case is used to test updating `limiter-config` by origin API simply
addr := fmt.Sprintf("%s/service-middleware/config", suite.urlPrefix)
dimensionConfig := ratelimit.DimensionConfig{QPS: 1}
Expand Down
4 changes: 2 additions & 2 deletions server/api/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,13 +734,13 @@ func (h *storesHandler) GetStoresProgress(w http.ResponseWriter, r *http.Request
}

// @Tags store
// @Summary Get stores in the cluster.
// @Summary Get all stores in the cluster.
// @Param state query array true "Specify accepted store states."
// @Produce json
// @Success 200 {object} StoresInfo
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /stores [get]
func (h *storesHandler) GetStores(w http.ResponseWriter, r *http.Request) {
func (h *storesHandler) GetAllStores(w http.ResponseWriter, r *http.Request) {
rc := getCluster(r)
stores := rc.GetMetaStores()
StoresInfo := &StoresInfo{
Expand Down
Loading

0 comments on commit fbd386a

Please sign in to comment.