Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schedule: refactor diagnostic manager #6771

Merged
merged 4 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 18 additions & 182 deletions pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"net/http"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
Expand All @@ -31,10 +30,9 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/schedule/checker"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/diagnostic"
"github.com/tikv/pd/pkg/schedule/hbstream"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/plan"
"github.com/tikv/pd/pkg/schedule/scatter"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/schedule/splitter"
Expand All @@ -51,7 +49,6 @@ const (
checkSuspectRangesInterval = 100 * time.Millisecond
collectFactor = 0.9
collectTimeout = 5 * time.Minute
maxScheduleRetries = 10
maxLoadConfigRetries = 10
// pushOperatorTickInterval is the interval try to push the operator.
pushOperatorTickInterval = 500 * time.Millisecond
Expand All @@ -65,9 +62,8 @@ const (

var (
// WithLabelValues is a heavy operation, define variable to avoid call it every time.
waitingListGauge = regionListGauge.WithLabelValues("waiting_list")
priorityListGauge = regionListGauge.WithLabelValues("priority_list")
denySchedulersByLabelerCounter = labeler.LabelerEventCounter.WithLabelValues("schedulers", "deny")
waitingListGauge = regionListGauge.WithLabelValues("waiting_list")
priorityListGauge = regionListGauge.WithLabelValues("priority_list")
)

// Coordinator is used to manage all schedulers and checkers to decide if the region needs to be scheduled.
Expand All @@ -82,18 +78,18 @@ type Coordinator struct {
checkers *checker.Controller
regionScatterer *scatter.RegionScatterer
regionSplitter *splitter.RegionSplitter
schedulers map[string]*scheduleController
schedulers map[string]*schedulers.ScheduleController
opController *operator.Controller
hbStreams *hbstream.HeartbeatStreams
pluginInterface *PluginInterface
diagnosticManager *diagnosticManager
diagnosticManager *diagnostic.Manager
}

// NewCoordinator creates a new Coordinator.
func NewCoordinator(ctx context.Context, cluster sche.ClusterInformer, hbStreams *hbstream.HeartbeatStreams) *Coordinator {
ctx, cancel := context.WithCancel(ctx)
opController := operator.NewController(ctx, cluster.GetBasicCluster(), cluster.GetPersistOptions(), hbStreams)
schedulers := make(map[string]*scheduleController)
schedulers := make(map[string]*schedulers.ScheduleController)
c := &Coordinator{
ctx: ctx,
cancel: cancel,
Expand All @@ -107,7 +103,7 @@ func NewCoordinator(ctx context.Context, cluster sche.ClusterInformer, hbStreams
hbStreams: hbStreams,
pluginInterface: NewPluginInterface(),
}
c.diagnosticManager = newDiagnosticManager(c, cluster.GetPersistOptions())
c.diagnosticManager = diagnostic.NewManager(schedulers, cluster.GetPersistOptions())
return c
}

Expand Down Expand Up @@ -679,7 +675,7 @@ func (c *Coordinator) AddScheduler(scheduler schedulers.Scheduler, args ...strin
return errs.ErrSchedulerExisted.FastGenByArgs()
}

s := NewScheduleController(c, scheduler)
s := schedulers.NewScheduleController(c.ctx, c.cluster, c.opController, scheduler)
if err := s.Scheduler.Prepare(c.cluster); err != nil {
return err
}
Expand Down Expand Up @@ -756,7 +752,7 @@ func (c *Coordinator) PauseOrResumeScheduler(name string, t int64) error {
if c.cluster == nil {
return errs.ErrNotBootstrapped.FastGenByArgs()
}
var s []*scheduleController
var s []*schedulers.ScheduleController
if name != "all" {
sc, ok := c.schedulers[name]
if !ok {
Expand All @@ -775,8 +771,7 @@ func (c *Coordinator) PauseOrResumeScheduler(name string, t int64) error {
delayAt = time.Now().Unix()
delayUntil = delayAt + t
}
atomic.StoreInt64(&sc.delayAt, delayAt)
atomic.StoreInt64(&sc.delayUntil, delayUntil)
sc.SetDelay(delayAt, delayUntil)
}
return err
}
Expand Down Expand Up @@ -844,7 +839,7 @@ func (c *Coordinator) IsSchedulerExisted(name string) (bool, error) {
return true, nil
}

func (c *Coordinator) runScheduler(s *scheduleController) {
func (c *Coordinator) runScheduler(s *schedulers.ScheduleController) {
defer logutil.LogPanic()
defer c.wg.Done()
defer s.Scheduler.Cleanup(c.cluster)
Expand All @@ -854,7 +849,7 @@ func (c *Coordinator) runScheduler(s *scheduleController) {
for {
select {
case <-ticker.C:
diagnosable := s.diagnosticRecorder.isAllowed()
diagnosable := s.IsDiagnosticAllowed()
if !s.AllowSchedule(diagnosable) {
continue
}
Expand Down Expand Up @@ -948,157 +943,17 @@ func (c *Coordinator) GetCluster() sche.ClusterInformer {
}

// GetDiagnosticResult returns the diagnostic result.
func (c *Coordinator) GetDiagnosticResult(name string) (*DiagnosticResult, error) {
return c.diagnosticManager.getDiagnosticResult(name)
func (c *Coordinator) GetDiagnosticResult(name string) (*schedulers.DiagnosticResult, error) {
c.RLock()
defer c.RUnlock()
return c.diagnosticManager.GetDiagnosticResult(name)
}

// RecordOpStepWithTTL records OpStep with TTL
func (c *Coordinator) RecordOpStepWithTTL(regionID uint64) {
c.GetRuleChecker().RecordRegionPromoteToNonWitness(regionID)
}

// scheduleController is used to manage a scheduler to schedulers.
type scheduleController struct {
schedulers.Scheduler
cluster sche.ScheduleCluster
opController *operator.Controller
nextInterval time.Duration
ctx context.Context
cancel context.CancelFunc
delayAt int64
delayUntil int64
diagnosticRecorder *diagnosticRecorder
}

// NewScheduleController creates a new scheduleController.
func NewScheduleController(c *Coordinator, s schedulers.Scheduler) *scheduleController {
ctx, cancel := context.WithCancel(c.ctx)
return &scheduleController{
Scheduler: s,
cluster: c.cluster,
opController: c.opController,
nextInterval: s.GetMinInterval(),
ctx: ctx,
cancel: cancel,
diagnosticRecorder: c.diagnosticManager.getRecorder(s.GetName()),
}
}

func (s *scheduleController) Ctx() context.Context {
return s.ctx
}

func (s *scheduleController) Stop() {
s.cancel()
}

func (s *scheduleController) Schedule(diagnosable bool) []*operator.Operator {
for i := 0; i < maxScheduleRetries; i++ {
// no need to retry if schedule should stop to speed exit
select {
case <-s.ctx.Done():
return nil
default:
}
cacheCluster := newCacheCluster(s.cluster)
// we need only process diagnostic once in the retry loop
diagnosable = diagnosable && i == 0
ops, plans := s.Scheduler.Schedule(cacheCluster, diagnosable)
if diagnosable {
s.diagnosticRecorder.setResultFromPlans(ops, plans)
}
foundDisabled := false
for _, op := range ops {
if labelMgr := s.cluster.GetRegionLabeler(); labelMgr != nil {
region := s.cluster.GetRegion(op.RegionID())
if region == nil {
continue
}
if labelMgr.ScheduleDisabled(region) {
denySchedulersByLabelerCounter.Inc()
foundDisabled = true
break
}
}
}
if len(ops) > 0 {
// If we have schedule, reset interval to the minimal interval.
s.nextInterval = s.Scheduler.GetMinInterval()
// try regenerating operators
if foundDisabled {
continue
}
return ops
}
}
s.nextInterval = s.Scheduler.GetNextInterval(s.nextInterval)
return nil
}

func (s *scheduleController) DiagnoseDryRun() ([]*operator.Operator, []plan.Plan) {
cacheCluster := newCacheCluster(s.cluster)
return s.Scheduler.Schedule(cacheCluster, true)
}

// GetInterval returns the interval of scheduling for a scheduler.
func (s *scheduleController) GetInterval() time.Duration {
return s.nextInterval
}

// SetInterval sets the interval of scheduling for a scheduler. for test purpose.
func (s *scheduleController) SetInterval(interval time.Duration) {
s.nextInterval = interval
}

// AllowSchedule returns if a scheduler is allowed to schedulers.
func (s *scheduleController) AllowSchedule(diagnosable bool) bool {
if !s.Scheduler.IsScheduleAllowed(s.cluster) {
if diagnosable {
s.diagnosticRecorder.setResultFromStatus(pending)
}
return false
}
if s.isSchedulingHalted() {
if diagnosable {
s.diagnosticRecorder.setResultFromStatus(halted)
}
return false
}
if s.IsPaused() {
if diagnosable {
s.diagnosticRecorder.setResultFromStatus(paused)
}
return false
}
return true
}

func (s *scheduleController) isSchedulingHalted() bool {
return s.cluster.GetOpts().IsSchedulingHalted()
}

// isPaused returns if a scheduler is paused.
func (s *scheduleController) IsPaused() bool {
delayUntil := atomic.LoadInt64(&s.delayUntil)
return time.Now().Unix() < delayUntil
}

// getDelayAt returns paused timestamp of a paused scheduler
func (s *scheduleController) getDelayAt() int64 {
if s.IsPaused() {
return atomic.LoadInt64(&s.delayAt)
}
return 0
}

// getDelayUntil returns resume timestamp of a paused scheduler
func (s *scheduleController) getDelayUntil() int64 {
if s.IsPaused() {
return atomic.LoadInt64(&s.delayUntil)
}
return 0
}

// GetPausedSchedulerDelayAt returns paused timestamp of a paused scheduler
func (c *Coordinator) GetPausedSchedulerDelayAt(name string) (int64, error) {
c.RLock()
Expand All @@ -1110,7 +965,7 @@ func (c *Coordinator) GetPausedSchedulerDelayAt(name string) (int64, error) {
if !ok {
return -1, errs.ErrSchedulerNotFound.FastGenByArgs()
}
return s.getDelayAt(), nil
return s.GetDelayAt(), nil
}

// GetPausedSchedulerDelayUntil returns the delay time until the scheduler is paused.
Expand All @@ -1124,7 +979,7 @@ func (c *Coordinator) GetPausedSchedulerDelayUntil(name string) (int64, error) {
if !ok {
return -1, errs.ErrSchedulerNotFound.FastGenByArgs()
}
return s.getDelayUntil(), nil
return s.GetDelayUntil(), nil
}

// CheckTransferWitnessLeader determines if transfer leader is required, then sends to the scheduler if needed
Expand All @@ -1142,22 +997,3 @@ func (c *Coordinator) CheckTransferWitnessLeader(region *core.RegionInfo) {
}
}
}

// cacheCluster include cache info to improve the performance.
type cacheCluster struct {
sche.ScheduleCluster
stores []*core.StoreInfo
}

// GetStores returns store infos from cache
func (c *cacheCluster) GetStores() []*core.StoreInfo {
return c.stores
}

// newCacheCluster constructor for cache
func newCacheCluster(c sche.ScheduleCluster) *cacheCluster {
return &cacheCluster{
ScheduleCluster: c,
stores: c.GetStores(),
}
}
79 changes: 79 additions & 0 deletions pkg/schedule/diagnostic/diagnostic_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2022 TiKV Project Authors.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Copyright 2022 TiKV Project Authors.
// Copyright 2023 TiKV Project Authors.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's just a movement, maybe we don't need to change it?

//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package diagnostic

import (
"time"

"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/server/config"
)

// Manager is used to manage the diagnostic result of schedulers for now.
type Manager struct {
config *config.PersistOptions
schedulerController map[string]*schedulers.ScheduleController
}

// NewManager creates a new Manager.
func NewManager(schedulerController map[string]*schedulers.ScheduleController, config *config.PersistOptions) *Manager {
return &Manager{
config: config,
schedulerController: schedulerController,
}
}

// GetDiagnosticResult gets the diagnostic result of the scheduler.
func (d *Manager) GetDiagnosticResult(name string) (*schedulers.DiagnosticResult, error) {
if !d.config.IsDiagnosticAllowed() {
return nil, errs.ErrDiagnosticDisabled

Check warning on line 42 in pkg/schedule/diagnostic/diagnostic_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/diagnostic/diagnostic_manager.go#L42

Added line #L42 was not covered by tests
}

scheduler, isSchedulerExisted := d.schedulerController[name]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there's going to be a data race. We handle schedulers in coordinator with lock, but here is no lock.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, add a lock, and I will remove it in the next PR.

if !isSchedulerExisted {
ts := uint64(time.Now().Unix())
res := &schedulers.DiagnosticResult{Name: name, Timestamp: ts, Status: schedulers.Disabled}
return res, nil
}
var isDisabled bool
t := scheduler.Scheduler.GetType()
scheduleConfig := d.config.GetScheduleConfig()
for _, s := range scheduleConfig.Schedulers {
if t == s.Type {
isDisabled = s.Disable
break
}
}
if isDisabled {
ts := uint64(time.Now().Unix())
res := &schedulers.DiagnosticResult{Name: name, Timestamp: ts, Status: schedulers.Disabled}
return res, nil

Check warning on line 63 in pkg/schedule/diagnostic/diagnostic_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/diagnostic/diagnostic_manager.go#L61-L63

Added lines #L61 - L63 were not covered by tests
}

recorder := d.getSchedulerRecorder(name)
if recorder == nil {
return nil, errs.ErrSchedulerUndiagnosable.FastGenByArgs(name)

Check warning on line 68 in pkg/schedule/diagnostic/diagnostic_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/diagnostic/diagnostic_manager.go#L68

Added line #L68 was not covered by tests
}
result := recorder.GetLastResult()
if result == nil {
return nil, errs.ErrNoDiagnosticResult.FastGenByArgs(name)

Check warning on line 72 in pkg/schedule/diagnostic/diagnostic_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/diagnostic/diagnostic_manager.go#L72

Added line #L72 was not covered by tests
}
return result, nil
}

func (d *Manager) getSchedulerRecorder(name string) *schedulers.DiagnosticRecorder {
return d.schedulerController[name].GetDiagnosticRecorder()
}
Loading