Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

component/otelcol/receiver: create receiver component abstraction #2254

Merged
merged 3 commits into from
Oct 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions component/otelcol/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,17 @@ type Consumer interface {
otelconsumer.Logs
}

// ConsumerArguments is a common Arguments type for Flow components which can
// send data to otelcol consumers.
//
// It is expected to use ConsumerArguments as a block within the top-level
// arguments block for a component.
type ConsumerArguments struct {
Metrics []Consumer `river:"metrics,attr,optional"`
Logs []Consumer `river:"logs,attr,optional"`
Traces []Consumer `river:"traces,attr,optional"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll always use the fanoutconsumer package implementations for these internally, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, fanoutconsumer is only used when passing the list of consumers to the OpenTelemetry code. These will instead almost always be conusmers from the lazyconsumer package (unless one day there's different components which are otelcol-compatible that have their own approach for exposing consumers)

}

// ConsumerExports is a common Exports type for Flow components which are
// otelcol processors or otelcol exporters.
type ConsumerExports struct {
Expand Down
2 changes: 1 addition & 1 deletion component/otelcol/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (e *Exporter) Update(args component.Arguments) error {
},
}

var exporterConfig = eargs.Convert()
exporterConfig := eargs.Convert()

// Create instances of the exporter from our factory for each of our
// supported telemetry signals.
Expand Down
60 changes: 60 additions & 0 deletions component/otelcol/internal/fakeconsumer/fake.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package fakeconsumer

import (
"context"

"github.com/grafana/agent/component/otelcol"
otelconsumer "go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

// Consumer is a fake otelcol.Consumer implementation which invokes functions
// when methods are called. All struct member fields are optional. If a struct
// member field is not provided, implementations of methods will default to a
// no-op.
type Consumer struct {
CapabilitiesFunc func() otelconsumer.Capabilities
ConsumeTracesFunc func(context.Context, ptrace.Traces) error
ConsumeMetricsFunc func(context.Context, pmetric.Metrics) error
ConsumeLogsFunc func(context.Context, plog.Logs) error
}

var _ otelcol.Consumer = (*Consumer)(nil)

// Capabilities implements otelcol.Consumer. If the CapabilitiesFunc is not
// provided, MutatesData is reported as true.
func (c *Consumer) Capabilities() otelconsumer.Capabilities {
if c.CapabilitiesFunc != nil {
return c.CapabilitiesFunc()
}

// We don't know what the fake implementation will do, so return true just
// in case it mutates data.
return otelconsumer.Capabilities{MutatesData: true}
}

// ConsumeTraces implements otelcol.ConsumeTraces.
func (c *Consumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
if c.ConsumeTracesFunc != nil {
return c.ConsumeTracesFunc(ctx, td)
}
return nil
}

// ConsumeMetrics implements otelcol.ConsumeMetrics.
func (c *Consumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
if c.ConsumeMetricsFunc != nil {
return c.ConsumeMetricsFunc(ctx, md)
}
return nil
}

// ConsumeLogs implements otelcol.ConsumeLogs.
func (c *Consumer) ConsumeLogs(ctx context.Context, md plog.Logs) error {
if c.ConsumeLogsFunc != nil {
return c.ConsumeLogsFunc(ctx, md)
}
return nil
}
77 changes: 77 additions & 0 deletions component/otelcol/internal/fanoutconsumer/logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package fanoutconsumer

// This file is a near copy of
// https://github.com/open-telemetry/opentelemetry-collector/blob/v0.54.0/service/internal/fanoutconsumer/logs.go
//
// A copy was made because the upstream package is internal. If it is ever made
// public, our copy can be removed.

import (
"context"

"github.com/grafana/agent/component/otelcol"
otelconsumer "go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/multierr"
)

// Logs creates a new fanout consumer for logs.
func Logs(in []otelcol.Consumer) otelconsumer.Logs {
if len(in) == 1 {
return in[0]
}

var passthrough, clone []otelconsumer.Logs

// Iterate through all the consumers besides the last.
for i := 0; i < len(in)-1; i++ {
consumer := in[i]

if consumer.Capabilities().MutatesData {
clone = append(clone, consumer)
} else {
passthrough = append(passthrough, consumer)
}
}

last := in[len(in)-1]

// The final consumer can be given to the passthrough list regardless of
// whether it mutates as long as there's no other read-only consumers.
if len(passthrough) == 0 || !last.Capabilities().MutatesData {
passthrough = append(passthrough, last)
} else {
clone = append(clone, last)
}

return &logsFanout{
passthrough: passthrough,
clone: clone,
}
}

type logsFanout struct {
passthrough []otelconsumer.Logs // Consumers where data can be passed through directly
clone []otelconsumer.Logs // Consumes which require cloning data
}

func (f *logsFanout) Capabilities() otelconsumer.Capabilities {
return otelconsumer.Capabilities{MutatesData: false}
}

// ConsumeLogs exports the pmetric.Logs to all consumers wrapped by the current one.
func (f *logsFanout) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
var errs error

// Initially pass to clone exporter to avoid the case where the optimization
// of sending the incoming data to a mutating consumer is used that may
// change the incoming data before cloning.
for _, f := range f.clone {
errs = multierr.Append(errs, f.ConsumeLogs(ctx, ld.Clone()))
}
for _, f := range f.passthrough {
errs = multierr.Append(errs, f.ConsumeLogs(ctx, ld))
}

return errs
}
77 changes: 77 additions & 0 deletions component/otelcol/internal/fanoutconsumer/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package fanoutconsumer

// This file is a near copy of
// https://github.com/open-telemetry/opentelemetry-collector/blob/v0.54.0/service/internal/fanoutconsumer/metrics.go
//
// A copy was made because the upstream package is internal. If it is ever made
// public, our copy can be removed.

import (
"context"

"github.com/grafana/agent/component/otelcol"
otelconsumer "go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/multierr"
)

// Metrics creates a new fanout consumer for metrics.
func Metrics(in []otelcol.Consumer) otelconsumer.Metrics {
if len(in) == 1 {
return in[0]
}

var passthrough, clone []otelconsumer.Metrics

// Iterate through all the consumers besides the last.
for i := 0; i < len(in)-1; i++ {
consumer := in[i]

if consumer.Capabilities().MutatesData {
clone = append(clone, consumer)
} else {
passthrough = append(passthrough, consumer)
}
}

last := in[len(in)-1]

// The final consumer can be given to the passthrough list regardless of
// whether it mutates as long as there's no other read-only consumers.
if len(passthrough) == 0 || !last.Capabilities().MutatesData {
passthrough = append(passthrough, last)
} else {
clone = append(clone, last)
}

return &metricsFanout{
passthrough: passthrough,
clone: clone,
}
}

type metricsFanout struct {
passthrough []otelconsumer.Metrics // Consumers where data can be passed through directly
clone []otelconsumer.Metrics // Consumes which require cloning data
}

func (f *metricsFanout) Capabilities() otelconsumer.Capabilities {
return otelconsumer.Capabilities{MutatesData: false}
}

// ConsumeMetrics exports the pmetric.Metrics to all consumers wrapped by the current one.
func (f *metricsFanout) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
var errs error

// Initially pass to clone exporter to avoid the case where the optimization
// of sending the incoming data to a mutating consumer is used that may
// change the incoming data before cloning.
for _, f := range f.clone {
errs = multierr.Append(errs, f.ConsumeMetrics(ctx, md.Clone()))
}
for _, f := range f.passthrough {
errs = multierr.Append(errs, f.ConsumeMetrics(ctx, md))
}

return errs
}
77 changes: 77 additions & 0 deletions component/otelcol/internal/fanoutconsumer/traces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package fanoutconsumer

// This file is a near copy of
// https://github.com/open-telemetry/opentelemetry-collector/blob/v0.54.0/service/internal/fanoutconsumer/traces.go
//
// A copy was made because the upstream package is internal. If it is ever made
// public, our copy can be removed.

import (
"context"

"github.com/grafana/agent/component/otelcol"
otelconsumer "go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/multierr"
)

// Traces creates a new fanout consumer for traces.
func Traces(in []otelcol.Consumer) otelconsumer.Traces {
if len(in) == 1 {
return in[0]
}

var passthrough, clone []otelconsumer.Traces

// Iterate through all the consumers besides the last.
for i := 0; i < len(in)-1; i++ {
consumer := in[i]

if consumer.Capabilities().MutatesData {
clone = append(clone, consumer)
} else {
passthrough = append(passthrough, consumer)
}
}

last := in[len(in)-1]

// The final consumer can be given to the passthrough list regardless of
// whether it mutates as long as there's no other read-only consumers.
if len(passthrough) == 0 || !last.Capabilities().MutatesData {
passthrough = append(passthrough, last)
} else {
clone = append(clone, last)
}

return &tracesFanout{
passthrough: passthrough,
clone: clone,
}
}

type tracesFanout struct {
passthrough []otelconsumer.Traces // Consumers where data can be passed through directly
clone []otelconsumer.Traces // Consumes which require cloning data
}

func (f *tracesFanout) Capabilities() otelconsumer.Capabilities {
return otelconsumer.Capabilities{MutatesData: false}
}

// ConsumeTraces exports the pmetric.Traces to all consumers wrapped by the current one.
func (f *tracesFanout) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
var errs error

// Initially pass to clone exporter to avoid the case where the optimization
// of sending the incoming data to a mutating consumer is used that may
// change the incoming data before cloning.
for _, f := range f.clone {
errs = multierr.Append(errs, f.ConsumeTraces(ctx, td.Clone()))
}
for _, f := range f.passthrough {
errs = multierr.Append(errs, f.ConsumeTraces(ctx, td))
}

return errs
}
Loading