Skip to content

Commit

Permalink
Default implementation for empty BatchProcessor (#5239)
Browse files Browse the repository at this point in the history
Ensure an empty BatchProcessor does not panic when any method is called.
Default an empty BatchProcessor as being shut down.
  • Loading branch information
MrAlias authored Apr 22, 2024
1 parent 9370c5a commit b34cfc4
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 5 deletions.
12 changes: 7 additions & 5 deletions sdk/log/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ const (
var _ Processor = (*BatchProcessor)(nil)

// BatchProcessor is a processor that exports batches of log records.
// A BatchProcessor must be created with [NewBatchProcessor].
//
// Use [NewBatchProcessor] to create a BatchProcessor. An empty BatchProcessor
// is shut down by default, no records will be batched or exported.
type BatchProcessor struct {
// The BatchProcessor is designed to provide the highest throughput of
// log records possible while being compatible with OpenTelemetry. The
Expand Down Expand Up @@ -170,7 +172,7 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) {

// OnEmit batches provided log record.
func (b *BatchProcessor) OnEmit(_ context.Context, r Record) error {
if b.stopped.Load() {
if b.stopped.Load() || b.q == nil {
return nil
}
if n := b.q.Enqueue(r); n >= b.batchSize {
Expand All @@ -187,12 +189,12 @@ func (b *BatchProcessor) OnEmit(_ context.Context, r Record) error {

// Enabled returns if b is enabled.
func (b *BatchProcessor) Enabled(context.Context, Record) bool {
return !b.stopped.Load()
return !b.stopped.Load() && b.q != nil
}

// Shutdown flushes queued log records and shuts down the decorated exporter.
func (b *BatchProcessor) Shutdown(ctx context.Context) error {
if b.stopped.Swap(true) {
if b.stopped.Swap(true) || b.q == nil {
return nil
}

Expand All @@ -219,7 +221,7 @@ var ctxErr = func(ctx context.Context) error {

// ForceFlush flushes queued log records and flushes the decorated exporter.
func (b *BatchProcessor) ForceFlush(ctx context.Context) error {
if b.stopped.Load() {
if b.stopped.Load() || b.q == nil {
return nil
}

Expand Down
12 changes: 12 additions & 0 deletions sdk/log/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,18 @@ import (
"go.opentelemetry.io/otel/log"
)

func TestEmptyBatchConfig(t *testing.T) {
assert.NotPanics(t, func() {
var bp BatchProcessor
ctx := context.Background()
var record Record
assert.NoError(t, bp.OnEmit(ctx, record), "OnEmit")
assert.False(t, bp.Enabled(ctx, record), "Enabled")
assert.NoError(t, bp.ForceFlush(ctx), "ForceFlush")
assert.NoError(t, bp.Shutdown(ctx), "Shutdown")
})
}

func TestNewBatchConfig(t *testing.T) {
otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
t.Log(err)
Expand Down

0 comments on commit b34cfc4

Please sign in to comment.