-
Notifications
You must be signed in to change notification settings - Fork 720
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
mcs: make scheduling server support checker and scheduler http interface #7131
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,7 +18,6 @@ | |
"net/http" | ||
"strconv" | ||
"sync" | ||
"time" | ||
|
||
"github.com/gin-contrib/cors" | ||
"github.com/gin-contrib/gzip" | ||
|
@@ -129,12 +128,17 @@ | |
func (s *Service) RegisterSchedulersRouter() { | ||
router := s.root.Group("schedulers") | ||
router.GET("", getSchedulers) | ||
router.GET("/diagnostic/:name", getDiagnosticResult) | ||
// TODO: in the future, we should split pauseOrResumeScheduler to two different APIs. | ||
// And we need to do one-to-two forwarding in the API middleware. | ||
router.POST("/:name", pauseOrResumeScheduler) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shall we split it into two APIs? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, we only support one-to-one forwarding at this time. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What I mean is since we use the new framework for scheduling service, we should make it more standard. The previous one is not good enough, so we should make it more clear. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we want to achieve one-to-two forwarding, i.e. post There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should make the new API more clear. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you don't want to change it and since it is only used by forwarding now, I'm ok with the current status. But in the future, it's better to separate these actions. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok, I will add TODO about it. |
||
} | ||
|
||
// RegisterCheckersRouter registers the router of the checkers handler. | ||
func (s *Service) RegisterCheckersRouter() { | ||
router := s.root.Group("checkers") | ||
router.GET("/:name", getCheckerByName) | ||
router.POST("/:name", pauseOrResumeChecker) | ||
} | ||
|
||
// RegisterOperatorsRouter registers the router of the operators handler. | ||
|
@@ -148,19 +152,19 @@ | |
} | ||
|
||
func changeLogLevel(c *gin.Context) { | ||
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) | ||
var level string | ||
if err := c.Bind(&level); err != nil { | ||
c.String(http.StatusBadRequest, err.Error()) | ||
return | ||
} | ||
|
||
if err := svr.SetLogLevel(level); err != nil { | ||
c.String(http.StatusBadRequest, err.Error()) | ||
return | ||
} | ||
log.SetLevel(logutil.StringToZapLogLevel(level)) | ||
c.String(http.StatusOK, "The log level is updated.") | ||
} | ||
|
||
// @Tags operators | ||
|
@@ -304,24 +308,54 @@ | |
// @Failure 500 {string} string "PD server failed to proceed the request." | ||
// @Router /checkers/{name} [get] | ||
func getCheckerByName(c *gin.Context) { | ||
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) | ||
handler := c.MustGet(handlerKey).(*handler.Handler) | ||
name := c.Param("name") | ||
co := svr.GetCoordinator() | ||
isPaused, err := co.IsCheckerPaused(name) | ||
output, err := handler.GetCheckerStatus(name) | ||
if err != nil { | ||
c.String(http.StatusInternalServerError, err.Error()) | ||
return | ||
} | ||
output := map[string]bool{ | ||
"paused": isPaused, | ||
} | ||
c.IndentedJSON(http.StatusOK, output) | ||
} | ||
|
||
type schedulerPausedPeriod struct { | ||
Name string `json:"name"` | ||
PausedAt time.Time `json:"paused_at"` | ||
ResumeAt time.Time `json:"resume_at"` | ||
// FIXME: details of input json body params | ||
// @Tags checker | ||
// @Summary Pause or resume region merge. | ||
// @Accept json | ||
// @Param name path string true "The name of the checker." | ||
// @Param body body object true "json params" | ||
// @Produce json | ||
// @Success 200 {string} string "Pause or resume the scheduler successfully." | ||
// @Failure 400 {string} string "Bad format request." | ||
// @Failure 500 {string} string "PD server failed to proceed the request." | ||
// @Router /checker/{name} [post] | ||
func pauseOrResumeChecker(c *gin.Context) { | ||
handler := c.MustGet(handlerKey).(*handler.Handler) | ||
var input map[string]int | ||
if err := c.BindJSON(&input); err != nil { | ||
c.String(http.StatusBadRequest, err.Error()) | ||
return | ||
} | ||
|
||
name := c.Param("name") | ||
t, ok := input["delay"] | ||
if !ok { | ||
c.String(http.StatusBadRequest, "missing pause time") | ||
return | ||
} | ||
if t < 0 { | ||
c.String(http.StatusBadRequest, "delay cannot be negative") | ||
return | ||
} | ||
if err := handler.PauseOrResumeChecker(name, int64(t)); err != nil { | ||
c.String(http.StatusInternalServerError, err.Error()) | ||
return | ||
} | ||
if t == 0 { | ||
c.String(http.StatusOK, "Resume the checker successfully.") | ||
} else { | ||
c.String(http.StatusOK, "Pause the checker successfully.") | ||
} | ||
} | ||
|
||
// @Tags schedulers | ||
|
@@ -331,70 +365,63 @@ | |
// @Failure 500 {string} string "PD server failed to proceed the request." | ||
// @Router /schedulers [get] | ||
func getSchedulers(c *gin.Context) { | ||
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) | ||
co := svr.GetCoordinator() | ||
sc := co.GetSchedulersController() | ||
schedulers := sc.GetSchedulerNames() | ||
|
||
handler := c.MustGet(handlerKey).(*handler.Handler) | ||
status := c.Query("status") | ||
_, needTS := c.GetQuery("timestamp") | ||
switch status { | ||
case "paused": | ||
var pausedSchedulers []string | ||
pausedPeriods := []schedulerPausedPeriod{} | ||
for _, scheduler := range schedulers { | ||
paused, err := sc.IsSchedulerPaused(scheduler) | ||
if err != nil { | ||
c.String(http.StatusInternalServerError, err.Error()) | ||
return | ||
} | ||
|
||
if paused { | ||
if needTS { | ||
s := schedulerPausedPeriod{ | ||
Name: scheduler, | ||
PausedAt: time.Time{}, | ||
ResumeAt: time.Time{}, | ||
} | ||
pausedAt, err := sc.GetPausedSchedulerDelayAt(scheduler) | ||
if err != nil { | ||
c.String(http.StatusInternalServerError, err.Error()) | ||
return | ||
} | ||
s.PausedAt = time.Unix(pausedAt, 0) | ||
resumeAt, err := sc.GetPausedSchedulerDelayUntil(scheduler) | ||
if err != nil { | ||
c.String(http.StatusInternalServerError, err.Error()) | ||
return | ||
} | ||
s.ResumeAt = time.Unix(resumeAt, 0) | ||
pausedPeriods = append(pausedPeriods, s) | ||
} else { | ||
pausedSchedulers = append(pausedSchedulers, scheduler) | ||
} | ||
} | ||
} | ||
if needTS { | ||
c.IndentedJSON(http.StatusOK, pausedPeriods) | ||
} else { | ||
c.IndentedJSON(http.StatusOK, pausedSchedulers) | ||
} | ||
output, err := handler.GetSchedulerByStatus(status, needTS) | ||
if err != nil { | ||
c.String(http.StatusInternalServerError, err.Error()) | ||
return | ||
} | ||
c.IndentedJSON(http.StatusOK, output) | ||
} | ||
|
||
// @Tags schedulers | ||
// @Summary List schedulers diagnostic result. | ||
// @Produce json | ||
// @Success 200 {array} string | ||
// @Failure 500 {string} string "PD server failed to proceed the request." | ||
// @Router /schedulers/diagnostic/{name} [get] | ||
func getDiagnosticResult(c *gin.Context) { | ||
handler := c.MustGet(handlerKey).(*handler.Handler) | ||
name := c.Param("name") | ||
result, err := handler.GetDiagnosticResult(name) | ||
if err != nil { | ||
c.String(http.StatusInternalServerError, err.Error()) | ||
return | ||
} | ||
c.IndentedJSON(http.StatusOK, result) | ||
} | ||
|
||
// FIXME: details of input json body params | ||
// @Tags scheduler | ||
// @Summary Pause or resume a scheduler. | ||
// @Accept json | ||
// @Param name path string true "The name of the scheduler." | ||
// @Param body body object true "json params" | ||
// @Produce json | ||
// @Success 200 {string} string "Pause or resume the scheduler successfully." | ||
// @Failure 400 {string} string "Bad format request." | ||
// @Failure 500 {string} string "PD server failed to proceed the request." | ||
// @Router /schedulers/{name} [post] | ||
func pauseOrResumeScheduler(c *gin.Context) { | ||
handler := c.MustGet(handlerKey).(*handler.Handler) | ||
|
||
var input map[string]int64 | ||
if err := c.BindJSON(&input); err != nil { | ||
c.String(http.StatusBadRequest, err.Error()) | ||
return | ||
} | ||
|
||
name := c.Param("name") | ||
t, ok := input["delay"] | ||
if !ok { | ||
c.String(http.StatusBadRequest, "missing pause time") | ||
return | ||
} | ||
if err := handler.PauseOrResumeScheduler(name, t); err != nil { | ||
c.String(http.StatusInternalServerError, err.Error()) | ||
return | ||
case "disabled": | ||
var disabledSchedulers []string | ||
for _, scheduler := range schedulers { | ||
disabled, err := sc.IsSchedulerDisabled(scheduler) | ||
if err != nil { | ||
c.String(http.StatusInternalServerError, err.Error()) | ||
return | ||
} | ||
|
||
if disabled { | ||
disabledSchedulers = append(disabledSchedulers, scheduler) | ||
} | ||
} | ||
c.IndentedJSON(http.StatusOK, disabledSchedulers) | ||
default: | ||
c.IndentedJSON(http.StatusOK, schedulers) | ||
} | ||
c.String(http.StatusOK, "Pause or resume the scheduler successfully.") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need it for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why? It can pass tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This feature is not widely used, to reduce the problem, I think we can hold it instead of expose it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's best to keep forwarding, one reason is that it will make matching rules simple, and another reason is to avoid us missing an interface or creating incompatibility issues