Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: operator's timeout depends on steps #4548

Merged
merged 33 commits into from
Mar 1, 2022
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
8e8b375
operator add region size properties and close #4457
bufferflies Dec 14, 2021
98e6c9a
lint check
bufferflies Dec 14, 2021
ff4e389
adds timeoutfactor property to operator struct, adds max-region-size …
bufferflies Jan 5, 2022
de8f984
merge master
bufferflies Jan 5, 2022
59a0dff
pass ut
bufferflies Jan 5, 2022
b792225
replace max-region-size
bufferflies Jan 13, 2022
33933f9
resolved confict
bufferflies Jan 24, 2022
8a5a203
check all in step
bufferflies Jan 24, 2022
155a53d
remove unnecessary change
bufferflies Jan 24, 2022
0a9d9cc
remove unnecessary change
bufferflies Jan 24, 2022
b5413d6
resolved conflict
bufferflies Jan 24, 2022
62340cf
resolved conflict
bufferflies Jan 24, 2022
1c7bdc8
resolved conflict
bufferflies Jan 25, 2022
1a796f9
rename TimeOut to timeout
bufferflies Jan 25, 2022
bb427c8
add and pass unit test
bufferflies Jan 25, 2022
2a10af6
dead lock
bufferflies Jan 25, 2022
99488c0
remove sleep
bufferflies Feb 8, 2022
685b900
merge timeout
bufferflies Feb 10, 2022
bfe9788
remove config changes
bufferflies Feb 11, 2022
be1c56b
update deadlock
bufferflies Feb 11, 2022
8395b99
remove prepare lock
bufferflies Feb 11, 2022
22dfc3f
move executor rate to operator control
bufferflies Feb 15, 2022
e696959
operator remove executor rate
bufferflies Feb 15, 2022
76f35d2
Merge branch 'master' into feature/operator_factor
bufferflies Feb 16, 2022
e07edf6
address comment
bufferflies Feb 16, 2022
d2906f1
Merge branch 'feature/operator_factor' of github.com:bufferflies/pd i…
bufferflies Feb 16, 2022
1866b51
remove operator executor rate
bufferflies Feb 16, 2022
55ac82a
resolv conflict
bufferflies Feb 17, 2022
38cdf2e
change timeout signature
bufferflies Feb 18, 2022
ceee2b7
address comment
bufferflies Feb 18, 2022
e74e2f7
change comment
bufferflies Feb 23, 2022
d57e796
Merge branch 'master' into feature/operator_factor
bufferflies Feb 24, 2022
21cebb9
Merge branch 'master' into feature/operator_factor
ti-chi-bot Mar 1, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions server/schedule/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (o *Operator) String() string {
for i := range o.steps {
stepStrs[i] = o.steps[i].String()
}
s := fmt.Sprintf("%s {%s} (kind:%s, region:%v(%v,%v), createAt:%s, startAt:%s, currentStep:%v, size:%v, steps:[%s])",
s := fmt.Sprintf("%s {%s} (kind:%s, region:%v(%v, %v), createAt:%s, startAt:%s, currentStep:%v, size:%d, steps:[%s])",
o.desc, o.brief, o.kind, o.regionID, o.regionEpoch.GetVersion(), o.regionEpoch.GetConfVer(), o.GetCreateTime(),
o.GetStartTime(), atomic.LoadInt32(&o.currentStep), o.ApproximateSize, strings.Join(stepStrs, ", "))
bufferflies marked this conversation as resolved.
Show resolved Hide resolved
if o.CheckSuccess() {
Expand Down Expand Up @@ -228,10 +228,9 @@ func (o *Operator) CheckTimeout() bool {
if o.CheckSuccess() {
return false
}
if o.kind&OpRegion != 0 {
return o.status.CheckTimeout(SlowOperatorWaitTime)
}
return o.status.CheckTimeout(FastOperatorWaitTime)
currentStep := atomic.LoadInt32(&o.currentStep)
startTime := o.getStepStartTime(currentStep)
return o.status.CheckStepTimeout(startTime, o.steps[currentStep], o.ApproximateSize)
}

// Len returns the operator's steps count.
Expand All @@ -247,6 +246,15 @@ func (o *Operator) Step(i int) OpStep {
return nil
}

// getStepStartTime returns the time at which the given step started.
// It defaults to the operator's start time; when step is within 1..len(o.steps),
// the timestamp recorded in o.stepsTime for the previous step is used instead.
func (o *Operator) getStepStartTime(step int32) time.Time {
startTime := o.GetStartTime()
if 0 < step && int(step-1) < len(o.steps) {
bufferflies marked this conversation as resolved.
Show resolved Hide resolved
// stepsTime entries are Unix nanoseconds, hence time.Unix(0, ns).
startTime = time.Unix(0, atomic.LoadInt64(&(o.stepsTime[step-1])))
}
return startTime
}

// Check checks if current step is finished, returns next step to take action.
// If operator is at an end status, check returns nil.
// It's safe to be called by multiple goroutine concurrently.
Expand All @@ -259,14 +267,8 @@ func (o *Operator) Check(region *core.RegionInfo) OpStep {
for step := atomic.LoadInt32(&o.currentStep); int(step) < len(o.steps); step++ {
if o.steps[int(step)].IsFinish(region) {
if atomic.CompareAndSwapInt64(&(o.stepsTime[step]), 0, time.Now().UnixNano()) {
var startTime time.Time
if step == 0 {
startTime = o.GetStartTime()
} else {
startTime = time.Unix(0, atomic.LoadInt64(&(o.stepsTime[step-1])))
}
operatorStepDuration.WithLabelValues(reflect.TypeOf(o.steps[int(step)]).Name()).
Observe(time.Unix(0, o.stepsTime[step]).Sub(startTime).Seconds())
Observe(time.Unix(0, o.stepsTime[step]).Sub(o.getStepStartTime(step)).Seconds())
}
atomic.StoreInt32(&o.currentStep, step+1)
} else {
Expand Down Expand Up @@ -402,12 +404,16 @@ func (o *Operator) GetAdditionalInfo() string {
// These values are only used for unit tests.
const (
// The default region size of a mock region is 96MB.
mockRegionSize = 96 * (1 << 20)
mockRegionSize = 96
mockDesc = "test"
mockBrief = "test"
)

// NewTestOperator creates a test operator, only used for unit test.
// When no steps are given, a single ChangePeerV2Leave step is substituted.
// The operator is built through NewOperator with the mock description,
// brief and region size.
func NewTestOperator(regionID uint64, regionEpoch *metapb.RegionEpoch, kind OpKind, steps ...OpStep) *Operator {
// The steps of a test operator must not be empty, so fall back to a default step.
if len(steps) == 0 {
bufferflies marked this conversation as resolved.
Show resolved Hide resolved
steps = []OpStep{ChangePeerV2Leave{}}
}
return NewOperator(mockDesc, mockBrief, regionID, regionEpoch, kind, mockRegionSize, steps...)
}
87 changes: 86 additions & 1 deletion server/schedule/operator/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func (s *testOperatorSuite) TestOperator(c *C) {
c.Assert(op.CheckTimeout(), IsFalse)
SetOperatorStatusReachTime(op, STARTED, op.GetStartTime().Add(-FastOperatorWaitTime-time.Second))
c.Assert(op.CheckTimeout(), IsFalse)
op.stepsTime[op.currentStep-1] = op.GetReachTimeOf(STARTED).Unix()
SetOperatorStatusReachTime(op, STARTED, op.GetStartTime().Add(-SlowOperatorWaitTime-time.Second))
c.Assert(op.CheckTimeout(), IsTrue)
res, err := json.Marshal(op)
Expand Down Expand Up @@ -361,7 +362,7 @@ func (s *testOperatorSuite) TestCheck(c *C) {
c.Assert(op.Start(), IsTrue)
c.Assert(op.Check(region), NotNil)
c.Assert(op.Status(), Equals, STARTED)
op.status.setTime(STARTED, time.Now().Add(-SlowOperatorWaitTime))
op.stepsTime[op.currentStep-1] = time.Now().Add(-SlowOperatorWaitTime).Unix()
c.Assert(op.Check(region), NotNil)
c.Assert(op.Status(), Equals, TIMEOUT)
}
Expand Down Expand Up @@ -419,6 +420,90 @@ func (s *testOperatorSuite) TestSchedulerKind(c *C) {
}
}

// TestOpStepTimeout checks OpStep.Timeout for each kind of step, probing one
// second on either side of the expected limit for large and small regions.
// Region sizes follow the case comments: 10*1000 stands for 10GB, 10 for 10MB.
func (s *testOperatorSuite) TestOpStepTimeout(c *C) {
	// Use a single reference instant so all cases are measured from the same "now".
	now := time.Now()
	testdata := []struct {
		steps      []OpStep
		regionSize int64
		start      time.Time
		expect     bool
	}{
		{
			// case1: 10GB region will have 60,000s to execute.
			steps:      []OpStep{AddLearner{}, AddPeer{}},
			regionSize: 10 * 1000,
			start:      now.Add(-(6*10*1000*time.Second + time.Second)),
			expect:     true,
		}, {
			steps:      []OpStep{AddLearner{}, AddPeer{}},
			regionSize: 10 * 1000,
			start:      now.Add(-(6*10*1000*time.Second - time.Second)),
			expect:     false,
		}, {
			// case2: 10MB region will have at least SlowOperatorWaitTime(10min) to execute.
			steps:      []OpStep{AddLearner{}, AddPeer{}},
			regionSize: 10,
			start:      now.Add(-(SlowOperatorWaitTime + time.Second)),
			expect:     true,
		}, {
			steps:      []OpStep{AddLearner{}, AddPeer{}},
			regionSize: 10,
			start:      now.Add(-(6*10*time.Second - time.Second)),
			expect:     false,
		}, {
			// case3: 10GB region will have 1000s to execute for RemovePeer, TransferLeader, SplitRegion, PromoteLearner.
			steps:      []OpStep{RemovePeer{}, TransferLeader{}, SplitRegion{}, PromoteLearner{}},
			regionSize: 10 * 1000,
			start:      now.Add(-(1000*time.Second + time.Second)),
			expect:     true,
		}, {
			steps:      []OpStep{RemovePeer{}, TransferLeader{}, SplitRegion{}, PromoteLearner{}},
			regionSize: 10 * 1000,
			start:      now.Add(-(1000*time.Second - time.Second)),
			expect:     false,
		}, {
			// case4: 10MB region will have at least FastOperatorWaitTime(10s) to execute for RemovePeer, TransferLeader, SplitRegion, PromoteLearner.
			steps:      []OpStep{RemovePeer{}, TransferLeader{}, SplitRegion{}, PromoteLearner{}},
			regionSize: 10,
			start:      now.Add(-(FastOperatorWaitTime + time.Second)),
			expect:     true,
		}, {
			steps:      []OpStep{RemovePeer{}, TransferLeader{}, SplitRegion{}, PromoteLearner{}},
			regionSize: 10,
			start:      now.Add(-(FastOperatorWaitTime - time.Second)),
			expect:     false,
		}, {
			// case5: 10GB region will have 1000*3s to execute for ChangePeerV2Enter, ChangePeerV2Leave.
			steps: []OpStep{
				ChangePeerV2Enter{PromoteLearners: []PromoteLearner{{}, {}}},
				ChangePeerV2Leave{PromoteLearners: []PromoteLearner{{}, {}}},
			},
			regionSize: 10 * 1000,
			start:      now.Add(-(3000*time.Second + time.Second)),
			expect:     true,
		}, {
			steps: []OpStep{
				ChangePeerV2Enter{PromoteLearners: []PromoteLearner{{}, {}}},
				ChangePeerV2Leave{PromoteLearners: []PromoteLearner{{}, {}}},
			},
			regionSize: 10 * 1000,
			start:      now.Add(-(3000*time.Second - time.Second)),
			expect:     false,
		}, {
			// case6: 10GB region will have 1000*10s to execute for MergeRegion.
			steps:      []OpStep{MergeRegion{}},
			regionSize: 10 * 1000,
			start:      now.Add(-(10000*time.Second + time.Second)),
			expect:     true,
		}, {
			steps:      []OpStep{MergeRegion{}},
			regionSize: 10 * 1000,
			start:      now.Add(-(10000*time.Second - time.Second)),
			expect:     false,
		},
	}
	for _, v := range testdata {
		for _, step := range v.steps {
			// Obtained value first, expected value last, so failure output is labelled correctly.
			c.Assert(step.Timeout(v.start, v.regionSize), Equals, v.expect)
		}
	}
}

func (s *testOperatorSuite) TestRecord(c *C) {
operator := s.newTestOperator(1, OpLeader, AddLearner{ToStore: 1, PeerID: 1}, RemovePeer{FromStore: 1, PeerID: 1})
now := time.Now()
Expand Down
6 changes: 3 additions & 3 deletions server/schedule/operator/status_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,12 @@ func (trk *OpStatusTracker) CheckExpired(exp time.Duration) bool {
return trk.current == EXPIRED
}

// CheckTimeout checks if timeout, and update the current status.
func (trk *OpStatusTracker) CheckTimeout(wait time.Duration) bool {
// CheckStepTimeout checks whether the given step has timed out, and updates the current status.
func (trk *OpStatusTracker) CheckStepTimeout(start time.Time, step OpStep, approximateSize int64) bool {
trk.rw.Lock()
defer trk.rw.Unlock()
if trk.current == STARTED {
if time.Since(trk.reachTimes[STARTED]) < wait {
if !step.Timeout(start, approximateSize) {
return false
}
_ = trk.toLocked(TIMEOUT)
Expand Down
46 changes: 19 additions & 27 deletions server/schedule/operator/status_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,35 +114,27 @@ func (s *testOpStatusTrackerSuite) TestCheckExpired(c *C) {
}
}

func (s *testOpStatusTrackerSuite) TestCheckTimeout(c *C) {
{
// Not timeout
trk := NewOpStatusTracker()
before := time.Now()
c.Assert(trk.To(STARTED), IsTrue)
after := time.Now()
c.Assert(trk.CheckTimeout(10*time.Second), IsFalse)
c.Assert(trk.Status(), Equals, STARTED)
checkTimeOrder(c, before, trk.ReachTime(), after)
}
{
// Timeout but status not changed
trk := NewOpStatusTracker()
c.Assert(trk.To(STARTED), IsTrue)
trk.setTime(STARTED, time.Now().Add(-10*time.Second))
c.Assert(trk.CheckTimeout(5*time.Second), IsTrue)
c.Assert(trk.Status(), Equals, TIMEOUT)
}
{
func (s *testOpStatusTrackerSuite) TestCheckStepTimeout(c *C) {
testdata := []struct {
step OpStep
start time.Time
status OpStatus
}{{
step: AddLearner{},
start: time.Now().Add(-(SlowOperatorWaitTime - 1*time.Second)),
status: STARTED,
}, {
step: AddLearner{},
start: time.Now().Add(-(SlowOperatorWaitTime + 1*time.Second)),
status: TIMEOUT,
}}

for _, v := range testdata {
// Timeout and status changed
trk := NewOpStatusTracker()
c.Assert(trk.To(STARTED), IsTrue)
before := time.Now()
c.Assert(trk.To(TIMEOUT), IsTrue)
after := time.Now()
c.Assert(trk.CheckTimeout(0), IsTrue)
c.Assert(trk.Status(), Equals, TIMEOUT)
checkTimeOrder(c, before, trk.ReachTime(), after)
trk.To(STARTED)
c.Assert(trk.CheckStepTimeout(v.start, v.step, 0), Equals, v.status == TIMEOUT)
c.Assert(trk.Status(), Equals, v.status)
}
}

Expand Down
Loading