diff --git a/lib/executor/variable_arrival_rate.go b/lib/executor/variable_arrival_rate.go index 75938aadb88..b19687dc903 100644 --- a/lib/executor/variable_arrival_rate.go +++ b/lib/executor/variable_arrival_rate.go @@ -314,10 +314,6 @@ func (varr VariableArrivalRate) Run(ctx context.Context, out chan<- stats.Sample startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(ctx, duration, gracefulStop) defer cancel() - ticker := &time.Ticker{} - if startTickerPeriod.Valid { - ticker = time.NewTicker(time.Duration(startTickerPeriod.Duration)) - } // Make sure the log and the progress bar have accurate information varr.logger.WithFields(logrus.Fields{ @@ -379,33 +375,68 @@ func (varr VariableArrivalRate) Run(ctx context.Context, out chan<- stats.Sample remainingUnplannedVUs := maxVUs - preAllocatedVUs rateChangesStream := varr.streamRateChanges(maxDurationCtx, startTime) + startIteration := func() error { + select { + case vu := <-vus: + // ideally, we get the VU from the buffer without any issues + go runIteration(vu) + default: + if remainingUnplannedVUs == 0 { + //TODO: emit an error metric? + varr.logger.Warningf("Insufficient VUs, reached %d active VUs and cannot allocate more", maxVUs) + break + } + vu, err := varr.executionState.GetUnplannedVU(maxDurationCtx, varr.logger) + if err != nil { + return err + } + remainingUnplannedVUs-- + atomic.AddUint64(&initialisedVUs, 1) + go runIteration(vu) + } + return nil + } + + var now time.Time + var lastTick = time.Now() + var ticker = &time.Ticker{} + if startTickerPeriod.Valid { + ticker = time.NewTicker(time.Duration(atomic.LoadInt64(tickerPeriod))) + } + var ch <-chan time.Time for { select { case rateChange := <-rateChangesStream: - newPeriod := rateChange.tickerPeriod ticker.Stop() - if newPeriod.Valid { - ticker = time.NewTicker(time.Duration(newPeriod.Duration)) - } - atomic.StoreInt64(tickerPeriod, int64(newPeriod.Duration)) - case <-ticker.C: select { - case vu := <-vus: - // ideally, we get the VU from the buffer without any issues - go runIteration(vu) + case <-ticker.C: default: - if remainingUnplannedVUs == 0 { - //TODO: emit an error metric? - varr.logger.Warningf("Insufficient VUs, reached %d active VUs and cannot allocate more", maxVUs) - break - } - vu, err := varr.executionState.GetUnplannedVU(maxDurationCtx, varr.logger) - if err != nil { - return err + } + now = time.Now() + newPeriod := rateChange.tickerPeriod + atomic.StoreInt64(tickerPeriod, int64(newPeriod.Duration)) + if newPeriod.Valid { + for lastTick.Before(now) { + nextIterationTime := time.Duration(newPeriod.Duration) - now.Sub(lastTick) + if nextIterationTime >= 0 { + ch = time.After(nextIterationTime) + break + } + lastTick = lastTick.Add(time.Duration(newPeriod.Duration)) + if err := startIteration(); err != nil { + return err + } } - remainingUnplannedVUs-- - atomic.AddUint64(&initialisedVUs, 1) - go runIteration(vu) + } + case lastTick = <-ch: + ch = nil + if err := startIteration(); err != nil { + return err + } + ticker = time.NewTicker(time.Duration(atomic.LoadInt64(tickerPeriod))) + case lastTick = <-ticker.C: + if err := startIteration(); err != nil { + return err } case <-regDurationDone: return nil diff --git a/lib/executor/variable_arrival_rate_test.go b/lib/executor/variable_arrival_rate_test.go index b0e4f4b6a52..e6d373ca79a 100644 --- a/lib/executor/variable_arrival_rate_test.go +++ b/lib/executor/variable_arrival_rate_test.go @@ -12,6 +12,7 @@ import ( "github.com/loadimpact/k6/lib/types" "github.com/loadimpact/k6/stats" "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" null "gopkg.in/guregu/null.v3" ) @@ -269,7 +270,7 @@ func TestVariableArrivalRateRunCorrectRate(t *testing.T) { currentCount = atomic.SwapInt64(&count, 0) // this is highly dependant on minIntervalBetweenRateAdjustments // TODO find out why this isn't 30 and fix it - require.InDelta(t, 23, currentCount, 2) + require.InDelta(t, 26, currentCount, 2) time.Sleep(time.Second) currentCount = atomic.SwapInt64(&count, 0) @@ -281,3 +282,44 @@ func TestVariableArrivalRateRunCorrectRate(t *testing.T) { require.NoError(t, err) require.Empty(t, logHook.Drain()) } + +func TestVariableArrivalRateRunCorrectRateWithSlowRate(t *testing.T) { + t.Parallel() + var count int64 + var now = time.Now() + var expectedTimes = []time.Duration{ + time.Millisecond * 2500, time.Second * 4, time.Millisecond * 5200} + var ctx, cancel, executor, logHook = setupExecutor( + t, VariableArrivalRateConfig{ + TimeUnit: types.NullDurationFrom(time.Second), + Stages: []Stage{ + { + Duration: types.NullDurationFrom(time.Second * 6), + Target: null.IntFrom(1), + }, + }, + PreAllocatedVUs: null.IntFrom(10), + MaxVUs: null.IntFrom(20), + }, + simpleRunner(func(ctx context.Context) error { + current := atomic.AddInt64(&count, 1) + if !assert.True(t, int(current) <= len(expectedTimes)) { + return nil + } + expectedTime := expectedTimes[current-1] + assert.WithinDuration(t, + now.Add(expectedTime), + time.Now(), + time.Millisecond*100, + "%d expectedTime %s", current, expectedTime, + ) + return nil + }), + ) + defer cancel() + var engineOut = make(chan stats.SampleContainer, 1000) + err := executor.Run(ctx, engineOut) + require.NoError(t, err) + require.Equal(t, int64(3), count) + require.Empty(t, logHook.Drain()) +}