
Commit

Merge 8d43a83 into 37cd20b
imiric authored Mar 7, 2023
2 parents 37cd20b + 8d43a83 commit fe9ff74
Showing 4 changed files with 102 additions and 9 deletions.
9 changes: 6 additions & 3 deletions lib/executor/constant_arrival_rate.go
@@ -217,7 +217,7 @@ func (car ConstantArrivalRate) Run(parentCtx context.Context, out chan<- metrics
 		<-waitOnProgressChannel
 	}()
 
-	vusPool := newActiveVUPool()
+	vusPool := newActiveVUPool(car.executionState)
 	defer func() {
 		// Make sure all VUs aren't executing iterations anymore, for the cancel()
 		// below to deactivate them.
@@ -264,7 +264,11 @@ func (car ConstantArrivalRate) Run(parentCtx context.Context, out chan<- metrics
 	}()
 
 	returnVU := func(u lib.InitializedVU) {
-		car.executionState.ReturnVU(u, true)
+		// Return the VU without decreasing the global active VU counter, which
+		// is done in the goroutine started by activeVUPool.AddVU, whenever the
+		// VU finishes running an iteration. This results in a more accurate
+		// report of VUs that are _actually_ active.
+		car.executionState.ReturnVU(u, false)
 		activeVUsWg.Done()
 	}

@@ -275,7 +279,6 @@ func (car ConstantArrivalRate) Run(parentCtx context.Context, out chan<- metrics
 			maxDurationCtx, car.config.BaseConfig, returnVU,
 			car.nextIterationCounters,
 		))
-		car.executionState.ModCurrentlyActiveVUsCount(+1)
 		atomic.AddUint64(&activeVUsCount, 1)
 		vusPool.AddVU(maxDurationCtx, activeVU, runIterationBasic)
 		return activeVU
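Aside: the accounting pattern this commit adopts in both arrival-rate executors can be stated on its own: a VU counts as active only while it is inside an iteration, not for as long as it sits activated in the pool, so the counter updates move from the VU-return path into the pool's worker goroutine. Below is a minimal, self-contained Go sketch of that pattern, written for this page only; iterationPool, AddWorker, and the active gauge are illustrative names, not k6 API.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// iterationPool parks worker goroutines that wait for work on a shared
// channel, mirroring the activeVUPool idea: a worker is "active" only
// while it is executing an iteration.
type iterationPool struct {
	iterations chan struct{}
	active     int64 // gauge of workers currently inside an iteration
	wg         sync.WaitGroup
}

func newIterationPool() *iterationPool {
	return &iterationPool{iterations: make(chan struct{})}
}

// AddWorker starts a goroutine that loops over incoming work. The active
// gauge is bumped only around each run, so idle pooled workers don't count.
func (p *iterationPool) AddWorker(run func()) {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		for range p.iterations {
			atomic.AddInt64(&p.active, 1) // becomes active here...
			run()
			atomic.AddInt64(&p.active, -1) // ...and idle again here
		}
	}()
}

func main() {
	pool := newIterationPool()
	for i := 0; i < 5; i++ { // five pooled workers, none active yet
		pool.AddWorker(func() { time.Sleep(50 * time.Millisecond) })
	}
	pool.iterations <- struct{}{} // hand out a single iteration
	time.Sleep(10 * time.Millisecond)
	// Typically prints 1: only the worker inside run() counts as active.
	fmt.Println("active:", atomic.LoadInt64(&pool.active))
	close(pool.iterations)
	pool.wg.Wait()
}

This is also why returnVU above now calls ReturnVU(u, false): the pool goroutine owns the active-VU gauge, so returning a parked VU must not decrement it a second time.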
38 changes: 38 additions & 0 deletions lib/executor/constant_arrival_rate_test.go
@@ -354,3 +354,41 @@ func TestConstantArrivalRateGlobalIters(t *testing.T) {
 		})
 	}
 }
+
+func TestConstantArrivalRateActiveVUs(t *testing.T) {
+	t.Parallel()
+
+	config := &ConstantArrivalRateConfig{
+		BaseConfig:      BaseConfig{GracefulStop: types.NullDurationFrom(0 * time.Second)},
+		TimeUnit:        types.NullDurationFrom(time.Second),
+		Rate:            null.IntFrom(10),
+		Duration:        types.NullDurationFrom(950 * time.Millisecond),
+		PreAllocatedVUs: null.IntFrom(5),
+		MaxVUs:          null.IntFrom(10),
+	}
+
+	var (
+		running          int64
+		getCurrActiveVUs func() int64
+		runMx            sync.Mutex
+	)
+
+	runner := simpleRunner(func(ctx context.Context, _ *lib.State) error {
+		runMx.Lock()
+		assert.Equal(t, atomic.AddInt64(&running, 1), getCurrActiveVUs())
+		runMx.Unlock()
+		// Block the VU to cause the executor to schedule more
+		<-ctx.Done()
+		return nil
+	})
+	test := setupExecutorTest(t, "", "", lib.Options{}, runner, config)
+	defer test.cancel()
+
+	getCurrActiveVUs = test.state.GetCurrentlyActiveVUsCount
+
+	engineOut := make(chan metrics.SampleContainer, 1000)
+	require.NoError(t, test.executor.Run(test.ctx, engineOut))
+
+	assert.GreaterOrEqual(t, running, int64(5))
+	assert.LessOrEqual(t, running, int64(10))
+}
21 changes: 15 additions & 6 deletions lib/executor/ramping_arrival_rate.go
@@ -325,7 +325,7 @@ func (varr RampingArrivalRate) Run(parentCtx context.Context, out chan<- metrics
 	waitOnProgressChannel := make(chan struct{})
 	startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(parentCtx, duration, gracefulStop)
 
-	vusPool := newActiveVUPool()
+	vusPool := newActiveVUPool(varr.executionState)
 
 	defer func() {
 		// Make sure all VUs aren't executing iterations anymore, for the cancel()
@@ -382,7 +382,11 @@ func (varr RampingArrivalRate) Run(parentCtx context.Context, out chan<- metrics
 	}()
 
 	returnVU := func(u lib.InitializedVU) {
-		varr.executionState.ReturnVU(u, true)
+		// Return the VU without decreasing the global active VU counter, which
+		// is done in the goroutine started by activeVUPool.AddVU, whenever the
+		// VU finishes running an iteration. This results in a more accurate
+		// report of VUs that are _actually_ active.
+		varr.executionState.ReturnVU(u, false)
 		activeVUsWg.Done()
 	}

@@ -394,7 +398,6 @@ func (varr RampingArrivalRate) Run(parentCtx context.Context, out chan<- metrics
 			getVUActivationParams(
 				maxDurationCtx, varr.config.BaseConfig, returnVU,
 				varr.nextIterationCounters))
-		varr.executionState.ModCurrentlyActiveVUsCount(+1)
 		atomic.AddUint64(&activeVUsCount, 1)
 
 		vusPool.AddVU(maxDurationCtx, activeVU, runIterationBasic)
@@ -494,13 +497,15 @@ func (varr RampingArrivalRate) Run(parentCtx context.Context, out chan<- metrics
 type activeVUPool struct {
 	iterations chan struct{}
 	running    uint64
+	execState  *lib.ExecutionState
 	wg         sync.WaitGroup
 }
 
 // newActiveVUPool returns an activeVUPool.
-func newActiveVUPool() *activeVUPool {
+func newActiveVUPool(es *lib.ExecutionState) *activeVUPool {
 	return &activeVUPool{
 		iterations: make(chan struct{}),
+		execState:  es,
 	}
 }

@@ -521,8 +526,10 @@ func (p *activeVUPool) Running() uint64 {
 	return atomic.LoadUint64(&p.running)
 }
 
-// AddVU adds the active VU to the pool of VUs for handling the incoming requests.
-// When a new request is accepted the runfn function is executed.
+// AddVU adds the active VU to the pool of VUs for handling the incoming
+// requests. When a new request is accepted the runfn function is executed. This
+// is also when we change the global active VUs counter, since it results in a
+// more accurate report of VUs that are _actually_ active.
 func (p *activeVUPool) AddVU(ctx context.Context, avu lib.ActiveVU, runfn func(context.Context, lib.ActiveVU) bool) {
 	p.wg.Add(1)
 	ch := make(chan struct{})
@@ -532,7 +539,9 @@ func (p *activeVUPool) AddVU(ctx context.Context, avu lib.ActiveVU, runfn func(c
 		close(ch)
 		for range p.iterations {
 			atomic.AddUint64(&p.running, uint64(1))
+			p.execState.ModCurrentlyActiveVUsCount(+1)
 			runfn(ctx, avu)
+			p.execState.ModCurrentlyActiveVUsCount(-1)
 			atomic.AddUint64(&p.running, ^uint64(0))
 		}
 	}()
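A small aside on the decrement in AddVU above: atomic.AddUint64 has no subtract variant, so adding ^uint64(0) (all bits set, i.e. the two's-complement encoding of -1) is the idiom the sync/atomic documentation recommends for subtracting one from an unsigned counter. A standalone check:

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var running uint64 = 3
	atomic.AddUint64(&running, ^uint64(0)) // all bits set == -1 in two's complement
	fmt.Println(running)                   // prints 2
}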
43 changes: 43 additions & 0 deletions lib/executor/ramping_arrival_rate_test.go
@@ -734,3 +734,46 @@ func TestRampingArrivalRateCornerCase(t *testing.T) {

 	require.False(t, config.HasWork(et))
 }
+
+func TestRampingArrivalRateActiveVUs(t *testing.T) {
+	t.Parallel()
+
+	config := &RampingArrivalRateConfig{
+		BaseConfig: BaseConfig{GracefulStop: types.NullDurationFrom(0 * time.Second)},
+		TimeUnit:   types.NullDurationFrom(time.Second),
+		StartRate:  null.IntFrom(10),
+		Stages: []Stage{
+			{
+				Duration: types.NullDurationFrom(950 * time.Millisecond),
+				Target:   null.IntFrom(20),
+			},
+		},
+		PreAllocatedVUs: null.IntFrom(5),
+		MaxVUs:          null.IntFrom(10),
+	}
+
+	var (
+		running          int64
+		getCurrActiveVUs func() int64
+		runMx            sync.Mutex
+	)
+
+	runner := simpleRunner(func(ctx context.Context, _ *lib.State) error {
+		runMx.Lock()
+		assert.Equal(t, atomic.AddInt64(&running, 1), getCurrActiveVUs())
+		runMx.Unlock()
+		// Block the VU to cause the executor to schedule more
+		<-ctx.Done()
+		return nil
+	})
+	test := setupExecutorTest(t, "", "", lib.Options{}, runner, config)
+	defer test.cancel()
+
+	getCurrActiveVUs = test.state.GetCurrentlyActiveVUsCount
+
+	engineOut := make(chan metrics.SampleContainer, 1000)
+	require.NoError(t, test.executor.Run(test.ctx, engineOut))
+
+	assert.GreaterOrEqual(t, running, int64(5))
+	assert.LessOrEqual(t, running, int64(10))
+}
