Skip to content

Commit

Permalink
component/otelcol/receiver: create receiver component abstraction
Browse files Browse the repository at this point in the history
This commit introduces a new package, component/otelcol/receiver, which
exposes a generic Flow component implementation which can run
OpenTelemetry Collector receiver.

Like grafana#2227, it leaves some work unfinished for future PRs:

* A Zap logging adapter needs to be created to correctly process logs
  from OpenTelemetry Collector components.
* Component-specific metrics are currently ignored.
* Component-specific traces are currently ignored.

As of this commit, there are no registered `otelcol.receiver.*`
components. Implementations for OpenTelemetry Collector Flow components
will be done in future PRs.

Related to grafana#2213.
  • Loading branch information
rfratto committed Sep 30, 2022
1 parent 0332eda commit ed7bd00
Show file tree
Hide file tree
Showing 8 changed files with 588 additions and 1 deletion.
11 changes: 11 additions & 0 deletions component/otelcol/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,17 @@ type Consumer interface {
otelconsumer.Logs
}

// ConsumerArguments is a common Arguments type for Flow components which can
// send data to otelcol consumers.
//
// It is expected to use ConsumerArguments as a block within the top-level
// arguments block for a component.
type ConsumerArguments struct {
Metrics []Consumer `river:"metrics,attr,optional"`
Logs []Consumer `river:"logs,attr,optional"`
Traces []Consumer `river:"traces,attr,optional"`
}

// ConsumerExports is a common Exports type for Flow components which are
// otelcol processors or otelcol exporters.
type ConsumerExports struct {
Expand Down
2 changes: 1 addition & 1 deletion component/otelcol/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (e *Exporter) Update(args component.Arguments) error {
},
}

var exporterConfig = eargs.Convert()
exporterConfig := eargs.Convert()

// Create instances of the exporter from our factory for each of our
// supported telemetry signals.
Expand Down
51 changes: 51 additions & 0 deletions component/otelcol/internal/fakeconsumer/fake.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package fakeconsumer

import (
"context"

"github.com/grafana/agent/component/otelcol"
otelconsumer "go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

type Consumer struct {
CapabilitiesFunc func() otelconsumer.Capabilities
ConsumeTracesFunc func(context.Context, ptrace.Traces) error
ConsumeMetricsFunc func(context.Context, pmetric.Metrics) error
ConsumeLogsFunc func(context.Context, plog.Logs) error
}

var _ otelcol.Consumer = (*Consumer)(nil)

func (c *Consumer) Capabilities() otelconsumer.Capabilities {
if c.CapabilitiesFunc != nil {
return c.CapabilitiesFunc()
}

// We don't know what the fake implementation will do, so return true just
// in case it mutates data.
return otelconsumer.Capabilities{MutatesData: true}
}

func (c *Consumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
if c.ConsumeTracesFunc != nil {
return c.ConsumeTracesFunc(ctx, td)
}
return nil
}

func (c *Consumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
if c.ConsumeMetricsFunc != nil {
return c.ConsumeMetricsFunc(ctx, md)
}
return nil
}

func (c *Consumer) ConsumeLogs(ctx context.Context, md plog.Logs) error {
if c.ConsumeLogsFunc != nil {
return c.ConsumeLogsFunc(ctx, md)
}
return nil
}
77 changes: 77 additions & 0 deletions component/otelcol/internal/fanoutconsumer/logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package fanoutconsumer

// This file is a near copy of
// https://github.com/open-telemetry/opentelemetry-collector/blob/v0.54.0/service/internal/fanoutconsumer/logs.go
//
// A copy was made because the upstream package is internal. If it is ever made
// public, our copy can be removed.

import (
"context"

"github.com/grafana/agent/component/otelcol"
otelconsumer "go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/multierr"
)

// Logs creates a new fanout consumer for logs.
func Logs(in []otelcol.Consumer) otelconsumer.Logs {
if len(in) == 1 {
return in[0]
}

var passthrough, clone []otelconsumer.Logs

// Iterate through all the consumers besides the last.
for i := 0; i < len(in)-1; i++ {
consumer := in[i]

if consumer.Capabilities().MutatesData {
clone = append(clone, consumer)
} else {
passthrough = append(passthrough, consumer)
}
}

last := in[len(in)-1]

// The final consumer can be given to the passthrough list regardless of
// whether it mutates as long as there's no other read-only consumers.
if len(passthrough) == 0 || !last.Capabilities().MutatesData {
passthrough = append(passthrough, last)
} else {
clone = append(clone, last)
}

return &logsFanout{
passthrough: passthrough,
clone: clone,
}
}

type logsFanout struct {
passthrough []otelconsumer.Logs // Consumers where data can be passed through directly
clone []otelconsumer.Logs // Consumes which require cloning data
}

func (f *logsFanout) Capabilities() otelconsumer.Capabilities {
return otelconsumer.Capabilities{MutatesData: false}
}

// ConsumeLogs exports the pmetric.Logs to all consumers wrapped by the current one.
func (f *logsFanout) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
var errs error

// Initially pass to clone exporter to avoid the case where the optimization
// of sending the incoming data to a mutating consumer is used that may
// change the incoming data before cloning.
for _, f := range f.clone {
errs = multierr.Append(errs, f.ConsumeLogs(ctx, ld.Clone()))
}
for _, f := range f.passthrough {
errs = multierr.Append(errs, f.ConsumeLogs(ctx, ld))
}

return errs
}
77 changes: 77 additions & 0 deletions component/otelcol/internal/fanoutconsumer/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package fanoutconsumer

// This file is a near copy of
// https://github.com/open-telemetry/opentelemetry-collector/blob/v0.54.0/service/internal/fanoutconsumer/metrics.go
//
// A copy was made because the upstream package is internal. If it is ever made
// public, our copy can be removed.

import (
"context"

"github.com/grafana/agent/component/otelcol"
otelconsumer "go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/multierr"
)

// Metrics creates a new fanout consumer for metrics.
func Metrics(in []otelcol.Consumer) otelconsumer.Metrics {
if len(in) == 1 {
return in[0]
}

var passthrough, clone []otelconsumer.Metrics

// Iterate through all the consumers besides the last.
for i := 0; i < len(in)-1; i++ {
consumer := in[i]

if consumer.Capabilities().MutatesData {
clone = append(clone, consumer)
} else {
passthrough = append(passthrough, consumer)
}
}

last := in[len(in)-1]

// The final consumer can be given to the passthrough list regardless of
// whether it mutates as long as there's no other read-only consumers.
if len(passthrough) == 0 || !last.Capabilities().MutatesData {
passthrough = append(passthrough, last)
} else {
clone = append(clone, last)
}

return &metricsFanout{
passthrough: passthrough,
clone: clone,
}
}

type metricsFanout struct {
passthrough []otelconsumer.Metrics // Consumers where data can be passed through directly
clone []otelconsumer.Metrics // Consumes which require cloning data
}

func (f *metricsFanout) Capabilities() otelconsumer.Capabilities {
return otelconsumer.Capabilities{MutatesData: false}
}

// ConsumeMetrics exports the pmetric.Metrics to all consumers wrapped by the current one.
func (f *metricsFanout) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
var errs error

// Initially pass to clone exporter to avoid the case where the optimization
// of sending the incoming data to a mutating consumer is used that may
// change the incoming data before cloning.
for _, f := range f.clone {
errs = multierr.Append(errs, f.ConsumeMetrics(ctx, md.Clone()))
}
for _, f := range f.passthrough {
errs = multierr.Append(errs, f.ConsumeMetrics(ctx, md))
}

return errs
}
77 changes: 77 additions & 0 deletions component/otelcol/internal/fanoutconsumer/traces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package fanoutconsumer

// This file is a near copy of
// https://github.com/open-telemetry/opentelemetry-collector/blob/v0.54.0/service/internal/fanoutconsumer/traces.go
//
// A copy was made because the upstream package is internal. If it is ever made
// public, our copy can be removed.

import (
"context"

"github.com/grafana/agent/component/otelcol"
otelconsumer "go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/multierr"
)

// Traces creates a new fanout consumer for traces.
func Traces(in []otelcol.Consumer) otelconsumer.Traces {
if len(in) == 1 {
return in[0]
}

var passthrough, clone []otelconsumer.Traces

// Iterate through all the consumers besides the last.
for i := 0; i < len(in)-1; i++ {
consumer := in[i]

if consumer.Capabilities().MutatesData {
clone = append(clone, consumer)
} else {
passthrough = append(passthrough, consumer)
}
}

last := in[len(in)-1]

// The final consumer can be given to the passthrough list regardless of
// whether it mutates as long as there's no other read-only consumers.
if len(passthrough) == 0 || !last.Capabilities().MutatesData {
passthrough = append(passthrough, last)
} else {
clone = append(clone, last)
}

return &tracesFanout{
passthrough: passthrough,
clone: clone,
}
}

type tracesFanout struct {
passthrough []otelconsumer.Traces // Consumers where data can be passed through directly
clone []otelconsumer.Traces // Consumes which require cloning data
}

func (f *tracesFanout) Capabilities() otelconsumer.Capabilities {
return otelconsumer.Capabilities{MutatesData: false}
}

// ConsumeTraces exports the pmetric.Traces to all consumers wrapped by the current one.
func (f *tracesFanout) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
var errs error

// Initially pass to clone exporter to avoid the case where the optimization
// of sending the incoming data to a mutating consumer is used that may
// change the incoming data before cloning.
for _, f := range f.clone {
errs = multierr.Append(errs, f.ConsumeTraces(ctx, td.Clone()))
}
for _, f := range f.passthrough {
errs = multierr.Append(errs, f.ConsumeTraces(ctx, td))
}

return errs
}
Loading

0 comments on commit ed7bd00

Please sign in to comment.