diff --git a/component/otelcol/processor/processor.go b/component/otelcol/processor/processor.go new file mode 100644 index 000000000000..a46aa25bfe26 --- /dev/null +++ b/component/otelcol/processor/processor.go @@ -0,0 +1,177 @@ +// Package processor exposes utilities to create a Flow component from +// OpenTelemetry Collector processors. +package processor + +import ( + "context" + "errors" + "os" + + "github.com/grafana/agent/component" + "github.com/grafana/agent/component/otelcol" + "github.com/grafana/agent/component/otelcol/internal/fanoutconsumer" + "github.com/grafana/agent/component/otelcol/internal/lazyconsumer" + "github.com/grafana/agent/component/otelcol/internal/scheduler" + "github.com/grafana/agent/component/otelcol/internal/zapadapter" + "github.com/grafana/agent/pkg/build" + otelcomponent "go.opentelemetry.io/collector/component" + otelconfig "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" +) + +// Arguments is an extension of component.Arguments which contains necessary +// settings for OpenTelemetry Collector processors. +type Arguments interface { + component.Arguments + + // Convert converts the Arguments into an OpenTelemetry Collector processor + // configuration. + Convert() otelconfig.Processor + + // Extensions returns the set of extensions that the configured component is + // allowed to use. + Extensions() map[otelconfig.ComponentID]otelcomponent.Extension + + // Exporters returns the set of exporters that are exposed to the configured + // component. + Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter + + // NextConsumers returns the set of consumers to send data to. + NextConsumers() *otelcol.ConsumerArguments +} + +// Processor is a Flow component shim which manages an OpenTelemetry Collector +// processor component. +type Processor struct { + ctx context.Context + cancel context.CancelFunc + + opts component.Options + factory otelcomponent.ProcessorFactory + consumer *lazyconsumer.Consumer + + sched *scheduler.Scheduler +} + +var ( + _ component.Component = (*Processor)(nil) + _ component.HealthComponent = (*Processor)(nil) +) + +// New creates a new Flow component which encapsulates an OpenTelemetry +// Collector processor. args must hold a value of the argument type registered +// with the Flow component. +// +// The registered component must be registered to export the +// otelcol.ConsumerExports type, otherwise New will panic. +func New(opts component.Options, f otelcomponent.ProcessorFactory, args Arguments) (*Processor, error) { + ctx, cancel := context.WithCancel(context.Background()) + + consumer := lazyconsumer.New(ctx) + + // Immediately set our state with our consumer. The exports will never change + // throughout the lifetime of our component. + // + // This will panic if the wrapping component is not registered to export + // otelcol.ConsumerExports. + opts.OnStateChange(otelcol.ConsumerExports{Input: consumer}) + + p := &Processor{ + ctx: ctx, + cancel: cancel, + + opts: opts, + factory: f, + consumer: consumer, + + sched: scheduler.New(opts.Logger), + } + if err := p.Update(args); err != nil { + return nil, err + } + return p, nil +} + +// Run starts the Processor component. +func (p *Processor) Run(ctx context.Context) error { + defer p.cancel() + return p.sched.Run(ctx) +} + +// Update implements component.Component. It will convert the Arguments into +// configuration for OpenTelemetry Collector processor configuration and manage +// the underlying OpenTelemetry Collector processor. +func (p *Processor) Update(args component.Arguments) error { + pargs := args.(Arguments) + + host := scheduler.NewHost( + p.opts.Logger, + scheduler.WithHostExtensions(pargs.Extensions()), + scheduler.WithHostExporters(pargs.Exporters()), + ) + + settings := otelcomponent.ProcessorCreateSettings{ + TelemetrySettings: otelcomponent.TelemetrySettings{ + Logger: zapadapter.New(p.opts.Logger), + + // TODO(rfratto): expose tracing and logging statistics. + // + // We may want to put off tracing until we have native tracing + // instrumentation from Flow, but metrics should come sooner since we're + // already set up for supporting component-specific metrics. + TracerProvider: trace.NewNoopTracerProvider(), + MeterProvider: metric.NewNoopMeterProvider(), + }, + + BuildInfo: otelcomponent.BuildInfo{ + Command: os.Args[0], + Description: "Grafana Agent", + Version: build.Version, + }, + } + + processorConfig := pargs.Convert() + + var ( + next = pargs.NextConsumers() + nextTraces = fanoutconsumer.Traces(next.Traces) + nextMetrics = fanoutconsumer.Metrics(next.Metrics) + nextLogs = fanoutconsumer.Logs(next.Logs) + ) + + // Create instances of the processor from our factory for each of our + // supported telemetry signals. + var components []otelcomponent.Component + + tracesProcessor, err := p.factory.CreateTracesProcessor(p.ctx, settings, processorConfig, nextTraces) + if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) { + return err + } else if tracesProcessor != nil { + components = append(components, tracesProcessor) + } + + metricsProcessor, err := p.factory.CreateMetricsProcessor(p.ctx, settings, processorConfig, nextMetrics) + if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) { + return err + } else if metricsProcessor != nil { + components = append(components, metricsProcessor) + } + + logsProcessor, err := p.factory.CreateLogsProcessor(p.ctx, settings, processorConfig, nextLogs) + if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) { + return err + } else if logsProcessor != nil { + components = append(components, logsProcessor) + } + + // Schedule the components to run once our component is running. + p.sched.Schedule(host, components...) + p.consumer.SetConsumers(tracesProcessor, metricsProcessor, logsProcessor) + return nil +} + +// CurrentHealth implements component.HealthComponent. +func (p *Processor) CurrentHealth() component.Health { + return p.sched.CurrentHealth() +} diff --git a/component/otelcol/processor/processor_test.go b/component/otelcol/processor/processor_test.go new file mode 100644 index 000000000000..d448ff515ff5 --- /dev/null +++ b/component/otelcol/processor/processor_test.go @@ -0,0 +1,207 @@ +package processor_test + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/grafana/agent/component" + "github.com/grafana/agent/component/otelcol" + "github.com/grafana/agent/component/otelcol/internal/fakeconsumer" + "github.com/grafana/agent/component/otelcol/processor" + "github.com/grafana/agent/pkg/flow/componenttest" + "github.com/grafana/agent/pkg/util" + "github.com/stretchr/testify/require" + otelcomponent "go.opentelemetry.io/collector/component" + otelconfig "go.opentelemetry.io/collector/config" + otelconsumer "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +func TestProcessor(t *testing.T) { + ctx := componenttest.TestContext(t) + + // Create an instance of a fake OpenTelemetry Collector processor which our + // Flow component will wrap around. Our fake processor will immediately + // forward data to the connected consumer once one is made available to it. + var ( + consumer otelconsumer.Traces + + waitConsumerTrigger = util.NewWaitTrigger() + onTracesConsumer = func(t otelconsumer.Traces) { + consumer = t + waitConsumerTrigger.Trigger() + } + + waitTracesTrigger = util.NewWaitTrigger() + nextConsumer = &fakeconsumer.Consumer{ + ConsumeTracesFunc: func(context.Context, ptrace.Traces) error { + waitTracesTrigger.Trigger() + return nil + }, + } + + // Our fake processor will wait for a consumer to be registered and then + // pass along data directly to it. + innerProcessor = &fakeProcessor{ + ConsumeTracesFunc: func(ctx context.Context, td ptrace.Traces) error { + require.NoError(t, waitConsumerTrigger.Wait(time.Second), "no next consumer registered") + return consumer.ConsumeTraces(ctx, td) + }, + } + ) + + // Create and start our Flow component. We then wait for it to export a + // consumer that we can send data to. + te := newTestEnvironment(t, innerProcessor, onTracesConsumer) + te.Start(fakeProcessorArgs{ + Output: &otelcol.ConsumerArguments{ + Metrics: []otelcol.Consumer{nextConsumer}, + Logs: []otelcol.Consumer{nextConsumer}, + Traces: []otelcol.Consumer{nextConsumer}, + }, + }) + + require.NoError(t, te.Controller.WaitExports(1*time.Second), "test component did not generate exports") + ce := te.Controller.Exports().(otelcol.ConsumerExports) + + // Create a test set of traces and send it to our consumer in the background. + // We then wait for our channel to receive the traces, indicating that + // everything was wired up correctly. + go func() { + var err error + + for { + err = ce.Input.ConsumeTraces(ctx, ptrace.NewTraces()) + + if errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) { + // Our component may not have been fully initialized yet. Wait a little + // bit before trying again. + time.Sleep(100 * time.Millisecond) + continue + } + + require.NoError(t, err) + break + } + }() + + require.NoError(t, waitTracesTrigger.Wait(time.Second), "consumer did not get invoked") +} + +type testEnvironment struct { + t *testing.T + + Controller *componenttest.Controller +} + +func newTestEnvironment( + t *testing.T, + fp otelcomponent.TracesProcessor, + onTracesConsumer func(t otelconsumer.Traces), +) *testEnvironment { + + t.Helper() + + reg := component.Registration{ + Name: "testcomponent", + Args: fakeProcessorArgs{}, + Exports: otelcol.ConsumerExports{}, + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + // Create a factory which always returns our instance of fakeProcessor + // defined above. + factory := otelcomponent.NewProcessorFactory( + "testcomponent", + func() otelconfig.Processor { + return fakeProcessorArgs{}.Convert() + }, + otelcomponent.WithTracesProcessor(func( + _ context.Context, + _ otelcomponent.ProcessorCreateSettings, + _ otelconfig.Processor, + t otelconsumer.Traces, + ) (otelcomponent.TracesProcessor, error) { + + onTracesConsumer(t) + return fp, nil + }, otelcomponent.StabilityLevelUndefined), + ) + + return processor.New(opts, factory, args.(processor.Arguments)) + }, + } + + return &testEnvironment{ + t: t, + Controller: componenttest.NewControllerFromReg(util.TestLogger(t), reg), + } +} + +func (te *testEnvironment) Start(args component.Arguments) { + go func() { + ctx := componenttest.TestContext(te.t) + err := te.Controller.Run(ctx, args) + require.NoError(te.t, err, "failed to run component") + }() +} + +type fakeProcessorArgs struct { + Output *otelcol.ConsumerArguments +} + +var _ processor.Arguments = fakeProcessorArgs{} + +func (fa fakeProcessorArgs) Convert() otelconfig.Processor { + settings := otelconfig.NewProcessorSettings(otelconfig.NewComponentID("testcomponent")) + return &settings +} + +func (fa fakeProcessorArgs) Extensions() map[otelconfig.ComponentID]otelcomponent.Extension { + return nil +} + +func (fa fakeProcessorArgs) Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter { + return nil +} + +func (fa fakeProcessorArgs) NextConsumers() *otelcol.ConsumerArguments { + return fa.Output +} + +type fakeProcessor struct { + StartFunc func(ctx context.Context, host otelcomponent.Host) error + ShutdownFunc func(ctx context.Context) error + CapabilitiesFunc func() otelconsumer.Capabilities + ConsumeTracesFunc func(ctx context.Context, td ptrace.Traces) error +} + +var _ otelcomponent.TracesProcessor = (*fakeProcessor)(nil) + +func (fe *fakeProcessor) Start(ctx context.Context, host otelcomponent.Host) error { + if fe.StartFunc != nil { + return fe.StartFunc(ctx, host) + } + return nil +} + +func (fe *fakeProcessor) Shutdown(ctx context.Context) error { + if fe.ShutdownFunc != nil { + return fe.ShutdownFunc(ctx) + } + return nil +} + +func (fe *fakeProcessor) Capabilities() otelconsumer.Capabilities { + if fe.CapabilitiesFunc != nil { + return fe.CapabilitiesFunc() + } + return otelconsumer.Capabilities{} +} + +func (fe *fakeProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + if fe.ConsumeTracesFunc != nil { + return fe.ConsumeTracesFunc(ctx, td) + } + return nil +} diff --git a/component/otelcol/receiver/receiver.go b/component/otelcol/receiver/receiver.go index 8e3d7df8ae6c..aea56448e2ac 100644 --- a/component/otelcol/receiver/receiver.go +++ b/component/otelcol/receiver/receiver.go @@ -24,7 +24,7 @@ import ( type Arguments interface { component.Arguments - // Convert converts the Arguments into an OpenTelemetry Collector exporter + // Convert converts the Arguments into an OpenTelemetry Collector receiver // configuration. Convert() otelconfig.Receiver