From 65934b239841d2e758d87a08370a3ac1a2d59c21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Mon, 22 Jul 2024 12:20:14 +0200 Subject: [PATCH 01/13] sdk/log: Change Processor and Record.Clone to operate on pointers --- CHANGELOG.md | 5 +++++ sdk/log/batch.go | 6 +++--- sdk/log/example_test.go | 37 +++++++++++++++++++++++-------------- sdk/log/logger.go | 4 ++-- sdk/log/processor.go | 16 ++++++++++------ sdk/log/record.go | 4 ++-- sdk/log/simple.go | 6 +++--- 7 files changed, 48 insertions(+), 30 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 64357fd3148..2447af548e4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,11 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm This module is unstable and breaking changes may be introduced. See our [versioning policy](VERSIONING.md) for more information about these stability guarantees. (#5629) +### Changed + +- `Processor` in `go.opentelemetry.io/otel/sdk/log` now accepts a pointer to `Record` instead of a value so that the record modifications done in a processor are propagated to subsequent registered processors. (#TODO) +- `Record.Clone` in `go.opentelemetry.io/otel/sdk/log` now returns a pointer to `Record`. (#TODO) + ### Fixed - Correct comments for the priority of the `WithEndpoint` and `WithEndpointURL` options and their corresponding environment variables in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#5584) diff --git a/sdk/log/batch.go b/sdk/log/batch.go index 8e43b0e8f75..4c1c8a8d0a4 100644 --- a/sdk/log/batch.go +++ b/sdk/log/batch.go @@ -176,11 +176,11 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) { } // OnEmit batches provided log record. -func (b *BatchProcessor) OnEmit(_ context.Context, r Record) error { +func (b *BatchProcessor) OnEmit(_ context.Context, r *Record) error { if b.stopped.Load() || b.q == nil { return nil } - if n := b.q.Enqueue(r); n >= b.batchSize { + if n := b.q.Enqueue(*r); n >= b.batchSize { select { case b.pollTrigger <- struct{}{}: default: @@ -193,7 +193,7 @@ func (b *BatchProcessor) OnEmit(_ context.Context, r Record) error { } // Enabled returns if b is enabled. -func (b *BatchProcessor) Enabled(context.Context, Record) bool { +func (b *BatchProcessor) Enabled(context.Context, *Record) bool { return !b.stopped.Load() && b.q != nil } diff --git a/sdk/log/example_test.go b/sdk/log/example_test.go index 71b99921528..01cf907dd33 100644 --- a/sdk/log/example_test.go +++ b/sdk/log/example_test.go @@ -83,14 +83,14 @@ type ContextFilterProcessor struct { log.Processor } -func (p *ContextFilterProcessor) OnEmit(ctx context.Context, record log.Record) error { +func (p *ContextFilterProcessor) OnEmit(ctx context.Context, record *log.Record) error { if ignoreLogs(ctx) { return nil } return p.Processor.OnEmit(ctx, record) } -func (p *ContextFilterProcessor) Enabled(ctx context.Context, record log.Record) bool { +func (p *ContextFilterProcessor) Enabled(ctx context.Context, record *log.Record) bool { return !ignoreLogs(ctx) && p.Processor.Enabled(ctx, record) } @@ -104,35 +104,44 @@ func ExampleProcessor_redact() { // Existing processor that emits telemetry. var processor log.Processor = log.NewBatchProcessor(nil) - // Wrap the processor so that it redacts values from token attributes. - processor = &RedactTokensProcessor{processor} + // Add a processor so that it redacts values from token attributes. + redactProcessor := &RedactTokensProcessor{} // The created processor can then be registered with // the OpenTelemetry Logs SDK using the WithProcessor option. _ = log.NewLoggerProvider( + log.WithProcessor(redactProcessor), log.WithProcessor(processor), ) } // RedactTokensProcessor is a [log.Processor] decorator that redacts values // from attributes containing "token" in the key. -type RedactTokensProcessor struct { - log.Processor -} +type RedactTokensProcessor struct{} // OnEmit redacts values from attributes containing "token" in the key // by replacing them with a REDACTED value. -func (p *RedactTokensProcessor) OnEmit(ctx context.Context, record log.Record) error { - cloned := false +func (p *RedactTokensProcessor) OnEmit(ctx context.Context, record *log.Record) error { record.WalkAttributes(func(kv logapi.KeyValue) bool { if strings.Contains(strings.ToLower(kv.Key), "token") { - if !cloned { - record = record.Clone() - cloned = true - } record.AddAttributes(logapi.String(kv.Key, "REDACTED")) } return true }) - return p.Processor.OnEmit(ctx, record) + return nil +} + +// Enabled returns true. +func (p *RedactTokensProcessor) Enabled(context.Context, *log.Record) bool { + return true +} + +// Shutdown returns nil. +func (p *RedactTokensProcessor) Shutdown(ctx context.Context) error { + return nil +} + +// ForceFlush returns nil. +func (p *RedactTokensProcessor) ForceFlush(ctx context.Context) error { + return nil } diff --git a/sdk/log/logger.go b/sdk/log/logger.go index 245867f3fd6..169eb1762cf 100644 --- a/sdk/log/logger.go +++ b/sdk/log/logger.go @@ -52,10 +52,10 @@ func (l *logger) Enabled(ctx context.Context, r log.Record) bool { return false } -func (l *logger) newRecord(ctx context.Context, r log.Record) Record { +func (l *logger) newRecord(ctx context.Context, r log.Record) *Record { sc := trace.SpanContextFromContext(ctx) - newRecord := Record{ + newRecord := &Record{ timestamp: r.Timestamp(), observedTimestamp: r.ObservedTimestamp(), severity: r.Severity(), diff --git a/sdk/log/processor.go b/sdk/log/processor.go index f95ea949027..6945c6cc11e 100644 --- a/sdk/log/processor.go +++ b/sdk/log/processor.go @@ -26,9 +26,11 @@ type Processor interface { // considered unrecoverable and will be reported to a configured error // Handler. // - // Before modifying a Record, the implementation must use Record.Clone - // to create a copy that shares no state with the original. - OnEmit(ctx context.Context, record Record) error + // Implementations may synchronously modify the record so that the changes + // are visible in the next registered processor. + // Implementations should not modify the record asynchronously as [Record] + // is not concurrent safe. + OnEmit(ctx context.Context, record *Record) error // Enabled returns whether the Processor will process for the given context // and record. // @@ -44,9 +46,11 @@ type Processor interface { // indeterminate state, but may return false if valid reasons in particular // circumstances exist (e.g. performance, correctness). // - // Before modifying a Record, the implementation must use Record.Clone - // to create a copy that shares no state with the original. - Enabled(ctx context.Context, record Record) bool + // Implementations may synchronously modify the record so that the changes + // are visible in the next registered processor. + // Implementations should not modify the record asynchronously as [Record] + // is not concurrent safe. + Enabled(ctx context.Context, record *Record) bool // Shutdown is called when the SDK shuts down. Any cleanup or release of // resources held by the exporter should be done in this call. // diff --git a/sdk/log/record.go b/sdk/log/record.go index a6e50df7782..7d8941676cf 100644 --- a/sdk/log/record.go +++ b/sdk/log/record.go @@ -384,10 +384,10 @@ func (r *Record) InstrumentationScope() instrumentation.Scope { // Clone returns a copy of the record with no shared state. The original record // and the clone can both be modified without interfering with each other. -func (r *Record) Clone() Record { +func (r *Record) Clone() *Record { res := *r res.back = slices.Clone(r.back) - return res + return &res } func (r *Record) applyAttrLimits(attr log.KeyValue) log.KeyValue { diff --git a/sdk/log/simple.go b/sdk/log/simple.go index 4d2bb412045..21bcad9f1e0 100644 --- a/sdk/log/simple.go +++ b/sdk/log/simple.go @@ -40,9 +40,9 @@ var simpleProcRecordsPool = sync.Pool{ } // OnEmit batches provided log record. -func (s *SimpleProcessor) OnEmit(ctx context.Context, r Record) error { +func (s *SimpleProcessor) OnEmit(ctx context.Context, r *Record) error { records := simpleProcRecordsPool.Get().(*[]Record) - (*records)[0] = r + (*records)[0] = *r defer func() { simpleProcRecordsPool.Put(records) }() @@ -51,7 +51,7 @@ func (s *SimpleProcessor) OnEmit(ctx context.Context, r Record) error { } // Enabled returns true. -func (s *SimpleProcessor) Enabled(context.Context, Record) bool { +func (s *SimpleProcessor) Enabled(context.Context, *Record) bool { return true } From 7c705110109d17fa8c38540b299fd8b51f02ab8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Mon, 22 Jul 2024 12:41:06 +0200 Subject: [PATCH 02/13] Update BenchmarkProcessor --- sdk/log/bench_test.go | 112 ++++++++++++++++++++++++++++++------------ 1 file changed, 80 insertions(+), 32 deletions(-) diff --git a/sdk/log/bench_test.go b/sdk/log/bench_test.go index 8147418ef7e..c58dd9e55e7 100644 --- a/sdk/log/bench_test.go +++ b/sdk/log/bench_test.go @@ -16,59 +16,77 @@ import ( func BenchmarkProcessor(b *testing.B) { for _, tc := range []struct { name string - f func() Processor + f func() []LoggerProviderOption }{ { name: "Simple", - f: func() Processor { - return NewSimpleProcessor(noopExporter{}) + f: func() []LoggerProviderOption { + return []LoggerProviderOption{WithProcessor(NewSimpleProcessor(noopExporter{}))} }, }, { name: "Batch", - f: func() Processor { - return NewBatchProcessor(noopExporter{}) + f: func() []LoggerProviderOption { + return []LoggerProviderOption{WithProcessor(NewBatchProcessor(noopExporter{}))} }, }, { name: "SetTimestampSimple", - f: func() Processor { - return timestampDecorator{NewSimpleProcessor(noopExporter{})} + f: func() []LoggerProviderOption { + return []LoggerProviderOption{ + WithProcessor(timestampProcessor{}), + WithProcessor(NewSimpleProcessor(noopExporter{})), + } }, }, { name: "SetTimestampBatch", - f: func() Processor { - return timestampDecorator{NewBatchProcessor(noopExporter{})} + f: func() []LoggerProviderOption { + return []LoggerProviderOption{ + WithProcessor(timestampProcessor{}), + WithProcessor(NewBatchProcessor(noopExporter{})), + } }, }, { name: "AddAttributesSimple", - f: func() Processor { - return attrAddDecorator{NewSimpleProcessor(noopExporter{})} + f: func() []LoggerProviderOption { + return []LoggerProviderOption{ + WithProcessor(attrAddProcessor{}), + WithProcessor(NewSimpleProcessor(noopExporter{})), + } }, }, { name: "AddAttributesBatch", - f: func() Processor { - return attrAddDecorator{NewBatchProcessor(noopExporter{})} + f: func() []LoggerProviderOption { + return []LoggerProviderOption{ + WithProcessor(attrAddProcessor{}), + WithProcessor(NewBatchProcessor(noopExporter{})), + } }, }, { name: "SetAttributesSimple", - f: func() Processor { - return attrSetDecorator{NewSimpleProcessor(noopExporter{})} + f: func() []LoggerProviderOption { + return []LoggerProviderOption{ + WithProcessor(attrSetDecorator{}), + WithProcessor(NewSimpleProcessor(noopExporter{})), + } }, }, { name: "SetAttributesBatch", - f: func() Processor { - return attrSetDecorator{NewBatchProcessor(noopExporter{})} + f: func() []LoggerProviderOption { + return []LoggerProviderOption{ + WithProcessor(attrSetDecorator{}), + WithProcessor(NewBatchProcessor(noopExporter{})), + } }, }, } { b.Run(tc.name, func(b *testing.B) { - provider := NewLoggerProvider(WithProcessor(tc.f())) + provider := NewLoggerProvider(tc.f()...) b.Cleanup(func() { assert.NoError(b, provider.Shutdown(context.Background())) }) logger := provider.Logger(b.Name()) @@ -91,32 +109,62 @@ func BenchmarkProcessor(b *testing.B) { } } -type timestampDecorator struct { - Processor +type timestampProcessor struct { } -func (e timestampDecorator) OnEmit(ctx context.Context, r Record) error { - r = r.Clone() +func (p timestampProcessor) OnEmit(ctx context.Context, r *Record) error { r.SetObservedTimestamp(time.Date(1988, time.November, 17, 0, 0, 0, 0, time.UTC)) - return e.Processor.OnEmit(ctx, r) + return nil +} + +func (p timestampProcessor) Enabled(context.Context, *Record) bool { + return true +} + +func (p timestampProcessor) Shutdown(ctx context.Context) error { + return nil +} + +func (p timestampProcessor) ForceFlush(ctx context.Context) error { + return nil } -type attrAddDecorator struct { - Processor +type attrAddProcessor struct { } -func (e attrAddDecorator) OnEmit(ctx context.Context, r Record) error { - r = r.Clone() +func (p attrAddProcessor) OnEmit(ctx context.Context, r *Record) error { r.AddAttributes(log.String("add", "me")) - return e.Processor.OnEmit(ctx, r) + return nil +} + +func (p attrAddProcessor) Enabled(context.Context, *Record) bool { + return true +} + +func (p attrAddProcessor) Shutdown(ctx context.Context) error { + return nil +} + +func (p attrAddProcessor) ForceFlush(ctx context.Context) error { + return nil } type attrSetDecorator struct { - Processor } -func (e attrSetDecorator) OnEmit(ctx context.Context, r Record) error { - r = r.Clone() +func (p attrSetDecorator) OnEmit(ctx context.Context, r *Record) error { r.SetAttributes(log.String("replace", "me")) - return e.Processor.OnEmit(ctx, r) + return nil +} + +func (p attrSetDecorator) Enabled(context.Context, *Record) bool { + return true +} + +func (p attrSetDecorator) Shutdown(ctx context.Context) error { + return nil +} + +func (p attrSetDecorator) ForceFlush(ctx context.Context) error { + return nil } From eae4fa9a2f7c82eb14726ef3ec018a5f09c9cc7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Mon, 22 Jul 2024 12:47:51 +0200 Subject: [PATCH 03/13] Update unit tests --- sdk/log/batch_test.go | 38 ++++++++++++++--------------- sdk/log/logger_norace_test.go | 46 ----------------------------------- sdk/log/provider_test.go | 6 ++--- sdk/log/simple_test.go | 10 ++++---- 4 files changed, 27 insertions(+), 73 deletions(-) delete mode 100644 sdk/log/logger_norace_test.go diff --git a/sdk/log/batch_test.go b/sdk/log/batch_test.go index 70b12ab04fa..ec2023c825a 100644 --- a/sdk/log/batch_test.go +++ b/sdk/log/batch_test.go @@ -45,7 +45,7 @@ func TestEmptyBatchConfig(t *testing.T) { assert.NotPanics(t, func() { var bp BatchProcessor ctx := context.Background() - var record Record + record := new(Record) assert.NoError(t, bp.OnEmit(ctx, record), "OnEmit") assert.False(t, bp.Enabled(ctx, record), "Enabled") assert.NoError(t, bp.ForceFlush(ctx), "ForceFlush") @@ -197,8 +197,8 @@ func TestBatchProcessor(t *testing.T) { WithExportInterval(time.Nanosecond), WithExportTimeout(time.Hour), ) - for _, r := range make([]Record, size) { - assert.NoError(t, b.OnEmit(ctx, r)) + for i := 0; i < size; i++ { + assert.NoError(t, b.OnEmit(ctx, new(Record))) } var got []Record assert.Eventually(t, func() bool { @@ -220,8 +220,8 @@ func TestBatchProcessor(t *testing.T) { WithExportInterval(time.Hour), WithExportTimeout(time.Hour), ) - for _, r := range make([]Record, 10*batch) { - assert.NoError(t, b.OnEmit(ctx, r)) + for i := 0; i < 10*batch; i++ { + assert.NoError(t, b.OnEmit(ctx, new(Record))) } assert.Eventually(t, func() bool { return e.ExportN() > 1 @@ -243,8 +243,8 @@ func TestBatchProcessor(t *testing.T) { WithExportInterval(time.Hour), WithExportTimeout(time.Hour), ) - for _, r := range make([]Record, 2*batch) { - assert.NoError(t, b.OnEmit(ctx, r)) + for i := 0; i < 2*batch; i++ { + assert.NoError(t, b.OnEmit(ctx, new(Record))) } var n int @@ -255,7 +255,7 @@ func TestBatchProcessor(t *testing.T) { var err error require.Eventually(t, func() bool { - err = b.OnEmit(ctx, Record{}) + err = b.OnEmit(ctx, new(Record)) return true }, time.Second, time.Microsecond, "OnEmit blocked") assert.NoError(t, err) @@ -272,10 +272,10 @@ func TestBatchProcessor(t *testing.T) { t.Run("Enabled", func(t *testing.T) { b := NewBatchProcessor(defaultNoopExporter) - assert.True(t, b.Enabled(ctx, Record{})) + assert.True(t, b.Enabled(ctx, new(Record))) _ = b.Shutdown(ctx) - assert.False(t, b.Enabled(ctx, Record{})) + assert.False(t, b.Enabled(ctx, new(Record))) }) t.Run("Shutdown", func(t *testing.T) { @@ -303,7 +303,7 @@ func TestBatchProcessor(t *testing.T) { assert.NoError(t, b.Shutdown(ctx)) want := e.ExportN() - assert.NoError(t, b.OnEmit(ctx, Record{})) + assert.NoError(t, b.OnEmit(ctx, new(Record))) assert.Equal(t, want, e.ExportN(), "Export called after shutdown") }) @@ -311,7 +311,7 @@ func TestBatchProcessor(t *testing.T) { e := newTestExporter(nil) b := NewBatchProcessor(e) - assert.NoError(t, b.OnEmit(ctx, Record{})) + assert.NoError(t, b.OnEmit(ctx, new(Record))) assert.NoError(t, b.Shutdown(ctx)) assert.NoError(t, b.ForceFlush(ctx)) @@ -344,7 +344,7 @@ func TestBatchProcessor(t *testing.T) { ) t.Cleanup(func() { _ = b.Shutdown(ctx) }) - var r Record + r := new(Record) r.SetBody(log.BoolValue(true)) require.NoError(t, b.OnEmit(ctx, r)) @@ -353,7 +353,7 @@ func TestBatchProcessor(t *testing.T) { if assert.Equal(t, 1, e.ExportN(), "exporter Export calls") { got := e.Records() if assert.Len(t, got[0], 1, "records received") { - assert.Equal(t, r, got[0][0]) + assert.Equal(t, *r, got[0][0]) } } }) @@ -381,7 +381,7 @@ func TestBatchProcessor(t *testing.T) { // Enqueue 10 x "batch size" amount of records. for i := 0; i < 10*batch; i++ { - require.NoError(t, b.OnEmit(ctx, Record{})) + require.NoError(t, b.OnEmit(ctx, new(Record))) } assert.Eventually(t, func() bool { return e.ExportN() > 0 && len(b.exporter.input) == cap(b.exporter.input) @@ -423,7 +423,7 @@ func TestBatchProcessor(t *testing.T) { b := NewBatchProcessor(e) t.Cleanup(func() { _ = b.Shutdown(ctx) }) - var r Record + r := new(Record) r.SetBody(log.BoolValue(true)) _ = b.OnEmit(ctx, r) t.Cleanup(func() { _ = b.Shutdown(ctx) }) @@ -453,7 +453,7 @@ func TestBatchProcessor(t *testing.T) { WithExportInterval(time.Hour), WithExportTimeout(time.Hour), ) - var r Record + r := new(Record) // First record will be blocked by testExporter.Export assert.NoError(t, b.OnEmit(ctx, r), "exported record") require.Eventually(t, func() bool { @@ -497,7 +497,7 @@ func TestBatchProcessor(t *testing.T) { case <-ctx.Done(): return default: - assert.NoError(t, b.OnEmit(ctx, Record{})) + assert.NoError(t, b.OnEmit(ctx, new(Record))) // Ignore partial flush errors. _ = b.ForceFlush(ctx) } @@ -642,7 +642,7 @@ func TestQueue(t *testing.T) { } func BenchmarkBatchProcessorOnEmit(b *testing.B) { - var r Record + r := new(Record) body := log.BoolValue(true) r.SetBody(body) diff --git a/sdk/log/logger_norace_test.go b/sdk/log/logger_norace_test.go deleted file mode 100644 index a7a9aaebdb9..00000000000 --- a/sdk/log/logger_norace_test.go +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -//go:build !race - -package log - -import ( - "context" - "testing" - "time" - - "github.com/stretchr/testify/assert" - - "go.opentelemetry.io/otel/log" - "go.opentelemetry.io/otel/sdk/instrumentation" -) - -func TestAllocationLimits(t *testing.T) { - // This test is not run with a race detector. The sync.Pool used by parts - // of the SDK has memory optimizations removed for the race detector. Do - // not test performance of the SDK in that state. - - const runs = 10 - - logger := newLogger(NewLoggerProvider(), instrumentation.Scope{}) - - r := log.Record{} - r.SetTimestamp(time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)) - r.SetObservedTimestamp(time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)) - r.SetBody(log.StringValue("testing body value")) - r.SetSeverity(log.SeverityInfo) - r.SetSeverityText("testing text") - - r.AddAttributes( - log.String("k1", "str"), - log.Float64("k2", 1.0), - log.Int("k3", 2), - log.Bool("k4", true), - log.Bytes("k5", []byte{1}), - ) - - assert.Equal(t, 0.0, testing.AllocsPerRun(runs, func() { - logger.newRecord(context.Background(), r) - }), "newRecord") -} diff --git a/sdk/log/provider_test.go b/sdk/log/provider_test.go index 55dc516a7b1..35994431b42 100644 --- a/sdk/log/provider_test.go +++ b/sdk/log/provider_test.go @@ -38,16 +38,16 @@ func newProcessor(name string) *processor { return &processor{Name: name, enabled: true} } -func (p *processor) OnEmit(ctx context.Context, r Record) error { +func (p *processor) OnEmit(ctx context.Context, r *Record) error { if p.Err != nil { return p.Err } - p.records = append(p.records, r) + p.records = append(p.records, *r) return nil } -func (p *processor) Enabled(context.Context, Record) bool { +func (p *processor) Enabled(context.Context, *Record) bool { return p.enabled } diff --git a/sdk/log/simple_test.go b/sdk/log/simple_test.go index 805130465b0..a47137669e0 100644 --- a/sdk/log/simple_test.go +++ b/sdk/log/simple_test.go @@ -42,17 +42,17 @@ func TestSimpleProcessorOnEmit(t *testing.T) { e := new(exporter) s := log.NewSimpleProcessor(e) - var r log.Record + r := new(log.Record) r.SetSeverityText("test") _ = s.OnEmit(context.Background(), r) require.True(t, e.exportCalled, "exporter Export not called") - assert.Equal(t, []log.Record{r}, e.records) + assert.Equal(t, []log.Record{*r}, e.records) } func TestSimpleProcessorEnabled(t *testing.T) { s := log.NewSimpleProcessor(nil) - assert.True(t, s.Enabled(context.Background(), log.Record{})) + assert.True(t, s.Enabled(context.Background(), new(log.Record))) } func TestSimpleProcessorShutdown(t *testing.T) { @@ -75,7 +75,7 @@ func TestSimpleProcessorConcurrentSafe(t *testing.T) { var wg sync.WaitGroup wg.Add(goRoutineN) - var r log.Record + r := new(log.Record) r.SetSeverityText("test") ctx := context.Background() s := log.NewSimpleProcessor(nil) @@ -94,7 +94,7 @@ func TestSimpleProcessorConcurrentSafe(t *testing.T) { } func BenchmarkSimpleProcessorOnEmit(b *testing.B) { - var r log.Record + r := new(log.Record) r.SetSeverityText("test") ctx := context.Background() s := log.NewSimpleProcessor(nil) From fd713c8ab163a477dc3a433e71b80bffec7106d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Mon, 22 Jul 2024 13:01:06 +0200 Subject: [PATCH 04/13] gofumpt --- sdk/log/bench_test.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/sdk/log/bench_test.go b/sdk/log/bench_test.go index c58dd9e55e7..43c237a3a5f 100644 --- a/sdk/log/bench_test.go +++ b/sdk/log/bench_test.go @@ -109,8 +109,7 @@ func BenchmarkProcessor(b *testing.B) { } } -type timestampProcessor struct { -} +type timestampProcessor struct{} func (p timestampProcessor) OnEmit(ctx context.Context, r *Record) error { r.SetObservedTimestamp(time.Date(1988, time.November, 17, 0, 0, 0, 0, time.UTC)) @@ -129,8 +128,7 @@ func (p timestampProcessor) ForceFlush(ctx context.Context) error { return nil } -type attrAddProcessor struct { -} +type attrAddProcessor struct{} func (p attrAddProcessor) OnEmit(ctx context.Context, r *Record) error { r.AddAttributes(log.String("add", "me")) @@ -149,8 +147,7 @@ func (p attrAddProcessor) ForceFlush(ctx context.Context) error { return nil } -type attrSetDecorator struct { -} +type attrSetDecorator struct{} func (p attrSetDecorator) OnEmit(ctx context.Context, r *Record) error { r.SetAttributes(log.String("replace", "me")) From 07d7af983c59859384a238e85a041667915403ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Mon, 22 Jul 2024 13:47:54 +0200 Subject: [PATCH 05/13] Update DESIGN.md --- sdk/log/DESIGN.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/sdk/log/DESIGN.md b/sdk/log/DESIGN.md index cda06511203..3bb0574d6b6 100644 --- a/sdk/log/DESIGN.md +++ b/sdk/log/DESIGN.md @@ -146,7 +146,25 @@ provided via API. Moreover it is safer to have these abstraction decoupled. E.g. there can be a need for some fields that can be set via API and cannot be modified by the processors. +### Processor to accept Record values + +There was a proposal to make the [Processor](#processor) accept a +[Record](#record) value instead of a pointer. + +There have been long discussions within the OpenTelemetry Specification SIG[^5] +about whether such a design would comply with the specification. The summary +was that the current processor design flaws are present in other languages as +well. Therefore, it would be favorable to introduce new processing concepts +(e.g. chaining processors) in the specification that would coexist with the +current "mutable" processor design. + +The performance disadvantages caused by using a pointer (which at the time of +writing causes an additional heap allocation) may be mitigated by future +versions of the Go compiler, thanks to improved escape analysis and +profile-guided optimization (PGO). + [^1]: [A Guide to the Go Garbage Collector](https://tip.golang.org/doc/gc-guide) [^2]: [OpenTelemetry Logging](https://opentelemetry.io/docs/specs/otel/logs) [^3]: [Conversation on representing LogRecordProcessor and LogRecordExporter via a single Expoter interface](https://github.com/open-telemetry/opentelemetry-go/pull/4954#discussion_r1515050480) [^4]: [Introduce Processor](https://github.com/pellared/opentelemetry-go/pull/9) +[^5]: [Log record mutations do not have to be visible in next registered processors](https://github.com/open-telemetry/opentelemetry-specification/pull/4067) From 94f3c6e06119afae8326bf7ce15fe894dc7a1a12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Mon, 22 Jul 2024 13:55:54 +0200 Subject: [PATCH 06/13] Make BatchProcessor safer --- sdk/log/batch.go | 2 +- sdk/log/processor.go | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/sdk/log/batch.go b/sdk/log/batch.go index 4c1c8a8d0a4..5115d8763b1 100644 --- a/sdk/log/batch.go +++ b/sdk/log/batch.go @@ -180,7 +180,7 @@ func (b *BatchProcessor) OnEmit(_ context.Context, r *Record) error { if b.stopped.Load() || b.q == nil { return nil } - if n := b.q.Enqueue(*r); n >= b.batchSize { + if n := b.q.Enqueue(*r.Clone()); n >= b.batchSize { select { case b.pollTrigger <- struct{}{}: default: diff --git a/sdk/log/processor.go b/sdk/log/processor.go index 6945c6cc11e..6f3134c4e2f 100644 --- a/sdk/log/processor.go +++ b/sdk/log/processor.go @@ -28,8 +28,9 @@ type Processor interface { // // Implementations may synchronously modify the record so that the changes // are visible in the next registered processor. - // Implementations should not modify the record asynchronously as [Record] - // is not concurrent safe. + // Notice that [Record] is not concurrent safe. Therefore, asynchronous + // processing may cause race conditions. Use [Record.Clone] + // to create a copy that shares no state with the original. OnEmit(ctx context.Context, record *Record) error // Enabled returns whether the Processor will process for the given context // and record. @@ -48,8 +49,9 @@ type Processor interface { // // Implementations may synchronously modify the record so that the changes // are visible in the next registered processor. - // Implementations should not modify the record asynchronously as [Record] - // is not concurrent safe. + // Notice that [Record] is not concurrent safe. Therefore, asynchronous + // processing may cause race conditions. Use [Record.Clone] + // to create a copy that shares no state with the original. Enabled(ctx context.Context, record *Record) bool // Shutdown is called when the SDK shuts down. Any cleanup or release of // resources held by the exporter should be done in this call. From 3fe660bfc64e8d587d737194243cf8fcd2f0c11c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Mon, 22 Jul 2024 13:57:55 +0200 Subject: [PATCH 07/13] Update CHANGELOG.md --- CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2447af548e4..a473323269a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,8 +18,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Changed -- `Processor` in `go.opentelemetry.io/otel/sdk/log` now accepts a pointer to `Record` instead of a value so that the record modifications done in a processor are propagated to subsequent registered processors. (#TODO) -- `Record.Clone` in `go.opentelemetry.io/otel/sdk/log` now returns a pointer to `Record`. (#TODO) +- `Processor` in `go.opentelemetry.io/otel/sdk/log` now accepts a pointer to `Record` instead of a value so that the record modifications done in a processor are propagated to subsequent registered processors. (#5636) +- `Record.Clone` in `go.opentelemetry.io/otel/sdk/log` now returns a pointer to `Record`. (#5636) ### Fixed From 4f5cdb6b7ae9501341f4c8d718002310fb068e87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Mon, 22 Jul 2024 15:26:47 +0200 Subject: [PATCH 08/13] Refine commments --- sdk/log/processor.go | 4 ++++ sdk/log/provider.go | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/sdk/log/processor.go b/sdk/log/processor.go index 6f3134c4e2f..c755fd0ff18 100644 --- a/sdk/log/processor.go +++ b/sdk/log/processor.go @@ -26,6 +26,8 @@ type Processor interface { // considered unrecoverable and will be reported to a configured error // Handler. // + // The SDK invokes the processors sequentially in the same order as + // they were registered using [WithProcessor]. // Implementations may synchronously modify the record so that the changes // are visible in the next registered processor. // Notice that [Record] is not concurrent safe. Therefore, asynchronous @@ -47,6 +49,8 @@ type Processor interface { // indeterminate state, but may return false if valid reasons in particular // circumstances exist (e.g. performance, correctness). // + // The SDK invokes the processors sequentially in the same order as + // they were registered using [WithProcessor] until any processor returns true. // Implementations may synchronously modify the record so that the changes // are visible in the next registered processor. // Notice that [Record] is not concurrent safe. Therefore, asynchronous diff --git a/sdk/log/provider.go b/sdk/log/provider.go index 84bb14c5ec7..eb1d98acf37 100644 --- a/sdk/log/provider.go +++ b/sdk/log/provider.go @@ -189,8 +189,8 @@ func WithResource(res *resource.Resource) LoggerProviderOption { // By default, if this option is not used, the LoggerProvider will perform no // operations; no data will be exported without a processor. // -// Each WithProcessor creates a separate pipeline. Use custom decorators -// for advanced scenarios such as enriching with attributes. +// The SDK invokes the processors sequentially in the same order as they were +// registered. // // For production, use [NewBatchProcessor] to batch log records before they are exported. // For testing and debugging, use [NewSimpleProcessor] to synchronously export log records. From 8b64c68ed1b7b873bce3aabde24d74f6a4f3bb64 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Wed, 24 Jul 2024 08:12:39 +0200 Subject: [PATCH 09/13] Apply suggestions from code review Co-authored-by: Sam Xie --- sdk/log/DESIGN.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/log/DESIGN.md b/sdk/log/DESIGN.md index 3bb0574d6b6..71498b8a97d 100644 --- a/sdk/log/DESIGN.md +++ b/sdk/log/DESIGN.md @@ -161,10 +161,11 @@ current "mutable" processor design. The performance disadvantages caused by using a pointer (which at the time of writing causes an additional heap allocation) may be mitigated by future versions of the Go compiler, thanks to improved escape analysis and -profile-guided optimization (PGO). +profile-guided optimization (PGO)[^6]. [^1]: [A Guide to the Go Garbage Collector](https://tip.golang.org/doc/gc-guide) [^2]: [OpenTelemetry Logging](https://opentelemetry.io/docs/specs/otel/logs) [^3]: [Conversation on representing LogRecordProcessor and LogRecordExporter via a single Expoter interface](https://github.com/open-telemetry/opentelemetry-go/pull/4954#discussion_r1515050480) [^4]: [Introduce Processor](https://github.com/pellared/opentelemetry-go/pull/9) [^5]: [Log record mutations do not have to be visible in next registered processors](https://github.com/open-telemetry/opentelemetry-specification/pull/4067) +[^6]: [Profile-guided optimization](https://go.dev/doc/pgo) From 790ab22ab422b8ff9bc907d50d364d049b77d8f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Wed, 24 Jul 2024 08:18:03 +0200 Subject: [PATCH 10/13] Revert Record.Clone changes --- CHANGELOG.md | 1 - sdk/log/batch.go | 2 +- sdk/log/record.go | 4 ++-- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a473323269a..476582fe528 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,7 +19,6 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Changed - `Processor` in `go.opentelemetry.io/otel/sdk/log` now accepts a pointer to `Record` instead of a value so that the record modifications done in a processor are propagated to subsequent registered processors. (#5636) -- `Record.Clone` in `go.opentelemetry.io/otel/sdk/log` now returns a pointer to `Record`. (#5636) ### Fixed diff --git a/sdk/log/batch.go b/sdk/log/batch.go index 5115d8763b1..0f5d8ebc05b 100644 --- a/sdk/log/batch.go +++ b/sdk/log/batch.go @@ -180,7 +180,7 @@ func (b *BatchProcessor) OnEmit(_ context.Context, r *Record) error { if b.stopped.Load() || b.q == nil { return nil } - if n := b.q.Enqueue(*r.Clone()); n >= b.batchSize { + if n := b.q.Enqueue(r.Clone()); n >= b.batchSize { select { case b.pollTrigger <- struct{}{}: default: diff --git a/sdk/log/record.go b/sdk/log/record.go index 7d8941676cf..a6e50df7782 100644 --- a/sdk/log/record.go +++ b/sdk/log/record.go @@ -384,10 +384,10 @@ func (r *Record) InstrumentationScope() instrumentation.Scope { // Clone returns a copy of the record with no shared state. The original record // and the clone can both be modified without interfering with each other. -func (r *Record) Clone() *Record { +func (r *Record) Clone() Record { res := *r res.back = slices.Clone(r.back) - return &res + return res } func (r *Record) applyAttrLimits(attr log.KeyValue) log.KeyValue { From 55dafb525a94410489202e8b0bd7eeb43b89136d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Wed, 24 Jul 2024 08:19:21 +0200 Subject: [PATCH 11/13] Add comment why we are cloning the record --- sdk/log/batch.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/log/batch.go b/sdk/log/batch.go index 0f5d8ebc05b..31677ec5723 100644 --- a/sdk/log/batch.go +++ b/sdk/log/batch.go @@ -180,6 +180,8 @@ func (b *BatchProcessor) OnEmit(_ context.Context, r *Record) error { if b.stopped.Load() || b.q == nil { return nil } + // The record is cloned so that changes done by subsequent processors + // are not going to lead to a data race. if n := b.q.Enqueue(r.Clone()); n >= b.batchSize { select { case b.pollTrigger <- struct{}{}: From b8b7fb818fa7f03a9088023733a58341b0d3308a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Tue, 30 Jul 2024 18:19:36 +0200 Subject: [PATCH 12/13] Update sdk/log/example_test.go Co-authored-by: Tyler Yahn --- sdk/log/example_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/log/example_test.go b/sdk/log/example_test.go index 01cf907dd33..c08b877f9a7 100644 --- a/sdk/log/example_test.go +++ b/sdk/log/example_test.go @@ -110,6 +110,7 @@ func ExampleProcessor_redact() { // The created processor can then be registered with // the OpenTelemetry Logs SDK using the WithProcessor option. _ = log.NewLoggerProvider( + // Order is important here. Redact before handing to the processor. log.WithProcessor(redactProcessor), log.WithProcessor(processor), ) From dfa72a60aabcdc6f8141f0bd901caa9f228fe1f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Wed, 31 Jul 2024 18:51:32 +0200 Subject: [PATCH 13/13] Processor.Enabled to accept Record value --- CHANGELOG.md | 2 +- sdk/log/DESIGN.md | 11 ++++++++--- sdk/log/batch.go | 2 +- sdk/log/batch_test.go | 6 +++--- sdk/log/bench_test.go | 6 +++--- sdk/log/example_test.go | 4 ++-- sdk/log/logger.go | 6 +++--- sdk/log/processor.go | 9 +++------ sdk/log/provider_test.go | 2 +- sdk/log/simple.go | 2 +- sdk/log/simple_test.go | 4 ++-- 11 files changed, 28 insertions(+), 26 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 458f6388f1f..e0cc72e908f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,7 +19,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Changed -- `Processor` in `go.opentelemetry.io/otel/sdk/log` now accepts a pointer to `Record` instead of a value so that the record modifications done in a processor are propagated to subsequent registered processors. (#5636) +- `Processor.OnEmit` in `go.opentelemetry.io/otel/sdk/log` now accepts a pointer to `Record` instead of a value so that the record modifications done in a processor are propagated to subsequent registered processors. (#5636) ### Fixed diff --git a/sdk/log/DESIGN.md b/sdk/log/DESIGN.md index 71498b8a97d..c94d50d8ffd 100644 --- a/sdk/log/DESIGN.md +++ b/sdk/log/DESIGN.md @@ -146,10 +146,11 @@ provided via API. Moreover it is safer to have these abstraction decoupled. E.g. there can be a need for some fields that can be set via API and cannot be modified by the processors. -### Processor to accept Record values +### Processor.OnEmit to accept Record values -There was a proposal to make the [Processor](#processor) accept a -[Record](#record) value instead of a pointer. +There was a proposal to make the [Processor](#processor)'s `OnEmit` +to accept a [Record](#record) value instead of a pointer to reduce allocations +as well as to have design similar to [`slog.Handler`](https://pkg.go.dev/log/slog#Handler). There have been long discussions within the OpenTelemetry Specification SIG[^5] about whether such a design would comply with the specification. The summary @@ -163,6 +164,10 @@ writing causes an additional heap allocation) may be mitigated by future versions of the Go compiler, thanks to improved escape analysis and profile-guided optimization (PGO)[^6]. +On the other hand, [Processor](#processor)'s `Enabled` is fine to accept +a [Record](#record) value as the processors should not mutate the passed +parameters. + [^1]: [A Guide to the Go Garbage Collector](https://tip.golang.org/doc/gc-guide) [^2]: [OpenTelemetry Logging](https://opentelemetry.io/docs/specs/otel/logs) [^3]: [Conversation on representing LogRecordProcessor and LogRecordExporter via a single Expoter interface](https://github.com/open-telemetry/opentelemetry-go/pull/4954#discussion_r1515050480) diff --git a/sdk/log/batch.go b/sdk/log/batch.go index 31677ec5723..fd913901c28 100644 --- a/sdk/log/batch.go +++ b/sdk/log/batch.go @@ -195,7 +195,7 @@ func (b *BatchProcessor) OnEmit(_ context.Context, r *Record) error { } // Enabled returns if b is enabled. -func (b *BatchProcessor) Enabled(context.Context, *Record) bool { +func (b *BatchProcessor) Enabled(context.Context, Record) bool { return !b.stopped.Load() && b.q != nil } diff --git a/sdk/log/batch_test.go b/sdk/log/batch_test.go index ec2023c825a..0b9ece02a3d 100644 --- a/sdk/log/batch_test.go +++ b/sdk/log/batch_test.go @@ -47,7 +47,7 @@ func TestEmptyBatchConfig(t *testing.T) { ctx := context.Background() record := new(Record) assert.NoError(t, bp.OnEmit(ctx, record), "OnEmit") - assert.False(t, bp.Enabled(ctx, record), "Enabled") + assert.False(t, bp.Enabled(ctx, *record), "Enabled") assert.NoError(t, bp.ForceFlush(ctx), "ForceFlush") assert.NoError(t, bp.Shutdown(ctx), "Shutdown") }) @@ -272,10 +272,10 @@ func TestBatchProcessor(t *testing.T) { t.Run("Enabled", func(t *testing.T) { b := NewBatchProcessor(defaultNoopExporter) - assert.True(t, b.Enabled(ctx, new(Record))) + assert.True(t, b.Enabled(ctx, Record{})) _ = b.Shutdown(ctx) - assert.False(t, b.Enabled(ctx, new(Record))) + assert.False(t, b.Enabled(ctx, Record{})) }) t.Run("Shutdown", func(t *testing.T) { diff --git a/sdk/log/bench_test.go b/sdk/log/bench_test.go index 43c237a3a5f..835f68c7aba 100644 --- a/sdk/log/bench_test.go +++ b/sdk/log/bench_test.go @@ -116,7 +116,7 @@ func (p timestampProcessor) OnEmit(ctx context.Context, r *Record) error { return nil } -func (p timestampProcessor) Enabled(context.Context, *Record) bool { +func (p timestampProcessor) Enabled(context.Context, Record) bool { return true } @@ -135,7 +135,7 @@ func (p attrAddProcessor) OnEmit(ctx context.Context, r *Record) error { return nil } -func (p attrAddProcessor) Enabled(context.Context, *Record) bool { +func (p attrAddProcessor) Enabled(context.Context, Record) bool { return true } @@ -154,7 +154,7 @@ func (p attrSetDecorator) OnEmit(ctx context.Context, r *Record) error { return nil } -func (p attrSetDecorator) Enabled(context.Context, *Record) bool { +func (p attrSetDecorator) Enabled(context.Context, Record) bool { return true } diff --git a/sdk/log/example_test.go b/sdk/log/example_test.go index c08b877f9a7..7b697db3ea7 100644 --- a/sdk/log/example_test.go +++ b/sdk/log/example_test.go @@ -90,7 +90,7 @@ func (p *ContextFilterProcessor) OnEmit(ctx context.Context, record *log.Record) return p.Processor.OnEmit(ctx, record) } -func (p *ContextFilterProcessor) Enabled(ctx context.Context, record *log.Record) bool { +func (p *ContextFilterProcessor) Enabled(ctx context.Context, record log.Record) bool { return !ignoreLogs(ctx) && p.Processor.Enabled(ctx, record) } @@ -133,7 +133,7 @@ func (p *RedactTokensProcessor) OnEmit(ctx context.Context, record *log.Record) } // Enabled returns true. -func (p *RedactTokensProcessor) Enabled(context.Context, *log.Record) bool { +func (p *RedactTokensProcessor) Enabled(context.Context, log.Record) bool { return true } diff --git a/sdk/log/logger.go b/sdk/log/logger.go index 169eb1762cf..04c44ac5bb8 100644 --- a/sdk/log/logger.go +++ b/sdk/log/logger.go @@ -36,7 +36,7 @@ func newLogger(p *LoggerProvider, scope instrumentation.Scope) *logger { func (l *logger) Emit(ctx context.Context, r log.Record) { newRecord := l.newRecord(ctx, r) for _, p := range l.provider.processors { - if err := p.OnEmit(ctx, newRecord); err != nil { + if err := p.OnEmit(ctx, &newRecord); err != nil { otel.Handle(err) } } @@ -52,10 +52,10 @@ func (l *logger) Enabled(ctx context.Context, r log.Record) bool { return false } -func (l *logger) newRecord(ctx context.Context, r log.Record) *Record { +func (l *logger) newRecord(ctx context.Context, r log.Record) Record { sc := trace.SpanContextFromContext(ctx) - newRecord := &Record{ + newRecord := Record{ timestamp: r.Timestamp(), observedTimestamp: r.ObservedTimestamp(), severity: r.Severity(), diff --git a/sdk/log/processor.go b/sdk/log/processor.go index c755fd0ff18..2fcff98b62d 100644 --- a/sdk/log/processor.go +++ b/sdk/log/processor.go @@ -51,12 +51,9 @@ type Processor interface { // // The SDK invokes the processors sequentially in the same order as // they were registered using [WithProcessor] until any processor returns true. - // Implementations may synchronously modify the record so that the changes - // are visible in the next registered processor. - // Notice that [Record] is not concurrent safe. Therefore, asynchronous - // processing may cause race conditions. Use [Record.Clone] - // to create a copy that shares no state with the original. - Enabled(ctx context.Context, record *Record) bool + // + // Implementations should not modify the record. + Enabled(ctx context.Context, record Record) bool // Shutdown is called when the SDK shuts down. Any cleanup or release of // resources held by the exporter should be done in this call. // diff --git a/sdk/log/provider_test.go b/sdk/log/provider_test.go index fa38f3d37d7..39071b6a98f 100644 --- a/sdk/log/provider_test.go +++ b/sdk/log/provider_test.go @@ -47,7 +47,7 @@ func (p *processor) OnEmit(ctx context.Context, r *Record) error { return nil } -func (p *processor) Enabled(context.Context, *Record) bool { +func (p *processor) Enabled(context.Context, Record) bool { return p.enabled } diff --git a/sdk/log/simple.go b/sdk/log/simple.go index 21bcad9f1e0..8b6c34dc080 100644 --- a/sdk/log/simple.go +++ b/sdk/log/simple.go @@ -51,7 +51,7 @@ func (s *SimpleProcessor) OnEmit(ctx context.Context, r *Record) error { } // Enabled returns true. -func (s *SimpleProcessor) Enabled(context.Context, *Record) bool { +func (s *SimpleProcessor) Enabled(context.Context, Record) bool { return true } diff --git a/sdk/log/simple_test.go b/sdk/log/simple_test.go index a47137669e0..dbc91a90156 100644 --- a/sdk/log/simple_test.go +++ b/sdk/log/simple_test.go @@ -52,7 +52,7 @@ func TestSimpleProcessorOnEmit(t *testing.T) { func TestSimpleProcessorEnabled(t *testing.T) { s := log.NewSimpleProcessor(nil) - assert.True(t, s.Enabled(context.Background(), new(log.Record))) + assert.True(t, s.Enabled(context.Background(), log.Record{})) } func TestSimpleProcessorShutdown(t *testing.T) { @@ -84,7 +84,7 @@ func TestSimpleProcessorConcurrentSafe(t *testing.T) { defer wg.Done() _ = s.OnEmit(ctx, r) - _ = s.Enabled(ctx, r) + _ = s.Enabled(ctx, *r) _ = s.Shutdown(ctx) _ = s.ForceFlush(ctx) }()