Skip to content

Commit

Permalink
Handle external test aborts outside of the Engine
Browse files Browse the repository at this point in the history
  • Loading branch information
na-- committed Dec 8, 2022
1 parent 2c67153 commit 577089f
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 73 deletions.
16 changes: 13 additions & 3 deletions api/v1/status_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -119,16 +121,24 @@ func TestPatchStatus(t *testing.T) {
defer engine.OutputManager.StopOutputs(nil)

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
run, wait, err := engine.Init(ctx, ctx)
defer cancel()
runSubCtx, runSubAbort := execution.NewTestRunContext(ctx, testState.Logger)
engine.AbortFn = runSubAbort

run, wait, err := engine.Init(ctx, runSubCtx)
require.NoError(t, err)

wg := &sync.WaitGroup{}
wg.Add(1)
defer func() {
cancel()
runSubAbort(fmt.Errorf("custom cancel signal"))
wait()
wg.Wait()
}()

go func() {
assert.ErrorContains(t, run(), "test run aborted by signal")
assert.ErrorContains(t, run(), "custom cancel signal")
wg.Done()
}()
// wait for the executor to initialize to avoid a potential data race below
time.Sleep(200 * time.Millisecond)
Expand Down
2 changes: 1 addition & 1 deletion cmd/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ func TestAbortedByUserWithGoodThresholds(t *testing.T) {

logs := ts.loggerHook.Drain()
assert.False(t, testutils.LogContains(logs, logrus.ErrorLevel, `some thresholds have failed`))
assert.True(t, testutils.LogContains(logs, logrus.ErrorLevel, `test run aborted by signal`))
assert.True(t, testutils.LogContains(logs, logrus.ErrorLevel, `test run was aborted because k6 received a 'interrupt' signal`))
stdOut := ts.stdOut.String()
t.Log(stdOut)
assert.Contains(t, stdOut, `✓ iterations`)
Expand Down
13 changes: 11 additions & 2 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
defer runCancel()

logger := testRunState.Logger
runSubCtx, runSubAbort := execution.NewTestRunContext(runCtx, logger)

// Create a local execution scheduler wrapping the runner.
logger.Debug("Initializing the execution scheduler...")
execScheduler, err := execution.NewScheduler(testRunState)
Expand Down Expand Up @@ -113,6 +115,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
if err != nil {
return err
}
engine.AbortFn = runSubAbort

// Spin up the REST API server, if not disabled.
if c.gs.flags.address != "" {
Expand Down Expand Up @@ -167,7 +170,13 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
// Trap Interrupts, SIGINTs and SIGTERMs.
gracefulStop := func(sig os.Signal) {
logger.WithField("sig", sig).Debug("Stopping k6 in response to signal...")
lingerCancel() // stop the test run, metric processing is cancelled below
// first abort the test run this way, to propagate the error
runSubAbort(errext.WithAbortReasonIfNone(
errext.WithExitCodeIfNone(
fmt.Errorf("test run was aborted because k6 received a '%s' signal", sig), exitcodes.ExternalAbort,
), errext.AbortedByUser,
))
lingerCancel() // cancel this context as well, since the user did Ctrl+C
}
onHardStop := func(sig os.Signal) {
logger.WithField("sig", sig).Error("Aborting k6 in response to signal")
Expand All @@ -178,7 +187,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {

// Initialize the engine
initBar.Modify(pb.WithConstProgress(0, "Init VUs..."))
engineRun, engineWait, err := engine.Init(globalCtx, runCtx)
engineRun, engineWait, err := engine.Init(globalCtx, runSubCtx)
if err != nil {
err = common.UnwrapGojaInterruptedError(err)
// Add a generic engine exit code if we don't have a more specific one
Expand Down
69 changes: 17 additions & 52 deletions core/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Engine struct {
logger *logrus.Entry
stopOnce sync.Once
stopChan chan struct{}
abortFn func(error) // temporary
AbortFn func(error) // temporary

Samples chan metrics.SampleContainer
}
Expand Down Expand Up @@ -107,24 +107,19 @@ func (e *Engine) Init(globalCtx, runCtx context.Context) (run func() error, wait
}

// TODO: move all of this in a separate struct? see main TODO above
runSubCtx, runSubAbort := execution.NewTestRunContext(runCtx, e.logger)
e.abortFn = runSubAbort

execRunResult := make(chan error)
engineRunResult := make(chan error)
processMetricsAfterRun := make(chan struct{})
runFn := func() error {
e.logger.Debug("Execution scheduler starting...")
err := e.ExecutionScheduler.Run(globalCtx, runSubCtx, e.Samples)
e.logger.WithError(err).Debug("Execution scheduler terminated")

select {
case <-runCtx.Done():
// do nothing, the test run was aborted somehow
default:
execRunResult <- err // we finished normally, so send the result
err := e.ExecutionScheduler.Run(globalCtx, runCtx, e.Samples)
if err == nil {
e.logger.Debug("Execution scheduler finished nominally")
err = runCtx.Err()
}
if err != nil {
e.logger.WithError(err).Debug("Engine run returned an error")
} else {
e.logger.Debug("Execution scheduler and engine finished nominally")
}
result := <-engineRunResult // get the final result

// Make the background jobs process the currently buffered metrics and
// run the thresholds, then wait for that to be done.
Expand All @@ -134,12 +129,10 @@ func (e *Engine) Init(globalCtx, runCtx context.Context) (run func() error, wait
case <-globalCtx.Done():
}

return result
return err
}

waitFn := e.startBackgroundProcesses(
globalCtx, runCtx, execRunResult, engineRunResult, runSubAbort, processMetricsAfterRun,
)
waitFn := e.startBackgroundProcesses(globalCtx, processMetricsAfterRun)
return runFn, waitFn, nil
}

Expand All @@ -153,41 +146,15 @@ func (e *Engine) Init(globalCtx, runCtx context.Context) (run func() error, wait
// and that the remaining metrics samples in the pipeline should be processed as the background
// process is about to exit.
func (e *Engine) startBackgroundProcesses(
globalCtx, runCtx context.Context, execRunResult, engineRunResult chan error,
runSubAbort func(error), processMetricsAfterRun chan struct{},
globalCtx context.Context, processMetricsAfterRun chan struct{},
) (wait func()) {
processes := new(sync.WaitGroup)

// Siphon and handle all produced metric samples
processes.Add(1)
go func() {
defer processes.Done()
e.processMetrics(globalCtx, processMetricsAfterRun, runSubAbort)
}()

// Update the test run status when the test finishes
processes.Add(1)
go func() {
defer processes.Done()
var err error
defer func() {
e.logger.WithError(err).Debug("Final Engine.Run() result")
engineRunResult <- err
}()
select {
case err = <-execRunResult:
if err != nil {
e.logger.WithError(err).Debug("run: execution scheduler returned an error")
} else {
e.logger.Debug("run: execution scheduler finished nominally")
}
case <-runCtx.Done():
e.logger.Debug("run: context expired; exiting...")
err = errext.WithAbortReasonIfNone(
errext.WithExitCodeIfNone(errors.New("test run aborted by signal"), exitcodes.ExternalAbort),
errext.AbortedByUser,
)
}
e.processMetrics(globalCtx, processMetricsAfterRun)
}()

return processes.Wait
Expand All @@ -200,15 +167,13 @@ func (e *Engine) startBackgroundProcesses(
// The `processMetricsAfterRun` channel argument is used by the caller to signal
// that the test run is finished, no more metric samples will be produced, and that
// the metrics samples remaining in the pipeline should be should be processed.
func (e *Engine) processMetrics(
globalCtx context.Context, processMetricsAfterRun chan struct{}, runSubAbort func(error),
) {
func (e *Engine) processMetrics(globalCtx context.Context, processMetricsAfterRun chan struct{}) {
sampleContainers := []metrics.SampleContainer{}

// Run thresholds, if not disabled.
var finalizeThresholds func() (breached []string)
if !e.runtimeOptions.NoThresholds.Bool {
finalizeThresholds = e.MetricsEngine.StartThresholdCalculations(runSubAbort)
finalizeThresholds = e.MetricsEngine.StartThresholdCalculations(e.AbortFn)
}

ticker := time.NewTicker(collectRate)
Expand Down Expand Up @@ -275,7 +240,7 @@ func (e *Engine) Stop() {
errext.WithExitCodeIfNone(errors.New("test run stopped from the REST API"), exitcodes.ScriptStoppedFromRESTAPI),
errext.AbortedByUser,
)
e.abortFn(err)
e.AbortFn(err)
close(e.stopChan)
})
}
Expand Down
33 changes: 18 additions & 15 deletions core/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ const isWindows = runtime.GOOS == "windows"
// TODO: completely rewrite all of these tests

type testStruct struct {
engine *Engine
run func() error
runCancel func()
wait func()
piState *lib.TestPreInitState
engine *Engine
run func() error
runAbort func(error)
wait func()
piState *lib.TestPreInitState
}

func getTestPreInitState(tb testing.TB) *lib.TestPreInitState {
Expand Down Expand Up @@ -99,16 +99,19 @@ func newTestEngineWithTestPreInitState( //nolint:golint
} else {
runCtx, runCancel = context.WithCancel(globalCtx)
}
run, waitFn, err := engine.Init(globalCtx, runCtx)
runSubCtx, runSubAbort := execution.NewTestRunContext(runCtx, piState.Logger)
engine.AbortFn = runSubAbort

run, waitFn, err := engine.Init(globalCtx, runSubCtx)
require.NoError(t, err)

var test *testStruct
test = &testStruct{
engine: engine,
run: run,
runCancel: runCancel,
engine: engine,
run: run,
runAbort: runSubAbort,
wait: func() {
test.runCancel()
runCancel()
globalCancel()
waitFn()
engine.OutputManager.StopOutputs(nil)
Expand Down Expand Up @@ -143,7 +146,7 @@ func TestEngineRun(t *testing.T) {
defer test.wait()

startTime := time.Now()
assert.ErrorContains(t, test.run(), "test run aborted by signal")
assert.ErrorContains(t, test.run(), "context deadline exceeded")
assert.WithinDuration(t, startTime.Add(duration), time.Now(), 100*time.Millisecond)
<-done
})
Expand Down Expand Up @@ -193,8 +196,8 @@ func TestEngineRun(t *testing.T) {
errC := make(chan error)
go func() { errC <- test.run() }()
<-signalChan
test.runCancel()
assert.ErrorContains(t, <-errC, "test run aborted by signal")
test.runAbort(fmt.Errorf("custom error"))
assert.ErrorContains(t, <-errC, "custom error")
test.wait()

found := 0
Expand Down Expand Up @@ -1217,7 +1220,7 @@ func TestEngineRunsTeardownEvenAfterTestRunIsAborted(t *testing.T) {
var test *testStruct
runner := &minirunner.MiniRunner{
Fn: func(_ context.Context, _ *lib.State, _ chan<- metrics.SampleContainer) error {
test.runCancel() // we cancel the run immediately after the test starts
test.runAbort(fmt.Errorf("custom error")) // we cancel the run immediately after the test starts
return nil
},
TeardownFn: func(_ context.Context, out chan<- metrics.SampleContainer) error {
Expand All @@ -1238,7 +1241,7 @@ func TestEngineRunsTeardownEvenAfterTestRunIsAborted(t *testing.T) {
VUs: null.IntFrom(1), Iterations: null.IntFrom(1),
}, piState)

assert.ErrorContains(t, test.run(), "test run aborted by signal")
assert.ErrorContains(t, test.run(), "custom error")
test.wait()

var count float64
Expand Down

0 comments on commit 577089f

Please sign in to comment.