Skip to content

Commit

Permalink
use config interface for scheduler controller
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Jul 10, 2023
1 parent 5759daf commit 2a7a43e
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 60 deletions.
5 changes: 5 additions & 0 deletions pkg/schedule/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/core/storelimit"
"github.com/tikv/pd/pkg/storage/endpoint"
)

// RejectLeader is the label property type that suggests a store should not
Expand All @@ -30,6 +31,10 @@ func IsSchedulerRegistered(name string) bool {
// Config is the interface that wraps the Config related methods.
type Config interface {
IsSchedulingHalted() bool
IsSchedulerDisabled(string) bool
AddSchedulerCfg(string, []string)
RemoveSchedulerCfg(string)
Persist(endpoint.ConfigStorage) error

GetReplicaScheduleLimit() uint64
GetRegionScheduleLimit() uint64
Expand Down
6 changes: 3 additions & 3 deletions pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ type Coordinator struct {
// NewCoordinator creates a new Coordinator.
func NewCoordinator(ctx context.Context, cluster sche.ClusterInformer, hbStreams *hbstream.HeartbeatStreams) *Coordinator {
ctx, cancel := context.WithCancel(ctx)
opController := operator.NewController(ctx, cluster.GetBasicCluster(), cluster.GetPersistOptions(), hbStreams)
schedulers := schedulers.NewController(ctx, cluster, opController)
opController := operator.NewController(ctx, cluster.GetBasicCluster(), cluster.GetOpts(), hbStreams)
schedulers := schedulers.NewController(ctx, cluster, cluster.GetStorage(), opController)
c := &Coordinator{
ctx: ctx,
cancel: cancel,
Expand All @@ -101,7 +101,7 @@ func NewCoordinator(ctx context.Context, cluster sche.ClusterInformer, hbStreams
hbStreams: hbStreams,
pluginInterface: NewPluginInterface(),
}
c.diagnosticManager = diagnostic.NewManager(schedulers, cluster.GetPersistOptions())
c.diagnosticManager = diagnostic.NewManager(schedulers, cluster.GetOpts())
return c
}

Expand Down
16 changes: 4 additions & 12 deletions pkg/schedule/diagnostic/diagnostic_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@ import (
"time"

"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/server/config"
)

// Manager is used to manage the diagnostic result of schedulers for now.
type Manager struct {
config *config.PersistOptions
config config.Config
schedulerController *schedulers.Controller
}

// NewManager creates a new Manager.
func NewManager(schedulerController *schedulers.Controller, config *config.PersistOptions) *Manager {
func NewManager(schedulerController *schedulers.Controller, config config.Config) *Manager {
return &Manager{
config: config,
schedulerController: schedulerController,
Expand All @@ -48,15 +48,7 @@ func (d *Manager) GetDiagnosticResult(name string) (*schedulers.DiagnosticResult
res := &schedulers.DiagnosticResult{Name: name, Timestamp: ts, Status: schedulers.Disabled}
return res, nil
}
var isDisabled bool
t := scheduler.Scheduler.GetType()
scheduleConfig := d.config.GetScheduleConfig()
for _, s := range scheduleConfig.Schedulers {
if t == s.Type {
isDisabled = s.Disable
break
}
}
isDisabled := d.config.IsSchedulerDisabled(scheduler.Scheduler.GetType())
if isDisabled {
ts := uint64(time.Now().Unix())
res := &schedulers.DiagnosticResult{Name: name, Timestamp: ts, Status: schedulers.Disabled}
Expand Down
57 changes: 12 additions & 45 deletions pkg/schedule/schedulers/scheduler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ import (
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/plan"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/server/config"
"go.uber.org/zap"
)

Expand All @@ -43,16 +42,18 @@ type Controller struct {
sync.RWMutex
wg sync.WaitGroup
ctx context.Context
cluster sche.ClusterInformer
cluster sche.ScheduleCluster
storage endpoint.ConfigStorage
schedulers map[string]*ScheduleController
opController *operator.Controller
}

// NewController creates a scheduler controller.
func NewController(ctx context.Context, cluster sche.ClusterInformer, opController *operator.Controller) *Controller {
func NewController(ctx context.Context, cluster sche.ScheduleCluster, storage endpoint.ConfigStorage, opController *operator.Controller) *Controller {
return &Controller{
ctx: ctx,
cluster: cluster,
storage: storage,
schedulers: make(map[string]*ScheduleController),
opController: opController,
}
Expand Down Expand Up @@ -108,7 +109,7 @@ func (c *Controller) CollectSchedulerMetrics() {
}

func (c *Controller) isSchedulingHalted() bool {
return c.cluster.GetPersistOptions().IsSchedulingHalted()
return c.cluster.GetOpts().IsSchedulingHalted()
}

// ResetSchedulerMetrics resets metrics of all schedulers.
Expand All @@ -133,7 +134,7 @@ func (c *Controller) AddScheduler(scheduler Scheduler, args ...string) error {
c.wg.Add(1)
go c.runScheduler(s)
c.schedulers[s.Scheduler.GetName()] = s
c.cluster.GetPersistOptions().AddSchedulerCfg(s.Scheduler.GetType(), args)
c.cluster.GetOpts().AddSchedulerCfg(s.Scheduler.GetType(), args)
return nil
}

Expand All @@ -149,18 +150,14 @@ func (c *Controller) RemoveScheduler(name string) error {
return errs.ErrSchedulerNotFound.FastGenByArgs()
}

opt := c.cluster.GetPersistOptions()
if err := c.removeOptScheduler(opt, name); err != nil {
log.Error("can not remove scheduler", zap.String("scheduler-name", name), errs.ZapError(err))
return err
}

if err := opt.Persist(c.cluster.GetStorage()); err != nil {
opt := c.cluster.GetOpts()
opt.RemoveSchedulerCfg(s.Scheduler.GetType())
if err := opt.Persist(c.storage); err != nil {
log.Error("the option can not persist scheduler config", errs.ZapError(err))
return err
}

if err := c.cluster.GetStorage().RemoveScheduleConfig(name); err != nil {
if err := c.storage.RemoveScheduleConfig(name); err != nil {
log.Error("can not remove the scheduler config", errs.ZapError(err))
return err
}
Expand All @@ -172,29 +169,6 @@ func (c *Controller) RemoveScheduler(name string) error {
return nil
}

func (c *Controller) removeOptScheduler(o *config.PersistOptions, name string) error {
v := o.GetScheduleConfig().Clone()
for i, schedulerCfg := range v.Schedulers {
// To create a temporary scheduler is just used to get scheduler's name
decoder := ConfigSliceDecoder(schedulerCfg.Type, schedulerCfg.Args)
tmp, err := CreateScheduler(schedulerCfg.Type, c.opController, storage.NewStorageWithMemoryBackend(), decoder, c.RemoveScheduler)
if err != nil {
return err
}
if tmp.GetName() == name {
if config.IsDefaultScheduler(tmp.GetType()) {
schedulerCfg.Disable = true
v.Schedulers[i] = schedulerCfg
} else {
v.Schedulers = append(v.Schedulers[:i], v.Schedulers[i+1:]...)
}
o.SetScheduleConfig(v)
return nil
}
}
return nil
}

// PauseOrResumeScheduler pauses or resumes a scheduler by name.
func (c *Controller) PauseOrResumeScheduler(name string, t int64) error {
c.Lock()
Expand Down Expand Up @@ -265,14 +239,7 @@ func (c *Controller) IsSchedulerDisabled(name string) (bool, error) {
if !ok {
return false, errs.ErrSchedulerNotFound.FastGenByArgs()
}
t := s.Scheduler.GetType()
scheduleConfig := c.cluster.GetPersistOptions().GetScheduleConfig()
for _, s := range scheduleConfig.Schedulers {
if t == s.Type {
return s.Disable, nil
}
}
return false, nil
return c.cluster.GetOpts().IsSchedulerDisabled(s.Scheduler.GetType()), nil
}

// IsSchedulerExisted returns whether a scheduler is existed.
Expand Down
27 changes: 27 additions & 0 deletions server/config/persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,17 @@ func (o *PersistOptions) GetSchedulers() SchedulerConfigs {
return o.GetScheduleConfig().Schedulers
}

// IsSchedulerDisabled returns if the scheduler is disabled.
func (o *PersistOptions) IsSchedulerDisabled(t string) bool {
schedulers := o.GetScheduleConfig().Schedulers
for _, s := range schedulers {
if t == s.Type {
return s.Disable
}
}
return false
}

// GetHotRegionsWriteInterval gets interval for PD to store Hot Region information.
func (o *PersistOptions) GetHotRegionsWriteInterval() time.Duration {
return o.GetScheduleConfig().HotRegionsWriteInterval.Duration
Expand Down Expand Up @@ -701,6 +712,22 @@ func (o *PersistOptions) AddSchedulerCfg(tp string, args []string) {
o.SetScheduleConfig(v)
}

// RemoveSchedulerCfg removes the scheduler configurations.
func (o *PersistOptions) RemoveSchedulerCfg(tp string) {
v := o.GetScheduleConfig().Clone()
for i, schedulerCfg := range v.Schedulers {
if tp == schedulerCfg.Type {
if IsDefaultScheduler(tp) {
schedulerCfg.Disable = true
v.Schedulers[i] = schedulerCfg
} else {
v.Schedulers = append(v.Schedulers[:i], v.Schedulers[i+1:]...)
}
o.SetScheduleConfig(v)
}
}
}

// SetLabelProperty sets the label property.
func (o *PersistOptions) SetLabelProperty(typ, labelKey, labelValue string) {
cfg := o.GetLabelPropertyConfig().Clone()
Expand Down

0 comments on commit 2a7a43e

Please sign in to comment.