diff --git a/server/schedule/operator/operator.go b/server/schedule/operator/operator.go index 792778a201a..62638a2271a 100644 --- a/server/schedule/operator/operator.go +++ b/server/schedule/operator/operator.go @@ -225,16 +225,12 @@ func (o *Operator) CheckExpired() bool { // CheckTimeout checks if the operator is timeout, and update the status. func (o *Operator) CheckTimeout() bool { - if o.CheckSuccess() || len(o.steps) == 0 { + if o.CheckSuccess() { return false } - currentStep := int(atomic.LoadInt32(&o.currentStep)) - startTime := o.GetStartTime() - if 0 < currentStep && currentStep-1 < len(o.steps) { - startTime = time.Unix(0, atomic.LoadInt64(&(o.stepsTime[currentStep-1]))) - } - step := o.steps[currentStep] - return o.status.CheckStepTimeout(startTime, step, o.ApproximateSize) + currentStep := atomic.LoadInt32(&o.currentStep) + startTime := o.getStepStartTime(currentStep) + return o.status.CheckStepTimeout(startTime, o.steps[currentStep], o.ApproximateSize) } // Len returns the operator's steps count. @@ -250,6 +246,15 @@ func (o *Operator) Step(i int) OpStep { return nil } +// getStepStartTime returns the start time of the given step: the operator's start time for step 0, otherwise (when step-1 is in range) the time recorded in stepsTime for the previous step. +func (o *Operator) getStepStartTime(step int32) time.Time { + startTime := o.GetStartTime() + if 0 < step && int(step-1) < len(o.steps) { + startTime = time.Unix(0, atomic.LoadInt64(&(o.stepsTime[step-1]))) + } + return startTime +} + // Check checks if current step is finished, returns next step to take action. // If operator is at an end status, check returns nil. // It's safe to be called by multiple goroutine concurrently. 
@@ -262,14 +267,8 @@ func (o *Operator) Check(region *core.RegionInfo) OpStep { for step := atomic.LoadInt32(&o.currentStep); int(step) < len(o.steps); step++ { if o.steps[int(step)].IsFinish(region) { if atomic.CompareAndSwapInt64(&(o.stepsTime[step]), 0, time.Now().UnixNano()) { - var startTime time.Time - if step == 0 { - startTime = o.GetStartTime() - } else { - startTime = time.Unix(0, atomic.LoadInt64(&(o.stepsTime[step-1]))) - } operatorStepDuration.WithLabelValues(reflect.TypeOf(o.steps[int(step)]).Name()). - Observe(time.Unix(0, o.stepsTime[step]).Sub(startTime).Seconds()) + Observe(time.Unix(0, o.stepsTime[step]).Sub(o.getStepStartTime(step)).Seconds()) } atomic.StoreInt32(&o.currentStep, step+1) } else { @@ -405,12 +404,16 @@ func (o *Operator) GetAdditionalInfo() string { // these values are used for unit test. const ( // mock region default region size is 96MB. - mockRegionSize = 96 * (1 << 20) + mockRegionSize = 96 mockDesc = "test" mockBrief = "test" ) // NewTestOperator creates a test operator, only used for unit test. func NewTestOperator(regionID uint64, regionEpoch *metapb.RegionEpoch, kind OpKind, steps ...OpStep) *Operator { + // A test operator must have at least one step; default to a single ChangePeerV2Leave step. + if len(steps) == 0 { + steps = []OpStep{ChangePeerV2Leave{}} + } return NewOperator(mockDesc, mockBrief, regionID, regionEpoch, kind, mockRegionSize, steps...) 
} diff --git a/server/schedule/operator/status_tracker_test.go b/server/schedule/operator/status_tracker_test.go index aed287d833d..8ada8b386f2 100644 --- a/server/schedule/operator/status_tracker_test.go +++ b/server/schedule/operator/status_tracker_test.go @@ -130,6 +130,7 @@ func (s *testOpStatusTrackerSuite) TestCheckStepTimeout(c *C) { }} for _, v := range testdata { + // Timeout and status changed trk := NewOpStatusTracker() trk.To(STARTED) c.Assert(trk.CheckStepTimeout(v.start, v.step, 0), Equals, v.status == TIMEOUT) diff --git a/server/schedule/operator/step.go b/server/schedule/operator/step.go index 3b9e194e172..f8d075577f5 100644 --- a/server/schedule/operator/step.go +++ b/server/schedule/operator/step.go @@ -36,7 +36,7 @@ const ( // default: 6 s/Mb DefaultSlowExecutorRate = 6 // DefaultFastExecutorRate is the slow rate of the operator executor. - // default: 0.01 s/Mb + // default: 0.1 s/Mb DefaultFastExecutorRate = 0.1 ) @@ -112,7 +112,7 @@ func (tl TransferLeader) Influence(opInfluence OpInfluence, region *core.RegionI // Timeout returns true if the step is timeout. func (tl TransferLeader) Timeout(start time.Time, regionSize int64) bool { - return time.Since(start) > maxFastWaitTime(regionSize) + return time.Since(start) > fastStepWaitDuration(regionSize) } // AddPeer is an OpStep that adds a region peer. @@ -170,7 +170,7 @@ func (ap AddPeer) CheckInProgress(ci ClusterInformer, region *core.RegionInfo) e // Timeout returns true if the step is timeout. func (ap AddPeer) Timeout(start time.Time, regionSize int64) bool { - return time.Since(start) > maxSlowWaitTime(regionSize) + return time.Since(start) > slowStepWaitDuration(regionSize) } // AddLearner is an OpStep that adds a region learner peer. @@ -234,7 +234,7 @@ func (al AddLearner) Influence(opInfluence OpInfluence, region *core.RegionInfo) // Timeout returns true if the step is timeout. 
func (al AddLearner) Timeout(start time.Time, regionSize int64) bool { - return time.Since(start) > maxSlowWaitTime(regionSize) + return time.Since(start) > slowStepWaitDuration(regionSize) } // PromoteLearner is an OpStep that promotes a region learner peer to normal voter. @@ -277,7 +277,7 @@ func (pl PromoteLearner) Influence(_ OpInfluence, _ *core.RegionInfo) {} // Timeout returns true if the step is timeout. func (pl PromoteLearner) Timeout(start time.Time, regionSize int64) bool { - return time.Since(start) > maxFastWaitTime(regionSize) + return time.Since(start) > fastStepWaitDuration(regionSize) } // RemovePeer is an OpStep that removes a region peer. @@ -333,7 +333,7 @@ func (rp RemovePeer) Influence(opInfluence OpInfluence, region *core.RegionInfo) // Timeout returns true if the step is timeout. func (rp RemovePeer) Timeout(start time.Time, regionSize int64) bool { - return time.Since(start) > maxFastWaitTime(regionSize) + return time.Since(start) > fastStepWaitDuration(regionSize) } // MergeRegion is an OpStep that merge two regions. @@ -386,7 +386,7 @@ func (mr MergeRegion) Influence(opInfluence OpInfluence, region *core.RegionInfo // Timeout returns true if the step is timeout. func (mr MergeRegion) Timeout(start time.Time, regionSize int64) bool { - return time.Since(start) > maxFastWaitTime(regionSize)*10 + return time.Since(start) > fastStepWaitDuration(regionSize)*10 } // SplitRegion is an OpStep that splits a region. @@ -428,7 +428,7 @@ func (sr SplitRegion) CheckInProgress(_ ClusterInformer, _ *core.RegionInfo) err // Timeout returns true if the step is timeout. func (sr SplitRegion) Timeout(start time.Time, regionSize int64) bool { - return time.Since(start) > maxFastWaitTime(regionSize) + return time.Since(start) > fastStepWaitDuration(regionSize) } // DemoteVoter is very similar to DemoteFollower. But it allows Demote Leader. 
@@ -460,7 +460,7 @@ func (dv DemoteVoter) IsFinish(region *core.RegionInfo) bool { // Timeout returns true if the step is timeout. func (dv DemoteVoter) Timeout(start time.Time, regionSize int64) bool { - return time.Since(start) > maxFastWaitTime(regionSize) + return time.Since(start) > fastStepWaitDuration(regionSize) } // ChangePeerV2Enter is an OpStep that uses joint consensus to request all PromoteLearner and DemoteVoter. @@ -607,7 +607,7 @@ func (cpe ChangePeerV2Enter) GetRequest() *pdpb.ChangePeerV2 { // Timeout returns true if the step is timeout. func (cpe ChangePeerV2Enter) Timeout(start time.Time, regionSize int64) bool { count := uint64(len(cpe.PromoteLearners)+len(cpe.DemoteVoters)) + 1 - return time.Since(start) > maxFastWaitTime(regionSize)*time.Duration(count) + return time.Since(start) > fastStepWaitDuration(regionSize)*time.Duration(count) } // ChangePeerV2Leave is an OpStep that leaves the joint state. @@ -732,7 +732,7 @@ func (cpl ChangePeerV2Leave) Influence(_ OpInfluence, _ *core.RegionInfo) {} // Timeout returns true if the step is timeout. 
func (cpl ChangePeerV2Leave) Timeout(start time.Time, regionSize int64) bool { count := uint64(len(cpl.PromoteLearners)+len(cpl.DemoteVoters)) + 1 - return time.Since(start) > maxFastWaitTime(regionSize)*time.Duration(count) + return time.Since(start) > fastStepWaitDuration(regionSize)*time.Duration(count) } func validateStore(ci ClusterInformer, id uint64) error { @@ -746,7 +746,7 @@ func validateStore(ci ClusterInformer, id uint64) error { return nil } -func maxSlowWaitTime(regionSize int64) time.Duration { +func slowStepWaitDuration(regionSize int64) time.Duration { seconds := DefaultSlowExecutorRate * regionSize wait := time.Duration(seconds) * time.Second if wait < SlowOperatorWaitTime { @@ -755,7 +755,7 @@ func maxSlowWaitTime(regionSize int64) time.Duration { return wait } -func maxFastWaitTime(regionSize int64) time.Duration { +func fastStepWaitDuration(regionSize int64) time.Duration { seconds := int64(DefaultFastExecutorRate * float64(regionSize)) wait := time.Duration(seconds) * time.Second if wait < FastOperatorWaitTime {