Skip to content

Commit

Permalink
Empty queued spans when ForceFlush called (#2335)
Browse files Browse the repository at this point in the history
* Empty queued spans when ForceFlush called

Update the implementation of ForceFlush() so that it first ensures all
queued spans have been added to the batch before exporting them.
Create a small ReadOnlySpan implementation that serves as a marker
that ForceFlush has been invoked and signals when all queued spans are
ready to be exported.

Fixes #2080.

* Add a changelog entry.

* Update CHANGELOG.md

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>

* Update sdk/trace/batch_span_processor.go

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>

* Improve test case to verify multiple flushes.

* Refactor code to use enqueue.

* Be more defensive on waiting for queue.

Update the handling of the force flush span so we only wait on the
channel if we were able to enqueue the span to the queue.

* Fix linter.

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
Co-authored-by: Anthony Mirabella <a9@aneurysm9.com>
  • Loading branch information
3 people authored Nov 5, 2021
1 parent 7ce58f3 commit 6d2aeb0
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- The following interface types simply moved from `metric` to `metric/sdkapi`: `Descriptor`, `MeterImpl`, `InstrumentImpl`, `SyncImpl`, `BoundSyncImpl`, `AsyncImpl`, `AsyncRunner`, `AsyncSingleRunner`, and `AsyncBatchRunner`
- The following struct types moved and are replaced with type aliases, since they are exposed to the user: `Observation`, `Measurement`.
- The No-op implementations of sync and async instruments are no longer exported, new functions `sdkapi.NewNoopAsyncInstrument()` and `sdkapi.NewNoopSyncInstrument()` are provided instead. (#2271)
- Update the SDK `BatchSpanProcessor` to export all queued spans when `ForceFlush` is called. (#2080, #2335)

### Added

Expand Down
44 changes: 39 additions & 5 deletions sdk/trace/batch_span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)

// Defaults for BatchSpanProcessorOptions.
Expand Down Expand Up @@ -153,10 +154,29 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
return err
}

// forceFlushSpan is a marker value that ForceFlush places on the span queue.
// It is never added to a batch: processQueue recognises it by type, closes
// flushed, and moves on to the next queue item.
type forceFlushSpan struct {
// ReadOnlySpan is embedded so the marker can travel through the queue as a
// ReadOnlySpan. ForceFlush leaves it unset, so only the methods that
// forceFlushSpan defines itself are safe to call.
ReadOnlySpan
// flushed is closed by processQueue once the marker has been dequeued,
// which tells the waiting ForceFlush call that the items queued ahead of
// the marker have been taken off the queue.
flushed chan struct{}
}

// SpanContext returns a span context whose only populated field is the
// sampled trace flag. enqueueBlockOnQueueFull rejects spans that are not
// sampled, so the marker has to report itself as sampled to reach the queue.
func (f forceFlushSpan) SpanContext() trace.SpanContext {
	cfg := trace.SpanContextConfig{TraceFlags: trace.FlagsSampled}
	return trace.NewSpanContext(cfg)
}

// ForceFlush exports all ended spans that have not yet been exported.
func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
var err error
if bsp.e != nil {
flushCh := make(chan struct{})
if bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}, true) {
select {
case <-flushCh:
// Processed any items in queue prior to ForceFlush being called
case <-ctx.Done():
return ctx.Err()
}
}

wait := make(chan error)
go func() {
wait <- bsp.exportSpans(ctx)
Expand Down Expand Up @@ -248,6 +268,10 @@ func (bsp *batchSpanProcessor) processQueue() {
otel.Handle(err)
}
case sd := <-bsp.queue:
if ffs, ok := sd.(forceFlushSpan); ok {
close(ffs.flushed)
continue
}
bsp.batchMutex.Lock()
bsp.batch = append(bsp.batch, sd)
shouldExport := len(bsp.batch) >= bsp.o.MaxExportBatchSize
Expand Down Expand Up @@ -296,8 +320,12 @@ func (bsp *batchSpanProcessor) drainQueue() {
}

// enqueue offers sd to the processor's queue, blocking on a full queue only
// when the BlockOnQueueFull option is set. It has no caller-supplied context,
// so context.TODO() is passed, and the boolean result of
// enqueueBlockOnQueueFull is deliberately discarded.
func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) {
	block := bsp.o.BlockOnQueueFull
	_ = bsp.enqueueBlockOnQueueFull(context.TODO(), sd, block)
}

func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd ReadOnlySpan, block bool) bool {
if !sd.SpanContext().IsSampled() {
return
return false
}

// This ensures the bsp.queue<- below does not panic as the
Expand All @@ -317,18 +345,24 @@ func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) {

select {
case <-bsp.stopCh:
return
return false
default:
}

if bsp.o.BlockOnQueueFull {
bsp.queue <- sd
return
if block {
select {
case bsp.queue <- sd:
return true
case <-ctx.Done():
return false
}
}

select {
case bsp.queue <- sd:
return true
default:
atomic.AddUint32(&bsp.dropped, 1)
}
return false
}
25 changes: 25 additions & 0 deletions sdk/trace/batch_span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@ import (
"context"
"encoding/binary"
"errors"
"fmt"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/sdk/trace/tracetest"

"go.opentelemetry.io/otel/trace"

sdktrace "go.opentelemetry.io/otel/sdk/trace"
Expand Down Expand Up @@ -457,3 +460,25 @@ func TestBatchSpanProcessorForceFlushCancellation(t *testing.T) {
t.Errorf("expected %q error, got %v", want, got)
}
}

// TestBatchSpanProcessorForceFlushQueuedSpans verifies that every span ended
// before a ForceFlush call has reached the exporter by the time ForceFlush
// returns. It repeats the end-span/flush cycle several times so that a flush
// which only works once would be caught.
func TestBatchSpanProcessorForceFlushQueuedSpans(t *testing.T) {
	ctx := context.Background()

	exp := tracetest.NewInMemoryExporter()

	tp := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exp),
	)
	// Shut the provider down when the test finishes so the batcher it created
	// does not outlive the test.
	defer func() {
		assert.NoError(t, tp.Shutdown(ctx))
	}()

	tracer := tp.Tracer("tracer")

	for i := 0; i < 10; i++ {
		_, span := tracer.Start(ctx, fmt.Sprintf("span%d", i))
		span.End()

		err := tp.ForceFlush(ctx)
		assert.NoError(t, err)

		// Exactly one more span must be visible after each flush.
		assert.Len(t, exp.GetSpans(), i+1)
	}
}

0 comments on commit 6d2aeb0

Please sign in to comment.