Skip to content

Commit

Permalink
mcs: add http support for scheduling service (#6960)
Browse files Browse the repository at this point in the history
ref #5839

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
rleungx and ti-chi-bot[bot] authored Aug 23, 2023
1 parent 497c942 commit de5f53e
Show file tree
Hide file tree
Showing 8 changed files with 434 additions and 19 deletions.
199 changes: 198 additions & 1 deletion pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package apis

import (
"net/http"
"strconv"
"sync"
"time"

"github.com/gin-contrib/cors"
"github.com/gin-contrib/gzip"
Expand All @@ -25,6 +27,7 @@ import (
"github.com/joho/godotenv"
scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi"
"github.com/unrolled/render"
Expand Down Expand Up @@ -77,7 +80,7 @@ func NewService(srv *scheserver.Service) *Service {
apiHandlerEngine.Use(cors.Default())
apiHandlerEngine.Use(gzip.Gzip(gzip.DefaultCompression))
apiHandlerEngine.Use(func(c *gin.Context) {
c.Set(multiservicesapi.ServiceContextKey, srv)
c.Set(multiservicesapi.ServiceContextKey, srv.Server)
c.Next()
})
apiHandlerEngine.Use(multiservicesapi.ServiceRedirector())
Expand All @@ -90,5 +93,199 @@ func NewService(srv *scheserver.Service) *Service {
root: root,
rd: createIndentRender(),
}
s.RegisterOperatorsRouter()
s.RegisterSchedulersRouter()
s.RegisterCheckersRouter()
return s
}

// RegisterSchedulersRouter mounts the scheduler handlers under the
// "schedulers" group of the service root.
func (s *Service) RegisterSchedulersRouter() {
	// GET <root>/schedulers lists scheduler names (optionally filtered by status).
	s.root.Group("schedulers").GET("", getSchedulers)
}

// RegisterCheckersRouter mounts the checker handlers under the "checkers"
// group of the service root.
func (s *Service) RegisterCheckersRouter() {
	// GET <root>/checkers/:name reports whether the named checker is paused.
	s.root.Group("checkers").GET("/:name", getCheckerByName)
}

// RegisterOperatorsRouter mounts the operator handlers under the "operators"
// group of the service root.
func (s *Service) RegisterOperatorsRouter() {
	operators := s.root.Group("operators")
	// Collection endpoint: list operators, optionally filtered by kind.
	operators.GET("", getOperators)
	// Item endpoint: status of the operator identified by the path parameter.
	operators.GET("/:id", getOperatorByID)
}

// getOperatorByID writes the status of the operator looked up by the region
// ID given in the ":id" path parameter.
//
// It responds with 400 when the ID is not a base-10 unsigned 64-bit integer
// and with 500 when the coordinator or its operator controller is unavailable.
//
// @Tags operators
// @Summary Get an operator by ID.
// @Param id path int true "A Region's Id"
// @Produce json
// @Success 200 {object} operator.OpWithStatus
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /operators/{id} [GET]
func getOperatorByID(c *gin.Context) {
	svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)

	regionID, err := strconv.ParseUint(c.Param("id"), 10, 64)
	if err != nil {
		c.String(http.StatusBadRequest, err.Error())
		return
	}

	// Guard the coordinator before dereferencing it through a method call.
	co := svr.GetCoordinator()
	if co == nil {
		c.String(http.StatusInternalServerError, "coordinator is not initialized")
		return
	}

	opController := co.GetOperatorController()
	if opController == nil {
		// err is always nil at this point (ParseUint succeeded), so calling
		// err.Error() here would panic; report a fixed message instead.
		c.String(http.StatusInternalServerError, "operator controller is not initialized")
		return
	}

	c.JSON(http.StatusOK, opController.GetOperatorStatus(regionID))
}

// getOperators writes the list of operators, optionally filtered by one or
// more "kind" query values (admin, leader, region, waiting).
//
// Without any "kind" value every operator returned by GetOperators is listed.
// With "kind" values, the results of each recognized kind are concatenated in
// request order; unrecognized kinds contribute nothing.
//
// @Tags operators
// @Summary List operators.
// @Param kind query string false "Specify the operator kind." Enums(admin, leader, region, waiting)
// @Produce json
// @Success 200 {array} operator.Operator
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /operators [GET]
func getOperators(c *gin.Context) {
	svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)

	// Guard the coordinator before dereferencing it through a method call.
	co := svr.GetCoordinator()
	if co == nil {
		c.String(http.StatusInternalServerError, "coordinator is not initialized")
		return
	}

	opController := co.GetOperatorController()
	if opController == nil {
		// There is no error value to report here; the previous code called
		// Error() on a nil error, which panics.
		c.String(http.StatusInternalServerError, "operator controller is not initialized")
		return
	}

	kinds := c.QueryArray("kind")
	if len(kinds) == 0 {
		c.JSON(http.StatusOK, opController.GetOperators())
		return
	}

	var results []*operator.Operator
	for _, kind := range kinds {
		// ops is scoped to one iteration so that an unrecognized kind cannot
		// re-append the operators collected for the previous kind.
		var ops []*operator.Operator
		switch kind {
		case "admin":
			ops = opController.GetOperatorsOfKind(operator.OpAdmin)
		case "leader":
			ops = opController.GetOperatorsOfKind(operator.OpLeader)
		case "region":
			ops = opController.GetOperatorsOfKind(operator.OpRegion)
		case "waiting":
			ops = opController.GetWaitingOperators()
		default:
			continue
		}
		results = append(results, ops...)
	}

	c.JSON(http.StatusOK, results)
}

// getCheckerByName writes {"paused": <bool>} for the checker named by the
// ":name" path parameter, or a 500 with the error text when the coordinator
// cannot report the checker's pause state.
//
// @Tags checkers
// @Summary Get checker by name
// @Param name path string true "The name of the checker."
// @Produce json
// @Success 200 {string} string "The checker's status."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /checkers/{name} [get]
func getCheckerByName(c *gin.Context) {
	svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
	checkerName := c.Param("name")

	paused, err := svr.GetCoordinator().IsCheckerPaused(checkerName)
	if err != nil {
		c.String(http.StatusInternalServerError, err.Error())
		return
	}

	c.JSON(http.StatusOK, map[string]bool{"paused": paused})
}

// schedulerPausedPeriod is the JSON element returned by getSchedulers when
// paused schedulers are listed with the "timestamp" query parameter present.
type schedulerPausedPeriod struct {
// Name is the scheduler name as returned by GetSchedulerNames.
Name string `json:"name"`
// PausedAt is built with time.Unix from GetPausedSchedulerDelayAt.
PausedAt time.Time `json:"paused_at"`
// ResumeAt is built with time.Unix from GetPausedSchedulerDelayUntil.
ResumeAt time.Time `json:"resume_at"`
}

// getSchedulers writes the scheduler names known to the schedulers controller,
// optionally filtered by the "status" query parameter:
//
//   - "paused": only paused schedulers. When the "timestamp" query parameter is
//     present (with any value), each entry is a schedulerPausedPeriod instead
//     of a bare name.
//   - "disabled": only disabled schedulers.
//   - anything else (including absent): every scheduler name.
//
// Any error from the controller aborts the request with a 500 and the error text.
//
// @Tags schedulers
// @Summary List all created schedulers by status.
// @Produce json
// @Success 200 {array} string
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /schedulers [get]
func getSchedulers(c *gin.Context) {
	svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
	controller := svr.GetCoordinator().GetSchedulersController()
	names := controller.GetSchedulerNames()

	// Only the presence of "timestamp" matters, not its value.
	_, withTimestamp := c.GetQuery("timestamp")

	switch c.Query("status") {
	case "paused":
		var pausedNames []string
		periods := []schedulerPausedPeriod{}
		for _, name := range names {
			paused, err := controller.IsSchedulerPaused(name)
			if err != nil {
				c.String(http.StatusInternalServerError, err.Error())
				return
			}
			if !paused {
				continue
			}
			if !withTimestamp {
				pausedNames = append(pausedNames, name)
				continue
			}

			pausedAt, err := controller.GetPausedSchedulerDelayAt(name)
			if err != nil {
				c.String(http.StatusInternalServerError, err.Error())
				return
			}
			resumeAt, err := controller.GetPausedSchedulerDelayUntil(name)
			if err != nil {
				c.String(http.StatusInternalServerError, err.Error())
				return
			}
			periods = append(periods, schedulerPausedPeriod{
				Name:     name,
				PausedAt: time.Unix(pausedAt, 0),
				ResumeAt: time.Unix(resumeAt, 0),
			})
		}
		if withTimestamp {
			c.JSON(http.StatusOK, periods)
			return
		}
		c.JSON(http.StatusOK, pausedNames)
	case "disabled":
		var disabledNames []string
		for _, name := range names {
			disabled, err := controller.IsSchedulerDisabled(name)
			if err != nil {
				c.String(http.StatusInternalServerError, err.Error())
				return
			}
			if !disabled {
				continue
			}
			disabledNames = append(disabledNames, name)
		}
		c.JSON(http.StatusOK, disabledNames)
	default:
		c.JSON(http.StatusOK, names)
	}
}
5 changes: 3 additions & 2 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ func (c *Cluster) GetStoreConfig() sc.StoreConfigProvider { return c.persistConf

// TODO: implement the following methods

// UpdateRegionsLabelLevelStats updates the region label level stats.
// Not implemented yet: the body is empty, so regions is ignored.
func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {}
// UpdateRegionsLabelLevelStats updates the status of the region label level by types.
// Not implemented yet: the body is empty, so regions is ignored.
func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {
}

// AllocID allocates a new ID.
// Not implemented yet: it always returns 0 and a nil error.
func (c *Cluster) AllocID() (uint64, error) { return 0, nil }
7 changes: 6 additions & 1 deletion pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,11 @@ func (s *Server) GetTLSConfig() *grpcutil.TLSConfig {
return &s.cfg.Security.TLSConfig
}

// GetCoordinator returns the coordinator held in s.coordinator.
// NOTE(review): the field is returned as-is with no nil check; whether it can
// be nil depends on server state not visible here, so callers may need to guard.
func (s *Server) GetCoordinator() *schedule.Coordinator {
return s.coordinator
}

func (s *Server) initClient() error {
tlsConfig, err := s.cfg.Security.ToTLSConfig()
if err != nil {
Expand Down Expand Up @@ -501,6 +506,7 @@ func (s *Server) startServer() (err error) {
if err != nil {
return err
}
s.service = &Service{Server: s}
tlsConfig, err := s.cfg.Security.ToTLSConfig()
if err != nil {
return err
Expand Down Expand Up @@ -543,7 +549,6 @@ func (s *Server) startServer() (err error) {
log.Error("failed to register the service", zap.String("service-name", utils.SchedulingServiceName), errs.ZapError(err))
return err
}

atomic.StoreInt64(&s.isRunning, 1)
return nil
}
Expand Down
78 changes: 78 additions & 0 deletions pkg/mcs/scheduling/server/testutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"context"
"os"

"github.com/pingcap/log"
"github.com/spf13/pflag"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/mcs/scheduling/server/config"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/testutil"
)

// NewTestServer creates and runs a scheduling server for testing.
//
// It installs the logger described by cfg as the global logger, builds the
// server with CreateServer and starts it with Run. On success it returns the
// server together with a cleanup function that closes the server and removes
// cfg.DataDir. A logger setup failure is reported through re; a Run failure is
// returned as the error.
func NewTestServer(ctx context.Context, re *require.Assertions, cfg *config.Config) (*Server, testutil.CleanupFunc, error) {
	// Build the zap logger from the config and make it the global one.
	err := logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog)
	re.NoError(err)
	log.ReplaceGlobals(cfg.Logger, cfg.LogProps)
	// Flush any buffered log entries when this function returns.
	defer log.Sync()

	svr := CreateServer(ctx, cfg)
	if runErr := svr.Run(); runErr != nil {
		return nil, nil, runErr
	}

	return svr, func() {
		svr.Close()
		os.RemoveAll(cfg.DataDir)
	}, nil
}

// GenerateConfig builds a fresh config by round-tripping the listen address,
// advertise listen address and backend endpoints of c through a command-line
// flag set, then parsing that flag set into a new config.
//
// It returns the first error from parsing the flags or the config.
func GenerateConfig(c *config.Config) (*config.Config, error) {
	// Render the relevant fields of c as command-line arguments.
	args := []string{
		"--listen-addr=" + c.ListenAddr,
		"--advertise-listen-addr=" + c.AdvertiseListenAddr,
		"--backend-endpoints=" + c.BackendEndpoints,
	}

	// Declare the flags this function registers for the test flag set.
	fs := pflag.NewFlagSet("test", pflag.ContinueOnError)
	fs.BoolP("version", "V", false, "print version information and exit")
	fs.StringP("config", "", "", "config file")
	fs.StringP("backend-endpoints", "", "", "url for etcd client")
	fs.StringP("listen-addr", "", "", "listen address for tso service")
	fs.StringP("advertise-listen-addr", "", "", "advertise urls for listen address (default '${listen-addr}')")
	fs.StringP("cacert", "", "", "path of file that contains list of trusted TLS CAs")
	fs.StringP("cert", "", "", "path of file that contains X509 certificate in PEM format")
	fs.StringP("key", "", "", "path of file that contains X509 key in PEM format")
	if err := fs.Parse(args); err != nil {
		return nil, err
	}

	generated := config.NewConfig()
	if err := generated.Parse(fs); err != nil {
		return nil, err
	}
	return generated, nil
}
15 changes: 15 additions & 0 deletions pkg/schedule/operator/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,21 @@ func (oc *Controller) GetWaitingOperators() []*Operator {
return oc.wop.ListOperator()
}

// GetOperatorsOfKind returns the operators held in oc.operators whose kind
// shares at least one bit with mask. The read lock is held while scanning.
// The returned slice is always non-nil, and empty when nothing matches.
func (oc *Controller) GetOperatorsOfKind(mask OpKind) []*Operator {
	oc.RLock()
	defer oc.RUnlock()

	matched := make([]*Operator, 0, len(oc.operators))
	for _, candidate := range oc.operators {
		if candidate.Kind()&mask == 0 {
			continue
		}
		matched = append(matched, candidate)
	}
	return matched
}

// SendScheduleCommand sends a command to the region.
func (oc *Controller) SendScheduleCommand(region *core.RegionInfo, step OpStep, source string) {
log.Info("send schedule command",
Expand Down
Loading

0 comments on commit de5f53e

Please sign in to comment.