Skip to content

Commit

Permalink
Rewrite arrival based executor with calculus
Browse files Browse the repository at this point in the history
  • Loading branch information
mstoykov committed Jan 22, 2020
1 parent 25e7e91 commit 9b59904
Show file tree
Hide file tree
Showing 2 changed files with 313 additions and 349 deletions.
331 changes: 172 additions & 159 deletions lib/executor/variable_arrival_rate.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@ import (

const variableArrivalRateType = "variable-arrival-rate"

// How often we can make arrival rate adjustments when processing stages
// TODO: make configurable, in some bounds?
const minIntervalBetweenRateAdjustments = 250 * time.Millisecond

func init() {
lib.RegisterExecutorConfigType(
variableArrivalRateType,
Expand Down Expand Up @@ -154,109 +150,13 @@ func (varc VariableArrivalRateConfig) GetExecutionRequirements(es *lib.Execution
}
}

type rateChange struct {
// At what time should the rate below be applied.
timeOffset time.Duration
// Equals 1/rate: if rate was "1/5s", then this value, which is intended to
// be passed to time.NewTicker(), will be 5s. There's a special case when
// the rate is 0, for which we'll set Valid=false. That's because 0 isn't a
// valid ticker period and shouldn't be passed to time.NewTicker(). Instead,
// an empty or stopped ticker should be used.
tickerPeriod types.NullDuration
}

// A helper method to generate the plan how the rate changes would happen.
func (varc VariableArrivalRateConfig) getPlannedRateChanges(segment *lib.ExecutionSegment) []rateChange {
timeUnit := time.Duration(varc.TimeUnit.Duration)
// Important note for accuracy: we must work with and scale only the
// rational numbers, never the raw target values directly. It matters most
// for the accuracy of the intermediate rate change values, but it's
// important even here.
//
// Say we have a desired rate growth from 1/sec to 2/sec over 1 minute, and
// we split the test into two segments of 20% and 80%. If we used the whole
// numbers for scaling, then the instance executing the first segment won't
// ever do even a single request, since scale(20%, 1) would be 0, whereas
// the rational value for scale(20%, 1/sec) is 0.2/sec, or rather 1/5sec...
currentRate := getScaledArrivalRate(segment, varc.StartRate.Int64, timeUnit)

rateChanges := []rateChange{}
timeFromStart := time.Duration(0)

var tArrivalRate = new(big.Rat)
var tArrivalRateStep = new(big.Rat)
var stepCoef = new(big.Rat)
for _, stage := range varc.Stages {
stageTargetRate := getScaledArrivalRate(segment, stage.Target.Int64, timeUnit)
stageDuration := time.Duration(stage.Duration.Duration)

if currentRate.Cmp(stageTargetRate) == 0 {
// We don't have to do anything but update the time offset
// if the rate wasn't changed in this stage
timeFromStart += stageDuration
continue
}

// Handle 0-duration stages, i.e. instant rate jumps
if stageDuration == 0 {
// check if the last set change is for the same time and overwrite it
if len(rateChanges) > 0 && rateChanges[len(rateChanges)-1].timeOffset == timeFromStart {
rateChanges[len(rateChanges)-1].tickerPeriod = getTickerPeriod(stageTargetRate)
} else {
rateChanges = append(rateChanges, rateChange{
timeOffset: timeFromStart,
tickerPeriod: getTickerPeriod(stageTargetRate),
})
}
currentRate = stageTargetRate
continue
}
// Basically, find out how many regular intervals with size of at least
// minIntervalBetweenRateAdjustments are in the stage's duration, and
// then use that number to calculate the actual step. All durations have
// nanosecond precision, so there isn't any actual loss of precision...
stepNumber := (stageDuration / minIntervalBetweenRateAdjustments)
if stepNumber > 1 {
rateDiff := new(big.Rat).Sub(stageTargetRate, currentRate)
stepInterval := stageDuration / stepNumber
for t := stepInterval; ; t += stepInterval {
if stageDuration-t < minIntervalBetweenRateAdjustments {
break
}

tArrivalRate.Add(
currentRate,
tArrivalRateStep.Mul(
rateDiff,
stepCoef.SetFrac64(int64(t), int64(stageDuration)),
),
)

rateChanges = append(rateChanges, rateChange{
timeOffset: timeFromStart + t,
tickerPeriod: getTickerPeriod(tArrivalRate),
})
}
}
timeFromStart += stageDuration
rateChanges = append(rateChanges, rateChange{
timeOffset: timeFromStart,
tickerPeriod: getTickerPeriod(stageTargetRate),
})
currentRate = stageTargetRate
}

return rateChanges
}

// NewExecutor creates a new VariableArrivalRate executor
func (varc VariableArrivalRateConfig) NewExecutor(
es *lib.ExecutionState, logger *logrus.Entry,
) (lib.Executor, error) {
return VariableArrivalRate{
BaseExecutor: NewBaseExecutor(varc, es, logger),
config: varc,
plannedRateChanges: varc.getPlannedRateChanges(es.Options.ExecutionSegment),
BaseExecutor: NewBaseExecutor(varc, es, logger),
config: varc,
}, nil
}

Expand All @@ -270,36 +170,159 @@ func (varc VariableArrivalRateConfig) HasWork(es *lib.ExecutionSegment) bool {
//TODO: combine with the ConstantArrivalRate?
type VariableArrivalRate struct {
*BaseExecutor
config VariableArrivalRateConfig
plannedRateChanges []rateChange
config VariableArrivalRateConfig
}

// Make sure we implement the lib.Executor interface.
var _ lib.Executor = &VariableArrivalRate{}

// streamRateChanges is a helper method that emits rate change events at their
// proper time.
func (varr VariableArrivalRate) streamRateChanges(ctx context.Context, startTime time.Time) <-chan rateChange {
ch := make(chan rateChange)
go func() {
for _, step := range varr.plannedRateChanges {
offsetDiff := step.timeOffset - time.Since(startTime)
if offsetDiff > 0 { // wait until time of event arrives
select {
case <-ctx.Done():
return // exit if context is cancelled
case <-time.After(offsetDiff): //TODO: reuse a timer?
// do nothing
var two big.Rat

func init() {
two.SetInt64(2)
}

// from https://groups.google.com/forum/#!topic/golang-nuts/aIcDf8T-Png
func sqrtRat(x *big.Rat) *big.Rat {
var z, a, b big.Rat
var ns, ds big.Int
ni, di := x.Num(), x.Denom()
z.SetFrac(ns.Rsh(ni, uint(ni.BitLen())/2), ds.Rsh(di, uint(di.BitLen())/2))
for i := 10; i > 0; i-- { //TODO: better termination
a.Sub(a.Mul(&z, &z), x)
f, _ := a.Float64()
if f == 0 {
break
}
fmt.Println(x, z, i)
z.Sub(&z, b.Quo(&a, b.Mul(&two, &z)))
}
return &z
}

// This implementation is just for reference
// TODO: add test to check that `cal` is accurate enough ...
func (varc VariableArrivalRateConfig) calRat(ch chan<- time.Duration) {
defer close(ch)
curr := varc.StartRate.ValueOrZero()
var base time.Duration = 0
for _, stage := range varc.Stages {
// fmt.Println(stage)
target := stage.Target.ValueOrZero()
if target != curr {
var (
a = big.NewRat(curr, int64(time.Second))
b = big.NewRat(target, int64(time.Second))
c = big.NewRat(time.Duration(stage.Duration.Duration).Nanoseconds(), 1)
)
i := int64(1)
for ; ; i++ {
// a - b!=0, x = (a c - sqrt(c (a^2 c - 2 a d + 2 b d)))/(a - b), c!=0
x := new(big.Rat).Mul( // divide
new(big.Rat).Sub( // -
new(big.Rat).Mul(a, c), // a *c
sqrtRat( // sqrt (c * (c*a^2 + 2d*(b-a)))
new(big.Rat).Mul(
c,
new(big.Rat).Add(
new(big.Rat).Mul(c, new(big.Rat).Mul(a, a)),
new(big.Rat).Mul(big.NewRat(2*i, 1), new(big.Rat).Sub(b, a)),
)))),
new(big.Rat).Inv(new(big.Rat).Sub(a, b))) // a - b
fmt.Println(a, b, c, i, x.FloatString(50))
if x.Cmp(c) > 0 {
// fmt.Println(x.Sub(x, c))
break
}
r, _ := x.Float64()
ch <- base + time.Duration(r)
}
} else {
step := big.NewRat(int64(time.Second), target)
a := big.NewRat(int64(time.Second), target)
c := big.NewRat(time.Duration(stage.Duration.Duration).Nanoseconds(), 1)
for { // TODO: remove the 50
if a.Cmp(c) > 0 {
break
}
// fmt.Println(a, step)
r, _ := a.Float64()
ch <- base + time.Duration(r)
a.Add(a, step)
}
}
base += time.Duration(stage.Duration.Duration)
curr = target
// fmt.Println("end ", stage)
}
}

const epsilon = 0.000_000_000_1 // this 1/10 nanosecond ... so we don' care at that point

func (varc VariableArrivalRateConfig) cal(ch chan<- time.Duration) {
// TODO: add inline comments with explanation of what is happening
// for now just link to https://github.com/loadimpact/k6/issues/1299#issuecomment-575661084
defer close(ch)
var base time.Duration
curr := varc.StartRate.ValueOrZero()
var carry float64
for _, stage := range varc.Stages {
// TODO remove the fmt.Println debug helpers :D
// fmt.Println(stage)
target := stage.Target.ValueOrZero()
if target != curr {
var (
a = float64(curr) / float64(time.Second)
b = float64(target) / float64(time.Second)
c = float64(time.Duration(stage.Duration.Duration).Nanoseconds())
x float64
endCount = c * ((b-a)/2 + a)
i = float64(1)
)

if carry != 0 {
i -= carry
}

// fmt.Printf("%f %f %f\n", i, endCount, carry)
for ; i <= endCount; i++ {
// fmt.Println(i, endCount)
x = (a*c - math.Sqrt(c*(a*a*c+2*i*(b-a)))) / (a - b)
// fmt.Printf("%.10f, %.10f, %.10f, %.10f, %f\n", a, b, c, i, x)

if math.IsNaN(x) {
fmt.Printf("break, %f,%f\n", x, c)
break
}
// fmt.Println("x=", time.Duration(x), base)
ch <- time.Duration(x) + base
}
select {
case <-ctx.Done():
return // exit if context is cancelled
case ch <- step: // send the step
carry = endCount - (i - 1)
} else {
var (
a = float64(time.Second) / float64(target)
step = a
c = float64(time.Duration(stage.Duration.Duration).Nanoseconds())
count = c / float64(time.Duration(varc.TimeUnit.Duration).Nanoseconds()) * float64(target)
i = float64(1)
)
if carry != 0 {
i -= carry
a -= a * carry
}
// fmt.Printf("%f %f %f\n", a, carry, a*carry)
// a -= a * carry

for ; i <= count; i++ {
// fmt.Println(time.Duration(a), time.Duration(step))
ch <- time.Duration(a) + base
a += step
}
}
}()
return ch
curr = target
base += time.Duration(stage.Duration.Duration)
// fmt.Println("end ", stage)
}
}

// Run executes a variable number of iterations per second.
Expand Down Expand Up @@ -378,7 +401,6 @@ func (varr VariableArrivalRate) Run(ctx context.Context, out chan<- stats.Sample
}

remainingUnplannedVUs := maxVUs - preAllocatedVUs
rateChangesStream := varr.streamRateChanges(maxDurationCtx, startTime)

startIteration := func() error {
select {
Expand All @@ -402,46 +424,37 @@ func (varr VariableArrivalRate) Run(ctx context.Context, out chan<- stats.Sample
return nil
}

var now time.Time
var lastTick = time.Now()
var ticker = &time.Ticker{}
if startTickerPeriod.Valid {
ticker = time.NewTicker(time.Duration(atomic.LoadInt64(tickerPeriod)))
}
var ch <-chan time.Time
var timer = time.NewTimer(time.Hour)
var start = time.Now()
var ch = make(chan time.Duration, 0)
go varr.config.cal(ch)
for {
select {
case rateChange := <-rateChangesStream:
ticker.Stop()
select {
case <-ticker.C:
default:
case nextTime, ok := <-ch:
if !ok {
return nil
}
now = time.Now()
newPeriod := rateChange.tickerPeriod
atomic.StoreInt64(tickerPeriod, int64(newPeriod.Duration))
if newPeriod.Valid {
for lastTick.Before(now) {
nextIterationTime := time.Duration(newPeriod.Duration) - now.Sub(lastTick)
if nextIterationTime >= 0 {
ch = time.After(nextIterationTime)
break
}
lastTick = lastTick.Add(time.Duration(newPeriod.Duration))
if err := startIteration(); err != nil {
return err
}
b := time.Until(start.Add(nextTime))
// fmt.Println(b)
atomic.StoreInt64(tickerPeriod, int64(b))
if b < 0 {
// fmt.Println(time.Now())
err := startIteration()
if err != nil {
return err
}
continue
}
case lastTick = <-ch:
ch = nil
if err := startIteration(); err != nil {
return err
}
ticker = time.NewTicker(time.Duration(atomic.LoadInt64(tickerPeriod)))
case lastTick = <-ticker.C:
if err := startIteration(); err != nil {
return err
timer.Reset(b)
select {
case <-timer.C:
// fmt.Println(time.Now())
err := startIteration()
if err != nil {
return err
}
case <-regDurationDone:
return nil
}
case <-regDurationDone:
return nil
Expand Down
Loading

0 comments on commit 9b59904

Please sign in to comment.