diff --git a/api/v1/setup_teardown_routes_test.go b/api/v1/setup_teardown_routes_test.go index fa61aeecb14..8829755c901 100644 --- a/api/v1/setup_teardown_routes_test.go +++ b/api/v1/setup_teardown_routes_test.go @@ -142,7 +142,7 @@ func TestSetupData(t *testing.T) { require.NoError(t, err) require.NoError(t, engine.OutputManager.StartOutputs()) - defer engine.OutputManager.StopOutputs() + defer engine.OutputManager.StopOutputs(nil) globalCtx, globalCancel := context.WithCancel(context.Background()) runCtx, runCancel := context.WithCancel(globalCtx) diff --git a/api/v1/status_routes_test.go b/api/v1/status_routes_test.go index 685a2c25737..03ce1f2d4f5 100644 --- a/api/v1/status_routes_test.go +++ b/api/v1/status_routes_test.go @@ -116,7 +116,7 @@ func TestPatchStatus(t *testing.T) { require.NoError(t, err) require.NoError(t, engine.OutputManager.StartOutputs()) - defer engine.OutputManager.StopOutputs() + defer engine.OutputManager.StopOutputs(nil) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) run, wait, err := engine.Init(ctx, ctx) diff --git a/cmd/cloud.go b/cmd/cloud.go index 30104f9dd06..796adce7de5 100644 --- a/cmd/cloud.go +++ b/cmd/cloud.go @@ -63,6 +63,7 @@ func (c *cmdCloud) preRun(cmd *cobra.Command, args []string) error { } // TODO: split apart some more +// //nolint:funlen,gocognit,cyclop func (c *cmdCloud) run(cmd *cobra.Command, args []string) error { printBanner(c.gs) diff --git a/cmd/run.go b/cmd/run.go index fac3c66e17a..c2060c86527 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -37,7 +37,7 @@ type cmdRun struct { // TODO: split apart some more // //nolint:funlen,gocognit,gocyclo,cyclop -func (c *cmdRun) run(cmd *cobra.Command, args []string) error { +func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { printBanner(c.gs) test, err := loadAndConfigureTest(c.gs, cmd, args, getConfig) @@ -159,7 +159,9 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) error { if err != nil { return err } - defer engine.OutputManager.StopOutputs() + defer func() { + engine.OutputManager.StopOutputs(err) + }() printExecutionDescription( c.gs, "local", args[0], "", conf, execScheduler.GetState().ExecutionTuple, executionPlan, outputs, @@ -274,7 +276,9 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) error { } else { logger.Error("some thresholds have failed") // log this, even if there was already a previous error } - err = errext.WithExitCodeIfNone(err, exitcodes.ThresholdsHaveFailed) + err = errext.WithAbortReasonIfNone( + errext.WithExitCodeIfNone(err, exitcodes.ThresholdsHaveFailed), errext.AbortedByThresholdsAfterTestEnd, + ) } return err } diff --git a/cmd/tests/cmd_test.go b/cmd/tests/cmd_test.go index 7fcffd7e06f..c5818338068 100644 --- a/cmd/tests/cmd_test.go +++ b/cmd/tests/cmd_test.go @@ -1403,7 +1403,7 @@ func TestMinIterationDuration(t *testing.T) { import { Counter } from 'k6/metrics'; export let options = { - minIterationDuration: '5s', + minIterationDuration: '7s', setupTimeout: '2s', teardownTimeout: '2s', thresholds: { @@ -1423,9 +1423,9 @@ func TestMinIterationDuration(t *testing.T) { start := time.Now() cmd.ExecuteWithGlobalState(ts.GlobalState) elapsed := time.Since(start) - assert.Greater(t, elapsed, 5*time.Second, "expected more time to have passed because of minIterationDuration") + assert.Greater(t, elapsed, 7*time.Second, "expected more time to have passed because of minIterationDuration") assert.Less( - t, elapsed, 10*time.Second, + t, elapsed, 14*time.Second, "expected less time to have passed because minIterationDuration should not affect setup() and teardown() ", ) diff --git a/core/engine.go b/core/engine.go index 7afcf1660cf..5e31a567a4f 100644 --- a/core/engine.go +++ b/core/engine.go @@ -8,7 +8,6 @@ import ( "github.com/sirupsen/logrus" - "go.k6.io/k6/cloudapi" "go.k6.io/k6/errext" "go.k6.io/k6/errext/exitcodes" "go.k6.io/k6/lib" @@ -108,7 +107,6 @@ func (e *Engine) Init(globalCtx, runCtx context.Context) (run func() error, wait // TODO: if we ever need metrics processing in the init context, we can move // this below the other components... or even start them concurrently? if err := e.ExecutionScheduler.Init(runCtx, e.Samples); err != nil { - e.setRunStatusFromError(err) return nil, nil, err } @@ -148,18 +146,6 @@ func (e *Engine) Init(globalCtx, runCtx context.Context) (run func() error, wait return runFn, waitFn, nil } -func (e *Engine) setRunStatusFromError(err error) { - var serr errext.Exception - switch { - case errors.As(err, &serr): - e.OutputManager.SetRunStatus(cloudapi.RunStatusAbortedScriptError) - case errext.IsInterruptError(err): - e.OutputManager.SetRunStatus(cloudapi.RunStatusAbortedUser) - default: - e.OutputManager.SetRunStatus(cloudapi.RunStatusAbortedSystem) - } -} - // This starts a bunch of goroutines to process metrics, thresholds, and set the // test run status when it ends. It returns a function that can be used after // the provided context is called, to wait for the complete winding down of all @@ -196,27 +182,31 @@ func (e *Engine) startBackgroundProcesses( case err = <-execRunResult: if err != nil { e.logger.WithError(err).Debug("run: execution scheduler returned an error") - e.setRunStatusFromError(err) } else { e.logger.Debug("run: execution scheduler finished nominally") - e.OutputManager.SetRunStatus(cloudapi.RunStatusFinished) } // do nothing, return the same err value we got from the Run() // ExecutionScheduler result, we just set the run_status based on it case <-runCtx.Done(): e.logger.Debug("run: context expired; exiting...") - e.OutputManager.SetRunStatus(cloudapi.RunStatusAbortedUser) - err = errext.WithExitCodeIfNone(errors.New("test run aborted by signal"), exitcodes.ExternalAbort) + err = errext.WithAbortReasonIfNone( + errext.WithExitCodeIfNone(errors.New("test run aborted by signal"), exitcodes.ExternalAbort), + errext.AbortedByUser, + ) case <-e.stopChan: runSubCancel() e.logger.Debug("run: stopped by user via REST API; exiting...") - e.OutputManager.SetRunStatus(cloudapi.RunStatusAbortedUser) - err = errext.WithExitCodeIfNone(errors.New("test run stopped from the REST API"), exitcodes.ScriptStoppedFromRESTAPI) + err = errext.WithAbortReasonIfNone( + errext.WithExitCodeIfNone(errors.New("test run stopped from the REST API"), exitcodes.ScriptStoppedFromRESTAPI), + errext.AbortedByUser, + ) case <-thresholdAbortChan: e.logger.Debug("run: stopped by thresholds; exiting...") runSubCancel() - e.OutputManager.SetRunStatus(cloudapi.RunStatusAbortedThreshold) - err = errext.WithExitCodeIfNone(errors.New("test run aborted by failed thresholds"), exitcodes.ThresholdsHaveFailed) + err = errext.WithAbortReasonIfNone( + errext.WithExitCodeIfNone(errors.New("test run aborted by failed thresholds"), exitcodes.ThresholdsHaveFailed), + errext.AbortedByThreshold, + ) } }() diff --git a/core/engine_test.go b/core/engine_test.go index fb22d048df3..d06962dd242 100644 --- a/core/engine_test.go +++ b/core/engine_test.go @@ -109,7 +109,7 @@ func newTestEngineWithTestPreInitState( //nolint:golint test.runCancel() globalCancel() waitFn() - engine.OutputManager.StopOutputs() + engine.OutputManager.StopOutputs(nil) }, piState: piState, } @@ -1333,7 +1333,7 @@ func TestActiveVUsCount(t *testing.T) { require.NoError(t, err) cancel() waitFn() - engine.OutputManager.StopOutputs() + engine.OutputManager.StopOutputs(nil) require.False(t, engine.IsTainted()) } diff --git a/errext/abort_reason.go b/errext/abort_reason.go new file mode 100644 index 00000000000..a9452f92be7 --- /dev/null +++ b/errext/abort_reason.go @@ -0,0 +1,54 @@ +package errext + +import "errors" + +// AbortReason is used to signal to outputs what type of error caused the test +// run to be stopped prematurely. +type AbortReason uint8 + +// These are the various reasons why a test might have been aborted prematurely. +const ( + AbortedByUser AbortReason = iota + 1 + AbortedByThreshold + AbortedByThresholdsAfterTestEnd // TODO: rename? + AbortedByScriptError + AbortedByScriptAbort + AbortedByTimeout +) + +// HasAbortReason is a wrapper around an error with an attached abort reason. +type HasAbortReason interface { + error + AbortReason() AbortReason +} + +// WithAbortReasonIfNone can attach an abort reason to the given error, if it +// doesn't have one already. It won't do anything if the error already had an +// abort reason attached. Similarly, if there is no error (i.e. the given error +// is nil), it also won't do anything and will return nil. +func WithAbortReasonIfNone(err error, abortReason AbortReason) error { + if err == nil { + return nil // No error, do nothing + } + var arerr HasAbortReason + if errors.As(err, &arerr) { + // The given error already has an abort reason, do nothing + return err + } + return withAbortReason{err, abortReason} +} + +type withAbortReason struct { + error + abortReason AbortReason +} + +func (ar withAbortReason) Unwrap() error { + return ar.error +} + +func (ar withAbortReason) AbortReason() AbortReason { + return ar.abortReason +} + +var _ HasAbortReason = withAbortReason{} diff --git a/errext/exception.go b/errext/exception.go index 77d33f4afeb..cce7260de04 100644 --- a/errext/exception.go +++ b/errext/exception.go @@ -5,5 +5,6 @@ package errext // a stack trace that lead to them. type Exception interface { error + HasAbortReason StackTrace() string } diff --git a/errext/interrupt_error.go b/errext/interrupt_error.go index 637622daed2..77a6e649163 100644 --- a/errext/interrupt_error.go +++ b/errext/interrupt_error.go @@ -11,7 +11,10 @@ type InterruptError struct { Reason string } -var _ HasExitCode = &InterruptError{} +var _ interface { + HasExitCode + HasAbortReason +} = &InterruptError{} // Error returns the reason of the interruption. func (i *InterruptError) Error() string { @@ -23,6 +26,12 @@ func (i *InterruptError) ExitCode() exitcodes.ExitCode { return exitcodes.ScriptAborted } +// AbortReason is used to signal that an InterruptError is caused by the +// test.abort() functin in k6/execution. +func (i *InterruptError) AbortReason() AbortReason { + return AbortedByScriptAbort +} + // AbortTest is the reason emitted when a test script calls test.abort() const AbortTest = "test aborted" diff --git a/js/runner.go b/js/runner.go index a8bd4a0a9a9..edb1d43c648 100644 --- a/js/runner.go +++ b/js/runner.go @@ -845,11 +845,12 @@ type scriptException struct { inner *goja.Exception } -var ( - _ errext.Exception = &scriptException{} - _ errext.HasExitCode = &scriptException{} - _ errext.HasHint = &scriptException{} -) +var _ interface { + errext.Exception + errext.HasExitCode + errext.HasHint + errext.HasAbortReason +} = &scriptException{} func (s *scriptException) Error() string { // this calls String instead of error so that by default if it's printed to print the stacktrace @@ -868,6 +869,10 @@ func (s *scriptException) Hint() string { return "script exception" } +func (s *scriptException) AbortReason() errext.AbortReason { + return errext.AbortedByScriptError +} + func (s *scriptException) ExitCode() exitcodes.ExitCode { return exitcodes.ScriptException } diff --git a/js/runner_test.go b/js/runner_test.go index f18a8c76797..10c8a9bfa4c 100644 --- a/js/runner_test.go +++ b/js/runner_test.go @@ -391,7 +391,7 @@ func TestDataIsolation(t *testing.T) { engine, err := core.NewEngine(testRunState, execScheduler, []output.Output{mockOutput}) require.NoError(t, err) require.NoError(t, engine.OutputManager.StartOutputs()) - defer engine.OutputManager.StopOutputs() + defer engine.OutputManager.StopOutputs(nil) ctx, cancel := context.WithCancel(context.Background()) run, wait, err := engine.Init(ctx, ctx) diff --git a/js/timeout_error.go b/js/timeout_error.go index 72cc1d7aa79..e887b0922b2 100644 --- a/js/timeout_error.go +++ b/js/timeout_error.go @@ -15,10 +15,11 @@ type timeoutError struct { d time.Duration } -var ( - _ errext.HasExitCode = timeoutError{} - _ errext.HasHint = timeoutError{} -) +var _ interface { + errext.HasExitCode + errext.HasHint + errext.HasAbortReason +} = timeoutError{} // newTimeoutError returns a new timeout error, reporting that a timeout has // happened at the given place and given duration. @@ -44,6 +45,10 @@ func (t timeoutError) Hint() string { return hint } +func (t timeoutError) AbortReason() errext.AbortReason { + return errext.AbortedByTimeout +} + // ExitCode returns the coresponding exit code value to the place. func (t timeoutError) ExitCode() exitcodes.ExitCode { // TODO: add handleSummary() diff --git a/output/cloud/output.go b/output/cloud/output.go index 0a73c5f16e5..2b8e3d43716 100644 --- a/output/cloud/output.go +++ b/output/cloud/output.go @@ -14,6 +14,7 @@ import ( "gopkg.in/guregu/null.v3" "go.k6.io/k6/cloudapi" + "go.k6.io/k6/errext" "go.k6.io/k6/output" "go.k6.io/k6/lib" @@ -36,8 +37,6 @@ type Output struct { thresholds map[string][]*metrics.Threshold client *MetricsClient - runStatus cloudapi.RunStatus - bufferMutex sync.Mutex bufferHTTPTrails []*httpext.Trail bufferSamples []*Sample @@ -65,7 +64,7 @@ type Output struct { // Verify that Output implements the wanted interfaces var _ interface { - output.WithRunStatusUpdates + output.WithStopWithTestError output.WithThresholds output.WithTestRunStop } = &Output{} @@ -259,9 +258,18 @@ func (out *Output) startBackgroundProcesses() { }() } -// Stop gracefully stops all metric emission from the output and when all metric -// samples are emitted, it sends an API to the cloud to finish the test run. +// Stop gracefully stops all metric emission from the output: when all metric +// samples are emitted, it makes a cloud API call to finish the test run. +// +// Deprecated: use StopWithTestError() instead. func (out *Output) Stop() error { + return out.StopWithTestError(nil) +} + +// StopWithTestError gracefully stops all metric emission from the output: when +// all metric samples are emitted, it makes a cloud API call to finish the test +// run. If testErr was specified, it extracts the RunStatus from it. +func (out *Output) StopWithTestError(testErr error) error { out.logger.Debug("Stopping the cloud output...") close(out.stopAggregation) out.aggregationDone.Wait() // could be a no-op, if we have never started the aggregation @@ -269,7 +277,7 @@ func (out *Output) Stop() error { close(out.stopOutput) out.outputDone.Wait() out.logger.Debug("Metric emission stopped, calling cloud API...") - err := out.testFinished() + err := out.testFinished(testErr) if err != nil { out.logger.WithFields(logrus.Fields{"error": err}).Warn("Failed to send test finished to the cloud") } else { @@ -283,9 +291,37 @@ func (out *Output) Description() string { return fmt.Sprintf("cloud (%s)", cloudapi.URLForResults(out.referenceID, out.config)) } -// SetRunStatus receives the latest run status from the Engine. -func (out *Output) SetRunStatus(status cloudapi.RunStatus) { - out.runStatus = status +// getRunStatus determines the run status of the test based on the error. +func (out *Output) getRunStatus(testErr error) cloudapi.RunStatus { + if testErr == nil { + return cloudapi.RunStatusFinished + } + + var err errext.HasAbortReason + if errors.As(testErr, &err) { + abortReason := err.AbortReason() + switch abortReason { + case errext.AbortedByUser: + return cloudapi.RunStatusAbortedUser + case errext.AbortedByThreshold: + return cloudapi.RunStatusAbortedThreshold + case errext.AbortedByScriptError: + return cloudapi.RunStatusAbortedScriptError + case errext.AbortedByScriptAbort: + return cloudapi.RunStatusAbortedUser // TODO: have a better value than this? + case errext.AbortedByTimeout: + return cloudapi.RunStatusAbortedLimit + case errext.AbortedByThresholdsAfterTestEnd: + // The test finished normally, it wasn't prematurely aborted. Such + // failures are tracked by the restult_status, not the run_status + // (so called "tainted" in some places of the API here). + return cloudapi.RunStatusFinished + } + } + + // By default, the catch-all error is "aborted by system", but let's log that + out.logger.WithError(testErr).Debug("unknown test error classified as 'aborted by system'") + return cloudapi.RunStatusAbortedSystem } // SetThresholds receives the thresholds before the output is Start()-ed. @@ -617,7 +653,7 @@ func (out *Output) pushMetrics() { }).Debug("Pushing metrics to cloud finished") } -func (out *Output) testFinished() error { +func (out *Output) testFinished(testErr error) error { if out.referenceID == "" || out.config.PushRefID.Valid { return nil } @@ -634,11 +670,7 @@ func (out *Output) testFinished() error { } } - runStatus := cloudapi.RunStatusFinished - if out.runStatus != cloudapi.RunStatusQueued { - runStatus = out.runStatus - } - + runStatus := out.getRunStatus(testErr) out.logger.WithFields(logrus.Fields{ "ref": out.referenceID, "tainted": testTainted, diff --git a/output/cloud/output_test.go b/output/cloud/output_test.go index 6cd04612219..7b16e03a1f7 100644 --- a/output/cloud/output_test.go +++ b/output/cloud/output_test.go @@ -506,7 +506,6 @@ func testCloudOutputStopSendingMetric(t *testing.T, stopOnError bool) { require.NoError(t, out.Stop()) - require.Equal(t, cloudapi.RunStatusQueued, out.runStatus) select { case <-out.stopSendingMetrics: // all is fine @@ -598,7 +597,6 @@ func TestCloudOutputAggregationPeriodZeroNoBlock(t *testing.T) { tb.Mux.HandleFunc(fmt.Sprintf("/v1/metrics/%s", out.referenceID), getSampleChecker(t, expSamples)) require.NoError(t, out.Stop()) - require.Equal(t, cloudapi.RunStatusQueued, out.runStatus) } func TestCloudOutputPushRefID(t *testing.T) { diff --git a/output/manager.go b/output/manager.go index 1ab3ba29ca5..4d041dd346c 100644 --- a/output/manager.go +++ b/output/manager.go @@ -2,7 +2,6 @@ package output import ( "github.com/sirupsen/logrus" - "go.k6.io/k6/cloudapi" "go.k6.io/k6/metrics" ) @@ -35,7 +34,7 @@ func (om *Manager) StartOutputs() error { } if err := out.Start(); err != nil { - om.stopOutputs(i) + om.stopOutputs(err, i) return err } } @@ -43,25 +42,23 @@ func (om *Manager) StartOutputs() error { } // StopOutputs stops all configured outputs. -func (om *Manager) StopOutputs() { - om.stopOutputs(len(om.outputs)) +func (om *Manager) StopOutputs(testErr error) { + om.stopOutputs(testErr, len(om.outputs)) } -func (om *Manager) stopOutputs(upToID int) { +func (om *Manager) stopOutputs(testErr error, upToID int) { om.logger.Debugf("Stopping %d outputs...", upToID) for i := 0; i < upToID; i++ { - if err := om.outputs[i].Stop(); err != nil { - om.logger.WithError(err).Errorf("Stopping output %d failed", i) + out := om.outputs[i] + var err error + if sout, ok := out.(WithStopWithTestError); ok { + err = sout.StopWithTestError(testErr) + } else { + err = out.Stop() } - } -} -// SetRunStatus checks which outputs implement the WithRunStatusUpdates -// interface and sets the provided RunStatus to them. -func (om *Manager) SetRunStatus(status cloudapi.RunStatus) { - for _, out := range om.outputs { - if statUpdOut, ok := out.(WithRunStatusUpdates); ok { - statUpdOut.SetRunStatus(status) + if err != nil { + om.logger.WithError(err).Errorf("Stopping output %d failed", i) } } } diff --git a/output/types.go b/output/types.go index 92a2648d1d0..646bacc8589 100644 --- a/output/types.go +++ b/output/types.go @@ -11,7 +11,6 @@ import ( "github.com/sirupsen/logrus" "github.com/spf13/afero" - "go.k6.io/k6/cloudapi" "go.k6.io/k6/lib" "go.k6.io/k6/metrics" ) @@ -76,10 +75,17 @@ type WithTestRunStop interface { SetTestRunStopCallback(func(error)) } -// WithRunStatusUpdates means the output can receive test run status updates. -type WithRunStatusUpdates interface { +// WithStopWithTestError allows output to receive the error value that the test +// finished with. It could be nil, if the test finished nominally. +// +// If this interface is implemented by the output, StopWithError() will be +// called instead of Stop(). +// +// TODO: refactor the main interface to use this method instead of Stop()? Or +// something else along the lines of https://github.com/grafana/k6/issues/2430 ? +type WithStopWithTestError interface { Output - SetRunStatus(latestStatus cloudapi.RunStatus) + StopWithTestError(testRunErr error) error // nil testRunErr means error-free test run } // WithBuiltinMetrics means the output can receive the builtin metrics.