diff --git a/lib/executor/variable_arrival_rate.go b/lib/executor/variable_arrival_rate.go index a6b94412997..d255b607259 100644 --- a/lib/executor/variable_arrival_rate.go +++ b/lib/executor/variable_arrival_rate.go @@ -178,7 +178,19 @@ func (varc VariableArrivalRateConfig) getPlannedRateChanges(segment *lib.Executi // the rational value for scale(20%, 1/sec) is 0.2/sec, or rather 1/5sec... currentRate := getScaledArrivalRate(segment, varc.StartRate.Int64, timeUnit) - rateChanges := []rateChange{} + var size int + for i, stage := range varc.Stages { + if stage.Duration.Duration == 0 && + (i == 0 || + (i >= 2 && varc.Stages[i-1].Target.Int64 != stage.Target.Int64 && + varc.Stages[i-1].Target.Int64 == varc.Stages[i-2].Target.Int64)) { + size++ + } else if i == 0 || varc.Stages[i-1].Target.Int64 != stage.Target.Int64 { + size += int(time.Duration(stage.Duration.Duration) / minIntervalBetweenRateAdjustments) + } + } + + rateChanges := make([]rateChange, 0, size) timeFromStart := time.Duration(0) for _, stage := range varc.Stages { diff --git a/lib/executor/variable_arrival_rate_test.go b/lib/executor/variable_arrival_rate_test.go index e7ec602a1b8..2b0fcabbee9 100644 --- a/lib/executor/variable_arrival_rate_test.go +++ b/lib/executor/variable_arrival_rate_test.go @@ -32,16 +32,94 @@ func TestGetPlannedRateChanges0DurationStage(t *testing.T) { Duration: types.NullDurationFrom(time.Minute), Target: null.IntFrom(50), }, + { + Duration: types.NullDurationFrom(0), + Target: null.IntFrom(100), + }, + { + Duration: types.NullDurationFrom(time.Minute), + Target: null.IntFrom(100), + }, }, } - var v *lib.ExecutionSegment - changes := config.getPlannedRateChanges(v) - require.Equal(t, 1, len(changes)) + var es *lib.ExecutionSegment + changes := config.getPlannedRateChanges(es) + require.Equal(t, len(changes), cap(changes)) + require.Equal(t, 2, len(changes)) require.Equal(t, time.Duration(0), changes[0].timeOffset) require.Equal(t, types.NullDurationFrom(time.Millisecond*20), changes[0].tickerPeriod) + + require.Equal(t, time.Minute, changes[1].timeOffset) + require.Equal(t, types.NullDurationFrom(time.Millisecond*10), changes[1].tickerPeriod) +} + +// helper function to calculate the expected rate change at a given time +func calculateTickerPeriod(current, start, duration time.Duration, from, to int64) types.Duration { + var coef = big.NewRat( + (current - start).Nanoseconds(), + duration.Nanoseconds(), + ) + + var oneRat = new(big.Rat).Mul(big.NewRat(from-to, 1), coef) + oneRat = new(big.Rat).Sub(big.NewRat(from, 1), oneRat) + oneRat = new(big.Rat).Mul(big.NewRat(int64(time.Second), 1), new(big.Rat).Inv(oneRat)) + return types.Duration(new(big.Int).Div(oneRat.Num(), oneRat.Denom()).Int64()) +} + +func TestGetPlannedRateChangesZeroDurationStart(t *testing.T) { + // TODO: Make multiple of those tests + t.Parallel() + var config = VariableArrivalRateConfig{ + TimeUnit: types.NullDurationFrom(time.Second), + StartRate: null.IntFrom(0), + Stages: []Stage{ + { + Duration: types.NullDurationFrom(0), + Target: null.IntFrom(50), + }, + { + Duration: types.NullDurationFrom(time.Minute), + Target: null.IntFrom(50), + }, + { + Duration: types.NullDurationFrom(0), + Target: null.IntFrom(100), + }, + { + Duration: types.NullDurationFrom(time.Minute), + Target: null.IntFrom(100), + }, + { + Duration: types.NullDurationFrom(time.Minute), + Target: null.IntFrom(0), + }, + }, + } + + var es *lib.ExecutionSegment + changes := config.getPlannedRateChanges(es) + require.Equal(t, len(changes), cap(changes)) + var expectedTickerPeriod types.Duration + for i, change := range changes { + switch { + case change.timeOffset == 0: + expectedTickerPeriod = types.Duration(20 * time.Millisecond) + case change.timeOffset == time.Minute*1: + expectedTickerPeriod = types.Duration(10 * time.Millisecond) + case change.timeOffset < time.Minute*3: + expectedTickerPeriod = calculateTickerPeriod(change.timeOffset, 2*time.Minute, time.Minute, 100, 0) + case change.timeOffset == time.Minute*3: + expectedTickerPeriod = 0 + default: + t.Fatalf("this shouldn't happen %d index %+v", i, change) + } + require.Equal(t, time.Duration(0), + change.timeOffset%minIntervalBetweenRateAdjustments, "%d index %+v", i, change) + require.Equal(t, change.tickerPeriod.Duration, expectedTickerPeriod, "%d index %+v", i, change) + } } -func TestGetPlannedRateChanges(t *testing.T) { +func TestGetPlannedRateChanges2(t *testing.T) { // TODO: Make multiple of those tests t.Parallel() var config = VariableArrivalRateConfig{ @@ -72,30 +150,20 @@ func TestGetPlannedRateChanges(t *testing.T) { }, } - var v *lib.ExecutionSegment - changes := config.getPlannedRateChanges(v) - c := func(change rateChange, start, duration time.Duration, from, to int64) types.Duration { - var coef = big.NewRat( - (change.timeOffset - start).Nanoseconds(), - duration.Nanoseconds(), - ) - - var oneRat = new(big.Rat).Mul(big.NewRat(from-to, 1), coef) - oneRat = new(big.Rat).Sub(big.NewRat(from, 1), oneRat) - oneRat = new(big.Rat).Mul(big.NewRat(int64(time.Second), 1), new(big.Rat).Inv(oneRat)) - return types.Duration(new(big.Int).Div(oneRat.Num(), oneRat.Denom()).Int64()) - } + var es *lib.ExecutionSegment + changes := config.getPlannedRateChanges(es) + require.Equal(t, len(changes), cap(changes)) var expectedTickerPeriod types.Duration for i, change := range changes { switch { case change.timeOffset <= time.Minute*2: - expectedTickerPeriod = c(change, 0, time.Minute*2, 0, 50) + expectedTickerPeriod = calculateTickerPeriod(change.timeOffset, 0, time.Minute*2, 0, 50) case change.timeOffset < time.Minute*4: - expectedTickerPeriod = c(change, time.Minute*3, time.Minute, 50, 100) + expectedTickerPeriod = calculateTickerPeriod(change.timeOffset, time.Minute*3, time.Minute, 50, 100) case change.timeOffset == time.Minute*4: expectedTickerPeriod = types.Duration(5 * time.Millisecond) default: - expectedTickerPeriod = c(change, 4*time.Minute, 23*time.Second, 200, 50) + expectedTickerPeriod = calculateTickerPeriod(change.timeOffset, 4*time.Minute, 23*time.Second, 200, 50) } require.Equal(t, time.Duration(0), change.timeOffset%minIntervalBetweenRateAdjustments, "%d index %+v", i, change) @@ -123,13 +191,14 @@ func BenchmarkGetPlannedRateChanges(b *testing.B) { }, } - var v *lib.ExecutionSegment + var es *lib.ExecutionSegment b.RunParallel(func(pb *testing.PB) { for pb.Next() { - changes := config.getPlannedRateChanges(v) + changes := config.getPlannedRateChanges(es) require.Equal(b, time.Duration(0), changes[0].timeOffset%minIntervalBetweenRateAdjustments, "%+v", changes[0]) + require.Equal(b, len(changes), cap(changes)) } }) }