move prepare checker
Signed-off-by: Ryan Leung <rleungx@gmail.com>
rleungx committed Dec 2, 2024
1 parent e60afe6 commit e3fea90
Showing 8 changed files with 46 additions and 57 deletions.
9 changes: 1 addition & 8 deletions pkg/mcs/scheduling/server/cluster.go
@@ -8,7 +8,6 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/schedulingpb"
@@ -67,7 +66,6 @@ type Cluster struct {
const (
regionLabelGCInterval = time.Hour
requestTimeout = 3 * time.Second
collectWaitTime = time.Minute

// heartbeat relative const
heartbeatTaskRunner = "heartbeat-task-runner"
@@ -491,12 +489,7 @@ func (c *Cluster) runUpdateStoreStats() {
func (c *Cluster) runCoordinator() {
defer logutil.LogPanic()
defer c.wg.Done()
// force wait for 1 minute to make prepare checker won't be directly skipped
runCollectWaitTime := collectWaitTime
failpoint.Inject("changeRunCollectWaitTime", func() {
runCollectWaitTime = 1 * time.Second
})
c.coordinator.RunUntilStop(runCollectWaitTime)
c.coordinator.RunUntilStop()
}

func (c *Cluster) runMetricsCollectionJob() {
8 changes: 7 additions & 1 deletion pkg/schedule/checker/checker_controller.go
@@ -89,10 +89,11 @@ type Controller struct {
// patrolRegionScanLimit is the limit of regions to scan.
// It is calculated by the number of regions.
patrolRegionScanLimit int
prepareChecker *sche.PrepareChecker
}

// NewController create a new Controller.
func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfigProvider, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *operator.Controller) *Controller {
func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfigProvider, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *operator.Controller, prepareChecker *sche.PrepareChecker) *Controller {
pendingProcessedRegions := cache.NewIDTTL(ctx, time.Minute, 3*time.Minute)
c := &Controller{
ctx: ctx,
@@ -111,6 +112,7 @@ func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config
patrolRegionContext: &PatrolRegionContext{},
interval: cluster.GetCheckerConfig().GetPatrolRegionInterval(),
patrolRegionScanLimit: calculateScanLimit(cluster),
prepareChecker: prepareChecker,
}
c.duration.Store(time.Duration(0))
return c
@@ -130,6 +132,10 @@ func (c *Controller) PatrolRegions() {
regions []*core.RegionInfo
)
for {
if !c.prepareChecker.Check(c.cluster.GetBasicCluster()) {
time.Sleep(time.Second)
continue
}
select {
case <-ticker.C:
c.updateTickerIfNeeded(ticker)
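Note: PatrolRegions above now gates on the shared PrepareChecker before scanning. A minimal, hedged sketch of that gating pattern in isolation — the helper name, the package placement, and the ctx handling are illustrative and not part of this commit:

	package checker // hypothetical placement, for illustration only

	import (
		"context"
		"time"
	)

	// gateOnPrepare polls check once per second until it reports ready,
	// returning false if ctx is cancelled first.
	func gateOnPrepare(ctx context.Context, check func() bool) bool {
		for !check() {
			select {
			case <-ctx.Done():
				return false
			case <-time.After(time.Second):
			}
		}
		return true
	}

	// Rough usage, mirroring the patrol loop:
	//   if !gateOnPrepare(c.ctx, func() bool { return c.prepareChecker.Check(c.cluster.GetBasicCluster()) }) {
	//       return
	//   }
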
35 changes: 11 additions & 24 deletions pkg/schedule/coordinator.go
@@ -44,7 +44,6 @@ import (

const (
runSchedulerCheckInterval = 3 * time.Second
collectTimeout = 5 * time.Minute
maxLoadConfigRetries = 10
// pushOperatorTickInterval is the interval try to push the operator.
pushOperatorTickInterval = 500 * time.Millisecond
@@ -66,7 +65,7 @@ type Coordinator struct {
schedulersInitialized bool

cluster sche.ClusterInformer
prepareChecker *prepareChecker
prepareChecker *sche.PrepareChecker
checkers *checker.Controller
regionScatterer *scatter.RegionScatterer
regionSplitter *splitter.RegionSplitter
@@ -80,15 +79,16 @@ type Coordinator struct {
// NewCoordinator creates a new Coordinator.
func NewCoordinator(parentCtx context.Context, cluster sche.ClusterInformer, hbStreams *hbstream.HeartbeatStreams) *Coordinator {
ctx, cancel := context.WithCancel(parentCtx)
prepareChecker := sche.NewPrepareChecker()
opController := operator.NewController(ctx, cluster.GetBasicCluster(), cluster.GetSharedConfig(), hbStreams)
schedulers := schedulers.NewController(ctx, cluster, cluster.GetStorage(), opController)
checkers := checker.NewController(ctx, cluster, cluster.GetCheckerConfig(), cluster.GetRuleManager(), cluster.GetRegionLabeler(), opController)
schedulers := schedulers.NewController(ctx, cluster, cluster.GetStorage(), opController, prepareChecker)
checkers := checker.NewController(ctx, cluster, cluster.GetCheckerConfig(), cluster.GetRuleManager(), cluster.GetRegionLabeler(), opController, prepareChecker)
return &Coordinator{
ctx: ctx,
cancel: cancel,
schedulersInitialized: false,
cluster: cluster,
prepareChecker: newPrepareChecker(),
prepareChecker: prepareChecker,
checkers: checkers,
regionScatterer: scatter.NewRegionScatterer(ctx, cluster, opController, checkers.AddPendingProcessedRegions),
regionSplitter: splitter.NewRegionSplitter(cluster, splitter.NewSplitRegionsHandler(cluster, opController), checkers.AddPendingProcessedRegions),
@@ -204,8 +204,8 @@ func (c *Coordinator) driveSlowNodeScheduler() {
}

// RunUntilStop runs the coordinator until receiving the stop signal.
func (c *Coordinator) RunUntilStop(collectWaitTime ...time.Duration) {
c.Run(collectWaitTime...)
func (c *Coordinator) RunUntilStop() {
c.Run()
<-c.ctx.Done()
log.Info("coordinator is stopping")
c.GetSchedulersController().Wait()
@@ -214,25 +214,12 @@ func (c *Coordinator) RunUntilStop(collectWaitTime ...time.Duration) {
}

// Run starts coordinator.
func (c *Coordinator) Run(collectWaitTime ...time.Duration) {
func (c *Coordinator) Run() {
ticker := time.NewTicker(runSchedulerCheckInterval)
failpoint.Inject("changeCoordinatorTicker", func() {
ticker.Reset(100 * time.Millisecond)
})
defer ticker.Stop()
log.Info("coordinator starts to collect cluster information")
for {
if c.ShouldRun(collectWaitTime...) {
log.Info("coordinator has finished cluster information preparation")
break
}
select {
case <-ticker.C:
case <-c.ctx.Done():
log.Info("coordinator stops running")
return
}
}
log.Info("coordinator starts to run schedulers")
c.InitSchedulers(true)

@@ -547,8 +534,8 @@ func ResetHotSpotMetrics() {
}

// ShouldRun returns true if the coordinator should run.
func (c *Coordinator) ShouldRun(collectWaitTime ...time.Duration) bool {
return c.prepareChecker.check(c.cluster.GetBasicCluster(), collectWaitTime...)
func (c *Coordinator) ShouldRun() bool {
return c.prepareChecker.Check(c.cluster.GetBasicCluster())
}

// GetSchedulersController returns the schedulers controller.
@@ -616,7 +603,7 @@ func (c *Coordinator) GetRuleChecker() *checker.RuleChecker {
}

// GetPrepareChecker returns the prepare checker.
func (c *Coordinator) GetPrepareChecker() *prepareChecker {
func (c *Coordinator) GetPrepareChecker() *sche.PrepareChecker {
return c.prepareChecker
}

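With the changeRunCollectWaitTime failpoint removed, a test that needs scheduling to start immediately can presumably force readiness through the coordinator's accessor instead. A hedged sketch — the helper, its package, and the import path are assumptions, not part of this commit; GetPrepareChecker, SetPrepared, and RunUntilStop are the exported pieces shown in the hunks above:

	package tests // hypothetical placement, for illustration only

	import "github.com/tikv/pd/pkg/schedule"

	// startPrepared marks the shared PrepareChecker as prepared via the
	// exported test hook, then starts the coordinator without waiting for
	// cluster information collection.
	func startPrepared(co *schedule.Coordinator) {
		co.GetPrepareChecker().SetPrepared()
		go co.RunUntilStop()
	}
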
@@ -12,10 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package schedule
package core

import (
"fmt"
"time"

"github.com/pingcap/log"
@@ -24,33 +23,34 @@ import (
"go.uber.org/zap"
)

type prepareChecker struct {
const collectTimeout = 5 * time.Minute

// PrepareChecker is used to check if the coordinator has finished cluster information preparation.
type PrepareChecker struct {
syncutil.RWMutex
start time.Time
prepared bool
}

func newPrepareChecker() *prepareChecker {
return &prepareChecker{
// NewPrepareChecker creates a new PrepareChecker.
func NewPrepareChecker() *PrepareChecker {
return &PrepareChecker{
start: time.Now(),
}
}

// Before starting up the scheduler, we need to take the proportion of the regions on each store into consideration.
func (checker *prepareChecker) check(c *core.BasicCluster, collectWaitTime ...time.Duration) bool {
checker.Lock()
defer checker.Unlock()
log.Info("check prepare checker", zap.Bool("prepared", checker.prepared), zap.String("count", fmt.Sprintf("%d", c.GetTotalRegionCount())), zap.String("not-from-storage-count", fmt.Sprintf("%d", c.GetClusterNotFromStorageRegionsCnt())))
if checker.prepared {
// Check checks if the coordinator has finished cluster information preparation.
func (checker *PrepareChecker) Check(c *core.BasicCluster) bool {
if checker.IsPrepared() {
return true
}
checker.Lock()
defer checker.Unlock()

if time.Since(checker.start) > collectTimeout {
checker.prepared = true
return true
}
if len(collectWaitTime) > 0 && time.Since(checker.start) < collectWaitTime[0] {
return false
}
notLoadedFromRegionsCnt := c.GetClusterNotFromStorageRegionsCnt()
totalRegionsCnt := c.GetTotalRegionCount()
// The number of active regions should be more than total region of all stores * core.CollectFactor
@@ -63,7 +63,7 @@ func (checker *prepareChecker) check(c *core.BasicCluster, collectWaitTime ...ti
}
storeID := store.GetID()
// It is used to avoid sudden scheduling when scheduling service is just started.
if len(collectWaitTime) > 0 && (float64(store.GetStoreStats().GetRegionCount())*core.CollectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID))) {
if float64(store.GetStoreStats().GetRegionCount())*core.CollectFactor > float64(c.GetNotFromStorageRegionsCntByStore(storeID)) {
return false
}
if !c.IsStorePrepared(storeID) {
@@ -76,22 +76,23 @@ func (checker *prepareChecker) check(c *core.BasicCluster, collectWaitTime ...ti
}

// IsPrepared returns whether the coordinator is prepared.
func (checker *prepareChecker) IsPrepared() bool {
func (checker *PrepareChecker) IsPrepared() bool {
checker.RLock()
defer checker.RUnlock()
return checker.prepared
}

// SetPrepared is for test purpose
func (checker *prepareChecker) SetPrepared() {
func (checker *PrepareChecker) SetPrepared() {
checker.Lock()
defer checker.Unlock()
checker.prepared = true
}

// ResetPrepared is for test purpose
func (checker *prepareChecker) ResetPrepared() {
func (checker *PrepareChecker) ResetPrepared() {
checker.Lock()
defer checker.Unlock()
checker.prepared = false
checker.start = time.Now()
}
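For intuition, the per-store condition in Check above can be restated as: a store counts as collected once the regions observed via heartbeat reach CollectFactor of the region count the store reports in its own stats. A hedged sketch of that arithmetic — the helper is illustrative, and the 0.9 factor in the example is an assumption standing in for core.CollectFactor:

	package core // hypothetical placement, for illustration only

	// storeCollected restates the per-store check: the regions observed via
	// heartbeat (not loaded from storage) must reach collectFactor of the
	// count the store reports in its heartbeat stats.
	func storeCollected(reportedRegions, observedViaHeartbeat int, collectFactor float64) bool {
		return float64(observedViaHeartbeat) >= float64(reportedRegions)*collectFactor
	}

	// Example: with collectFactor = 0.9, a store reporting 1,000 regions is
	// considered collected only after at least 900 of them have been observed
	// via heartbeat.
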
8 changes: 7 additions & 1 deletion pkg/schedule/schedulers/scheduler_controller.go
@@ -56,17 +56,19 @@ type Controller struct {
// which will only be initialized and used in the API service mode now.
schedulerHandlers map[string]http.Handler
opController *operator.Controller
prepareChecker *sche.PrepareChecker
}

// NewController creates a scheduler controller.
func NewController(ctx context.Context, cluster sche.SchedulerCluster, storage endpoint.ConfigStorage, opController *operator.Controller) *Controller {
func NewController(ctx context.Context, cluster sche.SchedulerCluster, storage endpoint.ConfigStorage, opController *operator.Controller, prepareChecker *sche.PrepareChecker) *Controller {
return &Controller{
ctx: ctx,
cluster: cluster,
storage: storage,
schedulers: make(map[string]*ScheduleController),
schedulerHandlers: make(map[string]http.Handler),
opController: opController,
prepareChecker: prepareChecker,
}
}

@@ -368,6 +370,10 @@ func (c *Controller) runScheduler(s *ScheduleController) {
for {
select {
case <-ticker.C:
if !c.prepareChecker.Check(c.cluster.GetBasicCluster()) {
time.Sleep(time.Second)
continue
}
diagnosable := s.IsDiagnosticAllowed()
if !s.AllowSchedule(diagnosable) {
continue
2 changes: 0 additions & 2 deletions tests/integrations/mcs/scheduling/api_test.go
@@ -41,15 +41,13 @@ func TestAPI(t *testing.T) {
func (suite *apiTestSuite) SetupSuite() {
re := suite.Require()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/scheduling/server/changeRunCollectWaitTime", `return(true)`))
suite.env = tests.NewSchedulingTestEnvironment(suite.T())
}

func (suite *apiTestSuite) TearDownSuite() {
suite.env.Cleanup()
re := suite.Require()
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/scheduling/server/changeRunCollectWaitTime"))
}

func (suite *apiTestSuite) TestGetCheckerByName() {
2 changes: 0 additions & 2 deletions tests/integrations/mcs/scheduling/server_test.go
@@ -61,7 +61,6 @@ func (suite *serverTestSuite) SetupSuite() {
var err error
re := suite.Require()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/scheduling/server/changeRunCollectWaitTime", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`))
suite.ctx, suite.cancel = context.WithCancel(context.Background())
suite.cluster, err = tests.NewTestAPICluster(suite.ctx, 1)
@@ -83,7 +82,6 @@ func (suite *serverTestSuite) TearDownSuite() {
suite.cancel()
re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/scheduling/server/changeRunCollectWaitTime"))
}

func (suite *serverTestSuite) TestAllocID() {
2 changes: 1 addition & 1 deletion tests/server/api/api_test.go
@@ -1168,7 +1168,7 @@ func TestDeleteAllRegionCacheScheduling(t *testing.T) {
re.Equal(0, int(rc.GetOperatorController().OperatorCount(operator.OpRegion)))

// Simulate continuous region heartbeat reporting
for i := 0; i < 10; i++ {
for range 10 {
rc.HandleRegionHeartbeat(regionInfo)
time.Sleep(100 * time.Millisecond)
}
