Enable batching for traces #1352

Merged · 4 commits · Nov 15, 2024
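This change moves trace batching out of the SDK's BatchSpanProcessor and into the OpenTelemetry Collector exporter layer. For gRPC, it enables otlpexporter's experimental BatcherConfig directly; for HTTP, where otlphttpexporter has no batcher support yet, it wraps the created exporter with exporterhelper.NewTraces to add queueing, batching, and retry. The now-unused traceProviderWithInternalMetrics helper is removed, and getTraceSettings drops its TracesConfig parameter.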
75 changes: 42 additions & 33 deletions pkg/export/otel/traces.go
@@ -19,7 +19,10 @@ import (
 	"go.opentelemetry.io/collector/config/configretry"
 	"go.opentelemetry.io/collector/config/configtelemetry"
 	"go.opentelemetry.io/collector/config/configtls"
+	"go.opentelemetry.io/collector/consumer"
 	"go.opentelemetry.io/collector/exporter"
+	"go.opentelemetry.io/collector/exporter/exporterbatcher"
+	"go.opentelemetry.io/collector/exporter/exporterhelper"
 	"go.opentelemetry.io/collector/exporter/otlpexporter"
 	"go.opentelemetry.io/collector/exporter/otlphttpexporter"
 	"go.opentelemetry.io/collector/pdata/pcommon"
@@ -76,7 +79,7 @@ type TracesConfig struct {
 	BatchTimeout  time.Duration `yaml:"batch_timeout" env:"BEYLA_OTLP_TRACES_BATCH_TIMEOUT"`
 	ExportTimeout time.Duration `yaml:"export_timeout" env:"BEYLA_OTLP_TRACES_EXPORT_TIMEOUT"`
 
-	// Configuration optiosn for BackOffConfig of the traces exporter.
+	// Configuration options for BackOffConfig of the traces exporter.
 	// See https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configretry/backoff.go
 	// BackOffInitialInterval the time to wait after the first failure before retrying.
 	BackOffInitialInterval time.Duration `yaml:"backoff_initial_interval" env:"BEYLA_BACKOFF_INITIAL_INTERVAL"`
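For orientation, here is a minimal sketch of how the batching knobs in TracesConfig relate to each other (the field names all appear in this file; the literal values are illustrative, not defaults):

```go
// Inside the pkg/export/otel package; values are examples only.
cfg := TracesConfig{
	BatchTimeout:       2 * time.Second,  // flush a partially filled batch after this long
	ExportTimeout:      30 * time.Second, // abort a single export attempt after this long
	MaxExportBatchSize: 512,              // cap on spans per outgoing batch
	MaxQueueSize:       2048,             // cap on spans buffered while exports are in flight
}
```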
@@ -240,6 +243,7 @@ func (tr *tracesOTELReceiver) provideLoop() (pipe.FinalFunc[[]request.Span], err
 	}, nil
 }
 
+// nolint:cyclop
 func getTracesExporter(ctx context.Context, cfg TracesConfig, ctxInfo *global.ContextInfo) (exporter.Traces, error) {
 	switch proto := cfg.getProtocol(); proto {
 	case ProtocolHTTPJSON, ProtocolHTTPProtobuf, "": // zero value defaults to HTTP for backwards-compatibility
@@ -258,7 +262,15 @@ func getTracesExporter(ctx context.Context, cfg TracesConfig, ctxInfo *global.Co
 		}
 		factory := otlphttpexporter.NewFactory()
 		config := factory.CreateDefaultConfig().(*otlphttpexporter.Config)
-		config.QueueConfig.Enabled = false
+		// Experimental API for batching
+		// See: https://github.com/open-telemetry/opentelemetry-collector/issues/8122
+		batchCfg := exporterbatcher.NewDefaultConfig()
+		if cfg.MaxQueueSize > 0 {
+			batchCfg.MaxSizeConfig.MaxSizeItems = cfg.MaxExportBatchSize
+			if cfg.BatchTimeout > 0 {
+				batchCfg.FlushTimeout = cfg.BatchTimeout
+			}
+		}
 		config.RetryConfig = getRetrySettings(cfg)
 		config.ClientConfig = confighttp.ClientConfig{
 			Endpoint: opts.Scheme + "://" + opts.Endpoint + opts.BaseURLPath,
@@ -269,8 +281,21 @@ func getTracesExporter(ctx context.Context, cfg TracesConfig, ctxInfo *global.Co
 			Headers: convertHeaders(opts.HTTPHeaders),
 		}
 		slog.Debug("getTracesExporter: confighttp.ClientConfig created", "endpoint", config.ClientConfig.Endpoint)
-		set := getTraceSettings(ctxInfo, cfg, t)
-		return factory.CreateTraces(ctx, set, config)
+		set := getTraceSettings(ctxInfo, t)
+		exporter, err := factory.CreateTraces(ctx, set, config)
+		if err != nil {
+			slog.Error("can't create OTLP HTTP traces exporter", "error", err)
+			return nil, err
+		}
+		// TODO: remove this once the batcher helper is added to otlphttpexporter
+		return exporterhelper.NewTraces(ctx, set, cfg,
+			exporter.ConsumeTraces,
+			exporterhelper.WithStart(exporter.Start),
+			exporterhelper.WithShutdown(exporter.Shutdown),
+			exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
+			exporterhelper.WithQueue(config.QueueConfig),
+			exporterhelper.WithBatcher(batchCfg),
+			exporterhelper.WithRetry(config.RetryConfig))
 	case ProtocolGRPC:
 		slog.Debug("instantiating GRPC TracesReporter", "protocol", proto)
 		var t trace.SpanExporter
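Design note: the HTTP and gRPC paths differ because otlpexporter exposes an experimental BatcherConfig (used below), while otlphttpexporter does not yet. That is why the HTTP branch above wraps the created exporter with exporterhelper.NewTraces, layering queueing, batching, and retry around the inner exporter's ConsumeTraces/Start/Shutdown; the TODO marks the wrapper for removal once the batcher helper lands in otlphttpexporter. One asymmetry worth flagging: the HTTP branch gates the batch size on cfg.MaxQueueSize, while the gRPC branch below gates it on cfg.MaxExportBatchSize.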
@@ -291,7 +316,15 @@ func getTracesExporter(ctx context.Context, cfg TracesConfig, ctxInfo *global.Co
 		}
 		factory := otlpexporter.NewFactory()
 		config := factory.CreateDefaultConfig().(*otlpexporter.Config)
-		config.QueueConfig.Enabled = false
+		// Experimental API for batching
+		// See: https://github.com/open-telemetry/opentelemetry-collector/issues/8122
+		if cfg.MaxExportBatchSize > 0 {
+			config.BatcherConfig.Enabled = true
+			config.BatcherConfig.MaxSizeConfig.MaxSizeItems = cfg.MaxExportBatchSize
+			if cfg.BatchTimeout > 0 {
+				config.BatcherConfig.FlushTimeout = cfg.BatchTimeout
+			}
+		}
 		config.RetryConfig = getRetrySettings(cfg)
 		config.ClientConfig = configgrpc.ClientConfig{
 			Endpoint: endpoint.String(),
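As a standalone illustration of the experimental batcher knobs used above, here is a sketch that assumes otlpexporter's BatcherConfig is an exporterbatcher.Config (the helper name is hypothetical):

```go
package otelsketch

import (
	"time"

	"go.opentelemetry.io/collector/exporter/exporterbatcher"
)

// batcherConfig mirrors the logic above: maxItems plays the role of
// cfg.MaxExportBatchSize and flush the role of cfg.BatchTimeout.
func batcherConfig(maxItems int, flush time.Duration) exporterbatcher.Config {
	bc := exporterbatcher.NewDefaultConfig()
	bc.Enabled = true
	bc.MaxSizeConfig.MaxSizeItems = maxItems // batches are split at this size
	if flush > 0 {
		bc.FlushTimeout = flush // a partial batch is exported after this long
	}
	return bc
}
```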
@@ -300,7 +333,7 @@ func getTracesExporter(ctx context.Context, cfg TracesConfig, ctxInfo *global.Co
 				InsecureSkipVerify: cfg.InsecureSkipVerify,
 			},
 		}
-		set := getTraceSettings(ctxInfo, cfg, t)
+		set := getTraceSettings(ctxInfo, t)
 		return factory.CreateTraces(ctx, set, config)
 	default:
 		slog.Error(fmt.Sprintf("invalid protocol value: %q. Accepted values are: %s, %s, %s",
@@ -332,39 +365,15 @@ func instrumentTraceExporter(in trace.SpanExporter, internalMetrics imetrics.Rep
 	}
 }
 
-func traceProviderWithInternalMetrics(ctxInfo *global.ContextInfo, cfg TracesConfig, in trace.SpanExporter) trace2.TracerProvider {
-	var opts []trace.BatchSpanProcessorOption
-	if cfg.MaxExportBatchSize > 0 {
-		opts = append(opts, trace.WithMaxExportBatchSize(cfg.MaxExportBatchSize))
-	}
-	if cfg.MaxQueueSize > 0 {
-		opts = append(opts, trace.WithMaxQueueSize(cfg.MaxQueueSize))
-	}
-	if cfg.BatchTimeout > 0 {
-		opts = append(opts, trace.WithBatchTimeout(cfg.BatchTimeout))
-	}
-	if cfg.ExportTimeout > 0 {
-		opts = append(opts, trace.WithExportTimeout(cfg.ExportTimeout))
-	}
-	tracer := instrumentTraceExporter(in, ctxInfo.Metrics)
-	bsp := trace.NewBatchSpanProcessor(tracer, opts...)
-	return trace.NewTracerProvider(
-		trace.WithSpanProcessor(bsp),
-		trace.WithSampler(cfg.Sampler.Implementation()),
-	)
-}
-
-func getTraceSettings(ctxInfo *global.ContextInfo, cfg TracesConfig, in trace.SpanExporter) exporter.Settings {
+func getTraceSettings(ctxInfo *global.ContextInfo, in trace.SpanExporter) exporter.Settings {
 	var traceProvider trace2.TracerProvider
 
 	telemetryLevel := configtelemetry.LevelNone
 	traceProvider = tracenoop.NewTracerProvider()
 
 	if internalMetricsEnabled(ctxInfo) {
 		telemetryLevel = configtelemetry.LevelBasic
-		traceProvider = traceProviderWithInternalMetrics(ctxInfo, cfg, in)
+		spanExporter := instrumentTraceExporter(in, ctxInfo.Metrics)
+		traceProvider = trace.NewTracerProvider(trace.WithBatcher(spanExporter))
 	}
 
 	meterProvider := metric.NewMeterProvider()
 	telemetrySettings := component.TelemetrySettings{
 		Logger: zap.NewNop(),
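With batching now handled in the exporter layer, getTraceSettings no longer needs a TracesConfig: the self-telemetry tracer provider simply uses trace.WithBatcher(spanExporter), which is shorthand for registering a BatchSpanProcessor with default options (roughly trace.WithSpanProcessor(trace.NewBatchSpanProcessor(spanExporter))). That makes the bespoke traceProviderWithInternalMetrics helper and its per-option plumbing removable.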