Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make possible for the cloud output to stop the engine #1965

Merged
merged 7 commits into from
Apr 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cloudapi/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Config struct {
PushRefID null.String `json:"pushRefID" envconfig:"K6_CLOUD_PUSH_REF_ID"`
WebAppURL null.String `json:"webAppURL" envconfig:"K6_CLOUD_WEB_APP_URL"`
NoCompress null.Bool `json:"noCompress" envconfig:"K6_CLOUD_NO_COMPRESS"`
StopOnError null.Bool `json:"stopOnError" envconfig:"K6_CLOUD_STOP_ON_ERROR"`

MaxMetricSamplesPerPackage null.Int `json:"maxMetricSamplesPerPackage" envconfig:"K6_CLOUD_MAX_METRIC_SAMPLES_PER_PACKAGE"`

Expand Down Expand Up @@ -209,6 +210,9 @@ func (c Config) Apply(cfg Config) Config {
if cfg.NoCompress.Valid {
c.NoCompress = cfg.NoCompress
}
if cfg.StopOnError.Valid {
c.StopOnError = cfg.StopOnError
}
if cfg.MaxMetricSamplesPerPackage.Valid {
c.MaxMetricSamplesPerPackage = cfg.MaxMetricSamplesPerPackage
}
Expand Down
1 change: 1 addition & 0 deletions cloudapi/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func TestConfigApply(t *testing.T) {
PushRefID: null.NewString("PushRefID", true),
WebAppURL: null.NewString("foo", true),
NoCompress: null.NewBool(true, true),
StopOnError: null.NewBool(true, true),
na-- marked this conversation as resolved.
Show resolved Hide resolved
MaxMetricSamplesPerPackage: null.NewInt(2, true),
MetricPushInterval: types.NewNullDuration(1*time.Second, true),
MetricPushConcurrency: null.NewInt(3, true),
Expand Down
8 changes: 8 additions & 0 deletions core/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,14 @@ func (e *Engine) StartOutputs() error {
thresholdOut.SetThresholds(e.thresholds)
}

if stopOut, ok := out.(output.WithTestRunStop); ok {
stopOut.SetTestRunStopCallback(
func(err error) {
e.logger.WithError(err).Error("Received error to stop from output")
e.Stop()
})
}

if err := out.Start(); err != nil {
e.stopOutputs(i)
return err
Expand Down
10 changes: 10 additions & 0 deletions output/cloud/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,14 @@ type Output struct {
aggregationDone *sync.WaitGroup
stopOutput chan struct{}
outputDone *sync.WaitGroup
engineStopFunc func(error)
}

// Verify that Output implements the wanted interfaces
var _ interface {
output.WithRunStatusUpdates
output.WithThresholds
output.WithTestRunStop
} = &Output{}

// New creates a new cloud output.
Expand Down Expand Up @@ -322,6 +324,11 @@ func (out *Output) SetThresholds(scriptThresholds map[string]stats.Thresholds) {
out.thresholds = thresholds
}

// SetTestRunStopCallback receives the function that stops the engine on error
func (out *Output) SetTestRunStopCallback(stopFunc func(error)) {
out.engineStopFunc = stopFunc
}

func useCloudTags(source *httpext.Trail) *httpext.Trail {
name, nameExist := source.Tags.Get("name")
url, urlExist := source.Tags.Get("url")
Expand Down Expand Up @@ -660,6 +667,9 @@ func (out *Output) pushMetrics() {
if err != nil {
if out.shouldStopSendingMetrics(err) {
out.logger.WithError(err).Warn("Stopped sending metrics to cloud due to an error")
if out.config.StopOnError.Bool {
out.engineStopFunc(err)
}
close(out.stopSendingMetrics)
break
}
Expand Down
27 changes: 25 additions & 2 deletions output/cloud/output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"sort"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -387,6 +388,18 @@ func TestCloudOutputMaxPerPacket(t *testing.T) {

func TestCloudOutputStopSendingMetric(t *testing.T) {
t.Parallel()
t.Run("stop engine on error", func(t *testing.T) {
t.Parallel()
testCloudOutputStopSendingMetric(t, true)
})

t.Run("don't stop engine on error", func(t *testing.T) {
t.Parallel()
testCloudOutputStopSendingMetric(t, false)
})
}

func testCloudOutputStopSendingMetric(t *testing.T, stopOnError bool) {
tb := httpmultibin.NewHTTPMultiBin(t)
tb.Mux.HandleFunc("/v1/tests", http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
body, err := ioutil.ReadAll(req.Body)
Expand Down Expand Up @@ -416,8 +429,9 @@ func TestCloudOutputStopSendingMetric(t *testing.T) {
JSONConfig: json.RawMessage(fmt.Sprintf(`{
"host": "%s", "noCompress": true,
"maxMetricSamplesPerPackage": 50,
"name": "something-that-should-be-overwritten"
}`, tb.ServerHTTP.URL)),
"name": "something-that-should-be-overwritten",
"stopOnError": %t
}`, tb.ServerHTTP.URL, stopOnError)),
ScriptOptions: lib.Options{
Duration: types.NullDurationFrom(1 * time.Second),
SystemTags: &stats.DefaultSystemTagSet,
Expand All @@ -427,6 +441,14 @@ func TestCloudOutputStopSendingMetric(t *testing.T) {
},
ScriptPath: &url.URL{Path: "/script.js"},
})
var expectedEngineStopFuncCalled int64
if stopOnError {
expectedEngineStopFuncCalled = 1
}
var engineStopFuncCalled int64
out.engineStopFunc = func(error) {
atomic.AddInt64(&engineStopFuncCalled, 1)
}
require.NoError(t, err)
now := time.Now()
tags := stats.IntoSampleTags(&map[string]string{"test": "mest", "a": "b"})
Expand Down Expand Up @@ -495,6 +517,7 @@ func TestCloudOutputStopSendingMetric(t *testing.T) {
t.Fatal("sending metrics wasn't stopped")
}
require.Equal(t, max, count)
require.Equal(t, expectedEngineStopFuncCalled, engineStopFuncCalled)

nBufferSamples := len(out.bufferSamples)
nBufferHTTPTrails := len(out.bufferHTTPTrails)
Expand Down
10 changes: 8 additions & 2 deletions output/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,14 @@ type WithThresholds interface {
SetThresholds(map[string]stats.Thresholds)
}

// TODO: add some way for outputs to report mid-test errors and potentially
// abort the whole test run
// WithTestRunStop is an output that can stop the Engine mid-test, interrupting
// the whole test run execution if some internal condition occurs, completely
// independently from the thresholds. It requires a callback function which
// expects an error and triggers the Engine to stop.
type WithTestRunStop interface {
Output
SetTestRunStopCallback(func(error))
}

// WithRunStatusUpdates means the output can receive test run status updates.
type WithRunStatusUpdates interface {
Expand Down