Skip to content

Commit

Permalink
Merge pull request #678 from loadimpact/real-time-metrics
Browse files Browse the repository at this point in the history
Real time metrics
  • Loading branch information
na-- authored Jul 2, 2018
2 parents cd557cf + a9a134a commit 38bed9a
Show file tree
Hide file tree
Showing 26 changed files with 737 additions and 452 deletions.
8 changes: 5 additions & 3 deletions api/v1/setup_teardown_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,10 @@ func HandleSetSetupData(rw http.ResponseWriter, r *http.Request, p httprouter.Pa

// HandleRunSetup executes the runner's Setup() method and returns the result
func HandleRunSetup(rw http.ResponseWriter, r *http.Request, p httprouter.Params) {
runner := common.GetEngine(r.Context()).Executor.GetRunner()
engine := common.GetEngine(r.Context())
runner := engine.Executor.GetRunner()

if err := runner.Setup(r.Context()); err != nil {
if err := runner.Setup(r.Context(), engine.Samples); err != nil {
apiError(rw, "Error executing setup", err.Error(), http.StatusInternalServerError)
return
}
Expand All @@ -96,9 +97,10 @@ func HandleRunSetup(rw http.ResponseWriter, r *http.Request, p httprouter.Params

// HandleRunTeardown executes the runner's Teardown() method
func HandleRunTeardown(rw http.ResponseWriter, r *http.Request, p httprouter.Params) {
engine := common.GetEngine(r.Context())
runner := common.GetEngine(r.Context()).Executor.GetRunner()

if err := runner.Teardown(r.Context()); err != nil {
if err := runner.Teardown(r.Context(), engine.Samples); err != nil {
apiError(rw, "Error executing teardown", err.Error(), http.StatusInternalServerError)
}
}
8 changes: 4 additions & 4 deletions api/v1/setup_teardown_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ func TestSetupData(t *testing.T) {
engine, err := core.NewEngine(executor, runner.GetOptions())
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
errC := make(chan error)
go func() { errC <- engine.Run(ctx) }()

handler := NewHandler()

checkSetup := func(method, body, expResult string) {
Expand All @@ -102,10 +106,6 @@ func TestSetupData(t *testing.T) {
checkSetup("PUT", `{"v":2, "test":"mest"}`, `{"data": {"v":2, "test":"mest"}}`)
checkSetup("GET", "", `{"data": {"v":2, "test":"mest"}}`)

ctx, cancel := context.WithCancel(context.Background())
errC := make(chan error)
go func() { errC <- engine.Run(ctx) }()

engine.Executor.SetPaused(false)

select {
Expand Down
2 changes: 2 additions & 0 deletions cmd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ func getOptions(flags *pflag.FlagSet) (lib.Options, error) {
// TODO: find a saner and more dev-friendly and error-proof way to handle options
SetupTimeout: types.NullDuration{Duration: types.Duration(10 * time.Second), Valid: false},
TeardownTimeout: types.NullDuration{Duration: types.Duration(10 * time.Second), Valid: false},

MetricSamplesBufferSize: null.NewInt(1000, false),
}

stageStrings, err := flags.GetStringSlice("stage")
Expand Down
35 changes: 24 additions & 11 deletions core/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
const (
TickRate = 1 * time.Millisecond
MetricsRate = 1 * time.Second
CollectRate = 10 * time.Millisecond
CollectRate = 50 * time.Millisecond
ThresholdsRate = 2 * time.Second
ShutdownTimeout = 10 * time.Second

Expand All @@ -59,6 +59,8 @@ type Engine struct {
Metrics map[string]*stats.Metric
MetricsLock sync.Mutex

Samples chan stats.SampleContainer

// Assigned to metrics upon first received sample.
thresholds map[string]stats.Thresholds
submetrics map[string][]*stats.Submetric
Expand All @@ -76,6 +78,7 @@ func NewEngine(ex lib.Executor, o lib.Options) (*Engine, error) {
Executor: ex,
Options: o,
Metrics: make(map[string]*stats.Metric),
Samples: make(chan stats.SampleContainer, o.MetricSamplesBufferSize.Int64),
}
e.SetLogger(log.StandardLogger())

Expand Down Expand Up @@ -173,15 +176,15 @@ func (e *Engine) Run(ctx context.Context) error {
}

// Run the executor.
out := make(chan []stats.SampleContainer)
errC := make(chan error)
subwg.Add(1)
go func() {
errC <- e.Executor.Run(subctx, out)
errC <- e.Executor.Run(subctx, e.Samples)
e.logger.Debug("Engine: Executor terminated")
subwg.Done()
}()

sampleContainers := []stats.SampleContainer{}
defer func() {
// Shut down subsystems.
subcancel()
Expand All @@ -194,10 +197,14 @@ func (e *Engine) Run(ctx context.Context) error {
errC = nil
}
subwg.Wait()
close(out)
close(e.Samples)
}()
for sampleContainers := range out {
e.processSamples(sampleContainers...)

for sc := range e.Samples {
sampleContainers = append(sampleContainers, sc)
}
if len(sampleContainers) > 0 {
e.processSamples(sampleContainers)
}

// Emit final metrics.
Expand All @@ -213,10 +220,16 @@ func (e *Engine) Run(ctx context.Context) error {
collectorwg.Wait()
}()

ticker := time.NewTicker(CollectRate)
for {
select {
case sampleContainers := <-out:
e.processSamples(sampleContainers...)
case <-ticker.C:
if len(sampleContainers) > 0 {
e.processSamples(sampleContainers)
sampleContainers = []stats.SampleContainer{}
}
case sc := <-e.Samples:
sampleContainers = append(sampleContainers, sc)
case err := <-errC:
errC = nil
if err != nil {
Expand Down Expand Up @@ -262,7 +275,7 @@ func (e *Engine) runMetricsEmission(ctx context.Context) {
func (e *Engine) emitMetrics() {
t := time.Now()

e.processSamples(stats.ConnectedSamples{
e.processSamples([]stats.SampleContainer{stats.ConnectedSamples{
Samples: []stats.Sample{
{
Time: t,
Expand All @@ -278,7 +291,7 @@ func (e *Engine) emitMetrics() {
},
Tags: e.Options.RunTags,
Time: t,
})
}})
}

func (e *Engine) runThresholds(ctx context.Context, abort func()) {
Expand Down Expand Up @@ -330,7 +343,7 @@ func (e *Engine) processThresholds(abort func()) {
}
}

func (e *Engine) processSamples(sampleCointainers ...stats.SampleContainer) {
func (e *Engine) processSamples(sampleCointainers []stats.SampleContainer) {
if len(sampleCointainers) == 0 {
return
}
Expand Down
101 changes: 84 additions & 17 deletions core/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ func applyNullLogger(e *Engine) *logtest.Hook {

// Wrapper around newEngine that applies a null logger.
func newTestEngine(ex lib.Executor, opts lib.Options) (*Engine, error, *logtest.Hook) {
if !opts.MetricSamplesBufferSize.Valid {
opts.MetricSamplesBufferSize = null.IntFrom(200)
}
e, err := NewEngine(ex, opts)
if err != nil {
return e, err, nil
Expand All @@ -64,12 +67,8 @@ func newTestEngine(ex lib.Executor, opts lib.Options) (*Engine, error, *logtest.
return e, nil, hook
}

func L(r lib.Runner) lib.Executor {
return local.New(r)
}

func LF(fn func(ctx context.Context) ([]stats.SampleContainer, error)) lib.Executor {
return L(&lib.MiniRunner{Fn: fn})
func LF(fn func(ctx context.Context, out chan<- stats.SampleContainer) error) lib.Executor {
return local.New(&lib.MiniRunner{Fn: fn})
}

func TestNewEngine(t *testing.T) {
Expand Down Expand Up @@ -250,8 +249,8 @@ func TestEngineRun(t *testing.T) {

signalChan := make(chan interface{})
var e *Engine
e, err, _ := newTestEngine(LF(func(ctx context.Context) (samples []stats.SampleContainer, err error) {
samples = append(samples, stats.Sample{Metric: testMetric, Time: time.Now(), Value: 1})
e, err, _ := newTestEngine(LF(func(ctx context.Context, samples chan<- stats.SampleContainer) error {
samples <- stats.Sample{Metric: testMetric, Time: time.Now(), Value: 1}
close(signalChan)
<-ctx.Done()

Expand All @@ -263,8 +262,8 @@ func TestEngineRun(t *testing.T) {
// 2. Sometimes the `case samples := <-vuOut` gets selected before the `<-ctx.Done()` in
// core/local/local.go:Run() causing all samples from this mocked "RunOnce()" function to be accepted.
time.Sleep(time.Millisecond * 10)
samples = append(samples, stats.Sample{Metric: testMetric, Time: time.Now(), Value: 2})
return samples, err
samples <- stats.Sample{Metric: testMetric, Time: time.Now(), Value: 2}
return nil
}), lib.Options{
VUs: null.IntFrom(1),
VUsMax: null.IntFrom(1),
Expand Down Expand Up @@ -308,8 +307,9 @@ func TestEngineAtTime(t *testing.T) {
func TestEngineCollector(t *testing.T) {
testMetric := stats.New("test_metric", stats.Trend)

e, err, _ := newTestEngine(LF(func(ctx context.Context) ([]stats.SampleContainer, error) {
return []stats.SampleContainer{stats.Sample{Metric: testMetric}}, nil
e, err, _ := newTestEngine(LF(func(ctx context.Context, out chan<- stats.SampleContainer) error {
out <- stats.Sample{Metric: testMetric}
return nil
}), lib.Options{VUs: null.IntFrom(1), VUsMax: null.IntFrom(1), Iterations: null.IntFrom(1)})
assert.NoError(t, err)

Expand Down Expand Up @@ -343,7 +343,7 @@ func TestEngine_processSamples(t *testing.T) {
assert.NoError(t, err)

e.processSamples(
stats.Sample{Metric: metric, Value: 1.25, Tags: stats.IntoSampleTags(&map[string]string{"a": "1"})},
[]stats.SampleContainer{stats.Sample{Metric: metric, Value: 1.25, Tags: stats.IntoSampleTags(&map[string]string{"a": "1"})}},
)

assert.IsType(t, &stats.GaugeSink{}, e.Metrics["my_metric"].Sink)
Expand All @@ -365,7 +365,7 @@ func TestEngine_processSamples(t *testing.T) {
assert.EqualValues(t, map[string]string{"a": "1"}, sms[0].Tags.CloneTags())

e.processSamples(
stats.Sample{Metric: metric, Value: 1.25, Tags: stats.IntoSampleTags(&map[string]string{"a": "1", "b": "2"})},
[]stats.SampleContainer{stats.Sample{Metric: metric, Value: 1.25, Tags: stats.IntoSampleTags(&map[string]string{"a": "1", "b": "2"})}},
)

assert.IsType(t, &stats.GaugeSink{}, e.Metrics["my_metric"].Sink)
Expand All @@ -387,7 +387,7 @@ func TestEngine_runThresholds(t *testing.T) {
assert.NoError(t, err)

e.processSamples(
stats.Sample{Metric: metric, Value: 1.25, Tags: stats.IntoSampleTags(&map[string]string{"a": "1"})},
[]stats.SampleContainer{stats.Sample{Metric: metric, Value: 1.25, Tags: stats.IntoSampleTags(&map[string]string{"a": "1"})}},
)

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -410,7 +410,7 @@ func TestEngine_runThresholds(t *testing.T) {
assert.NoError(t, err)

e.processSamples(
stats.Sample{Metric: metric, Value: 1.25, Tags: stats.IntoSampleTags(&map[string]string{"a": "1"})},
[]stats.SampleContainer{stats.Sample{Metric: metric, Value: 1.25, Tags: stats.IntoSampleTags(&map[string]string{"a": "1"})}},
)

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -463,7 +463,7 @@ func TestEngine_processThresholds(t *testing.T) {
assert.NoError(t, err)

e.processSamples(
stats.Sample{Metric: metric, Value: 1.25, Tags: stats.IntoSampleTags(&map[string]string{"a": "1"})},
[]stats.SampleContainer{stats.Sample{Metric: metric, Value: 1.25, Tags: stats.IntoSampleTags(&map[string]string{"a": "1"})}},
)

abortCalled := false
Expand Down Expand Up @@ -749,3 +749,70 @@ func TestRunTags(t *testing.T) {
}
}
}

func TestSetupTeardownThresholds(t *testing.T) {
t.Parallel()
tb := testutils.NewHTTPMultiBin(t)
defer tb.Cleanup()

script := []byte(tb.Replacer.Replace(`
import http from "k6/http";
import { check } from "k6";
import { Counter } from "k6/metrics";
let statusCheck = { "status is 200": (r) => r.status === 200 }
let myCounter = new Counter("setup_teardown");
export let options = {
iterations: 5,
thresholds: {
"setup_teardown": ["count == 2"],
"iterations": ["count == 5"],
"http_reqs": ["count == 7"],
},
};
export function setup() {
check(http.get("HTTPBIN_IP_URL"), statusCheck) && myCounter.add(1);
};
export default function () {
check(http.get("HTTPBIN_IP_URL"), statusCheck);
};
export function teardown() {
check(http.get("HTTPBIN_IP_URL"), statusCheck) && myCounter.add(1);
};
`))

runner, err := js.New(
&lib.SourceData{Filename: "/script.js", Data: script},
afero.NewMemMapFs(),
lib.RuntimeOptions{},
)
require.NoError(t, err)
runner.SetOptions(runner.GetOptions().Apply(lib.Options{
SystemTags: lib.GetTagSet(lib.DefaultSystemTagList...),
SetupTimeout: types.NullDurationFrom(3 * time.Second),
TeardownTimeout: types.NullDurationFrom(3 * time.Second),
VUs: null.IntFrom(3),
VUsMax: null.IntFrom(3),
}))

engine, err := NewEngine(local.New(runner), runner.GetOptions())
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
errC := make(chan error)
go func() { errC <- engine.Run(ctx) }()

select {
case <-time.After(10 * time.Second):
cancel()
t.Fatal("Test timed out")
case err := <-errC:
cancel()
require.NoError(t, err)
require.False(t, engine.IsTainted())
}
}
Loading

0 comments on commit 38bed9a

Please sign in to comment.