Skip to content

Commit

Permalink
component/otelcol/exporter: initial commit
Browse files Browse the repository at this point in the history
This commit introduces a new package, component/otelcol/exporter, which
exposes a generic Flow component implementation which can run
OpenTelemetry Collector exporters.

There is some stuff left to do for this implementation to be complete:

* A Zap logging adapter needs to be created to correctly process logs
  from OpenTelemetry Collector components.
* Component-specific metrics are currently ignored.
* Component-specific traces are currently ignored.

All of the above will be done in separate PRs.

As of this commit, there are no registered `otelcol.exporter.*`
components. Implementations for OpenTelemetry Collector Flow components
will be done in future PRs.

Related to grafana#2213.
  • Loading branch information
rfratto committed Sep 27, 2022
1 parent 5c6b4a8 commit b18d8a6
Show file tree
Hide file tree
Showing 6 changed files with 492 additions and 8 deletions.
19 changes: 19 additions & 0 deletions component/otelcol/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package otelcol

import (
otelconsumer "go.opentelemetry.io/collector/consumer"
)

// Consumer is a compbined OpenTelemetry Collector consumer which can consume
// any telemetry signal.
type Consumer interface {
otelconsumer.Traces
otelconsumer.Metrics
otelconsumer.Logs
}

// ConsumerExports is a common Exports type for Flow components which are
// otelcol processors or otelcol exporters.
type ConsumerExports struct {
Input Consumer `river:"input,attr"`
}
165 changes: 165 additions & 0 deletions component/otelcol/exporter/exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
// Package exporter exposes utilities to create a Flow component from
// OpenTelemetry Collector exporters.
package exporter

import (
"context"
"errors"
"os"

"github.com/grafana/agent/component"
"github.com/grafana/agent/component/otelcol"
"github.com/grafana/agent/component/otelcol/internal/lazyconsumer"
"github.com/grafana/agent/component/otelcol/internal/scheduler"
"github.com/grafana/agent/pkg/build"
otelcomponent "go.opentelemetry.io/collector/component"
otelconfig "go.opentelemetry.io/collector/config"
"go.opentelemetry.io/otel/metric/nonrecording"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)

// Arguments is an extension of component.Arguments which contains necessary
// settings for OpenTelemetry Collector exporters.
type Arguments interface {
component.Arguments

// Convert converts the Arguments into an OpenTelemetry Collector exporter
// configuration.
Convert() otelconfig.Exporter

// Extensions returns the set of extensions that the configured component is
// allowed to use.
Extensions() map[otelconfig.ComponentID]otelcomponent.Extension

// Exporters returns the set of exporters that are exposed to the configured
// component.
Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter
}

// Exporter is a Flow component shim which manages an OpenTelemetry Collector
// exporter component.
type Exporter struct {
ctx context.Context
cancel context.CancelFunc

opts component.Options
factory otelcomponent.ExporterFactory
consumer *lazyconsumer.Consumer

sched *scheduler.Scheduler
}

var (
_ component.Component = (*Exporter)(nil)
_ component.HealthComponent = (*Exporter)(nil)
)

// New creates a new Flow component which encapsulates an OpenTelemetry
// Collector exporter. args must hold a value of the argument type registered
// with the Flow component.
//
// The registered component must be registered to export the
// otelcol.ConsumerExports type, otherwise New will panic.
func New(opts component.Options, f otelcomponent.ExporterFactory, args Arguments) (*Exporter, error) {
ctx, cancel := context.WithCancel(context.Background())

consumer := lazyconsumer.New(ctx)

// Immediately set our state with our consumer. The exports will never change
// throughout the lifetime of our component.
//
// This will panic if the wrapping component is not registered to export
// otelcol.ConsumerExports.
opts.OnStateChange(otelcol.ConsumerExports{Input: consumer})

e := &Exporter{
ctx: ctx,
cancel: cancel,

opts: opts,
factory: f,
consumer: consumer,

sched: scheduler.New(opts.Logger),
}
if err := e.Update(args); err != nil {
return nil, err
}
return e, nil
}

// Run starts the Exporter component.
func (e *Exporter) Run(ctx context.Context) error {
defer e.cancel()
return e.sched.Run(ctx)
}

// Update implements component.Component. It will convert the Arguments into
// configuration for OpenTelemetry Collector exporter configuration and manage
// the underlying OpenTelemetry Collector exporter.
func (e *Exporter) Update(args component.Arguments) error {
eargs := args.(Arguments)

host := scheduler.NewHost(e.opts.Logger)
host.SetExtensions(eargs.Extensions())
host.SetExporters(eargs.Exporters())

settings := otelcomponent.ExporterCreateSettings{
TelemetrySettings: otelcomponent.TelemetrySettings{
// TODO(rfratto): create an adapter from zap -> go-kit/log
Logger: zap.NewNop(),

// TODO(rfratto): expose tracing and logging statistics.
//
// We may want to put off tracing until we have native tracing
// instrumentation from Flow, but metrics should come sooner since we're
// already set up for supporting component-specific metrics.
TracerProvider: trace.NewNoopTracerProvider(),
MeterProvider: nonrecording.NewNoopMeterProvider(),
},

BuildInfo: otelcomponent.BuildInfo{
Command: os.Args[0],
Description: "Grafana Agent",
Version: build.Version,
},
}

var exporterConfig = eargs.Convert()

// Create instances of the exporter from our factory for each of our
// supported telemetry signals.
var components []otelcomponent.Component

tracesExporter, err := e.factory.CreateTracesExporter(e.ctx, settings, exporterConfig)
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
return err
} else if tracesExporter != nil {
components = append(components, tracesExporter)
}

metricsExporter, err := e.factory.CreateMetricsExporter(e.ctx, settings, exporterConfig)
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
return err
} else if metricsExporter != nil {
components = append(components, metricsExporter)
}

logsExporter, err := e.factory.CreateLogsExporter(e.ctx, settings, exporterConfig)
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
return err
} else if logsExporter != nil {
components = append(components, logsExporter)
}

// Schedule the components to run once our component is running.
e.sched.Schedule(host, components...)
e.consumer.SetConsumers(tracesExporter, metricsExporter, logsExporter)
return nil
}

// CurrentHealth implements component.HealthComponent.
func (e *Exporter) CurrentHealth() component.Health {
return e.sched.CurrentHealth()
}
170 changes: 170 additions & 0 deletions component/otelcol/exporter/exporter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package exporter_test

import (
"context"
"testing"
"time"

"github.com/grafana/agent/component"
"github.com/grafana/agent/component/otelcol"
"github.com/grafana/agent/component/otelcol/exporter"
"github.com/grafana/agent/pkg/flow/componenttest"
"github.com/grafana/agent/pkg/util"
"github.com/stretchr/testify/require"
otelcomponent "go.opentelemetry.io/collector/component"
otelconfig "go.opentelemetry.io/collector/config"
otelconsumer "go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/ptrace"
)

func TestExporter(t *testing.T) {
ctx := componenttest.TestContext(t)

// Channel where received traces will be written to.
tracesCh := make(chan ptrace.Traces, 1)

// Create an instance of a fake OpenTelemetry Collector exporter which our
// Flow component will wrap around.
innerExporter := &fakeExporter{
ConsumeTracesFunc: func(_ context.Context, td ptrace.Traces) error {
select {
case tracesCh <- td:
default:
}
return nil
},
}

// Create and start our Flow component. We then wait for it to export a
// consumer that we can send data to.
te := newTestEnvironment(t, innerExporter)
te.Start()

require.NoError(t, te.Controller.WaitExports(1*time.Second), "test component did not generate exports")
ce := te.Controller.Exports().(otelcol.ConsumerExports)

// Create a test set of traces and send it to our consumer in the background.
// We then wait for our channel to receive the traces, indicating that
// everything was wired up correctly.
testTraces := createTestTraces()
go func() {
err := ce.Input.ConsumeTraces(ctx, testTraces)
require.NoError(t, err)
}()

select {
case <-time.After(1 * time.Second):
require.FailNow(t, "testcomponent did not receive traces")
case td := <-tracesCh:
require.Equal(t, testTraces, td)
}
}

type testEnvironment struct {
t *testing.T

Controller *componenttest.Controller
}

func newTestEnvironment(t *testing.T, fe *fakeExporter) *testEnvironment {
t.Helper()

reg := component.Registration{
Name: "testcomponent",
Args: fakeExporterArgs{},
Exports: otelcol.ConsumerExports{},
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
// Create a factory which always returns our instance of fakeExporter
// defined above.
factory := otelcomponent.NewExporterFactory(
"testcomponent",
func() otelconfig.Exporter {
return fakeExporterArgs{}.Convert()
},
otelcomponent.WithTracesExporter(func(ctx context.Context, ecs otelcomponent.ExporterCreateSettings, e otelconfig.Exporter) (otelcomponent.TracesExporter, error) {
return fe, nil
}),
)

return exporter.New(opts, factory, args.(exporter.Arguments))
},
}

return &testEnvironment{
t: t,
Controller: componenttest.NewControllerFromReg(util.TestLogger(t), reg),
}
}

func (te *testEnvironment) Start() {
go func() {
ctx := componenttest.TestContext(te.t)
err := te.Controller.Run(ctx, fakeExporterArgs{})
require.NoError(te.t, err, "failed to run component")
}()
}

type fakeExporterArgs struct {
}

var _ exporter.Arguments = fakeExporterArgs{}

func (fa fakeExporterArgs) Convert() otelconfig.Exporter {
settings := otelconfig.NewExporterSettings(otelconfig.NewComponentID("testcomponent"))
return &settings
}

func (fa fakeExporterArgs) Extensions() map[otelconfig.ComponentID]otelcomponent.Extension {
return nil
}

func (fa fakeExporterArgs) Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter {
return nil
}

type fakeExporter struct {
StartFunc func(ctx context.Context, host otelcomponent.Host) error
ShutdownFunc func(ctx context.Context) error
CapabilitiesFunc func() otelconsumer.Capabilities
ConsumeTracesFunc func(ctx context.Context, td ptrace.Traces) error
}

var _ otelcomponent.TracesExporter = (*fakeExporter)(nil)

func (fe *fakeExporter) Start(ctx context.Context, host otelcomponent.Host) error {
if fe.StartFunc != nil {
return fe.StartFunc(ctx, host)
}
return nil
}

func (fe *fakeExporter) Shutdown(ctx context.Context) error {
if fe.ShutdownFunc != nil {
return fe.ShutdownFunc(ctx)
}
return nil
}

func (fe *fakeExporter) Capabilities() otelconsumer.Capabilities {
if fe.CapabilitiesFunc != nil {
return fe.CapabilitiesFunc()
}
return otelconsumer.Capabilities{}
}

func (fe *fakeExporter) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
if fe.ConsumeTracesFunc != nil {
return fe.ConsumeTracesFunc(ctx, td)
}
return nil
}

func createTestTraces() ptrace.Traces {
data := ptrace.NewTraces()
rss := data.ResourceSpans().AppendEmpty()
ss := rss.ScopeSpans().AppendEmpty()
s := ss.Spans().AppendEmpty()
s.SetName("TestSpan")

return data
}
Loading

0 comments on commit b18d8a6

Please sign in to comment.