Fix VariableArrivalRate not working well
The previous implementation worked roughly as follows:
A list of changes to the arrival rate is generated, with a minimum time between changes (currently 250ms). This means that during any ramp-up/down there is a step roughly every 250ms.
Once that list is generated, a goroutine reads it and sends each change on a channel at the appropriate time. During ramp-up/down this is roughly every 250ms.
Another goroutine receives those changes and resets a timer to the new value. And here is where the problem lies:
If there is more than 250ms between iterations (an arrival rate of 4 iterations per second or less), the timer is reset before it ever fires, so it never triggers the start of a VU.
The extreme example is a ramp-up to 4 iterations per second - exactly 1 iteration each 250ms - which means that for the whole duration of the ramp-up, right up to its end, zero iterations are started.
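
To make the failure mode concrete, here is a minimal, self-contained Go sketch (not the k6 code; the 300ms period, the rateChanges ticker and all names are made up for illustration) of the old pattern - a ticker that is thrown away and re-created on every rate change. Because a change arrives every 250ms, a ticker whose period is 250ms or more is always replaced before it fires, so no iterations are ever started:

package main

import (
	"fmt"
	"time"
)

func main() {
	// A stand-in for the stream of rate adjustments, arriving every 250ms.
	rateChanges := time.NewTicker(250 * time.Millisecond)
	defer rateChanges.Stop()

	period := 300 * time.Millisecond // current time between iterations (> 250ms)
	iterTicker := time.NewTicker(period)
	iterations := 0

	deadline := time.After(3 * time.Second)
	for {
		select {
		case <-rateChanges.C:
			// Old behaviour: discard the partially elapsed timer and start over.
			iterTicker.Stop()
			iterTicker = time.NewTicker(period)
		case <-iterTicker.C:
			iterations++ // never reached while rate changes keep arriving
		case <-deadline:
			fmt.Println("iterations started:", iterations) // prints 0
			return
		}
	}
}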

The new design changes only the last part of how the VariableArrivalRate executor works. Instead of resetting the timer to the new value, we calculate what the value should be given how much time has passed since the last iteration, in order to compensate. For example:
If the last iteration was 300ms ago and the new rate is one iteration per 320ms, the timer is first reset to 20ms, and then to 320ms after each subsequent iteration is triggered.
Of course, there is the complication that we may already be overdue for one or more iterations by the time the new rate arrives. In that case those iterations are started immediately, and the next one is scheduled not a full 320ms from when we received the new period, but only the remaining part of the period (20ms in the example above) after the last catch-up iteration.
So the code got hairy.
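
Below is a simplified sketch of that compensation step; the function name compensate, the startIteration callback and the package name are made up for illustration, and it assumes the new period is valid (positive). Given when the last iteration started and the freshly arrived period, it either schedules the next iteration after only the remaining part of the period, or immediately starts any iterations that are already overdue, advancing the notional last tick one period at a time:

package arrival

import "time"

// compensate reacts to a new time-between-iterations (newPeriod, assumed > 0).
// It returns the advanced lastTick and, if we are not overdue, a channel that
// fires when the next iteration should start; overdue iterations are started
// immediately via startIteration.
func compensate(lastTick time.Time, newPeriod time.Duration,
	startIteration func() error) (time.Time, <-chan time.Time, error) {

	now := time.Now()
	var next <-chan time.Time
	for lastTick.Before(now) {
		wait := newPeriod - now.Sub(lastTick)
		if wait >= 0 {
			// E.g. last iteration 300ms ago, new period 320ms: wait just 20ms,
			// after which the caller switches back to a plain 320ms ticker.
			next = time.After(wait)
			break
		}
		// Already overdue: start the missed iteration now and move the notional
		// "last tick" forward one period; this may loop several times.
		lastTick = lastTick.Add(newPeriod)
		if err := startIteration(); err != nil {
			return lastTick, nil, err
		}
	}
	return lastTick, next, nil
}

In the actual executor this logic lives inline in the rate-change case of the Run loop's select, as the diff below shows.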

The even bigger problem with the code (apart from it probably needing to be split into more functions or reorganised completely) is that it makes it even harder to solve the problem where splitting the test into multiple execution segments won't keep the timing of individual iterations but will instead group them in time, as shown and reported in #1007.
mstoykov committed Dec 19, 2019
1 parent f6be35c commit 052120d
Showing 2 changed files with 97 additions and 25 deletions.
79 changes: 55 additions & 24 deletions lib/executor/variable_arrival_rate.go
@@ -314,10 +314,6 @@ func (varr VariableArrivalRate) Run(ctx context.Context, out chan<- stats.Sample

startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(ctx, duration, gracefulStop)
defer cancel()
ticker := &time.Ticker{}
if startTickerPeriod.Valid {
ticker = time.NewTicker(time.Duration(startTickerPeriod.Duration))
}

// Make sure the log and the progress bar have accurate information
varr.logger.WithFields(logrus.Fields{
@@ -379,33 +375,68 @@ func (varr VariableArrivalRate) Run(ctx context.Context, out chan<- stats.Sample
remainingUnplannedVUs := maxVUs - preAllocatedVUs
rateChangesStream := varr.streamRateChanges(maxDurationCtx, startTime)

startIteration := func() error {
select {
case vu := <-vus:
// ideally, we get the VU from the buffer without any issues
go runIteration(vu)
default:
if remainingUnplannedVUs == 0 {
//TODO: emit an error metric?
varr.logger.Warningf("Insufficient VUs, reached %d active VUs and cannot allocate more", maxVUs)
break
}
vu, err := varr.executionState.GetUnplannedVU(maxDurationCtx, varr.logger)
if err != nil {
return err
}
remainingUnplannedVUs--
atomic.AddUint64(&initialisedVUs, 1)
go runIteration(vu)
}
return nil
}

var now time.Time
var lastTick = time.Now()
var ticker = &time.Ticker{}
if startTickerPeriod.Valid {
ticker = time.NewTicker(time.Duration(atomic.LoadInt64(tickerPeriod)))
}
var ch <-chan time.Time
for {
select {
case rateChange := <-rateChangesStream:
newPeriod := rateChange.tickerPeriod
ticker.Stop()
if newPeriod.Valid {
ticker = time.NewTicker(time.Duration(newPeriod.Duration))
}
atomic.StoreInt64(tickerPeriod, int64(newPeriod.Duration))
case <-ticker.C:
select {
case vu := <-vus:
// ideally, we get the VU from the buffer without any issues
go runIteration(vu)
case <-ticker.C:
default:
if remainingUnplannedVUs == 0 {
//TODO: emit an error metric?
varr.logger.Warningf("Insufficient VUs, reached %d active VUs and cannot allocate more", maxVUs)
break
}
vu, err := varr.executionState.GetUnplannedVU(maxDurationCtx, varr.logger)
if err != nil {
return err
}
now = time.Now()
newPeriod := rateChange.tickerPeriod
atomic.StoreInt64(tickerPeriod, int64(newPeriod.Duration))
if newPeriod.Valid {
for lastTick.Before(now) {
nextIterationTime := time.Duration(newPeriod.Duration) - now.Sub(lastTick)
if nextIterationTime >= 0 {
ch = time.After(nextIterationTime)
break
}
lastTick = lastTick.Add(time.Duration(newPeriod.Duration))
if err := startIteration(); err != nil {
return err
}
}
remainingUnplannedVUs--
atomic.AddUint64(&initialisedVUs, 1)
go runIteration(vu)
}
case lastTick = <-ch:
ch = nil
if err := startIteration(); err != nil {
return err
}
ticker = time.NewTicker(time.Duration(atomic.LoadInt64(tickerPeriod)))
case lastTick = <-ticker.C:
if err := startIteration(); err != nil {
return err
}
case <-regDurationDone:
return nil
43 changes: 42 additions & 1 deletion lib/executor/variable_arrival_rate_test.go
@@ -12,6 +12,7 @@ import (
"github.com/loadimpact/k6/lib/types"
"github.com/loadimpact/k6/stats"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
null "gopkg.in/guregu/null.v3"
)
@@ -269,7 +270,7 @@ func TestVariableArrivalRateRunCorrectRate(t *testing.T) {
currentCount = atomic.SwapInt64(&count, 0)
// this is highly dependant on minIntervalBetweenRateAdjustments
// TODO find out why this isn't 30 and fix it
require.InDelta(t, 23, currentCount, 2)
require.InDelta(t, 26, currentCount, 2)

time.Sleep(time.Second)
currentCount = atomic.SwapInt64(&count, 0)
@@ -281,3 +282,43 @@ func TestVariableArrivalRateRunCorrectRate(t *testing.T) {
require.NoError(t, err)
require.Empty(t, logHook.Drain())
}

func TestVariableArrivalRateRunCorrectRateWithSlowRate(t *testing.T) {
t.Parallel()
var count int64
var now = time.Now()
var expectedTimes = []time.Duration{
time.Millisecond * 2500, time.Second * 4, time.Millisecond * 5200}
var ctx, cancel, executor, logHook = setupExecutor(
t, VariableArrivalRateConfig{
TimeUnit: types.NullDurationFrom(time.Second),
Stages: []Stage{
{
Duration: types.NullDurationFrom(time.Second * 6),
Target: null.IntFrom(1),
},
},
PreAllocatedVUs: null.IntFrom(10),
MaxVUs: null.IntFrom(20),
},
func(ctx context.Context, out chan<- stats.SampleContainer) error {
current := atomic.AddInt64(&count, 1)
if !assert.True(t, int(current) <= len(expectedTimes)) {
return nil
}
expectedTime := expectedTimes[current-1]
assert.WithinDuration(t,
now.Add(expectedTime),
time.Now(),
time.Millisecond*100,
"%d expectedTime %s", current, expectedTime,
)
return nil
})
defer cancel()
var engineOut = make(chan stats.SampleContainer, 1000)
err := executor.Run(ctx, engineOut)
require.NoError(t, err)
require.Equal(t, int64(3), count)
require.Empty(t, logHook.Drain())
}
