From b18d8a6ab9d643c71c8d1279e810b58fb409e82b Mon Sep 17 00:00:00 2001
From: Robert Fratto <robertfratto@gmail.com>
Date: Tue, 27 Sep 2022 16:01:15 -0400
Subject: [PATCH] component/otelcol/exporter: initial commit

This commit introduces a new package, component/otelcol/exporter, which
exposes a generic Flow component implementation which can run
OpenTelemetry Collector exporters.

There is some stuff left to do for this implementation to be complete:

* A Zap logging adapter needs to be created to correctly process logs
  from OpenTelemetry Collector components.
* Component-specific metrics are currently ignored.
* Component-specific traces are currently ignored.

All of the above will be done in separate PRs.

As of this commit, there are no registered `otelcol.exporter.*`
components. Implementations for OpenTelemetry Collector Flow components
will be done in future PRs.

Related to #2213.
---
 component/otelcol/consumer.go                 |  19 ++
 component/otelcol/exporter/exporter.go        | 165 +++++++++++++++++
 component/otelcol/exporter/exporter_test.go   | 170 ++++++++++++++++++
 .../internal/lazyconsumer/lazyconsumer.go     | 115 ++++++++++++
 component/otelcol/internal/scheduler/host.go  |  15 +-
 pkg/flow/componenttest/componenttest.go       |  16 +-
 6 files changed, 492 insertions(+), 8 deletions(-)
 create mode 100644 component/otelcol/consumer.go
 create mode 100644 component/otelcol/exporter/exporter.go
 create mode 100644 component/otelcol/exporter/exporter_test.go
 create mode 100644 component/otelcol/internal/lazyconsumer/lazyconsumer.go

diff --git a/component/otelcol/consumer.go b/component/otelcol/consumer.go
new file mode 100644
index 000000000000..6516ea541cb0
--- /dev/null
+++ b/component/otelcol/consumer.go
@@ -0,0 +1,19 @@
+package otelcol
+
+import (
+	otelconsumer "go.opentelemetry.io/collector/consumer"
+)
+
+// Consumer is a compbined OpenTelemetry Collector consumer which can consume
+// any telemetry signal.
+type Consumer interface {
+	otelconsumer.Traces
+	otelconsumer.Metrics
+	otelconsumer.Logs
+}
+
+// ConsumerExports is a common Exports type for Flow components which are
+// otelcol processors or otelcol exporters.
+type ConsumerExports struct {
+	Input Consumer `river:"input,attr"`
+}
diff --git a/component/otelcol/exporter/exporter.go b/component/otelcol/exporter/exporter.go
new file mode 100644
index 000000000000..eb1a55278d41
--- /dev/null
+++ b/component/otelcol/exporter/exporter.go
@@ -0,0 +1,165 @@
+// Package exporter exposes utilities to create a Flow component from
+// OpenTelemetry Collector exporters.
+package exporter
+
+import (
+	"context"
+	"errors"
+	"os"
+
+	"github.com/grafana/agent/component"
+	"github.com/grafana/agent/component/otelcol"
+	"github.com/grafana/agent/component/otelcol/internal/lazyconsumer"
+	"github.com/grafana/agent/component/otelcol/internal/scheduler"
+	"github.com/grafana/agent/pkg/build"
+	otelcomponent "go.opentelemetry.io/collector/component"
+	otelconfig "go.opentelemetry.io/collector/config"
+	"go.opentelemetry.io/otel/metric/nonrecording"
+	"go.opentelemetry.io/otel/trace"
+	"go.uber.org/zap"
+)
+
+// Arguments is an extension of component.Arguments which contains necessary
+// settings for OpenTelemetry Collector exporters.
+type Arguments interface {
+	component.Arguments
+
+	// Convert converts the Arguments into an OpenTelemetry Collector exporter
+	// configuration.
+	Convert() otelconfig.Exporter
+
+	// Extensions returns the set of extensions that the configured component is
+	// allowed to use.
+	Extensions() map[otelconfig.ComponentID]otelcomponent.Extension
+
+	// Exporters returns the set of exporters that are exposed to the configured
+	// component.
+	Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter
+}
+
+// Exporter is a Flow component shim which manages an OpenTelemetry Collector
+// exporter component.
+type Exporter struct {
+	ctx    context.Context
+	cancel context.CancelFunc
+
+	opts     component.Options
+	factory  otelcomponent.ExporterFactory
+	consumer *lazyconsumer.Consumer
+
+	sched *scheduler.Scheduler
+}
+
+var (
+	_ component.Component       = (*Exporter)(nil)
+	_ component.HealthComponent = (*Exporter)(nil)
+)
+
+// New creates a new Flow component which encapsulates an OpenTelemetry
+// Collector exporter. args must hold a value of the argument type registered
+// with the Flow component.
+//
+// The registered component must be registered to export the
+// otelcol.ConsumerExports type, otherwise New will panic.
+func New(opts component.Options, f otelcomponent.ExporterFactory, args Arguments) (*Exporter, error) {
+	ctx, cancel := context.WithCancel(context.Background())
+
+	consumer := lazyconsumer.New(ctx)
+
+	// Immediately set our state with our consumer. The exports will never change
+	// throughout the lifetime of our component.
+	//
+	// This will panic if the wrapping component is not registered to export
+	// otelcol.ConsumerExports.
+	opts.OnStateChange(otelcol.ConsumerExports{Input: consumer})
+
+	e := &Exporter{
+		ctx:    ctx,
+		cancel: cancel,
+
+		opts:     opts,
+		factory:  f,
+		consumer: consumer,
+
+		sched: scheduler.New(opts.Logger),
+	}
+	if err := e.Update(args); err != nil {
+		return nil, err
+	}
+	return e, nil
+}
+
+// Run starts the Exporter component.
+func (e *Exporter) Run(ctx context.Context) error {
+	defer e.cancel()
+	return e.sched.Run(ctx)
+}
+
+// Update implements component.Component. It will convert the Arguments into
+// configuration for OpenTelemetry Collector exporter configuration and manage
+// the underlying OpenTelemetry Collector exporter.
+func (e *Exporter) Update(args component.Arguments) error {
+	eargs := args.(Arguments)
+
+	host := scheduler.NewHost(e.opts.Logger)
+	host.SetExtensions(eargs.Extensions())
+	host.SetExporters(eargs.Exporters())
+
+	settings := otelcomponent.ExporterCreateSettings{
+		TelemetrySettings: otelcomponent.TelemetrySettings{
+			// TODO(rfratto): create an adapter from zap -> go-kit/log
+			Logger: zap.NewNop(),
+
+			// TODO(rfratto): expose tracing and logging statistics.
+			//
+			// We may want to put off tracing until we have native tracing
+			// instrumentation from Flow, but metrics should come sooner since we're
+			// already set up for supporting component-specific metrics.
+			TracerProvider: trace.NewNoopTracerProvider(),
+			MeterProvider:  nonrecording.NewNoopMeterProvider(),
+		},
+
+		BuildInfo: otelcomponent.BuildInfo{
+			Command:     os.Args[0],
+			Description: "Grafana Agent",
+			Version:     build.Version,
+		},
+	}
+
+	var exporterConfig = eargs.Convert()
+
+	// Create instances of the exporter from our factory for each of our
+	// supported telemetry signals.
+	var components []otelcomponent.Component
+
+	tracesExporter, err := e.factory.CreateTracesExporter(e.ctx, settings, exporterConfig)
+	if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
+		return err
+	} else if tracesExporter != nil {
+		components = append(components, tracesExporter)
+	}
+
+	metricsExporter, err := e.factory.CreateMetricsExporter(e.ctx, settings, exporterConfig)
+	if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
+		return err
+	} else if metricsExporter != nil {
+		components = append(components, metricsExporter)
+	}
+
+	logsExporter, err := e.factory.CreateLogsExporter(e.ctx, settings, exporterConfig)
+	if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
+		return err
+	} else if logsExporter != nil {
+		components = append(components, logsExporter)
+	}
+
+	// Schedule the components to run once our component is running.
+	e.sched.Schedule(host, components...)
+	e.consumer.SetConsumers(tracesExporter, metricsExporter, logsExporter)
+	return nil
+}
+
+// CurrentHealth implements component.HealthComponent.
+func (e *Exporter) CurrentHealth() component.Health {
+	return e.sched.CurrentHealth()
+}
diff --git a/component/otelcol/exporter/exporter_test.go b/component/otelcol/exporter/exporter_test.go
new file mode 100644
index 000000000000..ed71acb29511
--- /dev/null
+++ b/component/otelcol/exporter/exporter_test.go
@@ -0,0 +1,170 @@
+package exporter_test
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/grafana/agent/component"
+	"github.com/grafana/agent/component/otelcol"
+	"github.com/grafana/agent/component/otelcol/exporter"
+	"github.com/grafana/agent/pkg/flow/componenttest"
+	"github.com/grafana/agent/pkg/util"
+	"github.com/stretchr/testify/require"
+	otelcomponent "go.opentelemetry.io/collector/component"
+	otelconfig "go.opentelemetry.io/collector/config"
+	otelconsumer "go.opentelemetry.io/collector/consumer"
+	"go.opentelemetry.io/collector/pdata/ptrace"
+)
+
+func TestExporter(t *testing.T) {
+	ctx := componenttest.TestContext(t)
+
+	// Channel where received traces will be written to.
+	tracesCh := make(chan ptrace.Traces, 1)
+
+	// Create an instance of a fake OpenTelemetry Collector exporter which our
+	// Flow component will wrap around.
+	innerExporter := &fakeExporter{
+		ConsumeTracesFunc: func(_ context.Context, td ptrace.Traces) error {
+			select {
+			case tracesCh <- td:
+			default:
+			}
+			return nil
+		},
+	}
+
+	// Create and start our Flow component. We then wait for it to export a
+	// consumer that we can send data to.
+	te := newTestEnvironment(t, innerExporter)
+	te.Start()
+
+	require.NoError(t, te.Controller.WaitExports(1*time.Second), "test component did not generate exports")
+	ce := te.Controller.Exports().(otelcol.ConsumerExports)
+
+	// Create a test set of traces and send it to our consumer in the background.
+	// We then wait for our channel to receive the traces, indicating that
+	// everything was wired up correctly.
+	testTraces := createTestTraces()
+	go func() {
+		err := ce.Input.ConsumeTraces(ctx, testTraces)
+		require.NoError(t, err)
+	}()
+
+	select {
+	case <-time.After(1 * time.Second):
+		require.FailNow(t, "testcomponent did not receive traces")
+	case td := <-tracesCh:
+		require.Equal(t, testTraces, td)
+	}
+}
+
+type testEnvironment struct {
+	t *testing.T
+
+	Controller *componenttest.Controller
+}
+
+func newTestEnvironment(t *testing.T, fe *fakeExporter) *testEnvironment {
+	t.Helper()
+
+	reg := component.Registration{
+		Name:    "testcomponent",
+		Args:    fakeExporterArgs{},
+		Exports: otelcol.ConsumerExports{},
+		Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
+			// Create a factory which always returns our instance of fakeExporter
+			// defined above.
+			factory := otelcomponent.NewExporterFactory(
+				"testcomponent",
+				func() otelconfig.Exporter {
+					return fakeExporterArgs{}.Convert()
+				},
+				otelcomponent.WithTracesExporter(func(ctx context.Context, ecs otelcomponent.ExporterCreateSettings, e otelconfig.Exporter) (otelcomponent.TracesExporter, error) {
+					return fe, nil
+				}),
+			)
+
+			return exporter.New(opts, factory, args.(exporter.Arguments))
+		},
+	}
+
+	return &testEnvironment{
+		t:          t,
+		Controller: componenttest.NewControllerFromReg(util.TestLogger(t), reg),
+	}
+}
+
+func (te *testEnvironment) Start() {
+	go func() {
+		ctx := componenttest.TestContext(te.t)
+		err := te.Controller.Run(ctx, fakeExporterArgs{})
+		require.NoError(te.t, err, "failed to run component")
+	}()
+}
+
+type fakeExporterArgs struct {
+}
+
+var _ exporter.Arguments = fakeExporterArgs{}
+
+func (fa fakeExporterArgs) Convert() otelconfig.Exporter {
+	settings := otelconfig.NewExporterSettings(otelconfig.NewComponentID("testcomponent"))
+	return &settings
+}
+
+func (fa fakeExporterArgs) Extensions() map[otelconfig.ComponentID]otelcomponent.Extension {
+	return nil
+}
+
+func (fa fakeExporterArgs) Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter {
+	return nil
+}
+
+type fakeExporter struct {
+	StartFunc         func(ctx context.Context, host otelcomponent.Host) error
+	ShutdownFunc      func(ctx context.Context) error
+	CapabilitiesFunc  func() otelconsumer.Capabilities
+	ConsumeTracesFunc func(ctx context.Context, td ptrace.Traces) error
+}
+
+var _ otelcomponent.TracesExporter = (*fakeExporter)(nil)
+
+func (fe *fakeExporter) Start(ctx context.Context, host otelcomponent.Host) error {
+	if fe.StartFunc != nil {
+		return fe.StartFunc(ctx, host)
+	}
+	return nil
+}
+
+func (fe *fakeExporter) Shutdown(ctx context.Context) error {
+	if fe.ShutdownFunc != nil {
+		return fe.ShutdownFunc(ctx)
+	}
+	return nil
+}
+
+func (fe *fakeExporter) Capabilities() otelconsumer.Capabilities {
+	if fe.CapabilitiesFunc != nil {
+		return fe.CapabilitiesFunc()
+	}
+	return otelconsumer.Capabilities{}
+}
+
+func (fe *fakeExporter) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
+	if fe.ConsumeTracesFunc != nil {
+		return fe.ConsumeTracesFunc(ctx, td)
+	}
+	return nil
+}
+
+func createTestTraces() ptrace.Traces {
+	data := ptrace.NewTraces()
+	rss := data.ResourceSpans().AppendEmpty()
+	ss := rss.ScopeSpans().AppendEmpty()
+	s := ss.Spans().AppendEmpty()
+	s.SetName("TestSpan")
+
+	return data
+}
diff --git a/component/otelcol/internal/lazyconsumer/lazyconsumer.go b/component/otelcol/internal/lazyconsumer/lazyconsumer.go
new file mode 100644
index 000000000000..32612387002d
--- /dev/null
+++ b/component/otelcol/internal/lazyconsumer/lazyconsumer.go
@@ -0,0 +1,115 @@
+// Package lazyconsumer implements a lazy OpenTelemetry Collector consumer
+// which can lazily forward request to another consumer implementation.
+package lazyconsumer
+
+import (
+	"context"
+	"sync"
+
+	otelcomponent "go.opentelemetry.io/collector/component"
+	otelconsumer "go.opentelemetry.io/collector/consumer"
+	"go.opentelemetry.io/collector/pdata/plog"
+	"go.opentelemetry.io/collector/pdata/pmetric"
+	"go.opentelemetry.io/collector/pdata/ptrace"
+)
+
+// Consumer is a lazily-loaded consumer.
+type Consumer struct {
+	ctx context.Context
+
+	mut             sync.RWMutex
+	metricsConsumer otelconsumer.Metrics
+	logsConsumer    otelconsumer.Logs
+	tracesConsumer  otelconsumer.Traces
+}
+
+var (
+	_ otelconsumer.Traces  = (*Consumer)(nil)
+	_ otelconsumer.Metrics = (*Consumer)(nil)
+	_ otelconsumer.Logs    = (*Consumer)(nil)
+)
+
+// New creates a new Consumer. The provided ctx is used to determine when the
+// Consumer should stop accepting data; if the ctx is closed, no further data
+// will be accepted.
+func New(ctx context.Context) *Consumer {
+	return &Consumer{ctx: ctx}
+}
+
+// Capabilities implements otelconsumer.baseConsumer.
+func (c *Consumer) Capabilities() otelconsumer.Capabilities {
+	return otelconsumer.Capabilities{
+		// MutatesData is always set to false; the lazy consumer will check the
+		// underlying consumer's capabilities prior to forwarding data and will
+		// pass a copy if the underlying consumer mutates data.
+		MutatesData: false,
+	}
+}
+
+// ConsumeTraces implements otelconsumer.Traces.
+func (c *Consumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
+	if c.ctx.Err() != nil {
+		return c.ctx.Err()
+	}
+
+	c.mut.RLock()
+	defer c.mut.RUnlock()
+
+	if c.tracesConsumer == nil {
+		return otelcomponent.ErrDataTypeIsNotSupported
+	}
+
+	if c.tracesConsumer.Capabilities().MutatesData {
+		td = td.Clone()
+	}
+	return c.tracesConsumer.ConsumeTraces(ctx, td)
+}
+
+// ConsumeMetrics implements otelconsumer.Metrics.
+func (c *Consumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
+	if c.ctx.Err() != nil {
+		return c.ctx.Err()
+	}
+
+	c.mut.RLock()
+	defer c.mut.RUnlock()
+
+	if c.metricsConsumer == nil {
+		return otelcomponent.ErrDataTypeIsNotSupported
+	}
+
+	if c.metricsConsumer.Capabilities().MutatesData {
+		md = md.Clone()
+	}
+	return c.metricsConsumer.ConsumeMetrics(ctx, md)
+}
+
+// ConsumeLogs implements otelconsumer.Logs.
+func (c *Consumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
+	if c.ctx.Err() != nil {
+		return c.ctx.Err()
+	}
+
+	c.mut.RLock()
+	defer c.mut.RUnlock()
+
+	if c.logsConsumer == nil {
+		return otelcomponent.ErrDataTypeIsNotSupported
+	}
+
+	if c.logsConsumer.Capabilities().MutatesData {
+		ld = ld.Clone()
+	}
+	return c.logsConsumer.ConsumeLogs(ctx, ld)
+}
+
+// SetConsumers updates the internal consumers that Consumer will forward data
+// to. It is valid for any combination of m, l, and t to be nil.
+func (c *Consumer) SetConsumers(t otelconsumer.Traces, m otelconsumer.Metrics, l otelconsumer.Logs) {
+	c.mut.Lock()
+	defer c.mut.Unlock()
+
+	c.metricsConsumer = m
+	c.logsConsumer = l
+	c.tracesConsumer = t
+}
diff --git a/component/otelcol/internal/scheduler/host.go b/component/otelcol/internal/scheduler/host.go
index 140a66502074..33e0d3e1b837 100644
--- a/component/otelcol/internal/scheduler/host.go
+++ b/component/otelcol/internal/scheduler/host.go
@@ -12,9 +12,6 @@ import (
 type Host struct {
 	log log.Logger
 
-	// TODO(rfratto): allow the below fields below to be used. For now they're
-	// always nil.
-
 	extensions map[otelconfig.ComponentID]otelcomponent.Extension
 	exporters  map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter
 }
@@ -26,6 +23,18 @@ func NewHost(l log.Logger) *Host {
 
 var _ otelcomponent.Host = (*Host)(nil)
 
+// SetExtensions sets the extensions available from the Host. It is not valid
+// to call this after the Host is in use.
+func (h *Host) SetExtensions(extensions map[otelconfig.ComponentID]otelcomponent.Extension) {
+	h.extensions = extensions
+}
+
+// SetExporters sets the exporters available from the Host. It is not valid
+// to call this after the Host is in use.
+func (h *Host) SetExporters(exporters map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter) {
+	h.exporters = exporters
+}
+
 // ReportFatalError implements otelcomponent.Host.
 func (h *Host) ReportFatalError(err error) {
 	level.Error(h.log).Log("msg", "fatal error running component", "err", err)
diff --git a/pkg/flow/componenttest/componenttest.go b/pkg/flow/componenttest/componenttest.go
index 75b8ebc829b5..7d9a7c33fe66 100644
--- a/pkg/flow/componenttest/componenttest.go
+++ b/pkg/flow/componenttest/componenttest.go
@@ -34,14 +34,20 @@ type Controller struct {
 // NewControllerFromID returns a new testing Controller for the component with
 // the provided name.
 func NewControllerFromID(l log.Logger, componentName string) (*Controller, error) {
-	if l == nil {
-		l = log.NewNopLogger()
-	}
-
 	reg, ok := component.Get(componentName)
 	if !ok {
 		return nil, fmt.Errorf("no such component %q", componentName)
 	}
+	return NewControllerFromReg(l, reg), nil
+}
+
+// NewControllerFromReg registers a new testing Controller for a component with
+// the given registration. This can be used for testing fake components which
+// aren't really registered.
+func NewControllerFromReg(l log.Logger, reg component.Registration) *Controller {
+	if l == nil {
+		l = log.NewNopLogger()
+	}
 
 	return &Controller{
 		reg: reg,
@@ -49,7 +55,7 @@ func NewControllerFromID(l log.Logger, componentName string) (*Controller, error
 
 		running:   make(chan struct{}, 1),
 		exportsCh: make(chan struct{}, 1),
-	}, nil
+	}
 }
 
 func (c *Controller) onStateChange(e component.Exports) {