Skip to content

Commit

Permalink
Merge branch 'master' into role
Browse files Browse the repository at this point in the history
  • Loading branch information
rleungx authored Oct 9, 2023
2 parents 3345aaf + 2556b5b commit 1ced9ca
Show file tree
Hide file tree
Showing 18 changed files with 654 additions and 501 deletions.
173 changes: 100 additions & 73 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"net/http"
"strconv"
"sync"
"time"

"github.com/gin-contrib/cors"
"github.com/gin-contrib/gzip"
Expand Down Expand Up @@ -129,12 +128,17 @@ func (s *Service) RegisterAdminRouter() {
func (s *Service) RegisterSchedulersRouter() {
router := s.root.Group("schedulers")
router.GET("", getSchedulers)
router.GET("/diagnostic/:name", getDiagnosticResult)
// TODO: in the future, we should split pauseOrResumeScheduler to two different APIs.
// And we need to do one-to-two forwarding in the API middleware.
router.POST("/:name", pauseOrResumeScheduler)
}

// RegisterCheckersRouter registers the router of the checkers handler.
func (s *Service) RegisterCheckersRouter() {
router := s.root.Group("checkers")
router.GET("/:name", getCheckerByName)
router.POST("/:name", pauseOrResumeChecker)
}

// RegisterOperatorsRouter registers the router of the operators handler.
Expand Down Expand Up @@ -304,24 +308,54 @@ func createOperator(c *gin.Context) {
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /checkers/{name} [get]
func getCheckerByName(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
handler := c.MustGet(handlerKey).(*handler.Handler)
name := c.Param("name")
co := svr.GetCoordinator()
isPaused, err := co.IsCheckerPaused(name)
output, err := handler.GetCheckerStatus(name)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
output := map[string]bool{
"paused": isPaused,
}
c.IndentedJSON(http.StatusOK, output)
}

type schedulerPausedPeriod struct {
Name string `json:"name"`
PausedAt time.Time `json:"paused_at"`
ResumeAt time.Time `json:"resume_at"`
// FIXME: details of input json body params
// @Tags checker
// @Summary Pause or resume region merge.
// @Accept json
// @Param name path string true "The name of the checker."
// @Param body body object true "json params"
// @Produce json
// @Success 200 {string} string "Pause or resume the scheduler successfully."
// @Failure 400 {string} string "Bad format request."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /checker/{name} [post]
func pauseOrResumeChecker(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
var input map[string]int
if err := c.BindJSON(&input); err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}

name := c.Param("name")
t, ok := input["delay"]
if !ok {
c.String(http.StatusBadRequest, "missing pause time")
return
}
if t < 0 {
c.String(http.StatusBadRequest, "delay cannot be negative")
return
}
if err := handler.PauseOrResumeChecker(name, int64(t)); err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
if t == 0 {
c.String(http.StatusOK, "Resume the checker successfully.")
} else {
c.String(http.StatusOK, "Pause the checker successfully.")
}
}

// @Tags schedulers
Expand All @@ -331,70 +365,63 @@ type schedulerPausedPeriod struct {
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /schedulers [get]
func getSchedulers(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
co := svr.GetCoordinator()
sc := co.GetSchedulersController()
schedulers := sc.GetSchedulerNames()

handler := c.MustGet(handlerKey).(*handler.Handler)
status := c.Query("status")
_, needTS := c.GetQuery("timestamp")
switch status {
case "paused":
var pausedSchedulers []string
pausedPeriods := []schedulerPausedPeriod{}
for _, scheduler := range schedulers {
paused, err := sc.IsSchedulerPaused(scheduler)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}

if paused {
if needTS {
s := schedulerPausedPeriod{
Name: scheduler,
PausedAt: time.Time{},
ResumeAt: time.Time{},
}
pausedAt, err := sc.GetPausedSchedulerDelayAt(scheduler)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
s.PausedAt = time.Unix(pausedAt, 0)
resumeAt, err := sc.GetPausedSchedulerDelayUntil(scheduler)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
s.ResumeAt = time.Unix(resumeAt, 0)
pausedPeriods = append(pausedPeriods, s)
} else {
pausedSchedulers = append(pausedSchedulers, scheduler)
}
}
}
if needTS {
c.IndentedJSON(http.StatusOK, pausedPeriods)
} else {
c.IndentedJSON(http.StatusOK, pausedSchedulers)
}
output, err := handler.GetSchedulerByStatus(status, needTS)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
c.IndentedJSON(http.StatusOK, output)
}

// @Tags schedulers
// @Summary List schedulers diagnostic result.
// @Produce json
// @Success 200 {array} string
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /schedulers/diagnostic/{name} [get]
func getDiagnosticResult(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)
name := c.Param("name")
result, err := handler.GetDiagnosticResult(name)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
c.IndentedJSON(http.StatusOK, result)
}

// FIXME: details of input json body params
// @Tags scheduler
// @Summary Pause or resume a scheduler.
// @Accept json
// @Param name path string true "The name of the scheduler."
// @Param body body object true "json params"
// @Produce json
// @Success 200 {string} string "Pause or resume the scheduler successfully."
// @Failure 400 {string} string "Bad format request."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /schedulers/{name} [post]
func pauseOrResumeScheduler(c *gin.Context) {
handler := c.MustGet(handlerKey).(*handler.Handler)

var input map[string]int64
if err := c.BindJSON(&input); err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}

name := c.Param("name")
t, ok := input["delay"]
if !ok {
c.String(http.StatusBadRequest, "missing pause time")
return
}
if err := handler.PauseOrResumeScheduler(name, t); err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
case "disabled":
var disabledSchedulers []string
for _, scheduler := range schedulers {
disabled, err := sc.IsSchedulerDisabled(scheduler)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}

if disabled {
disabledSchedulers = append(disabledSchedulers, scheduler)
}
}
c.IndentedJSON(http.StatusOK, disabledSchedulers)
default:
c.IndentedJSON(http.StatusOK, schedulers)
}
c.String(http.StatusOK, "Pause or resume the scheduler successfully.")
}
42 changes: 40 additions & 2 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Cluster struct {
checkMembershipCh chan struct{}
apiServerLeader atomic.Value
clusterID uint64
running atomic.Bool
}

const regionLabelGCInterval = time.Hour
Expand Down Expand Up @@ -203,6 +204,14 @@ func (c *Cluster) SwitchAPIServerLeader(new pdpb.PDClient) bool {
return c.apiServerLeader.CompareAndSwap(old, new)
}

func trySend(notifier chan struct{}) {
select {
case notifier <- struct{}{}:
// If the channel is not empty, it means the check is triggered.
default:
}
}

// updateScheduler listens on the schedulers updating notifier and manage the scheduler creation and deletion.
func (c *Cluster) updateScheduler() {
defer logutil.LogPanic()
Expand All @@ -213,8 +222,11 @@ func (c *Cluster) updateScheduler() {
// Establish a notifier to listen the schedulers updating.
notifier := make(chan struct{}, 1)
// Make sure the check will be triggered once later.
notifier <- struct{}{}
trySend(notifier)
c.persistConfig.SetSchedulersUpdatingNotifier(notifier)
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
select {
case <-c.ctx.Done():
Expand All @@ -224,6 +236,18 @@ func (c *Cluster) updateScheduler() {
// This is triggered by the watcher when the schedulers are updated.
}

if !c.running.Load() {
select {
case <-c.ctx.Done():
log.Info("cluster is closing, stop listening the schedulers updating notifier")
return
case <-ticker.C:
// retry
trySend(notifier)
continue
}
}

log.Info("schedulers updating notifier is triggered, try to update the scheduler")
var (
schedulersController = c.coordinator.GetSchedulersController()
Expand Down Expand Up @@ -394,15 +418,29 @@ func (c *Cluster) runUpdateStoreStats() {
}
}

// runCoordinator runs the main scheduling loop.
func (c *Cluster) runCoordinator() {
defer logutil.LogPanic()
defer c.wg.Done()
c.coordinator.RunUntilStop()
}

// StartBackgroundJobs starts background jobs.
func (c *Cluster) StartBackgroundJobs() {
c.wg.Add(2)
c.wg.Add(3)
go c.updateScheduler()
go c.runUpdateStoreStats()
go c.runCoordinator()
c.running.Store(true)
}

// StopBackgroundJobs stops background jobs.
func (c *Cluster) StopBackgroundJobs() {
if !c.running.Load() {
return
}
c.running.Store(false)
c.coordinator.Stop()
c.cancel()
c.wg.Wait()
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,16 +462,12 @@ func (s *Server) startCluster(context.Context) error {
}
s.configWatcher.SetSchedulersController(s.cluster.GetCoordinator().GetSchedulersController())
s.cluster.StartBackgroundJobs()
go s.GetCoordinator().RunUntilStop()
return nil
}

func (s *Server) stopCluster() {
s.GetCoordinator().Stop()
s.cluster.StopBackgroundJobs()
s.ruleWatcher.Close()
s.configWatcher.Close()
s.metaWatcher.Close()
s.stopWatcher()
}

func (s *Server) startWatcher() (err error) {
Expand All @@ -487,6 +483,12 @@ func (s *Server) startWatcher() (err error) {
return err
}

func (s *Server) stopWatcher() {
s.ruleWatcher.Close()
s.configWatcher.Close()
s.metaWatcher.Close()
}

// GetPersistConfig returns the persist config.
// It's used to test.
func (s *Server) GetPersistConfig() *config.PersistConfig {
Expand Down
Loading

0 comments on commit 1ced9ca

Please sign in to comment.