diff --git a/component/otelcol/consumer.go b/component/otelcol/consumer.go index 3f6466ceb2e9..edb0fb4fbcf1 100644 --- a/component/otelcol/consumer.go +++ b/component/otelcol/consumer.go @@ -12,6 +12,17 @@ type Consumer interface { otelconsumer.Logs } +// ConsumerArguments is a common Arguments type for Flow components which can +// send data to otelcol consumers. +// +// It is expected to use ConsumerArguments as a block within the top-level +// arguments block for a component. +type ConsumerArguments struct { + Metrics []Consumer `river:"metrics,attr,optional"` + Logs []Consumer `river:"logs,attr,optional"` + Traces []Consumer `river:"traces,attr,optional"` +} + // ConsumerExports is a common Exports type for Flow components which are // otelcol processors or otelcol exporters. type ConsumerExports struct { diff --git a/component/otelcol/exporter/exporter.go b/component/otelcol/exporter/exporter.go index e2b0ac6f93eb..17deb3695770 100644 --- a/component/otelcol/exporter/exporter.go +++ b/component/otelcol/exporter/exporter.go @@ -128,7 +128,7 @@ func (e *Exporter) Update(args component.Arguments) error { }, } - var exporterConfig = eargs.Convert() + exporterConfig := eargs.Convert() // Create instances of the exporter from our factory for each of our // supported telemetry signals. diff --git a/component/otelcol/internal/fakeconsumer/fake.go b/component/otelcol/internal/fakeconsumer/fake.go new file mode 100644 index 000000000000..c962c3c2ff61 --- /dev/null +++ b/component/otelcol/internal/fakeconsumer/fake.go @@ -0,0 +1,60 @@ +package fakeconsumer + +import ( + "context" + + "github.com/grafana/agent/component/otelcol" + otelconsumer "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +// Consumer is a fake otelcol.Consumer implementation which invokes functions +// when methods are called. All struct member fields are optional. If a struct +// member field is not provided, implementations of methods will default to a +// no-op. +type Consumer struct { + CapabilitiesFunc func() otelconsumer.Capabilities + ConsumeTracesFunc func(context.Context, ptrace.Traces) error + ConsumeMetricsFunc func(context.Context, pmetric.Metrics) error + ConsumeLogsFunc func(context.Context, plog.Logs) error +} + +var _ otelcol.Consumer = (*Consumer)(nil) + +// Capabilities implements otelcol.Consumer. If the CapabilitiesFunc is not +// provided, MutatesData is reported as true. +func (c *Consumer) Capabilities() otelconsumer.Capabilities { + if c.CapabilitiesFunc != nil { + return c.CapabilitiesFunc() + } + + // We don't know what the fake implementation will do, so return true just + // in case it mutates data. + return otelconsumer.Capabilities{MutatesData: true} +} + +// ConsumeTraces implements otelcol.ConsumeTraces. +func (c *Consumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + if c.ConsumeTracesFunc != nil { + return c.ConsumeTracesFunc(ctx, td) + } + return nil +} + +// ConsumeMetrics implements otelcol.ConsumeMetrics. +func (c *Consumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { + if c.ConsumeMetricsFunc != nil { + return c.ConsumeMetricsFunc(ctx, md) + } + return nil +} + +// ConsumeLogs implements otelcol.ConsumeLogs. +func (c *Consumer) ConsumeLogs(ctx context.Context, md plog.Logs) error { + if c.ConsumeLogsFunc != nil { + return c.ConsumeLogsFunc(ctx, md) + } + return nil +} diff --git a/component/otelcol/internal/fanoutconsumer/logs.go b/component/otelcol/internal/fanoutconsumer/logs.go new file mode 100644 index 000000000000..4e15061dc8a3 --- /dev/null +++ b/component/otelcol/internal/fanoutconsumer/logs.go @@ -0,0 +1,77 @@ +package fanoutconsumer + +// This file is a near copy of +// https://github.com/open-telemetry/opentelemetry-collector/blob/v0.54.0/service/internal/fanoutconsumer/logs.go +// +// A copy was made because the upstream package is internal. If it is ever made +// public, our copy can be removed. + +import ( + "context" + + "github.com/grafana/agent/component/otelcol" + otelconsumer "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" + "go.uber.org/multierr" +) + +// Logs creates a new fanout consumer for logs. +func Logs(in []otelcol.Consumer) otelconsumer.Logs { + if len(in) == 1 { + return in[0] + } + + var passthrough, clone []otelconsumer.Logs + + // Iterate through all the consumers besides the last. + for i := 0; i < len(in)-1; i++ { + consumer := in[i] + + if consumer.Capabilities().MutatesData { + clone = append(clone, consumer) + } else { + passthrough = append(passthrough, consumer) + } + } + + last := in[len(in)-1] + + // The final consumer can be given to the passthrough list regardless of + // whether it mutates as long as there's no other read-only consumers. + if len(passthrough) == 0 || !last.Capabilities().MutatesData { + passthrough = append(passthrough, last) + } else { + clone = append(clone, last) + } + + return &logsFanout{ + passthrough: passthrough, + clone: clone, + } +} + +type logsFanout struct { + passthrough []otelconsumer.Logs // Consumers where data can be passed through directly + clone []otelconsumer.Logs // Consumes which require cloning data +} + +func (f *logsFanout) Capabilities() otelconsumer.Capabilities { + return otelconsumer.Capabilities{MutatesData: false} +} + +// ConsumeLogs exports the pmetric.Logs to all consumers wrapped by the current one. +func (f *logsFanout) ConsumeLogs(ctx context.Context, ld plog.Logs) error { + var errs error + + // Initially pass to clone exporter to avoid the case where the optimization + // of sending the incoming data to a mutating consumer is used that may + // change the incoming data before cloning. + for _, f := range f.clone { + errs = multierr.Append(errs, f.ConsumeLogs(ctx, ld.Clone())) + } + for _, f := range f.passthrough { + errs = multierr.Append(errs, f.ConsumeLogs(ctx, ld)) + } + + return errs +} diff --git a/component/otelcol/internal/fanoutconsumer/metrics.go b/component/otelcol/internal/fanoutconsumer/metrics.go new file mode 100644 index 000000000000..71c777428743 --- /dev/null +++ b/component/otelcol/internal/fanoutconsumer/metrics.go @@ -0,0 +1,77 @@ +package fanoutconsumer + +// This file is a near copy of +// https://github.com/open-telemetry/opentelemetry-collector/blob/v0.54.0/service/internal/fanoutconsumer/metrics.go +// +// A copy was made because the upstream package is internal. If it is ever made +// public, our copy can be removed. + +import ( + "context" + + "github.com/grafana/agent/component/otelcol" + otelconsumer "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/multierr" +) + +// Metrics creates a new fanout consumer for metrics. +func Metrics(in []otelcol.Consumer) otelconsumer.Metrics { + if len(in) == 1 { + return in[0] + } + + var passthrough, clone []otelconsumer.Metrics + + // Iterate through all the consumers besides the last. + for i := 0; i < len(in)-1; i++ { + consumer := in[i] + + if consumer.Capabilities().MutatesData { + clone = append(clone, consumer) + } else { + passthrough = append(passthrough, consumer) + } + } + + last := in[len(in)-1] + + // The final consumer can be given to the passthrough list regardless of + // whether it mutates as long as there's no other read-only consumers. + if len(passthrough) == 0 || !last.Capabilities().MutatesData { + passthrough = append(passthrough, last) + } else { + clone = append(clone, last) + } + + return &metricsFanout{ + passthrough: passthrough, + clone: clone, + } +} + +type metricsFanout struct { + passthrough []otelconsumer.Metrics // Consumers where data can be passed through directly + clone []otelconsumer.Metrics // Consumes which require cloning data +} + +func (f *metricsFanout) Capabilities() otelconsumer.Capabilities { + return otelconsumer.Capabilities{MutatesData: false} +} + +// ConsumeMetrics exports the pmetric.Metrics to all consumers wrapped by the current one. +func (f *metricsFanout) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { + var errs error + + // Initially pass to clone exporter to avoid the case where the optimization + // of sending the incoming data to a mutating consumer is used that may + // change the incoming data before cloning. + for _, f := range f.clone { + errs = multierr.Append(errs, f.ConsumeMetrics(ctx, md.Clone())) + } + for _, f := range f.passthrough { + errs = multierr.Append(errs, f.ConsumeMetrics(ctx, md)) + } + + return errs +} diff --git a/component/otelcol/internal/fanoutconsumer/traces.go b/component/otelcol/internal/fanoutconsumer/traces.go new file mode 100644 index 000000000000..52957fc85bb7 --- /dev/null +++ b/component/otelcol/internal/fanoutconsumer/traces.go @@ -0,0 +1,77 @@ +package fanoutconsumer + +// This file is a near copy of +// https://github.com/open-telemetry/opentelemetry-collector/blob/v0.54.0/service/internal/fanoutconsumer/traces.go +// +// A copy was made because the upstream package is internal. If it is ever made +// public, our copy can be removed. + +import ( + "context" + + "github.com/grafana/agent/component/otelcol" + otelconsumer "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/multierr" +) + +// Traces creates a new fanout consumer for traces. +func Traces(in []otelcol.Consumer) otelconsumer.Traces { + if len(in) == 1 { + return in[0] + } + + var passthrough, clone []otelconsumer.Traces + + // Iterate through all the consumers besides the last. + for i := 0; i < len(in)-1; i++ { + consumer := in[i] + + if consumer.Capabilities().MutatesData { + clone = append(clone, consumer) + } else { + passthrough = append(passthrough, consumer) + } + } + + last := in[len(in)-1] + + // The final consumer can be given to the passthrough list regardless of + // whether it mutates as long as there's no other read-only consumers. + if len(passthrough) == 0 || !last.Capabilities().MutatesData { + passthrough = append(passthrough, last) + } else { + clone = append(clone, last) + } + + return &tracesFanout{ + passthrough: passthrough, + clone: clone, + } +} + +type tracesFanout struct { + passthrough []otelconsumer.Traces // Consumers where data can be passed through directly + clone []otelconsumer.Traces // Consumes which require cloning data +} + +func (f *tracesFanout) Capabilities() otelconsumer.Capabilities { + return otelconsumer.Capabilities{MutatesData: false} +} + +// ConsumeTraces exports the pmetric.Traces to all consumers wrapped by the current one. +func (f *tracesFanout) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + var errs error + + // Initially pass to clone exporter to avoid the case where the optimization + // of sending the incoming data to a mutating consumer is used that may + // change the incoming data before cloning. + for _, f := range f.clone { + errs = multierr.Append(errs, f.ConsumeTraces(ctx, td.Clone())) + } + for _, f := range f.passthrough { + errs = multierr.Append(errs, f.ConsumeTraces(ctx, td)) + } + + return errs +} diff --git a/component/otelcol/receiver/receiver.go b/component/otelcol/receiver/receiver.go new file mode 100644 index 000000000000..0f0a5fe7046e --- /dev/null +++ b/component/otelcol/receiver/receiver.go @@ -0,0 +1,166 @@ +// Package receiver utilities to create a Flow component from OpenTelemetry +// Collector receivers. +package receiver + +import ( + "context" + "errors" + "os" + + "github.com/grafana/agent/component" + "github.com/grafana/agent/component/otelcol" + "github.com/grafana/agent/component/otelcol/internal/fanoutconsumer" + "github.com/grafana/agent/component/otelcol/internal/scheduler" + "github.com/grafana/agent/pkg/build" + otelcomponent "go.opentelemetry.io/collector/component" + otelconfig "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" +) + +// Arguments is an extension of component.Arguments which contains necessary +// settings for OpenTelemetry Collector receivers. +type Arguments interface { + component.Arguments + + // Convert converts the Arguments into an OpenTelemetry Collector exporter + // configuration. + Convert() otelconfig.Receiver + + // Extensions returns the set of extensions that the configured component is + // allowed to use. + Extensions() map[otelconfig.ComponentID]otelcomponent.Extension + + // Exporters returns the set of exporters that are exposed to the configured + // component. + Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter + + // NextConsumers returns the set of consumers to send data to. + NextConsumers() *otelcol.ConsumerArguments +} + +// Receiver is a Flow component shim which manages an OpenTelemetry Collector +// receiver component. +type Receiver struct { + ctx context.Context + cancel context.CancelFunc + + opts component.Options + factory otelcomponent.ReceiverFactory + + sched *scheduler.Scheduler +} + +var ( + _ component.Component = (*Receiver)(nil) + _ component.HealthComponent = (*Receiver)(nil) +) + +// New creates a new Flow component which encapsulates an OpenTelemetry +// Collector receiver. args must hold a value of the argument type registered +// with the Flow component. +// +// If the registered Flow component registers exported fields, it is the +// responsibility of the caller to export values when needed; the Receiver +// component never exports any values. +func New(opts component.Options, f otelcomponent.ReceiverFactory, args Arguments) (*Receiver, error) { + ctx, cancel := context.WithCancel(context.Background()) + + r := &Receiver{ + ctx: ctx, + cancel: cancel, + + opts: opts, + factory: f, + + sched: scheduler.New(opts.Logger), + } + if err := r.Update(args); err != nil { + return nil, err + } + return r, nil +} + +// Run starts the Receiver component. +func (r *Receiver) Run(ctx context.Context) error { + defer r.cancel() + return r.sched.Run(ctx) +} + +// Update implements component.Component. It will convert the Arguments into +// configuration for OpenTelemetry Collector receiver configuration and manage +// the underlying OpenTelemetry Collector receiver. +func (r *Receiver) Update(args component.Arguments) error { + rargs := args.(Arguments) + + host := scheduler.NewHost( + r.opts.Logger, + scheduler.WithHostExtensions(rargs.Extensions()), + scheduler.WithHostExporters(rargs.Exporters()), + ) + + settings := otelcomponent.ReceiverCreateSettings{ + TelemetrySettings: otelcomponent.TelemetrySettings{ + // TODO(rfratto): create an adapter from zap -> go-kit/log + Logger: zap.NewNop(), + + // TODO(rfratto): expose tracing and logging statistics. + // + // We may want to put off tracing until we have native tracing + // instrumentation from Flow, but metrics should come sooner since we're + // already set up for supporting component-specific metrics. + TracerProvider: trace.NewNoopTracerProvider(), + MeterProvider: metric.NewNoopMeterProvider(), + }, + + BuildInfo: otelcomponent.BuildInfo{ + Command: os.Args[0], + Description: "Grafana Agent", + Version: build.Version, + }, + } + + receiverConfig := rargs.Convert() + + var ( + next = rargs.NextConsumers() + nextTraces = fanoutconsumer.Traces(next.Traces) + nextMetrics = fanoutconsumer.Metrics(next.Metrics) + nextLogs = fanoutconsumer.Logs(next.Logs) + ) + + // Create instances of the receiver from our factory for each of our + // supported telemetry signals. + var components []otelcomponent.Component + + tracesReceiver, err := r.factory.CreateTracesReceiver(r.ctx, settings, receiverConfig, nextTraces) + if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) { + return err + } else if tracesReceiver != nil { + components = append(components, tracesReceiver) + } + + metricsReceiver, err := r.factory.CreateMetricsReceiver(r.ctx, settings, receiverConfig, nextMetrics) + if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) { + return err + } else if metricsReceiver != nil { + components = append(components, metricsReceiver) + } + + logsReceiver, err := r.factory.CreateLogsReceiver(r.ctx, settings, receiverConfig, nextLogs) + if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) { + return err + } else if logsReceiver != nil { + components = append(components, logsReceiver) + } + + // Schedule the components to run once our component is running. + r.sched.Schedule(host, components...) + return nil +} + +// CurrentHealth implements component.HealthComponent. +func (r *Receiver) CurrentHealth() component.Health { + return r.sched.CurrentHealth() +} diff --git a/component/otelcol/receiver/receiver_test.go b/component/otelcol/receiver/receiver_test.go new file mode 100644 index 000000000000..c515d263c111 --- /dev/null +++ b/component/otelcol/receiver/receiver_test.go @@ -0,0 +1,129 @@ +package receiver_test + +import ( + "context" + "testing" + "time" + + "github.com/grafana/agent/component" + "github.com/grafana/agent/component/otelcol" + "github.com/grafana/agent/component/otelcol/internal/fakeconsumer" + "github.com/grafana/agent/component/otelcol/receiver" + "github.com/grafana/agent/pkg/flow/componenttest" + "github.com/grafana/agent/pkg/util" + "github.com/stretchr/testify/require" + otelcomponent "go.opentelemetry.io/collector/component" + otelconfig "go.opentelemetry.io/collector/config" + otelconsumer "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +func TestReceiver(t *testing.T) { + var ( + consumer otelconsumer.Traces + + waitConsumerTrigger = util.NewWaitTrigger() + onTracesConsumer = func(t otelconsumer.Traces) { + consumer = t + waitConsumerTrigger.Trigger() + } + + waitTracesTrigger = util.NewWaitTrigger() + nextConsumer = &fakeconsumer.Consumer{ + ConsumeTracesFunc: func(context.Context, ptrace.Traces) error { + waitTracesTrigger.Trigger() + return nil + }, + } + ) + + // Create and start our Flow component. We then wait for it to export a + // consumer that we can send data to. + te := newTestEnvironment(t, onTracesConsumer) + te.Start(fakeReceiverArgs{ + Output: &otelcol.ConsumerArguments{ + Metrics: []otelcol.Consumer{nextConsumer}, + Logs: []otelcol.Consumer{nextConsumer}, + Traces: []otelcol.Consumer{nextConsumer}, + }, + }) + + require.NoError(t, waitConsumerTrigger.Wait(time.Second), "no traces consumer sent") + + err := consumer.ConsumeTraces(context.Background(), ptrace.NewTraces()) + require.NoError(t, err) + + require.NoError(t, waitTracesTrigger.Wait(time.Second), "consumer did not get invoked") +} + +type testEnvironment struct { + t *testing.T + + Controller *componenttest.Controller +} + +func newTestEnvironment(t *testing.T, onTracesConsumer func(t otelconsumer.Traces)) *testEnvironment { + t.Helper() + + reg := component.Registration{ + Name: "testcomponent", + Args: fakeReceiverArgs{}, + Exports: otelcol.ConsumerExports{}, + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + // Create a factory which always returns our instance of fakeReceiver + // defined above. + factory := otelcomponent.NewReceiverFactory( + "testcomponent", + func() otelconfig.Receiver { return nil }, + otelcomponent.WithTracesReceiver(func( + ctx context.Context, + rcs otelcomponent.ReceiverCreateSettings, + r otelconfig.Receiver, + t otelconsumer.Traces, + ) (otelcomponent.TracesReceiver, error) { + + onTracesConsumer(t) + return nil, nil + }, otelcomponent.StabilityLevelUndefined), + ) + + return receiver.New(opts, factory, args.(receiver.Arguments)) + }, + } + + return &testEnvironment{ + t: t, + Controller: componenttest.NewControllerFromReg(util.TestLogger(t), reg), + } +} + +func (te *testEnvironment) Start(args component.Arguments) { + go func() { + ctx := componenttest.TestContext(te.t) + err := te.Controller.Run(ctx, args) + require.NoError(te.t, err, "failed to run component") + }() +} + +type fakeReceiverArgs struct { + Output *otelcol.ConsumerArguments +} + +var _ receiver.Arguments = fakeReceiverArgs{} + +func (fa fakeReceiverArgs) Convert() otelconfig.Receiver { + settings := otelconfig.NewReceiverSettings(otelconfig.NewComponentID("testcomponent")) + return &settings +} + +func (fa fakeReceiverArgs) Extensions() map[otelconfig.ComponentID]otelcomponent.Extension { + return nil +} + +func (fa fakeReceiverArgs) Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter { + return nil +} + +func (fa fakeReceiverArgs) NextConsumers() *otelcol.ConsumerArguments { + return fa.Output +}