From 51563860af5a558b6e7087feea2502b5d348375c Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Mon, 4 Jan 2021 10:34:46 -0800 Subject: [PATCH] Add queue-retry for opencensus exporter (#2307) Signed-off-by: Bogdan Drutu --- exporter/opencensusexporter/config.go | 4 ++- exporter/opencensusexporter/config_test.go | 13 +++++++ exporter/opencensusexporter/factory.go | 34 +++++++++++++++---- exporter/opencensusexporter/opencensus.go | 21 +++--------- .../opencensusexporter/testdata/config.yaml | 9 +++++ 5 files changed, 57 insertions(+), 24 deletions(-) diff --git a/exporter/opencensusexporter/config.go b/exporter/opencensusexporter/config.go index a2df6a415402..27a9e741bfe5 100644 --- a/exporter/opencensusexporter/config.go +++ b/exporter/opencensusexporter/config.go @@ -17,13 +17,15 @@ package opencensusexporter import ( "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/exporter/exporterhelper" ) // Config defines configuration for OpenCensus exporter. type Config struct { configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. - configgrpc.GRPCClientSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. + exporterhelper.QueueSettings `mapstructure:"sending_queue"` + exporterhelper.RetrySettings `mapstructure:"retry_on_failure"` // The number of workers that send the gRPC requests. NumWorkers int `mapstructure:"num_workers"` diff --git a/exporter/opencensusexporter/config_test.go b/exporter/opencensusexporter/config_test.go index ad9f71701e7e..ecd2cec511a3 100644 --- a/exporter/opencensusexporter/config_test.go +++ b/exporter/opencensusexporter/config_test.go @@ -17,6 +17,7 @@ package opencensusexporter import ( "path" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -26,6 +27,7 @@ import ( "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/config/configtest" "go.opentelemetry.io/collector/config/configtls" + "go.opentelemetry.io/collector/exporter/exporterhelper" ) func TestLoadConfig(t *testing.T) { @@ -49,6 +51,17 @@ func TestLoadConfig(t *testing.T) { NameVal: "opencensus/2", TypeVal: "opencensus", }, + RetrySettings: exporterhelper.RetrySettings{ + Enabled: true, + InitialInterval: 10 * time.Second, + MaxInterval: 1 * time.Minute, + MaxElapsedTime: 10 * time.Minute, + }, + QueueSettings: exporterhelper.QueueSettings{ + Enabled: true, + NumConsumers: 2, + QueueSize: 10, + }, GRPCClientSettings: configgrpc.GRPCClientSettings{ Headers: map[string]string{ "can you have a . here?": "F0000000-0000-0000-0000-000000000000", diff --git a/exporter/opencensusexporter/factory.go b/exporter/opencensusexporter/factory.go index aabdd8e6e30c..d11f2e3d4ccf 100644 --- a/exporter/opencensusexporter/factory.go +++ b/exporter/opencensusexporter/factory.go @@ -52,12 +52,34 @@ func createDefaultConfig() configmodels.Exporter { } } -func createTraceExporter(ctx context.Context, params component.ExporterCreateParams, config configmodels.Exporter) (component.TracesExporter, error) { - oCfg := config.(*Config) - return newTraceExporter(ctx, oCfg, params.Logger) +func createTraceExporter(ctx context.Context, params component.ExporterCreateParams, cfg configmodels.Exporter) (component.TracesExporter, error) { + oCfg := cfg.(*Config) + oce, err := newTraceExporter(ctx, oCfg) + if err != nil { + return nil, err + } + + return exporterhelper.NewTraceExporter( + cfg, + params.Logger, + oce.pushTraceData, + exporterhelper.WithRetry(oCfg.RetrySettings), + exporterhelper.WithQueue(oCfg.QueueSettings), + exporterhelper.WithShutdown(oce.shutdown)) } -func createMetricsExporter(ctx context.Context, params component.ExporterCreateParams, config configmodels.Exporter) (component.MetricsExporter, error) { - oCfg := config.(*Config) - return newMetricsExporter(ctx, oCfg, params.Logger) +func createMetricsExporter(ctx context.Context, params component.ExporterCreateParams, cfg configmodels.Exporter) (component.MetricsExporter, error) { + oCfg := cfg.(*Config) + oce, err := newMetricsExporter(ctx, oCfg) + if err != nil { + return nil, err + } + + return exporterhelper.NewMetricsExporter( + cfg, + params.Logger, + oce.pushMetricsData, + exporterhelper.WithRetry(oCfg.RetrySettings), + exporterhelper.WithQueue(oCfg.QueueSettings), + exporterhelper.WithShutdown(oce.shutdown)) } diff --git a/exporter/opencensusexporter/opencensus.go b/exporter/opencensusexporter/opencensus.go index 42e98e908762..b0c397ac4336 100644 --- a/exporter/opencensusexporter/opencensus.go +++ b/exporter/opencensusexporter/opencensus.go @@ -23,13 +23,10 @@ import ( agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1" agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" - "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/metadata" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/translator/internaldata" ) @@ -107,7 +104,7 @@ func (oce *ocExporter) shutdown(context.Context) error { return oce.grpcClientConn.Close() } -func newTraceExporter(ctx context.Context, cfg *Config, logger *zap.Logger) (component.TracesExporter, error) { +func newTraceExporter(ctx context.Context, cfg *Config) (*ocExporter, error) { oce, err := newOcExporter(ctx, cfg) if err != nil { return nil, err @@ -120,15 +117,10 @@ func newTraceExporter(ctx context.Context, cfg *Config, logger *zap.Logger) (com // constant in the channel. oce.tracesClients <- nil } - - return exporterhelper.NewTraceExporter( - cfg, - logger, - oce.pushTraceData, - exporterhelper.WithShutdown(oce.shutdown)) + return oce, nil } -func newMetricsExporter(ctx context.Context, cfg *Config, logger *zap.Logger) (component.MetricsExporter, error) { +func newMetricsExporter(ctx context.Context, cfg *Config) (*ocExporter, error) { oce, err := newOcExporter(ctx, cfg) if err != nil { return nil, err @@ -141,12 +133,7 @@ func newMetricsExporter(ctx context.Context, cfg *Config, logger *zap.Logger) (c // constant in the channel. oce.metricsClients <- nil } - - return exporterhelper.NewMetricsExporter( - cfg, - logger, - oce.pushMetricsData, - exporterhelper.WithShutdown(oce.shutdown)) + return oce, nil } func (oce *ocExporter) pushTraceData(_ context.Context, td pdata.Traces) (int, error) { diff --git a/exporter/opencensusexporter/testdata/config.yaml b/exporter/opencensusexporter/testdata/config.yaml index 20b1284dd590..a75278dfeb8d 100644 --- a/exporter/opencensusexporter/testdata/config.yaml +++ b/exporter/opencensusexporter/testdata/config.yaml @@ -20,6 +20,15 @@ exporters: time: 20 timeout: 30 permit_without_stream: true + sending_queue: + enabled: true + num_consumers: 2 + queue_size: 10 + retry_on_failure: + enabled: true + initial_interval: 10s + max_interval: 60s + max_elapsed_time: 10m service: pipelines: