diff --git a/CHANGELOG.md b/CHANGELOG.md index 0a1605210d8..de9cfafe977 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,9 +11,12 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## Added - Added `Marshler` config option to `otlphttp` to enable otlp over json or protobufs. (#1586) +- A `ForceFlush` method to the `"go.opentelemetry.io/otel/sdk/trace".TracerProvider` to flush all registered `SpanProcessor`s. (#1608) ### Changed +- Update the `ForceFlush` method signature to the `"go.opentelemetry.io/otel/sdk/trace".SpanProcessor` to accept a `context.Context` and return an error. (#1608) +- Update the `Shutdown` method to the `"go.opentelemetry.io/otel/sdk/trace".TracerProvider` return an error on shutdown failure. (#1608) - The SimpleSpanProcessor will now shut down the enclosed `SpanExporter` and gracefully ignore subsequent calls to `OnEnd` after `Shutdown` is called. (#1612) - `"go.opentelemetry.io/sdk/metric/controller.basic".WithPusher` is replaced with `WithExporter` to provide consistent naming across project. (#1656) - Added non-empty string check for trace `Attribute` keys. (#1659) diff --git a/sdk/trace/batch_span_processor.go b/sdk/trace/batch_span_processor.go index 0c14b0d16e9..aa322ae6461 100644 --- a/sdk/trace/batch_span_processor.go +++ b/sdk/trace/batch_span_processor.go @@ -147,8 +147,8 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error { } // ForceFlush exports all ended spans that have not yet been exported. -func (bsp *batchSpanProcessor) ForceFlush() { - bsp.exportSpans() +func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error { + return bsp.exportSpans(ctx) } func WithMaxQueueSize(size int) BatchSpanProcessorOption { @@ -176,18 +176,19 @@ func WithBlocking() BatchSpanProcessorOption { } // exportSpans is a subroutine of processing and draining the queue. -func (bsp *batchSpanProcessor) exportSpans() { +func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error { bsp.timer.Reset(bsp.o.BatchTimeout) bsp.batchMutex.Lock() defer bsp.batchMutex.Unlock() if len(bsp.batch) > 0 { - if err := bsp.e.ExportSpans(context.Background(), bsp.batch); err != nil { - otel.Handle(err) + if err := bsp.e.ExportSpans(ctx, bsp.batch); err != nil { + return err } bsp.batch = bsp.batch[:0] } + return nil } // processQueue removes spans from the `queue` channel until processor @@ -196,12 +197,16 @@ func (bsp *batchSpanProcessor) exportSpans() { func (bsp *batchSpanProcessor) processQueue() { defer bsp.timer.Stop() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() for { select { case <-bsp.stopCh: return case <-bsp.timer.C: - bsp.exportSpans() + if err := bsp.exportSpans(ctx); err != nil { + otel.Handle(err) + } case sd := <-bsp.queue: bsp.batchMutex.Lock() bsp.batch = append(bsp.batch, sd) @@ -211,7 +216,9 @@ func (bsp *batchSpanProcessor) processQueue() { if !bsp.timer.Stop() { <-bsp.timer.C } - bsp.exportSpans() + if err := bsp.exportSpans(ctx); err != nil { + otel.Handle(err) + } } } } @@ -220,11 +227,15 @@ func (bsp *batchSpanProcessor) processQueue() { // drainQueue awaits the any caller that had added to bsp.stopWait // to finish the enqueue, then exports the final batch. func (bsp *batchSpanProcessor) drainQueue() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() for { select { case sd := <-bsp.queue: if sd == nil { - bsp.exportSpans() + if err := bsp.exportSpans(ctx); err != nil { + otel.Handle(err) + } return } @@ -234,7 +245,9 @@ func (bsp *batchSpanProcessor) drainQueue() { bsp.batchMutex.Unlock() if shouldExport { - bsp.exportSpans() + if err := bsp.exportSpans(ctx); err != nil { + otel.Handle(err) + } } default: close(bsp.queue) diff --git a/sdk/trace/batch_span_processor_test.go b/sdk/trace/batch_span_processor_test.go index 7ed47afcce5..21349079580 100644 --- a/sdk/trace/batch_span_processor_test.go +++ b/sdk/trace/batch_span_processor_test.go @@ -78,10 +78,11 @@ func TestNewBatchSpanProcessorWithNilExporter(t *testing.T) { // These should not panic. bsp.OnStart(context.Background(), span.(sdktrace.ReadWriteSpan)) bsp.OnEnd(span.(sdktrace.ReadOnlySpan)) - bsp.ForceFlush() - err := bsp.Shutdown(context.Background()) - if err != nil { - t.Error("Error shutting the BatchSpanProcessor down\n") + if err := bsp.ForceFlush(context.Background()); err != nil { + t.Errorf("failed to ForceFlush the BatchSpanProcessor: %v", err) + } + if err := bsp.Shutdown(context.Background()); err != nil { + t.Errorf("failed to Shutdown the BatchSpanProcessor: %v", err) } } diff --git a/sdk/trace/provider.go b/sdk/trace/provider.go index 2810996fa21..e0c9814f049 100644 --- a/sdk/trace/provider.go +++ b/sdk/trace/provider.go @@ -16,6 +16,7 @@ package trace // import "go.opentelemetry.io/otel/sdk/trace" import ( "context" + "fmt" "sync" "sync/atomic" @@ -201,19 +202,55 @@ func (p *TracerProvider) ApplyConfig(cfg Config) { p.config.Store(&c) } -// Shutdown shuts down the span processors in the order they were registered +// ForceFlush immediately exports all spans that have not yet been exported for +// all the registered span processors. +func (p *TracerProvider) ForceFlush(ctx context.Context) error { + spss, ok := p.spanProcessors.Load().(spanProcessorStates) + if !ok { + return fmt.Errorf("failed to load span processors") + } + if len(spss) == 0 { + return nil + } + + for _, sps := range spss { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + if err := sps.sp.ForceFlush(ctx); err != nil { + return err + } + } + return nil +} + +// Shutdown shuts down the span processors in the order they were registered. func (p *TracerProvider) Shutdown(ctx context.Context) error { spss, ok := p.spanProcessors.Load().(spanProcessorStates) - if !ok || len(spss) == 0 { + if !ok { + return fmt.Errorf("failed to load span processors") + } + if len(spss) == 0 { return nil } for _, sps := range spss { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + var err error sps.state.Do(func() { - if err := sps.sp.Shutdown(ctx); err != nil { - otel.Handle(err) - } + err = sps.sp.Shutdown(ctx) }) + if err != nil { + return err + } } return nil } diff --git a/sdk/trace/provider_test.go b/sdk/trace/provider_test.go index 32bec2fb75c..6a55b50bb2d 100644 --- a/sdk/trace/provider_test.go +++ b/sdk/trace/provider_test.go @@ -32,9 +32,11 @@ func (t *basicSpanProcesor) Shutdown(context.Context) error { return t.injectShutdownError } -func (t *basicSpanProcesor) OnStart(parent context.Context, s ReadWriteSpan) {} -func (t *basicSpanProcesor) OnEnd(s ReadOnlySpan) {} -func (t *basicSpanProcesor) ForceFlush() {} +func (t *basicSpanProcesor) OnStart(context.Context, ReadWriteSpan) {} +func (t *basicSpanProcesor) OnEnd(ReadOnlySpan) {} +func (t *basicSpanProcesor) ForceFlush(context.Context) error { + return nil +} func TestShutdownTraceProvider(t *testing.T) { stp := NewTracerProvider() @@ -51,7 +53,6 @@ func TestShutdownTraceProvider(t *testing.T) { } func TestFailedProcessorShutdown(t *testing.T) { - handler.Reset() stp := NewTracerProvider() spErr := errors.New("basic span processor shutdown failure") sp := &basicSpanProcesor{ @@ -60,9 +61,9 @@ func TestFailedProcessorShutdown(t *testing.T) { } stp.RegisterSpanProcessor(sp) - _ = stp.Shutdown(context.Background()) - - assert.Contains(t, handler.errs, spErr) + err := stp.Shutdown(context.Background()) + assert.Error(t, err) + assert.Equal(t, err, spErr) } func TestFailedProcessorShutdownInUnregister(t *testing.T) { @@ -78,7 +79,6 @@ func TestFailedProcessorShutdownInUnregister(t *testing.T) { assert.Contains(t, handler.errs, spErr) - handler.errs = nil - _ = stp.Shutdown(context.Background()) - assert.Empty(t, handler.errs) + err := stp.Shutdown(context.Background()) + assert.NoError(t, err) } diff --git a/sdk/trace/simple_span_processor.go b/sdk/trace/simple_span_processor.go index 88e753f8b41..9efffb01895 100644 --- a/sdk/trace/simple_span_processor.go +++ b/sdk/trace/simple_span_processor.go @@ -76,5 +76,6 @@ func (ssp *simpleSpanProcessor) Shutdown(ctx context.Context) error { } // ForceFlush does nothing as there is no data to flush. -func (ssp *simpleSpanProcessor) ForceFlush() { +func (ssp *simpleSpanProcessor) ForceFlush(context.Context) error { + return nil } diff --git a/sdk/trace/span_processor.go b/sdk/trace/span_processor.go index b2e01c9484e..73f49815e8e 100644 --- a/sdk/trace/span_processor.go +++ b/sdk/trace/span_processor.go @@ -46,7 +46,7 @@ type SpanProcessor interface { // been exported. It should only be called when absolutely necessary, such as when // using a FaaS provider that may suspend the process after an invocation, but before // the Processor can export the completed spans. - ForceFlush() + ForceFlush(ctx context.Context) error } type spanProcessorState struct { diff --git a/sdk/trace/span_processor_example_test.go b/sdk/trace/span_processor_example_test.go index c30b0c37d56..a2d0be20115 100644 --- a/sdk/trace/span_processor_example_test.go +++ b/sdk/trace/span_processor_example_test.go @@ -36,8 +36,8 @@ type DurationFilter struct { func (f DurationFilter) OnStart(parent context.Context, s ReadWriteSpan) { f.Next.OnStart(parent, s) } -func (f DurationFilter) Shutdown(ctx context.Context) error { return f.Next.Shutdown(ctx) } -func (f DurationFilter) ForceFlush() { f.Next.ForceFlush() } +func (f DurationFilter) Shutdown(ctx context.Context) error { return f.Next.Shutdown(ctx) } +func (f DurationFilter) ForceFlush(ctx context.Context) error { return f.Next.ForceFlush(ctx) } func (f DurationFilter) OnEnd(s ReadOnlySpan) { if f.Min > 0 && s.EndTime().Sub(s.StartTime()) < f.Min { // Drop short lived spans. @@ -65,7 +65,9 @@ func (f InstrumentationBlacklist) OnStart(parent context.Context, s ReadWriteSpa f.Next.OnStart(parent, s) } func (f InstrumentationBlacklist) Shutdown(ctx context.Context) error { return f.Next.Shutdown(ctx) } -func (f InstrumentationBlacklist) ForceFlush() { f.Next.ForceFlush() } +func (f InstrumentationBlacklist) ForceFlush(ctx context.Context) error { + return f.Next.ForceFlush(ctx) +} func (f InstrumentationBlacklist) OnEnd(s ReadOnlySpan) { if f.Blacklist != nil && f.Blacklist[s.InstrumentationLibrary().Name] { // Drop spans from this instrumentation diff --git a/sdk/trace/span_processor_test.go b/sdk/trace/span_processor_test.go index 4dd918c3345..4e3061ef435 100644 --- a/sdk/trace/span_processor_test.go +++ b/sdk/trace/span_processor_test.go @@ -63,7 +63,8 @@ func (t *testSpanProcessor) Shutdown(_ context.Context) error { return nil } -func (t *testSpanProcessor) ForceFlush() { +func (t *testSpanProcessor) ForceFlush(context.Context) error { + return nil } func TestRegisterSpanProcessor(t *testing.T) {