diff --git a/cloudapi/insights/client.go b/cloudapi/insights/client.go index c28328dc7d8..c9ed123c5fc 100644 --- a/cloudapi/insights/client.go +++ b/cloudapi/insights/client.go @@ -11,7 +11,6 @@ import ( "time" grpcRetry "github.com/grpc-ecosystem/go-grpc-middleware/retry" - "go.k6.io/k6/lib/types" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" @@ -37,7 +36,7 @@ var ( // ClientConfig is the configuration for the client. type ClientConfig struct { IngesterHost string - Timeout types.NullDuration + Timeout time.Duration ConnectConfig ClientConnectConfig AuthConfig ClientAuthConfig TLSConfig ClientTLSConfig @@ -48,6 +47,7 @@ type ClientConfig struct { type ClientConnectConfig struct { Block bool FailOnNonTempDialError bool + Timeout time.Duration Dialer func(context.Context, string) (net.Conn, error) } @@ -88,6 +88,39 @@ type Client struct { connMu *sync.RWMutex } +// NewDefaultClientConfigForTestRun creates a new default client config for a test run. +func NewDefaultClientConfigForTestRun(ingesterHost, authToken string, testRunID int64) ClientConfig { + return ClientConfig{ + IngesterHost: ingesterHost, + Timeout: 90 * time.Second, + ConnectConfig: ClientConnectConfig{ + Block: false, + FailOnNonTempDialError: false, + Timeout: 10 * time.Second, + Dialer: nil, + }, + AuthConfig: ClientAuthConfig{ + Enabled: true, + TestRunID: testRunID, + Token: authToken, + RequireTransportSecurity: true, + }, + TLSConfig: ClientTLSConfig{ + Insecure: false, + }, + RetryConfig: ClientRetryConfig{ + RetryableStatusCodes: `"UNKNOWN","INTERNAL","UNAVAILABLE","DEADLINE_EXCEEDED"`, + MaxAttempts: 3, + PerRetryTimeout: 30 * time.Second, + BackoffConfig: ClientBackoffConfig{ + Enabled: true, + JitterFraction: 0.1, + WaitBetween: 1 * time.Second, + }, + }, + } +} + // NewClient creates a new client. func NewClient(cfg ClientConfig) *Client { return &Client{ @@ -112,6 +145,8 @@ func (c *Client) Dial(ctx context.Context) error { return fmt.Errorf("failed to create dial options: %w", err) } + ctx, cancel := context.WithTimeout(ctx, c.cfg.ConnectConfig.Timeout) + defer cancel() conn, err := grpc.DialContext(ctx, c.cfg.IngesterHost, opts...) if err != nil { return fmt.Errorf("failed to dial: %w", err) @@ -132,9 +167,6 @@ func (c *Client) IngestRequestMetadatasBatch(ctx context.Context, requestMetadat return ErrClientClosed } - ctx, cancel := context.WithTimeout(ctx, c.cfg.Timeout.TimeDuration()) - defer cancel() - if len(requestMetadatas) < 1 { return nil } @@ -144,6 +176,8 @@ func (c *Client) IngestRequestMetadatasBatch(ctx context.Context, requestMetadat return fmt.Errorf("failed to create request from request metadatas: %w", err) } + ctx, cancel := context.WithTimeout(ctx, c.cfg.Timeout) + defer cancel() _, err = c.client.BatchCreateRequestMetadatas(ctx, req) if err != nil { st := status.Convert(err) diff --git a/cloudapi/insights/client_test.go b/cloudapi/insights/client_test.go index fb45d12cb55..cb0e88ca984 100644 --- a/cloudapi/insights/client_test.go +++ b/cloudapi/insights/client_test.go @@ -8,7 +8,6 @@ import ( "time" "github.com/stretchr/testify/require" - "go.k6.io/k6/lib/types" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -92,8 +91,8 @@ func TestClient_Dial_ReturnsNoErrorWithWorkingDialer(t *testing.T) { lis := newMockListener(t, ser) cfg := ClientConfig{ - Timeout: types.NullDurationFrom(1 * time.Second), - ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)}, + Timeout: 1 * time.Second, + ConnectConfig: ClientConnectConfig{Timeout: 1 * time.Second, Dialer: newMockContextDialer(t, lis)}, TLSConfig: ClientTLSConfig{Insecure: true}, RetryConfig: ClientRetryConfig{RetryableStatusCodes: `"UNKNOWN","INTERNAL","UNAVAILABLE","DEADLINE_EXCEEDED"`}, } @@ -114,8 +113,8 @@ func TestClient_Dial_ReturnsErrorWhenCalledTwice(t *testing.T) { lis := newMockListener(t, ser) cfg := ClientConfig{ - Timeout: types.NullDurationFrom(1 * time.Second), - ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)}, + Timeout: 1 * time.Second, + ConnectConfig: ClientConnectConfig{Timeout: 1 * time.Second, Dialer: newMockContextDialer(t, lis)}, TLSConfig: ClientTLSConfig{Insecure: true}, RetryConfig: ClientRetryConfig{RetryableStatusCodes: `"UNKNOWN","INTERNAL","UNAVAILABLE","DEADLINE_EXCEEDED"`}, } @@ -138,6 +137,7 @@ func TestClient_Dial_ReturnsNoErrorWithFailingDialer(t *testing.T) { ConnectConfig: ClientConnectConfig{ Block: true, FailOnNonTempDialError: true, + Timeout: 1 * time.Second, Dialer: func(ctx context.Context, s string) (net.Conn, error) { return nil, &fatalError{} }, @@ -163,7 +163,7 @@ func TestClient_Dial_ReturnsErrorWithoutRetryableStatusCodes(t *testing.T) { lis := newMockListener(t, ser) cfg := ClientConfig{ - Timeout: types.NullDurationFrom(1 * time.Second), + Timeout: 1 * time.Second, ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)}, TLSConfig: ClientTLSConfig{Insecure: true}, } @@ -184,7 +184,7 @@ func TestClient_Dial_ReturnsErrorWithInvalidRetryableStatusCodes(t *testing.T) { lis := newMockListener(t, ser) cfg := ClientConfig{ - Timeout: types.NullDurationFrom(1 * time.Second), + Timeout: 1 * time.Second, ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)}, TLSConfig: ClientTLSConfig{Insecure: true}, RetryConfig: ClientRetryConfig{RetryableStatusCodes: "RANDOM,INTERNAL"}, @@ -206,8 +206,8 @@ func TestClient_IngestRequestMetadatasBatch_ReturnsNoErrorWithWorkingServerAndNo lis := newMockListener(t, ser) cfg := ClientConfig{ - Timeout: types.NullDurationFrom(1 * time.Second), - ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)}, + Timeout: 1 * time.Second, + ConnectConfig: ClientConnectConfig{Timeout: 1 * time.Second, Dialer: newMockContextDialer(t, lis)}, TLSConfig: ClientTLSConfig{Insecure: true}, RetryConfig: ClientRetryConfig{RetryableStatusCodes: `"UNKNOWN","INTERNAL","UNAVAILABLE","DEADLINE_EXCEEDED"`}, } @@ -231,8 +231,8 @@ func TestClient_IngestRequestMetadatasBatch_ReturnsNoErrorWithWorkingServerAndNo lis := newMockListener(t, ser) cfg := ClientConfig{ - Timeout: types.NullDurationFrom(1 * time.Second), - ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)}, + Timeout: 1 * time.Second, + ConnectConfig: ClientConnectConfig{Timeout: 1 * time.Second, Dialer: newMockContextDialer(t, lis)}, TLSConfig: ClientTLSConfig{Insecure: true}, RetryConfig: ClientRetryConfig{RetryableStatusCodes: `"UNKNOWN","INTERNAL","UNAVAILABLE","DEADLINE_EXCEEDED"`}, } @@ -272,8 +272,8 @@ func TestClient_IngestRequestMetadatasBatch_ReturnsErrorWithWorkingServerAndCanc lis := newMockListener(t, ser) cfg := ClientConfig{ - Timeout: types.NullDurationFrom(1 * time.Second), - ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)}, + Timeout: 1 * time.Second, + ConnectConfig: ClientConnectConfig{Timeout: 1 * time.Second, Dialer: newMockContextDialer(t, lis)}, TLSConfig: ClientTLSConfig{Insecure: true}, RetryConfig: ClientRetryConfig{RetryableStatusCodes: `"UNKNOWN","INTERNAL","UNAVAILABLE","DEADLINE_EXCEEDED"`}, } @@ -308,7 +308,7 @@ func TestClient_IngestRequestMetadatasBatch_ReturnsErrorWithUninitializedClient( lis := newMockListener(t, ser) cfg := ClientConfig{ - Timeout: types.NullDurationFrom(1 * time.Second), + Timeout: 1 * time.Second, ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)}, TLSConfig: ClientTLSConfig{Insecure: true}, } @@ -341,8 +341,8 @@ func TestClient_IngestRequestMetadatasBatch_ReturnsErrorWithFailingServerAndNonC lis := newMockListener(t, ser) cfg := ClientConfig{ - Timeout: types.NullDurationFrom(1 * time.Second), - ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)}, + Timeout: 1 * time.Second, + ConnectConfig: ClientConnectConfig{Timeout: 1 * time.Second, Dialer: newMockContextDialer(t, lis)}, TLSConfig: ClientTLSConfig{Insecure: true}, RetryConfig: ClientRetryConfig{RetryableStatusCodes: `"UNKNOWN","INTERNAL","UNAVAILABLE","DEADLINE_EXCEEDED"`}, } @@ -373,8 +373,8 @@ func TestClient_IngestRequestMetadatasBatch_ReturnsNoErrorAfterRetrySeveralTimes lis := newMockListener(t, ser) cfg := ClientConfig{ - Timeout: types.NullDurationFrom(1 * time.Second), - ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)}, + Timeout: 1 * time.Second, + ConnectConfig: ClientConnectConfig{Timeout: 1 * time.Second, Dialer: newMockContextDialer(t, lis)}, TLSConfig: ClientTLSConfig{Insecure: true}, RetryConfig: ClientRetryConfig{ MaxAttempts: 20, @@ -422,8 +422,8 @@ func TestClient_IngestRequestMetadatasBatch_ReturnsErrorAfterExhaustingMaxRetryA lis := newMockListener(t, ser) cfg := ClientConfig{ - Timeout: types.NullDurationFrom(1 * time.Second), - ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)}, + Timeout: 1 * time.Second, + ConnectConfig: ClientConnectConfig{Timeout: 1 * time.Second, Dialer: newMockContextDialer(t, lis)}, TLSConfig: ClientTLSConfig{Insecure: true}, RetryConfig: ClientRetryConfig{ BackoffConfig: ClientBackoffConfig{ @@ -469,7 +469,7 @@ func TestClient_Close_ReturnsNoErrorWhenClosedOnce(t *testing.T) { lis := newMockListener(t, ser) cfg := ClientConfig{ - ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)}, + ConnectConfig: ClientConnectConfig{Timeout: 1 * time.Second, Dialer: newMockContextDialer(t, lis)}, TLSConfig: ClientTLSConfig{Insecure: true}, RetryConfig: ClientRetryConfig{RetryableStatusCodes: `"UNKNOWN","INTERNAL","UNAVAILABLE","DEADLINE_EXCEEDED"`}, } @@ -491,7 +491,7 @@ func TestClient_Close_ReturnsNoErrorWhenClosedTwice(t *testing.T) { lis := newMockListener(t, ser) cfg := ClientConfig{ - ConnectConfig: ClientConnectConfig{Dialer: newMockContextDialer(t, lis)}, + ConnectConfig: ClientConnectConfig{Timeout: 1 * time.Second, Dialer: newMockContextDialer(t, lis)}, TLSConfig: ClientTLSConfig{Insecure: true}, RetryConfig: ClientRetryConfig{RetryableStatusCodes: `"UNKNOWN","INTERNAL","UNAVAILABLE","DEADLINE_EXCEEDED"`}, } diff --git a/output/cloud/expv2/output.go b/output/cloud/expv2/output.go index 4f991efc84a..a03a8f0bc0c 100644 --- a/output/cloud/expv2/output.go +++ b/output/cloud/expv2/output.go @@ -16,7 +16,6 @@ import ( "go.k6.io/k6/errext" "go.k6.io/k6/errext/exitcodes" "go.k6.io/k6/lib/consts" - "go.k6.io/k6/lib/types" "go.k6.io/k6/metrics" "go.k6.io/k6/output" insightsOutput "go.k6.io/k6/output/cloud/insights" @@ -125,34 +124,14 @@ func (o *Output) Start() error { } o.requestMetadatasCollector = insightsOutput.NewCollector(testRunID) - insightsClientConfig := insights.ClientConfig{ - IngesterHost: o.config.TracesHost.String, - Timeout: types.NewNullDuration(90*time.Second, false), - AuthConfig: insights.ClientAuthConfig{ - Enabled: true, - TestRunID: testRunID, - Token: o.config.Token.String, - RequireTransportSecurity: true, - }, - TLSConfig: insights.ClientTLSConfig{ - Insecure: false, - }, - RetryConfig: insights.ClientRetryConfig{ - RetryableStatusCodes: `"UNKNOWN","INTERNAL","UNAVAILABLE","DEADLINE_EXCEEDED"`, - MaxAttempts: 3, - PerRetryTimeout: 30 * time.Second, - BackoffConfig: insights.ClientBackoffConfig{ - Enabled: true, - JitterFraction: 0.1, - WaitBetween: 1 * time.Second, - }, - }, - } + insightsClientConfig := insights.NewDefaultClientConfigForTestRun( + o.config.TracesHost.String, + o.config.Token.String, + testRunID, + ) insightsClient := insights.NewClient(insightsClientConfig) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - if err := insightsClient.Dial(ctx); err != nil { + if err := insightsClient.Dial(context.Background()); err != nil { return err } @@ -319,6 +298,8 @@ func (o *Output) flushRequestMetadatas() { err := o.requestMetadatasFlusher.Flush() if err != nil { o.logger.WithError(err).WithField("t", time.Since(start)).Error("Failed to push trace samples to the cloud") + + return } o.logger.WithField("t", time.Since(start)).Debug("Successfully flushed buffered trace samples to the cloud") diff --git a/output/cloud/v1/output.go b/output/cloud/v1/output.go index a8554c0f3eb..35c98b4aa4c 100644 --- a/output/cloud/v1/output.go +++ b/output/cloud/v1/output.go @@ -3,16 +3,20 @@ package cloud import ( + "context" "net/http" + "strconv" "sync" "time" - easyjson "github.com/mailru/easyjson" + "github.com/mailru/easyjson" "github.com/sirupsen/logrus" "go.k6.io/k6/cloudapi" + "go.k6.io/k6/cloudapi/insights" "go.k6.io/k6/errext" "go.k6.io/k6/errext/exitcodes" + insightsOutput "go.k6.io/k6/output/cloud/insights" "go.k6.io/k6/lib/netext" "go.k6.io/k6/lib/netext/httpext" @@ -34,6 +38,10 @@ type Output struct { bufferHTTPTrails []*httpext.Trail bufferSamples []*Sample + insightsClient insightsOutput.Client + requestMetadatasCollector insightsOutput.RequestMetadatasCollector + requestMetadatasFlusher insightsOutput.RequestMetadatasFlusher + // TODO: optimize this // // Since the real-time metrics refactoring (https://github.com/k6io/k6/pull/678), @@ -101,6 +109,29 @@ func (out *Output) Start() error { }() } + if insightsOutput.Enabled(out.config) { + testRunID, err := strconv.ParseInt(out.referenceID, 10, 64) + if err != nil { + return err + } + out.requestMetadatasCollector = insightsOutput.NewCollector(testRunID) + + insightsClientConfig := insights.NewDefaultClientConfigForTestRun( + out.config.TracesHost.String, + out.config.Token.String, + testRunID, + ) + insightsClient := insights.NewClient(insightsClientConfig) + + if err := insightsClient.Dial(context.Background()); err != nil { + return err + } + + out.insightsClient = insightsClient + out.requestMetadatasFlusher = insightsOutput.NewFlusher(insightsClient, out.requestMetadatasCollector) + out.runFlushRequestMetadatas() + } + out.outputDone.Add(1) go func() { defer out.outputDone.Done() @@ -136,6 +167,11 @@ func (out *Output) StopWithTestError(testErr error) error { close(out.stopOutput) out.outputDone.Wait() out.logger.Debug("Metric emission stopped, calling cloud API...") + if insightsOutput.Enabled(out.config) { + if err := out.insightsClient.Close(); err != nil { + out.logger.WithError(err).Error("Failed to close the insights client") + } + } return nil } @@ -222,6 +258,10 @@ func (out *Output) AddMetricSamples(sampleContainers []metrics.SampleContainer) out.bufferHTTPTrails = append(out.bufferHTTPTrails, newHTTPTrails...) out.bufferMutex.Unlock() } + + if insightsOutput.Enabled(out.config) { + out.requestMetadatasCollector.CollectRequestMetadatas(sampleContainers) + } } //nolint:funlen,nestif,gocognit @@ -472,4 +512,44 @@ func (out *Output) pushMetrics() { }).Debug("Pushing metrics to cloud finished") } +func (out *Output) runFlushRequestMetadatas() { + t := time.NewTicker(out.config.TracesPushInterval.TimeDuration()) + + for i := int64(0); i < out.config.TracesPushConcurrency.Int64; i++ { + out.outputDone.Add(1) + go func() { + defer out.outputDone.Done() + defer t.Stop() + + for { + select { + case <-out.stopSendingMetrics: + return + default: + } + select { + case <-out.stopOutput: + out.flushRequestMetadatas() + return + case <-t.C: + out.flushRequestMetadatas() + } + } + }() + } +} + +func (out *Output) flushRequestMetadatas() { + start := time.Now() + + err := out.requestMetadatasFlusher.Flush() + if err != nil { + out.logger.WithError(err).WithField("t", time.Since(start)).Error("Failed to push trace samples to the cloud") + + return + } + + out.logger.WithField("t", time.Since(start)).Debug("Successfully flushed buffered trace samples to the cloud") +} + const expectedGzipRatio = 6 // based on test it is around 6.8, but we don't need to be that accurate