Skip to content

Commit

Permalink
api: add Rate-limit config update API (tikv#4843)
Browse files Browse the repository at this point in the history
ref tikv#4666, ref tikv#4839

add Rate-limit config update API

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>

Co-authored-by: Ryan Leung <rleungx@gmail.com>
Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
3 people committed Jul 14, 2022
1 parent 382a912 commit a66b06f
Show file tree
Hide file tree
Showing 7 changed files with 420 additions and 61 deletions.
49 changes: 49 additions & 0 deletions pkg/jsonutil/jsonutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package jsonutil

import (
"bytes"
"encoding/json"

"github.com/tikv/pd/pkg/reflectutil"
)

// AddKeyValue is used to add a key value pair into `old`
func AddKeyValue(old interface{}, key string, value interface{}) (updated bool, found bool, err error) {
data, err := json.Marshal(map[string]interface{}{key: value})
if err != nil {
return false, false, err
}
return MergeJSONObject(old, data)
}

// MergeJSONObject is used to merge a marshaled json object into v
func MergeJSONObject(v interface{}, data []byte) (updated bool, found bool, err error) {
old, _ := json.Marshal(v)
if err := json.Unmarshal(data, v); err != nil {
return false, false, err
}
new, _ := json.Marshal(v)
if !bytes.Equal(old, new) {
return true, true, nil
}
m := make(map[string]interface{})
if err := json.Unmarshal(data, &m); err != nil {
return false, false, err
}
found = reflectutil.FindSameFieldByJSON(v, m)
return false, found, nil
}
65 changes: 65 additions & 0 deletions pkg/jsonutil/jsonutil_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package jsonutil

import (
"testing"

"github.com/stretchr/testify/require"
)

type testJSONStructLevel1 struct {
Name string `json:"name"`
Sub1 testJSONStructLevel2 `json:"sub1"`
Sub2 testJSONStructLevel2 `json:"sub2"`
}

type testJSONStructLevel2 struct {
SubName string `json:"sub-name"`
}

func TestJSONUtil(t *testing.T) {
t.Parallel()
re := require.New(t)
father := &testJSONStructLevel1{
Name: "father",
}
son1 := &testJSONStructLevel2{
SubName: "son1",
}
update, found, err := AddKeyValue(&father, "sub1", &son1)
re.NoError(err)
re.True(update)
re.True(found)

son2 := &testJSONStructLevel2{
SubName: "son2",
}

update, found, err = AddKeyValue(father, "sub2", &son2)
re.NoError(err)
re.True(update)
re.True(found)

update, found, err = AddKeyValue(father, "sub3", &son2)
re.NoError(err)
re.False(update)
re.False(found)

update, found, err = AddKeyValue(father, "sub2", &son2)
re.NoError(err)
re.False(update)
re.True(found)
}
43 changes: 5 additions & 38 deletions server/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package api

import (
"bytes"
"encoding/json"
"fmt"
"io"
Expand All @@ -29,6 +28,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/apiutil"
"github.com/tikv/pd/pkg/jsonutil"
"github.com/tikv/pd/pkg/logutil"
"github.com/tikv/pd/pkg/reflectutil"
"github.com/tikv/pd/server"
Expand Down Expand Up @@ -166,12 +166,7 @@ func (h *confHandler) updateConfig(cfg *config.Config, key string, value interfa
}

func (h *confHandler) updateSchedule(config *config.Config, key string, value interface{}) error {
data, err := json.Marshal(map[string]interface{}{key: value})
if err != nil {
return err
}

updated, found, err := mergeConfig(&config.Schedule, data)
updated, found, err := jsonutil.AddKeyValue(&config.Schedule, key, value)
if err != nil {
return err
}
Expand All @@ -187,12 +182,7 @@ func (h *confHandler) updateSchedule(config *config.Config, key string, value in
}

func (h *confHandler) updateReplication(config *config.Config, key string, value interface{}) error {
data, err := json.Marshal(map[string]interface{}{key: value})
if err != nil {
return err
}

updated, found, err := mergeConfig(&config.Replication, data)
updated, found, err := jsonutil.AddKeyValue(&config.Replication, key, value)
if err != nil {
return err
}
Expand All @@ -214,8 +204,7 @@ func (h *confHandler) updateReplicationModeConfig(config *config.Config, key []s
if err != nil {
return err
}

updated, found, err := mergeConfig(&config.ReplicationMode, data)
updated, found, err := jsonutil.MergeJSONObject(&config.ReplicationMode, data)
if err != nil {
return err
}
Expand All @@ -231,12 +220,7 @@ func (h *confHandler) updateReplicationModeConfig(config *config.Config, key []s
}

func (h *confHandler) updatePDServerConfig(config *config.Config, key string, value interface{}) error {
data, err := json.Marshal(map[string]interface{}{key: value})
if err != nil {
return err
}

updated, found, err := mergeConfig(&config.PDServerCfg, data)
updated, found, err := jsonutil.AddKeyValue(&config.PDServerCfg, key, value)
if err != nil {
return err
}
Expand Down Expand Up @@ -288,23 +272,6 @@ func getConfigMap(cfg map[string]interface{}, key []string, value interface{}) m
return cfg
}

func mergeConfig(v interface{}, data []byte) (updated bool, found bool, err error) {
old, _ := json.Marshal(v)
if err := json.Unmarshal(data, v); err != nil {
return false, false, err
}
new, _ := json.Marshal(v)
if !bytes.Equal(old, new) {
return true, true, nil
}
m := make(map[string]interface{})
if err := json.Unmarshal(data, &m); err != nil {
return false, false, err
}
found = reflectutil.FindSameFieldByJSON(v, m)
return false, found, nil
}

// @Tags config
// @Summary Get schedule config.
// @Produce json
Expand Down
1 change: 1 addition & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
serviceMiddlewareHandler := newServiceMiddlewareHandler(svr, rd)
registerFunc(apiRouter, "/service-middleware/config", serviceMiddlewareHandler.GetServiceMiddlewareConfig, setMethods("GET"))
registerFunc(apiRouter, "/service-middleware/config", serviceMiddlewareHandler.SetServiceMiddlewareConfig, setMethods("POST"), setAuditBackend(localLog))
registerFunc(apiRouter, "/service-middleware/config/rate-limit", serviceMiddlewareHandler.SetRatelimitConfig, setMethods("POST"), setAuditBackend(localLog))

logHandler := newLogHandler(svr, rd)
registerFunc(apiRouter, "/admin/log", logHandler.SetLogLevel, setMethods("POST"), setAuditBackend(localLog))
Expand Down
123 changes: 101 additions & 22 deletions server/api/service_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import (
"strings"

"github.com/pingcap/errors"
"github.com/tikv/pd/pkg/apiutil"
"github.com/tikv/pd/pkg/jsonutil"
"github.com/tikv/pd/pkg/ratelimit"
"github.com/tikv/pd/pkg/reflectutil"
"github.com/tikv/pd/server"
"github.com/tikv/pd/server/config"
Expand Down Expand Up @@ -107,18 +110,13 @@ func (h *serviceMiddlewareHandler) updateServiceMiddlewareConfig(cfg *config.Ser
case "audit":
return h.updateAudit(cfg, kp[len(kp)-1], value)
case "rate-limit":
return h.updateRateLimit(cfg, kp[len(kp)-1], value)
return h.svr.UpdateRateLimit(&cfg.RateLimitConfig, kp[len(kp)-1], value)
}
return errors.Errorf("config prefix %s not found", kp[0])
}

func (h *serviceMiddlewareHandler) updateAudit(config *config.ServiceMiddlewareConfig, key string, value interface{}) error {
data, err := json.Marshal(map[string]interface{}{key: value})
if err != nil {
return err
}

updated, found, err := mergeConfig(&config.AuditConfig, data)
updated, found, err := jsonutil.AddKeyValue(&config.AuditConfig, key, value)
if err != nil {
return err
}
Expand All @@ -133,23 +131,104 @@ func (h *serviceMiddlewareHandler) updateAudit(config *config.ServiceMiddlewareC
return err
}

func (h *serviceMiddlewareHandler) updateRateLimit(config *config.ServiceMiddlewareConfig, key string, value interface{}) error {
data, err := json.Marshal(map[string]interface{}{key: value})
if err != nil {
return err
// @Tags service_middleware
// @Summary update ratelimit config
// @Param body body object string "json params"
// @Produce json
// @Success 200 {string} string
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "config item not found"
// @Router /service-middleware/config/rate-limit [POST]
func (h *serviceMiddlewareHandler) SetRatelimitConfig(w http.ResponseWriter, r *http.Request) {
var input map[string]interface{}
if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil {
return
}

updated, found, err := mergeConfig(&config.RateLimitConfig, data)
if err != nil {
return err
typeStr, ok := input["type"].(string)
if !ok {
h.rd.JSON(w, http.StatusBadRequest, "The type is empty.")
return
}

if !found {
return errors.Errorf("config item %s not found", key)
var serviceLabel string
switch typeStr {
case "label":
serviceLabel, ok = input["label"].(string)
if !ok || len(serviceLabel) == 0 {
h.rd.JSON(w, http.StatusBadRequest, "The label is empty.")
return
}
if len(h.svr.GetServiceLabels(serviceLabel)) == 0 {
h.rd.JSON(w, http.StatusBadRequest, "There is no label matched.")
return
}
case "path":
method, _ := input["method"].(string)
path, ok := input["path"].(string)
if !ok || len(path) == 0 {
h.rd.JSON(w, http.StatusBadRequest, "The path is empty.")
return
}
serviceLabel = h.svr.GetAPIAccessServiceLabel(apiutil.NewAccessPath(path, method))
if len(serviceLabel) == 0 {
h.rd.JSON(w, http.StatusBadRequest, "There is no label matched.")
return
}
default:
h.rd.JSON(w, http.StatusBadRequest, "The type is invalid.")
return
}

if updated {
err = h.svr.SetRateLimitConfig(config.RateLimitConfig)
if h.svr.IsInRateLimitAllowList(serviceLabel) {
h.rd.JSON(w, http.StatusBadRequest, "This service is in allow list whose config can not be changed.")
return
}
return err
cfg := h.svr.GetRateLimitConfig().LimiterConfig[serviceLabel]
// update concurrency limiter
concurrencyUpdatedFlag := "Concurrency limiter is not changed."
concurrencyFloat, okc := input["concurrency"].(float64)
if okc {
cfg.ConcurrencyLimit = uint64(concurrencyFloat)
}
// update qps rate limiter
qpsRateUpdatedFlag := "QPS rate limiter is not changed."
qps, okq := input["qps"].(float64)
if okq {
brust := 0
if int(qps) > 1 {
brust = int(qps)
} else if qps > 0 {
brust = 1
}
cfg.QPS = qps
cfg.QPSBurst = brust
}
if !okc && !okq {
h.rd.JSON(w, http.StatusOK, "No changed.")
} else {
status := h.svr.UpdateServiceRateLimiter(serviceLabel, ratelimit.UpdateDimensionConfig(&cfg))
switch {
case status&ratelimit.QPSChanged != 0:
qpsRateUpdatedFlag = "QPS rate limiter is changed."
case status&ratelimit.QPSDeleted != 0:
qpsRateUpdatedFlag = "QPS rate limiter is deleted."
}
switch {
case status&ratelimit.ConcurrencyChanged != 0:
concurrencyUpdatedFlag = "Concurrency limiter is changed."
case status&ratelimit.ConcurrencyDeleted != 0:
concurrencyUpdatedFlag = "Concurrency limiter is deleted."
}
err := h.svr.UpdateRateLimitConfig("limiter-config", serviceLabel, cfg)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
} else {
result := rateLimitResult{concurrencyUpdatedFlag, qpsRateUpdatedFlag, h.svr.GetServiceMiddlewareConfig().RateLimitConfig.LimiterConfig}
h.rd.JSON(w, http.StatusOK, result)
}
}
}

type rateLimitResult struct {
ConcurrencyUpdatedFlag string `json:"concurrency"`
QPSRateUpdatedFlag string `json:"qps"`
LimiterConfig map[string]ratelimit.DimensionConfig `json:"limiter-config"`
}
Loading

0 comments on commit a66b06f

Please sign in to comment.