Skip to content

Commit

Permalink
output/cloudv2: Error handling for flush (#3082)
Browse files Browse the repository at this point in the history
It refactors the error handling on flush.
It also replaces the periodic flusher with a custom one that does not invoke the callback on stop; that final-flush case is instead handled directly in the StopWithTestError function.

---------

Co-authored-by: Mihail Stoykov <312246+mstoykov@users.noreply.github.com>
Co-authored-by: Ivan Mirić <ivan.miric@grafana.com>
  • Loading branch information
3 people authored May 29, 2023
1 parent 677bb29 commit ecca789
Show file tree
Hide file tree
Showing 2 changed files with 259 additions and 62 deletions.
144 changes: 90 additions & 54 deletions output/cloud/expv2/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ package expv2

import (
"context"
"errors"
"net/http"
"sync"
"time"

"go.k6.io/k6/cloudapi"
Expand All @@ -26,20 +28,26 @@ type Output struct {
testStopFunc func(error)

// TODO: replace with the real impl
metricsFlusher noopFlusher
periodicFlusher *output.PeriodicFlusher
metricsFlusher noopFlusher
collector *collector

collector *collector
periodicCollector *output.PeriodicFlusher
stopMetricsCollection chan struct{}
// wg tracks background goroutines
wg sync.WaitGroup

// stop signal to graceful stop
stop chan struct{}

// abort signal to interrupt immediately all background goroutines
abort chan struct{}
}

// New creates a new cloud output.
func New(logger logrus.FieldLogger, conf cloudapi.Config) (*Output, error) {
return &Output{
config: conf,
logger: logger.WithFields(logrus.Fields{"output": "cloudv2"}),
stopMetricsCollection: make(chan struct{}),
config: conf,
logger: logger.WithFields(logrus.Fields{"output": "cloudv2"}),
abort: make(chan struct{}),
stop: make(chan struct{}),
}, nil
}

Expand Down Expand Up @@ -72,19 +80,8 @@ func (o *Output) Start() error {
bq: &o.collector.bq,
}

pf, err := output.NewPeriodicFlusher(
o.config.MetricPushInterval.TimeDuration(), o.flushMetrics)
if err != nil {
return err
}
o.periodicFlusher = pf

pfc, err := output.NewPeriodicFlusher(
o.config.AggregationPeriod.TimeDuration(), o.collectSamples)
if err != nil {
return err
}
o.periodicCollector = pfc
o.periodicInvoke(o.config.MetricPushInterval.TimeDuration(), o.flushMetrics)
o.periodicInvoke(o.config.AggregationPeriod.TimeDuration(), o.collectSamples)

o.logger.Debug("Started!")
return nil
Expand All @@ -93,17 +90,24 @@ func (o *Output) Start() error {
// StopWithTestError gracefully stops all metric emission from the output.
func (o *Output) StopWithTestError(testErr error) error {
o.logger.Debug("Stopping...")
close(o.stopMetricsCollection)
defer o.logger.Debug("Stopped!")

close(o.stop)
o.wg.Wait()

select {
case <-o.abort:
return nil
default:
}

// Drain the SampleBuffer and force the aggregation for flushing
// all the queued samples even if they haven't yet passed the
// wait period.
o.periodicCollector.Stop()
o.collector.DropExpiringDelay()
o.collector.CollectSamples(nil)
o.periodicFlusher.Stop()
o.flushMetrics()

o.logger.Debug("Stopped!")
return nil
}

Expand All @@ -113,7 +117,7 @@ func (o *Output) AddMetricSamples(s []metrics.SampleContainer) {
// evaluate to do something smarter, maybe having a lock-free
// queue.
select {
case <-o.stopMetricsCollection:
case <-o.abort:
return
default:
}
Expand All @@ -124,13 +128,33 @@ func (o *Output) AddMetricSamples(s []metrics.SampleContainer) {
//
// If the bucketing process is efficient, the single
// operation could be a bit longer than just enqueuing
// but it could be fast enough to justify to to direct
// but it could be fast enough to justify to direct
// run it and save some memory across the e2e operation.
//
// It requires very specific benchmark.
o.SampleBuffer.AddMetricSamples(s)
}

// periodicInvoke spawns a background goroutine that invokes callback
// once per period d. The goroutine is registered on o.wg so that
// StopWithTestError can wait for it, and it exits as soon as either
// the graceful stop channel or the abort channel is closed.
// Note: callback is intentionally NOT invoked on shutdown; the final
// flush for the graceful-stop case is handled by the caller.
func (o *Output) periodicInvoke(d time.Duration, callback func()) {
	o.wg.Add(1)
	go func() {
		defer o.wg.Done()

		ticker := time.NewTicker(d)
		defer ticker.Stop()

		for {
			select {
			case <-o.stop:
				return
			case <-o.abort:
				return
			case <-ticker.C:
				callback()
			}
		}
	}()
}

func (o *Output) collectSamples() {
samples := o.GetBufferedSamples()
if len(samples) < 1 {
Expand All @@ -151,42 +175,54 @@ func (o *Output) flushMetrics() {

err := o.metricsFlusher.Flush(ctx)
if err != nil {
o.logger.WithError(err).Error("Failed to push metrics to the cloud")

if o.shouldStopSendingMetrics(err) {
o.logger.WithError(err).Warn("Interrupt sending metrics to cloud due to an error")
serr := errext.WithAbortReasonIfNone(
errext.WithExitCodeIfNone(err, exitcodes.ExternalAbort),
errext.AbortedByOutput,
)
if o.config.StopOnError.Bool {
o.testStopFunc(serr)
}
close(o.stopMetricsCollection)
}
o.handleFlushError(err)
return
}

o.logger.WithField("t", time.Since(start)).Debug("Successfully flushed buffered samples to the cloud")
}

// shouldStopSendingMetrics returns true if the output should interrupt the metric flush.
// handleFlushError handles errors generated from the flushing operation.
// It may interrupt the metric collection or invoke aborting of the test.
//
// note: The actual test execution should continues,
// since for local k6 run tests the end-of-test summary (or any other outputs) will still work,
// note: The actual test execution should continue, since for local k6 run tests
// the end-of-test summary (or any other outputs) will still work,
// but the cloud output doesn't send any more metrics.
// Instead, if cloudapi.Config.StopOnError is enabled
// the cloud output should stop the whole test run too.
// This logic should be handled by the caller.
func (o *Output) shouldStopSendingMetrics(err error) bool {
if err == nil {
return false
// Instead, if cloudapi.Config.StopOnError is enabled the cloud output should
// stop the whole test run too. This logic should be handled by the caller.
func (o *Output) handleFlushError(err error) {
o.logger.WithError(err).Error("Failed to push metrics to the cloud")

var errResp cloudapi.ErrorResponse
if !errors.As(err, &errResp) || errResp.Response == nil {
return
}

// The Cloud service returns the error code 4 when it doesn't accept any more metrics.
// So, when k6 sees that, the cloud output just stops prematurely.
if errResp.Response.StatusCode != http.StatusForbidden || errResp.Code != 4 {
return
}
if errResp, ok := err.(cloudapi.ErrorResponse); ok && errResp.Response != nil { //nolint:errorlint
// The Cloud service returns the error code 4 when it doesn't accept any more metrics.
// So, when k6 sees that, the cloud output just stops prematurely.
return errResp.Response.StatusCode == http.StatusForbidden && errResp.Code == 4

o.logger.WithError(err).Warn("Interrupt sending metrics to cloud due to an error")

// Do not close multiple times (that would panic) in the case
// we hit this multiple times and/or concurrently
select {
case <-o.abort:
return
default:
close(o.abort)
}

return false
if o.config.StopOnError.Bool {
serr := errext.WithAbortReasonIfNone(
errext.WithExitCodeIfNone(err, exitcodes.ExternalAbort),
errext.AbortedByOutput,
)

if o.testStopFunc != nil {
o.testStopFunc(serr)
}
}
}
Loading

0 comments on commit ecca789

Please sign in to comment.