diff --git a/component/otelcol/consumer.go b/component/otelcol/consumer.go new file mode 100644 index 000000000000..6516ea541cb0 --- /dev/null +++ b/component/otelcol/consumer.go @@ -0,0 +1,19 @@ +package otelcol + +import ( + otelconsumer "go.opentelemetry.io/collector/consumer" +) + +// Consumer is a compbined OpenTelemetry Collector consumer which can consume +// any telemetry signal. +type Consumer interface { + otelconsumer.Traces + otelconsumer.Metrics + otelconsumer.Logs +} + +// ConsumerExports is a common Exports type for Flow components which are +// otelcol processors or otelcol exporters. +type ConsumerExports struct { + Input Consumer `river:"input,attr"` +} diff --git a/component/otelcol/exporter/exporter.go b/component/otelcol/exporter/exporter.go new file mode 100644 index 000000000000..eb1a55278d41 --- /dev/null +++ b/component/otelcol/exporter/exporter.go @@ -0,0 +1,165 @@ +// Package exporter exposes utilities to create a Flow component from +// OpenTelemetry Collector exporters. +package exporter + +import ( + "context" + "errors" + "os" + + "github.com/grafana/agent/component" + "github.com/grafana/agent/component/otelcol" + "github.com/grafana/agent/component/otelcol/internal/lazyconsumer" + "github.com/grafana/agent/component/otelcol/internal/scheduler" + "github.com/grafana/agent/pkg/build" + otelcomponent "go.opentelemetry.io/collector/component" + otelconfig "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/otel/metric/nonrecording" + "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" +) + +// Arguments is an extension of component.Arguments which contains necessary +// settings for OpenTelemetry Collector exporters. +type Arguments interface { + component.Arguments + + // Convert converts the Arguments into an OpenTelemetry Collector exporter + // configuration. + Convert() otelconfig.Exporter + + // Extensions returns the set of extensions that the configured component is + // allowed to use. + Extensions() map[otelconfig.ComponentID]otelcomponent.Extension + + // Exporters returns the set of exporters that are exposed to the configured + // component. + Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter +} + +// Exporter is a Flow component shim which manages an OpenTelemetry Collector +// exporter component. +type Exporter struct { + ctx context.Context + cancel context.CancelFunc + + opts component.Options + factory otelcomponent.ExporterFactory + consumer *lazyconsumer.Consumer + + sched *scheduler.Scheduler +} + +var ( + _ component.Component = (*Exporter)(nil) + _ component.HealthComponent = (*Exporter)(nil) +) + +// New creates a new Flow component which encapsulates an OpenTelemetry +// Collector exporter. args must hold a value of the argument type registered +// with the Flow component. +// +// The registered component must be registered to export the +// otelcol.ConsumerExports type, otherwise New will panic. +func New(opts component.Options, f otelcomponent.ExporterFactory, args Arguments) (*Exporter, error) { + ctx, cancel := context.WithCancel(context.Background()) + + consumer := lazyconsumer.New(ctx) + + // Immediately set our state with our consumer. The exports will never change + // throughout the lifetime of our component. + // + // This will panic if the wrapping component is not registered to export + // otelcol.ConsumerExports. + opts.OnStateChange(otelcol.ConsumerExports{Input: consumer}) + + e := &Exporter{ + ctx: ctx, + cancel: cancel, + + opts: opts, + factory: f, + consumer: consumer, + + sched: scheduler.New(opts.Logger), + } + if err := e.Update(args); err != nil { + return nil, err + } + return e, nil +} + +// Run starts the Exporter component. +func (e *Exporter) Run(ctx context.Context) error { + defer e.cancel() + return e.sched.Run(ctx) +} + +// Update implements component.Component. It will convert the Arguments into +// configuration for OpenTelemetry Collector exporter configuration and manage +// the underlying OpenTelemetry Collector exporter. +func (e *Exporter) Update(args component.Arguments) error { + eargs := args.(Arguments) + + host := scheduler.NewHost(e.opts.Logger) + host.SetExtensions(eargs.Extensions()) + host.SetExporters(eargs.Exporters()) + + settings := otelcomponent.ExporterCreateSettings{ + TelemetrySettings: otelcomponent.TelemetrySettings{ + // TODO(rfratto): create an adapter from zap -> go-kit/log + Logger: zap.NewNop(), + + // TODO(rfratto): expose tracing and logging statistics. + // + // We may want to put off tracing until we have native tracing + // instrumentation from Flow, but metrics should come sooner since we're + // already set up for supporting component-specific metrics. + TracerProvider: trace.NewNoopTracerProvider(), + MeterProvider: nonrecording.NewNoopMeterProvider(), + }, + + BuildInfo: otelcomponent.BuildInfo{ + Command: os.Args[0], + Description: "Grafana Agent", + Version: build.Version, + }, + } + + var exporterConfig = eargs.Convert() + + // Create instances of the exporter from our factory for each of our + // supported telemetry signals. + var components []otelcomponent.Component + + tracesExporter, err := e.factory.CreateTracesExporter(e.ctx, settings, exporterConfig) + if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) { + return err + } else if tracesExporter != nil { + components = append(components, tracesExporter) + } + + metricsExporter, err := e.factory.CreateMetricsExporter(e.ctx, settings, exporterConfig) + if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) { + return err + } else if metricsExporter != nil { + components = append(components, metricsExporter) + } + + logsExporter, err := e.factory.CreateLogsExporter(e.ctx, settings, exporterConfig) + if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) { + return err + } else if logsExporter != nil { + components = append(components, logsExporter) + } + + // Schedule the components to run once our component is running. + e.sched.Schedule(host, components...) + e.consumer.SetConsumers(tracesExporter, metricsExporter, logsExporter) + return nil +} + +// CurrentHealth implements component.HealthComponent. +func (e *Exporter) CurrentHealth() component.Health { + return e.sched.CurrentHealth() +} diff --git a/component/otelcol/exporter/exporter_test.go b/component/otelcol/exporter/exporter_test.go new file mode 100644 index 000000000000..ed71acb29511 --- /dev/null +++ b/component/otelcol/exporter/exporter_test.go @@ -0,0 +1,170 @@ +package exporter_test + +import ( + "context" + "testing" + "time" + + "github.com/grafana/agent/component" + "github.com/grafana/agent/component/otelcol" + "github.com/grafana/agent/component/otelcol/exporter" + "github.com/grafana/agent/pkg/flow/componenttest" + "github.com/grafana/agent/pkg/util" + "github.com/stretchr/testify/require" + otelcomponent "go.opentelemetry.io/collector/component" + otelconfig "go.opentelemetry.io/collector/config" + otelconsumer "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +func TestExporter(t *testing.T) { + ctx := componenttest.TestContext(t) + + // Channel where received traces will be written to. + tracesCh := make(chan ptrace.Traces, 1) + + // Create an instance of a fake OpenTelemetry Collector exporter which our + // Flow component will wrap around. + innerExporter := &fakeExporter{ + ConsumeTracesFunc: func(_ context.Context, td ptrace.Traces) error { + select { + case tracesCh <- td: + default: + } + return nil + }, + } + + // Create and start our Flow component. We then wait for it to export a + // consumer that we can send data to. + te := newTestEnvironment(t, innerExporter) + te.Start() + + require.NoError(t, te.Controller.WaitExports(1*time.Second), "test component did not generate exports") + ce := te.Controller.Exports().(otelcol.ConsumerExports) + + // Create a test set of traces and send it to our consumer in the background. + // We then wait for our channel to receive the traces, indicating that + // everything was wired up correctly. + testTraces := createTestTraces() + go func() { + err := ce.Input.ConsumeTraces(ctx, testTraces) + require.NoError(t, err) + }() + + select { + case <-time.After(1 * time.Second): + require.FailNow(t, "testcomponent did not receive traces") + case td := <-tracesCh: + require.Equal(t, testTraces, td) + } +} + +type testEnvironment struct { + t *testing.T + + Controller *componenttest.Controller +} + +func newTestEnvironment(t *testing.T, fe *fakeExporter) *testEnvironment { + t.Helper() + + reg := component.Registration{ + Name: "testcomponent", + Args: fakeExporterArgs{}, + Exports: otelcol.ConsumerExports{}, + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + // Create a factory which always returns our instance of fakeExporter + // defined above. + factory := otelcomponent.NewExporterFactory( + "testcomponent", + func() otelconfig.Exporter { + return fakeExporterArgs{}.Convert() + }, + otelcomponent.WithTracesExporter(func(ctx context.Context, ecs otelcomponent.ExporterCreateSettings, e otelconfig.Exporter) (otelcomponent.TracesExporter, error) { + return fe, nil + }), + ) + + return exporter.New(opts, factory, args.(exporter.Arguments)) + }, + } + + return &testEnvironment{ + t: t, + Controller: componenttest.NewControllerFromReg(util.TestLogger(t), reg), + } +} + +func (te *testEnvironment) Start() { + go func() { + ctx := componenttest.TestContext(te.t) + err := te.Controller.Run(ctx, fakeExporterArgs{}) + require.NoError(te.t, err, "failed to run component") + }() +} + +type fakeExporterArgs struct { +} + +var _ exporter.Arguments = fakeExporterArgs{} + +func (fa fakeExporterArgs) Convert() otelconfig.Exporter { + settings := otelconfig.NewExporterSettings(otelconfig.NewComponentID("testcomponent")) + return &settings +} + +func (fa fakeExporterArgs) Extensions() map[otelconfig.ComponentID]otelcomponent.Extension { + return nil +} + +func (fa fakeExporterArgs) Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter { + return nil +} + +type fakeExporter struct { + StartFunc func(ctx context.Context, host otelcomponent.Host) error + ShutdownFunc func(ctx context.Context) error + CapabilitiesFunc func() otelconsumer.Capabilities + ConsumeTracesFunc func(ctx context.Context, td ptrace.Traces) error +} + +var _ otelcomponent.TracesExporter = (*fakeExporter)(nil) + +func (fe *fakeExporter) Start(ctx context.Context, host otelcomponent.Host) error { + if fe.StartFunc != nil { + return fe.StartFunc(ctx, host) + } + return nil +} + +func (fe *fakeExporter) Shutdown(ctx context.Context) error { + if fe.ShutdownFunc != nil { + return fe.ShutdownFunc(ctx) + } + return nil +} + +func (fe *fakeExporter) Capabilities() otelconsumer.Capabilities { + if fe.CapabilitiesFunc != nil { + return fe.CapabilitiesFunc() + } + return otelconsumer.Capabilities{} +} + +func (fe *fakeExporter) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + if fe.ConsumeTracesFunc != nil { + return fe.ConsumeTracesFunc(ctx, td) + } + return nil +} + +func createTestTraces() ptrace.Traces { + data := ptrace.NewTraces() + rss := data.ResourceSpans().AppendEmpty() + ss := rss.ScopeSpans().AppendEmpty() + s := ss.Spans().AppendEmpty() + s.SetName("TestSpan") + + return data +} diff --git a/component/otelcol/internal/lazyconsumer/lazyconsumer.go b/component/otelcol/internal/lazyconsumer/lazyconsumer.go new file mode 100644 index 000000000000..32612387002d --- /dev/null +++ b/component/otelcol/internal/lazyconsumer/lazyconsumer.go @@ -0,0 +1,115 @@ +// Package lazyconsumer implements a lazy OpenTelemetry Collector consumer +// which can lazily forward request to another consumer implementation. +package lazyconsumer + +import ( + "context" + "sync" + + otelcomponent "go.opentelemetry.io/collector/component" + otelconsumer "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +// Consumer is a lazily-loaded consumer. +type Consumer struct { + ctx context.Context + + mut sync.RWMutex + metricsConsumer otelconsumer.Metrics + logsConsumer otelconsumer.Logs + tracesConsumer otelconsumer.Traces +} + +var ( + _ otelconsumer.Traces = (*Consumer)(nil) + _ otelconsumer.Metrics = (*Consumer)(nil) + _ otelconsumer.Logs = (*Consumer)(nil) +) + +// New creates a new Consumer. The provided ctx is used to determine when the +// Consumer should stop accepting data; if the ctx is closed, no further data +// will be accepted. +func New(ctx context.Context) *Consumer { + return &Consumer{ctx: ctx} +} + +// Capabilities implements otelconsumer.baseConsumer. +func (c *Consumer) Capabilities() otelconsumer.Capabilities { + return otelconsumer.Capabilities{ + // MutatesData is always set to false; the lazy consumer will check the + // underlying consumer's capabilities prior to forwarding data and will + // pass a copy if the underlying consumer mutates data. + MutatesData: false, + } +} + +// ConsumeTraces implements otelconsumer.Traces. +func (c *Consumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + if c.ctx.Err() != nil { + return c.ctx.Err() + } + + c.mut.RLock() + defer c.mut.RUnlock() + + if c.tracesConsumer == nil { + return otelcomponent.ErrDataTypeIsNotSupported + } + + if c.tracesConsumer.Capabilities().MutatesData { + td = td.Clone() + } + return c.tracesConsumer.ConsumeTraces(ctx, td) +} + +// ConsumeMetrics implements otelconsumer.Metrics. +func (c *Consumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { + if c.ctx.Err() != nil { + return c.ctx.Err() + } + + c.mut.RLock() + defer c.mut.RUnlock() + + if c.metricsConsumer == nil { + return otelcomponent.ErrDataTypeIsNotSupported + } + + if c.metricsConsumer.Capabilities().MutatesData { + md = md.Clone() + } + return c.metricsConsumer.ConsumeMetrics(ctx, md) +} + +// ConsumeLogs implements otelconsumer.Logs. +func (c *Consumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error { + if c.ctx.Err() != nil { + return c.ctx.Err() + } + + c.mut.RLock() + defer c.mut.RUnlock() + + if c.logsConsumer == nil { + return otelcomponent.ErrDataTypeIsNotSupported + } + + if c.logsConsumer.Capabilities().MutatesData { + ld = ld.Clone() + } + return c.logsConsumer.ConsumeLogs(ctx, ld) +} + +// SetConsumers updates the internal consumers that Consumer will forward data +// to. It is valid for any combination of m, l, and t to be nil. +func (c *Consumer) SetConsumers(t otelconsumer.Traces, m otelconsumer.Metrics, l otelconsumer.Logs) { + c.mut.Lock() + defer c.mut.Unlock() + + c.metricsConsumer = m + c.logsConsumer = l + c.tracesConsumer = t +} diff --git a/component/otelcol/internal/scheduler/host.go b/component/otelcol/internal/scheduler/host.go index 140a66502074..33e0d3e1b837 100644 --- a/component/otelcol/internal/scheduler/host.go +++ b/component/otelcol/internal/scheduler/host.go @@ -12,9 +12,6 @@ import ( type Host struct { log log.Logger - // TODO(rfratto): allow the below fields below to be used. For now they're - // always nil. - extensions map[otelconfig.ComponentID]otelcomponent.Extension exporters map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter } @@ -26,6 +23,18 @@ func NewHost(l log.Logger) *Host { var _ otelcomponent.Host = (*Host)(nil) +// SetExtensions sets the extensions available from the Host. It is not valid +// to call this after the Host is in use. +func (h *Host) SetExtensions(extensions map[otelconfig.ComponentID]otelcomponent.Extension) { + h.extensions = extensions +} + +// SetExporters sets the exporters available from the Host. It is not valid +// to call this after the Host is in use. +func (h *Host) SetExporters(exporters map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter) { + h.exporters = exporters +} + // ReportFatalError implements otelcomponent.Host. func (h *Host) ReportFatalError(err error) { level.Error(h.log).Log("msg", "fatal error running component", "err", err) diff --git a/pkg/flow/componenttest/componenttest.go b/pkg/flow/componenttest/componenttest.go index 75b8ebc829b5..7d9a7c33fe66 100644 --- a/pkg/flow/componenttest/componenttest.go +++ b/pkg/flow/componenttest/componenttest.go @@ -34,14 +34,20 @@ type Controller struct { // NewControllerFromID returns a new testing Controller for the component with // the provided name. func NewControllerFromID(l log.Logger, componentName string) (*Controller, error) { - if l == nil { - l = log.NewNopLogger() - } - reg, ok := component.Get(componentName) if !ok { return nil, fmt.Errorf("no such component %q", componentName) } + return NewControllerFromReg(l, reg), nil +} + +// NewControllerFromReg registers a new testing Controller for a component with +// the given registration. This can be used for testing fake components which +// aren't really registered. +func NewControllerFromReg(l log.Logger, reg component.Registration) *Controller { + if l == nil { + l = log.NewNopLogger() + } return &Controller{ reg: reg, @@ -49,7 +55,7 @@ func NewControllerFromID(l log.Logger, componentName string) (*Controller, error running: make(chan struct{}, 1), exportsCh: make(chan struct{}, 1), - }, nil + } } func (c *Controller) onStateChange(e component.Exports) {