Skip to content

Commit

Permalink
Remove the ExecutionState field for metrics engine
Browse files Browse the repository at this point in the history
  • Loading branch information
codebien committed Feb 1, 2023
1 parent 943b1e7 commit 9333cff
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 25 deletions.
2 changes: 1 addition & 1 deletion api/v1/group_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func getControlSurface(tb testing.TB, testState *lib.TestRunState) *ControlSurfa
execScheduler, err := execution.NewScheduler(testState)
require.NoError(tb, err)

me, err := engine.NewMetricsEngine(execScheduler.GetState())
me, err := engine.NewMetricsEngine(testState)
require.NoError(tb, err)

ctx, cancel := context.WithCancel(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion api/v1/setup_teardown_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func TestSetupData(t *testing.T) {

execScheduler, err := execution.NewScheduler(testState)
require.NoError(t, err)
metricsEngine, err := engine.NewMetricsEngine(execScheduler.GetState())
metricsEngine, err := engine.NewMetricsEngine(testState)
require.NoError(t, err)

globalCtx, globalCancel := context.WithCancel(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion api/v1/status_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func TestPatchStatus(t *testing.T) {
execScheduler, err := execution.NewScheduler(testState)
require.NoError(t, err)

metricsEngine, err := engine.NewMetricsEngine(execScheduler.GetState())
metricsEngine, err := engine.NewMetricsEngine(testState)
require.NoError(t, err)

globalCtx, globalCancel := context.WithCancel(context.Background())
Expand Down
4 changes: 2 additions & 2 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
}

executionState := execScheduler.GetState()
metricsEngine, err := engine.NewMetricsEngine(executionState)
metricsEngine, err := engine.NewMetricsEngine(executionState.Test)
if err != nil {
return err
}
Expand Down Expand Up @@ -178,7 +178,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
}()

if !testRunState.RuntimeOptions.NoThresholds.Bool { //nolint:nestif
finalizeThresholds := metricsEngine.StartThresholdCalculations(runAbort)
finalizeThresholds := metricsEngine.StartThresholdCalculations(runAbort, executionState.GetCurrentTestRunDuration)
defer func() {
if finalizeThresholds == nil {
return
Expand Down
4 changes: 2 additions & 2 deletions js/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func TestDataIsolation(t *testing.T) {
execScheduler, err := execution.NewScheduler(testRunState)
require.NoError(t, err)

metricsEngine, err := engine.NewMetricsEngine(execScheduler.GetState())
metricsEngine, err := engine.NewMetricsEngine(testRunState)
require.NoError(t, err)

globalCtx, globalCancel := context.WithCancel(context.Background())
Expand All @@ -401,7 +401,7 @@ func TestDataIsolation(t *testing.T) {
require.NoError(t, err)
defer stopOutputs(nil)

finalizeThresholds := metricsEngine.StartThresholdCalculations(runAbort)
finalizeThresholds := metricsEngine.StartThresholdCalculations(runAbort, execScheduler.GetState().GetCurrentTestRunDuration)
require.Nil(t, finalizeThresholds)

require.Empty(t, runner.defaultGroup.Groups)
Expand Down
40 changes: 22 additions & 18 deletions metrics/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ const thresholdsRate = 2 * time.Second
// aggregated metric sample values. They are used to generate the end-of-test
// summary and to evaluate the test thresholds.
type MetricsEngine struct {
es *lib.ExecutionState
logger logrus.FieldLogger

logger logrus.FieldLogger
test *lib.TestRunState
outputIngester *outputIngester

// These can be both top-level metrics or sub-metrics
Expand All @@ -45,15 +44,14 @@ type MetricsEngine struct {
}

// NewMetricsEngine creates a new metrics Engine with the given parameters.
func NewMetricsEngine(es *lib.ExecutionState) (*MetricsEngine, error) {
func NewMetricsEngine(runState *lib.TestRunState) (*MetricsEngine, error) {
me := &MetricsEngine{
es: es,
logger: es.Test.Logger.WithField("component", "metrics-engine"),

test: runState,
logger: runState.Logger.WithField("component", "metrics-engine"),
ObservedMetrics: make(map[string]*metrics.Metric),
}

if !(me.es.Test.RuntimeOptions.NoSummary.Bool && me.es.Test.RuntimeOptions.NoThresholds.Bool) {
if !(me.test.RuntimeOptions.NoSummary.Bool && me.test.RuntimeOptions.NoThresholds.Bool) {
err := me.initSubMetricsAndThresholds()
if err != nil {
return nil, err
Expand All @@ -77,10 +75,11 @@ func (me *MetricsEngine) getThresholdMetricOrSubmetric(name string) (*metrics.Me
// TODO: replace with strings.Cut after Go 1.18
nameParts := strings.SplitN(name, "{", 2)

metric := me.es.Test.Registry.Get(nameParts[0])
metric := me.test.Registry.Get(nameParts[0])
if metric == nil {
return nil, fmt.Errorf("metric '%s' does not exist in the script", nameParts[0])
}

if len(nameParts) == 1 { // no sub-metric
return metric, nil
}
Expand Down Expand Up @@ -126,10 +125,10 @@ func (me *MetricsEngine) markObserved(metric *metrics.Metric) {
}

func (me *MetricsEngine) initSubMetricsAndThresholds() error {
for metricName, thresholds := range me.es.Test.Options.Thresholds {
for metricName, thresholds := range me.test.Options.Thresholds {
metric, err := me.getThresholdMetricOrSubmetric(metricName)

if me.es.Test.RuntimeOptions.NoThresholds.Bool {
if me.test.RuntimeOptions.NoThresholds.Bool {
if err != nil {
me.logger.WithError(err).Warnf("Invalid metric '%s' in threshold definitions", metricName)
}
Expand All @@ -154,7 +153,7 @@ func (me *MetricsEngine) initSubMetricsAndThresholds() error {

// TODO: refactor out of here when https://github.com/grafana/k6/issues/1321
// lands and there is a better way to enable a metric with tag
if me.es.Test.Options.SystemTags.Has(metrics.TagExpectedResponse) {
if me.test.Options.SystemTags.Has(metrics.TagExpectedResponse) {
_, err := me.getThresholdMetricOrSubmetric("http_req_duration{expected_response:true}")
if err != nil {
return err // shouldn't happen, but ¯\_(ツ)_/¯
Expand All @@ -166,8 +165,10 @@ func (me *MetricsEngine) initSubMetricsAndThresholds() error {

// StartThresholdCalculations spins up a new goroutine to crunch thresholds and
// returns a callback that will stop the goroutine and finalizes calculations.
func (me *MetricsEngine) StartThresholdCalculations(abortRun func(error)) (
finalize func() (breached []string),
func (me *MetricsEngine) StartThresholdCalculations(
abortRun func(error),
getCurrentTestRunDuration func() time.Duration,
) (finalize func() (breached []string),
) {
if len(me.metricsWithThresholds) == 0 {
return nil // no thresholds were defined
Expand All @@ -184,7 +185,7 @@ func (me *MetricsEngine) StartThresholdCalculations(abortRun func(error)) (
for {
select {
case <-ticker.C:
breached, shouldAbort := me.evaluateThresholds(true)
breached, shouldAbort := me.evaluateThresholds(true, getCurrentTestRunDuration)
if shouldAbort {
err := fmt.Errorf(
"thresholds on metrics '%s' were breached; at least one has abortOnFail enabled, stopping test prematurely",
Expand Down Expand Up @@ -213,19 +214,22 @@ func (me *MetricsEngine) StartThresholdCalculations(abortRun func(error)) (
close(stop)
<-done

breached, _ := me.evaluateThresholds(false)
breached, _ := me.evaluateThresholds(false, getCurrentTestRunDuration)
return breached
}
}

// evaluateThresholds processes all of the thresholds.
//
// TODO: refactor, optimize
func (me *MetricsEngine) evaluateThresholds(ignoreEmptySinks bool) (breachedThresholds []string, shouldAbort bool) {
func (me *MetricsEngine) evaluateThresholds(
ignoreEmptySinks bool,
getCurrentTestRunDuration func() time.Duration,
) (breachedThresholds []string, shouldAbort bool) {
me.MetricsLock.Lock()
defer me.MetricsLock.Unlock()

t := me.es.GetCurrentTestRunDuration()
t := getCurrentTestRunDuration()

me.logger.Debugf("Running thresholds on %d metrics...", len(me.metricsWithThresholds))
for _, m := range me.metricsWithThresholds {
Expand Down

0 comments on commit 9333cff

Please sign in to comment.