Skip to content

Commit

Permalink
Merge pull request #2813 from grafana/prune-the-engine
Browse files Browse the repository at this point in the history
Move most capabilities out of the `core.Engine` in preparation for removal
  • Loading branch information
na-- authored Jan 27, 2023
2 parents 359610c + d0eb9d3 commit 1268fad
Show file tree
Hide file tree
Showing 25 changed files with 372 additions and 456 deletions.
25 changes: 0 additions & 25 deletions api/common/context.go

This file was deleted.

43 changes: 26 additions & 17 deletions api/server.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,44 @@
package api

import (
"context"
"fmt"
"net/http"
"time"

"github.com/sirupsen/logrus"

"go.k6.io/k6/api/common"
v1 "go.k6.io/k6/api/v1"
"go.k6.io/k6/core"
"go.k6.io/k6/execution"
"go.k6.io/k6/lib"
"go.k6.io/k6/metrics"
"go.k6.io/k6/metrics/engine"
)

func newHandler(logger logrus.FieldLogger) http.Handler {
func newHandler(cs *v1.ControlSurface) http.Handler {
mux := http.NewServeMux()
mux.Handle("/v1/", v1.NewHandler())
mux.Handle("/ping", handlePing(logger))
mux.Handle("/", handlePing(logger))
mux.Handle("/v1/", v1.NewHandler(cs))
mux.Handle("/ping", handlePing(cs.RunState.Logger))
mux.Handle("/", handlePing(cs.RunState.Logger))
return mux
}

// GetServer returns a http.Server instance that can serve k6's REST API.
func GetServer(addr string, engine *core.Engine, logger logrus.FieldLogger) *http.Server {
mux := withEngine(engine, newLogger(logger, newHandler(logger)))
func GetServer(
runCtx context.Context, addr string, runState *lib.TestRunState,
samples chan metrics.SampleContainer, me *engine.MetricsEngine, es *execution.Scheduler,
) *http.Server {
// TODO: reduce the control surface as much as possible? For example, if
// we refactor the Runner API, we won't need to send the Samples channel.
cs := &v1.ControlSurface{
RunCtx: runCtx,
Samples: samples,
MetricsEngine: me,
Scheduler: es,
RunState: runState,
}

mux := withLoggingHandler(runState.Logger, newHandler(cs))
return &http.Server{Addr: addr, Handler: mux, ReadHeaderTimeout: 10 * time.Second}
}

Expand All @@ -36,8 +52,8 @@ func (w wrappedResponseWriter) WriteHeader(status int) {
w.ResponseWriter.WriteHeader(status)
}

// newLogger returns the middleware which logs response status for request.
func newLogger(l logrus.FieldLogger, next http.Handler) http.HandlerFunc {
// withLoggingHandler returns the middleware which logs response status for request.
func withLoggingHandler(l logrus.FieldLogger, next http.Handler) http.HandlerFunc {
return func(rw http.ResponseWriter, r *http.Request) {
wrapped := wrappedResponseWriter{ResponseWriter: rw, status: 200} // The default status code is 200 if it's not set
next.ServeHTTP(wrapped, r)
Expand All @@ -46,13 +62,6 @@ func newLogger(l logrus.FieldLogger, next http.Handler) http.HandlerFunc {
}
}

func withEngine(engine *core.Engine, next http.Handler) http.HandlerFunc {
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
r = r.WithContext(common.WithEngine(r.Context(), engine))
next.ServeHTTP(rw, r)
})
}

func handlePing(logger logrus.FieldLogger) http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
rw.Header().Add("Content-Type", "text/plain; charset=utf-8")
Expand Down
37 changes: 2 additions & 35 deletions api/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,8 @@ import (
"github.com/sirupsen/logrus"
logtest "github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.k6.io/k6/api/common"
"go.k6.io/k6/core"
"go.k6.io/k6/execution"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/lib/testutils/minirunner"
"go.k6.io/k6/metrics"
)

func testHTTPHandler(rw http.ResponseWriter, r *http.Request) {
Expand All @@ -37,7 +30,7 @@ func TestLogger(t *testing.T) {

l, hook := logtest.NewNullLogger()
l.Level = logrus.DebugLevel
newLogger(l, http.HandlerFunc(testHTTPHandler))(rw, r)
withLoggingHandler(l, http.HandlerFunc(testHTTPHandler))(rw, r)

res := rw.Result()
assert.Equal(t, http.StatusOK, res.StatusCode)
Expand All @@ -57,36 +50,10 @@ func TestLogger(t *testing.T) {
}
}

func TestWithEngine(t *testing.T) {
logger := logrus.New()
logger.SetOutput(testutils.NewTestOutput(t))
registry := metrics.NewRegistry()
testState := &lib.TestRunState{
TestPreInitState: &lib.TestPreInitState{
Logger: logger,
Registry: registry,
BuiltinMetrics: metrics.RegisterBuiltinMetrics(registry),
},
Options: lib.Options{},
Runner: &minirunner.MiniRunner{},
}

execScheduler, err := execution.NewScheduler(testState)
require.NoError(t, err)
engine, err := core.NewEngine(testState, execScheduler, nil)
require.NoError(t, err)

rw := httptest.NewRecorder()
r := httptest.NewRequest("GET", "http://example.com/", nil)
withEngine(engine, http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
assert.Equal(t, engine, common.GetEngine(r.Context()))
}))(rw, r)
}

func TestPing(t *testing.T) {
logger := logrus.New()
logger.SetOutput(testutils.NewTestOutput(t))
mux := newHandler(logger)
mux := handlePing(logger)

rw := httptest.NewRecorder()
r := httptest.NewRequest("GET", "/ping", nil)
Expand Down
20 changes: 20 additions & 0 deletions api/v1/control_surface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package v1

import (
"context"

"go.k6.io/k6/execution"
"go.k6.io/k6/lib"
"go.k6.io/k6/metrics"
"go.k6.io/k6/metrics/engine"
)

// ControlSurface includes the methods the REST API can use to control and
// communicate with the rest of k6.
type ControlSurface struct {
RunCtx context.Context
Samples chan metrics.SampleContainer
MetricsEngine *engine.MetricsEngine
Scheduler *execution.Scheduler
RunState *lib.TestRunState
}
14 changes: 4 additions & 10 deletions api/v1/group_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,10 @@ package v1
import (
"encoding/json"
"net/http"

"go.k6.io/k6/api/common"
)

func handleGetGroups(rw http.ResponseWriter, r *http.Request) {
engine := common.GetEngine(r.Context())

root := NewGroup(engine.ExecutionScheduler.GetRunner().GetDefaultGroup(), nil)
func handleGetGroups(cs *ControlSurface, rw http.ResponseWriter, _ *http.Request) {
root := NewGroup(cs.RunState.Runner.GetDefaultGroup(), nil)
groups := FlattenGroup(root)

data, err := json.Marshal(newGroupsJSONAPI(groups))
Expand All @@ -21,10 +17,8 @@ func handleGetGroups(rw http.ResponseWriter, r *http.Request) {
_, _ = rw.Write(data)
}

func handleGetGroup(rw http.ResponseWriter, r *http.Request, id string) {
engine := common.GetEngine(r.Context())

root := NewGroup(engine.ExecutionScheduler.GetRunner().GetDefaultGroup(), nil)
func handleGetGroup(cs *ControlSurface, rw http.ResponseWriter, _ *http.Request, id string) {
root := NewGroup(cs.RunState.Runner.GetDefaultGroup(), nil)
groups := FlattenGroup(root)

var group *Group
Expand Down
33 changes: 25 additions & 8 deletions api/v1/group_routes_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package v1

import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
Expand All @@ -10,12 +11,12 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.k6.io/k6/core"
"go.k6.io/k6/execution"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/lib/testutils/minirunner"
"go.k6.io/k6/metrics"
"go.k6.io/k6/metrics/engine"
)

func getTestPreInitState(tb testing.TB) *lib.TestPreInitState {
Expand All @@ -41,6 +42,26 @@ func getTestRunState(tb testing.TB, options lib.Options, runner lib.Runner) *lib
}
}

func getControlSurface(tb testing.TB, testState *lib.TestRunState) *ControlSurface {
execScheduler, err := execution.NewScheduler(testState)
require.NoError(tb, err)

me, err := engine.NewMetricsEngine(execScheduler.GetState())
require.NoError(tb, err)

ctx, cancel := context.WithCancel(context.Background())
tb.Cleanup(cancel)
ctx, _ = execution.NewTestRunContext(ctx, testState.Logger)

return &ControlSurface{
RunCtx: ctx,
Samples: make(chan metrics.SampleContainer, 1000),
MetricsEngine: me,
Scheduler: execScheduler,
RunState: testState,
}
}

func TestGetGroups(t *testing.T) {
g0, err := lib.NewGroup("", nil)
assert.NoError(t, err)
Expand All @@ -49,15 +70,11 @@ func TestGetGroups(t *testing.T) {
g2, err := g1.Group("group 2")
assert.NoError(t, err)

testState := getTestRunState(t, lib.Options{}, &minirunner.MiniRunner{Group: g0})
execScheduler, err := execution.NewScheduler(testState)
require.NoError(t, err)
engine, err := core.NewEngine(testState, execScheduler, nil)
require.NoError(t, err)
cs := getControlSurface(t, getTestRunState(t, lib.Options{}, &minirunner.MiniRunner{Group: g0}))

t.Run("list", func(t *testing.T) {
rw := httptest.NewRecorder()
NewHandler().ServeHTTP(rw, newRequestWithEngine(engine, "GET", "/v1/groups", nil))
NewHandler(cs).ServeHTTP(rw, httptest.NewRequest(http.MethodGet, "/v1/groups", nil))
res := rw.Result()
body := rw.Body.Bytes()
assert.Equal(t, http.StatusOK, res.StatusCode)
Expand Down Expand Up @@ -105,7 +122,7 @@ func TestGetGroups(t *testing.T) {
for _, gp := range []*lib.Group{g0, g1, g2} {
t.Run(gp.Name, func(t *testing.T) {
rw := httptest.NewRecorder()
NewHandler().ServeHTTP(rw, newRequestWithEngine(engine, "GET", "/v1/groups/"+gp.ID, nil))
NewHandler(cs).ServeHTTP(rw, httptest.NewRequest(http.MethodGet, "/v1/groups/"+gp.ID, nil))
res := rw.Result()
assert.Equal(t, http.StatusOK, res.StatusCode)
})
Expand Down
32 changes: 13 additions & 19 deletions api/v1/metric_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,17 @@ import (
"encoding/json"
"net/http"
"time"

"go.k6.io/k6/api/common"
)

func handleGetMetrics(rw http.ResponseWriter, r *http.Request) {
engine := common.GetEngine(r.Context())

func handleGetMetrics(cs *ControlSurface, rw http.ResponseWriter, _ *http.Request) {
var t time.Duration
if engine.ExecutionScheduler != nil {
t = engine.ExecutionScheduler.GetState().GetCurrentTestRunDuration()
if cs.Scheduler != nil {
t = cs.Scheduler.GetState().GetCurrentTestRunDuration()
}

engine.MetricsEngine.MetricsLock.Lock()
metrics := newMetricsJSONAPI(engine.MetricsEngine.ObservedMetrics, t)
engine.MetricsEngine.MetricsLock.Unlock()
cs.MetricsEngine.MetricsLock.Lock()
metrics := newMetricsJSONAPI(cs.MetricsEngine.ObservedMetrics, t)
cs.MetricsEngine.MetricsLock.Unlock()

data, err := json.Marshal(metrics)
if err != nil {
Expand All @@ -28,23 +24,21 @@ func handleGetMetrics(rw http.ResponseWriter, r *http.Request) {
_, _ = rw.Write(data)
}

func handleGetMetric(rw http.ResponseWriter, r *http.Request, id string) {
engine := common.GetEngine(r.Context())

func handleGetMetric(cs *ControlSurface, rw http.ResponseWriter, _ *http.Request, id string) {
var t time.Duration
if engine.ExecutionScheduler != nil {
t = engine.ExecutionScheduler.GetState().GetCurrentTestRunDuration()
if cs.Scheduler != nil {
t = cs.Scheduler.GetState().GetCurrentTestRunDuration()
}

engine.MetricsEngine.MetricsLock.Lock()
metric, ok := engine.MetricsEngine.ObservedMetrics[id]
cs.MetricsEngine.MetricsLock.Lock()
metric, ok := cs.MetricsEngine.ObservedMetrics[id]
if !ok {
engine.MetricsEngine.MetricsLock.Unlock()
cs.MetricsEngine.MetricsLock.Unlock()
apiError(rw, "Not Found", "No metric with that ID was found", http.StatusNotFound)
return
}
wrappedMetric := newMetricEnvelope(metric, t)
engine.MetricsEngine.MetricsLock.Unlock()
cs.MetricsEngine.MetricsLock.Unlock()

data, err := json.Marshal(wrappedMetric)
if err != nil {
Expand Down
Loading

0 comments on commit 1268fad

Please sign in to comment.