Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: support config http interface in scheduling server #7278

Merged
merged 13 commits into from
Nov 7, 2023
19 changes: 19 additions & 0 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func NewService(srv *scheserver.Service) *Service {
rd: createIndentRender(),
}
s.RegisterAdminRouter()
s.RegisterConfigRouter()
s.RegisterOperatorsRouter()
s.RegisterSchedulersRouter()
s.RegisterCheckersRouter()
Expand All @@ -126,6 +127,12 @@ func (s *Service) RegisterAdminRouter() {
router.DELETE("cache/regions/:id", deleteRegionCacheByID)
}

// RegisterConfigRouter registers the router of the config handler.
func (s *Service) RegisterConfigRouter() {
router := s.root.Group("config")
router.GET("", getConfig)
}

// RegisterSchedulersRouter registers the router of the schedulers handler.
func (s *Service) RegisterSchedulersRouter() {
router := s.root.Group("schedulers")
Expand Down Expand Up @@ -186,6 +193,18 @@ func changeLogLevel(c *gin.Context) {
c.String(http.StatusOK, "The log level is updated.")
}

// @Tags config
// @Summary Get full config.
// @Produce json
// @Success 200 {object} config.Config
// @Router /config [get]
func getConfig(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
cfg := svr.GetConfig()
cfg.Schedule.MaxMergeRegionKeys = cfg.Schedule.GetMaxMergeRegionKeys()
c.IndentedJSON(http.StatusOK, cfg)
}

// @Tags admin
// @Summary Drop all regions from cache.
// @Produce json
Expand Down
13 changes: 10 additions & 3 deletions pkg/mcs/scheduling/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ type Config struct {
Metric metricutil.MetricConfig `toml:"metric" json:"metric"`

// Log related config.
Log log.Config `toml:"log" json:"log"`
Logger *zap.Logger
LogProps *log.ZapProperties
Log log.Config `toml:"log" json:"log"`
Logger *zap.Logger `json:"-"`
LogProps *log.ZapProperties `json:"-"`

Security configutil.SecurityConfig `toml:"security" json:"security"`

Expand Down Expand Up @@ -195,6 +195,13 @@ func (c *Config) validate() error {
return nil
}

// Clone creates a copy of current config.
func (c *Config) Clone() *Config {
cfg := &Config{}
*cfg = *c
return cfg
}

// PersistConfig wraps all configurations that need to persist to storage and
// allows to access them safely.
type PersistConfig struct {
Expand Down
23 changes: 23 additions & 0 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,29 @@ func (s *Server) stopWatcher() {
s.metaWatcher.Close()
}

// GetPersistConfig returns the persist config.
// It's used to test.
func (s *Server) GetPersistConfig() *config.PersistConfig {
return s.persistConfig
}

// GetConfig gets the config.
func (s *Server) GetConfig() *config.Config {
cfg := s.cfg.Clone()
cfg.Schedule = *s.persistConfig.GetScheduleConfig().Clone()
cfg.Replication = *s.persistConfig.GetReplicationConfig().Clone()
cfg.ClusterVersion = *s.persistConfig.GetClusterVersion()
if s.storage == nil {
return cfg
}
sches, configs, err := s.storage.LoadAllSchedulerConfigs()
if err != nil {
return cfg
}
cfg.Schedule.SchedulersPayload = schedulers.ToPayload(sches, configs)
return cfg
}

// CreateServer creates the Server
func CreateServer(ctx context.Context, cfg *config.Config) *Server {
svr := &Server{
Expand Down
63 changes: 62 additions & 1 deletion server/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/pingcap/errcode"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/utils"
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/jsonutil"
Expand Down Expand Up @@ -60,7 +62,17 @@ func newConfHandler(svr *server.Server, rd *render.Render) *confHandler {
// @Router /config [get]
func (h *confHandler) GetConfig(w http.ResponseWriter, r *http.Request) {
cfg := h.svr.GetConfig()
cfg.Schedule.MaxMergeRegionKeys = cfg.Schedule.GetMaxMergeRegionKeys()
if h.svr.IsAPIServiceMode() {
schedulingServerConfig, err := h.GetSchedulingServerConfig()
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
cfg.Schedule = schedulingServerConfig.Schedule
cfg.Replication = schedulingServerConfig.Replication
} else {
cfg.Schedule.MaxMergeRegionKeys = cfg.Schedule.GetMaxMergeRegionKeys()
}
h.rd.JSON(w, http.StatusOK, cfg)
}

Expand Down Expand Up @@ -301,6 +313,16 @@ func getConfigMap(cfg map[string]interface{}, key []string, value interface{}) m
// @Success 200 {object} sc.ScheduleConfig
// @Router /config/schedule [get]
func (h *confHandler) GetScheduleConfig(w http.ResponseWriter, r *http.Request) {
if h.svr.IsAPIServiceMode() {
cfg, err := h.GetSchedulingServerConfig()
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
cfg.Schedule.SchedulersPayload = nil
h.rd.JSON(w, http.StatusOK, cfg.Schedule)
return
}
cfg := h.svr.GetScheduleConfig()
cfg.MaxMergeRegionKeys = cfg.GetMaxMergeRegionKeys()
h.rd.JSON(w, http.StatusOK, cfg)
Expand Down Expand Up @@ -364,6 +386,15 @@ func (h *confHandler) SetScheduleConfig(w http.ResponseWriter, r *http.Request)
// @Success 200 {object} sc.ReplicationConfig
// @Router /config/replicate [get]
func (h *confHandler) GetReplicationConfig(w http.ResponseWriter, r *http.Request) {
if h.svr.IsAPIServiceMode() {
cfg, err := h.GetSchedulingServerConfig()
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.rd.JSON(w, http.StatusOK, cfg.Replication)
return
}
h.rd.JSON(w, http.StatusOK, h.svr.GetReplicationConfig())
}

Expand Down Expand Up @@ -505,3 +536,33 @@ func (h *confHandler) SetReplicationModeConfig(w http.ResponseWriter, r *http.Re
func (h *confHandler) GetPDServerConfig(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusOK, h.svr.GetPDServerConfig())
}

func (h *confHandler) GetSchedulingServerConfig() (*config.Config, error) {
addr, ok := h.svr.GetServicePrimaryAddr(h.svr.Context(), utils.SchedulingServiceName)
if !ok {
return nil, errs.ErrNotFoundSchedulingAddr.FastGenByArgs()
}
url := fmt.Sprintf("%s/scheduling/api/v1/config", addr)
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return nil, err
}
resp, err := h.svr.GetHTTPClient().Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, errs.ErrSchedulingServer.FastGenByArgs(resp.StatusCode)
}
b, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
var schedulingServerConfig config.Config
err = json.Unmarshal(b, &schedulingServerConfig)
if err != nil {
return nil, err
}
return &schedulingServerConfig, nil
}
Loading
Loading