
Commit

fix lint
Signed-off-by: Ryan Leung <rleungx@gmail.com>
rleungx committed May 9, 2023
1 parent 0b4c892 commit b150f05
Showing 49 changed files with 406 additions and 387 deletions.
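
For orientation, most of the hunks below are mechanical renames: operator.NewOperatorController and *operator.OperatorController become operator.NewController and *operator.Controller, and the unsafe-recovery controller moves from urc.UnsafeRecoveryController to unsaferecovery.Controller, exposed on the mock cluster as the RecoveryController field. The rest adds doc comments to exported methods. The following sketch is not part of the commit; it shows an updated call site in the style of the test code in this diff. The import paths for operator and hbstream and the config.NewTestOptions helper are assumptions inferred from the file paths and imports shown here, not verified against the repository.

package main

import (
	"context"

	"github.com/tikv/pd/pkg/mock/mockcluster"  // path taken from this diff's file list
	"github.com/tikv/pd/pkg/schedule/hbstream" // assumed package location
	"github.com/tikv/pd/pkg/schedule/operator" // assumed package location
	"github.com/tikv/pd/server/config"         // shown in mockcluster.go's imports
)

func main() {
	ctx := context.Background()
	// Build a mock cluster the same way the tests in this diff do.
	tc := mockcluster.NewCluster(ctx, config.NewTestOptions())
	stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false /* no need to run */)

	// Before this commit: oc := operator.NewOperatorController(ctx, tc, stream)
	oc := operator.NewController(ctx, tc, stream)

	// The mock's unsafe-recovery controller is now the RecoveryController field,
	// a *unsaferecovery.Controller created by unsaferecovery.NewController.
	_ = tc.RecoveryController.IsRunning()
	_ = oc
}
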
16 changes: 8 additions & 8 deletions pkg/mock/mockcluster/mockcluster.go
@@ -36,7 +36,7 @@ import (
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/statistics/buckets"
"github.com/tikv/pd/pkg/storage"
urc "github.com/tikv/pd/pkg/unsaferecovery"
"github.com/tikv/pd/pkg/unsaferecovery"
"github.com/tikv/pd/pkg/utils/typeutil"
"github.com/tikv/pd/pkg/versioninfo"
"github.com/tikv/pd/server/config"
@@ -49,6 +49,7 @@ const (

// Cluster is used to mock a cluster for test purpose.
type Cluster struct {
ctx context.Context
*core.BasicCluster
*mockid.IDAllocator
*placement.RuleManager
@@ -60,13 +61,13 @@ type Cluster struct {
*config.StoreConfigManager
*buckets.HotBucketCache
storage.Storage
ctx context.Context
*urc.UnsafeRecoveryController
RecoveryController *unsaferecovery.Controller
}

// NewCluster creates a new Cluster
func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster {
clus := &Cluster{
ctx: ctx,
BasicCluster: core.NewBasicCluster(),
IDAllocator: mockid.NewIDAllocator(),
HotStat: statistics.NewHotStat(ctx),
@@ -75,14 +76,13 @@ func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster {
suspectRegions: map[uint64]struct{}{},
StoreConfigManager: config.NewTestStoreConfigManager(nil),
Storage: storage.NewStorageWithMemoryBackend(),
ctx: ctx,
}
if clus.PersistOptions.GetReplicationConfig().EnablePlacementRules {
clus.initRuleManager()
}
// It should be updated to the latest feature version.
clus.PersistOptions.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.HotScheduleWithQuery))
clus.UnsafeRecoveryController = urc.NewUnsafeRecoveryController(clus)
clus.RecoveryController = unsaferecovery.NewController(clus)
clus.RegionLabeler, _ = labeler.NewRegionLabeler(ctx, clus.Storage, time.Second*5)
return clus
}
@@ -97,7 +97,7 @@ func (mc *Cluster) GetOpts() sc.Config {
return mc.PersistOptions
}

// GetOpts returns the cluster configuration.
// GetStorage returns the storage.
func (mc *Cluster) GetStorage() storage.Storage {
return mc.Storage
}
@@ -109,10 +109,10 @@ func (mc *Cluster) GetAllocator() id.Allocator {

// IsUnsafeRecovering returns if the cluster is in unsafe recovering.
func (mc *Cluster) IsUnsafeRecovering() bool {
if mc.UnsafeRecoveryController == nil {
if mc.RecoveryController == nil {
return false
}
return mc.UnsafeRecoveryController.IsRunning()
return mc.RecoveryController.IsRunning()
}

// GetPersistOptions returns the persist options.
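
As an aside on the struct change above: the mock Cluster gets most of its behavior through Go embedding, so anonymous fields such as *core.BasicCluster promote their methods onto *Cluster, while the renamed RecoveryController is a named field that callers reach explicitly as mc.RecoveryController. A small standalone sketch of that distinction, using hypothetical types that are not from the PD codebase:

package main

import "fmt"

// BasicCluster stands in for an embedded dependency.
type BasicCluster struct{}

// GetRegionCount is promoted onto any struct that embeds *BasicCluster.
func (b *BasicCluster) GetRegionCount() int { return 0 }

// RecoveryController stands in for pkg/unsaferecovery's Controller.
type RecoveryController struct{ running bool }

// IsRunning reports whether a recovery is in progress.
func (r *RecoveryController) IsRunning() bool { return r.running }

// Cluster mirrors the mock's layout: embedded types promote their methods,
// while named fields must be accessed explicitly.
type Cluster struct {
	*BasicCluster                          // embedded: GetRegionCount is promoted
	RecoveryController *RecoveryController // named field: reached as c.RecoveryController
}

func main() {
	c := &Cluster{BasicCluster: &BasicCluster{}, RecoveryController: &RecoveryController{}}
	fmt.Println(c.GetRegionCount())               // promoted from *BasicCluster
	fmt.Println(c.RecoveryController.IsRunning()) // explicit field access
}
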
4 changes: 2 additions & 2 deletions pkg/schedule/checker/checker_controller.go
@@ -36,7 +36,7 @@ const DefaultCacheSize = 1000
type Controller struct {
cluster operator.ClusterInformer
conf config.Config
opController *operator.OperatorController
opController *operator.Controller
learnerChecker *LearnerChecker
replicaChecker *ReplicaChecker
ruleChecker *RuleChecker
@@ -50,7 +50,7 @@ type Controller struct {
}

// NewController create a new Controller.
func NewController(ctx context.Context, cluster operator.ClusterInformer, conf config.Config, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *operator.OperatorController) *Controller {
func NewController(ctx context.Context, cluster operator.ClusterInformer, conf config.Config, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *operator.Controller) *Controller {
regionWaitingList := cache.NewDefaultCache(DefaultCacheSize)
return &Controller{
cluster: cluster,
2 changes: 1 addition & 1 deletion pkg/schedule/checker/merge_checker_test.go
@@ -463,7 +463,7 @@ func (suite *mergeCheckerTestSuite) TestStoreLimitWithMerge() {

mc := NewMergeChecker(suite.ctx, tc, tc.GetOpts())
stream := hbstream.NewTestHeartbeatStreams(suite.ctx, tc.ID, tc, false /* no need to run */)
oc := operator.NewOperatorController(suite.ctx, tc, stream)
oc := operator.NewController(suite.ctx, tc, stream)

regions[2] = regions[2].Clone(
core.SetPeers([]*metapb.Peer{
57 changes: 44 additions & 13 deletions pkg/schedule/coordinator.go
@@ -78,7 +78,7 @@ type Coordinator struct {
regionScatterer *RegionScatterer
regionSplitter *RegionSplitter
schedulers map[string]*scheduleController
opController *operator.OperatorController
opController *operator.Controller
hbStreams *hbstream.HeartbeatStreams
pluginInterface *PluginInterface
diagnosticManager *diagnosticManager
@@ -87,7 +87,7 @@
// NewCoordinator creates a new coordinator.
func NewCoordinator(ctx context.Context, cluster Cluster, hbStreams *hbstream.HeartbeatStreams) *Coordinator {
ctx, cancel := context.WithCancel(ctx)
opController := operator.NewOperatorController(ctx, cluster, hbStreams)
opController := operator.NewController(ctx, cluster, hbStreams)
schedulers := make(map[string]*scheduleController)
return &Coordinator{
ctx: ctx,
@@ -105,10 +105,12 @@ func NewCoordinator(ctx context.Context, cluster Cluster, hbStreams *hbstream.He
}
}

// GetWaitingRegions returns the regions in the waiting list.
func (c *Coordinator) GetWaitingRegions() []*cache.Item {
return c.checkers.GetWaitingRegions()
}

// IsPendingRegion returns if the region is in the pending list.
func (c *Coordinator) IsPendingRegion(region uint64) bool {
return c.checkers.IsPendingRegion(region)
}
@@ -304,6 +306,7 @@ func (c *Coordinator) drivePushOperator() {
}
}

// RunUntilStop runs the coordinator until receiving the stop signal.
func (c *Coordinator) RunUntilStop() {
c.Run()
<-c.ctx.Done()
@@ -312,6 +315,7 @@ func (c *Coordinator) RunUntilStop() {
log.Info("coordinator has been stopped")
}

// Run starts coordinator.
func (c *Coordinator) Run() {
ticker := time.NewTicker(runSchedulerCheckInterval)
failpoint.Inject("changeCoordinatorTicker", func() {
@@ -484,10 +488,12 @@ func (c *Coordinator) waitPluginUnload(pluginPath, schedulerName string, ch chan
}
}

// Stop stops the coordinator.
func (c *Coordinator) Stop() {
c.cancel()
}

// GetHotRegionsByType gets hot regions' statistics by RWType.
func (c *Coordinator) GetHotRegionsByType(typ statistics.RWType) *statistics.StoreHotPeersInfos {
isTraceFlow := c.cluster.GetOpts().IsTraceRegionFlow()
storeLoads := c.cluster.GetStoresLoads()
@@ -514,6 +520,7 @@ func (c *Coordinator) GetHotRegionsByType(typ statistics.RWType) *statistics.Sto
return infos
}

// GetSchedulers returns all names of schedulers.
func (c *Coordinator) GetSchedulers() []string {
c.RLock()
defer c.RUnlock()
@@ -524,6 +531,7 @@ func (c *Coordinator) GetSchedulers() []string {
return names
}

// GetSchedulerHandlers returns all handlers of schedulers.
func (c *Coordinator) GetSchedulerHandlers() map[string]http.Handler {
c.RLock()
defer c.RUnlock()
@@ -534,6 +542,7 @@ func (c *Coordinator) GetSchedulerHandlers() map[string]http.Handler {
return handlers
}

// CollectSchedulerMetrics collects metrics of all schedulers.
func (c *Coordinator) CollectSchedulerMetrics() {
c.RLock()
defer c.RUnlock()
@@ -548,10 +557,12 @@ func (c *Coordinator) CollectSchedulerMetrics() {
}
}

// ResetSchedulerMetrics resets metrics of all schedulers.
func (c *Coordinator) ResetSchedulerMetrics() {
schedulerStatusGauge.Reset()
}

// CollectHotSpotMetrics collects hot spot metrics.
func (c *Coordinator) CollectHotSpotMetrics() {
stores := c.cluster.GetStores()
// Collects hot write region metrics.
@@ -560,6 +571,12 @@ func (c *Coordinator) CollectHotSpotMetrics() {
collectHotMetrics(c.cluster, stores, statistics.Read)
}

// ResetHotSpotMetrics resets hot spot metrics.
func (c *Coordinator) ResetHotSpotMetrics() {
hotSpotStatusGauge.Reset()
schedulers.HotPendingSum.Reset()
}

func collectHotMetrics(cluster Cluster, stores []*core.StoreInfo, typ statistics.RWType) {
var (
kind string
@@ -615,15 +632,12 @@ func collectHotMetrics(cluster Cluster, stores []*core.StoreInfo, typ statistics
}
}

func (c *Coordinator) ResetHotSpotMetrics() {
hotSpotStatusGauge.Reset()
schedulers.HotPendingSum.Reset()
}

// ShouldRun returns true if the coordinator should run.
func (c *Coordinator) ShouldRun() bool {
return c.prepareChecker.check(c.cluster.GetBasicCluster())
}

// AddScheduler adds a scheduler.
func (c *Coordinator) AddScheduler(scheduler schedulers.Scheduler, args ...string) error {
c.Lock()
defer c.Unlock()
@@ -644,6 +658,7 @@ func (c *Coordinator) AddScheduler(scheduler schedulers.Scheduler, args ...strin
return nil
}

// RemoveScheduler removes a scheduler by name.
func (c *Coordinator) RemoveScheduler(name string) error {
c.Lock()
defer c.Unlock()
@@ -683,7 +698,7 @@ func (c *Coordinator) removeOptScheduler(o *config.PersistOptions, name string)
for i, schedulerCfg := range v.Schedulers {
// To create a temporary scheduler is just used to get scheduler's name
decoder := schedulers.ConfigSliceDecoder(schedulerCfg.Type, schedulerCfg.Args)
tmp, err := schedulers.CreateScheduler(schedulerCfg.Type, operator.NewOperatorController(c.ctx, nil, nil), storage.NewStorageWithMemoryBackend(), decoder)
tmp, err := schedulers.CreateScheduler(schedulerCfg.Type, operator.NewController(c.ctx, nil, nil), storage.NewStorageWithMemoryBackend(), decoder)
if err != nil {
return err
}
@@ -701,6 +716,7 @@ func (c *Coordinator) removeOptScheduler(o *config.PersistOptions, name string)
return nil
}

// PauseOrResumeScheduler pauses or resumes a scheduler by name.
func (c *Coordinator) PauseOrResumeScheduler(name string, t int64) error {
c.Lock()
defer c.Unlock()
@@ -732,7 +748,7 @@ func (c *Coordinator) PauseOrResumeScheduler(name string, t int64) error {
return err
}

// isSchedulerAllowed returns whether a scheduler is allowed to schedule, a scheduler is not allowed to schedule if it is paused or blocked by unsafe recovery.
// IsSchedulerAllowed returns whether a scheduler is allowed to schedule, a scheduler is not allowed to schedule if it is paused or blocked by unsafe recovery.
func (c *Coordinator) IsSchedulerAllowed(name string) (bool, error) {
c.RLock()
defer c.RUnlock()
@@ -746,6 +762,7 @@ func (c *Coordinator) IsSchedulerAllowed(name string) (bool, error) {
return s.AllowSchedule(false), nil
}

// IsSchedulerPaused returns whether a scheduler is paused.
func (c *Coordinator) IsSchedulerPaused(name string) (bool, error) {
c.RLock()
defer c.RUnlock()
@@ -759,6 +776,7 @@ func (c *Coordinator) IsSchedulerPaused(name string) (bool, error) {
return s.IsPaused(), nil
}

// IsSchedulerDisabled returns whether a scheduler is disabled.
func (c *Coordinator) IsSchedulerDisabled(name string) (bool, error) {
c.RLock()
defer c.RUnlock()
@@ -779,6 +797,7 @@ func (c *Coordinator) IsSchedulerDisabled(name string) (bool, error) {
return false, nil
}

// IsSchedulerExisted returns whether a scheduler is existed.
func (c *Coordinator) IsSchedulerExisted(name string) (bool, error) {
c.RLock()
defer c.RUnlock()
@@ -821,6 +840,7 @@ func (c *Coordinator) runScheduler(s *scheduleController) {
}
}

// PauseOrResumeChecker pauses or resumes a checker by name.
func (c *Coordinator) PauseOrResumeChecker(name string, t int64) error {
c.Lock()
defer c.Unlock()
@@ -835,6 +855,7 @@ func (c *Coordinator) PauseOrResumeChecker(name string, t int64) error {
return nil
}

// IsCheckerPaused returns whether a checker is paused.
func (c *Coordinator) IsCheckerPaused(name string) (bool, error) {
c.RLock()
defer c.RUnlock()
@@ -848,48 +869,57 @@ func (c *Coordinator) IsCheckerPaused(name string) (bool, error) {
return p.IsPaused(), nil
}

// GetRegionScatterer returns the region scatterer.
func (c *Coordinator) GetRegionScatterer() *RegionScatterer {
return c.regionScatterer
}

// GetRegionSplitter returns the region splitter.
func (c *Coordinator) GetRegionSplitter() *RegionSplitter {
return c.regionSplitter
}

func (c *Coordinator) GetOperatorController() *operator.OperatorController {
// GetOperatorController returns the operator controller.
func (c *Coordinator) GetOperatorController() *operator.Controller {
return c.opController
}

// GetCheckerController returns the checker controller.
func (c *Coordinator) GetCheckerController() *checker.Controller {
return c.checkers
}

// GetMergeChecker returns the merge checker.
func (c *Coordinator) GetMergeChecker() *checker.MergeChecker {
return c.checkers.GetMergeChecker()
}

// GetRuleChecker returns the rule checker.
func (c *Coordinator) GetRuleChecker() *checker.RuleChecker {
return c.checkers.GetRuleChecker()
}

// GetPrepareChecker returns the prepare checker.
func (c *Coordinator) GetPrepareChecker() *prepareChecker {
return c.prepareChecker
}

// for test purpose
// GetHeartbeatStreams returns the heartbeat streams. Only for test purpose.
func (c *Coordinator) GetHeartbeatStreams() *hbstream.HeartbeatStreams {
return c.hbStreams
}

// for test purpose
// GetCluster returns the cluster. Only for test purpose.
func (c *Coordinator) GetCluster() Cluster {
return c.cluster
}

// GetDiagnosticResult returns the diagnostic results.
func (c *Coordinator) GetDiagnosticResult(name string) (*DiagnosticResult, error) {
return c.diagnosticManager.getDiagnosticResult(name)
}

// GetPausedSchedulerDelayUntil returns the delay time until the scheduler is paused.
func (c *Coordinator) GetPausedSchedulerDelayUntil(name string) (int64, error) {
c.RLock()
defer c.RUnlock()
@@ -923,7 +953,7 @@ func (c *Coordinator) CheckTransferWitnessLeader(region *core.RegionInfo) {
type scheduleController struct {
schedulers.Scheduler
cluster Cluster
opController *operator.OperatorController
opController *operator.Controller
nextInterval time.Duration
ctx context.Context
cancel context.CancelFunc
@@ -1033,6 +1063,7 @@ func (s *scheduleController) GetDelayUntil() int64 {
return 0
}

// GetPausedSchedulerDelayAt returns paused timestamp of a paused scheduler
func (c *Coordinator) GetPausedSchedulerDelayAt(name string) (int64, error) {
c.RLock()
defer c.RUnlock()
(The diffs for the remaining 45 changed files are not shown.)
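
The other recurring change in this commit is the addition of doc comments to exported methods (RunUntilStop, Stop, GetSchedulers, and so on). Go's linters flag an exported identifier whose comment is missing or does not begin with its own name, which is presumably what "fix lint" refers to. A minimal, self-contained illustration of that convention, using hypothetical names rather than PD code:

// Package lintdemo illustrates the doc-comment convention applied in this commit.
package lintdemo

// Coordinator is a stand-in type used only for this example.
type Coordinator struct{}

// RunUntilStop runs the coordinator until it is stopped. The comment starts
// with the method name, which is what the lint check requires.
func (c *Coordinator) RunUntilStop() {}

// Stop stops the coordinator.
func (c *Coordinator) Stop() {}
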
