Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
Signed-off-by: bufferflies <1045931706@qq.com>
  • Loading branch information
bufferflies committed Feb 21, 2022
1 parent 38cdf2e commit ceee2b7
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 29 deletions.
35 changes: 19 additions & 16 deletions server/schedule/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,16 +225,12 @@ func (o *Operator) CheckExpired() bool {

// CheckTimeout checks whether the operator has timed out, and updates the status.
func (o *Operator) CheckTimeout() bool {
if o.CheckSuccess() || len(o.steps) == 0 {
if o.CheckSuccess() {
return false
}
currentStep := int(atomic.LoadInt32(&o.currentStep))
startTime := o.GetStartTime()
if 0 < currentStep && currentStep-1 < len(o.steps) {
startTime = time.Unix(0, atomic.LoadInt64(&(o.stepsTime[currentStep-1])))
}
step := o.steps[currentStep]
return o.status.CheckStepTimeout(startTime, step, o.ApproximateSize)
currentStep := atomic.LoadInt32(&o.currentStep)
startTime := o.getStepStartTime(currentStep)
return o.status.CheckStepTimeout(startTime, o.steps[currentStep], o.ApproximateSize)
}

// Len returns the operator's steps count.
Expand All @@ -250,6 +246,15 @@ func (o *Operator) Step(i int) OpStep {
return nil
}

// getStepStartTime reports when the step at index `step` began.
//
// Step 0 (or any index whose predecessor lies outside o.steps) is considered
// to begin at the operator's own start time. Every other step begins at the
// timestamp stored in o.stepsTime for the step before it, which is read
// atomically and interpreted as Unix nanoseconds.
func (o *Operator) getStepStartTime(step int32) time.Time {
	opStart := o.GetStartTime()
	prev := int(step) - 1
	if prev < 0 || prev >= len(o.steps) {
		// No usable predecessor: fall back to the operator start time.
		return opStart
	}
	return time.Unix(0, atomic.LoadInt64(&o.stepsTime[prev]))
}

// Check checks if current step is finished, returns next step to take action.
// If operator is at an end status, check returns nil.
// It's safe to be called by multiple goroutines concurrently.
Expand All @@ -262,14 +267,8 @@ func (o *Operator) Check(region *core.RegionInfo) OpStep {
for step := atomic.LoadInt32(&o.currentStep); int(step) < len(o.steps); step++ {
if o.steps[int(step)].IsFinish(region) {
if atomic.CompareAndSwapInt64(&(o.stepsTime[step]), 0, time.Now().UnixNano()) {
var startTime time.Time
if step == 0 {
startTime = o.GetStartTime()
} else {
startTime = time.Unix(0, atomic.LoadInt64(&(o.stepsTime[step-1])))
}
operatorStepDuration.WithLabelValues(reflect.TypeOf(o.steps[int(step)]).Name()).
Observe(time.Unix(0, o.stepsTime[step]).Sub(startTime).Seconds())
Observe(time.Unix(0, o.stepsTime[step]).Sub(o.getStepStartTime(step)).Seconds())
}
atomic.StoreInt32(&o.currentStep, step+1)
} else {
Expand Down Expand Up @@ -405,12 +404,16 @@ func (o *Operator) GetAdditionalInfo() string {
// these values are used for unit test.
const (
// mock region default region size is 96MB.
mockRegionSize = 96 * (1 << 20)
mockRegionSize = 96
mockDesc = "test"
mockBrief = "test"
)

// NewTestOperator builds an operator for unit tests only.
//
// The operator is created through NewOperator with the mock description,
// mock brief and mock region size. When the caller supplies no steps, a
// single zero-valued ChangePeerV2Leave step is used instead, so the
// resulting operator always has at least one step.
func NewTestOperator(regionID uint64, regionEpoch *metapb.RegionEpoch, kind OpKind, steps ...OpStep) *Operator {
	opSteps := steps
	if len(opSteps) == 0 {
		// A test operator must never be created with an empty step list.
		opSteps = []OpStep{ChangePeerV2Leave{}}
	}
	return NewOperator(mockDesc, mockBrief, regionID, regionEpoch, kind, mockRegionSize, opSteps...)
}
1 change: 1 addition & 0 deletions server/schedule/operator/status_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func (s *testOpStatusTrackerSuite) TestCheckStepTimeout(c *C) {
}}

for _, v := range testdata {
// Timeout and status changed
trk := NewOpStatusTracker()
trk.To(STARTED)
c.Assert(trk.CheckStepTimeout(v.start, v.step, 0), Equals, v.status == TIMEOUT)
Expand Down
26 changes: 13 additions & 13 deletions server/schedule/operator/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const (
// default: 6 s/Mb
DefaultSlowExecutorRate = 6
// DefaultFastExecutorRate is the fast rate of the operator executor.
// default: 0.01 s/Mb
// default: 0.1 s/Mb
DefaultFastExecutorRate = 0.1
)

Expand Down Expand Up @@ -112,7 +112,7 @@ func (tl TransferLeader) Influence(opInfluence OpInfluence, region *core.RegionI

// Timeout returns true if the step has timed out.
func (tl TransferLeader) Timeout(start time.Time, regionSize int64) bool {
return time.Since(start) > maxFastWaitTime(regionSize)
return time.Since(start) > fastStepWaitDuration(regionSize)
}

// AddPeer is an OpStep that adds a region peer.
Expand Down Expand Up @@ -170,7 +170,7 @@ func (ap AddPeer) CheckInProgress(ci ClusterInformer, region *core.RegionInfo) e

// Timeout returns true if the step has timed out.
func (ap AddPeer) Timeout(start time.Time, regionSize int64) bool {
return time.Since(start) > maxSlowWaitTime(regionSize)
return time.Since(start) > slowStepWaitDuration(regionSize)
}

// AddLearner is an OpStep that adds a region learner peer.
Expand Down Expand Up @@ -234,7 +234,7 @@ func (al AddLearner) Influence(opInfluence OpInfluence, region *core.RegionInfo)

// Timeout returns true if the step has timed out.
func (al AddLearner) Timeout(start time.Time, regionSize int64) bool {
return time.Since(start) > maxSlowWaitTime(regionSize)
return time.Since(start) > slowStepWaitDuration(regionSize)
}

// PromoteLearner is an OpStep that promotes a region learner peer to normal voter.
Expand Down Expand Up @@ -277,7 +277,7 @@ func (pl PromoteLearner) Influence(_ OpInfluence, _ *core.RegionInfo) {}

// Timeout returns true if the step has timed out.
func (pl PromoteLearner) Timeout(start time.Time, regionSize int64) bool {
return time.Since(start) > maxFastWaitTime(regionSize)
return time.Since(start) > fastStepWaitDuration(regionSize)
}

// RemovePeer is an OpStep that removes a region peer.
Expand Down Expand Up @@ -333,7 +333,7 @@ func (rp RemovePeer) Influence(opInfluence OpInfluence, region *core.RegionInfo)

// Timeout returns true if the step has timed out.
func (rp RemovePeer) Timeout(start time.Time, regionSize int64) bool {
return time.Since(start) > maxFastWaitTime(regionSize)
return time.Since(start) > fastStepWaitDuration(regionSize)
}

// MergeRegion is an OpStep that merge two regions.
Expand Down Expand Up @@ -386,7 +386,7 @@ func (mr MergeRegion) Influence(opInfluence OpInfluence, region *core.RegionInfo

// Timeout returns true if the step has timed out.
func (mr MergeRegion) Timeout(start time.Time, regionSize int64) bool {
return time.Since(start) > maxFastWaitTime(regionSize)*10
return time.Since(start) > fastStepWaitDuration(regionSize)*10
}

// SplitRegion is an OpStep that splits a region.
Expand Down Expand Up @@ -428,7 +428,7 @@ func (sr SplitRegion) CheckInProgress(_ ClusterInformer, _ *core.RegionInfo) err

// Timeout returns true if the step has timed out.
func (sr SplitRegion) Timeout(start time.Time, regionSize int64) bool {
return time.Since(start) > maxFastWaitTime(regionSize)
return time.Since(start) > fastStepWaitDuration(regionSize)
}

// DemoteVoter is very similar to DemoteFollower. But it allows Demote Leader.
Expand Down Expand Up @@ -460,7 +460,7 @@ func (dv DemoteVoter) IsFinish(region *core.RegionInfo) bool {

// Timeout returns true if the step has timed out.
func (dv DemoteVoter) Timeout(start time.Time, regionSize int64) bool {
return time.Since(start) > maxFastWaitTime(regionSize)
return time.Since(start) > fastStepWaitDuration(regionSize)
}

// ChangePeerV2Enter is an OpStep that uses joint consensus to request all PromoteLearner and DemoteVoter.
Expand Down Expand Up @@ -607,7 +607,7 @@ func (cpe ChangePeerV2Enter) GetRequest() *pdpb.ChangePeerV2 {
// Timeout returns true if the step has timed out.
func (cpe ChangePeerV2Enter) Timeout(start time.Time, regionSize int64) bool {
count := uint64(len(cpe.PromoteLearners)+len(cpe.DemoteVoters)) + 1
return time.Since(start) > maxFastWaitTime(regionSize)*time.Duration(count)
return time.Since(start) > fastStepWaitDuration(regionSize)*time.Duration(count)
}

// ChangePeerV2Leave is an OpStep that leaves the joint state.
Expand Down Expand Up @@ -732,7 +732,7 @@ func (cpl ChangePeerV2Leave) Influence(_ OpInfluence, _ *core.RegionInfo) {}
// Timeout returns true if the step has timed out.
func (cpl ChangePeerV2Leave) Timeout(start time.Time, regionSize int64) bool {
count := uint64(len(cpl.PromoteLearners)+len(cpl.DemoteVoters)) + 1
return time.Since(start) > maxFastWaitTime(regionSize)*time.Duration(count)
return time.Since(start) > fastStepWaitDuration(regionSize)*time.Duration(count)
}

func validateStore(ci ClusterInformer, id uint64) error {
Expand All @@ -746,7 +746,7 @@ func validateStore(ci ClusterInformer, id uint64) error {
return nil
}

func maxSlowWaitTime(regionSize int64) time.Duration {
func slowStepWaitDuration(regionSize int64) time.Duration {
seconds := DefaultSlowExecutorRate * regionSize
wait := time.Duration(seconds) * time.Second
if wait < SlowOperatorWaitTime {
Expand All @@ -755,7 +755,7 @@ func maxSlowWaitTime(regionSize int64) time.Duration {
return wait
}

func maxFastWaitTime(regionSize int64) time.Duration {
func fastStepWaitDuration(regionSize int64) time.Duration {
seconds := int64(DefaultFastExecutorRate * float64(regionSize))
wait := time.Duration(seconds) * time.Second
if wait < FastOperatorWaitTime {
Expand Down

0 comments on commit ceee2b7

Please sign in to comment.