Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] PoC for native distributed execution #2438

Closed
wants to merge 28 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
0a21f8f
Simplify requirements calculation in k6 inspect
na-- Mar 5, 2022
a30ce02
Refactor cmd/ to get rid of most direct os package access
na-- Mar 5, 2022
2fef1d7
Test that --logformat is deprecated but still works
na-- Mar 6, 2022
3f035f1
Start with a colorful log if stderr is TTY, but also respect NO_COLOR
na-- Mar 6, 2022
b73890e
Do not output k6 banner and test description when --quiet is enabled
na-- Mar 6, 2022
63c678d
Do not output cloud login token to stdout when --quiet is enabled
na-- Mar 6, 2022
a45511e
Refactor test loading by moving it to a new helper function
na-- Mar 6, 2022
de3ae30
Refactor abort signal handling into a separate helper
na-- Mar 6, 2022
516fc9b
Move K6_TYPE / --type / -t out of the globalFlags to RuntimeOptions
na-- Mar 6, 2022
07cc65a
Simplify createOutputs()
na-- Mar 6, 2022
20f689c
Refactor cmd.rootCommand to support easy integration tests
na-- Mar 7, 2022
dd5736a
Fix --log-output=file support for relative paths and add a test
na-- Mar 7, 2022
9923347
Rename and document better the cmd/inspect.go internals
na-- Mar 7, 2022
e4241ca
Remove js.Bundle.IsExecutable() because it's not needed anymore
na-- Mar 7, 2022
207ef83
Move some output init calls away from the Engine
na-- Mar 8, 2022
f82b364
Flatten some long functions in cmd/ and enable splitting them apart
na-- Mar 8, 2022
f83e784
Move Output management out of the Engine
na-- Mar 8, 2022
98911d0
Move the metrics package out of lib/
na-- Mar 8, 2022
adc4d9f
Refactor the Engine to actually use the metrics registry
na-- Mar 8, 2022
e5d8c32
Fix the bug of thresholds not working for unused metrics
na-- Mar 8, 2022
a3138b1
Fix submetric matching bug when nonexistent keys are specified
na-- Mar 8, 2022
559f4c5
Pass BuiltinMetrics via ExecState, emit vus and vus_max by ExecScheduler
na-- Mar 9, 2022
954e7b5
Add an integration test for custom metrics and thresholds
na-- Mar 9, 2022
2476cfc
Move the Engine data crunching logic in a new component under metrics/
na-- Mar 9, 2022
8850e35
Completely remove the Engine :tada:
na-- Mar 10, 2022
a17f256
Move core/local/ExecutionScheduler to execution/Scheduler
na-- Mar 10, 2022
38f9b99
Use openhistogram/circonusllhist for TrendSinks
na-- Mar 11, 2022
60237cc
Add support for native distributed execution and metrics
na-- Mar 10, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 12 additions & 17 deletions api/common/context.go → api/common/control_surface.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,18 @@ package common
import (
"context"

"go.k6.io/k6/core"
"github.com/sirupsen/logrus"
"go.k6.io/k6/execution"
"go.k6.io/k6/metrics/engine"
"go.k6.io/k6/stats"
)

type ContextKey int

const ctxKeyEngine = ContextKey(1)

// WithEngine sets the k6 running Engine in the under the hood context.
//
// Deprecated: Use directly the Engine as dependency.
func WithEngine(ctx context.Context, engine *core.Engine) context.Context {
return context.WithValue(ctx, ctxKeyEngine, engine)
}

// GetEngine returns the k6 running Engine fetching it from the context.
//
// Deprecated: Use directly the Engine as dependency.
func GetEngine(ctx context.Context) *core.Engine {
return ctx.Value(ctxKeyEngine).(*core.Engine)
// ControlSurface includes the methods the REST API can use to control and
// communicate with the rest of k6.
type ControlSurface struct {
RunCtx context.Context
Samples chan stats.SampleContainer
MetricsEngine *engine.MetricsEngine
ExecutionScheduler *execution.Scheduler
Logger logrus.FieldLogger
}
39 changes: 23 additions & 16 deletions api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,43 @@
package api

import (
"context"
"fmt"
"net/http"

"github.com/sirupsen/logrus"

"go.k6.io/k6/api/common"
v1 "go.k6.io/k6/api/v1"
"go.k6.io/k6/core"
"go.k6.io/k6/execution"
"go.k6.io/k6/metrics/engine"
"go.k6.io/k6/stats"
)

func newHandler(logger logrus.FieldLogger) http.Handler {
func newHandler(cs *common.ControlSurface) http.Handler {
mux := http.NewServeMux()
mux.Handle("/v1/", v1.NewHandler())
mux.Handle("/ping", handlePing(logger))
mux.Handle("/", handlePing(logger))
mux.Handle("/v1/", v1.NewHandler(cs))
mux.Handle("/ping", handlePing(cs.Logger))
mux.Handle("/", handlePing(cs.Logger))
return mux
}

// ListenAndServe is analogous to the stdlib one but also takes a core.Engine and logrus.FieldLogger
func ListenAndServe(addr string, engine *core.Engine, logger logrus.FieldLogger) error {
mux := newHandler(logger)
// NewAPIServer returns a new *unstarted* HTTP REST API server.
func NewAPIServer(
runCtx context.Context, addr string, samples chan stats.SampleContainer,
me *engine.MetricsEngine, es *execution.Scheduler, logger logrus.FieldLogger,
) *http.Server {
// TODO: reduce the control surface as much as possible... For example, if
// we refactor the Runner API, we won't need to send the Samples channel.
cs := &common.ControlSurface{
RunCtx: runCtx,
Samples: samples,
MetricsEngine: me,
ExecutionScheduler: es,
Logger: logger,
}

return http.ListenAndServe(addr, withEngine(engine, newLogger(logger, mux)))
return &http.Server{Addr: addr, Handler: newHandler(cs)}
}

type wrappedResponseWriter struct {
Expand All @@ -66,13 +80,6 @@ func newLogger(l logrus.FieldLogger, next http.Handler) http.HandlerFunc {
}
}

func withEngine(engine *core.Engine, next http.Handler) http.HandlerFunc {
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
r = r.WithContext(common.WithEngine(r.Context(), engine))
next.ServeHTTP(rw, r)
})
}

func handlePing(logger logrus.FieldLogger) http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
rw.Header().Add("Content-Type", "text/plain; charset=utf-8")
Expand Down
12 changes: 8 additions & 4 deletions api/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

package api

/*
TODO: fix tests

import (
"fmt"
"net/http"
Expand All @@ -35,9 +38,9 @@ import (
"go.k6.io/k6/core"
"go.k6.io/k6/core/local"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/metrics"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/lib/testutils/minirunner"
"go.k6.io/k6/metrics"
)

func testHTTPHandler(rw http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -80,11 +83,11 @@ func TestLogger(t *testing.T) {
func TestWithEngine(t *testing.T) {
logger := logrus.New()
logger.SetOutput(testutils.NewTestOutput(t))
execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{}, logger)
require.NoError(t, err)
registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, builtinMetrics)
execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{}, builtinMetrics, logger)
require.NoError(t, err)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, registry)
require.NoError(t, err)

rw := httptest.NewRecorder()
Expand All @@ -107,3 +110,4 @@ func TestPing(t *testing.T) {
assert.Equal(t, http.StatusOK, res.StatusCode)
assert.Equal(t, []byte{'o', 'k'}, rw.Body.Bytes())
}
*/
12 changes: 4 additions & 8 deletions api/v1/group_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@ import (
"go.k6.io/k6/api/common"
)

func handleGetGroups(rw http.ResponseWriter, r *http.Request) {
engine := common.GetEngine(r.Context())

root := NewGroup(engine.ExecutionScheduler.GetRunner().GetDefaultGroup(), nil)
func handleGetGroups(cs *common.ControlSurface, rw http.ResponseWriter, r *http.Request) {
root := NewGroup(cs.ExecutionScheduler.GetRunner().GetDefaultGroup(), nil)
groups := FlattenGroup(root)

data, err := json.Marshal(newGroupsJSONAPI(groups))
Expand All @@ -41,10 +39,8 @@ func handleGetGroups(rw http.ResponseWriter, r *http.Request) {
_, _ = rw.Write(data)
}

func handleGetGroup(rw http.ResponseWriter, r *http.Request, id string) {
engine := common.GetEngine(r.Context())

root := NewGroup(engine.ExecutionScheduler.GetRunner().GetDefaultGroup(), nil)
func handleGetGroup(cs *common.ControlSurface, rw http.ResponseWriter, r *http.Request, id string) {
root := NewGroup(cs.ExecutionScheduler.GetRunner().GetDefaultGroup(), nil)
groups := FlattenGroup(root)

var group *Group
Expand Down
12 changes: 8 additions & 4 deletions api/v1/group_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

package v1

/*
TODO: fix tests

import (
"encoding/json"
"net/http"
Expand All @@ -33,9 +36,9 @@ import (
"go.k6.io/k6/core"
"go.k6.io/k6/core/local"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/metrics"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/lib/testutils/minirunner"
"go.k6.io/k6/metrics"
)

func TestGetGroups(t *testing.T) {
Expand All @@ -49,11 +52,11 @@ func TestGetGroups(t *testing.T) {
logger := logrus.New()
logger.SetOutput(testutils.NewTestOutput(t))

execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{Group: g0}, logger)
require.NoError(t, err)
registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, builtinMetrics)
execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{Group: g0}, builtinMetrics, logger)
require.NoError(t, err)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, registry)
require.NoError(t, err)

t.Run("list", func(t *testing.T) {
Expand Down Expand Up @@ -112,3 +115,4 @@ func TestGetGroups(t *testing.T) {
})
}
}
*/
30 changes: 15 additions & 15 deletions api/v1/metric_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,15 @@ import (
"go.k6.io/k6/api/common"
)

func handleGetMetrics(rw http.ResponseWriter, r *http.Request) {
engine := common.GetEngine(r.Context())

func handleGetMetrics(cs *common.ControlSurface, rw http.ResponseWriter, r *http.Request) {
var t time.Duration
if engine.ExecutionScheduler != nil {
t = engine.ExecutionScheduler.GetState().GetCurrentTestRunDuration()
if cs.ExecutionScheduler != nil {
t = cs.ExecutionScheduler.GetState().GetCurrentTestRunDuration()
}

engine.MetricsLock.Lock()
metrics := newMetricsJSONAPI(engine.Metrics, t)
engine.MetricsLock.Unlock()
cs.MetricsEngine.MetricsLock.Lock()
metrics := newMetricsJSONAPI(cs.MetricsEngine.ObservedMetrics, t)
cs.MetricsEngine.MetricsLock.Unlock()

data, err := json.Marshal(metrics)
if err != nil {
Expand All @@ -48,21 +46,23 @@ func handleGetMetrics(rw http.ResponseWriter, r *http.Request) {
_, _ = rw.Write(data)
}

func handleGetMetric(rw http.ResponseWriter, r *http.Request, id string) {
engine := common.GetEngine(r.Context())

func handleGetMetric(cs *common.ControlSurface, rw http.ResponseWriter, r *http.Request, id string) {
var t time.Duration
if engine.ExecutionScheduler != nil {
t = engine.ExecutionScheduler.GetState().GetCurrentTestRunDuration()
if cs.ExecutionScheduler != nil {
t = cs.ExecutionScheduler.GetState().GetCurrentTestRunDuration()
}

metric, ok := engine.Metrics[id]
cs.MetricsEngine.MetricsLock.Lock()
metric, ok := cs.MetricsEngine.ObservedMetrics[id]
if !ok {
cs.MetricsEngine.MetricsLock.Unlock()
apiError(rw, "Not Found", "No metric with that ID was found", http.StatusNotFound)
return
}
wrappedMetric := newMetricEnvelope(metric, t)
cs.MetricsEngine.MetricsLock.Unlock()

data, err := json.Marshal(newMetricEnvelope(metric, t))
data, err := json.Marshal(wrappedMetric)
if err != nil {
apiError(rw, "Encoding error", err.Error(), http.StatusInternalServerError)
return
Expand Down
26 changes: 15 additions & 11 deletions api/v1/metric_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

package v1

/*
TODO: fix tests

import (
"encoding/json"
"net/http"
Expand All @@ -34,9 +37,9 @@ import (
"go.k6.io/k6/core"
"go.k6.io/k6/core/local"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/metrics"
"go.k6.io/k6/lib/testutils"
"go.k6.io/k6/lib/testutils/minirunner"
"go.k6.io/k6/metrics"
"go.k6.io/k6/stats"
)

Expand All @@ -45,17 +48,17 @@ func TestGetMetrics(t *testing.T) {

logger := logrus.New()
logger.SetOutput(testutils.NewTestOutput(t))
execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{}, logger)
require.NoError(t, err)
registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, builtinMetrics)
execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{}, builtinMetrics, logger)
require.NoError(t, err)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, registry)
require.NoError(t, err)

engine.Metrics = map[string]*stats.Metric{
engine.MetricsEngine.ObservedMetrics = map[string]*stats.Metric{
"my_metric": stats.New("my_metric", stats.Trend, stats.Time),
}
engine.Metrics["my_metric"].Tainted = null.BoolFrom(true)
engine.MetricsEngine.ObservedMetrics["my_metric"].Tainted = null.BoolFrom(true)

rw := httptest.NewRecorder()
NewHandler().ServeHTTP(rw, newRequestWithEngine(engine, "GET", "/v1/metrics", nil))
Expand Down Expand Up @@ -101,17 +104,17 @@ func TestGetMetric(t *testing.T) {

logger := logrus.New()
logger.SetOutput(testutils.NewTestOutput(t))
execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{}, logger)
require.NoError(t, err)
registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, builtinMetrics)
execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{}, builtinMetrics, logger)
require.NoError(t, err)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, registry)
require.NoError(t, err)

engine.Metrics = map[string]*stats.Metric{
engine.MetricsEngine.ObservedMetrics = map[string]*stats.Metric{
"my_metric": stats.New("my_metric", stats.Trend, stats.Time),
}
engine.Metrics["my_metric"].Tainted = null.BoolFrom(true)
engine.MetricsEngine.ObservedMetrics["my_metric"].Tainted = null.BoolFrom(true)

t.Run("nonexistent", func(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -156,3 +159,4 @@ func TestGetMetric(t *testing.T) {
})
})
}
*/
Loading