Skip to content

Commit

Permalink
output/cloud tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mstoykov committed Jun 23, 2021
1 parent ff054ad commit 7da10ac
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 75 deletions.
96 changes: 65 additions & 31 deletions lib/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,40 @@ import (
"go.k6.io/k6/stats"
)

const (
VUsName = "vus"
VUsMaxName = "vus_max"
IterationsName = "iterations"
IterationDurationName = "iteration_duration"
DroppedIterationsName = "dropped_iterations"
ErrorsName = "errors"

ChecksName = "checks"
GroupDurationName = "group_duration"

HTTPReqsName = "http_reqs"
HTTPReqFailedName = "http_req_failed"
HTTPReqDurationName = "http_req_duration"
HTTPReqBlockedName = "http_req_blocked"
HTTPReqConnectingName = "http_req_connecting"
HTTPReqTLSHandshakingName = "http_req_tls_handshaking"
HTTPReqSendingName = "http_req_sending"
HTTPReqWaitingName = "http_req_waiting"
HTTPReqReceivingName = "http_req_receiving"

WSSessionsName = "ws_sessions"
WSMessagesSentName = "ws_msgs_sent"
WSMessagesReceivedName = "ws_msgs_received"
WSPingName = "ws_ping"
WSSessionDurationName = "ws_session_duration"
WSConnectingName = "ws_connecting"

GRPCReqDurationName = "grpc_req_duration"

DataSentName = "data_sent"
DataReceivedName = "data_received"
)

type BuiltInMetrics struct {
VUs *stats.Metric
VUsMax *stats.Metric
Expand Down Expand Up @@ -67,37 +101,37 @@ type BuiltInMetrics struct {

func RegisterBuiltinMetrics(registry *stats.Registry) *BuiltInMetrics {
return &BuiltInMetrics{
VUs: registry.MustNewMetric("vus", stats.Gauge),
VUsMax: registry.MustNewMetric("vus_max", stats.Gauge),
Iterations: registry.MustNewMetric("iterations", stats.Counter),
IterationDuration: registry.MustNewMetric("iteration_duration", stats.Trend, stats.Time),
DroppedIterations: registry.MustNewMetric("dropped_iterations", stats.Counter),
Errors: registry.MustNewMetric("errors", stats.Counter),

Checks: registry.MustNewMetric("checks", stats.Rate),
GroupDuration: registry.MustNewMetric("group_duration", stats.Trend, stats.Time),

HTTPReqs: registry.MustNewMetric("http_reqs", stats.Counter),
HTTPReqFailed: registry.MustNewMetric("http_req_failed", stats.Rate),
HTTPReqDuration: registry.MustNewMetric("http_req_duration", stats.Trend, stats.Time),
HTTPReqBlocked: registry.MustNewMetric("http_req_blocked", stats.Trend, stats.Time),
HTTPReqConnecting: registry.MustNewMetric("http_req_connecting", stats.Trend, stats.Time),
HTTPReqTLSHandshaking: registry.MustNewMetric("http_req_tls_handshaking", stats.Trend, stats.Time),
HTTPReqSending: registry.MustNewMetric("http_req_sending", stats.Trend, stats.Time),
HTTPReqWaiting: registry.MustNewMetric("http_req_waiting", stats.Trend, stats.Time),
HTTPReqReceiving: registry.MustNewMetric("http_req_receiving", stats.Trend, stats.Time),

WSSessions: registry.MustNewMetric("ws_sessions", stats.Counter),
WSMessagesSent: registry.MustNewMetric("ws_msgs_sent", stats.Counter),
WSMessagesReceived: registry.MustNewMetric("ws_msgs_received", stats.Counter),
WSPing: registry.MustNewMetric("ws_ping", stats.Trend, stats.Time),
WSSessionDuration: registry.MustNewMetric("ws_session_duration", stats.Trend, stats.Time),
WSConnecting: registry.MustNewMetric("ws_connecting", stats.Trend, stats.Time),

GRPCReqDuration: registry.MustNewMetric("grpc_req_duration", stats.Trend, stats.Time),

DataSent: registry.MustNewMetric("data_sent", stats.Counter, stats.Data),
DataReceived: registry.MustNewMetric("data_received", stats.Counter, stats.Data),
VUs: registry.MustNewMetric(VUsName, stats.Gauge),
VUsMax: registry.MustNewMetric(VUsMaxName, stats.Gauge),
Iterations: registry.MustNewMetric(IterationsName, stats.Counter),
IterationDuration: registry.MustNewMetric(IterationDurationName, stats.Trend, stats.Time),
DroppedIterations: registry.MustNewMetric(DroppedIterationsName, stats.Counter),
Errors: registry.MustNewMetric(ErrorsName, stats.Counter),

Checks: registry.MustNewMetric(ChecksName, stats.Rate),
GroupDuration: registry.MustNewMetric(GroupDurationName, stats.Trend, stats.Time),

HTTPReqs: registry.MustNewMetric(HTTPReqsName, stats.Counter),
HTTPReqFailed: registry.MustNewMetric(HTTPReqFailedName, stats.Rate),
HTTPReqDuration: registry.MustNewMetric(HTTPReqDurationName, stats.Trend, stats.Time),
HTTPReqBlocked: registry.MustNewMetric(HTTPReqBlockedName, stats.Trend, stats.Time),
HTTPReqConnecting: registry.MustNewMetric(HTTPReqConnectingName, stats.Trend, stats.Time),
HTTPReqTLSHandshaking: registry.MustNewMetric(HTTPReqTLSHandshakingName, stats.Trend, stats.Time),
HTTPReqSending: registry.MustNewMetric(HTTPReqSendingName, stats.Trend, stats.Time),
HTTPReqWaiting: registry.MustNewMetric(HTTPReqWaitingName, stats.Trend, stats.Time),
HTTPReqReceiving: registry.MustNewMetric(HTTPReqReceivingName, stats.Trend, stats.Time),

WSSessions: registry.MustNewMetric(WSSessionsName, stats.Counter),
WSMessagesSent: registry.MustNewMetric(WSMessagesSentName, stats.Counter),
WSMessagesReceived: registry.MustNewMetric(WSMessagesReceivedName, stats.Counter),
WSPing: registry.MustNewMetric(WSPingName, stats.Trend, stats.Time),
WSSessionDuration: registry.MustNewMetric(WSSessionDurationName, stats.Trend, stats.Time),
WSConnecting: registry.MustNewMetric(WSConnectingName, stats.Trend, stats.Time),

GRPCReqDuration: registry.MustNewMetric(GRPCReqDurationName, stats.Trend, stats.Time),

DataSent: registry.MustNewMetric(DataSentName, stats.Counter, stats.Data),
DataReceived: registry.MustNewMetric(DataReceivedName, stats.Counter, stats.Data),
}
}

Expand Down
22 changes: 12 additions & 10 deletions output/cloud/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"sort"
"time"

"go.k6.io/k6/lib/metrics"
"go.k6.io/k6/lib/netext/httpext"
"go.k6.io/k6/stats"
)
Expand Down Expand Up @@ -108,22 +109,23 @@ type SampleDataMap struct {

// NewSampleFromTrail just creates a ready-to-send Sample instance
// directly from a httpext.Trail.
func (out *Output) NewSampleFromTrail(trail *httpext.Trail) *Sample {
func NewSampleFromTrail(trail *httpext.Trail) *Sample {
length := 8
if trail.Failed.Valid {
length++
}

values := make(map[string]float64, length)
values[out.metrics.HTTPReqs.Name] = 1
values[out.metrics.HTTPReqDuration.Name] = stats.D(trail.Duration)
values[out.metrics.HTTPReqBlocked.Name] = stats.D(trail.Blocked)
values[out.metrics.HTTPReqConnecting.Name] = stats.D(trail.Connecting)
values[out.metrics.HTTPReqTLSHandshaking.Name] = stats.D(trail.TLSHandshaking)
values[out.metrics.HTTPReqSending.Name] = stats.D(trail.Sending)
values[out.metrics.HTTPReqWaiting.Name] = stats.D(trail.Waiting)
values[out.metrics.HTTPReqReceiving.Name] = stats.D(trail.Receiving)
values[metrics.HTTPReqsName] = 1
values[metrics.HTTPReqDurationName] = stats.D(trail.Duration)
values[metrics.HTTPReqBlockedName] = stats.D(trail.Blocked)
values[metrics.HTTPReqConnectingName] = stats.D(trail.Connecting)
values[metrics.HTTPReqTLSHandshakingName] = stats.D(trail.TLSHandshaking)
values[metrics.HTTPReqSendingName] = stats.D(trail.Sending)
values[metrics.HTTPReqWaitingName] = stats.D(trail.Waiting)
values[metrics.HTTPReqReceivingName] = stats.D(trail.Receiving)
if trail.Failed.Valid { // this is done so the adding of 1 map element doesn't reexpand the map as this is a hotpath
values[out.metrics.HTTPReqFailed.Name] = stats.B(trail.Failed.Bool)
values[metrics.HTTPReqFailedName] = stats.B(trail.Failed.Bool)
}
return &Sample{
Type: DataTypeMap,
Expand Down
11 changes: 6 additions & 5 deletions output/cloud/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
func TestSampleMarshaling(t *testing.T) {
t.Parallel()

builtinMetrics := metrics.RegisterBuiltinMetrics(stats.NewRegistry(nil))
now := time.Now()
exptoMicroSecond := now.UnixNano() / 1000

Expand All @@ -49,9 +50,9 @@ func TestSampleMarshaling(t *testing.T) {
{
&Sample{
Type: DataTypeSingle,
Metric: metrics.VUs.Name,
Metric: metrics.VUsName,
Data: &SampleDataSingle{
Type: metrics.VUs.Type,
Type: builtinMetrics.VUs.Type,
Time: toMicroSecond(now),
Tags: stats.IntoSampleTags(&map[string]string{"aaa": "bbb", "ccc": "123"}),
Value: 999,
Expand All @@ -67,9 +68,9 @@ func TestSampleMarshaling(t *testing.T) {
Time: toMicroSecond(now),
Tags: stats.IntoSampleTags(&map[string]string{"test": "mest"}),
Values: map[string]float64{
metrics.DataSent.Name: 1234.5,
metrics.DataReceived.Name: 6789.1,
metrics.IterationDuration.Name: stats.D(10 * time.Second),
metrics.DataSentName: 1234.5,
metrics.DataReceivedName: 6789.1,
metrics.IterationDurationName: stats.D(10 * time.Second),
},
},
},
Expand Down
26 changes: 9 additions & 17 deletions output/cloud/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ type Output struct {
bufferHTTPTrails []*httpext.Trail
bufferSamples []*Sample

metrics *metrics.BuiltInMetrics

logger logrus.FieldLogger
opts lib.Options

Expand All @@ -90,7 +88,6 @@ var _ interface {
output.WithRunStatusUpdates
output.WithThresholds
output.WithTestRunStop
output.WithBuiltinMetrics
} = &Output{}

// New creates a new cloud output.
Expand Down Expand Up @@ -332,11 +329,6 @@ func (out *Output) SetTestRunStopCallback(stopFunc func(error)) {
out.engineStopFunc = stopFunc
}

// SetBuiltinMetrics receives the function that stops the engine on error
func (out *Output) SetBuiltinMetrics(builtInMetrics *metrics.BuiltInMetrics) {
out.metrics = builtInMetrics
}

func useCloudTags(source *httpext.Trail) *httpext.Trail {
name, nameExist := source.Tags.Get("name")
url, urlExist := source.Tags.Get("url")
Expand Down Expand Up @@ -380,18 +372,18 @@ func (out *Output) AddMetricSamples(sampleContainers []stats.SampleContainer) {
if out.config.AggregationPeriod.Duration > 0 {
newHTTPTrails = append(newHTTPTrails, sc)
} else {
newSamples = append(newSamples, out.NewSampleFromTrail(sc))
newSamples = append(newSamples, NewSampleFromTrail(sc))
}
case *netext.NetTrail:
// TODO: aggregate?
values := map[string]float64{
out.metrics.DataSent.Name: float64(sc.BytesWritten),
out.metrics.DataReceived.Name: float64(sc.BytesRead),
metrics.DataSentName: float64(sc.BytesWritten),
metrics.DataReceivedName: float64(sc.BytesRead),
}

if sc.FullIteration {
values[out.metrics.IterationDuration.Name] = stats.D(sc.EndTime.Sub(sc.StartTime))
values[out.metrics.Iterations.Name] = 1
values[metrics.IterationDurationName] = stats.D(sc.EndTime.Sub(sc.StartTime))
values[metrics.IterationsName] = 1
}

newSamples = append(newSamples, &Sample{
Expand Down Expand Up @@ -494,7 +486,7 @@ func (out *Output) aggregateHTTPTrails(waitPeriod time.Duration) {
trailCount := int64(len(httpTrails))
if trailCount < out.config.AggregationMinSamples.Int64 {
for _, trail := range httpTrails {
newSamples = append(newSamples, out.NewSampleFromTrail(trail))
newSamples = append(newSamples, NewSampleFromTrail(trail))
}
continue
}
Expand Down Expand Up @@ -535,7 +527,7 @@ func (out *Output) aggregateHTTPTrails(waitPeriod time.Duration) {
trail.Duration < minReqDur ||
trail.Duration > maxReqDur {
// Seems like an outlier, add it as a standalone metric
newSamples = append(newSamples, out.NewSampleFromTrail(trail))
newSamples = append(newSamples, NewSampleFromTrail(trail))
} else {
// Aggregate the trail
aggrData.Add(trail)
Expand Down Expand Up @@ -577,13 +569,13 @@ func (out *Output) flushHTTPTrails() {

newSamples := []*Sample{}
for _, trail := range out.bufferHTTPTrails {
newSamples = append(newSamples, out.NewSampleFromTrail(trail))
newSamples = append(newSamples, NewSampleFromTrail(trail))
}
for _, bucket := range out.aggrBuckets {
for _, subBucket := range bucket {
for _, trails := range subBucket {
for _, trail := range trails {
newSamples = append(newSamples, out.NewSampleFromTrail(trail))
newSamples = append(newSamples, NewSampleFromTrail(trail))
}
}
}
Expand Down
29 changes: 17 additions & 12 deletions output/cloud/output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ func runCloudOutputTestCase(t *testing.T, minSamples int) {
require.NoError(t, err)
}))

builtinMetrics := metrics.RegisterBuiltinMetrics(stats.NewRegistry(nil))
out, err := newOutput(output.Params{
Logger: testutils.NewLogger(t),
JSONConfig: json.RawMessage(fmt.Sprintf(`{"host": "%s", "noCompress": true}`, tb.ServerHTTP.URL)),
Expand Down Expand Up @@ -222,15 +223,15 @@ func runCloudOutputTestCase(t *testing.T, minSamples int) {

out.AddMetricSamples([]stats.SampleContainer{stats.Sample{
Time: now,
Metric: metrics.VUs,
Metric: builtinMetrics.VUs,
Tags: tags,
Value: 1.0,
}})
expSamples <- []Sample{{
Type: DataTypeSingle,
Metric: metrics.VUs.Name,
Metric: metrics.VUsName,
Data: &SampleDataSingle{
Type: metrics.VUs.Type,
Type: builtinMetrics.VUs.Type,
Time: toMicroSecond(now),
Tags: tags,
Value: 1.0,
Expand Down Expand Up @@ -305,6 +306,7 @@ func runCloudOutputTestCase(t *testing.T, minSamples int) {

func TestCloudOutputMaxPerPacket(t *testing.T) {
t.Parallel()
builtinMetrics := metrics.RegisterBuiltinMetrics(stats.NewRegistry(nil))
tb := httpmultibin.NewHTTPMultiBin(t)
maxMetricSamplesPerPackage := 20
tb.Mux.HandleFunc("/v1/tests", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -356,7 +358,7 @@ func TestCloudOutputMaxPerPacket(t *testing.T) {

out.AddMetricSamples([]stats.SampleContainer{stats.Sample{
Time: now,
Metric: metrics.VUs,
Metric: builtinMetrics.VUs,
Tags: stats.NewSampleTags(tags.CloneTags()),
Value: 1.0,
}})
Expand Down Expand Up @@ -399,6 +401,7 @@ func TestCloudOutputStopSendingMetric(t *testing.T) {

func testCloudOutputStopSendingMetric(t *testing.T, stopOnError bool) {
tb := httpmultibin.NewHTTPMultiBin(t)
builtinMetrics := metrics.RegisterBuiltinMetrics(stats.NewRegistry(nil))
tb.Mux.HandleFunc("/v1/tests", http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
body, err := ioutil.ReadAll(req.Body)
require.NoError(t, err)
Expand Down Expand Up @@ -480,7 +483,7 @@ func testCloudOutputStopSendingMetric(t *testing.T, stopOnError bool) {

out.AddMetricSamples([]stats.SampleContainer{stats.Sample{
Time: now,
Metric: metrics.VUs,
Metric: builtinMetrics.VUs,
Tags: stats.NewSampleTags(tags.CloneTags()),
Value: 1.0,
}})
Expand Down Expand Up @@ -520,7 +523,7 @@ func testCloudOutputStopSendingMetric(t *testing.T, stopOnError bool) {
nBufferHTTPTrails := len(out.bufferHTTPTrails)
out.AddMetricSamples([]stats.SampleContainer{stats.Sample{
Time: now,
Metric: metrics.VUs,
Metric: builtinMetrics.VUs,
Tags: stats.NewSampleTags(tags.CloneTags()),
Value: 1.0,
}})
Expand Down Expand Up @@ -601,6 +604,7 @@ func TestCloudOutputAggregationPeriodZeroNoBlock(t *testing.T) {

func TestCloudOutputPushRefID(t *testing.T) {
t.Parallel()
builtinMetrics := metrics.RegisterBuiltinMetrics(stats.NewRegistry(nil))
expSamples := make(chan []Sample)
defer close(expSamples)

Expand Down Expand Up @@ -637,15 +641,15 @@ func TestCloudOutputPushRefID(t *testing.T) {

out.AddMetricSamples([]stats.SampleContainer{stats.Sample{
Time: now,
Metric: metrics.HTTPReqDuration,
Metric: builtinMetrics.HTTPReqDuration,
Tags: tags,
Value: 123.45,
}})
exp := []Sample{{
Type: DataTypeSingle,
Metric: metrics.HTTPReqDuration.Name,
Metric: metrics.HTTPReqDurationName,
Data: &SampleDataSingle{
Type: metrics.HTTPReqDuration.Type,
Type: builtinMetrics.HTTPReqDuration.Type,
Time: toMicroSecond(now),
Tags: tags,
Value: 123.45,
Expand All @@ -663,6 +667,7 @@ func TestCloudOutputPushRefID(t *testing.T) {

func TestCloudOutputRecvIterLIAllIterations(t *testing.T) {
t.Parallel()
builtinMetrics := metrics.RegisterBuiltinMetrics(stats.NewRegistry(nil))
tb := httpmultibin.NewHTTPMultiBin(t)
tb.Mux.HandleFunc("/v1/tests", http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
body, err := ioutil.ReadAll(req.Body)
Expand Down Expand Up @@ -732,17 +737,17 @@ func TestCloudOutputRecvIterLIAllIterations(t *testing.T) {
Samples: []stats.Sample{
{
Time: now,
Metric: metrics.DataSent,
Metric: builtinMetrics.DataSent,
Value: float64(200),
},
{
Time: now,
Metric: metrics.DataReceived,
Metric: builtinMetrics.DataReceived,
Value: float64(100),
},
{
Time: now,
Metric: metrics.Iterations,
Metric: builtinMetrics.Iterations,
Value: 1,
},
},
Expand Down

0 comments on commit 7da10ac

Please sign in to comment.