Skip to content

Commit

Permalink
WIP metrics registry
Browse files Browse the repository at this point in the history
This commit specifically doesn't fix most tests as this takes a lot of
time and also doesn't do a lot of other stuff:
1. breaking metrics in other ways as that might be useful to be
   kept for a little bit at least
2. also doesn't try to do anything that isn't absolutely necessary to
   make the registry work
3. doesnt move the metrics stuff around, although I would expect the
   Registry and the Metric type will end up in a `metrics` package later
   on ...
  • Loading branch information
mstoykov committed Jun 23, 2021
1 parent 2039c56 commit 608677a
Show file tree
Hide file tree
Showing 21 changed files with 442 additions and 192 deletions.
71 changes: 65 additions & 6 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ import (
"go.k6.io/k6/js"
"go.k6.io/k6/lib"
"go.k6.io/k6/lib/consts"
"go.k6.io/k6/lib/metrics"
"go.k6.io/k6/loader"
"go.k6.io/k6/output"
"go.k6.io/k6/stats"
"go.k6.io/k6/ui/pb"
)

Expand Down Expand Up @@ -153,10 +156,6 @@ a commandline interface for interacting with it.`,
// can start winding down its metrics processing.
globalCtx, globalCancel := context.WithCancel(ctx)
defer globalCancel()
lingerCtx, lingerCancel := context.WithCancel(globalCtx)
defer lingerCancel()
runCtx, runCancel := context.WithCancel(lingerCtx)
defer runCancel()

// Create a local execution scheduler wrapping the runner.
logger.Debug("Initializing the execution scheduler...")
Expand Down Expand Up @@ -197,6 +196,16 @@ a commandline interface for interacting with it.`,
return err
}

registry := stats.NewRegistry(engine.Samples)
builtInMetrics := metrics.RegisterBuiltinMetrics(registry)
globalCtx = stats.WithRegistry(globalCtx, registry)
globalCtx = metrics.WithBuiltinMetrics(globalCtx, builtInMetrics)

lingerCtx, lingerCancel := context.WithCancel(globalCtx)
defer lingerCancel()
runCtx, runCancel := context.WithCancel(lingerCtx)
defer runCancel()

// Spin up the REST API server, if not disabled.
if address != "" {
initBar.Modify(pb.WithConstProgress(0, "Init API server"))
Expand All @@ -216,11 +225,11 @@ a commandline interface for interacting with it.`,

// We do this here so we can get any output URLs below.
initBar.Modify(pb.WithConstProgress(0, "Starting outputs"))
err = engine.StartOutputs()
err = StartOutputs(logger, outputs, engine, builtInMetrics)
if err != nil {
return err
}
defer engine.StopOutputs()
defer StopOutputs(logger, outputs)

printExecutionDescription(
"local", filename, "", conf, execScheduler.GetState().ExecutionTuple,
Expand Down Expand Up @@ -442,3 +451,53 @@ func handleSummaryResult(fs afero.Fs, stdOut, stdErr io.Writer, result map[strin

return consolidateErrorMessage(errs, "Could not save some summary information:")
}

// StartOutputs spins up all configured outputs, giving the thresholds to any
// that can accept them. And if some output fails, stop the already started
// ones. This may take some time, since some outputs make initial network
// requests to set up whatever remote services are going to listen to them.
func StartOutputs(
logger logrus.FieldLogger,
outputs []output.Output,
engine *core.Engine,
builtinMetrics *metrics.BuiltInMetrics,
) error {
logger.Debugf("Starting %d outputs...", len(outputs))
for i, out := range outputs {
if thresholdOut, ok := out.(output.WithThresholds); ok {
thresholdOut.SetThresholds(engine.Options.Thresholds)
}

if stopOut, ok := out.(output.WithTestRunStop); ok {
stopOut.SetTestRunStopCallback(
func(err error) {
logger.WithError(err).Error("Received error to stop from output")
engine.Stop()
})
}

if builtinMetricOut, ok := out.(output.WithBuiltinMetrics); ok {
builtinMetricOut.SetBuiltinMetrics(builtinMetrics)
}

if err := out.Start(); err != nil {
stopOutputs(logger, outputs, i)
return err
}
}
return nil
}

// StopOutputs stops all configured outputs.
func StopOutputs(logger logrus.FieldLogger, outputs []output.Output) {
stopOutputs(logger, outputs, len(outputs))
}

func stopOutputs(logger logrus.FieldLogger, outputs []output.Output, upToID int) {
logger.Debugf("Stopping %d outputs...", upToID)
for i := 0; i < upToID; i++ {
if err := outputs[i].Stop(); err != nil {
logger.WithError(err).Errorf("Stopping output %d failed", i)
}
}
}
12 changes: 9 additions & 3 deletions core/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ type Engine struct {
Metrics map[string]*stats.Metric
MetricsLock sync.Mutex

Samples chan stats.SampleContainer
builtInMetrics *metrics.BuiltInMetrics
Samples chan stats.SampleContainer

// Assigned to metrics upon first received sample.
thresholds map[string]stats.Thresholds
Expand Down Expand Up @@ -147,6 +148,10 @@ func (e *Engine) StartOutputs() error {
})
}

if builtinMetricOut, ok := out.(output.WithBuiltinMetrics); ok {
builtinMetricOut.SetBuiltinMetrics(e.builtInMetrics)
}

if err := out.Start(); err != nil {
e.stopOutputs(i)
return err
Expand Down Expand Up @@ -184,6 +189,7 @@ func (e *Engine) stopOutputs(upToID int) {
// returned by cancelling the globalCtx
// - The second returned lambda can be used to wait for that process to finish.
func (e *Engine) Init(globalCtx, runCtx context.Context) (run func() error, wait func(), err error) {
e.builtInMetrics = metrics.GetBuiltInMetrics(globalCtx)
e.logger.Debug("Initialization starting...")
// TODO: if we ever need metrics processing in the init context, we can move
// this below the other components... or even start them concurrently?
Expand Down Expand Up @@ -416,12 +422,12 @@ func (e *Engine) emitMetrics() {
Samples: []stats.Sample{
{
Time: t,
Metric: metrics.VUs,
Metric: e.builtInMetrics.VUs,
Value: float64(executionState.GetCurrentlyActiveVUsCount()),
Tags: e.Options.RunTags,
}, {
Time: t,
Metric: metrics.VUsMax,
Metric: e.builtInMetrics.VUsMax,
Value: float64(executionState.GetInitializedVUsCount()),
Tags: e.Options.RunTags,
},
Expand Down
2 changes: 1 addition & 1 deletion js/modules/k6/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ func (c *Client) HandleRPC(ctx context.Context, stat grpcstats.RPCStats) {
stats.PushIfNotDone(ctx, state.Samples, stats.ConnectedSamples{
Samples: []stats.Sample{
{
Metric: metrics.GRPCReqDuration,
Metric: metrics.GetBuiltInMetrics(ctx).GRPCReqDuration,
Tags: sampleTags,
Value: stats.D(s.EndTime.Sub(s.BeginTime)),
Time: s.EndTime,
Expand Down
8 changes: 5 additions & 3 deletions js/modules/k6/k6.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,10 @@ func (*K6) Group(ctx context.Context, name string, fn goja.Callable) (goja.Value
t := time.Now()

tags := state.CloneTags()

stats.PushIfNotDone(ctx, state.Samples, stats.Sample{
Time: t,
Metric: metrics.GroupDuration,
Metric: metrics.GetBuiltInMetrics(ctx).GroupDuration,
Tags: stats.IntoSampleTags(&tags),
Value: stats.D(t.Sub(startTime)),
})
Expand Down Expand Up @@ -165,12 +166,13 @@ func (*K6) Check(ctx context.Context, arg0, checks goja.Value, extras ...goja.Va
select {
case <-ctx.Done():
default:
builtin := metrics.GetBuiltInMetrics(ctx)
if val.ToBoolean() {
atomic.AddInt64(&check.Passes, 1)
stats.PushIfNotDone(ctx, state.Samples, stats.Sample{Time: t, Metric: metrics.Checks, Tags: sampleTags, Value: 1})
stats.PushIfNotDone(ctx, state.Samples, stats.Sample{Time: t, Metric: builtin.Checks, Tags: sampleTags, Value: 1})
} else {
atomic.AddInt64(&check.Fails, 1)
stats.PushIfNotDone(ctx, state.Samples, stats.Sample{Time: t, Metric: metrics.Checks, Tags: sampleTags, Value: 0})
stats.PushIfNotDone(ctx, state.Samples, stats.Sample{Time: t, Metric: builtin.Checks, Tags: sampleTags, Value: 0})
// A single failure makes the return value false.
succ = false
}
Expand Down
Loading

0 comments on commit 608677a

Please sign in to comment.