From ecca7894b60f69b730d88e9f64c57b144171a3f1 Mon Sep 17 00:00:00 2001 From: Ivan <2103732+codebien@users.noreply.github.com> Date: Mon, 29 May 2023 12:05:57 +0200 Subject: [PATCH] output/cloudv2: Error handling for flush (#3082) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit It refactors the error handling on flush. It also sets a custom periodic flusher by not calling the callback on stop and handling this specific case directly on the StopWithTestError function. --------- Co-authored-by: Mihail Stoykov <312246+mstoykov@users.noreply.github.com> Co-authored-by: Ivan Mirić --- output/cloud/expv2/output.go | 144 +++++++++++++++--------- output/cloud/expv2/output_test.go | 177 ++++++++++++++++++++++++++++-- 2 files changed, 259 insertions(+), 62 deletions(-) diff --git a/output/cloud/expv2/output.go b/output/cloud/expv2/output.go index e9c01130ffb..94092bbcd1c 100644 --- a/output/cloud/expv2/output.go +++ b/output/cloud/expv2/output.go @@ -4,7 +4,9 @@ package expv2 import ( "context" + "errors" "net/http" + "sync" "time" "go.k6.io/k6/cloudapi" @@ -26,20 +28,26 @@ type Output struct { testStopFunc func(error) // TODO: replace with the real impl - metricsFlusher noopFlusher - periodicFlusher *output.PeriodicFlusher + metricsFlusher noopFlusher + collector *collector - collector *collector - periodicCollector *output.PeriodicFlusher - stopMetricsCollection chan struct{} + // wg tracks background goroutines + wg sync.WaitGroup + + // stop signal to graceful stop + stop chan struct{} + + // abort signal to interrupt immediately all background goroutines + abort chan struct{} } // New creates a new cloud output. 
func New(logger logrus.FieldLogger, conf cloudapi.Config) (*Output, error) { return &Output{ - config: conf, - logger: logger.WithFields(logrus.Fields{"output": "cloudv2"}), - stopMetricsCollection: make(chan struct{}), + config: conf, + logger: logger.WithFields(logrus.Fields{"output": "cloudv2"}), + abort: make(chan struct{}), + stop: make(chan struct{}), }, nil } @@ -72,19 +80,8 @@ func (o *Output) Start() error { bq: &o.collector.bq, } - pf, err := output.NewPeriodicFlusher( - o.config.MetricPushInterval.TimeDuration(), o.flushMetrics) - if err != nil { - return err - } - o.periodicFlusher = pf - - pfc, err := output.NewPeriodicFlusher( - o.config.AggregationPeriod.TimeDuration(), o.collectSamples) - if err != nil { - return err - } - o.periodicCollector = pfc + o.periodicInvoke(o.config.MetricPushInterval.TimeDuration(), o.flushMetrics) + o.periodicInvoke(o.config.AggregationPeriod.TimeDuration(), o.collectSamples) o.logger.Debug("Started!") return nil @@ -93,17 +90,24 @@ func (o *Output) Start() error { // StopWithTestError gracefully stops all metric emission from the output. func (o *Output) StopWithTestError(testErr error) error { o.logger.Debug("Stopping...") - close(o.stopMetricsCollection) + defer o.logger.Debug("Stopped!") + + close(o.stop) + o.wg.Wait() + + select { + case <-o.abort: + return nil + default: + } // Drain the SampleBuffer and force the aggregation for flushing // all the queued samples even if they haven't yet passed the // wait period. - o.periodicCollector.Stop() o.collector.DropExpiringDelay() o.collector.CollectSamples(nil) - o.periodicFlusher.Stop() + o.flushMetrics() - o.logger.Debug("Stopped!") return nil } @@ -113,7 +117,7 @@ func (o *Output) AddMetricSamples(s []metrics.SampleContainer) { // evaluate to do something smarter, maybe having a lock-free // queue. 
select { - case <-o.stopMetricsCollection: + case <-o.abort: return default: } @@ -124,13 +128,33 @@ func (o *Output) AddMetricSamples(s []metrics.SampleContainer) { // // If the bucketing process is efficient, the single // operation could be a bit longer than just enqueuing - // but it could be fast enough to justify to to direct + // but it could be fast enough to justify to direct // run it and save some memory across the e2e operation. // // It requires very specific benchmark. o.SampleBuffer.AddMetricSamples(s) } +func (o *Output) periodicInvoke(d time.Duration, callback func()) { + o.wg.Add(1) + go func() { + defer o.wg.Done() + + t := time.NewTicker(d) + defer t.Stop() + for { + select { + case <-t.C: + callback() + case <-o.stop: + return + case <-o.abort: + return + } + } + }() +} + func (o *Output) collectSamples() { samples := o.GetBufferedSamples() if len(samples) < 1 { @@ -151,42 +175,54 @@ func (o *Output) flushMetrics() { err := o.metricsFlusher.Flush(ctx) if err != nil { - o.logger.WithError(err).Error("Failed to push metrics to the cloud") - - if o.shouldStopSendingMetrics(err) { - o.logger.WithError(err).Warn("Interrupt sending metrics to cloud due to an error") - serr := errext.WithAbortReasonIfNone( - errext.WithExitCodeIfNone(err, exitcodes.ExternalAbort), - errext.AbortedByOutput, - ) - if o.config.StopOnError.Bool { - o.testStopFunc(serr) - } - close(o.stopMetricsCollection) - } + o.handleFlushError(err) return } o.logger.WithField("t", time.Since(start)).Debug("Successfully flushed buffered samples to the cloud") } -// shouldStopSendingMetrics returns true if the output should interrupt the metric flush. +// handleFlushError handles errors generated from the flushing operation. +// It may interrupt the metric collection or invoke aborting of the test. 
// -// note: The actual test execution should continues, -// since for local k6 run tests the end-of-test summary (or any other outputs) will still work, +// note: The actual test execution should continue, since for local k6 run tests +// the end-of-test summary (or any other outputs) will still work, // but the cloud output doesn't send any more metrics. -// Instead, if cloudapi.Config.StopOnError is enabled -// the cloud output should stop the whole test run too. -// This logic should be handled by the caller. -func (o *Output) shouldStopSendingMetrics(err error) bool { - if err == nil { - return false +// Instead, if cloudapi.Config.StopOnError is enabled the cloud output should +// stop the whole test run too. This logic should be handled by the caller. +func (o *Output) handleFlushError(err error) { + o.logger.WithError(err).Error("Failed to push metrics to the cloud") + + var errResp cloudapi.ErrorResponse + if !errors.As(err, &errResp) || errResp.Response == nil { + return + } + + // The Cloud service returns the error code 4 when it doesn't accept any more metrics. + // So, when k6 sees that, the cloud output just stops prematurely. + if errResp.Response.StatusCode != http.StatusForbidden || errResp.Code != 4 { + return } - if errResp, ok := err.(cloudapi.ErrorResponse); ok && errResp.Response != nil { //nolint:errorlint - // The Cloud service returns the error code 4 when it doesn't accept any more metrics. - // So, when k6 sees that, the cloud output just stops prematurely. 
- return errResp.Response.StatusCode == http.StatusForbidden && errResp.Code == 4 + + o.logger.WithError(err).Warn("Interrupt sending metrics to cloud due to an error") + + // Do not close multiple times (that would panic) in the case + // we hit this multiple times and/or concurrently + select { + case <-o.abort: + return + default: + close(o.abort) } - return false + if o.config.StopOnError.Bool { + serr := errext.WithAbortReasonIfNone( + errext.WithExitCodeIfNone(err, exitcodes.ExternalAbort), + errext.AbortedByOutput, + ) + + if o.testStopFunc != nil { + o.testStopFunc(serr) + } + } } diff --git a/output/cloud/expv2/output_test.go b/output/cloud/expv2/output_test.go index 55a8ff0a849..d23a44a6c9b 100644 --- a/output/cloud/expv2/output_test.go +++ b/output/cloud/expv2/output_test.go @@ -2,6 +2,9 @@ package expv2 import ( "errors" + "fmt" + "net/http" + "sync/atomic" "testing" "time" @@ -54,20 +57,17 @@ func TestOutputSetTestRunStopCallback(t *testing.T) { func TestOutputCollectSamples(t *testing.T) { t.Parallel() o, err := New(testutils.NewLogger(t), cloudapi.Config{ - AggregationPeriod: types.NewNullDuration(3*time.Second, true), AggregationWaitPeriod: types.NewNullDuration(5*time.Second, true), - MetricPushInterval: types.NewNullDuration(10*time.Second, true), + // Manually control and trigger the various steps + // instead to be time dependent + AggregationPeriod: types.NewNullDuration(1*time.Hour, true), + MetricPushInterval: types.NewNullDuration(1*time.Hour, true), }) require.NoError(t, err) require.NoError(t, o.Start()) - - // Manually control and trigger the various steps - // instead to be time dependent - o.periodicFlusher.Stop() - - o.periodicCollector.Stop() require.Empty(t, o.collector.bq.PopAll()) + o.collector.aggregationPeriod = 3 * time.Second o.collector.nowFunc = func() time.Time { // the cut off will be set to (22-1) return time.Date(2023, time.May, 1, 1, 1, 20, 0, time.UTC) @@ -106,6 +106,7 @@ func TestOutputCollectSamples(t *testing.T) { 
buckets := o.collector.bq.PopAll() require.Len(t, buckets, 1) require.Contains(t, buckets[0].Sinks, ts) + require.Len(t, buckets[0].Sinks, 1) counter, ok := buckets[0].Sinks[ts].(*metrics.CounterSink) require.True(t, ok) @@ -114,3 +115,163 @@ func TestOutputCollectSamples(t *testing.T) { expTime := time.Date(2023, time.May, 1, 1, 1, 15, 0, time.UTC) assert.Equal(t, expTime, buckets[0].Time) } + +func TestOutputHandleFlushError(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + err error + abort bool + expStopMetricCollection bool + expAborted bool + }{ + { + name: "no stop on generic errors", + err: errors.New("a fake unknown error"), + abort: true, + expStopMetricCollection: false, + expAborted: false, + }, + { + name: "error code equals 4 but no abort", + err: cloudapi.ErrorResponse{ + Response: &http.Response{StatusCode: http.StatusForbidden}, + Code: 4, + }, + abort: false, + expStopMetricCollection: true, + expAborted: false, + }, + { + name: "error code equals 4 and abort", + err: cloudapi.ErrorResponse{ + Response: &http.Response{StatusCode: http.StatusForbidden}, + Code: 4, + }, + abort: true, + expStopMetricCollection: true, + expAborted: true, + }, + } + + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + stopFuncCalled := false + stopMetricCollection := false + + o := Output{ + logger: testutils.NewLogger(t), + testStopFunc: func(error) { + stopFuncCalled = true + }, + abort: make(chan struct{}), + } + o.config.StopOnError = null.BoolFrom(tc.abort) + + done := make(chan struct{}) + stopped := make(chan struct{}) + + go func() { + defer close(stopped) + + <-done + select { + case <-o.abort: + stopMetricCollection = true + default: + } + }() + + o.handleFlushError(tc.err) + close(done) + <-stopped + + assert.Equal(t, tc.expStopMetricCollection, stopMetricCollection) + assert.Equal(t, tc.expAborted, stopFuncCalled) + }) + } +} + +// assert that the output does not get stuck or panic +// when handleFlushError is
called multiple times +func TestOutputHandleFlushErrorMultipleTimes(t *testing.T) { + t.Parallel() + + var stopFuncCalled int + o := Output{ + logger: testutils.NewLogger(t), + abort: make(chan struct{}), + testStopFunc: func(error) { + stopFuncCalled++ + }, + } + o.config.StopOnError = null.BoolFrom(true) + + er := cloudapi.ErrorResponse{ + Response: &http.Response{ + StatusCode: http.StatusForbidden, + }, + Code: 4, + } + o.handleFlushError(fmt.Errorf("first error: %w", er)) + o.handleFlushError(fmt.Errorf("second error: %w", er)) + assert.Equal(t, 1, stopFuncCalled) +} + +func TestOutputAddMetricSamples(t *testing.T) { + t.Parallel() + + stopSamples := make(chan struct{}) + o := Output{ + abort: stopSamples, + } + require.Empty(t, o.GetBufferedSamples()) + + s := metrics.Sample{} + o.AddMetricSamples([]metrics.SampleContainer{ + metrics.Samples([]metrics.Sample{s}), + }) + require.Len(t, o.GetBufferedSamples(), 1) + + // Not accept samples anymore when the chan is closed + close(stopSamples) + o.AddMetricSamples([]metrics.SampleContainer{ + metrics.Samples([]metrics.Sample{s}), + }) + require.Empty(t, o.GetBufferedSamples()) +} + +func TestOutputPeriodicInvoke(t *testing.T) { + t.Parallel() + + stop := make(chan struct{}) + var called uint64 + cb := func() { + updated := atomic.AddUint64(&called, 1) + if updated == 2 { + close(stop) + } + } + o := Output{stop: stop} + o.periodicInvoke(time.Duration(1), cb) // loop + <-stop + assert.Greater(t, atomic.LoadUint64(&called), uint64(1)) +} + +func TestOutputStopWithTestError(t *testing.T) { + t.Parallel() + + config := cloudapi.NewConfig() + config.Host = null.StringFrom("") // flush not expected + config.AggregationPeriod = types.NullDurationFrom(1 * time.Hour) + + o, err := New(testutils.NewLogger(t), config) + require.NoError(t, err) + + require.NoError(t, o.Start()) + require.NoError(t, o.StopWithTestError(errors.New("an error"))) +}