Skip to content

Commit

Permalink
Refactor the Engine to actually use the metrics registry
Browse files Browse the repository at this point in the history
This is a prerequisite for solving other issues like always evaluating thresholds correctly, and as a side-benefit, it also allows us to validate them in the init context, before the test has started.
  • Loading branch information
na-- committed Mar 8, 2022
1 parent 98911d0 commit adc4d9f
Show file tree
Hide file tree
Showing 13 changed files with 238 additions and 120 deletions.
2 changes: 1 addition & 1 deletion api/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestWithEngine(t *testing.T) {
require.NoError(t, err)
registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, builtinMetrics)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, registry, builtinMetrics)
require.NoError(t, err)

rw := httptest.NewRecorder()
Expand Down
2 changes: 1 addition & 1 deletion api/v1/group_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestGetGroups(t *testing.T) {
require.NoError(t, err)
registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, builtinMetrics)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, registry, builtinMetrics)
require.NoError(t, err)

t.Run("list", func(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions api/v1/metric_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestGetMetrics(t *testing.T) {
require.NoError(t, err)
registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, builtinMetrics)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, registry, builtinMetrics)
require.NoError(t, err)

engine.Metrics = map[string]*stats.Metric{
Expand Down Expand Up @@ -105,7 +105,7 @@ func TestGetMetric(t *testing.T) {
require.NoError(t, err)
registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, builtinMetrics)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, registry, builtinMetrics)
require.NoError(t, err)

engine.Metrics = map[string]*stats.Metric{
Expand Down
2 changes: 1 addition & 1 deletion api/v1/setup_teardown_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func TestSetupData(t *testing.T) {
})
execScheduler, err := local.NewExecutionScheduler(runner, logger)
require.NoError(t, err)
engine, err := core.NewEngine(execScheduler, runner.GetOptions(), lib.RuntimeOptions{}, nil, logger, builtinMetrics)
engine, err := core.NewEngine(execScheduler, runner.GetOptions(), lib.RuntimeOptions{}, nil, logger, registry, builtinMetrics)
require.NoError(t, err)

globalCtx, globalCancel := context.WithCancel(context.Background())
Expand Down
4 changes: 2 additions & 2 deletions api/v1/status_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestGetStatus(t *testing.T) {
require.NoError(t, err)
registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, builtinMetrics)
engine, err := core.NewEngine(execScheduler, lib.Options{}, lib.RuntimeOptions{}, nil, logger, registry, builtinMetrics)
require.NoError(t, err)

rw := httptest.NewRecorder()
Expand Down Expand Up @@ -142,7 +142,7 @@ func TestPatchStatus(t *testing.T) {

execScheduler, err := local.NewExecutionScheduler(&minirunner.MiniRunner{Options: options}, logger)
require.NoError(t, err)
engine, err := core.NewEngine(execScheduler, options, lib.RuntimeOptions{}, nil, logger, builtinMetrics)
engine, err := core.NewEngine(execScheduler, options, lib.RuntimeOptions{}, nil, logger, registry, builtinMetrics)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
Expand Down
3 changes: 2 additions & 1 deletion cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) error {
initBar.Modify(pb.WithConstProgress(0, "Init engine"))
engine, err := core.NewEngine(
execScheduler, conf.Options, test.runtimeOptions,
outputs, logger, test.builtInMetrics,
outputs, logger, test.metricsRegistry, test.builtInMetrics,
)
if err != nil {
return err
Expand All @@ -136,6 +136,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) error {
initBar.Modify(pb.WithConstProgress(0, "Init API server"))
go func() {
logger.Debugf("Starting the REST API server on %s", c.gs.flags.address)
// TODO: send the ExecutionState and MetricsEngine instead of the Engine
if aerr := api.ListenAndServe(c.gs.flags.address, engine, logger); aerr != nil {
// Only exit k6 if the user has explicitly set the REST API address
if cmd.Flags().Lookup("address").Changed {
Expand Down
121 changes: 79 additions & 42 deletions core/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package core
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -57,23 +58,23 @@ type Engine struct {
ExecutionScheduler lib.ExecutionScheduler
executionState *lib.ExecutionState

Options lib.Options
options lib.Options
runtimeOptions lib.RuntimeOptions
outputs []output.Output

logger *logrus.Entry
stopOnce sync.Once
stopChan chan struct{}

Metrics map[string]*stats.Metric
Metrics map[string]*stats.Metric // TODO: refactor, this doesn't need to be a map
MetricsLock sync.Mutex

registry *metrics.Registry
builtinMetrics *metrics.BuiltinMetrics
Samples chan stats.SampleContainer

// Assigned to metrics upon first received sample.
thresholds map[string]stats.Thresholds
submetrics map[string][]*stats.Submetric
// These can be both top-level metrics or sub-metrics
metricsWithThresholds []*stats.Metric

// Are thresholds tainted?
thresholdsTainted bool
Expand All @@ -82,7 +83,7 @@ type Engine struct {
// NewEngine instantiates a new Engine, without doing any heavy initialization.
func NewEngine(
ex lib.ExecutionScheduler, opts lib.Options, rtOpts lib.RuntimeOptions, outputs []output.Output, logger *logrus.Logger,
builtinMetrics *metrics.BuiltinMetrics,
registry *metrics.Registry, builtinMetrics *metrics.BuiltinMetrics,
) (*Engine, error) {
if ex == nil {
return nil, errors.New("missing ExecutionScheduler instance")
Expand All @@ -92,42 +93,78 @@ func NewEngine(
ExecutionScheduler: ex,
executionState: ex.GetState(),

Options: opts,
options: opts,
runtimeOptions: rtOpts,
outputs: outputs,
Metrics: make(map[string]*stats.Metric),
Samples: make(chan stats.SampleContainer, opts.MetricSamplesBufferSize.Int64),
stopChan: make(chan struct{}),
logger: logger.WithField("component", "engine"),
registry: registry,
builtinMetrics: builtinMetrics,
}

e.thresholds = opts.Thresholds
e.submetrics = make(map[string][]*stats.Submetric)
for name := range e.thresholds {
if !strings.Contains(name, "{") {
continue
if !(e.runtimeOptions.NoSummary.Bool && e.runtimeOptions.NoThresholds.Bool) {
err := e.initSubMetricsAndThresholds()
if err != nil {
return nil, err
}
}

return e, nil
}

parent, sm := stats.NewSubmetric(name)
e.submetrics[parent] = append(e.submetrics[parent], sm)
func (e *Engine) getOrInitPotentialSubmetric(name string) (*stats.Metric, error) {
// TODO: replace with strings.Cut after Go 1.18
nameParts := strings.SplitN(name, "{", 2)

metric := e.registry.Get(nameParts[0])
if metric == nil {
return nil, fmt.Errorf("metric '%s' does not exist in the script", nameParts[0])
}
if len(nameParts) == 1 { // no sub-metric
return metric, nil
}

if nameParts[1][len(nameParts[1])-1] != '}' {
return nil, fmt.Errorf("missing ending bracket, sub-metric format needs to be 'metric{key:value}'")
}
sm, err := metric.AddSubmetric(nameParts[1][:len(nameParts[1])-1])
if err != nil {
return nil, err
}
return sm.Metric, nil
}

// TODO: refactor this out of here when https://github.com/k6io/k6/issues/1832 lands and
// there is a better way to enable a metric with tag
if opts.SystemTags.Has(stats.TagExpectedResponse) {
for _, name := range []string{
"http_req_duration{expected_response:true}",
} {
if _, ok := e.thresholds[name]; ok {
continue
func (e *Engine) initSubMetricsAndThresholds() error {
for metricName, thresholds := range e.options.Thresholds {
metric, err := e.getOrInitPotentialSubmetric(metricName)

if e.runtimeOptions.NoThresholds.Bool {
if err != nil {
e.logger.WithError(err).Warnf("Invalid metric '%s' in threshold definitions", metricName)
}
parent, sm := stats.NewSubmetric(name)
e.submetrics[parent] = append(e.submetrics[parent], sm)
continue
}

if err != nil {
return fmt.Errorf("invalid metric '%s' in threshold definitions: %w", metricName, err)
}

metric.Thresholds = thresholds
e.metricsWithThresholds = append(e.metricsWithThresholds, metric)
}

return e, nil
// TODO: refactor out of here when https://github.com/grafana/k6/issues/1321
// lands and there is a better way to enable a metric with tag
if e.options.SystemTags.Has(stats.TagExpectedResponse) {
_, err := e.getOrInitPotentialSubmetric("http_req_duration{expected_response:true}")
if err != nil {
return err // shouldn't happen, but ¯\_(ツ)_/¯
}
}

return nil
}

// Init is used to initialize the execution scheduler and all metrics processing
Expand Down Expand Up @@ -382,15 +419,15 @@ func (e *Engine) emitMetrics() {
Time: t,
Metric: e.builtinMetrics.VUs,
Value: float64(executionState.GetCurrentlyActiveVUsCount()),
Tags: e.Options.RunTags,
Tags: e.options.RunTags,
}, {
Time: t,
Metric: e.builtinMetrics.VUsMax,
Value: float64(executionState.GetInitializedVUsCount()),
Tags: e.Options.RunTags,
Tags: e.options.RunTags,
},
},
Tags: e.Options.RunTags,
Tags: e.options.RunTags,
Time: t,
}})
}
Expand Down Expand Up @@ -427,7 +464,7 @@ func (e *Engine) processThresholds() (shouldAbort bool) {
return shouldAbort
}

func (e *Engine) processSamplesForMetrics(sampleContainers []stats.SampleContainer) {
func (e *Engine) processMetricsInSamples(sampleContainers []stats.SampleContainer) {
for _, sampleContainer := range sampleContainers {
samples := sampleContainer.GetSamples()

Expand All @@ -436,25 +473,25 @@ func (e *Engine) processSamplesForMetrics(sampleContainers []stats.SampleContain
}

for _, sample := range samples {
m, ok := e.Metrics[sample.Metric.Name]
if !ok {
m = stats.New(sample.Metric.Name, sample.Metric.Type, sample.Metric.Contains)
m.Thresholds = e.thresholds[m.Name]
m.Submetrics = e.submetrics[m.Name]
m := sample.Metric // this should have come from the Registry, no need to look it up
if !m.Observed {
// But we need to add it here, so we can show data in the
// end-of-test summary for this metric
e.Metrics[m.Name] = m
m.Observed = true
}
m.Sink.Add(sample)
m.Sink.Add(sample) // add its value to its own sink

// and also add it to any submetrics that match
for _, sm := range m.Submetrics {
if !sample.Tags.Contains(sm.Tags) {
continue
}

if sm.Metric == nil {
sm.Metric = stats.New(sm.Name, sample.Metric.Type, sample.Metric.Contains)
sm.Metric.Sub = *sm
sm.Metric.Thresholds = e.thresholds[sm.Name]
e.Metrics[sm.Name] = sm.Metric
if !sm.Metric.Observed {
// But we need to add it here, so we can show data in the
// end-of-test summary for this metric
e.Metrics[sm.Metric.Name] = sm.Metric
sm.Metric.Observed = true
}
sm.Metric.Sink.Add(sample)
}
Expand All @@ -473,7 +510,7 @@ func (e *Engine) processSamples(sampleContainers []stats.SampleContainer) {

// TODO: run this and the below code in goroutines?
if !(e.runtimeOptions.NoSummary.Bool && e.runtimeOptions.NoThresholds.Bool) {
e.processSamplesForMetrics(sampleContainers)
e.processMetricsInSamples(sampleContainers)
}

for _, out := range e.outputs {
Expand Down
Loading

0 comments on commit adc4d9f

Please sign in to comment.