Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[consumer] Add tracing to all the components #10847

Open
wants to merge 36 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
fde3646
Merge branch 'main' of github.com:open-telemetry/opentelemetry-collector
iblancasa Aug 9, 2024
ee7fb87
Add tracing information to all the components #8804
iblancasa Aug 9, 2024
04b75b1
Merge branch 'main' into feature/8804
iblancasa Aug 9, 2024
5e5a9cc
Fix tests
iblancasa Aug 9, 2024
2a9039f
Merge branch 'feature/8804' of github.com:iblancasa/opentelemetry-col…
iblancasa Aug 9, 2024
17020ce
Merge branch 'main' into feature/8804
iblancasa Aug 13, 2024
fe46a6e
Merge branch 'main' into feature/8804
iblancasa Aug 13, 2024
436e26a
Merge branch 'main' into feature/8804
iblancasa Aug 13, 2024
8e6f7ec
Merge branch 'main' into feature/8804
iblancasa Aug 14, 2024
422f7a0
Merge branch 'main' into feature/8804
iblancasa Aug 19, 2024
ff1d3ed
Merge branch 'main' into feature/8804
iblancasa Aug 20, 2024
d8990d0
Merge branch 'main' into feature/8804
iblancasa Aug 20, 2024
dc52520
Merge branch 'main' into feature/8804
iblancasa Aug 21, 2024
fe9a81f
Merge branch 'main' into feature/8804
iblancasa Aug 22, 2024
cab9aa0
Merge branch 'main' into feature/8804
iblancasa Aug 22, 2024
83be7f6
Merge branch 'main' into feature/8804
iblancasa Aug 22, 2024
147c37d
Merge branch 'main' into feature/8804
iblancasa Aug 26, 2024
404d723
Merge branch 'main' into feature/8804
iblancasa Aug 27, 2024
7208e48
Merge branch 'main' into feature/8804
iblancasa Aug 27, 2024
adb1e43
Merge branch 'main' into feature/8804
iblancasa Aug 28, 2024
451524b
Merge branch 'main' into feature/8804
iblancasa Aug 28, 2024
f41b801
Merge branch 'main' into feature/8804
iblancasa Aug 30, 2024
bf2f6ac
Merge branch 'main' into feature/8804
iblancasa Aug 30, 2024
959ef39
Merge branch 'main' into feature/8804
iblancasa Sep 9, 2024
c932261
Merge branch 'main' into feature/8804
iblancasa Sep 11, 2024
958692b
Merge branch 'main' into feature/8804
iblancasa Sep 12, 2024
08384a9
Merge branch 'main' into feature/8804
iblancasa Sep 12, 2024
4a9985e
Merge branch 'main' into feature/8804
iblancasa Sep 13, 2024
eefffd6
Merge branch 'main' into feature/8804
iblancasa Sep 16, 2024
4e7ad6d
Merge branch 'main' into feature/8804
iblancasa Sep 16, 2024
0c5c512
Merge branch 'main' into feature/8804
iblancasa Sep 17, 2024
1af6c2a
Merge branch 'main' into feature/8804
iblancasa Sep 18, 2024
da8e4fc
Merge branch 'main' into feature/8804
iblancasa Sep 18, 2024
f8a0630
Merge branch 'main' into feature/8804
iblancasa Sep 19, 2024
efbe92d
Merge branch 'main' into feature/8804
iblancasa Sep 19, 2024
a302c34
Merge branch 'main' into feature/8804
iblancasa Sep 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .chloggen/add-tracing-information-to-all-components.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: consumer

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Add tracing information to all the components"

# One or more tracking issues or pull requests related to the change
issues: [8804]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
9 changes: 9 additions & 0 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// Capabilities describes the capabilities of a Processor.
type Capabilities = internal.Capabilities

// ObsReport describes the observability report of a consumer.
type ObsReport = internal.ObsReport

var errNilFunc = errors.New("nil consumer func")

// Option to construct new consumers.
Expand All @@ -24,3 +27,9 @@
o.Cap = capabilities
})
}

func WithObsReport(report ObsReport) Option {
return func(o *internal.BaseImpl) {

Check failure on line 32 in consumer/consumer.go

View workflow job for this annotation

GitHub Actions / Integration test

cannot use func(o *internal.BaseImpl) {…} (value of type func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl)) as "go.opentelemetry.io/collector/consumer/internal".Option value in return statement: func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl) does not implement "go.opentelemetry.io/collector/consumer/internal".Option (missing method apply)

Check failure on line 32 in consumer/consumer.go

View workflow job for this annotation

GitHub Actions / govulncheck

cannot use (func(o *internal.BaseImpl) literal) (value of type func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl)) as "go.opentelemetry.io/collector/consumer/internal".Option value in return statement: func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl) does not implement "go.opentelemetry.io/collector/consumer/internal".Option (missing method apply)

Check failure on line 32 in consumer/consumer.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (pkg)

cannot use func(o *internal.BaseImpl) {…} (value of type func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl)) as "go.opentelemetry.io/collector/consumer/internal".Option value in return statement: func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl) does not implement "go.opentelemetry.io/collector/consumer/internal".Option (missing method apply)

Check failure on line 32 in consumer/consumer.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (pkg)

cannot use func(o *internal.BaseImpl) {…} (value of type func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl)) as "go.opentelemetry.io/collector/consumer/internal".Option value in return statement: func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl) does not implement "go.opentelemetry.io/collector/consumer/internal".Option (missing method apply)

Check failure on line 32 in consumer/consumer.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

cannot use func(o *internal.BaseImpl) {…} (value of type func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl)) as "go.opentelemetry.io/collector/consumer/internal".Option value in return statement: func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl) does not implement "go.opentelemetry.io/collector/consumer/internal".Option (missing method apply)

Check failure on line 32 in consumer/consumer.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

cannot use func(o *internal.BaseImpl) {…} (value of type func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl)) as "go.opentelemetry.io/collector/consumer/internal".Option value in return statement: func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl) does not implement "go.opentelemetry.io/collector/consumer/internal".Option (missing method apply)

Check failure on line 32 in consumer/consumer.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

cannot use func(o *internal.BaseImpl) {…} (value of type func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl)) as "go.opentelemetry.io/collector/consumer/internal".Option value in return statement: func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl) does not implement "go.opentelemetry.io/collector/consumer/internal".Option (missing method apply)

Check failure on line 32 in consumer/consumer.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

cannot use func(o *internal.BaseImpl) {…} (value of type func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl)) as "go.opentelemetry.io/collector/consumer/internal".Option value in return statement: func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl) does not implement "go.opentelemetry.io/collector/consumer/internal".Option (missing method apply)

Check failure on line 32 in consumer/consumer.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

cannot use func(o *internal.BaseImpl) {…} (value of type func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl)) as "go.opentelemetry.io/collector/consumer/internal".Option value in return statement: func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl) does not implement "go.opentelemetry.io/collector/consumer/internal".Option (missing method apply)

Check failure on line 32 in consumer/consumer.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

cannot use func(o *internal.BaseImpl) {…} (value of type func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl)) as "go.opentelemetry.io/collector/consumer/internal".Option value in return statement: func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl) does not implement "go.opentelemetry.io/collector/consumer/internal".Option (missing method apply)

Check failure on line 32 in consumer/consumer.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

cannot use func(o *internal.BaseImpl) {…} (value of type func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl)) as "go.opentelemetry.io/collector/consumer/internal".Option value in return statement: func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl) does not implement "go.opentelemetry.io/collector/consumer/internal".Option (missing method apply)

Check failure on line 32 in consumer/consumer.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

cannot use func(o *internal.BaseImpl) {…} (value of type func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl)) as "go.opentelemetry.io/collector/consumer/internal".Option value in return statement: func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl) does not implement "go.opentelemetry.io/collector/consumer/internal".Option (missing method apply)

Check failure on line 32 in consumer/consumer.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (processor)

cannot use func(o *internal.BaseImpl) {…} (value of type func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl)) as "go.opentelemetry.io/collector/consumer/internal".Option value in return statement: func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl) does not implement "go.opentelemetry.io/collector/consumer/internal".Option (missing method apply)

Check failure on line 32 in consumer/consumer.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (processor)

cannot use func(o *internal.BaseImpl) {…} (value of type func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl)) as "go.opentelemetry.io/collector/consumer/internal".Option value in return statement: func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl) does not implement "go.opentelemetry.io/collector/consumer/internal".Option (missing method apply)

Check failure on line 32 in consumer/consumer.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (cmd-1)

cannot use func(o *internal.BaseImpl) {…} (value of type func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl)) as "go.opentelemetry.io/collector/consumer/internal".Option value in return statement: func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl) does not implement "go.opentelemetry.io/collector/consumer/internal".Option (missing method apply)

Check failure on line 32 in consumer/consumer.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (cmd-1)

cannot use func(o *internal.BaseImpl) {…} (value of type func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl)) as "go.opentelemetry.io/collector/consumer/internal".Option value in return statement: func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl) does not implement "go.opentelemetry.io/collector/consumer/internal".Option (missing method apply)

Check failure on line 32 in consumer/consumer.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

cannot use func(o *internal.BaseImpl) {…} (value of type func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl)) as "go.opentelemetry.io/collector/consumer/internal".Option value in return statement: func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl) does not implement "go.opentelemetry.io/collector/consumer/internal".Option (missing method apply)

Check failure on line 32 in consumer/consumer.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

cannot use func(o *internal.BaseImpl) {…} (value of type func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl)) as "go.opentelemetry.io/collector/consumer/internal".Option value in return statement: func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl) does not implement "go.opentelemetry.io/collector/consumer/internal".Option (missing method apply)

Check failure on line 32 in consumer/consumer.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

cannot use func(o *internal.BaseImpl) {…} (value of type func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl)) as "go.opentelemetry.io/collector/consumer/internal".Option value in return statement: func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl) does not implement "go.opentelemetry.io/collector/consumer/internal".Option (missing method apply)

Check failure on line 32 in consumer/consumer.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

cannot use func(o *internal.BaseImpl) {…} (value of type func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl)) as "go.opentelemetry.io/collector/consumer/internal".Option value in return statement: func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl) does not implement "go.opentelemetry.io/collector/consumer/internal".Option (missing method apply)

Check failure on line 32 in consumer/consumer.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

cannot use func(o *internal.BaseImpl) {…} (value of type func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl)) as "go.opentelemetry.io/collector/consumer/internal".Option value in return statement: func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl) does not implement "go.opentelemetry.io/collector/consumer/internal".Option (missing method apply)

Check failure on line 32 in consumer/consumer.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

cannot use func(o *internal.BaseImpl) {…} (value of type func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl)) as "go.opentelemetry.io/collector/consumer/internal".Option value in return statement: func(o *"go.opentelemetry.io/collector/consumer/internal".BaseImpl) does not implement "go.opentelemetry.io/collector/consumer/internal".Option (missing method apply)
o.ObsReport = report
}
}
6 changes: 4 additions & 2 deletions consumer/internal/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ type BaseConsumer interface {
}

type BaseImpl struct {
Cap Capabilities
Cap Capabilities
ObsReport ObsReport
}

// Option to construct new consumers.
Expand All @@ -39,7 +40,8 @@ func (bs BaseImpl) Capabilities() Capabilities {

func NewBaseImpl(options ...Option) *BaseImpl {
bs := &BaseImpl{
Cap: Capabilities{MutatesData: false},
Cap: Capabilities{MutatesData: false},
ObsReport: noopObsReport,
}

for _, op := range options {
Expand Down
23 changes: 23 additions & 0 deletions consumer/internal/obsreport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package internal // import "go.opentelemetry.io/collector/consumer/internal"

import "context"

// ObsReport contains information required to make an implementor
// of Consumer observable.
type ObsReport interface {
StartTracesOp(context.Context) context.Context
EndTracesOp(context.Context, int, error)
}

type baseObsReport struct{}

func (bor baseObsReport) StartTracesOp(ctx context.Context) context.Context {
return ctx
}

func (bor baseObsReport) EndTracesOp(_ context.Context, _ int, _ error) {}

var noopObsReport = baseObsReport{}
13 changes: 11 additions & 2 deletions consumer/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,17 @@ func NewLogs(consume ConsumeLogsFunc, options ...Option) (Logs, error) {
if consume == nil {
return nil, errNilFunc
}
baseImpl := internal.NewBaseImpl(options...)
fn := func(ctx context.Context, ld plog.Logs) error {
baseImpl.ObsReport.StartTracesOp(ctx)
logRecordCount := ld.LogRecordCount()
err := consume(ctx, ld)
baseImpl.ObsReport.EndTracesOp(ctx, logRecordCount, err)
return err
}

return &baseLogs{
BaseImpl: internal.NewBaseImpl(options...),
ConsumeLogsFunc: consume,
BaseImpl: baseImpl,
ConsumeLogsFunc: fn,
}, nil
}
25 changes: 25 additions & 0 deletions consumer/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,20 @@ import (
"go.opentelemetry.io/collector/pdata/plog"
)

type mockObsReport struct {
StartTracesOpCalled int
EndTracesOpCalled int
}

func (m *mockObsReport) StartTracesOp(ctx context.Context) context.Context {
m.StartTracesOpCalled++
return ctx
}

func (m *mockObsReport) EndTracesOp(_ context.Context, _ int, _ error) {
m.EndTracesOpCalled++
}

func TestDefaultLogs(t *testing.T) {
cp, err := NewLogs(func(context.Context, plog.Logs) error { return nil })
assert.NoError(t, err)
Expand All @@ -35,6 +49,17 @@ func TestWithCapabilitiesLogs(t *testing.T) {
assert.Equal(t, Capabilities{MutatesData: true}, cp.Capabilities())
}

func TestWithObsReportLogs(t *testing.T) {
obsr := &mockObsReport{}
cp, err := NewLogs(
func(context.Context, plog.Logs) error { return nil },
WithObsReport(obsr))
assert.NoError(t, err)
assert.NoError(t, cp.ConsumeLogs(context.Background(), plog.NewLogs()))
assert.Equal(t, 1, obsr.StartTracesOpCalled)
assert.Equal(t, 1, obsr.EndTracesOpCalled)
}

func TestConsumeLogs(t *testing.T) {
consumeCalled := false
cp, err := NewLogs(func(context.Context, plog.Logs) error { consumeCalled = true; return nil })
Expand Down
13 changes: 11 additions & 2 deletions consumer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,17 @@ func NewMetrics(consume ConsumeMetricsFunc, options ...Option) (Metrics, error)
if consume == nil {
return nil, errNilFunc
}
baseImpl := internal.NewBaseImpl(options...)
fn := func(ctx context.Context, ld pmetric.Metrics) error {
ctx = baseImpl.ObsReport.StartTracesOp(ctx)
dataPointCount := ld.DataPointCount()
err := consume(ctx, ld)
baseImpl.ObsReport.EndTracesOp(ctx, dataPointCount, err)
return err
}

return &baseMetrics{
BaseImpl: internal.NewBaseImpl(options...),
ConsumeMetricsFunc: consume,
BaseImpl: baseImpl,
ConsumeMetricsFunc: fn,
}, nil
}
11 changes: 11 additions & 0 deletions consumer/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,17 @@ func TestWithCapabilitiesMetrics(t *testing.T) {
assert.Equal(t, Capabilities{MutatesData: true}, cp.Capabilities())
}

func TestWithObsReportMetrics(t *testing.T) {
obsr := &mockObsReport{}
cp, err := NewMetrics(
func(context.Context, pmetric.Metrics) error { return nil },
WithObsReport(obsr))
assert.NoError(t, err)
assert.NoError(t, cp.ConsumeMetrics(context.Background(), pmetric.NewMetrics()))
assert.Equal(t, 1, obsr.StartTracesOpCalled)
assert.Equal(t, 1, obsr.EndTracesOpCalled)
}

func TestConsumeMetrics(t *testing.T) {
consumeCalled := false
cp, err := NewMetrics(func(context.Context, pmetric.Metrics) error { consumeCalled = true; return nil })
Expand Down
14 changes: 12 additions & 2 deletions consumer/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,18 @@ func NewTraces(consume ConsumeTracesFunc, options ...Option) (Traces, error) {
if consume == nil {
return nil, errNilFunc
}

baseImpl := internal.NewBaseImpl(options...)
fn := func(ctx context.Context, td ptrace.Traces) error {
ctx = baseImpl.ObsReport.StartTracesOp(ctx)
spanCount := td.SpanCount()
err := consume(ctx, td)
baseImpl.ObsReport.EndTracesOp(ctx, spanCount, err)
return err
}

return &baseTraces{
BaseImpl: internal.NewBaseImpl(options...),
ConsumeTracesFunc: consume,
BaseImpl: baseImpl,
ConsumeTracesFunc: fn,
}, nil
}
11 changes: 11 additions & 0 deletions consumer/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,17 @@ func TestWithCapabilitiesTraces(t *testing.T) {
assert.Equal(t, Capabilities{MutatesData: true}, cp.Capabilities())
}

func TestWithWithObsReportTraces(t *testing.T) {
obsr := &mockObsReport{}
cp, err := NewTraces(
func(context.Context, ptrace.Traces) error { return nil },
WithObsReport(obsr))
assert.NoError(t, err)
assert.NoError(t, cp.ConsumeTraces(context.Background(), ptrace.NewTraces()))
assert.Equal(t, 1, obsr.StartTracesOpCalled)
assert.Equal(t, 1, obsr.EndTracesOpCalled)
}

func TestConsumeTraces(t *testing.T) {
consumeCalled := false
cp, err := NewTraces(func(context.Context, ptrace.Traces) error { consumeCalled = true; return nil })
Expand Down
Loading