diff --git a/api/common/context.go b/api/common/context.go deleted file mode 100644 index 7340f222d7a..00000000000 --- a/api/common/context.go +++ /dev/null @@ -1,25 +0,0 @@ -package common - -import ( - "context" - - "go.k6.io/k6/core" -) - -type ContextKey int - -const ctxKeyEngine = ContextKey(1) - -// WithEngine sets the k6 running Engine in the under the hood context. -// -// Deprecated: Use directly the Engine as dependency. -func WithEngine(ctx context.Context, engine *core.Engine) context.Context { - return context.WithValue(ctx, ctxKeyEngine, engine) -} - -// GetEngine returns the k6 running Engine fetching it from the context. -// -// Deprecated: Use directly the Engine as dependency. -func GetEngine(ctx context.Context) *core.Engine { - return ctx.Value(ctxKeyEngine).(*core.Engine) -} diff --git a/api/server.go b/api/server.go index caed31b692f..ec605d49df1 100644 --- a/api/server.go +++ b/api/server.go @@ -1,28 +1,44 @@ package api import ( + "context" "fmt" "net/http" "time" "github.com/sirupsen/logrus" - "go.k6.io/k6/api/common" v1 "go.k6.io/k6/api/v1" - "go.k6.io/k6/core" + "go.k6.io/k6/execution" + "go.k6.io/k6/lib" + "go.k6.io/k6/metrics" + "go.k6.io/k6/metrics/engine" ) -func newHandler(logger logrus.FieldLogger) http.Handler { +func newHandler(cs *v1.ControlSurface) http.Handler { mux := http.NewServeMux() - mux.Handle("/v1/", v1.NewHandler()) - mux.Handle("/ping", handlePing(logger)) - mux.Handle("/", handlePing(logger)) + mux.Handle("/v1/", v1.NewHandler(cs)) + mux.Handle("/ping", handlePing(cs.RunState.Logger)) + mux.Handle("/", handlePing(cs.RunState.Logger)) return mux } // GetServer returns a http.Server instance that can serve k6's REST API. -func GetServer(addr string, engine *core.Engine, logger logrus.FieldLogger) *http.Server { - mux := withEngine(engine, newLogger(logger, newHandler(logger))) +func GetServer( + runCtx context.Context, addr string, runState *lib.TestRunState, + samples chan metrics.SampleContainer, me *engine.MetricsEngine, es *execution.Scheduler, +) *http.Server { + // TODO: reduce the control surface as much as possible? For example, if + // we refactor the Runner API, we won't need to send the Samples channel. + cs := &v1.ControlSurface{ + RunCtx: runCtx, + Samples: samples, + MetricsEngine: me, + Scheduler: es, + RunState: runState, + } + + mux := withLoggingHandler(runState.Logger, newHandler(cs)) return &http.Server{Addr: addr, Handler: mux, ReadHeaderTimeout: 10 * time.Second} } @@ -36,8 +52,8 @@ func (w wrappedResponseWriter) WriteHeader(status int) { w.ResponseWriter.WriteHeader(status) } -// newLogger returns the middleware which logs response status for request. -func newLogger(l logrus.FieldLogger, next http.Handler) http.HandlerFunc { +// withLoggingHandler returns the middleware which logs response status for request. 
+func withLoggingHandler(l logrus.FieldLogger, next http.Handler) http.HandlerFunc { return func(rw http.ResponseWriter, r *http.Request) { wrapped := wrappedResponseWriter{ResponseWriter: rw, status: 200} // The default status code is 200 if it's not set next.ServeHTTP(wrapped, r) @@ -46,13 +62,6 @@ func newLogger(l logrus.FieldLogger, next http.Handler) http.HandlerFunc { } } -func withEngine(engine *core.Engine, next http.Handler) http.HandlerFunc { - return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { - r = r.WithContext(common.WithEngine(r.Context(), engine)) - next.ServeHTTP(rw, r) - }) -} - func handlePing(logger logrus.FieldLogger) http.Handler { return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { rw.Header().Add("Content-Type", "text/plain; charset=utf-8") diff --git a/api/server_test.go b/api/server_test.go index dd7fc06c814..a8c0ee8d897 100644 --- a/api/server_test.go +++ b/api/server_test.go @@ -9,15 +9,8 @@ import ( "github.com/sirupsen/logrus" logtest "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.k6.io/k6/api/common" - "go.k6.io/k6/core" - "go.k6.io/k6/execution" - "go.k6.io/k6/lib" "go.k6.io/k6/lib/testutils" - "go.k6.io/k6/lib/testutils/minirunner" - "go.k6.io/k6/metrics" ) func testHTTPHandler(rw http.ResponseWriter, r *http.Request) { @@ -37,7 +30,7 @@ func TestLogger(t *testing.T) { l, hook := logtest.NewNullLogger() l.Level = logrus.DebugLevel - newLogger(l, http.HandlerFunc(testHTTPHandler))(rw, r) + withLoggingHandler(l, http.HandlerFunc(testHTTPHandler))(rw, r) res := rw.Result() assert.Equal(t, http.StatusOK, res.StatusCode) @@ -57,36 +50,10 @@ func TestLogger(t *testing.T) { } } -func TestWithEngine(t *testing.T) { - logger := logrus.New() - logger.SetOutput(testutils.NewTestOutput(t)) - registry := metrics.NewRegistry() - testState := &lib.TestRunState{ - TestPreInitState: &lib.TestPreInitState{ - Logger: logger, - Registry: registry, - BuiltinMetrics: metrics.RegisterBuiltinMetrics(registry), - }, - Options: lib.Options{}, - Runner: &minirunner.MiniRunner{}, - } - - execScheduler, err := execution.NewScheduler(testState) - require.NoError(t, err) - engine, err := core.NewEngine(testState, execScheduler, nil) - require.NoError(t, err) - - rw := httptest.NewRecorder() - r := httptest.NewRequest("GET", "http://example.com/", nil) - withEngine(engine, http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { - assert.Equal(t, engine, common.GetEngine(r.Context())) - }))(rw, r) -} - func TestPing(t *testing.T) { logger := logrus.New() logger.SetOutput(testutils.NewTestOutput(t)) - mux := newHandler(logger) + mux := handlePing(logger) rw := httptest.NewRecorder() r := httptest.NewRequest("GET", "/ping", nil) diff --git a/api/v1/control_surface.go b/api/v1/control_surface.go new file mode 100644 index 00000000000..12689cd1e3c --- /dev/null +++ b/api/v1/control_surface.go @@ -0,0 +1,20 @@ +package v1 + +import ( + "context" + + "go.k6.io/k6/execution" + "go.k6.io/k6/lib" + "go.k6.io/k6/metrics" + "go.k6.io/k6/metrics/engine" +) + +// ControlSurface includes the methods the REST API can use to control and +// communicate with the rest of k6. 
+type ControlSurface struct { + RunCtx context.Context + Samples chan metrics.SampleContainer + MetricsEngine *engine.MetricsEngine + Scheduler *execution.Scheduler + RunState *lib.TestRunState +} diff --git a/api/v1/group_routes.go b/api/v1/group_routes.go index d84056eeeb7..d4c9f1773e4 100644 --- a/api/v1/group_routes.go +++ b/api/v1/group_routes.go @@ -3,14 +3,10 @@ package v1 import ( "encoding/json" "net/http" - - "go.k6.io/k6/api/common" ) -func handleGetGroups(rw http.ResponseWriter, r *http.Request) { - engine := common.GetEngine(r.Context()) - - root := NewGroup(engine.ExecutionScheduler.GetRunner().GetDefaultGroup(), nil) +func handleGetGroups(cs *ControlSurface, rw http.ResponseWriter, _ *http.Request) { + root := NewGroup(cs.RunState.Runner.GetDefaultGroup(), nil) groups := FlattenGroup(root) data, err := json.Marshal(newGroupsJSONAPI(groups)) @@ -21,10 +17,8 @@ func handleGetGroups(rw http.ResponseWriter, r *http.Request) { _, _ = rw.Write(data) } -func handleGetGroup(rw http.ResponseWriter, r *http.Request, id string) { - engine := common.GetEngine(r.Context()) - - root := NewGroup(engine.ExecutionScheduler.GetRunner().GetDefaultGroup(), nil) +func handleGetGroup(cs *ControlSurface, rw http.ResponseWriter, _ *http.Request, id string) { + root := NewGroup(cs.RunState.Runner.GetDefaultGroup(), nil) groups := FlattenGroup(root) var group *Group diff --git a/api/v1/group_routes_test.go b/api/v1/group_routes_test.go index 460fb0e4635..e9b7868ab41 100644 --- a/api/v1/group_routes_test.go +++ b/api/v1/group_routes_test.go @@ -1,6 +1,7 @@ package v1 import ( + "context" "encoding/json" "net/http" "net/http/httptest" @@ -10,12 +11,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.k6.io/k6/core" "go.k6.io/k6/execution" "go.k6.io/k6/lib" "go.k6.io/k6/lib/testutils" "go.k6.io/k6/lib/testutils/minirunner" "go.k6.io/k6/metrics" + "go.k6.io/k6/metrics/engine" ) func getTestPreInitState(tb testing.TB) *lib.TestPreInitState { @@ -41,6 +42,26 @@ func getTestRunState(tb testing.TB, options lib.Options, runner lib.Runner) *lib } } +func getControlSurface(tb testing.TB, testState *lib.TestRunState) *ControlSurface { + execScheduler, err := execution.NewScheduler(testState) + require.NoError(tb, err) + + me, err := engine.NewMetricsEngine(execScheduler.GetState()) + require.NoError(tb, err) + + ctx, cancel := context.WithCancel(context.Background()) + tb.Cleanup(cancel) + ctx, _ = execution.NewTestRunContext(ctx, testState.Logger) + + return &ControlSurface{ + RunCtx: ctx, + Samples: make(chan metrics.SampleContainer, 1000), + MetricsEngine: me, + Scheduler: execScheduler, + RunState: testState, + } +} + func TestGetGroups(t *testing.T) { g0, err := lib.NewGroup("", nil) assert.NoError(t, err) @@ -49,15 +70,11 @@ func TestGetGroups(t *testing.T) { g2, err := g1.Group("group 2") assert.NoError(t, err) - testState := getTestRunState(t, lib.Options{}, &minirunner.MiniRunner{Group: g0}) - execScheduler, err := execution.NewScheduler(testState) - require.NoError(t, err) - engine, err := core.NewEngine(testState, execScheduler, nil) - require.NoError(t, err) + cs := getControlSurface(t, getTestRunState(t, lib.Options{}, &minirunner.MiniRunner{Group: g0})) t.Run("list", func(t *testing.T) { rw := httptest.NewRecorder() - NewHandler().ServeHTTP(rw, newRequestWithEngine(engine, "GET", "/v1/groups", nil)) + NewHandler(cs).ServeHTTP(rw, httptest.NewRequest(http.MethodGet, "/v1/groups", nil)) res := rw.Result() body := rw.Body.Bytes() assert.Equal(t, 
http.StatusOK, res.StatusCode) @@ -105,7 +122,7 @@ func TestGetGroups(t *testing.T) { for _, gp := range []*lib.Group{g0, g1, g2} { t.Run(gp.Name, func(t *testing.T) { rw := httptest.NewRecorder() - NewHandler().ServeHTTP(rw, newRequestWithEngine(engine, "GET", "/v1/groups/"+gp.ID, nil)) + NewHandler(cs).ServeHTTP(rw, httptest.NewRequest(http.MethodGet, "/v1/groups/"+gp.ID, nil)) res := rw.Result() assert.Equal(t, http.StatusOK, res.StatusCode) }) diff --git a/api/v1/metric_routes.go b/api/v1/metric_routes.go index a8e81bc4159..c98408f75e2 100644 --- a/api/v1/metric_routes.go +++ b/api/v1/metric_routes.go @@ -4,21 +4,17 @@ import ( "encoding/json" "net/http" "time" - - "go.k6.io/k6/api/common" ) -func handleGetMetrics(rw http.ResponseWriter, r *http.Request) { - engine := common.GetEngine(r.Context()) - +func handleGetMetrics(cs *ControlSurface, rw http.ResponseWriter, _ *http.Request) { var t time.Duration - if engine.ExecutionScheduler != nil { - t = engine.ExecutionScheduler.GetState().GetCurrentTestRunDuration() + if cs.Scheduler != nil { + t = cs.Scheduler.GetState().GetCurrentTestRunDuration() } - engine.MetricsEngine.MetricsLock.Lock() - metrics := newMetricsJSONAPI(engine.MetricsEngine.ObservedMetrics, t) - engine.MetricsEngine.MetricsLock.Unlock() + cs.MetricsEngine.MetricsLock.Lock() + metrics := newMetricsJSONAPI(cs.MetricsEngine.ObservedMetrics, t) + cs.MetricsEngine.MetricsLock.Unlock() data, err := json.Marshal(metrics) if err != nil { @@ -28,23 +24,21 @@ func handleGetMetrics(rw http.ResponseWriter, r *http.Request) { _, _ = rw.Write(data) } -func handleGetMetric(rw http.ResponseWriter, r *http.Request, id string) { - engine := common.GetEngine(r.Context()) - +func handleGetMetric(cs *ControlSurface, rw http.ResponseWriter, _ *http.Request, id string) { var t time.Duration - if engine.ExecutionScheduler != nil { - t = engine.ExecutionScheduler.GetState().GetCurrentTestRunDuration() + if cs.Scheduler != nil { + t = cs.Scheduler.GetState().GetCurrentTestRunDuration() } - engine.MetricsEngine.MetricsLock.Lock() - metric, ok := engine.MetricsEngine.ObservedMetrics[id] + cs.MetricsEngine.MetricsLock.Lock() + metric, ok := cs.MetricsEngine.ObservedMetrics[id] if !ok { - engine.MetricsEngine.MetricsLock.Unlock() + cs.MetricsEngine.MetricsLock.Unlock() apiError(rw, "Not Found", "No metric with that ID was found", http.StatusNotFound) return } wrappedMetric := newMetricEnvelope(metric, t) - engine.MetricsEngine.MetricsLock.Unlock() + cs.MetricsEngine.MetricsLock.Unlock() data, err := json.Marshal(wrappedMetric) if err != nil { diff --git a/api/v1/metric_routes_test.go b/api/v1/metric_routes_test.go index 5612d5f3523..f39ed3c1317 100644 --- a/api/v1/metric_routes_test.go +++ b/api/v1/metric_routes_test.go @@ -10,8 +10,6 @@ import ( "github.com/stretchr/testify/require" "gopkg.in/guregu/null.v3" - "go.k6.io/k6/core" - "go.k6.io/k6/execution" "go.k6.io/k6/lib" "go.k6.io/k6/lib/testutils/minirunner" "go.k6.io/k6/metrics" @@ -23,18 +21,15 @@ func TestGetMetrics(t *testing.T) { testState := getTestRunState(t, lib.Options{}, &minirunner.MiniRunner{}) testMetric, err := testState.Registry.NewMetric("my_metric", metrics.Trend, metrics.Time) require.NoError(t, err) - execScheduler, err := execution.NewScheduler(testState) - require.NoError(t, err) - engine, err := core.NewEngine(testState, execScheduler, nil) - require.NoError(t, err) + cs := getControlSurface(t, testState) - engine.MetricsEngine.ObservedMetrics = map[string]*metrics.Metric{ + cs.MetricsEngine.ObservedMetrics = 
map[string]*metrics.Metric{ "my_metric": testMetric, } - engine.MetricsEngine.ObservedMetrics["my_metric"].Tainted = null.BoolFrom(true) + cs.MetricsEngine.ObservedMetrics["my_metric"].Tainted = null.BoolFrom(true) rw := httptest.NewRecorder() - NewHandler().ServeHTTP(rw, newRequestWithEngine(engine, "GET", "/v1/metrics", nil)) + NewHandler(cs).ServeHTTP(rw, httptest.NewRequest(http.MethodGet, "/v1/metrics", nil)) res := rw.Result() assert.Equal(t, http.StatusOK, res.StatusCode) @@ -82,21 +77,18 @@ func TestGetMetric(t *testing.T) { testState := getTestRunState(t, lib.Options{}, &minirunner.MiniRunner{}) testMetric, err := testState.Registry.NewMetric("my_metric", metrics.Trend, metrics.Time) require.NoError(t, err) - execScheduler, err := execution.NewScheduler(testState) - require.NoError(t, err) - engine, err := core.NewEngine(testState, execScheduler, nil) - require.NoError(t, err) + cs := getControlSurface(t, testState) - engine.MetricsEngine.ObservedMetrics = map[string]*metrics.Metric{ + cs.MetricsEngine.ObservedMetrics = map[string]*metrics.Metric{ "my_metric": testMetric, } - engine.MetricsEngine.ObservedMetrics["my_metric"].Tainted = null.BoolFrom(true) + cs.MetricsEngine.ObservedMetrics["my_metric"].Tainted = null.BoolFrom(true) t.Run("nonexistent", func(t *testing.T) { t.Parallel() rw := httptest.NewRecorder() - NewHandler().ServeHTTP(rw, newRequestWithEngine(engine, "GET", "/v1/metrics/notreal", nil)) + NewHandler(cs).ServeHTTP(rw, httptest.NewRequest(http.MethodGet, "/v1/metrics/notreal", nil)) res := rw.Result() assert.Equal(t, http.StatusNotFound, res.StatusCode) }) @@ -105,7 +97,7 @@ func TestGetMetric(t *testing.T) { t.Parallel() rw := httptest.NewRecorder() - NewHandler().ServeHTTP(rw, newRequestWithEngine(engine, "GET", "/v1/metrics/my_metric", nil)) + NewHandler(cs).ServeHTTP(rw, httptest.NewRequest(http.MethodGet, "/v1/metrics/my_metric", nil)) res := rw.Result() assert.Equal(t, http.StatusOK, res.StatusCode) diff --git a/api/v1/routes.go b/api/v1/routes.go index 27d1e361bef..81fe99305c7 100644 --- a/api/v1/routes.go +++ b/api/v1/routes.go @@ -5,15 +5,16 @@ import ( "net/http" ) -func NewHandler() http.Handler { +// NewHandler returns the top handler for the v1 REST APIs +func NewHandler(cs *ControlSurface) http.Handler { mux := http.NewServeMux() mux.HandleFunc("/v1/status", func(rw http.ResponseWriter, r *http.Request) { switch r.Method { case http.MethodGet: - handleGetStatus(rw, r) + handleGetStatus(cs, rw, r) case http.MethodPatch: - handlePatchStatus(rw, r) + handlePatchStatus(cs, rw, r) default: rw.WriteHeader(http.StatusMethodNotAllowed) } @@ -24,7 +25,7 @@ func NewHandler() http.Handler { rw.WriteHeader(http.StatusMethodNotAllowed) return } - handleGetMetrics(rw, r) + handleGetMetrics(cs, rw, r) }) mux.HandleFunc("/v1/metrics/", func(rw http.ResponseWriter, r *http.Request) { @@ -34,7 +35,7 @@ func NewHandler() http.Handler { } id := r.URL.Path[len("/v1/metrics/"):] - handleGetMetric(rw, r, id) + handleGetMetric(cs, rw, r, id) }) mux.HandleFunc("/v1/groups", func(rw http.ResponseWriter, r *http.Request) { @@ -43,7 +44,7 @@ func NewHandler() http.Handler { return } - handleGetGroups(rw, r) + handleGetGroups(cs, rw, r) }) mux.HandleFunc("/v1/groups/", func(rw http.ResponseWriter, r *http.Request) { @@ -53,17 +54,17 @@ func NewHandler() http.Handler { } id := r.URL.Path[len("/v1/groups/"):] - handleGetGroup(rw, r, id) + handleGetGroup(cs, rw, r, id) }) mux.HandleFunc("/v1/setup", func(rw http.ResponseWriter, r *http.Request) { switch r.Method { case 
http.MethodPost:
-			handleRunSetup(rw, r)
+			handleRunSetup(cs, rw, r)
 		case http.MethodPut:
-			handleSetSetupData(rw, r)
+			handleSetSetupData(cs, rw, r)
 		case http.MethodGet:
-			handleGetSetupData(rw, r)
+			handleGetSetupData(cs, rw, r)
 		default:
 			rw.WriteHeader(http.StatusMethodNotAllowed)
 		}
 	})
@@ -75,7 +76,7 @@ func NewHandler() http.Handler {
 			return
 		}
 
-		handleRunTeardown(rw, r)
+		handleRunTeardown(cs, rw, r)
 	})
 
 	return mux
diff --git a/api/v1/routes_test.go b/api/v1/routes_test.go
deleted file mode 100644
index ffbd9ebcc5f..00000000000
--- a/api/v1/routes_test.go
+++ /dev/null
@@ -1,22 +0,0 @@
-package v1
-
-import (
-	"io"
-	"net/http"
-	"net/http/httptest"
-	"testing"
-
-	"github.com/stretchr/testify/assert"
-
-	"go.k6.io/k6/api/common"
-	"go.k6.io/k6/core"
-)
-
-func newRequestWithEngine(engine *core.Engine, method, target string, body io.Reader) *http.Request {
-	r := httptest.NewRequest(method, target, body)
-	return r.WithContext(common.WithEngine(r.Context(), engine))
-}
-
-func TestNewHandler(t *testing.T) {
-	assert.NotNil(t, NewHandler())
-}
diff --git a/api/v1/setup_teardown_routes.go b/api/v1/setup_teardown_routes.go
index 207c809c981..0e7a90a82e0 100644
--- a/api/v1/setup_teardown_routes.go
+++ b/api/v1/setup_teardown_routes.go
@@ -4,8 +4,6 @@ import (
 	"encoding/json"
 	"io/ioutil"
 	"net/http"
-
-	"go.k6.io/k6/api/common"
 )
 
 // NullSetupData is wrapper around null to satisfy jsonapi
@@ -38,13 +36,12 @@ func handleSetupDataOutput(rw http.ResponseWriter, setupData json.RawMessage) {
 }
 
 // handleGetSetupData just returns the current JSON-encoded setup data
-func handleGetSetupData(rw http.ResponseWriter, r *http.Request) {
-	runner := common.GetEngine(r.Context()).ExecutionScheduler.GetRunner()
-	handleSetupDataOutput(rw, runner.GetSetupData())
+func handleGetSetupData(cs *ControlSurface, rw http.ResponseWriter, _ *http.Request) {
+	handleSetupDataOutput(rw, cs.RunState.Runner.GetSetupData())
 }
 
 // handleSetSetupData just parses the JSON request body and sets the result as setup data for the runner
-func handleSetSetupData(rw http.ResponseWriter, r *http.Request) {
+func handleSetSetupData(cs *ControlSurface, rw http.ResponseWriter, r *http.Request) {
 	body, err := ioutil.ReadAll(r.Body)
 	if err != nil {
 		apiError(rw, "Error reading request body", err.Error(), http.StatusBadRequest)
 		return
 	}
@@ -59,8 +56,7 @@ func handleSetSetupData(rw http.ResponseWriter, r *http.Request) {
 		}
 	}
 
-	runner := common.GetEngine(r.Context()).ExecutionScheduler.GetRunner()
-
+	runner := cs.RunState.Runner
 	if len(body) == 0 {
 		runner.SetSetupData(nil)
 	} else {
@@ -71,11 +67,10 @@
 }
 
 // handleRunSetup executes the runner's Setup() method and returns the result
-func handleRunSetup(rw http.ResponseWriter, r *http.Request) {
-	engine := common.GetEngine(r.Context())
-	runner := engine.ExecutionScheduler.GetRunner()
+func handleRunSetup(cs *ControlSurface, rw http.ResponseWriter, r *http.Request) {
+	runner := cs.RunState.Runner
 
-	if err := runner.Setup(r.Context(), engine.Samples); err != nil {
+	if err := runner.Setup(r.Context(), cs.Samples); err != nil {
 		apiError(rw, "Error executing setup", err.Error(), http.StatusInternalServerError)
 		return
 	}
@@ -84,11 +79,8 @@
 
 // handleRunTeardown executes the runner's Teardown() method
-func handleRunTeardown(rw http.ResponseWriter, r *http.Request) {
-	engine := common.GetEngine(r.Context())
-	runner :=
common.GetEngine(r.Context()).ExecutionScheduler.GetRunner() - - if err := runner.Teardown(r.Context(), engine.Samples); err != nil { +func handleRunTeardown(cs *ControlSurface, rw http.ResponseWriter, r *http.Request) { + if err := cs.RunState.Runner.Teardown(r.Context(), cs.Samples); err != nil { apiError(rw, "Error executing teardown", err.Error(), http.StatusInternalServerError) } } diff --git a/api/v1/setup_teardown_routes_test.go b/api/v1/setup_teardown_routes_test.go index 4b9e06b150e..7d0571fc94d 100644 --- a/api/v1/setup_teardown_routes_test.go +++ b/api/v1/setup_teardown_routes_test.go @@ -141,25 +141,34 @@ func TestSetupData(t *testing.T) { engine, err := core.NewEngine(testState, execScheduler, nil) require.NoError(t, err) + globalCtx, globalCancel := context.WithCancel(context.Background()) + t.Cleanup(globalCancel) + runCtx, runAbort := execution.NewTestRunContext(globalCtx, testState.Logger) + defer runAbort(fmt.Errorf("unexpected abort")) + require.NoError(t, engine.OutputManager.StartOutputs()) defer engine.OutputManager.StopOutputs(nil) - globalCtx, globalCancel := context.WithCancel(context.Background()) - runCtx, runCancel := context.WithCancel(globalCtx) + cs := &ControlSurface{ + RunCtx: runCtx, + Samples: engine.Samples, + MetricsEngine: engine.MetricsEngine, + Scheduler: execScheduler, + RunState: testState, + } run, wait, err := engine.Init(globalCtx, runCtx) require.NoError(t, err) defer wait() - defer globalCancel() errC := make(chan error) go func() { errC <- run() }() - handler := NewHandler() + handler := NewHandler(cs) checkSetup := func(method, body, expResult string) { rw := httptest.NewRecorder() - handler.ServeHTTP(rw, newRequestWithEngine(engine, method, "/v1/setup", bytes.NewBufferString(body))) + handler.ServeHTTP(rw, httptest.NewRequest(method, "/v1/setup", bytes.NewBufferString(body))) res := rw.Result() if !assert.Equal(t, http.StatusOK, res.StatusCode) { t.Logf("body: %s\n", rw.Body.String()) @@ -179,14 +188,12 @@ func TestSetupData(t *testing.T) { checkSetup(setupRun[0], setupRun[1], setupRun[2]) } - require.NoError(t, engine.ExecutionScheduler.SetPaused(false)) + require.NoError(t, cs.Scheduler.SetPaused(false)) select { case <-time.After(10 * time.Second): - runCancel() t.Fatal("Test timed out") case err := <-errC: - runCancel() require.NoError(t, err) } }) diff --git a/api/v1/status.go b/api/v1/status.go index ad33ce60c86..0915c08a8c4 100644 --- a/api/v1/status.go +++ b/api/v1/status.go @@ -3,7 +3,6 @@ package v1 import ( "gopkg.in/guregu/null.v3" - "go.k6.io/k6/core" "go.k6.io/k6/lib" ) @@ -18,15 +17,21 @@ type Status struct { Tainted bool `json:"tainted" yaml:"tainted"` } -func NewStatus(engine *core.Engine) Status { - executionState := engine.ExecutionScheduler.GetState() +func newStatus(cs *ControlSurface) Status { + executionState := cs.Scheduler.GetState() + isStopped := false + select { + case <-cs.RunCtx.Done(): + isStopped = true + default: + } return Status{ Status: executionState.GetCurrentExecutionStatus(), Running: executionState.HasStarted() && !executionState.HasEnded(), Paused: null.BoolFrom(executionState.IsPaused()), - Stopped: engine.IsStopped(), + Stopped: isStopped, VUs: null.IntFrom(executionState.GetCurrentlyActiveVUsCount()), VUsMax: null.IntFrom(executionState.GetInitializedVUsCount()), - Tainted: engine.IsTainted(), + Tainted: cs.MetricsEngine.GetMetricsWithBreachedThresholdsCount() > 0, } } diff --git a/api/v1/status_jsonapi.go b/api/v1/status_jsonapi.go index 5bd28bd31f8..42977ea1ba2 100644 --- 
a/api/v1/status_jsonapi.go +++ b/api/v1/status_jsonapi.go @@ -1,9 +1,5 @@ package v1 -import ( - "go.k6.io/k6/core" -) - // StatusJSONAPI is JSON API envelop for metrics type StatusJSONAPI struct { Data statusData `json:"data"` @@ -31,6 +27,6 @@ type statusData struct { Attributes Status `json:"attributes"` } -func newStatusJSONAPIFromEngine(engine *core.Engine) StatusJSONAPI { - return NewStatusJSONAPI(NewStatus(engine)) +func newStatusJSONAPIFromEngine(cs *ControlSurface) StatusJSONAPI { + return NewStatusJSONAPI(newStatus(cs)) } diff --git a/api/v1/status_routes.go b/api/v1/status_routes.go index dc18a822660..1a1eefc3b46 100644 --- a/api/v1/status_routes.go +++ b/api/v1/status_routes.go @@ -3,18 +3,18 @@ package v1 import ( "encoding/json" "errors" + "fmt" "io/ioutil" "net/http" - "go.k6.io/k6/api/common" + "go.k6.io/k6/errext" + "go.k6.io/k6/errext/exitcodes" "go.k6.io/k6/execution" "go.k6.io/k6/lib/executor" ) -func handleGetStatus(rw http.ResponseWriter, r *http.Request) { - engine := common.GetEngine(r.Context()) - - status := newStatusJSONAPIFromEngine(engine) +func handleGetStatus(cs *ControlSurface, rw http.ResponseWriter, _ *http.Request) { + status := newStatusJSONAPIFromEngine(cs) data, err := json.Marshal(status) if err != nil { apiError(rw, "Encoding error", err.Error(), http.StatusInternalServerError) @@ -34,9 +34,8 @@ func getFirstExternallyControlledExecutor(execScheduler *execution.Scheduler) (* return nil, errors.New("an externally-controlled executor needs to be configured for live configuration updates") } -func handlePatchStatus(rw http.ResponseWriter, r *http.Request) { +func handlePatchStatus(cs *ControlSurface, rw http.ResponseWriter, r *http.Request) { rw.Header().Set("Content-Type", "application/json; charset=utf-8") - engine := common.GetEngine(r.Context()) body, err := ioutil.ReadAll(r.Body) if err != nil { @@ -53,10 +52,13 @@ func handlePatchStatus(rw http.ResponseWriter, r *http.Request) { status := statusEnvelop.Status() if status.Stopped { //nolint:nestif - engine.Stop() + execution.AbortTestRun(cs.RunCtx, errext.WithAbortReasonIfNone( + errext.WithExitCodeIfNone(fmt.Errorf("test run stopped from REST API"), exitcodes.ScriptStoppedFromRESTAPI), + errext.AbortedByUser, + )) } else { if status.Paused.Valid { - if err = engine.ExecutionScheduler.SetPaused(status.Paused.Bool); err != nil { + if err = cs.Scheduler.SetPaused(status.Paused.Bool); err != nil { apiError(rw, "Pause error", err.Error(), http.StatusInternalServerError) return } @@ -66,7 +68,7 @@ func handlePatchStatus(rw http.ResponseWriter, r *http.Request) { // TODO: add ability to specify the actual executor id? Though this should // likely be in the v2 REST API, where we could implement it in a way that // may allow us to eventually support other executor types. 
- executor, updateErr := getFirstExternallyControlledExecutor(engine.ExecutionScheduler) + executor, updateErr := getFirstExternallyControlledExecutor(cs.Scheduler) if updateErr != nil { apiError(rw, "Execution config error", updateErr.Error(), http.StatusInternalServerError) return @@ -85,7 +87,7 @@ func handlePatchStatus(rw http.ResponseWriter, r *http.Request) { } } - data, err := json.Marshal(newStatusJSONAPIFromEngine(engine)) + data, err := json.Marshal(newStatusJSONAPIFromEngine(cs)) if err != nil { apiError(rw, "Encoding error", err.Error(), http.StatusInternalServerError) return diff --git a/api/v1/status_routes_test.go b/api/v1/status_routes_test.go index bb31003ed2b..9e74f768e89 100644 --- a/api/v1/status_routes_test.go +++ b/api/v1/status_routes_test.go @@ -4,8 +4,10 @@ import ( "bytes" "context" "encoding/json" + "fmt" "net/http" "net/http/httptest" + "sync" "testing" "time" @@ -23,13 +25,10 @@ func TestGetStatus(t *testing.T) { t.Parallel() testState := getTestRunState(t, lib.Options{}, &minirunner.MiniRunner{}) - execScheduler, err := execution.NewScheduler(testState) - require.NoError(t, err) - engine, err := core.NewEngine(testState, execScheduler, nil) - require.NoError(t, err) + cs := getControlSurface(t, testState) rw := httptest.NewRecorder() - NewHandler().ServeHTTP(rw, newRequestWithEngine(engine, "GET", "/v1/status", nil)) + NewHandler(cs).ServeHTTP(rw, httptest.NewRequest(http.MethodGet, "/v1/status", nil)) res := rw.Result() assert.Equal(t, http.StatusOK, res.StatusCode) @@ -118,23 +117,40 @@ func TestPatchStatus(t *testing.T) { require.NoError(t, engine.OutputManager.StartOutputs()) defer engine.OutputManager.StopOutputs(nil) - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) - run, wait, err := engine.Init(ctx, ctx) + globalCtx, globalCancel := context.WithCancel(context.Background()) + t.Cleanup(globalCancel) + runCtx, runAbort := execution.NewTestRunContext(globalCtx, testState.Logger) + defer runAbort(fmt.Errorf("unexpected abort")) + engine.AbortFn = runAbort + + cs := &ControlSurface{ + RunCtx: runCtx, + Samples: engine.Samples, + MetricsEngine: engine.MetricsEngine, + Scheduler: execScheduler, + RunState: testState, + } + + run, wait, err := engine.Init(globalCtx, runCtx) require.NoError(t, err) + wg := &sync.WaitGroup{} + wg.Add(1) defer func() { - cancel() + runAbort(fmt.Errorf("custom cancel signal")) wait() + wg.Wait() }() go func() { - assert.ErrorContains(t, run(), "test run aborted by signal") + assert.ErrorContains(t, run(), "custom cancel signal") + wg.Done() }() // wait for the executor to initialize to avoid a potential data race below time.Sleep(200 * time.Millisecond) rw := httptest.NewRecorder() - NewHandler().ServeHTTP(rw, newRequestWithEngine(engine, "PATCH", "/v1/status", bytes.NewReader(testCase.Payload))) + NewHandler(cs).ServeHTTP(rw, httptest.NewRequest(http.MethodPatch, "/v1/status", bytes.NewReader(testCase.Payload))) res := rw.Result() require.Equal(t, "application/json; charset=utf-8", rw.Header().Get("Content-Type")) @@ -145,7 +161,7 @@ func TestPatchStatus(t *testing.T) { return } - status := NewStatus(engine) + status := newStatus(cs) if testCase.ExpectedStatus.Paused.Valid { assert.Equal(t, testCase.ExpectedStatus.Paused, status.Paused) } diff --git a/cmd/common.go b/cmd/common.go index 2b43cb7f1cd..f95f208f6ef 100644 --- a/cmd/common.go +++ b/cmd/common.go @@ -74,6 +74,7 @@ func printToStdout(gs *state.GlobalState, s string) { // Trap Interrupts, SIGINTs and SIGTERMs and call the given. 
func handleTestAbortSignals(gs *state.GlobalState, gracefulStopHandler, onHardStop func(os.Signal)) (stop func()) { + gs.Logger.Debug("Trapping interrupt signals so k6 can handle them gracefully...") sigC := make(chan os.Signal, 2) done := make(chan struct{}) gs.SignalNotify(sigC, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) @@ -100,6 +101,7 @@ func handleTestAbortSignals(gs *state.GlobalState, gracefulStopHandler, onHardSt }() return func() { + gs.Logger.Debug("Releasing signal trap...") close(done) gs.SignalStop(sigC) } diff --git a/cmd/run.go b/cmd/run.go index 98f24af36a0..14cd939448f 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -70,6 +70,8 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { defer runCancel() logger := testRunState.Logger + runSubCtx, runSubAbort := execution.NewTestRunContext(runCtx, logger) + // Create a local execution scheduler wrapping the runner. logger.Debug("Initializing the execution scheduler...") execScheduler, err := execution.NewScheduler(testRunState) @@ -114,6 +116,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { if err != nil { return err } + engine.AbortFn = runSubAbort // Spin up the REST API server, if not disabled. if c.gs.Flags.Address != "" { //nolint:nestif @@ -126,8 +129,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { srvCtx, srvCancel := context.WithCancel(globalCtx) defer srvCancel() - // TODO: send the ExecutionState and MetricsEngine instead of the Engine - srv := api.GetServer(c.gs.Flags.Address, engine, logger) + srv := api.GetServer(runSubCtx, c.gs.Flags.Address, testRunState, engine.Samples, engine.MetricsEngine, execScheduler) go func() { defer apiWG.Done() logger.Debugf("Starting the REST API server on %s", c.gs.Flags.Address) @@ -170,7 +172,13 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { // Trap Interrupts, SIGINTs and SIGTERMs. 
gracefulStop := func(sig os.Signal) { logger.WithField("sig", sig).Debug("Stopping k6 in response to signal...") - lingerCancel() // stop the test run, metric processing is cancelled below + // first abort the test run this way, to propagate the error + runSubAbort(errext.WithAbortReasonIfNone( + errext.WithExitCodeIfNone( + fmt.Errorf("test run was aborted because k6 received a '%s' signal", sig), exitcodes.ExternalAbort, + ), errext.AbortedByUser, + )) + lingerCancel() // cancel this context as well, since the user did Ctrl+C } onHardStop := func(sig os.Signal) { logger.WithField("sig", sig).Error("Aborting k6 in response to signal") @@ -181,7 +189,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { // Initialize the engine initBar.Modify(pb.WithConstProgress(0, "Init VUs...")) - engineRun, engineWait, err := engine.Init(globalCtx, runCtx) + engineRun, engineWait, err := engine.Init(globalCtx, runSubCtx) if err != nil { err = common.UnwrapGojaInterruptedError(err) // Add a generic engine exit code if we don't have a more specific one @@ -230,7 +238,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { engine.MetricsEngine.MetricsLock.Lock() // TODO: refactor so this is not needed summaryResult, hsErr := test.initRunner.HandleSummary(globalCtx, &lib.Summary{ Metrics: engine.MetricsEngine.ObservedMetrics, - RootGroup: execScheduler.GetRunner().GetDefaultGroup(), + RootGroup: testRunState.Runner.GetDefaultGroup(), TestRunDuration: executionState.GetCurrentTestRunDuration(), NoColor: c.gs.Flags.NoColor, UIState: lib.UIState{ @@ -273,8 +281,6 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) { if engine.IsTainted() { if err == nil { err = errors.New("some thresholds have failed") - } else { - logger.Error("some thresholds have failed") // log this, even if there was already a previous error } err = errext.WithAbortReasonIfNone( errext.WithExitCodeIfNone(err, exitcodes.ThresholdsHaveFailed), errext.AbortedByThresholdsAfterTestEnd, diff --git a/cmd/tests/cmd_run_test.go b/cmd/tests/cmd_run_test.go index 98c6cd58932..1f5f76ea2c0 100644 --- a/cmd/tests/cmd_run_test.go +++ b/cmd/tests/cmd_run_test.go @@ -610,7 +610,8 @@ func TestAbortedByThreshold(t *testing.T) { ) cmd.ExecuteWithGlobalState(ts.GlobalState) - assert.True(t, testutils.LogContains(ts.LoggerHook.Drain(), logrus.ErrorLevel, `test run aborted by failed thresholds`)) + expErr := "thresholds on metrics 'iterations' were breached; at least one has abortOnFail enabled, stopping test prematurely" + assert.True(t, testutils.LogContains(ts.LoggerHook.Drain(), logrus.ErrorLevel, expErr)) stdOut := ts.Stdout.String() t.Log(stdOut) assert.Contains(t, stdOut, `✗ iterations`) @@ -661,7 +662,7 @@ func TestAbortedByUserWithGoodThresholds(t *testing.T) { logs := ts.LoggerHook.Drain() assert.False(t, testutils.LogContains(logs, logrus.ErrorLevel, `some thresholds have failed`)) - assert.True(t, testutils.LogContains(logs, logrus.ErrorLevel, `test run aborted by signal`)) + assert.True(t, testutils.LogContains(logs, logrus.ErrorLevel, `test run was aborted because k6 received a 'interrupt' signal`)) stdout := ts.Stdout.String() t.Log(stdout) assert.Contains(t, stdout, `✓ iterations`) @@ -793,7 +794,7 @@ func TestAbortedByUserWithRestAPI(t *testing.T) { assert.Contains(t, stdout, `a simple iteration`) assert.Contains(t, stdout, `teardown() called`) assert.Contains(t, stdout, `PATCH /v1/status`) - assert.Contains(t, stdout, `run: stopped by user via REST API; exiting...`) + 
assert.Contains(t, stdout, `level=error msg="test run stopped from REST API`) assert.Contains(t, stdout, `level=debug msg="Metrics emission of VUs and VUsMax metrics stopped"`) assert.Contains(t, stdout, `level=debug msg="Metrics processing finished!"`) assert.Contains(t, stdout, `level=debug msg="Sending test finished" output=cloud ref=111 run_status=5 tainted=false`) diff --git a/core/engine.go b/core/engine.go index aa37d4bf4d1..7ba7ba78489 100644 --- a/core/engine.go +++ b/core/engine.go @@ -3,13 +3,12 @@ package core import ( "context" "errors" + "strings" "sync" "time" "github.com/sirupsen/logrus" - "go.k6.io/k6/errext" - "go.k6.io/k6/errext/exitcodes" "go.k6.io/k6/execution" "go.k6.io/k6/lib" "go.k6.io/k6/metrics" @@ -17,10 +16,7 @@ import ( "go.k6.io/k6/output" ) -const ( - collectRate = 50 * time.Millisecond - thresholdsRate = 2 * time.Second -) +const collectRate = 50 * time.Millisecond // The Engine is the beating heart of k6. type Engine struct { @@ -42,15 +38,10 @@ type Engine struct { ingester output.Output - logger *logrus.Entry - stopOnce sync.Once - stopChan chan struct{} + logger *logrus.Entry + AbortFn func(error) // temporary Samples chan metrics.SampleContainer - - // Are thresholds tainted? - thresholdsTaintedLock sync.Mutex - thresholdsTainted bool } // NewEngine instantiates a new Engine, without doing any heavy initialization. @@ -64,7 +55,6 @@ func NewEngine(testState *lib.TestRunState, ex *execution.Scheduler, outputs []o runtimeOptions: testState.RuntimeOptions, Samples: make(chan metrics.SampleContainer, testState.Options.MetricSamplesBufferSize.Int64), - stopChan: make(chan struct{}), logger: testState.Logger.WithField("component", "engine"), } @@ -75,7 +65,7 @@ func NewEngine(testState *lib.TestRunState, ex *execution.Scheduler, outputs []o e.MetricsEngine = me if !(testState.RuntimeOptions.NoSummary.Bool && testState.RuntimeOptions.NoThresholds.Bool) { - e.ingester = me.GetIngester() + e.ingester = me.CreateIngester() outputs = append(outputs, e.ingester) } @@ -83,7 +73,7 @@ func NewEngine(testState *lib.TestRunState, ex *execution.Scheduler, outputs []o if err != nil { testState.Logger.WithError(err).Error("Received error to stop from output") } - e.Stop() + e.AbortFn(err) }) return e, nil @@ -112,23 +102,19 @@ func (e *Engine) Init(globalCtx, runCtx context.Context) (run func() error, wait } // TODO: move all of this in a separate struct? see main TODO above - runSubCtx, runSubCancel := context.WithCancel(runCtx) - - execRunResult := make(chan error) - engineRunResult := make(chan error) processMetricsAfterRun := make(chan struct{}) runFn := func() error { e.logger.Debug("Execution scheduler starting...") - err := e.ExecutionScheduler.Run(globalCtx, runSubCtx, e.Samples) - e.logger.WithError(err).Debug("Execution scheduler terminated") - - select { - case <-runSubCtx.Done(): - // do nothing, the test run was aborted somehow - default: - execRunResult <- err // we finished normally, so send the result + err := e.ExecutionScheduler.Run(globalCtx, runCtx, e.Samples) + if err == nil { + e.logger.Debug("Execution scheduler finished normally") + err = runCtx.Err() + } + if err != nil { + e.logger.WithError(err).Debug("Engine run returned an error") + } else { + e.logger.Debug("Execution scheduler and engine finished normally") } - result := <-engineRunResult // get the final result // Make the background jobs process the currently buffered metrics and // run the thresholds, then wait for that to be done. 
@@ -138,12 +124,10 @@ func (e *Engine) Init(globalCtx, runCtx context.Context) (run func() error, wait case <-globalCtx.Done(): } - return result + return err } - waitFn := e.startBackgroundProcesses( - globalCtx, runCtx, execRunResult, engineRunResult, runSubCancel, processMetricsAfterRun, - ) + waitFn := e.startBackgroundProcesses(globalCtx, processMetricsAfterRun) return runFn, waitFn, nil } @@ -157,8 +141,7 @@ func (e *Engine) Init(globalCtx, runCtx context.Context) (run func() error, wait // and that the remaining metrics samples in the pipeline should be processed as the background // process is about to exit. func (e *Engine) startBackgroundProcesses( - globalCtx, runCtx context.Context, execRunResult, engineRunResult chan error, - runSubCancel func(), processMetricsAfterRun chan struct{}, + globalCtx context.Context, processMetricsAfterRun chan struct{}, ) (wait func()) { processes := new(sync.WaitGroup) @@ -169,75 +152,6 @@ func (e *Engine) startBackgroundProcesses( e.processMetrics(globalCtx, processMetricsAfterRun) }() - // Update the test run status when the test finishes - processes.Add(1) - thresholdAbortChan := make(chan struct{}) - go func() { - defer processes.Done() - var err error - defer func() { - e.logger.WithError(err).Debug("Final Engine.Run() result") - engineRunResult <- err - }() - select { - case err = <-execRunResult: - if err != nil { - e.logger.WithError(err).Debug("run: execution scheduler returned an error") - } else { - e.logger.Debug("run: execution scheduler finished nominally") - } - // do nothing, return the same err value we got from the Run() - // ExecutionScheduler result, we just set the run_status based on it - case <-runCtx.Done(): - e.logger.Debug("run: context expired; exiting...") - err = errext.WithAbortReasonIfNone( - errext.WithExitCodeIfNone(errors.New("test run aborted by signal"), exitcodes.ExternalAbort), - errext.AbortedByUser, - ) - case <-e.stopChan: - runSubCancel() - e.logger.Debug("run: stopped by user via REST API; exiting...") - err = errext.WithAbortReasonIfNone( - errext.WithExitCodeIfNone(errors.New("test run stopped from the REST API"), exitcodes.ScriptStoppedFromRESTAPI), - errext.AbortedByUser, - ) - case <-thresholdAbortChan: - e.logger.Debug("run: stopped by thresholds; exiting...") - runSubCancel() - err = errext.WithAbortReasonIfNone( - errext.WithExitCodeIfNone(errors.New("test run aborted by failed thresholds"), exitcodes.ThresholdsHaveFailed), - errext.AbortedByThreshold, - ) - } - }() - - // Run thresholds, if not disabled. - if !e.runtimeOptions.NoThresholds.Bool { - processes.Add(1) - go func() { - defer processes.Done() - defer e.logger.Debug("Engine: Thresholds terminated") - ticker := time.NewTicker(thresholdsRate) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - thresholdsTainted, shouldAbort := e.MetricsEngine.EvaluateThresholds(true) - e.thresholdsTaintedLock.Lock() - e.thresholdsTainted = thresholdsTainted - e.thresholdsTaintedLock.Unlock() - if shouldAbort { - close(thresholdAbortChan) - return - } - case <-runCtx.Done(): - return - } - } - }() - } - return processes.Wait } @@ -251,26 +165,11 @@ func (e *Engine) startBackgroundProcesses( func (e *Engine) processMetrics(globalCtx context.Context, processMetricsAfterRun chan struct{}) { sampleContainers := []metrics.SampleContainer{} - defer func() { - // Process any remaining metrics in the pipeline, by this point Run() - // has already finished and nothing else should be producing metrics. 
- e.logger.Debug("Metrics processing winding down...") - - close(e.Samples) - for sc := range e.Samples { - sampleContainers = append(sampleContainers, sc) - } - e.OutputManager.AddMetricSamples(sampleContainers) - - if !e.runtimeOptions.NoThresholds.Bool { - // Process the thresholds one final time - thresholdsTainted, _ := e.MetricsEngine.EvaluateThresholds(false) - e.thresholdsTaintedLock.Lock() - e.thresholdsTainted = thresholdsTainted - e.thresholdsTaintedLock.Unlock() - } - e.logger.Debug("Metrics processing finished!") - }() + // Run thresholds, if not disabled. + var finalizeThresholds func() (breached []string) + if !e.runtimeOptions.NoThresholds.Bool { + finalizeThresholds = e.MetricsEngine.StartThresholdCalculations(e.AbortFn) + } ticker := time.NewTicker(collectRate) defer ticker.Stop() @@ -285,59 +184,45 @@ func (e *Engine) processMetrics(globalCtx context.Context, processMetricsAfterRu sampleContainers = make([]metrics.SampleContainer, 0, cap(sampleContainers)) } } + + finalize := func() { + // Process any remaining metrics in the pipeline, by this point Run() + // has already finished and nothing else should be producing metrics. + e.logger.Debug("Metrics processing winding down...") + + close(e.Samples) + for sc := range e.Samples { + sampleContainers = append(sampleContainers, sc) + } + processSamples() + + if finalizeThresholds != nil { + // Ensure the ingester flushes any buffered metrics + _ = e.ingester.Stop() + breached := finalizeThresholds() + e.logger.Debugf("Engine: thresholds done, breached: '%s'", strings.Join(breached, ", ")) + } + e.logger.Debug("Metrics processing finished!") + } + for { select { case <-ticker.C: processSamples() case <-processMetricsAfterRun: - getCachedMetrics: - for { - select { - case sc := <-e.Samples: - sampleContainers = append(sampleContainers, sc) - default: - break getCachedMetrics - } - } e.logger.Debug("Processing metrics and thresholds after the test run has ended...") - processSamples() - if !e.runtimeOptions.NoThresholds.Bool { - // Ensure the ingester flushes any buffered metrics - _ = e.ingester.Stop() - thresholdsTainted, _ := e.MetricsEngine.EvaluateThresholds(false) - e.thresholdsTaintedLock.Lock() - e.thresholdsTainted = thresholdsTainted - e.thresholdsTaintedLock.Unlock() - } + finalize() processMetricsAfterRun <- struct{}{} - + return case sc := <-e.Samples: sampleContainers = append(sampleContainers, sc) case <-globalCtx.Done(): + finalize() return } } } func (e *Engine) IsTainted() bool { - e.thresholdsTaintedLock.Lock() - defer e.thresholdsTaintedLock.Unlock() - return e.thresholdsTainted -} - -// Stop closes a signal channel, forcing a running Engine to return -func (e *Engine) Stop() { - e.stopOnce.Do(func() { - close(e.stopChan) - }) -} - -// IsStopped returns a bool indicating whether the Engine has been stopped -func (e *Engine) IsStopped() bool { - select { - case <-e.stopChan: - return true - default: - return false - } + return e.MetricsEngine.GetMetricsWithBreachedThresholdsCount() > 0 } diff --git a/core/engine_test.go b/core/engine_test.go index 30e3a8984e1..2152a8c1aec 100644 --- a/core/engine_test.go +++ b/core/engine_test.go @@ -34,17 +34,19 @@ const isWindows = runtime.GOOS == "windows" // TODO: completely rewrite all of these tests type testStruct struct { - engine *Engine - run func() error - runCancel func() - wait func() - piState *lib.TestPreInitState + engine *Engine + run func() error + runAbort func(error) + wait func() + piState *lib.TestPreInitState } func getTestPreInitState(tb testing.TB) 
*lib.TestPreInitState {
 	reg := metrics.NewRegistry()
+	logger := testutils.NewLogger(tb)
+	logger.SetLevel(logrus.DebugLevel)
 	return &lib.TestPreInitState{
-		Logger:         testutils.NewLogger(tb),
+		Logger:         logger,
 		RuntimeOptions: lib.RuntimeOptions{},
 		Registry:       reg,
 		BuiltinMetrics: metrics.RegisterBuiltinMetrics(reg),
 	}
 }
@@ -97,16 +99,19 @@ func newTestEngineWithTestPreInitState( //nolint:golint
 	} else {
 		runCtx, runCancel = context.WithCancel(globalCtx)
 	}
-	run, waitFn, err := engine.Init(globalCtx, runCtx)
+	runSubCtx, runSubAbort := execution.NewTestRunContext(runCtx, piState.Logger)
+	engine.AbortFn = runSubAbort
+
+	run, waitFn, err := engine.Init(globalCtx, runSubCtx)
 	require.NoError(t, err)
 
 	var test *testStruct
 	test = &testStruct{
-		engine:    engine,
-		run:       run,
-		runCancel: runCancel,
+		engine:   engine,
+		run:      run,
+		runAbort: runSubAbort,
 		wait: func() {
-			test.runCancel()
+			runCancel()
 			globalCancel()
 			waitFn()
 			engine.OutputManager.StopOutputs(nil)
@@ -141,7 +146,7 @@ func TestEngineRun(t *testing.T) {
 		defer test.wait()
 
 		startTime := time.Now()
-		assert.ErrorContains(t, test.run(), "test run aborted by signal")
+		assert.ErrorContains(t, test.run(), "context deadline exceeded")
 		assert.WithinDuration(t, startTime.Add(duration), time.Now(), 100*time.Millisecond)
 		<-done
 	})
@@ -191,8 +196,8 @@ func TestEngineRun(t *testing.T) {
 		errC := make(chan error)
 		go func() { errC <- test.run() }()
 		<-signalChan
-		test.runCancel()
-		assert.ErrorContains(t, <-errC, "test run aborted by signal")
+		test.runAbort(fmt.Errorf("custom error"))
+		assert.ErrorContains(t, <-errC, "custom error")
 		test.wait()
 
 		found := 0
@@ -218,21 +223,6 @@ func TestEngineAtTime(t *testing.T) {
 	assert.NoError(t, test.run())
 }
 
-func TestEngineStopped(t *testing.T) {
-	t.Parallel()
-	test := newTestEngine(t, nil, nil, nil, lib.Options{
-		VUs:      null.IntFrom(1),
-		Duration: types.NullDurationFrom(20 * time.Second),
-	})
-	defer test.wait()
-
-	assert.NoError(t, test.run())
-	assert.Equal(t, false, test.engine.IsStopped(), "engine should be running")
-	test.engine.Stop()
-	assert.Equal(t, true, test.engine.IsStopped(), "engine should be stopped")
-	test.engine.Stop() // test that a second stop doesn't panic
-}
-
 func TestEngineOutput(t *testing.T) {
 	t.Parallel()
 
@@ -413,7 +403,7 @@ func TestEngineThresholdsWillAbort(t *testing.T) {
 		assert.Fail(t, "Test should have completed within 10 seconds")
 	}
 	test.wait()
-	assert.True(t, test.engine.thresholdsTainted)
+	assert.True(t, test.engine.IsTainted())
}
 
 func TestEngineAbortedByThresholds(t *testing.T) {
@@ -434,7 +424,8 @@ func TestEngineAbortedByThresholds(t *testing.T) {
 
 	thresholds := map[string]metrics.Thresholds{metric.Name: ths}
 
-	done := make(chan struct{})
+	doneIter := make(chan struct{})
+	doneRun := make(chan struct{})
 	runner := &minirunner.MiniRunner{
 		Fn: func(ctx context.Context, _ *lib.State, out chan<- metrics.SampleContainer) error {
 			out <- metrics.Sample{
@@ -446,7 +437,7 @@ func TestEngineAbortedByThresholds(t *testing.T) {
 				Value: 1.25,
 			}
 			<-ctx.Done()
-			close(done)
+			close(doneIter)
 			return nil
 		},
 	}
@@ -455,12 +446,19 @@ func TestEngineAbortedByThresholds(t *testing.T) {
 	defer test.wait()
 
 	go func() {
-		require.ErrorContains(t, test.run(), "aborted by failed thresholds")
+		defer close(doneRun)
+		err := test.run()
+		t.Logf("test run done with err '%s'", err)
+		assert.ErrorContains(t, err, "thresholds on metrics 'my_metric' were breached")
 	}()
 
 	select {
-	case <-done:
-		return
+	case <-doneIter:
+	case <-time.After(10 * time.Second):
+		assert.Fail(t, "Iteration should have completed within 10 seconds")
+
} + select { + case <-doneRun: case <-time.After(10 * time.Second): assert.Fail(t, "Test should have completed within 10 seconds") } @@ -1208,7 +1205,7 @@ func TestEngineRunsTeardownEvenAfterTestRunIsAborted(t *testing.T) { var test *testStruct runner := &minirunner.MiniRunner{ Fn: func(_ context.Context, _ *lib.State, _ chan<- metrics.SampleContainer) error { - test.runCancel() // we cancel the run immediately after the test starts + test.runAbort(fmt.Errorf("custom error")) // we cancel the run immediately after the test starts return nil }, TeardownFn: func(_ context.Context, out chan<- metrics.SampleContainer) error { @@ -1229,7 +1226,7 @@ func TestEngineRunsTeardownEvenAfterTestRunIsAborted(t *testing.T) { VUs: null.IntFrom(1), Iterations: null.IntFrom(1), }, piState) - assert.ErrorContains(t, test.run(), "test run aborted by signal") + assert.ErrorContains(t, test.run(), "custom error") test.wait() var count float64 diff --git a/execution/scheduler.go b/execution/scheduler.go index 0f5d42a729f..2ebb442bc50 100644 --- a/execution/scheduler.go +++ b/execution/scheduler.go @@ -90,11 +90,6 @@ func NewScheduler(trs *lib.TestRunState) (*Scheduler, error) { }, nil } -// GetRunner returns the wrapped lib.Runner instance. -func (e *Scheduler) GetRunner() lib.Runner { // TODO: remove - return e.state.Test.Runner -} - // GetState returns a pointer to the execution state struct for the execution // scheduler. It's guaranteed to be initialized and present, though see the // documentation in lib/execution.go for caveats about its usage. The most @@ -446,13 +441,8 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, engineOut chan<- metr logger.Debug("Start all executors...") e.state.SetExecutionStatus(lib.ExecutionStatusRunning) - // We are using this context to allow lib.Executor implementations to cancel - // this context effectively stopping all executions. - // - // This is for addressing test.abort(). - execCtx, _ := NewTestRunContext(runSubCtx, logger) for _, exec := range e.executors { - go e.runExecutor(execCtx, runResults, engineOut, exec) + go e.runExecutor(runSubCtx, runResults, engineOut, exec) } // Wait for all executors to finish @@ -478,7 +468,7 @@ func (e *Scheduler) Run(globalCtx, runCtx context.Context, engineOut chan<- metr return err } } - if err := GetCancelReasonIfTestAborted(execCtx); err != nil && errext.IsInterruptError(err) { + if err := GetCancelReasonIfTestAborted(runSubCtx); err != nil { interrupted = true return err } diff --git a/lib/options_test.go b/lib/options_test.go index 10fb9017d3c..d8d1f354b53 100644 --- a/lib/options_test.go +++ b/lib/options_test.go @@ -114,6 +114,7 @@ func TestOptions(t *testing.T) { t.Run("TLSCipherSuites", func(t *testing.T) { t.Parallel() for suiteName, suiteID := range SupportedTLSCipherSuites { + suiteName, suiteID := suiteName, suiteID t.Run(suiteName, func(t *testing.T) { t.Parallel() opts := Options{}.Apply(Options{TLSCipherSuites: &TLSCipherSuites{suiteID}}) diff --git a/metrics/engine/engine.go b/metrics/engine/engine.go index 6648c41deba..3dc8836ebd4 100644 --- a/metrics/engine/engine.go +++ b/metrics/engine/engine.go @@ -6,14 +6,20 @@ import ( "fmt" "strings" "sync" + "sync/atomic" + "time" "github.com/sirupsen/logrus" + "go.k6.io/k6/errext" + "go.k6.io/k6/errext/exitcodes" "go.k6.io/k6/lib" "go.k6.io/k6/metrics" "go.k6.io/k6/output" "gopkg.in/guregu/null.v3" ) +const thresholdsRate = 2 * time.Second + // MetricsEngine is the internal metrics engine that k6 uses to keep track of // aggregated metric sample values. 
They are used to generate the end-of-test
// summary and to evaluate the test thresholds.
@@ -21,11 +27,15 @@ type MetricsEngine struct {
 	es     *lib.ExecutionState
 	logger logrus.FieldLogger
 
+	outputIngester *outputIngester
+
 	// These can be both top-level metrics or sub-metrics
 	metricsWithThresholds []*metrics.Metric
 
+	breachedThresholdsCount uint32
+
 	// TODO: completely refactor:
-	//   - make these private,
+	//   - make these private, add a method to export the raw data
 	//   - do not use an unnecessary map for the observed metrics
 	//   - have one lock per metric instead of a a global one, when
 	//     the metrics are decoupled from their types
@@ -52,13 +62,14 @@ func NewMetricsEngine(es *lib.ExecutionState) (*MetricsEngine, error) {
 	return me, nil
 }
 
-// GetIngester returns a pseudo-Output that uses the given metric samples to
+// CreateIngester returns a pseudo-Output that uses the given metric samples to
 // update the engine's inner state.
-func (me *MetricsEngine) GetIngester() output.Output {
-	return &outputIngester{
+func (me *MetricsEngine) CreateIngester() output.Output {
+	me.outputIngester = &outputIngester{
 		logger:        me.logger.WithField("component", "metrics-engine-ingester"),
 		metricsEngine: me,
 	}
+	return me.outputIngester
 }
 
 func (me *MetricsEngine) getThresholdMetricOrSubmetric(name string) (*metrics.Metric, error) {
@@ -152,15 +163,66 @@ func (me *MetricsEngine) initSubMetricsAndThresholds() error {
 	return nil
 }
 
-// EvaluateThresholds processes all of the thresholds.
+// StartThresholdCalculations spins up a new goroutine to crunch thresholds and
+// returns a callback that will stop the goroutine and finalize the calculations.
+func (me *MetricsEngine) StartThresholdCalculations(abortRun func(error)) (
+	finalize func() (breached []string),
+) {
+	stop := make(chan struct{})
+	done := make(chan struct{})
+
+	go func() {
+		defer close(done)
+		ticker := time.NewTicker(thresholdsRate)
+		defer ticker.Stop()
+
+		for {
+			select {
+			case <-ticker.C:
+				breached, shouldAbort := me.evaluateThresholds(true)
+				if shouldAbort {
+					err := fmt.Errorf(
+						"thresholds on metrics '%s' were breached; at least one has abortOnFail enabled, stopping test prematurely",
+						strings.Join(breached, ", "),
+					)
+					me.logger.Debug(err.Error())
+					err = errext.WithAbortReasonIfNone(
+						errext.WithExitCodeIfNone(err, exitcodes.ThresholdsHaveFailed), errext.AbortedByThreshold,
+					)
+					abortRun(err)
+				}
+			case <-stop:
+				return
+			}
+		}
+	}()
+
+	return func() []string {
+		if me.outputIngester != nil {
+			// Stop the ingester so we don't get any more metrics
+			err := me.outputIngester.Stop()
+			if err != nil {
+				me.logger.WithError(err).Warnf("There was a problem stopping the output ingester.")
+			}
+		}
+		close(stop)
+		<-done
+
+		breached, _ := me.evaluateThresholds(false)
+		return breached
+	}
+}
+
+// evaluateThresholds processes all of the thresholds.
 //
-// TODO: refactor, make private, optimize
-func (me *MetricsEngine) EvaluateThresholds(ignoreEmptySinks bool) (thresholdsTainted, shouldAbort bool) {
+// TODO: refactor, optimize
+func (me *MetricsEngine) evaluateThresholds(ignoreEmptySinks bool) (breachedThresholds []string, shouldAbort bool) {
 	me.MetricsLock.Lock()
 	defer me.MetricsLock.Unlock()
 
 	t := me.es.GetCurrentTestRunDuration()
+	me.logger.Debugf("Running thresholds on %d metrics...", len(me.metricsWithThresholds))
 
 	for _, m := range me.metricsWithThresholds {
 		// If either the metric has no thresholds defined, or its sinks
 		// are empty, let's ignore its thresholds execution at this point.
@@ -169,7 +231,6 @@ func (me *MetricsEngine) EvaluateThresholds(ignoreEmptySinks bool) (thresholdsTa
 		}
 		m.Tainted = null.BoolFrom(false)
 
-		me.logger.WithField("metric_name", m.Name).Debug("running thresholds")
 		succ, err := m.Thresholds.Run(m.Sink, t)
 		if err != nil {
 			me.logger.WithField("metric_name", m.Name).WithError(err).Error("Threshold error")
@@ -178,13 +239,21 @@
 		if succ {
 			continue // threshold passed
 		}
-		me.logger.WithField("metric_name", m.Name).Debug("Thresholds failed")
+		breachedThresholds = append(breachedThresholds, m.Name)
 		m.Tainted = null.BoolFrom(true)
-		thresholdsTainted = true
 		if m.Thresholds.Abort {
 			shouldAbort = true
 		}
 	}
 
-	return thresholdsTainted, shouldAbort
+	me.logger.Debugf("Thresholds on %d metrics breached: %v", len(breachedThresholds), breachedThresholds)
+	atomic.StoreUint32(&me.breachedThresholdsCount, uint32(len(breachedThresholds)))
+	return breachedThresholds, shouldAbort
+}
+
+// GetMetricsWithBreachedThresholdsCount returns the number of metrics for which
+// the thresholds were breached (failed) during the last processing phase. This
+// API is safe to use concurrently.
+func (me *MetricsEngine) GetMetricsWithBreachedThresholdsCount() uint32 {
+	return atomic.LoadUint32(&me.breachedThresholdsCount)
 }
diff --git a/output/types.go b/output/types.go
index 646bacc8589..5aa1b203eb4 100644
--- a/output/types.go
+++ b/output/types.go
@@ -76,7 +76,7 @@ type WithTestRunStop interface {
 }
 
 // WithStopWithTestError allows output to receive the error value that the test
-// finished with. It could be nil, if the test finished nominally.
+// finished with. It could be nil, if the test finished normally.
 //
 // If this interface is implemented by the output, StopWithError() will be
 // called instead of Stop().
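
The unifying pattern in this patch: instead of fishing a *core.Engine out of each request's context (the deleted WithEngine/GetEngine helpers), every v1 handler now receives an explicit *ControlSurface argument that carries only what the REST API is allowed to touch. Below is a minimal, self-contained Go sketch of that dependency-injection shape; all identifiers in it are illustrative stand-ins, not k6 APIs beyond what the diff above shows.

package main

import (
	"encoding/json"
	"log"
	"net/http"
)

// deps plays the role of v1.ControlSurface: one struct that carries
// everything the HTTP layer may use. (Hypothetical names throughout.)
type deps struct {
	logger *log.Logger
	status func() string // e.g. backed by a scheduler's execution state
}

// handleStatus is shaped like handleGetStatus(cs, rw, r) in the diff: the
// dependencies arrive as an argument, not out of r.Context().
func handleStatus(d *deps, rw http.ResponseWriter, _ *http.Request) {
	d.logger.Println("status requested")
	rw.Header().Set("Content-Type", "application/json; charset=utf-8")
	_ = json.NewEncoder(rw).Encode(map[string]string{"status": d.status()})
}

// newHandler mirrors v1.NewHandler(cs): the mux closures capture the
// dependency struct, so nothing is smuggled through request contexts.
func newHandler(d *deps) http.Handler {
	mux := http.NewServeMux()
	mux.HandleFunc("/v1/status", func(rw http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodGet {
			rw.WriteHeader(http.StatusMethodNotAllowed)
			return
		}
		handleStatus(d, rw, r)
	})
	return mux
}

func main() {
	d := &deps{logger: log.Default(), status: func() string { return "running" }}
	log.Fatal(http.ListenAndServe("127.0.0.1:6565", newHandler(d)))
}

The gain is compile-time visibility: a handler's dependencies are spelled out in its signature, and tests can construct the dependency struct directly (as getControlSurface does in the diff) instead of preparing a context-wrapped request for every call.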
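
On the engine side, the deleted stopChan/EvaluateThresholds machinery is replaced by MetricsEngine.StartThresholdCalculations, which owns the periodic threshold goroutine and returns a finalize callback, while IsTainted is now answered from an atomically updated counter. The following sketch shows that start/finalize/atomic-read shape in isolation; the evaluate function, the 2-second tick, and all names below are stand-ins, not the real k6 internals.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// thresholdWatcher is a stand-in for the MetricsEngine's threshold state.
type thresholdWatcher struct {
	breachedCount uint32
	// evaluate is a placeholder for the periodic threshold evaluation.
	evaluate func(ignoreEmptySinks bool) (breached []string, abort bool)
}

// start launches the periodic evaluation goroutine and returns a finalize
// callback that stops it and runs one last, non-skipping evaluation.
func (w *thresholdWatcher) start(abortRun func(error)) (finalize func() []string) {
	stop := make(chan struct{})
	done := make(chan struct{})

	go func() {
		defer close(done)
		ticker := time.NewTicker(2 * time.Second) // assumed rate, like thresholdsRate
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				breached, shouldAbort := w.evaluate(true)
				atomic.StoreUint32(&w.breachedCount, uint32(len(breached)))
				if shouldAbort {
					abortRun(fmt.Errorf("thresholds on metrics %v were breached", breached))
				}
			case <-stop:
				return
			}
		}
	}()

	return func() []string {
		close(stop) // no new periodic runs...
		<-done      // ...and any in-flight one has finished
		breached, _ := w.evaluate(false)
		atomic.StoreUint32(&w.breachedCount, uint32(len(breached)))
		return breached
	}
}

// tainted is safe to call from any goroutine, like IsTainted() in the diff.
func (w *thresholdWatcher) tainted() bool {
	return atomic.LoadUint32(&w.breachedCount) > 0
}

func main() {
	w := &thresholdWatcher{
		evaluate: func(bool) ([]string, bool) { return []string{"my_metric"}, false },
	}
	finalize := w.start(func(err error) { fmt.Println("abort requested:", err) })
	time.Sleep(3 * time.Second)
	fmt.Println("breached:", finalize(), "tainted:", w.tainted())
}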