From 87f71b3b30ed97dfefd0f0f85d81a78b94fbc595 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Thu, 25 Feb 2021 17:37:16 -0800 Subject: [PATCH 1/5] Add ForceFlush method to TracerProvider The specification requires that a TracerProvider have a ForceFlush method that can be set with a timeout, return any error to the caller, and have all the registered span processors export their spans. This updates the SpanProcessor.ForceFlush method to accept a context and return an error and plumbs this method into a new ForceFlush method of the SDK TracerProvider. Additionally, this corrects the TracerProvider Shutdown method. This method as well needs to return to the caller any failure it encounters according to the specification. This returns an error if it cannot type assert the spanProcessorStates or if shutting down a span processor results in an error. Resolves #1606 --- sdk/trace/batch_span_processor.go | 29 ++++++++++----- sdk/trace/batch_span_processor_test.go | 9 +++-- sdk/trace/provider.go | 47 +++++++++++++++++++++--- sdk/trace/provider_test.go | 20 +++++----- sdk/trace/simple_span_processor.go | 3 +- sdk/trace/span_processor.go | 2 +- sdk/trace/span_processor_example_test.go | 8 ++-- sdk/trace/span_processor_test.go | 3 +- 8 files changed, 87 insertions(+), 34 deletions(-) diff --git a/sdk/trace/batch_span_processor.go b/sdk/trace/batch_span_processor.go index 302c4559dea..5943271b9e6 100644 --- a/sdk/trace/batch_span_processor.go +++ b/sdk/trace/batch_span_processor.go @@ -150,8 +150,8 @@ func (bsp *BatchSpanProcessor) Shutdown(ctx context.Context) error { } // ForceFlush exports all ended spans that have not yet been exported. -func (bsp *BatchSpanProcessor) ForceFlush() { - bsp.exportSpans() +func (bsp *BatchSpanProcessor) ForceFlush(ctx context.Context) error { + return bsp.exportSpans(ctx) } func WithMaxQueueSize(size int) BatchSpanProcessorOption { @@ -179,18 +179,19 @@ func WithBlocking() BatchSpanProcessorOption { } // exportSpans is a subroutine of processing and draining the queue. -func (bsp *BatchSpanProcessor) exportSpans() { +func (bsp *BatchSpanProcessor) exportSpans(ctx context.Context) error { bsp.timer.Reset(bsp.o.BatchTimeout) bsp.batchMutex.Lock() defer bsp.batchMutex.Unlock() if len(bsp.batch) > 0 { - if err := bsp.e.ExportSpans(context.Background(), bsp.batch); err != nil { - otel.Handle(err) + if err := bsp.e.ExportSpans(ctx, bsp.batch); err != nil { + return err } bsp.batch = bsp.batch[:0] } + return nil } // processQueue removes spans from the `queue` channel until processor @@ -199,12 +200,15 @@ func (bsp *BatchSpanProcessor) exportSpans() { func (bsp *BatchSpanProcessor) processQueue() { defer bsp.timer.Stop() + ctx := context.Background() for { select { case <-bsp.stopCh: return case <-bsp.timer.C: - bsp.exportSpans() + if err := bsp.exportSpans(ctx); err != nil { + otel.Handle(err) + } case sd := <-bsp.queue: bsp.batchMutex.Lock() bsp.batch = append(bsp.batch, sd) @@ -214,7 +218,9 @@ func (bsp *BatchSpanProcessor) processQueue() { if !bsp.timer.Stop() { <-bsp.timer.C } - bsp.exportSpans() + if err := bsp.exportSpans(ctx); err != nil { + otel.Handle(err) + } } } } @@ -223,11 +229,14 @@ func (bsp *BatchSpanProcessor) processQueue() { // drainQueue awaits the any caller that had added to bsp.stopWait // to finish the enqueue, then exports the final batch. func (bsp *BatchSpanProcessor) drainQueue() { + ctx := context.Background() for { select { case sd := <-bsp.queue: if sd == nil { - bsp.exportSpans() + if err := bsp.exportSpans(ctx); err != nil { + otel.Handle(err) + } return } @@ -237,7 +246,9 @@ func (bsp *BatchSpanProcessor) drainQueue() { bsp.batchMutex.Unlock() if shouldExport { - bsp.exportSpans() + if err := bsp.exportSpans(ctx); err != nil { + otel.Handle(err) + } } default: close(bsp.queue) diff --git a/sdk/trace/batch_span_processor_test.go b/sdk/trace/batch_span_processor_test.go index 263d8dbad3f..0b9875dd8d8 100644 --- a/sdk/trace/batch_span_processor_test.go +++ b/sdk/trace/batch_span_processor_test.go @@ -78,10 +78,11 @@ func TestNewBatchSpanProcessorWithNilExporter(t *testing.T) { // These should not panic. bsp.OnStart(context.Background(), span.(sdktrace.ReadWriteSpan)) bsp.OnEnd(span.(sdktrace.ReadOnlySpan)) - bsp.ForceFlush() - err := bsp.Shutdown(context.Background()) - if err != nil { - t.Error("Error shutting the BatchSpanProcessor down\n") + if err := bsp.ForceFlush(context.Background()); err != nil { + t.Errorf("failed to ForceFlush the BatchSpanProcessor: %v", err) + } + if err := bsp.Shutdown(context.Background()); err != nil { + t.Errorf("failed to Shutdown the BatchSpanProcessor: %v", err) } } diff --git a/sdk/trace/provider.go b/sdk/trace/provider.go index 64012c86670..d049c6a0013 100644 --- a/sdk/trace/provider.go +++ b/sdk/trace/provider.go @@ -16,6 +16,7 @@ package trace // import "go.opentelemetry.io/otel/sdk/trace" import ( "context" + "fmt" "sync" "sync/atomic" @@ -196,19 +197,55 @@ func (p *TracerProvider) ApplyConfig(cfg Config) { p.config.Store(&c) } -// Shutdown shuts down the span processors in the order they were registered +// ForceFlush immediately export all spans that have not yet been exported for +// all the registered span processors. +func (p *TracerProvider) ForceFlush(ctx context.Context) error { + spss, ok := p.spanProcessors.Load().(spanProcessorStates) + if !ok { + return fmt.Errorf("failed to load span processors") + } + if len(spss) == 0 { + return nil + } + + for _, sps := range spss { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + if err := sps.sp.ForceFlush(ctx); err != nil { + return err + } + } + return nil +} + +// Shutdown shuts down the span processors in the order they were registered. func (p *TracerProvider) Shutdown(ctx context.Context) error { spss, ok := p.spanProcessors.Load().(spanProcessorStates) - if !ok || len(spss) == 0 { + if !ok { + return fmt.Errorf("failed to load span processors") + } + if len(spss) == 0 { return nil } for _, sps := range spss { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + var err error sps.state.Do(func() { - if err := sps.sp.Shutdown(ctx); err != nil { - otel.Handle(err) - } + err = sps.sp.Shutdown(ctx) }) + if err != nil { + return err + } } return nil } diff --git a/sdk/trace/provider_test.go b/sdk/trace/provider_test.go index 32bec2fb75c..6a55b50bb2d 100644 --- a/sdk/trace/provider_test.go +++ b/sdk/trace/provider_test.go @@ -32,9 +32,11 @@ func (t *basicSpanProcesor) Shutdown(context.Context) error { return t.injectShutdownError } -func (t *basicSpanProcesor) OnStart(parent context.Context, s ReadWriteSpan) {} -func (t *basicSpanProcesor) OnEnd(s ReadOnlySpan) {} -func (t *basicSpanProcesor) ForceFlush() {} +func (t *basicSpanProcesor) OnStart(context.Context, ReadWriteSpan) {} +func (t *basicSpanProcesor) OnEnd(ReadOnlySpan) {} +func (t *basicSpanProcesor) ForceFlush(context.Context) error { + return nil +} func TestShutdownTraceProvider(t *testing.T) { stp := NewTracerProvider() @@ -51,7 +53,6 @@ func TestShutdownTraceProvider(t *testing.T) { } func TestFailedProcessorShutdown(t *testing.T) { - handler.Reset() stp := NewTracerProvider() spErr := errors.New("basic span processor shutdown failure") sp := &basicSpanProcesor{ @@ -60,9 +61,9 @@ func TestFailedProcessorShutdown(t *testing.T) { } stp.RegisterSpanProcessor(sp) - _ = stp.Shutdown(context.Background()) - - assert.Contains(t, handler.errs, spErr) + err := stp.Shutdown(context.Background()) + assert.Error(t, err) + assert.Equal(t, err, spErr) } func TestFailedProcessorShutdownInUnregister(t *testing.T) { @@ -78,7 +79,6 @@ func TestFailedProcessorShutdownInUnregister(t *testing.T) { assert.Contains(t, handler.errs, spErr) - handler.errs = nil - _ = stp.Shutdown(context.Background()) - assert.Empty(t, handler.errs) + err := stp.Shutdown(context.Background()) + assert.NoError(t, err) } diff --git a/sdk/trace/simple_span_processor.go b/sdk/trace/simple_span_processor.go index 1ab95aa5e11..9fefb07f5ea 100644 --- a/sdk/trace/simple_span_processor.go +++ b/sdk/trace/simple_span_processor.go @@ -58,5 +58,6 @@ func (ssp *SimpleSpanProcessor) Shutdown(_ context.Context) error { } // ForceFlush does nothing as there is no data to flush. -func (ssp *SimpleSpanProcessor) ForceFlush() { +func (ssp *SimpleSpanProcessor) ForceFlush(context.Context) error { + return nil } diff --git a/sdk/trace/span_processor.go b/sdk/trace/span_processor.go index d32fed657f7..75eed068bd6 100644 --- a/sdk/trace/span_processor.go +++ b/sdk/trace/span_processor.go @@ -39,7 +39,7 @@ type SpanProcessor interface { // been exported. It should only be called when absolutely necessary, such as when // using a FaaS provider that may suspend the process after an invocation, but before // the Processor can export the completed spans. - ForceFlush() + ForceFlush(ctx context.Context) error } type spanProcessorState struct { diff --git a/sdk/trace/span_processor_example_test.go b/sdk/trace/span_processor_example_test.go index c30b0c37d56..a2d0be20115 100644 --- a/sdk/trace/span_processor_example_test.go +++ b/sdk/trace/span_processor_example_test.go @@ -36,8 +36,8 @@ type DurationFilter struct { func (f DurationFilter) OnStart(parent context.Context, s ReadWriteSpan) { f.Next.OnStart(parent, s) } -func (f DurationFilter) Shutdown(ctx context.Context) error { return f.Next.Shutdown(ctx) } -func (f DurationFilter) ForceFlush() { f.Next.ForceFlush() } +func (f DurationFilter) Shutdown(ctx context.Context) error { return f.Next.Shutdown(ctx) } +func (f DurationFilter) ForceFlush(ctx context.Context) error { return f.Next.ForceFlush(ctx) } func (f DurationFilter) OnEnd(s ReadOnlySpan) { if f.Min > 0 && s.EndTime().Sub(s.StartTime()) < f.Min { // Drop short lived spans. @@ -65,7 +65,9 @@ func (f InstrumentationBlacklist) OnStart(parent context.Context, s ReadWriteSpa f.Next.OnStart(parent, s) } func (f InstrumentationBlacklist) Shutdown(ctx context.Context) error { return f.Next.Shutdown(ctx) } -func (f InstrumentationBlacklist) ForceFlush() { f.Next.ForceFlush() } +func (f InstrumentationBlacklist) ForceFlush(ctx context.Context) error { + return f.Next.ForceFlush(ctx) +} func (f InstrumentationBlacklist) OnEnd(s ReadOnlySpan) { if f.Blacklist != nil && f.Blacklist[s.InstrumentationLibrary().Name] { // Drop spans from this instrumentation diff --git a/sdk/trace/span_processor_test.go b/sdk/trace/span_processor_test.go index 4dd918c3345..4e3061ef435 100644 --- a/sdk/trace/span_processor_test.go +++ b/sdk/trace/span_processor_test.go @@ -63,7 +63,8 @@ func (t *testSpanProcessor) Shutdown(_ context.Context) error { return nil } -func (t *testSpanProcessor) ForceFlush() { +func (t *testSpanProcessor) ForceFlush(context.Context) error { + return nil } func TestRegisterSpanProcessor(t *testing.T) { From 4b9f8017be5b153ac4f08a54bdcc0fdf9911ba6b Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Thu, 25 Feb 2021 17:43:23 -0800 Subject: [PATCH 2/5] Add changes to changelog --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4d6ae4e7a2b..94655c60757 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - AttributePerEventCountLimit and AttributePerLinkCountLimit for SpanLimits. (#1535) - Added `Keys()` method to `propagation.TextMapCarrier` and `propagation.HeaderCarrier` to adapt `http.Header` to this interface. (#1544) - Added `code` attributes to `go.opentelemetry.io/otel/semconv` package. (#1558) +- A `ForceFlush` method to the `"go.opentelemetry.io/otel/sdk/trace".TracerProvider` to flush all registered `SpanProcessor`s. (#1607) ### Changed @@ -39,6 +40,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm "otel/sdk/trace".ReadOnlySpan "otel/sdk/trace".ReadWriteSpan ``` +- Update the `ForceFlush` method signature to the `"go.opentelemetry.io/otel/sdk/trace".SpanProcessor` to accept a context.Context and return an error. (#1607) +- Update the `Shutdown` method to the `"go.opentelemetry.io/otel/sdk/trace".TracerProvider` return an error on shutdown failure. (#1607) ### Removed From 1d4b9b6b42a8bfe9c496f9b5bdbdbc99f9d2e3c1 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 26 Feb 2021 07:31:23 -0800 Subject: [PATCH 3/5] Apply suggestions from code review Co-authored-by: Steven E. Harris --- CHANGELOG.md | 2 +- sdk/trace/provider.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 94655c60757..30f9842bc01 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,7 +40,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm "otel/sdk/trace".ReadOnlySpan "otel/sdk/trace".ReadWriteSpan ``` -- Update the `ForceFlush` method signature to the `"go.opentelemetry.io/otel/sdk/trace".SpanProcessor` to accept a context.Context and return an error. (#1607) +- Update the `ForceFlush` method signature to the `"go.opentelemetry.io/otel/sdk/trace".SpanProcessor` to accept a `context.Context` and return an error. (#1607) - Update the `Shutdown` method to the `"go.opentelemetry.io/otel/sdk/trace".TracerProvider` return an error on shutdown failure. (#1607) ### Removed diff --git a/sdk/trace/provider.go b/sdk/trace/provider.go index d049c6a0013..4c05597d273 100644 --- a/sdk/trace/provider.go +++ b/sdk/trace/provider.go @@ -197,7 +197,7 @@ func (p *TracerProvider) ApplyConfig(cfg Config) { p.config.Store(&c) } -// ForceFlush immediately export all spans that have not yet been exported for +// ForceFlush immediately exports all spans that have not yet been exported for // all the registered span processors. func (p *TracerProvider) ForceFlush(ctx context.Context) error { spss, ok := p.spanProcessors.Load().(spanProcessorStates) From 6d173d84b1f2d3bfbecc2bec664b357b0e322a6c Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 26 Feb 2021 07:35:24 -0800 Subject: [PATCH 4/5] Cancel export context when BSP stops --- sdk/trace/batch_span_processor.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdk/trace/batch_span_processor.go b/sdk/trace/batch_span_processor.go index 5943271b9e6..e352207a264 100644 --- a/sdk/trace/batch_span_processor.go +++ b/sdk/trace/batch_span_processor.go @@ -200,10 +200,11 @@ func (bsp *BatchSpanProcessor) exportSpans(ctx context.Context) error { func (bsp *BatchSpanProcessor) processQueue() { defer bsp.timer.Stop() - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) for { select { case <-bsp.stopCh: + cancel() return case <-bsp.timer.C: if err := bsp.exportSpans(ctx); err != nil { @@ -229,7 +230,7 @@ func (bsp *BatchSpanProcessor) processQueue() { // drainQueue awaits the any caller that had added to bsp.stopWait // to finish the enqueue, then exports the final batch. func (bsp *BatchSpanProcessor) drainQueue() { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) for { select { case sd := <-bsp.queue: @@ -251,6 +252,7 @@ func (bsp *BatchSpanProcessor) drainQueue() { } } default: + cancel() close(bsp.queue) } } From 5db92ee4b89d8e69ffc7e0b635e86d1bd3b0304a Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 26 Feb 2021 07:47:37 -0800 Subject: [PATCH 5/5] Defer cancel call in BSP span processor funcs --- sdk/trace/batch_span_processor.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/trace/batch_span_processor.go b/sdk/trace/batch_span_processor.go index e352207a264..2955b9570f5 100644 --- a/sdk/trace/batch_span_processor.go +++ b/sdk/trace/batch_span_processor.go @@ -201,10 +201,10 @@ func (bsp *BatchSpanProcessor) processQueue() { defer bsp.timer.Stop() ctx, cancel := context.WithCancel(context.Background()) + defer cancel() for { select { case <-bsp.stopCh: - cancel() return case <-bsp.timer.C: if err := bsp.exportSpans(ctx); err != nil { @@ -231,6 +231,7 @@ func (bsp *BatchSpanProcessor) processQueue() { // to finish the enqueue, then exports the final batch. func (bsp *BatchSpanProcessor) drainQueue() { ctx, cancel := context.WithCancel(context.Background()) + defer cancel() for { select { case sd := <-bsp.queue: @@ -252,7 +253,6 @@ func (bsp *BatchSpanProcessor) drainQueue() { } } default: - cancel() close(bsp.queue) } }