Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: support config http interface in scheduling server #7278

Merged
merged 13 commits into from
Nov 7, 2023
56 changes: 56 additions & 0 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func NewService(srv *scheserver.Service) *Service {
rd: createIndentRender(),
}
s.RegisterAdminRouter()
s.RegisterConfigRouter()
s.RegisterOperatorsRouter()
s.RegisterSchedulersRouter()
s.RegisterCheckersRouter()
Expand All @@ -126,6 +127,15 @@ func (s *Service) RegisterAdminRouter() {
router.DELETE("cache/regions/:id", deleteRegionCacheByID)
}

// RegisterConfigRouter registers the router of the config handler.
func (s *Service) RegisterConfigRouter() {
router := s.root.Group("config")
router.GET("", getConfig)
router.GET("/schedule", getScheduleConfig)
router.GET("/replicate", getReplicationConfig)
router.GET("/store", getStoreConfig)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering if we need these APIs, I think they are barely used.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because api server has the same interface. Maybe we also call "/config" in api server for these interfaces?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better, WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can cancel "/config/schedule" and "/config/replicate", which are in "/config". But we should keep "config/store", otherwise we have no way to get store config, which is not in "/config".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or we can put store config into "/config" and add a new struct for it. How about it?

type ConfigPayload{
   Schedule
   Replication
   StoreConfig
   ...
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, make sense to me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or only remove "/config/store", I found there is no "/config/store" in api server

}

// RegisterSchedulersRouter registers the router of the schedulers handler.
func (s *Service) RegisterSchedulersRouter() {
router := s.root.Group("schedulers")
Expand Down Expand Up @@ -186,6 +196,52 @@ func changeLogLevel(c *gin.Context) {
c.String(http.StatusOK, "The log level is updated.")
}

// @Tags config
// @Summary Get full config.
// @Produce json
// @Success 200 {object} config.Config
// @Router /config [get]
func getConfig(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
cfg := svr.GetConfig()
cfg.Schedule.MaxMergeRegionKeys = cfg.Schedule.GetMaxMergeRegionKeys()
c.IndentedJSON(http.StatusOK, cfg)
}

// @Tags config
// @Summary Get schedule config.
// @Produce json
// @Success 200 {object} config.ScheduleConfig
// @Router /config/schedule [get]
func getScheduleConfig(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
cfg := svr.GetScheduleConfig()
cfg.MaxMergeRegionKeys = cfg.GetMaxMergeRegionKeys()
c.IndentedJSON(http.StatusOK, cfg)
}

// @Tags config
// @Summary Get replication config.
// @Produce json
// @Success 200 {object} config.ReplicationConfig
// @Router /config/replicate [get]
func getReplicationConfig(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
cfg := svr.GetReplicationConfig()
c.IndentedJSON(http.StatusOK, cfg)
}

// @Tags config
// @Summary Get store config.
// @Produce json
// @Success 200 {object} config.StoreConfig
// @Router /config/store [get]
func getStoreConfig(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
cfg := svr.GetStoreConfig()
c.IndentedJSON(http.StatusOK, cfg)
}

// @Tags admin
// @Summary Drop all regions from cache.
// @Produce json
Expand Down
13 changes: 10 additions & 3 deletions pkg/mcs/scheduling/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ type Config struct {
Metric metricutil.MetricConfig `toml:"metric" json:"metric"`

// Log related config.
Log log.Config `toml:"log" json:"log"`
Logger *zap.Logger
LogProps *log.ZapProperties
Log log.Config `toml:"log" json:"log"`
Logger *zap.Logger `json:"-"`
LogProps *log.ZapProperties `json:"-"`

Security configutil.SecurityConfig `toml:"security" json:"security"`

Expand Down Expand Up @@ -195,6 +195,13 @@ func (c *Config) validate() error {
return nil
}

// Clone creates a copy of current config.
func (c *Config) Clone() *Config {
cfg := &Config{}
*cfg = *c
return cfg
}

// PersistConfig wraps all configurations that need to persist to storage and
// allows to access them safely.
type PersistConfig struct {
Expand Down
39 changes: 39 additions & 0 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/schedule"
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/hbstream"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/storage/endpoint"
Expand Down Expand Up @@ -504,6 +505,44 @@
s.metaWatcher.Close()
}

// GetPersistConfig returns the persist config.
// It's used to test.
func (s *Server) GetPersistConfig() *config.PersistConfig {
return s.persistConfig
}

// GetConfig gets the config.
func (s *Server) GetConfig() *config.Config {
cfg := s.cfg.Clone()
cfg.Schedule = *s.persistConfig.GetScheduleConfig().Clone()
cfg.Replication = *s.persistConfig.GetReplicationConfig().Clone()
cfg.ClusterVersion = *s.persistConfig.GetClusterVersion()
if s.storage == nil {
return cfg

Check warning on line 521 in pkg/mcs/scheduling/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/server.go#L521

Added line #L521 was not covered by tests
}
sches, configs, err := s.storage.LoadAllSchedulerConfigs()
if err != nil {
return cfg

Check warning on line 525 in pkg/mcs/scheduling/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/server.go#L525

Added line #L525 was not covered by tests
}
cfg.Schedule.SchedulersPayload = schedulers.ToPayload(sches, configs)
return cfg
}

// GetScheduleConfig gets the schedule config.
func (s *Server) GetScheduleConfig() *sc.ScheduleConfig {
return s.persistConfig.GetScheduleConfig().Clone()
}

// GetReplicationConfig gets the replication config.
func (s *Server) GetReplicationConfig() *sc.ReplicationConfig {
return s.persistConfig.GetReplicationConfig().Clone()
}

// GetStoreConfig gets the store config.
func (s *Server) GetStoreConfig() *sc.StoreConfig {
return s.persistConfig.GetStoreConfig().Clone()
}

// CreateServer creates the Server
func CreateServer(ctx context.Context, cfg *config.Config) *Server {
svr := &Server{
Expand Down
72 changes: 71 additions & 1 deletion server/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
"github.com/pingcap/errcode"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/utils"
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/jsonutil"
Expand Down Expand Up @@ -60,7 +62,24 @@
// @Router /config [get]
func (h *confHandler) GetConfig(w http.ResponseWriter, r *http.Request) {
cfg := h.svr.GetConfig()
cfg.Schedule.MaxMergeRegionKeys = cfg.Schedule.GetMaxMergeRegionKeys()
if h.svr.IsAPIServiceMode() {
b, err := h.GetSchedulingServerConfig("config")
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return

Check warning on line 69 in server/api/config.go

View check run for this annotation

Codecov / codecov/patch

server/api/config.go#L68-L69

Added lines #L68 - L69 were not covered by tests
}
var configSchedulingServer config.Config
err = json.Unmarshal(b, &configSchedulingServer)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return

Check warning on line 75 in server/api/config.go

View check run for this annotation

Codecov / codecov/patch

server/api/config.go#L74-L75

Added lines #L74 - L75 were not covered by tests
}
cfg.Schedule = configSchedulingServer.Schedule
cfg.Replication = configSchedulingServer.Replication
// TODO: will we support config/store?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: will we support config/store?

} else {
cfg.Schedule.MaxMergeRegionKeys = cfg.Schedule.GetMaxMergeRegionKeys()
}
h.rd.JSON(w, http.StatusOK, cfg)
}

Expand Down Expand Up @@ -301,6 +320,21 @@
// @Success 200 {object} sc.ScheduleConfig
// @Router /config/schedule [get]
func (h *confHandler) GetScheduleConfig(w http.ResponseWriter, r *http.Request) {
if h.svr.IsAPIServiceMode() {
b, err := h.GetSchedulingServerConfig("config/schedule")
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return

Check warning on line 327 in server/api/config.go

View check run for this annotation

Codecov / codecov/patch

server/api/config.go#L326-L327

Added lines #L326 - L327 were not covered by tests
}
var cfg sc.ScheduleConfig
err = json.Unmarshal(b, &cfg)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return

Check warning on line 333 in server/api/config.go

View check run for this annotation

Codecov / codecov/patch

server/api/config.go#L332-L333

Added lines #L332 - L333 were not covered by tests
}
h.rd.JSON(w, http.StatusOK, cfg)
return
}
cfg := h.svr.GetScheduleConfig()
cfg.MaxMergeRegionKeys = cfg.GetMaxMergeRegionKeys()
h.rd.JSON(w, http.StatusOK, cfg)
Expand Down Expand Up @@ -364,6 +398,21 @@
// @Success 200 {object} sc.ReplicationConfig
// @Router /config/replicate [get]
func (h *confHandler) GetReplicationConfig(w http.ResponseWriter, r *http.Request) {
if h.svr.IsAPIServiceMode() {
b, err := h.GetSchedulingServerConfig("config/replicate")
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return

Check warning on line 405 in server/api/config.go

View check run for this annotation

Codecov / codecov/patch

server/api/config.go#L404-L405

Added lines #L404 - L405 were not covered by tests
}
var cfg sc.ReplicationConfig
err = json.Unmarshal(b, &cfg)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return

Check warning on line 411 in server/api/config.go

View check run for this annotation

Codecov / codecov/patch

server/api/config.go#L410-L411

Added lines #L410 - L411 were not covered by tests
}
h.rd.JSON(w, http.StatusOK, cfg)
return
}
h.rd.JSON(w, http.StatusOK, h.svr.GetReplicationConfig())
}

Expand Down Expand Up @@ -505,3 +554,24 @@
func (h *confHandler) GetPDServerConfig(w http.ResponseWriter, r *http.Request) {
h.rd.JSON(w, http.StatusOK, h.svr.GetPDServerConfig())
}

func (h *confHandler) GetSchedulingServerConfig(path string) ([]byte, error) {
addr, ok := h.svr.GetServicePrimaryAddr(h.svr.Context(), utils.SchedulingServiceName)
if !ok {
return nil, errs.ErrNotFoundSchedulingAddr.FastGenByArgs()

Check warning on line 561 in server/api/config.go

View check run for this annotation

Codecov / codecov/patch

server/api/config.go#L561

Added line #L561 was not covered by tests
}
url := fmt.Sprintf("%s/scheduling/api/v1/%s", addr, path)
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return nil, err

Check warning on line 566 in server/api/config.go

View check run for this annotation

Codecov / codecov/patch

server/api/config.go#L566

Added line #L566 was not covered by tests
}
resp, err := h.svr.GetHTTPClient().Do(req)
if err != nil {
return nil, err

Check warning on line 570 in server/api/config.go

View check run for this annotation

Codecov / codecov/patch

server/api/config.go#L570

Added line #L570 was not covered by tests
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, errs.ErrSchedulingServer.FastGenByArgs(resp.StatusCode)

Check warning on line 574 in server/api/config.go

View check run for this annotation

Codecov / codecov/patch

server/api/config.go#L574

Added line #L574 was not covered by tests
}
return io.ReadAll(resp.Body)
}
Loading
Loading