-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Empty queued spans when ForceFlush called #2335
Changes from 6 commits
b2dd3c9
0e621b8
c7e4c51
4eb4f2b
0180f37
cb23081
bf24f4c
c03a61a
486120a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,7 @@ import ( | |
"time" | ||
|
||
"go.opentelemetry.io/otel" | ||
"go.opentelemetry.io/otel/trace" | ||
) | ||
|
||
// Defaults for BatchSpanProcessorOptions. | ||
|
@@ -153,10 +154,29 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error { | |
return err | ||
} | ||
|
||
type forceFlushSpan struct { | ||
ReadOnlySpan | ||
flushed chan struct{} | ||
} | ||
|
||
func (f forceFlushSpan) SpanContext() trace.SpanContext { | ||
return trace.NewSpanContext(trace.SpanContextConfig{TraceFlags: trace.FlagsSampled}) | ||
} | ||
|
||
// ForceFlush exports all ended spans that have not yet been exported. | ||
func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error { | ||
var err error | ||
if bsp.e != nil { | ||
flushCh := make(chan struct{}) | ||
bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}, true) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Pass true as the last argument here since we want to block for this span to be queued. |
||
|
||
select { | ||
case <-flushCh: | ||
// Processed any items in queue prior to ForceFlush being called | ||
case <-ctx.Done(): | ||
return ctx.Err() | ||
} | ||
|
||
wait := make(chan error) | ||
go func() { | ||
wait <- bsp.exportSpans(ctx) | ||
|
@@ -248,6 +268,10 @@ func (bsp *batchSpanProcessor) processQueue() { | |
otel.Handle(err) | ||
} | ||
case sd := <-bsp.queue: | ||
if ffs, ok := sd.(forceFlushSpan); ok { | ||
close(ffs.flushed) | ||
continue | ||
} | ||
bsp.batchMutex.Lock() | ||
bsp.batch = append(bsp.batch, sd) | ||
shouldExport := len(bsp.batch) >= bsp.o.MaxExportBatchSize | ||
|
@@ -296,6 +320,10 @@ func (bsp *batchSpanProcessor) drainQueue() { | |
} | ||
|
||
func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) { | ||
bsp.enqueueBlockOnQueueFull(context.TODO(), sd, bsp.o.BlockOnQueueFull) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In order to work in the ForceFlush method, it was needed to pass a context (to allow cancellation). |
||
} | ||
|
||
func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd ReadOnlySpan, block bool) { | ||
if !sd.SpanContext().IsSampled() { | ||
return | ||
} | ||
|
@@ -321,9 +349,13 @@ func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) { | |
default: | ||
} | ||
|
||
if bsp.o.BlockOnQueueFull { | ||
bsp.queue <- sd | ||
return | ||
if block { | ||
select { | ||
case bsp.queue <- sd: | ||
return | ||
case <-ctx.Done(): | ||
return | ||
} | ||
} | ||
|
||
select { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using enqueue requires that the span being queued is marked as sampled.