Skip to content

Commit

Permalink
Add queue-retry for opencensus exporter (#2307)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu authored Jan 4, 2021
1 parent b229230 commit 5156386
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 24 deletions.
4 changes: 3 additions & 1 deletion exporter/opencensusexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ package opencensusexporter
import (
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// Config defines configuration for OpenCensus exporter.
type Config struct {
configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.

configgrpc.GRPCClientSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
exporterhelper.QueueSettings `mapstructure:"sending_queue"`
exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`

// The number of workers that send the gRPC requests.
NumWorkers int `mapstructure:"num_workers"`
Expand Down
13 changes: 13 additions & 0 deletions exporter/opencensusexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package opencensusexporter
import (
"path"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -26,6 +27,7 @@ import (
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/configtest"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

func TestLoadConfig(t *testing.T) {
Expand All @@ -49,6 +51,17 @@ func TestLoadConfig(t *testing.T) {
NameVal: "opencensus/2",
TypeVal: "opencensus",
},
RetrySettings: exporterhelper.RetrySettings{
Enabled: true,
InitialInterval: 10 * time.Second,
MaxInterval: 1 * time.Minute,
MaxElapsedTime: 10 * time.Minute,
},
QueueSettings: exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 2,
QueueSize: 10,
},
GRPCClientSettings: configgrpc.GRPCClientSettings{
Headers: map[string]string{
"can you have a . here?": "F0000000-0000-0000-0000-000000000000",
Expand Down
34 changes: 28 additions & 6 deletions exporter/opencensusexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,34 @@ func createDefaultConfig() configmodels.Exporter {
}
}

func createTraceExporter(ctx context.Context, params component.ExporterCreateParams, config configmodels.Exporter) (component.TracesExporter, error) {
oCfg := config.(*Config)
return newTraceExporter(ctx, oCfg, params.Logger)
func createTraceExporter(ctx context.Context, params component.ExporterCreateParams, cfg configmodels.Exporter) (component.TracesExporter, error) {
oCfg := cfg.(*Config)
oce, err := newTraceExporter(ctx, oCfg)
if err != nil {
return nil, err
}

return exporterhelper.NewTraceExporter(
cfg,
params.Logger,
oce.pushTraceData,
exporterhelper.WithRetry(oCfg.RetrySettings),
exporterhelper.WithQueue(oCfg.QueueSettings),
exporterhelper.WithShutdown(oce.shutdown))
}

func createMetricsExporter(ctx context.Context, params component.ExporterCreateParams, config configmodels.Exporter) (component.MetricsExporter, error) {
oCfg := config.(*Config)
return newMetricsExporter(ctx, oCfg, params.Logger)
func createMetricsExporter(ctx context.Context, params component.ExporterCreateParams, cfg configmodels.Exporter) (component.MetricsExporter, error) {
oCfg := cfg.(*Config)
oce, err := newMetricsExporter(ctx, oCfg)
if err != nil {
return nil, err
}

return exporterhelper.NewMetricsExporter(
cfg,
params.Logger,
oce.pushMetricsData,
exporterhelper.WithRetry(oCfg.RetrySettings),
exporterhelper.WithQueue(oCfg.QueueSettings),
exporterhelper.WithShutdown(oce.shutdown))
}
21 changes: 4 additions & 17 deletions exporter/opencensusexporter/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,10 @@ import (
agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1"
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/translator/internaldata"
)

Expand Down Expand Up @@ -107,7 +104,7 @@ func (oce *ocExporter) shutdown(context.Context) error {
return oce.grpcClientConn.Close()
}

func newTraceExporter(ctx context.Context, cfg *Config, logger *zap.Logger) (component.TracesExporter, error) {
func newTraceExporter(ctx context.Context, cfg *Config) (*ocExporter, error) {
oce, err := newOcExporter(ctx, cfg)
if err != nil {
return nil, err
Expand All @@ -120,15 +117,10 @@ func newTraceExporter(ctx context.Context, cfg *Config, logger *zap.Logger) (com
// constant in the channel.
oce.tracesClients <- nil
}

return exporterhelper.NewTraceExporter(
cfg,
logger,
oce.pushTraceData,
exporterhelper.WithShutdown(oce.shutdown))
return oce, nil
}

func newMetricsExporter(ctx context.Context, cfg *Config, logger *zap.Logger) (component.MetricsExporter, error) {
func newMetricsExporter(ctx context.Context, cfg *Config) (*ocExporter, error) {
oce, err := newOcExporter(ctx, cfg)
if err != nil {
return nil, err
Expand All @@ -141,12 +133,7 @@ func newMetricsExporter(ctx context.Context, cfg *Config, logger *zap.Logger) (c
// constant in the channel.
oce.metricsClients <- nil
}

return exporterhelper.NewMetricsExporter(
cfg,
logger,
oce.pushMetricsData,
exporterhelper.WithShutdown(oce.shutdown))
return oce, nil
}

func (oce *ocExporter) pushTraceData(_ context.Context, td pdata.Traces) (int, error) {
Expand Down
9 changes: 9 additions & 0 deletions exporter/opencensusexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@ exporters:
time: 20
timeout: 30
permit_without_stream: true
sending_queue:
enabled: true
num_consumers: 2
queue_size: 10
retry_on_failure:
enabled: true
initial_interval: 10s
max_interval: 60s
max_elapsed_time: 10m

service:
pipelines:
Expand Down

0 comments on commit 5156386

Please sign in to comment.