diff --git a/CHANGELOG.md b/CHANGELOG.md
index b7791db7575..e0cc72e908f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -17,6 +17,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
   See our [versioning policy](VERSIONING.md) for more information about these stability guarantees. (#5629)
 - Add `InstrumentationScope` field to `SpanStub` in `go.opentelemetry.io/otel/sdk/trace/tracetest`, as a replacement for the deprecated `InstrumentationLibrary`. (#5627)
 
+### Changed
+
+- `Processor.OnEmit` in `go.opentelemetry.io/otel/sdk/log` now accepts a pointer to `Record` instead of a value so that record modifications made in a processor are propagated to subsequent registered processors. (#5636)
+
 ### Fixed
 
 - Correct comments for the priority of the `WithEndpoint` and `WithEndpointURL` options and their corresponding environment variables in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#5584)
diff --git a/sdk/log/DESIGN.md b/sdk/log/DESIGN.md
index cda06511203..c94d50d8ffd 100644
--- a/sdk/log/DESIGN.md
+++ b/sdk/log/DESIGN.md
@@ -146,7 +146,31 @@ provided via API.
 Moreover it is safer to have these abstraction decoupled.
 E.g. there can be a need for some fields that can be set via API and cannot be modified by the processors.
 
+### Processor.OnEmit to accept Record values
+
+There was a proposal to make the [Processor](#processor)'s `OnEmit`
+accept a [Record](#record) value instead of a pointer in order to reduce
+allocations and to follow a design similar to [`slog.Handler`](https://pkg.go.dev/log/slog#Handler).
+
+There have been long discussions within the OpenTelemetry Specification SIG[^5]
+about whether such a design would comply with the specification. The conclusion
+was that the flaws of the current processor design are present in other
+languages as well. Therefore, it would be preferable to introduce new
+processing concepts (e.g. chaining processors) in the specification that would
+coexist with the current "mutable" processor design.
+
+The performance disadvantage of using a pointer (which, at the time of
+writing, causes an additional heap allocation) may be mitigated by future
+versions of the Go compiler, thanks to improved escape analysis and
+profile-guided optimization (PGO)[^6].
+
+On the other hand, it is fine for the [Processor](#processor)'s `Enabled` to
+accept a [Record](#record) value, as processors should not mutate the passed
+parameters.
+
 [^1]: [A Guide to the Go Garbage Collector](https://tip.golang.org/doc/gc-guide)
 [^2]: [OpenTelemetry Logging](https://opentelemetry.io/docs/specs/otel/logs)
 [^3]: [Conversation on representing LogRecordProcessor and LogRecordExporter via a single Expoter interface](https://github.com/open-telemetry/opentelemetry-go/pull/4954#discussion_r1515050480)
 [^4]: [Introduce Processor](https://github.com/pellared/opentelemetry-go/pull/9)
+[^5]: [Log record mutations do not have to be visible in next registered processors](https://github.com/open-telemetry/opentelemetry-specification/pull/4067)
+[^6]: [Profile-guided optimization](https://go.dev/doc/pgo)
diff --git a/sdk/log/batch.go b/sdk/log/batch.go
index 8e43b0e8f75..fd913901c28 100644
--- a/sdk/log/batch.go
+++ b/sdk/log/batch.go
@@ -176,11 +176,13 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) {
 }
 
 // OnEmit batches provided log record.
-func (b *BatchProcessor) OnEmit(_ context.Context, r Record) error {
+func (b *BatchProcessor) OnEmit(_ context.Context, r *Record) error {
 	if b.stopped.Load() || b.q == nil {
 		return nil
 	}
-	if n := b.q.Enqueue(r); n >= b.batchSize {
+	// The record is cloned so that changes made by subsequent processors
+	// do not cause a data race.
+	if n := b.q.Enqueue(r.Clone()); n >= b.batchSize {
 		select {
 		case b.pollTrigger <- struct{}{}:
 		default:
diff --git a/sdk/log/batch_test.go b/sdk/log/batch_test.go
index 70b12ab04fa..0b9ece02a3d 100644
--- a/sdk/log/batch_test.go
+++ b/sdk/log/batch_test.go
@@ -45,9 +45,9 @@ func TestEmptyBatchConfig(t *testing.T) {
 	assert.NotPanics(t, func() {
 		var bp BatchProcessor
 		ctx := context.Background()
-		var record Record
+		record := new(Record)
 		assert.NoError(t, bp.OnEmit(ctx, record), "OnEmit")
-		assert.False(t, bp.Enabled(ctx, record), "Enabled")
+		assert.False(t, bp.Enabled(ctx, *record), "Enabled")
 		assert.NoError(t, bp.ForceFlush(ctx), "ForceFlush")
 		assert.NoError(t, bp.Shutdown(ctx), "Shutdown")
 	})
@@ -197,8 +197,8 @@ func TestBatchProcessor(t *testing.T) {
 			WithExportInterval(time.Nanosecond),
 			WithExportTimeout(time.Hour),
 		)
-		for _, r := range make([]Record, size) {
-			assert.NoError(t, b.OnEmit(ctx, r))
+		for i := 0; i < size; i++ {
+			assert.NoError(t, b.OnEmit(ctx, new(Record)))
 		}
 		var got []Record
 		assert.Eventually(t, func() bool {
@@ -220,8 +220,8 @@ func TestBatchProcessor(t *testing.T) {
 			WithExportInterval(time.Hour),
 			WithExportTimeout(time.Hour),
 		)
-		for _, r := range make([]Record, 10*batch) {
-			assert.NoError(t, b.OnEmit(ctx, r))
+		for i := 0; i < 10*batch; i++ {
+			assert.NoError(t, b.OnEmit(ctx, new(Record)))
 		}
 		assert.Eventually(t, func() bool {
 			return e.ExportN() > 1
@@ -243,8 +243,8 @@ func TestBatchProcessor(t *testing.T) {
 			WithExportInterval(time.Hour),
 			WithExportTimeout(time.Hour),
 		)
-		for _, r := range make([]Record, 2*batch) {
-			assert.NoError(t, b.OnEmit(ctx, r))
+		for i := 0; i < 2*batch; i++ {
+			assert.NoError(t, b.OnEmit(ctx, new(Record)))
 		}
 
 		var n int
@@ -255,7 +255,7 @@ func TestBatchProcessor(t *testing.T) {
 
 		var err error
 		require.Eventually(t, func() bool {
-			err = b.OnEmit(ctx, Record{})
+			err = b.OnEmit(ctx, new(Record))
 			return true
 		}, time.Second, time.Microsecond, "OnEmit blocked")
 		assert.NoError(t, err)
@@ -303,7 +303,7 @@ func TestBatchProcessor(t *testing.T) {
 		assert.NoError(t, b.Shutdown(ctx))
 
 		want := e.ExportN()
-		assert.NoError(t, b.OnEmit(ctx, Record{}))
+		assert.NoError(t, b.OnEmit(ctx, new(Record)))
 		assert.Equal(t, want, e.ExportN(), "Export called after shutdown")
 	})
 
@@ -311,7 +311,7 @@ func TestBatchProcessor(t *testing.T) {
 		e := newTestExporter(nil)
 		b := NewBatchProcessor(e)
 
-		assert.NoError(t, b.OnEmit(ctx, Record{}))
+		assert.NoError(t, b.OnEmit(ctx, new(Record)))
 		assert.NoError(t, b.Shutdown(ctx))
 		assert.NoError(t, b.ForceFlush(ctx))
 
@@ -344,7 +344,7 @@ func TestBatchProcessor(t *testing.T) {
 		)
 		t.Cleanup(func() { _ = b.Shutdown(ctx) })
 
-		var r Record
+		r := new(Record)
 		r.SetBody(log.BoolValue(true))
 		require.NoError(t, b.OnEmit(ctx, r))
 
@@ -353,7 +353,7 @@ func TestBatchProcessor(t *testing.T) {
 		if assert.Equal(t, 1, e.ExportN(), "exporter Export calls") {
 			got := e.Records()
 			if assert.Len(t, got[0], 1, "records received") {
-				assert.Equal(t, r, got[0][0])
+				assert.Equal(t, *r, got[0][0])
 			}
 		}
 	})
@@ -381,7 +381,7 @@ func TestBatchProcessor(t *testing.T) {
 
 		// Enqueue 10 x "batch size" amount of records.
 		for i := 0; i < 10*batch; i++ {
-			require.NoError(t, b.OnEmit(ctx, Record{}))
+			require.NoError(t, b.OnEmit(ctx, new(Record)))
 		}
 		assert.Eventually(t, func() bool {
 			return e.ExportN() > 0 && len(b.exporter.input) == cap(b.exporter.input)
@@ -423,7 +423,7 @@ func TestBatchProcessor(t *testing.T) {
 		b := NewBatchProcessor(e)
 		t.Cleanup(func() { _ = b.Shutdown(ctx) })
 
-		var r Record
+		r := new(Record)
 		r.SetBody(log.BoolValue(true))
 		_ = b.OnEmit(ctx, r)
 		t.Cleanup(func() { _ = b.Shutdown(ctx) })
@@ -453,7 +453,7 @@ func TestBatchProcessor(t *testing.T) {
 			WithExportInterval(time.Hour),
 			WithExportTimeout(time.Hour),
 		)
-		var r Record
+		r := new(Record)
 		// First record will be blocked by testExporter.Export
 		assert.NoError(t, b.OnEmit(ctx, r), "exported record")
 		require.Eventually(t, func() bool {
@@ -497,7 +497,7 @@ func TestBatchProcessor(t *testing.T) {
 			case <-ctx.Done():
 				return
 			default:
-				assert.NoError(t, b.OnEmit(ctx, Record{}))
+				assert.NoError(t, b.OnEmit(ctx, new(Record)))
 				// Ignore partial flush errors.
 				_ = b.ForceFlush(ctx)
 			}
@@ -642,7 +642,7 @@ func TestQueue(t *testing.T) {
 }
 
 func BenchmarkBatchProcessorOnEmit(b *testing.B) {
-	var r Record
+	r := new(Record)
 	body := log.BoolValue(true)
 	r.SetBody(body)
 
diff --git a/sdk/log/bench_test.go b/sdk/log/bench_test.go
index 8147418ef7e..835f68c7aba 100644
--- a/sdk/log/bench_test.go
+++ b/sdk/log/bench_test.go
@@ -16,59 +16,77 @@ import (
 func BenchmarkProcessor(b *testing.B) {
 	for _, tc := range []struct {
 		name string
-		f    func() Processor
+		f    func() []LoggerProviderOption
 	}{
 		{
 			name: "Simple",
-			f: func() Processor {
-				return NewSimpleProcessor(noopExporter{})
+			f: func() []LoggerProviderOption {
+				return []LoggerProviderOption{WithProcessor(NewSimpleProcessor(noopExporter{}))}
 			},
 		},
 		{
 			name: "Batch",
-			f: func() Processor {
-				return NewBatchProcessor(noopExporter{})
+			f: func() []LoggerProviderOption {
+				return []LoggerProviderOption{WithProcessor(NewBatchProcessor(noopExporter{}))}
 			},
 		},
 		{
 			name: "SetTimestampSimple",
-			f: func() Processor {
-				return timestampDecorator{NewSimpleProcessor(noopExporter{})}
+			f: func() []LoggerProviderOption {
+				return []LoggerProviderOption{
+					WithProcessor(timestampProcessor{}),
+					WithProcessor(NewSimpleProcessor(noopExporter{})),
+				}
 			},
 		},
 		{
 			name: "SetTimestampBatch",
-			f: func() Processor {
-				return timestampDecorator{NewBatchProcessor(noopExporter{})}
+			f: func() []LoggerProviderOption {
+				return []LoggerProviderOption{
+					WithProcessor(timestampProcessor{}),
+					WithProcessor(NewBatchProcessor(noopExporter{})),
+				}
 			},
 		},
 		{
 			name: "AddAttributesSimple",
-			f: func() Processor {
-				return attrAddDecorator{NewSimpleProcessor(noopExporter{})}
+			f: func() []LoggerProviderOption {
+				return []LoggerProviderOption{
+					WithProcessor(attrAddProcessor{}),
+					WithProcessor(NewSimpleProcessor(noopExporter{})),
+				}
 			},
 		},
 		{
 			name: "AddAttributesBatch",
-			f: func() Processor {
-				return attrAddDecorator{NewBatchProcessor(noopExporter{})}
+			f: func() []LoggerProviderOption {
+				return []LoggerProviderOption{
+					WithProcessor(attrAddProcessor{}),
+					WithProcessor(NewBatchProcessor(noopExporter{})),
+				}
 			},
 		},
 		{
 			name: "SetAttributesSimple",
-			f: func() Processor {
-				return attrSetDecorator{NewSimpleProcessor(noopExporter{})}
+			f: func() []LoggerProviderOption {
+				return []LoggerProviderOption{
+					WithProcessor(attrSetDecorator{}),
+					WithProcessor(NewSimpleProcessor(noopExporter{})),
+				}
 			},
 		},
 		{
 			name: "SetAttributesBatch",
-			f: func() Processor {
-				return attrSetDecorator{NewBatchProcessor(noopExporter{})}
+			f: func() []LoggerProviderOption {
+				return []LoggerProviderOption{
+					WithProcessor(attrSetDecorator{}),
+					WithProcessor(NewBatchProcessor(noopExporter{})),
+				}
 			},
 		},
 	} {
 		b.Run(tc.name, func(b *testing.B) {
-			provider := NewLoggerProvider(WithProcessor(tc.f()))
+			provider := NewLoggerProvider(tc.f()...)
 			b.Cleanup(func() { assert.NoError(b, provider.Shutdown(context.Background())) })
 
 			logger := provider.Logger(b.Name())
@@ -91,32 +109,59 @@ func BenchmarkProcessor(b *testing.B) {
 	}
 }
 
-type timestampDecorator struct {
-	Processor
-}
+type timestampProcessor struct{}
 
-func (e timestampDecorator) OnEmit(ctx context.Context, r Record) error {
-	r = r.Clone()
+func (p timestampProcessor) OnEmit(ctx context.Context, r *Record) error {
 	r.SetObservedTimestamp(time.Date(1988, time.November, 17, 0, 0, 0, 0, time.UTC))
-	return e.Processor.OnEmit(ctx, r)
+	return nil
+}
+
+func (p timestampProcessor) Enabled(context.Context, Record) bool {
+	return true
 }
 
-type attrAddDecorator struct {
-	Processor
+func (p timestampProcessor) Shutdown(ctx context.Context) error {
+	return nil
 }
 
-func (e attrAddDecorator) OnEmit(ctx context.Context, r Record) error {
-	r = r.Clone()
+func (p timestampProcessor) ForceFlush(ctx context.Context) error {
+	return nil
+}
+
+type attrAddProcessor struct{}
+
+func (p attrAddProcessor) OnEmit(ctx context.Context, r *Record) error {
 	r.AddAttributes(log.String("add", "me"))
-	return e.Processor.OnEmit(ctx, r)
+	return nil
 }
 
-type attrSetDecorator struct {
-	Processor
+func (p attrAddProcessor) Enabled(context.Context, Record) bool {
+	return true
 }
 
-func (e attrSetDecorator) OnEmit(ctx context.Context, r Record) error {
-	r = r.Clone()
+func (p attrAddProcessor) Shutdown(ctx context.Context) error {
+	return nil
+}
+
+func (p attrAddProcessor) ForceFlush(ctx context.Context) error {
+	return nil
+}
+
+type attrSetDecorator struct{}
+
+func (p attrSetDecorator) OnEmit(ctx context.Context, r *Record) error {
 	r.SetAttributes(log.String("replace", "me"))
-	return e.Processor.OnEmit(ctx, r)
+	return nil
+}
+
+func (p attrSetDecorator) Enabled(context.Context, Record) bool {
+	return true
+}
+
+func (p attrSetDecorator) Shutdown(ctx context.Context) error {
+	return nil
+}
+
+func (p attrSetDecorator) ForceFlush(ctx context.Context) error {
+	return nil
 }
diff --git a/sdk/log/example_test.go b/sdk/log/example_test.go
index 71b99921528..7b697db3ea7 100644
--- a/sdk/log/example_test.go
+++ b/sdk/log/example_test.go
@@ -83,7 +83,7 @@ type ContextFilterProcessor struct {
 	log.Processor
 }
 
-func (p *ContextFilterProcessor) OnEmit(ctx context.Context, record log.Record) error {
+func (p *ContextFilterProcessor) OnEmit(ctx context.Context, record *log.Record) error {
 	if ignoreLogs(ctx) {
 		return nil
 	}
@@ -104,35 +104,45 @@ func ExampleProcessor_redact() {
 	// Existing processor that emits telemetry.
 	var processor log.Processor = log.NewBatchProcessor(nil)
 
-	// Wrap the processor so that it redacts values from token attributes.
-	processor = &RedactTokensProcessor{processor}
+	// Add a processor that redacts values from token attributes.
+	redactProcessor := &RedactTokensProcessor{}
 
 	// The created processor can then be registered with
 	// the OpenTelemetry Logs SDK using the WithProcessor option.
 	_ = log.NewLoggerProvider(
+		// Order is important here. Redact before the record reaches the exporting processor.
+		log.WithProcessor(redactProcessor),
 		log.WithProcessor(processor),
 	)
 }
 
 // RedactTokensProcessor is a [log.Processor] decorator that redacts values
 // from attributes containing "token" in the key.
-type RedactTokensProcessor struct {
-	log.Processor
-}
+type RedactTokensProcessor struct{}
 
 // OnEmit redacts values from attributes containing "token" in the key
 // by replacing them with a REDACTED value.
-func (p *RedactTokensProcessor) OnEmit(ctx context.Context, record log.Record) error {
-	cloned := false
+func (p *RedactTokensProcessor) OnEmit(ctx context.Context, record *log.Record) error {
 	record.WalkAttributes(func(kv logapi.KeyValue) bool {
 		if strings.Contains(strings.ToLower(kv.Key), "token") {
-			if !cloned {
-				record = record.Clone()
-				cloned = true
-			}
 			record.AddAttributes(logapi.String(kv.Key, "REDACTED"))
 		}
 		return true
 	})
-	return p.Processor.OnEmit(ctx, record)
+	return nil
+}
+
+// Enabled returns true.
+func (p *RedactTokensProcessor) Enabled(context.Context, log.Record) bool {
+	return true
+}
+
+// Shutdown returns nil.
+func (p *RedactTokensProcessor) Shutdown(ctx context.Context) error {
+	return nil
+}
+
+// ForceFlush returns nil.
+func (p *RedactTokensProcessor) ForceFlush(ctx context.Context) error {
+	return nil
 }
diff --git a/sdk/log/logger.go b/sdk/log/logger.go
index 245867f3fd6..04c44ac5bb8 100644
--- a/sdk/log/logger.go
+++ b/sdk/log/logger.go
@@ -36,7 +36,7 @@ func newLogger(p *LoggerProvider, scope instrumentation.Scope) *logger {
 func (l *logger) Emit(ctx context.Context, r log.Record) {
 	newRecord := l.newRecord(ctx, r)
 	for _, p := range l.provider.processors {
-		if err := p.OnEmit(ctx, newRecord); err != nil {
+		if err := p.OnEmit(ctx, &newRecord); err != nil {
 			otel.Handle(err)
 		}
 	}
diff --git a/sdk/log/logger_norace_test.go b/sdk/log/logger_norace_test.go
deleted file mode 100644
index a7a9aaebdb9..00000000000
--- a/sdk/log/logger_norace_test.go
+++ /dev/null
@@ -1,46 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-//go:build !race
-
-package log
-
-import (
-	"context"
-	"testing"
-	"time"
-
-	"github.com/stretchr/testify/assert"
-
-	"go.opentelemetry.io/otel/log"
-	"go.opentelemetry.io/otel/sdk/instrumentation"
-)
-
-func TestAllocationLimits(t *testing.T) {
-	// This test is not run with a race detector. The sync.Pool used by parts
-	// of the SDK has memory optimizations removed for the race detector. Do
-	// not test performance of the SDK in that state.
-
-	const runs = 10
-
-	logger := newLogger(NewLoggerProvider(), instrumentation.Scope{})
-
-	r := log.Record{}
-	r.SetTimestamp(time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC))
-	r.SetObservedTimestamp(time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC))
-	r.SetBody(log.StringValue("testing body value"))
-	r.SetSeverity(log.SeverityInfo)
-	r.SetSeverityText("testing text")
-
-	r.AddAttributes(
-		log.String("k1", "str"),
-		log.Float64("k2", 1.0),
-		log.Int("k3", 2),
-		log.Bool("k4", true),
-		log.Bytes("k5", []byte{1}),
-	)
-
-	assert.Equal(t, 0.0, testing.AllocsPerRun(runs, func() {
-		logger.newRecord(context.Background(), r)
-	}), "newRecord")
-}
diff --git a/sdk/log/processor.go b/sdk/log/processor.go
index f95ea949027..2fcff98b62d 100644
--- a/sdk/log/processor.go
+++ b/sdk/log/processor.go
@@ -26,9 +26,14 @@ type Processor interface {
 	// considered unrecoverable and will be reported to a configured error
 	// Handler.
 	//
-	// Before modifying a Record, the implementation must use Record.Clone
+	// The SDK invokes the processors sequentially in the same order as
+	// they were registered using [WithProcessor].
+	// Implementations may synchronously modify the record so that the changes
+	// are visible to the next registered processor.
+	// Notice that [Record] is not concurrent safe. Therefore, asynchronous
+	// processing may cause race conditions. Use [Record.Clone]
 	// to create a copy that shares no state with the original.
-	OnEmit(ctx context.Context, record Record) error
+	OnEmit(ctx context.Context, record *Record) error
 	// Enabled returns whether the Processor will process for the given context
 	// and record.
 	//
@@ -44,8 +49,10 @@ type Processor interface {
 	// indeterminate state, but may return false if valid reasons in particular
 	// circumstances exist (e.g. performance, correctness).
 	//
-	// Before modifying a Record, the implementation must use Record.Clone
-	// to create a copy that shares no state with the original.
+	// The SDK invokes the processors sequentially in the same order as
+	// they were registered using [WithProcessor] until one of them returns true.
+	//
+	// Implementations should not modify the record.
 	Enabled(ctx context.Context, record Record) bool
 	// Shutdown is called when the SDK shuts down. Any cleanup or release of
 	// resources held by the exporter should be done in this call.
diff --git a/sdk/log/provider.go b/sdk/log/provider.go
index 84bb14c5ec7..eb1d98acf37 100644
--- a/sdk/log/provider.go
+++ b/sdk/log/provider.go
@@ -189,8 +189,8 @@ func WithResource(res *resource.Resource) LoggerProviderOption {
 // By default, if this option is not used, the LoggerProvider will perform no
 // operations; no data will be exported without a processor.
 //
-// Each WithProcessor creates a separate pipeline. Use custom decorators
-// for advanced scenarios such as enriching with attributes.
+// The SDK invokes the processors sequentially in the same order as they were
+// registered.
 //
 // For production, use [NewBatchProcessor] to batch log records before they are exported.
 // For testing and debugging, use [NewSimpleProcessor] to synchronously export log records.
diff --git a/sdk/log/provider_test.go b/sdk/log/provider_test.go
index 4a251338fb0..39071b6a98f 100644
--- a/sdk/log/provider_test.go
+++ b/sdk/log/provider_test.go
@@ -38,12 +38,12 @@ func newProcessor(name string) *processor {
 	return &processor{Name: name, enabled: true}
 }
 
-func (p *processor) OnEmit(ctx context.Context, r Record) error {
+func (p *processor) OnEmit(ctx context.Context, r *Record) error {
 	if p.Err != nil {
 		return p.Err
 	}
 
-	p.records = append(p.records, r)
+	p.records = append(p.records, *r)
 	return nil
 }
 
diff --git a/sdk/log/simple.go b/sdk/log/simple.go
index 4d2bb412045..8b6c34dc080 100644
--- a/sdk/log/simple.go
+++ b/sdk/log/simple.go
@@ -40,9 +40,9 @@ var simpleProcRecordsPool = sync.Pool{
 }
 
 // OnEmit batches provided log record.
-func (s *SimpleProcessor) OnEmit(ctx context.Context, r Record) error {
+func (s *SimpleProcessor) OnEmit(ctx context.Context, r *Record) error {
 	records := simpleProcRecordsPool.Get().(*[]Record)
-	(*records)[0] = r
+	(*records)[0] = *r
 	defer func() {
 		simpleProcRecordsPool.Put(records)
 	}()
diff --git a/sdk/log/simple_test.go b/sdk/log/simple_test.go
index 805130465b0..dbc91a90156 100644
--- a/sdk/log/simple_test.go
+++ b/sdk/log/simple_test.go
@@ -42,12 +42,12 @@ func TestSimpleProcessorOnEmit(t *testing.T) {
 	e := new(exporter)
 	s := log.NewSimpleProcessor(e)
 
-	var r log.Record
+	r := new(log.Record)
 	r.SetSeverityText("test")
 	_ = s.OnEmit(context.Background(), r)
 
 	require.True(t, e.exportCalled, "exporter Export not called")
-	assert.Equal(t, []log.Record{r}, e.records)
+	assert.Equal(t, []log.Record{*r}, e.records)
 }
 
 func TestSimpleProcessorEnabled(t *testing.T) {
@@ -75,7 +75,7 @@ func TestSimpleProcessorConcurrentSafe(t *testing.T) {
 	var wg sync.WaitGroup
 	wg.Add(goRoutineN)
 
-	var r log.Record
+	r := new(log.Record)
 	r.SetSeverityText("test")
 	ctx := context.Background()
 	s := log.NewSimpleProcessor(nil)
@@ -84,7 +84,7 @@ func TestSimpleProcessorConcurrentSafe(t *testing.T) {
 			defer wg.Done()
 
 			_ = s.OnEmit(ctx, r)
-			_ = s.Enabled(ctx, r)
+			_ = s.Enabled(ctx, *r)
 			_ = s.Shutdown(ctx)
 			_ = s.ForceFlush(ctx)
 		}()
@@ -94,7 +94,7 @@ func TestSimpleProcessorConcurrentSafe(t *testing.T) {
 }
 
 func BenchmarkSimpleProcessorOnEmit(b *testing.B) {
-	var r log.Record
+	r := new(log.Record)
 	r.SetSeverityText("test")
 	ctx := context.Background()
 	s := log.NewSimpleProcessor(nil)
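A minimal sketch of a custom processor written against the pointer-based `OnEmit` contract documented in `sdk/log/processor.go` above. The `EnvProcessor` type and the `env` attribute are hypothetical names used only for illustration; the sketch assumes only `Record`, `Processor`, `WithProcessor`, `NewBatchProcessor`, and `NewLoggerProvider` from `go.opentelemetry.io/otel/sdk/log` and `String` from `go.opentelemetry.io/otel/log`, as they appear in the changes.

```go
package main

import (
	"context"

	logapi "go.opentelemetry.io/otel/log"
	"go.opentelemetry.io/otel/sdk/log"
)

// EnvProcessor is a hypothetical processor used only for illustration.
// It mutates the record in place; with the pointer-based OnEmit the added
// attribute is visible to every processor registered after it.
type EnvProcessor struct{}

// OnEmit adds an "env" attribute to the record and passes it on unchanged.
func (EnvProcessor) OnEmit(_ context.Context, r *log.Record) error {
	r.AddAttributes(logapi.String("env", "production"))
	return nil
}

// Enabled never filters records out.
func (EnvProcessor) Enabled(context.Context, log.Record) bool { return true }

// Shutdown has no resources to release.
func (EnvProcessor) Shutdown(context.Context) error { return nil }

// ForceFlush has nothing to flush.
func (EnvProcessor) ForceFlush(context.Context) error { return nil }

func main() {
	// Register the mutating processor before the exporting one, mirroring
	// the ordering shown in ExampleProcessor_redact.
	_ = log.NewLoggerProvider(
		log.WithProcessor(EnvProcessor{}),
		log.WithProcessor(log.NewBatchProcessor(nil)),
	)
}
```

Because the record is passed by pointer, the attribute added here reaches the batch processor registered after it without the `Clone`-and-delegate pattern that the decorators in `bench_test.go` and `example_test.go` used before this change.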