Skip to content

Commit

Permalink
Remove bundler from Jaeger exporter (open-telemetry#1830)
Browse files Browse the repository at this point in the history
* Remove bundler from Jaeger exporter

* Update dependencies

* Add changes to changelog

* Add PR number to changelog
  • Loading branch information
MrAlias committed Apr 22, 2021
1 parent 738ef11 commit 0fdc3d7
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 1,096 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- The convenience functions for the stdout exporter have been updated to return the `TracerProvider` implementation and enable the shutdown of the exporter. (#1800)
- Replace the flush function returned from the Jaeger exporter's convenience creation functions (`InstallNewPipeline` and `NewExportPipeline`) with the `TracerProvider` implementation they create.
This enables the caller to shutdown and flush using the related `TracerProvider` methods. (#1822)
- The Jaeger exporter no longer batches exported spans itself, instead it relies on the SDK's `BatchSpanProcessor` for this functionality. (#1830)
- The Jaeger exporter creation functions (`NewRawExporter`, `NewExportPipeline`, and `InstallNewPipeline`) no longer accept the removed `Option` type as a variadic argument. (#1830)

### Removed

Expand Down Expand Up @@ -109,6 +111,12 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
This option was used to set SDK options for the exporter creation convenience functions.
These functions are provided as a way to easily setup or install the exporter with what are deemed reasonable SDK settings for common use cases.
If the SDK needs to be configured differently, the `NewRawExporter` function and direct setup of the SDK with the desired settings should be used. (#1825)
- The `WithBufferMaxCount` and `WithBatchMaxCount` `Option`s from the Jaeger exporter are removed.
The exporter no longer batches exports, instead relying on the SDK's `BatchSpanProcessor` for this functionality. (#1830)
- The Jaeger exporter `Option` type is removed.
The type is no longer used by the exporter to configure anything.
All of the previous configuration these options provided were duplicates of SDK configuration.
They have all been removed in favor of using the SDK configuration and focuses the exporter configuration to be only about the endpoints it will send telemetry to. (#1830)

## [0.19.0] - 2021-03-18

Expand Down
450 changes: 2 additions & 448 deletions example/jaeger/go.sum

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion exporters/trace/jaeger/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ require (
go.opentelemetry.io/otel v0.19.0
go.opentelemetry.io/otel/sdk v0.19.0
go.opentelemetry.io/otel/trace v0.19.0
google.golang.org/api v0.44.0
)

replace go.opentelemetry.io/otel/bridge/opencensus => ../../../bridge/opencensus
Expand Down
450 changes: 2 additions & 448 deletions exporters/trace/jaeger/go.sum

Large diffs are not rendered by default.

151 changes: 21 additions & 130 deletions exporters/trace/jaeger/jaeger.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ import (
"encoding/binary"
"encoding/json"
"fmt"

"google.golang.org/api/support/bundler"
"sync"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
Expand All @@ -43,45 +42,14 @@ const (
keyEventName = "event"
)

type Option func(*options)

// options are the options to be used when initializing a Jaeger export.
type options struct {

// BufferMaxCount defines the total number of traces that can be buffered in memory
BufferMaxCount int

// BatchMaxCount defines the maximum number of spans sent in one batch
BatchMaxCount int
}

// WithBufferMaxCount defines the total number of traces that can be buffered in memory
func WithBufferMaxCount(bufferMaxCount int) Option {
return func(o *options) {
o.BufferMaxCount = bufferMaxCount
}
}

// WithBatchMaxCount defines the maximum number of spans in one batch
func WithBatchMaxCount(batchMaxCount int) Option {
return func(o *options) {
o.BatchMaxCount = batchMaxCount
}
}

// NewRawExporter returns an OTel Exporter implementation that exports the
// collected spans to Jaeger.
func NewRawExporter(endpointOption EndpointOption, opts ...Option) (*Exporter, error) {
func NewRawExporter(endpointOption EndpointOption) (*Exporter, error) {
uploader, err := endpointOption()
if err != nil {
return nil, err
}

o := options{}
for _, opt := range opts {
opt(&o)
}

// Fetch default service.name from default resource for backup
var defaultServiceName string
defaultResource := resource.Default()
Expand All @@ -98,49 +66,25 @@ func NewRawExporter(endpointOption EndpointOption, opts ...Option) (*Exporter, e
stopCh: stopCh,
defaultServiceName: defaultServiceName,
}
bundler := bundler.NewBundler((*sdktrace.SpanSnapshot)(nil), func(bundle interface{}) {
if err := e.upload(bundle.([]*sdktrace.SpanSnapshot)); err != nil {
otel.Handle(err)
}
})

// Set BufferedByteLimit with the total number of spans that are permissible to be held in memory.
// This needs to be done since the size of messages is always set to 1. Failing to set this would allow
// 1G messages to be held in memory since that is the default value of BufferedByteLimit.
if o.BufferMaxCount != 0 {
bundler.BufferedByteLimit = o.BufferMaxCount
}

// The default value bundler uses is 10, increase to send larger batches
if o.BatchMaxCount != 0 {
bundler.BundleCountThreshold = o.BatchMaxCount
}

e.bundler = bundler
return e, nil
}

// NewExportPipeline sets up a complete export pipeline
// with the recommended setup for trace provider
func NewExportPipeline(endpointOption EndpointOption, opts ...Option) (*sdktrace.TracerProvider, error) {
exporter, err := NewRawExporter(endpointOption, opts...)
func NewExportPipeline(endpointOption EndpointOption) (*sdktrace.TracerProvider, error) {
exporter, err := NewRawExporter(endpointOption)
if err != nil {
return nil, err
}

// TODO (MrAlias): The recommended default setup needs to register the
// exporter as a batcher, not a syncer. This will conflict with batching
// of the bundler currently, but when the bundler is removed it needs to
// be updated.
// https://github.com/open-telemetry/opentelemetry-go/issues/1799
tp := sdktrace.NewTracerProvider(sdktrace.WithSyncer(exporter))
tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exporter))
return tp, nil
}

// InstallNewPipeline instantiates a NewExportPipeline with the
// recommended configuration and registers it globally.
func InstallNewPipeline(endpointOption EndpointOption, opts ...Option) (*sdktrace.TracerProvider, error) {
tp, err := NewExportPipeline(endpointOption, opts...)
func InstallNewPipeline(endpointOption EndpointOption) (*sdktrace.TracerProvider, error) {
tp, err := NewExportPipeline(endpointOption)
if err != nil {
return tp, err
}
Expand All @@ -149,21 +93,18 @@ func InstallNewPipeline(endpointOption EndpointOption, opts ...Option) (*sdktrac
return tp, nil
}

// Exporter is an implementation of an OTel SpanSyncer that uploads spans to
// Jaeger.
// Exporter exports OpenTelemetry spans to a Jaeger agent or collector.
type Exporter struct {
bundler *bundler.Bundler
uploader batchUploader

stopCh chan struct{}

uploader batchUploader
stopOnce sync.Once
stopCh chan struct{}
defaultServiceName string
}

var _ sdktrace.SpanExporter = (*Exporter)(nil)

// ExportSpans exports SpanSnapshots to Jaeger.
func (e *Exporter) ExportSpans(ctx context.Context, ss []*sdktrace.SpanSnapshot) error {
// ExportSpans transforms and exports OpenTelemetry spans to Jaeger.
func (e *Exporter) ExportSpans(ctx context.Context, spans []*sdktrace.SpanSnapshot) error {
// Return fast if context is already canceled or Exporter shutdown.
select {
case <-ctx.Done():
Expand All @@ -185,50 +126,26 @@ func (e *Exporter) ExportSpans(ctx context.Context, ss []*sdktrace.SpanSnapshot)
}
}(ctx, cancel)

for _, span := range ss {
// TODO(jbd): Handle oversized bundlers.
err := e.bundler.AddWait(ctx, span, 1)
if err != nil {
return fmt.Errorf("failed to bundle %q: %w", span.Name, err)
}
select {
case <-ctx.Done():
return ctx.Err()
default:
for _, batch := range jaegerBatchList(spans, e.defaultServiceName) {
if err := e.uploader.upload(ctx, batch); err != nil {
return err
}
}

return nil
}

// flush is used to wrap the bundler's Flush method for testing.
var flush = func(e *Exporter) {
e.bundler.Flush()
}

// Shutdown stops the exporter flushing any pending exports.
// Shutdown stops the Exporter. This will close all connections and release
// all resources held by the Exporter.
func (e *Exporter) Shutdown(ctx context.Context) error {
// Stop any active and subsequent exports.
close(e.stopCh)

done := make(chan struct{}, 1)
// Shadow so if the goroutine is leaked in testing it doesn't cause a race
// condition when the file level var is reset.
go func(FlushFunc func(*Exporter)) {
// The OpenTelemetry specification is explicit in not having this
// method block so the preference here is to orphan this goroutine if
// the context is canceled or times out while this flushing is
// occurring. This is a consequence of the bundler Flush method not
// supporting a context.
FlushFunc(e)
done <- struct{}{}
}(flush)
e.stopOnce.Do(func() { close(e.stopCh) })
select {
case <-ctx.Done():
return ctx.Err()
case <-done:
default:
}
return nil
return e.uploader.shutdown(ctx)
}

func spanSnapshotToThrift(ss *sdktrace.SpanSnapshot) *gen.Span {
Expand Down Expand Up @@ -391,32 +308,6 @@ func getBoolTag(k string, b bool) *gen.Tag {
}
}

// Flush waits for exported trace spans to be uploaded.
//
// This is useful if your program is ending and you do not want to lose recent spans.
func (e *Exporter) Flush() {
// Return fast if Exporter shutdown.
select {
case <-e.stopCh:
return
default:
}
flush(e)
}

func (e *Exporter) upload(spans []*sdktrace.SpanSnapshot) error {
batchList := jaegerBatchList(spans, e.defaultServiceName)
for _, batch := range batchList {
// TODO (MrAlias): pass an appropriate context (#1799, #1803).
err := e.uploader.upload(context.TODO(), batch)
if err != nil {
return err
}
}

return nil
}

// jaegerBatchList transforms a slice of SpanSnapshot into a slice of jaeger
// Batch.
func jaegerBatchList(ssl []*sdktrace.SpanSnapshot, defaultServiceName string) []*gen.Batch {
Expand Down
3 changes: 1 addition & 2 deletions exporters/trace/jaeger/jaeger_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func spans(n int) []*tracesdk.SpanSnapshot {
func benchmarkExportSpans(b *testing.B, o EndpointOption, i int) {
ctx := context.Background()
s := spans(i)
exp, err := NewRawExporter(o, WithBatchMaxCount(i+1), WithBufferMaxCount(i+1))
exp, err := NewRawExporter(o)
if err != nil {
b.Fatal(err)
}
Expand All @@ -83,7 +83,6 @@ func benchmarkExportSpans(b *testing.B, o EndpointOption, i int) {
if err := exp.ExportSpans(ctx, s); err != nil {
b.Error(err)
}
exp.bundler.Flush()
}
}

Expand Down
Loading

0 comments on commit 0fdc3d7

Please sign in to comment.