From 1c5c9203d991004598023b07aa1a13df4682775a Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 13 Apr 2020 20:46:45 +0800 Subject: [PATCH 1/5] reimplement dynamic config Signed-off-by: Ryan Leung --- server/api/component_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/api/component_test.go b/server/api/component_test.go index 697e1486b15..ee80ae52273 100644 --- a/server/api/component_test.go +++ b/server/api/component_test.go @@ -56,7 +56,7 @@ func (s *testComponentSuite) TestComponent(c *C) { c.Assert(strings.Contains(err.Error(), "404"), IsTrue) c.Assert(len(output1), Equals, 0) - // register 2 c1 and 1 c2 + // register 2 c1, 1 c2, and 1 c3 reqs := []map[string]string{ {"component": "c1", "addr": "127.0.0.1:1"}, {"component": "c1", "addr": "127.0.0.1:2"}, From 8a0b77e3e5fc569314b6cfa0206fa534186481d8 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 15 Apr 2020 13:40:55 +0800 Subject: [PATCH 2/5] support prefix config item in API Signed-off-by: Ryan Leung --- server/api/config.go | 71 +++++++++++++++++++++++++++++++- server/api/config_test.go | 40 ++++++++++++++++++ server/api/log.go | 6 ++- server/config/persist_options.go | 21 ---------- server/server.go | 41 +++++++----------- 5 files changed, 128 insertions(+), 51 deletions(-) diff --git a/server/api/config.go b/server/api/config.go index 94f25153456..7cffd585735 100644 --- a/server/api/config.go +++ b/server/api/config.go @@ -16,13 +16,16 @@ package api import ( "bytes" "encoding/json" + "fmt" "io/ioutil" "net/http" "reflect" "strings" "github.com/pingcap/errcode" + "github.com/pingcap/log" "github.com/pingcap/pd/v4/pkg/apiutil" + "github.com/pingcap/pd/v4/pkg/logutil" "github.com/pingcap/pd/v4/server" "github.com/pingcap/pd/v4/server/config" "github.com/pkg/errors" @@ -85,6 +88,48 @@ func (h *confHandler) Post(w http.ResponseWriter, r *http.Request) { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } + + conf := make(map[string]interface{}) + if err := json.Unmarshal(data, &conf); err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return + } + + for k, v := range conf { + if kp := strings.Split(k, "."); len(kp) == 2 { + if !isPrefixLegal(kp[0]) { + h.rd.JSON(w, http.StatusBadRequest, fmt.Sprintf("prefix %s not found", kp[0])) + return + } + delete(conf, k) + conf[kp[1]] = v + } + } + data, err = json.Marshal(conf) + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + h.updateWithoutPrefix(w, config, data) + + h.rd.JSON(w, http.StatusOK, nil) +} + +func isPrefixLegal(prefix string) bool { + switch prefix { + case "schedule", "replication", "pd-server", "log": + return true + default: + return false + } +} + +func (h *confHandler) updateWithoutPrefix(w http.ResponseWriter, config *config.Config, data []byte) { + var err error + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } found1, err := h.updateSchedule(data, config) if err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) @@ -100,11 +145,15 @@ func (h *confHandler) Post(w http.ResponseWriter, r *http.Request) { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } - if !found1 && !found2 && !found3 { + found4, err := h.updateLogLevel(data, config) + if err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return + } + if !found1 && !found2 && !found3 && !found4 { h.rd.JSON(w, http.StatusBadRequest, "config item not found") return } - h.rd.JSON(w, http.StatusOK, nil) } func (h *confHandler) updateSchedule(data []byte, config *config.Config) (bool, error) { @@ -140,6 +189,24 @@ func (h *confHandler) updatePDServerConfig(data []byte, config *config.Config) ( return found, err } +func (h *confHandler) updateLogLevel(data []byte, config *config.Config) (bool, error) { + cfg := make(map[string]interface{}) + err := json.Unmarshal(data, &cfg) + if err != nil { + return false, err + } + + if level, ok := cfg["level"].(string); ok { + err = h.svr.SetLogLevel(level) + if err != nil { + return true, err + } + log.SetLevel(logutil.StringToZapLogLevel(level)) + return true, nil + } + return false, err +} + func (h *confHandler) mergeConfig(v interface{}, data []byte) (updated bool, found bool, err error) { old, _ := json.Marshal(v) if err := json.Unmarshal(data, v); err != nil { diff --git a/server/api/config_test.go b/server/api/config_test.go index 7957869439a..bc3f7a96e2b 100644 --- a/server/api/config_test.go +++ b/server/api/config_test.go @@ -16,6 +16,7 @@ package api import ( "encoding/json" "fmt" + "strings" "time" . "github.com/pingcap/check" @@ -50,6 +51,7 @@ func (s *testConfigSuite) TestConfigAll(c *C) { err := readJSON(addr, cfg) c.Assert(err, IsNil) + // the original way r := map[string]int{"max-replicas": 5} postData, err := json.Marshal(r) c.Assert(err, IsNil) @@ -80,6 +82,44 @@ func (s *testConfigSuite) TestConfigAll(c *C) { cfg.Schedule.RegionScheduleLimit = 10 cfg.PDServerCfg.MetricStorage = "http://127.0.0.1:9090" c.Assert(cfg, DeepEquals, newCfg) + + // the new way + l = map[string]interface{}{ + "schedule.tolerant-size-ratio": 2.5, + "replication.location-labels": "idc,host", + "pd-server.metric-storage": "http://127.0.0.1:1234", + "log.level": "warn", + } + postData, err = json.Marshal(l) + c.Assert(err, IsNil) + err = postJSON(addr, postData) + c.Assert(err, IsNil) + newCfg1 := &config.Config{} + err = readJSON(addr, newCfg1) + c.Assert(err, IsNil) + cfg.Schedule.TolerantSizeRatio = 2.5 + cfg.Replication.LocationLabels = []string{"idc", "host"} + cfg.PDServerCfg.MetricStorage = "http://127.0.0.1:1234" + cfg.Log.Level = "warn" + c.Assert(newCfg1, DeepEquals, cfg) + + // illegal prefix + l = map[string]interface{}{ + "replicate.max-replicas": 1, + } + postData, err = json.Marshal(l) + c.Assert(err, IsNil) + err = postJSON(addr, postData) + c.Assert(strings.Contains(err.Error(), "replicate"), IsTrue) + + // config item not found + l = map[string]interface{}{ + "schedule.region-limit": 10, + } + postData, err = json.Marshal(l) + c.Assert(err, IsNil) + err = postJSON(addr, postData) + c.Assert(strings.Contains(err.Error(), "config item not found"), IsTrue) } func (s *testConfigSuite) TestConfigSchedule(c *C) { diff --git a/server/api/log.go b/server/api/log.go index c9cd0edf830..cdb02e1d59e 100644 --- a/server/api/log.go +++ b/server/api/log.go @@ -60,7 +60,11 @@ func (h *logHandler) Handle(w http.ResponseWriter, r *http.Request) { return } - h.svr.SetLogLevel(level) + err = h.svr.SetLogLevel(level) + if err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return + } log.SetLevel(logutil.StringToZapLogLevel(level)) h.rd.JSON(w, http.StatusOK, nil) diff --git a/server/config/persist_options.go b/server/config/persist_options.go index 00f6777930a..3dd1c57448f 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -22,7 +22,6 @@ import ( "github.com/coreos/go-semver/semver" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/log" "github.com/pingcap/pd/v4/pkg/typeutil" "github.com/pingcap/pd/v4/server/core" "github.com/pingcap/pd/v4/server/kv" @@ -37,7 +36,6 @@ type PersistOptions struct { labelProperty atomic.Value clusterVersion unsafe.Pointer pdServerConfig atomic.Value - logConfig atomic.Value replicationMode atomic.Value } @@ -49,7 +47,6 @@ func NewPersistOptions(cfg *Config) *PersistOptions { o.pdServerConfig.Store(&cfg.PDServerCfg) o.labelProperty.Store(cfg.LabelProperty) o.SetClusterVersion(&cfg.ClusterVersion) - o.logConfig.Store(&cfg.Log) o.replicationMode.Store(&cfg.ReplicationMode) return o } @@ -79,16 +76,6 @@ func (o *PersistOptions) SetPDServerConfig(cfg *PDServerConfig) { o.pdServerConfig.Store(cfg) } -// GetLogConfig returns log configuration. -func (o *PersistOptions) GetLogConfig() *log.Config { - return o.logConfig.Load().(*log.Config) -} - -// SetLogConfig sets the log configuration. -func (o *PersistOptions) SetLogConfig(cfg *log.Config) { - o.logConfig.Store(cfg) -} - // GetReplicationModeConfig returns the replication mode config. func (o *PersistOptions) GetReplicationModeConfig() *ReplicationModeConfig { return o.replicationMode.Load().(*ReplicationModeConfig) @@ -376,11 +363,6 @@ func (o *PersistOptions) LoadPDServerConfig() *PDServerConfig { return o.pdServerConfig.Load().(*PDServerConfig) } -// LoadLogConfig returns log configuration. -func (o *PersistOptions) LoadLogConfig() *log.Config { - return o.logConfig.Load().(*log.Config) -} - // Persist saves the configuration to the storage. func (o *PersistOptions) Persist(storage *core.Storage) error { cfg := &Config{ @@ -389,7 +371,6 @@ func (o *PersistOptions) Persist(storage *core.Storage) error { LabelProperty: o.LoadLabelPropertyConfig(), ClusterVersion: *o.LoadClusterVersion(), PDServerCfg: *o.LoadPDServerConfig(), - Log: *o.LoadLogConfig(), ReplicationMode: *o.GetReplicationModeConfig(), } err := storage.SaveConfig(cfg) @@ -404,7 +385,6 @@ func (o *PersistOptions) Reload(storage *core.Storage) error { LabelProperty: o.LoadLabelPropertyConfig().Clone(), ClusterVersion: *o.LoadClusterVersion(), PDServerCfg: *o.LoadPDServerConfig(), - Log: *o.LoadLogConfig(), ReplicationMode: *o.GetReplicationModeConfig().Clone(), } isExist, err := storage.LoadConfig(cfg) @@ -418,7 +398,6 @@ func (o *PersistOptions) Reload(storage *core.Storage) error { o.labelProperty.Store(cfg.LabelProperty) o.SetClusterVersion(&cfg.ClusterVersion) o.pdServerConfig.Store(&cfg.PDServerCfg) - o.logConfig.Store(&cfg.Log) o.replicationMode.Store(&cfg.ReplicationMode) } return nil diff --git a/server/server.go b/server/server.go index eeb69437544..c018fff5281 100644 --- a/server/server.go +++ b/server/server.go @@ -676,7 +676,6 @@ func (s *Server) GetConfig() *config.Config { cfg.LabelProperty = s.persistOptions.LoadLabelPropertyConfig().Clone() cfg.ClusterVersion = *s.persistOptions.LoadClusterVersion() cfg.PDServerCfg = *s.persistOptions.LoadPDServerConfig() - cfg.Log = *s.persistOptions.LoadLogConfig() storage := s.GetStorage() if storage == nil { return cfg @@ -808,31 +807,6 @@ func (s *Server) SetLabelPropertyConfig(cfg config.LabelPropertyConfig) error { return nil } -// GetLogConfig gets the log config. -func (s *Server) GetLogConfig() *log.Config { - cfg := &log.Config{} - *cfg = *s.persistOptions.GetLogConfig() - return cfg -} - -// SetLogConfig sets the log config. -func (s *Server) SetLogConfig(cfg log.Config) error { - old := s.persistOptions.LoadLogConfig() - s.persistOptions.SetLogConfig(&cfg) - log.SetLevel(logutil.StringToZapLogLevel(cfg.Level)) - if err := s.persistOptions.Persist(s.storage); err != nil { - s.persistOptions.SetLogConfig(old) - log.SetLevel(logutil.StringToZapLogLevel(old.Level)) - log.Error("failed to update log config", - zap.Reflect("new", cfg), - zap.Reflect("old", old), - zap.Error(err)) - return err - } - log.Info("log config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old)) - return nil -} - // SetLabelProperty inserts a label property config. func (s *Server) SetLabelProperty(typ, labelKey, labelValue string) error { s.persistOptions.SetLabelProperty(typ, labelKey, labelValue) @@ -960,10 +934,23 @@ func (s *Server) GetClusterStatus() (*cluster.Status, error) { } // SetLogLevel sets log level. -func (s *Server) SetLogLevel(level string) { +func (s *Server) SetLogLevel(level string) error { + if !isLevelLegal(level) { + return errors.Errorf("log level %s is illegal", level) + } s.cfg.Log.Level = level log.SetLevel(logutil.StringToZapLogLevel(level)) log.Warn("log level changed", zap.String("level", log.GetLevel().String())) + return nil +} + +func isLevelLegal(level string) bool { + switch strings.ToLower(level) { + case "fatal", "error", "warn", "warning", "debug", "info": + return true + default: + return false + } } // GetReplicationModeConfig returns the replication mode config. From b7c1d763fa2f60791a2ed83d034352bc1e28ff50 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 16 Apr 2020 17:31:56 +0800 Subject: [PATCH 3/5] support cluster version and label property Signed-off-by: Ryan Leung --- server/api/config.go | 58 ++++++++++++++++++++++++++++++++++++++- server/api/config_test.go | 17 ++++++++++++ 2 files changed, 74 insertions(+), 1 deletion(-) diff --git a/server/api/config.go b/server/api/config.go index 7cffd585735..99909e042b6 100644 --- a/server/api/config.go +++ b/server/api/config.go @@ -150,7 +150,17 @@ func (h *confHandler) updateWithoutPrefix(w http.ResponseWriter, config *config. h.rd.JSON(w, http.StatusBadRequest, err.Error()) return } - if !found1 && !found2 && !found3 && !found4 { + found5, err := h.updateClusterVersion(data, config) + if err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return + } + found6, err := h.updateLabelProperty(data, config) + if err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return + } + if !found1 && !found2 && !found3 && !found4 && !found5 && !found6 { h.rd.JSON(w, http.StatusBadRequest, "config item not found") return } @@ -207,6 +217,52 @@ func (h *confHandler) updateLogLevel(data []byte, config *config.Config) (bool, return false, err } +func (h *confHandler) updateClusterVersion(data []byte, config *config.Config) (bool, error) { + cfg := make(map[string]interface{}) + err := json.Unmarshal(data, &cfg) + if err != nil { + return false, err + } + + if version, ok := cfg["cluster-version"].(string); ok { + err := h.svr.SetClusterVersion(version) + if err != nil { + return true, err + } + return true, nil + } + return false, err +} + +func (h *confHandler) updateLabelProperty(data []byte, config *config.Config) (bool, error) { + cfg := make(map[string]interface{}) + err := json.Unmarshal(data, &cfg) + if err != nil { + return false, err + } + + if lp, ok := cfg["label-property"].(string); ok { + input := make(map[string]string) + err = json.Unmarshal([]byte(lp), &input) + if err != nil { + return true, err + } + switch input["action"] { + case "set": + err = h.svr.SetLabelProperty(input["type"], input["label-key"], input["label-value"]) + case "delete": + err = h.svr.DeleteLabelProperty(input["type"], input["label-key"], input["label-value"]) + default: + err = errors.Errorf("unknown action %v", input["action"]) + } + if err != nil { + return true, err + } + return true, nil + } + return false, err +} + func (h *confHandler) mergeConfig(v interface{}, data []byte) (updated bool, found bool, err error) { old, _ := json.Marshal(v) if err := json.Unmarshal(data, v); err != nil { diff --git a/server/api/config_test.go b/server/api/config_test.go index bc3f7a96e2b..f046bc2991b 100644 --- a/server/api/config_test.go +++ b/server/api/config_test.go @@ -22,6 +22,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/pd/v4/pkg/typeutil" "github.com/pingcap/pd/v4/server" + "github.com/pingcap/pd/v4/server/cluster" "github.com/pingcap/pd/v4/server/config" ) @@ -89,6 +90,8 @@ func (s *testConfigSuite) TestConfigAll(c *C) { "replication.location-labels": "idc,host", "pd-server.metric-storage": "http://127.0.0.1:1234", "log.level": "warn", + "cluster-version": "v4.0.0-beta", + "label-property": `{"type": "foo", "action": "set", "label-key": "zone", "label-value": "cn1"}`, } postData, err = json.Marshal(l) c.Assert(err, IsNil) @@ -101,8 +104,22 @@ func (s *testConfigSuite) TestConfigAll(c *C) { cfg.Replication.LocationLabels = []string{"idc", "host"} cfg.PDServerCfg.MetricStorage = "http://127.0.0.1:1234" cfg.Log.Level = "warn" + v, err := cluster.ParseVersion("v4.0.0-beta") + c.Assert(err, IsNil) + cfg.ClusterVersion = *v + cfg.LabelProperty = map[string][]config.StoreLabel{ + "foo": {{Key: "zone", Value: "cn1"}}, + } c.Assert(newCfg1, DeepEquals, cfg) + l = map[string]interface{}{ + "label-property": `{"type": "foo", "action": "delete", "label-key": "zone", "label-value": "cn1"}`, + } + postData, err = json.Marshal(l) + c.Assert(err, IsNil) + err = postJSON(addr, postData) + c.Assert(err, IsNil) + // illegal prefix l = map[string]interface{}{ "replicate.max-replicas": 1, From 33368e954282569cb3a8024427a7b5ced828bd4a Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 20 Apr 2020 20:46:23 +0800 Subject: [PATCH 4/5] refactor config api set method Signed-off-by: Ryan Leung --- pkg/apiutil/serverapi/middleware.go | 2 +- server/api/config.go | 237 ++++++++++++++------------- server/api/config_test.go | 25 ++- server/config/config_test.go | 4 +- server/config/persist_options.go | 13 +- server/server.go | 11 +- tests/pdctl/config/config_test.go | 2 +- tests/server/cluster/cluster_test.go | 6 +- 8 files changed, 155 insertions(+), 145 deletions(-) diff --git a/pkg/apiutil/serverapi/middleware.go b/pkg/apiutil/serverapi/middleware.go index f07d85955c6..c97c78906d3 100644 --- a/pkg/apiutil/serverapi/middleware.go +++ b/pkg/apiutil/serverapi/middleware.go @@ -77,7 +77,7 @@ func IsServiceAllowed(s *server.Server, group server.ServiceGroup) bool { } opt := s.GetServerOption() - cfg := opt.LoadPDServerConfig() + cfg := opt.GetPDServerConfig() if cfg != nil { for _, allow := range cfg.RuntimeServices { if group.Name == allow { diff --git a/server/api/config.go b/server/api/config.go index 99909e042b6..d08b12ba226 100644 --- a/server/api/config.go +++ b/server/api/config.go @@ -78,10 +78,9 @@ func (h *confHandler) GetDefault(w http.ResponseWriter, r *http.Request) { // @Success 200 {string} string "The config is updated." // @Failure 400 {string} string "The input is invalid." // @Failure 500 {string} string "PD server failed to proceed the request." -// @Failure 503 {string} string "PD server has no leader." // @Router /config [post] func (h *confHandler) Post(w http.ResponseWriter, r *http.Request) { - config := h.svr.GetConfig() + cfg := h.svr.GetConfig() data, err := ioutil.ReadAll(r.Body) r.Body.Close() if err != nil { @@ -96,171 +95,189 @@ func (h *confHandler) Post(w http.ResponseWriter, r *http.Request) { } for k, v := range conf { - if kp := strings.Split(k, "."); len(kp) == 2 { - if !isPrefixLegal(kp[0]) { - h.rd.JSON(w, http.StatusBadRequest, fmt.Sprintf("prefix %s not found", kp[0])) + if s := strings.Split(k, "."); len(s) > 1 { + if err := h.updateConfig(cfg, k, v); err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) return } - delete(conf, k) - conf[kp[1]] = v + continue + } + key := findTag(reflect.TypeOf(&config.Config{}).Elem(), k) + if key == "" { + h.rd.JSON(w, http.StatusBadRequest, fmt.Sprintf("config item %s not found", k)) + return + } + if err := h.updateConfig(cfg, key, v); err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return } } - data, err = json.Marshal(conf) - if err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - h.updateWithoutPrefix(w, config, data) h.rd.JSON(w, http.StatusOK, nil) } -func isPrefixLegal(prefix string) bool { - switch prefix { - case "schedule", "replication", "pd-server", "log": - return true - default: - return false +func (h *confHandler) updateConfig(cfg *config.Config, key string, value interface{}) error { + kp := strings.Split(key, ".") + switch kp[0] { + case "schedule": + return h.updateSchedule(cfg, kp[len(kp)-1], value) + case "replication": + return h.updateReplication(cfg, kp[len(kp)-1], value) + case "replication-mode": + return h.updateReplicationModeConfig(cfg, kp[1:], value) + case "pd-server": + return h.updatePDServerConfig(cfg, kp[len(kp)-1], value) + case "log": + return h.updateLogLevel(kp, value) + case "cluster-version": + return h.updateClusterVersion(value) + case "label-property": // TODO: support changing label-property + } + return errors.Errorf("config prefix %s not found", kp[0]) +} + +func findTag(t reflect.Type, tag string) string { + for i := 0; i < t.NumField(); i++ { + field := t.Field(i) + + column := field.Tag.Get("json") + c := strings.Split(column, ",") + if c[0] == tag { + return c[0] + } + + if field.Type.Kind() == reflect.Struct { + path := findTag(field.Type, tag) + if path == "" { + continue + } + return field.Tag.Get("json") + "." + path + } } + return "" } -func (h *confHandler) updateWithoutPrefix(w http.ResponseWriter, config *config.Config, data []byte) { - var err error +func (h *confHandler) updateSchedule(config *config.Config, key string, value interface{}) error { + data, err := json.Marshal(map[string]interface{}{key: value}) if err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return + return err } - found1, err := h.updateSchedule(data, config) + + updated, found, err := h.mergeConfig(&config.Schedule, data) if err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return + return err } - found2, err := h.updateReplication(data, config) - if err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return + + if !found { + return errors.Errorf("config item %s not found", key) } - found3, err := h.updatePDServerConfig(data, config) - if err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return + + if updated { + err = h.svr.SetScheduleConfig(config.Schedule) } - found4, err := h.updateLogLevel(data, config) + return err +} + +func (h *confHandler) updateReplication(config *config.Config, key string, value interface{}) error { + data, err := json.Marshal(map[string]interface{}{key: value}) if err != nil { - h.rd.JSON(w, http.StatusBadRequest, err.Error()) - return + return err } - found5, err := h.updateClusterVersion(data, config) + + updated, found, err := h.mergeConfig(&config.Replication, data) if err != nil { - h.rd.JSON(w, http.StatusBadRequest, err.Error()) - return + return err } - found6, err := h.updateLabelProperty(data, config) - if err != nil { - h.rd.JSON(w, http.StatusBadRequest, err.Error()) - return + + if !found { + return errors.Errorf("config item %s not found", key) } - if !found1 && !found2 && !found3 && !found4 && !found5 && !found6 { - h.rd.JSON(w, http.StatusBadRequest, "config item not found") - return + + if updated { + err = h.svr.SetReplicationConfig(config.Replication) } + return err } -func (h *confHandler) updateSchedule(data []byte, config *config.Config) (bool, error) { - updated, found, err := h.mergeConfig(&config.Schedule, data) +func (h *confHandler) updateReplicationModeConfig(config *config.Config, key []string, value interface{}) error { + cfg := make(map[string]interface{}) + cfg = getConfigMap(cfg, key, value) + data, err := json.Marshal(cfg) if err != nil { - return false, err + return err } - if updated { - err = h.svr.SetScheduleConfig(config.Schedule) - } - return found, err -} -func (h *confHandler) updateReplication(data []byte, config *config.Config) (bool, error) { - updated, found, err := h.mergeConfig(&config.Replication, data) + updated, found, err := h.mergeConfig(&config.ReplicationMode, data) if err != nil { - return false, err + return err } + + if !found { + return errors.Errorf("config item %s not found", key) + } + if updated { - err = h.svr.SetReplicationConfig(config.Replication) + err = h.svr.SetReplicationModeConfig(config.ReplicationMode) } - return found, err + return err } -func (h *confHandler) updatePDServerConfig(data []byte, config *config.Config) (bool, error) { +func (h *confHandler) updatePDServerConfig(config *config.Config, key string, value interface{}) error { + data, err := json.Marshal(map[string]interface{}{key: value}) + if err != nil { + return err + } + updated, found, err := h.mergeConfig(&config.PDServerCfg, data) if err != nil { - return false, err + return err } + + if !found { + return errors.Errorf("config item %s not found", key) + } + if updated { err = h.svr.SetPDServerConfig(config.PDServerCfg) } - return found, err + return err } -func (h *confHandler) updateLogLevel(data []byte, config *config.Config) (bool, error) { - cfg := make(map[string]interface{}) - err := json.Unmarshal(data, &cfg) - if err != nil { - return false, err +func (h *confHandler) updateLogLevel(kp []string, value interface{}) error { + if len(kp) != 2 || kp[1] != "level" { + return errors.Errorf("only support changing log level") } - - if level, ok := cfg["level"].(string); ok { - err = h.svr.SetLogLevel(level) + if level, ok := value.(string); ok { + err := h.svr.SetLogLevel(level) if err != nil { - return true, err + return err } log.SetLevel(logutil.StringToZapLogLevel(level)) - return true, nil + return nil } - return false, err + return errors.Errorf("input value %v is illegal", value) } -func (h *confHandler) updateClusterVersion(data []byte, config *config.Config) (bool, error) { - cfg := make(map[string]interface{}) - err := json.Unmarshal(data, &cfg) - if err != nil { - return false, err - } - - if version, ok := cfg["cluster-version"].(string); ok { +func (h *confHandler) updateClusterVersion(value interface{}) error { + if version, ok := value.(string); ok { err := h.svr.SetClusterVersion(version) if err != nil { - return true, err + return err } - return true, nil + return nil } - return false, err + return errors.Errorf("input value %v is illegal", value) } -func (h *confHandler) updateLabelProperty(data []byte, config *config.Config) (bool, error) { - cfg := make(map[string]interface{}) - err := json.Unmarshal(data, &cfg) - if err != nil { - return false, err +func getConfigMap(cfg map[string]interface{}, key []string, value interface{}) map[string]interface{} { + if len(key) == 1 { + cfg[key[0]] = value + return cfg } - if lp, ok := cfg["label-property"].(string); ok { - input := make(map[string]string) - err = json.Unmarshal([]byte(lp), &input) - if err != nil { - return true, err - } - switch input["action"] { - case "set": - err = h.svr.SetLabelProperty(input["type"], input["label-key"], input["label-value"]) - case "delete": - err = h.svr.DeleteLabelProperty(input["type"], input["label-key"], input["label-value"]) - default: - err = errors.Errorf("unknown action %v", input["action"]) - } - if err != nil { - return true, err - } - return true, nil - } - return false, err + subConfig := make(map[string]interface{}) + cfg[key[0]] = getConfigMap(subConfig, key[1:], value) + return cfg } func (h *confHandler) mergeConfig(v interface{}, data []byte) (updated bool, found bool, err error) { diff --git a/server/api/config_test.go b/server/api/config_test.go index f046bc2991b..07f484ae77f 100644 --- a/server/api/config_test.go +++ b/server/api/config_test.go @@ -86,12 +86,13 @@ func (s *testConfigSuite) TestConfigAll(c *C) { // the new way l = map[string]interface{}{ - "schedule.tolerant-size-ratio": 2.5, - "replication.location-labels": "idc,host", - "pd-server.metric-storage": "http://127.0.0.1:1234", - "log.level": "warn", - "cluster-version": "v4.0.0-beta", - "label-property": `{"type": "foo", "action": "set", "label-key": "zone", "label-value": "cn1"}`, + "schedule.tolerant-size-ratio": 2.5, + "replication.location-labels": "idc,host", + "pd-server.metric-storage": "http://127.0.0.1:1234", + "log.level": "warn", + "cluster-version": "v4.0.0-beta", + "replication-mode.replication-mode": "dr_auto_sync", + "replication-mode.dr-auto-sync.label-key": "foobar", } postData, err = json.Marshal(l) c.Assert(err, IsNil) @@ -104,17 +105,13 @@ func (s *testConfigSuite) TestConfigAll(c *C) { cfg.Replication.LocationLabels = []string{"idc", "host"} cfg.PDServerCfg.MetricStorage = "http://127.0.0.1:1234" cfg.Log.Level = "warn" + cfg.ReplicationMode.DRAutoSync.LabelKey = "foobar" + cfg.ReplicationMode.ReplicationMode = "dr_auto_sync" v, err := cluster.ParseVersion("v4.0.0-beta") c.Assert(err, IsNil) cfg.ClusterVersion = *v - cfg.LabelProperty = map[string][]config.StoreLabel{ - "foo": {{Key: "zone", Value: "cn1"}}, - } c.Assert(newCfg1, DeepEquals, cfg) - l = map[string]interface{}{ - "label-property": `{"type": "foo", "action": "delete", "label-key": "zone", "label-value": "cn1"}`, - } postData, err = json.Marshal(l) c.Assert(err, IsNil) err = postJSON(addr, postData) @@ -127,7 +124,7 @@ func (s *testConfigSuite) TestConfigAll(c *C) { postData, err = json.Marshal(l) c.Assert(err, IsNil) err = postJSON(addr, postData) - c.Assert(strings.Contains(err.Error(), "replicate"), IsTrue) + c.Assert(strings.Contains(err.Error(), "not found"), IsTrue) // config item not found l = map[string]interface{}{ @@ -136,7 +133,7 @@ func (s *testConfigSuite) TestConfigAll(c *C) { postData, err = json.Marshal(l) c.Assert(err, IsNil) err = postJSON(addr, postData) - c.Assert(strings.Contains(err.Error(), "config item not found"), IsTrue) + c.Assert(strings.Contains(err.Error(), "not found"), IsTrue) } func (s *testConfigSuite) TestConfigSchedule(c *C) { diff --git a/server/config/config_test.go b/server/config/config_test.go index 92cd0c69de6..3c83f4f25b3 100644 --- a/server/config/config_test.go +++ b/server/config/config_test.go @@ -59,7 +59,7 @@ func (s *testConfigSuite) TestReloadConfig(c *C) { scheduleCfg := opt.Load() scheduleCfg.MaxSnapshotCount = 10 opt.SetMaxReplicas(5) - opt.LoadPDServerConfig().UseRegionStorage = true + opt.GetPDServerConfig().UseRegionStorage = true c.Assert(opt.Persist(storage), IsNil) // suppose we add a new default enable scheduler "adjacent-region" @@ -70,7 +70,7 @@ func (s *testConfigSuite) TestReloadConfig(c *C) { c.Assert(newOpt.Reload(storage), IsNil) schedulers := newOpt.GetSchedulers() c.Assert(schedulers, HasLen, 5) - c.Assert(newOpt.LoadPDServerConfig().UseRegionStorage, IsTrue) + c.Assert(newOpt.GetPDServerConfig().UseRegionStorage, IsTrue) for i, s := range schedulers { c.Assert(s.Type, Equals, defaultSchedulers[i]) c.Assert(s.Disable, IsFalse) diff --git a/server/config/persist_options.go b/server/config/persist_options.go index 3dd1c57448f..02b1997f837 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -213,12 +213,12 @@ func (o *PersistOptions) GetLeaderSchedulePolicy() core.SchedulePolicy { // GetKeyType is to get key type. func (o *PersistOptions) GetKeyType() core.KeyType { - return core.StringToKeyType(o.LoadPDServerConfig().KeyType) + return core.StringToKeyType(o.GetPDServerConfig().KeyType) } // GetDashboardAddress gets dashboard address. func (o *PersistOptions) GetDashboardAddress() string { - return o.LoadPDServerConfig().DashboardAddress + return o.GetPDServerConfig().DashboardAddress } // IsRemoveDownReplicaEnabled returns if remove down replica is enabled. @@ -358,11 +358,6 @@ func (o *PersistOptions) LoadClusterVersion() *semver.Version { return (*semver.Version)(atomic.LoadPointer(&o.clusterVersion)) } -// LoadPDServerConfig returns PD server configuration. -func (o *PersistOptions) LoadPDServerConfig() *PDServerConfig { - return o.pdServerConfig.Load().(*PDServerConfig) -} - // Persist saves the configuration to the storage. func (o *PersistOptions) Persist(storage *core.Storage) error { cfg := &Config{ @@ -370,7 +365,7 @@ func (o *PersistOptions) Persist(storage *core.Storage) error { Replication: *o.replication.Load(), LabelProperty: o.LoadLabelPropertyConfig(), ClusterVersion: *o.LoadClusterVersion(), - PDServerCfg: *o.LoadPDServerConfig(), + PDServerCfg: *o.GetPDServerConfig(), ReplicationMode: *o.GetReplicationModeConfig(), } err := storage.SaveConfig(cfg) @@ -384,7 +379,7 @@ func (o *PersistOptions) Reload(storage *core.Storage) error { Replication: *o.replication.Load(), LabelProperty: o.LoadLabelPropertyConfig().Clone(), ClusterVersion: *o.LoadClusterVersion(), - PDServerCfg: *o.LoadPDServerConfig(), + PDServerCfg: *o.GetPDServerConfig().Clone(), ReplicationMode: *o.GetReplicationModeConfig().Clone(), } isExist, err := storage.LoadConfig(cfg) diff --git a/server/server.go b/server/server.go index c018fff5281..da93e08eb53 100644 --- a/server/server.go +++ b/server/server.go @@ -340,7 +340,7 @@ func (s *Server) startServer(ctx context.Context) error { s.rootPath, s.member.MemberValue(), s.cfg.TsoSaveInterval.Duration, - func() time.Duration { return s.persistOptions.LoadPDServerConfig().MaxResetTSGap }, + func() time.Duration { return s.persistOptions.GetPDServerConfig().MaxResetTSGap }, ) kvBase := kv.NewEtcdKVBase(s.client, s.rootPath) path := filepath.Join(s.cfg.DataDir, "region-meta") @@ -675,7 +675,8 @@ func (s *Server) GetConfig() *config.Config { cfg.Replication = *s.persistOptions.GetReplication().Load() cfg.LabelProperty = s.persistOptions.LoadLabelPropertyConfig().Clone() cfg.ClusterVersion = *s.persistOptions.LoadClusterVersion() - cfg.PDServerCfg = *s.persistOptions.LoadPDServerConfig() + cfg.PDServerCfg = *s.persistOptions.GetPDServerConfig() + cfg.ReplicationMode = *s.persistOptions.GetReplicationModeConfig() storage := s.GetStorage() if storage == nil { return cfg @@ -777,7 +778,7 @@ func (s *Server) GetPDServerConfig() *config.PDServerConfig { // SetPDServerConfig sets the server config. func (s *Server) SetPDServerConfig(cfg config.PDServerConfig) error { - old := s.persistOptions.LoadPDServerConfig() + old := s.persistOptions.GetPDServerConfig() s.persistOptions.SetPDServerConfig(&cfg) if err := s.persistOptions.Persist(s.storage); err != nil { s.persistOptions.SetPDServerConfig(old) @@ -1015,7 +1016,7 @@ func (s *Server) leaderLoop() { continue } syncer := s.cluster.GetRegionSyncer() - if s.persistOptions.LoadPDServerConfig().UseRegionStorage { + if s.persistOptions.GetPDServerConfig().UseRegionStorage { syncer.StartSyncWithLeader(leader.GetClientUrls()[0]) } log.Info("start watch leader", zap.Stringer("leader", leader)) @@ -1134,7 +1135,7 @@ func (s *Server) reloadConfigFromKV() error { if err != nil { return err } - if s.persistOptions.LoadPDServerConfig().UseRegionStorage { + if s.persistOptions.GetPDServerConfig().UseRegionStorage { s.storage.SwitchToRegionStorage() log.Info("server enable region storage") } else { diff --git a/tests/pdctl/config/config_test.go b/tests/pdctl/config/config_test.go index dfd10ed0dd4..7c56921ea9b 100644 --- a/tests/pdctl/config/config_test.go +++ b/tests/pdctl/config/config_test.go @@ -195,7 +195,7 @@ func (s *configTestSuite) TestConfig(c *C) { args1 = []string{"-u", pdAddr, "config", "set", "foo-bar", "1"} _, output, err = pdctl.ExecuteCommandC(cmd, args1...) c.Assert(err, IsNil) - c.Assert(strings.Contains(string(output), "config item not found"), IsTrue) + c.Assert(strings.Contains(string(output), "not found"), IsTrue) args1 = []string{"-u", pdAddr, "config", "set", "disable-remove-down-replica", "true"} _, output, err = pdctl.ExecuteCommandC(cmd, args1...) c.Assert(err, IsNil) diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index 6470a39b58d..23af88be882 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -564,7 +564,7 @@ func (s *clusterTestSuite) TestSetScheduleOpt(c *C) { scheduleCfg := opt.Load() replicationCfg := svr.GetReplicationConfig() persistOptions := svr.GetPersistOptions() - pdServerCfg := persistOptions.LoadPDServerConfig() + pdServerCfg := persistOptions.GetPDServerConfig() // PUT GET DELETE succeed replicationCfg.MaxReplicas = 5 @@ -579,7 +579,7 @@ func (s *clusterTestSuite) TestSetScheduleOpt(c *C) { c.Assert(svr.GetReplicationConfig().MaxReplicas, Equals, uint64(5)) c.Assert(persistOptions.GetMaxSnapshotCount(), Equals, uint64(10)) - c.Assert(persistOptions.LoadPDServerConfig().UseRegionStorage, Equals, true) + c.Assert(persistOptions.GetPDServerConfig().UseRegionStorage, Equals, true) c.Assert(persistOptions.LoadLabelPropertyConfig()[typ][0].Key, Equals, "testKey") c.Assert(persistOptions.LoadLabelPropertyConfig()[typ][0].Value, Equals, "testValue") @@ -601,7 +601,7 @@ func (s *clusterTestSuite) TestSetScheduleOpt(c *C) { c.Assert(svr.GetReplicationConfig().MaxReplicas, Equals, uint64(5)) c.Assert(persistOptions.GetMaxSnapshotCount(), Equals, uint64(10)) - c.Assert(persistOptions.LoadPDServerConfig().UseRegionStorage, Equals, true) + c.Assert(persistOptions.GetPDServerConfig().UseRegionStorage, Equals, true) c.Assert(len(persistOptions.LoadLabelPropertyConfig()[typ]), Equals, 0) // DELETE failed From 53592f1ec14e05f933b76e9116ed431323e13401 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 21 Apr 2020 20:19:50 +0800 Subject: [PATCH 5/5] address comments Signed-off-by: Ryan Leung --- server/api/config.go | 7 ++++++- server/api/config_test.go | 9 +++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/server/api/config.go b/server/api/config.go index d08b12ba226..2835747103e 100644 --- a/server/api/config.go +++ b/server/api/config.go @@ -102,7 +102,7 @@ func (h *confHandler) Post(w http.ResponseWriter, r *http.Request) { } continue } - key := findTag(reflect.TypeOf(&config.Config{}).Elem(), k) + key := findTag(reflect.TypeOf(config.Config{}), k) if key == "" { h.rd.JSON(w, http.StatusBadRequest, fmt.Sprintf("config item %s not found", k)) return @@ -124,6 +124,9 @@ func (h *confHandler) updateConfig(cfg *config.Config, key string, value interfa case "replication": return h.updateReplication(cfg, kp[len(kp)-1], value) case "replication-mode": + if len(kp) < 2 { + return errors.Errorf("cannot update config prefix %s", kp[0]) + } return h.updateReplicationModeConfig(cfg, kp[1:], value) case "pd-server": return h.updatePDServerConfig(cfg, kp[len(kp)-1], value) @@ -136,6 +139,8 @@ func (h *confHandler) updateConfig(cfg *config.Config, key string, value interfa return errors.Errorf("config prefix %s not found", kp[0]) } +// If we have both "a.c" and "b.c" config items, for a given c, it's hard for us to decide which config item it represents. +// We'd better to naming a config item without duplication. func findTag(t reflect.Type, tag string) string { for i := 0; i < t.NumField(); i++ { field := t.Field(i) diff --git a/server/api/config_test.go b/server/api/config_test.go index 07f484ae77f..59c3c969055 100644 --- a/server/api/config_test.go +++ b/server/api/config_test.go @@ -126,6 +126,15 @@ func (s *testConfigSuite) TestConfigAll(c *C) { err = postJSON(addr, postData) c.Assert(strings.Contains(err.Error(), "not found"), IsTrue) + // update prefix directly + l = map[string]interface{}{ + "replication-mode": nil, + } + postData, err = json.Marshal(l) + c.Assert(err, IsNil) + err = postJSON(addr, postData) + c.Assert(strings.Contains(err.Error(), "cannot update config prefix"), IsTrue) + // config item not found l = map[string]interface{}{ "schedule.region-limit": 10,