diff --git a/lib/executor/variable_arrival_rate.go b/lib/executor/variable_arrival_rate.go
index 24c8d8107ab..fa1864099bf 100644
--- a/lib/executor/variable_arrival_rate.go
+++ b/lib/executor/variable_arrival_rate.go
@@ -278,95 +278,110 @@ func (varc VariableArrivalRateConfig) cal(et *lib.ExecutionTuple, ch chan<- time
 }
 
 // Run executes a variable number of iterations per second.
-func (varr VariableArrivalRate) Run(ctx context.Context, out chan<- stats.SampleContainer) (err error) { //nolint:funlen
+func (varr VariableArrivalRate) Run(ctx context.Context, out chan<- stats.SampleContainer) (err error) {
 	segment := varr.executionState.ExecutionTuple.Segment
 	gracefulStop := varr.config.GetGracefulStop()
 	duration := sumStagesDuration(varr.config.Stages)
 	preAllocatedVUs := varr.config.GetPreAllocatedVUs(varr.executionState.ExecutionTuple)
 	maxVUs := varr.config.GetMaxVUs(varr.executionState.ExecutionTuple)
 
+	vm := &vuManager{
+		executionState:        varr.executionState,
+		logger:                varr.logger,
+		maxVUs:                maxVUs,
+		activeVUs:             make(chan lib.ActiveVU, maxVUs),
+		activeVUsWg:           &sync.WaitGroup{},
+		returnedVUs:           make(chan struct{}),
+		makeUnplannedVUCh:     make(chan struct{}),
+		runIterationBasic:     getIterationRunner(varr.executionState, varr.logger),
+		remainingUnplannedVUs: maxVUs - preAllocatedVUs,
+	}
+
 	// TODO: refactor and simplify
 	timeUnit := time.Duration(varr.config.TimeUnit.Duration)
 	startArrivalRate := getScaledArrivalRate(segment, varr.config.StartRate.Int64, timeUnit)
-	maxUnscaledRate := getStagesUnscaledMaxTarget(varr.config.StartRate.Int64, varr.config.Stages)
-	maxArrivalRatePerSec, _ := getArrivalRatePerSec(getScaledArrivalRate(segment, maxUnscaledRate, timeUnit)).Float64()
 	startTickerPeriod := getTickerPeriod(startArrivalRate)
 
 	// Make sure the log and the progress bar have accurate information
 	varr.logger.WithFields(logrus.Fields{
-		"maxVUs": maxVUs, "preAllocatedVUs": preAllocatedVUs, "duration": duration, "numStages": len(varr.config.Stages),
+		"maxVUs": vm.maxVUs, "preAllocatedVUs": preAllocatedVUs, "duration": duration, "numStages": len(varr.config.Stages),
 		"startTickerPeriod": startTickerPeriod.Duration, "type": varr.config.GetType(),
 	}).Debug("Starting executor run...")
 
-	activeVUsWg := &sync.WaitGroup{}
-
-	returnedVUs := make(chan struct{})
 	startTime, maxDurationCtx, regDurationCtx, cancel := getDurationContexts(ctx, duration, gracefulStop)
+	vm.ctx = maxDurationCtx
 
 	// Pre-allocate the VUs local shared buffer
 	defer func() {
 		// Make sure all VUs aren't executing iterations anymore, for the cancel()
 		// above to deactivate them.
-		<-returnedVUs
+		close(vm.makeUnplannedVUCh)
+		<-vm.returnedVUs
 		cancel()
-		activeVUsWg.Wait()
+		vm.activeVUsWg.Wait()
 	}()
-	activeVUs := make(chan lib.ActiveVU, maxVUs)
-	activeVUsCount := uint64(0)
-
-	activationParams := getVUActivationParams(maxDurationCtx, varr.config.BaseConfig,
-		func(u lib.InitializedVU) {
-			varr.executionState.ReturnVU(u, true)
-			activeVUsWg.Done()
-		})
-	activateVU := func(initVU lib.InitializedVU) lib.ActiveVU {
-		activeVUsWg.Add(1)
-		activeVU := initVU.Activate(activationParams)
-		varr.executionState.ModCurrentlyActiveVUsCount(+1)
-		atomic.AddUint64(&activeVUsCount, 1)
-		return activeVU
+
+	go vm.unplannedVUMaking()
+	// Get the pre-allocated VUs in the local buffer
+	if err := vm.getPreallocatedVUs(preAllocatedVUs); err != nil {
+		return err
 	}
-	remainingUnplannedVUs := maxVUs - preAllocatedVUs
-	makeUnplannedVUCh := make(chan struct{})
-	defer close(makeUnplannedVUCh)
-	go func() {
-		defer close(returnedVUs)
-		defer func() {
-			// this is done here as to not have an unplannedVU in the middle of initialization when
-			// starting to return activeVUs
-			for i := uint64(0); i < atomic.LoadUint64(&activeVUsCount); i++ {
-				<-activeVUs
-			}
-		}()
-		for range makeUnplannedVUCh {
-			initVU, err := varr.executionState.GetUnplannedVU(maxDurationCtx, varr.logger)
-			if err != nil {
-				// TODO figure out how to return it to the Run goroutine
-				varr.logger.WithError(err).Error("Error while allocating unplanned VU")
+	tickerPeriod := int64(startTickerPeriod.Duration)
+	progressFn := varr.progressFn(vm, timeUnit, duration, startTime, &tickerPeriod)
+	varr.progress.Modify(pb.WithProgress(progressFn))
+	go trackProgress(ctx, maxDurationCtx, regDurationCtx, varr, progressFn)
+
+	timer := time.NewTimer(time.Hour)
+	start := time.Now()
+	ch := make(chan time.Duration, 10) // buffer 10 iteration times ahead
+	var prevTime time.Duration
+	go varr.config.cal(varr.executionState.ExecutionTuple, ch)
+	for nextTime := range ch {
+		atomic.StoreInt64(&tickerPeriod, int64(nextTime-prevTime))
+		prevTime = nextTime
+		b := time.Until(start.Add(nextTime))
+		if b > 0 { // TODO: have a minimum?
+			timer.Reset(b)
+			select {
+			case <-timer.C:
+			case <-regDurationCtx.Done():
+				return nil
 			}
-		}
 		}
-	}()
-	// Get the pre-allocated VUs in the local buffer
-	for i := int64(0); i < preAllocatedVUs; i++ {
-		initVU, err := varr.executionState.GetPlannedVU(varr.logger, false)
-		if err != nil {
-			return err
-		}
-		activeVUs <- activateVU(initVU)
+		vm.startIteration()
 	}
+	return nil
+}
 
-	tickerPeriod := int64(startTickerPeriod.Duration)
+type vuManager struct { // TODO: add a factory
+	executionState        *lib.ExecutionState
+	ctx                   context.Context
+	activeVUsCount        uint64
+	activeVUs             chan lib.ActiveVU
+	activeVUsWg           *sync.WaitGroup
+	makeUnplannedVUCh     chan struct{}
+	returnedVUs           chan struct{}
+	remainingUnplannedVUs int64
+	maxVUs                int64
+	logger                *logrus.Entry
+	runIterationBasic     func(context.Context, lib.ActiveVU)
+}
 
-	vusFmt := pb.GetFixedLengthIntFormat(maxVUs)
-	itersFmt := pb.GetFixedLengthFloatFormat(maxArrivalRatePerSec, 0) + " iters/s"
+func (varr VariableArrivalRate) progressFn(
+	vm *vuManager, timeUnit, duration time.Duration, startTime time.Time, tickerPeriod *int64,
+) func() (float64, []string) {
+	segment := varr.executionState.ExecutionTuple.Segment
 
-	progresFn := func() (float64, []string) {
-		currActiveVUs := atomic.LoadUint64(&activeVUsCount)
-		currentTickerPeriod := atomic.LoadInt64(&tickerPeriod)
-		vusInBuffer := uint64(len(activeVUs))
+	maxUnscaledRate := getStagesUnscaledMaxTarget(varr.config.StartRate.Int64, varr.config.Stages)
+	maxArrivalRatePerSec, _ := getArrivalRatePerSec(getScaledArrivalRate(segment, maxUnscaledRate, timeUnit)).Float64()
+	vusFmt := pb.GetFixedLengthIntFormat(vm.maxVUs)
+	itersFmt := pb.GetFixedLengthFloatFormat(maxArrivalRatePerSec, 0) + " iters/s"
+	return func() (float64, []string) {
+		currActiveVUs := atomic.LoadUint64(&vm.activeVUsCount)
+		currentTickerPeriod := atomic.LoadInt64(tickerPeriod)
+		vusInBuffer := uint64(len(vm.activeVUs))
 
 		progVUs := fmt.Sprintf(vusFmt+"/"+vusFmt+" VUs", currActiveVUs-vusInBuffer, currActiveVUs)
@@ -389,61 +404,75 @@ func (varr VariableArrivalRate) Run(ctx context.Context, out chan<- stats.Sample
 		return math.Min(1, float64(spent)/float64(duration)), right
 	}
+}
 
-	varr.progress.Modify(pb.WithProgress(progresFn))
-	go trackProgress(ctx, maxDurationCtx, regDurationCtx, varr, progresFn)
-
-	regDurationDone := regDurationCtx.Done()
-	runIterationBasic := getIterationRunner(varr.executionState, varr.logger)
-	runIteration := func(vu lib.ActiveVU) {
-		runIterationBasic(maxDurationCtx, vu)
-		activeVUs <- vu
-	}
-
-	timer := time.NewTimer(time.Hour)
-	start := time.Now()
-	ch := make(chan time.Duration, 10) // buffer 10 iteration times ahead
-	var prevTime time.Duration
-	go varr.config.cal(varr.executionState.ExecutionTuple, ch)
-	for nextTime := range ch {
-		select {
-		case <-regDurationDone:
-			return nil
-		default:
+func (vm *vuManager) unplannedVUMaking() {
+	defer close(vm.returnedVUs)
+	defer func() {
+		// this is done here so as not to have an unplannedVU in the middle of initialization when
+		// starting to return activeVUs
+		for i := uint64(0); i < atomic.LoadUint64(&vm.activeVUsCount); i++ {
+			<-vm.activeVUs
 		}
-		atomic.StoreInt64(&tickerPeriod, int64(nextTime-prevTime))
-		prevTime = nextTime
-		b := time.Until(start.Add(nextTime))
-		if b > 0 { // TODO: have a minimal ?
-			timer.Reset(b)
-			select {
-			case <-timer.C:
-			case <-regDurationDone:
-				return nil
-			}
+	}()
+	for range vm.makeUnplannedVUCh {
+		initVU, err := vm.executionState.GetUnplannedVU(vm.ctx, vm.logger)
+		if err != nil {
+			// TODO figure out how to return it to the Run goroutine
+			vm.logger.WithError(err).Error("Error while allocating unplanned VU")
+			continue // initVU is nil on error; activating it would panic
 		}
+		vm.activeVUs <- vm.activateVU(initVU)
+	}
+}
 
-		var vu lib.ActiveVU
-		select {
-		case vu = <-activeVUs:
-			// ideally, we get the VU from the buffer without any issues
-		default:
-			if remainingUnplannedVUs == 0 {
-				// TODO: emit an error metric?
-				varr.logger.Warningf("Insufficient VUs, reached %d active VUs and cannot allocate more", maxVUs)
-				continue
-			}
-
+func (vm *vuManager) startIteration() {
+	var vu lib.ActiveVU
+	select {
+	case vu = <-vm.activeVUs:
+		// ideally, we get the VU from the buffer without any issues
+	default:
+		if vm.remainingUnplannedVUs == 0 {
+			// TODO: emit an error metric?
+			vm.logger.Warningf("Insufficient VUs, reached %d active VUs and cannot allocate more", vm.maxVUs)
+			vu = <-vm.activeVUs // we just block while waiting to get a free VU
+		} else {
 			select {
-			case makeUnplannedVUCh <- struct{}{}:
+			case vm.makeUnplannedVUCh <- struct{}{}:
				// this is the only goroutine that touches remainingUnplannedVUs, and if we didn't
				// send on the channel, no new unplanned VU will be started, so no need to decrease it
-				remainingUnplannedVUs--
-				vu = <-activeVUs // just get any VU that gets activated, whether it is the unplanned or not doesn't matter
-			case vu = <-activeVUs: // a VU got freed while were waiting to start a new unplanned one
+				vm.remainingUnplannedVUs--
+				vu = <-vm.activeVUs // just get any VU that gets activated; whether it is the unplanned one or not doesn't matter
+			case vu = <-vm.activeVUs: // a VU got freed while we were waiting to start a new unplanned one
 			}
 		}
-		go runIteration(vu)
+	}
+	go func(vu lib.ActiveVU) {
+		vm.runIterationBasic(vm.ctx, vu)
+		vm.activeVUs <- vu
+	}(vu)
+}
+
+func (vm *vuManager) getPreallocatedVUs(preAllocatedVUs int64) error {
+	for i := int64(0); i < preAllocatedVUs; i++ {
+		initVU, err := vm.executionState.GetPlannedVU(vm.logger, false)
+		if err != nil {
+			return err
+		}
+		vm.activeVUs <- vm.activateVU(initVU)
 	}
 	return nil
 }
+
+func (vm *vuManager) activateVU(initVU lib.InitializedVU) lib.ActiveVU {
+	vm.activeVUsWg.Add(1)
+	activeVU := initVU.Activate(&lib.VUActivationParams{
+		RunContext: vm.ctx,
+		DeactivateCallback: func(initVU lib.InitializedVU) {
+			vm.executionState.ReturnVU(initVU, true)
+			vm.activeVUsWg.Done()
+		},
+	})
+	vm.executionState.ModCurrentlyActiveVUsCount(+1)
+	atomic.AddUint64(&vm.activeVUsCount, 1)
+	return activeVU
+}
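
Reviewer note on the pacing loop: `varr.config.cal` streams *absolute* offsets from test start at which iterations should begin, and the new `Run` loop converts each offset into a wait via `time.Until(start.Add(nextTime))` on a single reusable timer, firing immediately when it is already behind schedule (the "have a minimum?" TODO). A minimal standalone sketch of that pattern, where `arrivalTimes` is a hypothetical stand-in for `cal` and not k6 API:

```go
package main

import (
	"fmt"
	"time"
)

// arrivalTimes stands in for VariableArrivalRateConfig.cal: it streams the
// absolute offsets from test start at which iterations should begin.
func arrivalTimes(ch chan<- time.Duration) {
	defer close(ch)
	for i := 1; i <= 5; i++ {
		ch <- time.Duration(i) * 200 * time.Millisecond // a fixed 5 iters/s here
	}
}

func main() {
	timer := time.NewTimer(time.Hour) // reset before every use, as in the diff
	start := time.Now()
	ch := make(chan time.Duration, 10) // buffer a few arrival times ahead
	go arrivalTimes(ch)

	for nextTime := range ch {
		// Sleep until the absolute arrival offset; if we are already late,
		// b <= 0 and the iteration starts immediately.
		if b := time.Until(start.Add(nextTime)); b > 0 {
			timer.Reset(b)
			<-timer.C
		}
		fmt.Printf("iteration started at %v\n", time.Since(start))
	}
}
```

Buffering `ch` lets the rate calculation run a few arrival points ahead of the hot loop, so the loop itself only ever waits on the timer.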
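The trickiest piece of `vuManager` is the two-level `select` in `startIteration`: take a buffered VU if one is free, otherwise ask the allocator goroutine for an unplanned one, unless the `maxVUs` budget is spent, in which case warn and block until a VU is returned. A simplified sketch of the same acquisition pattern, with plain `int`s standing in for `lib.ActiveVU`; all names here (`pool`, `acquire`, `makeMore`) are invented for illustration, and the warn-and-block branch is collapsed into a plain blocking receive:

```go
package main

import "fmt"

// pool mirrors vuManager's acquisition state: a buffered channel of free
// workers, a request channel to the allocator, and a growth budget.
type pool struct {
	free      chan int      // plays the role of vm.activeVUs
	makeMore  chan struct{} // plays the role of vm.makeUnplannedVUCh
	remaining int           // plays the role of vm.remainingUnplannedVUs
}

func (p *pool) acquire() int {
	select {
	case w := <-p.free:
		return w // fast path: a worker was already free
	default:
	}
	if p.remaining == 0 {
		return <-p.free // budget spent: block until a worker is returned
	}
	select {
	case p.makeMore <- struct{}{}:
		// only decrement after the allocator has accepted the request; this
		// is the single goroutine that touches remaining, so no atomics
		p.remaining--
		return <-p.free // any worker that shows up will do
	case w := <-p.free: // a worker was returned while we were asking
		return w
	}
}

func main() {
	p := &pool{free: make(chan int, 4), makeMore: make(chan struct{}), remaining: 3}
	go func() { // allocator, playing the role of unplannedVUMaking
		id := 100
		for range p.makeMore {
			p.free <- id
			id++
		}
	}()
	p.free <- 1 // one "pre-allocated" worker
	for i := 0; i < 4; i++ {
		fmt.Println("acquired worker", p.acquire())
		// a real caller would run an iteration, then return the worker to p.free
	}
}
```

Because `acquire` (like `startIteration`) is only ever called from the single `Run` goroutine, the growth budget needs no synchronization, which is exactly the invariant the `remainingUnplannedVUs` comment in the diff is pointing at.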