Skip to content

Commit

Permalink
cluster, schedule: unify the scheduling halt to decouple dependencies (
Browse files Browse the repository at this point in the history
…#6569)

ref #5839

Unify the scheduling halt to decouple dependencies between `cluster` and `coordinator`.

Signed-off-by: JmPotato <ghzpotato@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
JmPotato and ti-chi-bot[bot] authored Jun 8, 2023
1 parent e807811 commit a5d76c6
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 46 deletions.
15 changes: 11 additions & 4 deletions pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (c *Coordinator) PatrolRegions() {
log.Info("patrol regions has been stopped")
return
}
if allowed, _ := c.cluster.CheckSchedulingAllowance(); !allowed {
if c.isSchedulingHalted() {
continue
}

Expand All @@ -172,6 +172,10 @@ func (c *Coordinator) PatrolRegions() {
}
}

func (c *Coordinator) isSchedulingHalted() bool {
return c.cluster.GetPersistOptions().IsSchedulingHalted()
}

func (c *Coordinator) checkRegions(startKey []byte) (key []byte, regions []*core.RegionInfo) {
regions = c.cluster.ScanRegions(startKey, nil, patrolScanRegionLimit)
if len(regions) == 0 {
Expand Down Expand Up @@ -561,7 +565,7 @@ func (c *Coordinator) CollectSchedulerMetrics() {
var allowScheduler float64
// If the scheduler is not allowed to schedule, it will disappear in Grafana panel.
// See issue #1341.
if allowed, _ := s.cluster.CheckSchedulingAllowance(); !s.IsPaused() && allowed {
if !s.IsPaused() && !c.isSchedulingHalted() {
allowScheduler = 1
}
schedulerStatusGauge.WithLabelValues(s.Scheduler.GetName(), "allow").Set(allowScheduler)
Expand Down Expand Up @@ -1036,8 +1040,7 @@ func (s *scheduleController) AllowSchedule(diagnosable bool) bool {
}
return false
}
allowed, _ := s.cluster.CheckSchedulingAllowance()
if !allowed {
if s.isSchedulingHalted() {
if diagnosable {
s.diagnosticRecorder.setResultFromStatus(halted)
}
Expand All @@ -1052,6 +1055,10 @@ func (s *scheduleController) AllowSchedule(diagnosable bool) bool {
return true
}

func (s *scheduleController) isSchedulingHalted() bool {
return s.cluster.GetPersistOptions().IsSchedulingHalted()
}

// isPaused returns if a scheduler is paused.
func (s *scheduleController) IsPaused() bool {
delayUntil := atomic.LoadInt64(&s.delayUntil)
Expand Down
1 change: 0 additions & 1 deletion pkg/schedule/core/cluster_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ type ClusterInformer interface {
GetRegionLabeler() *labeler.RegionLabeler
GetStorage() storage.Storage
UpdateRegionsLabelLevelStats(regions []*core.RegionInfo)
CheckSchedulingAllowance() (bool, error)
AddSuspectRegions(ids ...uint64)
GetPersistOptions() *config.PersistOptions
}
Expand Down
17 changes: 12 additions & 5 deletions pkg/unsaferecovery/unsafe_recovery_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/tikv/pd/pkg/id"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/server/config"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -109,6 +110,7 @@ type cluster interface {
DropCacheAllRegion()
GetAllocator() id.Allocator
BuryStore(storeID uint64, forceBury bool) error
GetPersistOptions() *config.PersistOptions
}

// Controller is used to control the unsafe recovery process.
Expand Down Expand Up @@ -174,19 +176,19 @@ func (u *Controller) reset() {
func (u *Controller) IsRunning() bool {
u.RLock()
defer u.RUnlock()
return u.isRunningLocked()
return isRunning(u.stage)
}

func (u *Controller) isRunningLocked() bool {
return u.stage != Idle && u.stage != Finished && u.stage != Failed
func isRunning(s stage) bool {
return s != Idle && s != Finished && s != Failed
}

// RemoveFailedStores removes Failed stores from the cluster.
func (u *Controller) RemoveFailedStores(failedStores map[uint64]struct{}, timeout uint64, autoDetect bool) error {
u.Lock()
defer u.Unlock()

if u.isRunningLocked() {
if isRunning(u.stage) {
return errs.ErrUnsafeRecoveryIsRunning.FastGenByArgs()
}

Expand Down Expand Up @@ -316,7 +318,7 @@ func (u *Controller) HandleStoreHeartbeat(heartbeat *pdpb.StoreHeartbeatRequest,
u.Lock()
defer u.Unlock()

if !u.isRunningLocked() {
if !isRunning(u.stage) {
// no recovery in progress, do nothing
return
}
Expand Down Expand Up @@ -490,6 +492,11 @@ func (u *Controller) GetStage() stage {

func (u *Controller) changeStage(stage stage) {
u.stage = stage
// Halt and resume the scheduling once the running state changed.
running := isRunning(stage)
if opt := u.cluster.GetPersistOptions(); opt.IsSchedulingHalted() != running {
opt.SetHaltScheduling(running, "online-unsafe-recovery")
}

var output StageOutput
output.Time = time.Now().Format("2006-01-02 15:04:05.000")
Expand Down
22 changes: 0 additions & 22 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2678,25 +2678,3 @@ func (c *RaftCluster) GetPausedSchedulerDelayAt(name string) (int64, error) {
func (c *RaftCluster) GetPausedSchedulerDelayUntil(name string) (int64, error) {
return c.coordinator.GetPausedSchedulerDelayUntil(name)
}

var (
onlineUnsafeRecoveryStatus = schedulingAllowanceStatusGauge.WithLabelValues("online-unsafe-recovery")
haltSchedulingStatus = schedulingAllowanceStatusGauge.WithLabelValues("halt-scheduling")
)

// CheckSchedulingAllowance checks if the cluster allows scheduling currently.
func (c *RaftCluster) CheckSchedulingAllowance() (bool, error) {
// If the cluster is in the process of online unsafe recovery, it should not allow scheduling.
if c.GetUnsafeRecoveryController().IsRunning() {
onlineUnsafeRecoveryStatus.Set(1)
return false, errs.ErrUnsafeRecoveryIsRunning.FastGenByArgs()
}
onlineUnsafeRecoveryStatus.Set(0)
// If the halt-scheduling is set, it should not allow scheduling.
if c.opt.IsSchedulingHalted() {
haltSchedulingStatus.Set(1)
return false, errs.ErrSchedulingIsHalted.FastGenByArgs()
}
haltSchedulingStatus.Set(0)
return true, nil
}
12 changes: 8 additions & 4 deletions server/cluster/cluster_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error {

// HandleAskSplit handles the split request.
func (c *RaftCluster) HandleAskSplit(request *pdpb.AskSplitRequest) (*pdpb.AskSplitResponse, error) {
if allowed, err := c.CheckSchedulingAllowance(); !allowed {
return nil, err
if c.isSchedulingHalted() {
return nil, errs.ErrSchedulingIsHalted.FastGenByArgs()
}
if !c.opt.IsTikvRegionSplitEnabled() {
return nil, errs.ErrSchedulerTiKVSplitDisabled.FastGenByArgs()
Expand Down Expand Up @@ -86,6 +86,10 @@ func (c *RaftCluster) HandleAskSplit(request *pdpb.AskSplitRequest) (*pdpb.AskSp
return split, nil
}

func (c *RaftCluster) isSchedulingHalted() bool {
return c.opt.IsSchedulingHalted()
}

// ValidRequestRegion is used to decide if the region is valid.
func (c *RaftCluster) ValidRequestRegion(reqRegion *metapb.Region) error {
startKey := reqRegion.GetStartKey()
Expand All @@ -105,8 +109,8 @@ func (c *RaftCluster) ValidRequestRegion(reqRegion *metapb.Region) error {

// HandleAskBatchSplit handles the batch split request.
func (c *RaftCluster) HandleAskBatchSplit(request *pdpb.AskBatchSplitRequest) (*pdpb.AskBatchSplitResponse, error) {
if allowed, err := c.CheckSchedulingAllowance(); !allowed {
return nil, err
if c.isSchedulingHalted() {
return nil, errs.ErrSchedulingIsHalted.FastGenByArgs()
}
if !c.opt.IsTikvRegionSplitEnabled() {
return nil, errs.ErrSchedulerTiKVSplitDisabled.FastGenByArgs()
Expand Down
9 changes: 0 additions & 9 deletions server/cluster/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,20 +95,11 @@ var (
Name: "store_sync",
Help: "The state of store sync config",
}, []string{"address", "state"})

schedulingAllowanceStatusGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "scheduling",
Name: "allowance_status",
Help: "Status of the scheduling allowance.",
}, []string{"kind"})
)

func init() {
prometheus.MustRegister(regionEventCounter)
prometheus.MustRegister(healthStatusGauge)
prometheus.MustRegister(schedulingAllowanceStatusGauge)
prometheus.MustRegister(clusterStateCPUGauge)
prometheus.MustRegister(clusterStateCurrent)
prometheus.MustRegister(bucketEventCounter)
Expand Down
29 changes: 29 additions & 0 deletions server/config/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package config

import "github.com/prometheus/client_golang/prometheus"

var schedulingAllowanceStatusGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "scheduling",
Name: "allowance_status",
Help: "Status of the scheduling allowance.",
}, []string{"kind"})

func init() {
prometheus.MustRegister(schedulingAllowanceStatusGauge)
}
14 changes: 13 additions & 1 deletion server/config/persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -920,14 +920,26 @@ func (o *PersistOptions) SetAllStoresLimitTTL(ctx context.Context, client *clien
return err
}

var haltSchedulingStatus = schedulingAllowanceStatusGauge.WithLabelValues("halt-scheduling")

// SetHaltScheduling set HaltScheduling.
func (o *PersistOptions) SetHaltScheduling(halt bool) {
func (o *PersistOptions) SetHaltScheduling(halt bool, source string) {
v := o.GetScheduleConfig().Clone()
v.HaltScheduling = halt
o.SetScheduleConfig(v)
if halt {
haltSchedulingStatus.Set(1)
schedulingAllowanceStatusGauge.WithLabelValues(source).Set(1)
} else {
haltSchedulingStatus.Set(0)
schedulingAllowanceStatusGauge.WithLabelValues(source).Set(0)
}
}

// IsSchedulingHalted returns if PD scheduling is halted.
func (o *PersistOptions) IsSchedulingHalted() bool {
if o == nil {
return false
}
return o.GetScheduleConfig().HaltScheduling
}

0 comments on commit a5d76c6

Please sign in to comment.