Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Empty queued spans when ForceFlush called #2335

Merged
merged 9 commits into from
Nov 5, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- The following interface types simply moved from `metric` to `metric/sdkapi`: `Descriptor`, `MeterImpl`, `InstrumentImpl`, `SyncImpl`, `BoundSyncImpl`, `AsyncImpl`, `AsyncRunner`, `AsyncSingleRunner`, and `AsyncBatchRunner`
- The following struct types moved and are replaced with type aliases, since they are exposed to the user: `Observation`, `Measurement`.
- The No-op implementations of sync and async instruments are no longer exported, new functions `sdkapi.NewNoopAsyncInstrument()` and `sdkapi.NewNoopSyncInstrument()` are provided instead. (#2271)
- Update the SDK `BatchSpanProcessor` to export all queued spans when `ForceFlush` is called. (#2080, #2335)

### Added

Expand Down
23 changes: 23 additions & 0 deletions sdk/trace/batch_span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,29 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
return err
}

type forceFlushSpan struct {
ReadOnlySpan
flushed chan struct{}
}

// ForceFlush exports all ended spans that have not yet been exported.
func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
var err error
if bsp.e != nil {
flushCh := make(chan struct{})
select {
case bsp.queue <- forceFlushSpan{flushed: flushCh}:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe use the enque method here instead to handle the shutdown case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though this blocks on a full queue ... It might make sense to copy some of the logic from that method to here to handle the case where shutdown has already occurred or is concurrently happening.

case <-ctx.Done():
return ctx.Err()
}

select {
case <-flushCh:
// Processed any items in queue prior to ForceFlush being called
case <-ctx.Done():
return ctx.Err()
}

wait := make(chan error)
go func() {
wait <- bsp.exportSpans(ctx)
Expand Down Expand Up @@ -248,6 +267,10 @@ func (bsp *batchSpanProcessor) processQueue() {
otel.Handle(err)
}
case sd := <-bsp.queue:
if ffs, ok := sd.(forceFlushSpan); ok {
close(ffs.flushed)
continue
}
bsp.batchMutex.Lock()
bsp.batch = append(bsp.batch, sd)
shouldExport := len(bsp.batch) >= bsp.o.MaxExportBatchSize
Expand Down
24 changes: 24 additions & 0 deletions sdk/trace/batch_span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ import (
"context"
"encoding/binary"
"errors"
"fmt"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/sdk/trace/tracetest"

"go.opentelemetry.io/otel/trace"

Expand Down Expand Up @@ -457,3 +459,25 @@ func TestBatchSpanProcessorForceFlushCancellation(t *testing.T) {
t.Errorf("expected %q error, got %v", want, got)
}
}

func TestBatchSpanProcessorForceFlushQueuedSpans(t *testing.T) {
ctx := context.Background()

exp := tracetest.NewInMemoryExporter()

tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exp),
)

tracer := tp.Tracer("tracer")

for i := 0; i < 10; i++ {
_, span := tracer.Start(ctx, fmt.Sprintf("span%d", i))
span.End()

err := tp.ForceFlush(ctx)
assert.NoError(t, err)

assert.Len(t, exp.GetSpans(), i+1)
}
}