From 11eff6872f66988ad06de6144fcb6dafb4c9f13c Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Mon, 3 Oct 2022 13:32:33 -0400 Subject: [PATCH 1/5] component/otelcol/processor: create processor component abstraction This commit introduces a new package, component/otelcol/processor, which exposes a generic Flow component implementation which can run OpenTelemetry Collector processor. Like #2227 and #2254, it leaves some work unfinished for future PRs: * A Zap logging adapter needs to be created to correctly process logs from OpenTelemetry Collector components. * Component-specific metrics are currently ignored. * Component-specific traces are currently ignored. As of this commit, there are no registered `otelcol.processor.*` components. Implementations for OpenTelemetry Collector Flow components will be done in future PRs. Related to #2213. --- component/otelcol/processor/processor.go | 178 +++++++++++++++ component/otelcol/processor/processor_test.go | 207 ++++++++++++++++++ component/otelcol/receiver/receiver.go | 2 +- 3 files changed, 386 insertions(+), 1 deletion(-) create mode 100644 component/otelcol/processor/processor.go create mode 100644 component/otelcol/processor/processor_test.go diff --git a/component/otelcol/processor/processor.go b/component/otelcol/processor/processor.go new file mode 100644 index 000000000000..c9c6f65224da --- /dev/null +++ b/component/otelcol/processor/processor.go @@ -0,0 +1,178 @@ +// Package processor exposes utilities to create a Flow component from +// OpenTelemetry Collector processors. +package processor + +import ( + "context" + "errors" + "os" + + "github.com/grafana/agent/component" + "github.com/grafana/agent/component/otelcol" + "github.com/grafana/agent/component/otelcol/internal/fanoutconsumer" + "github.com/grafana/agent/component/otelcol/internal/lazyconsumer" + "github.com/grafana/agent/component/otelcol/internal/scheduler" + "github.com/grafana/agent/pkg/build" + otelcomponent "go.opentelemetry.io/collector/component" + otelconfig "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" +) + +// Arguments is an extension of component.Arguments which contains necessary +// settings for OpenTelemetry Collector processors. +type Arguments interface { + component.Arguments + + // Convert converts the Arguments into an OpenTelemetry Collector processor + // configuration. + Convert() otelconfig.Processor + + // Extensions returns the set of extensions that the configured component is + // allowed to use. + Extensions() map[otelconfig.ComponentID]otelcomponent.Extension + + // Exporters returns the set of exporters that are exposed to the configured + // component. + Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter + + // NextConsumers returns the set of consumers to send data to. + NextConsumers() *otelcol.ConsumerArguments +} + +// Processor is a Flow component shim which manages an OpenTelemetry Collector +// processor component. +type Processor struct { + ctx context.Context + cancel context.CancelFunc + + opts component.Options + factory otelcomponent.ProcessorFactory + consumer *lazyconsumer.Consumer + + sched *scheduler.Scheduler +} + +var ( + _ component.Component = (*Processor)(nil) + _ component.HealthComponent = (*Processor)(nil) +) + +// New creates a new Flow component which encapsulates an OpenTelemetry +// Collector processor. args must hold a value of the argument type registered +// with the Flow component. +// +// The registered component must be registered to export the +// otelcol.ConsumerExports type, otherwise New will panic. +func New(opts component.Options, f otelcomponent.ProcessorFactory, args Arguments) (*Processor, error) { + ctx, cancel := context.WithCancel(context.Background()) + + consumer := lazyconsumer.New(ctx) + + // Immediately set our state with our consumer. The exports will never change + // throughout the lifetime of our component. + // + // This will panic if the wrapping component is not registered to export + // otelcol.ConsumerExports. + opts.OnStateChange(otelcol.ConsumerExports{Input: consumer}) + + e := &Processor{ + ctx: ctx, + cancel: cancel, + + opts: opts, + factory: f, + consumer: consumer, + + sched: scheduler.New(opts.Logger), + } + if err := e.Update(args); err != nil { + return nil, err + } + return e, nil +} + +// Run starts the Processor component. +func (e *Processor) Run(ctx context.Context) error { + defer e.cancel() + return e.sched.Run(ctx) +} + +// Update implements component.Component. It will convert the Arguments into +// configuration for OpenTelemetry Collector processor configuration and manage +// the underlying OpenTelemetry Collector processor. +func (e *Processor) Update(args component.Arguments) error { + pargs := args.(Arguments) + + host := scheduler.NewHost( + e.opts.Logger, + scheduler.WithHostExtensions(pargs.Extensions()), + scheduler.WithHostExporters(pargs.Exporters()), + ) + + settings := otelcomponent.ProcessorCreateSettings{ + TelemetrySettings: otelcomponent.TelemetrySettings{ + // TODO(rfratto): create an adapter from zap -> go-kit/log + Logger: zap.NewNop(), + + // TODO(rfratto): expose tracing and logging statistics. + // + // We may want to put off tracing until we have native tracing + // instrumentation from Flow, but metrics should come sooner since we're + // already set up for supporting component-specific metrics. + TracerProvider: trace.NewNoopTracerProvider(), + MeterProvider: metric.NewNoopMeterProvider(), + }, + + BuildInfo: otelcomponent.BuildInfo{ + Command: os.Args[0], + Description: "Grafana Agent", + Version: build.Version, + }, + } + + exporterConfig := pargs.Convert() + + var ( + next = pargs.NextConsumers() + nextTraces = fanoutconsumer.Traces(next.Traces) + nextMetrics = fanoutconsumer.Metrics(next.Metrics) + nextLogs = fanoutconsumer.Logs(next.Logs) + ) + + // Create instances of the exporter from our factory for each of our + // supported telemetry signals. + var components []otelcomponent.Component + + tracesProcessor, err := e.factory.CreateTracesProcessor(e.ctx, settings, exporterConfig, nextTraces) + if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) { + return err + } else if tracesProcessor != nil { + components = append(components, tracesProcessor) + } + + metricsProcessor, err := e.factory.CreateMetricsProcessor(e.ctx, settings, exporterConfig, nextMetrics) + if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) { + return err + } else if metricsProcessor != nil { + components = append(components, metricsProcessor) + } + + logsProcessor, err := e.factory.CreateLogsProcessor(e.ctx, settings, exporterConfig, nextLogs) + if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) { + return err + } else if logsProcessor != nil { + components = append(components, logsProcessor) + } + + // Schedule the components to run once our component is running. + e.sched.Schedule(host, components...) + e.consumer.SetConsumers(tracesProcessor, metricsProcessor, logsProcessor) + return nil +} + +// CurrentHealth implements component.HealthComponent. +func (e *Processor) CurrentHealth() component.Health { + return e.sched.CurrentHealth() +} diff --git a/component/otelcol/processor/processor_test.go b/component/otelcol/processor/processor_test.go new file mode 100644 index 000000000000..fcaf6d6075e8 --- /dev/null +++ b/component/otelcol/processor/processor_test.go @@ -0,0 +1,207 @@ +package processor_test + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/grafana/agent/component" + "github.com/grafana/agent/component/otelcol" + "github.com/grafana/agent/component/otelcol/internal/fakeconsumer" + "github.com/grafana/agent/component/otelcol/processor" + "github.com/grafana/agent/pkg/flow/componenttest" + "github.com/grafana/agent/pkg/util" + "github.com/stretchr/testify/require" + otelcomponent "go.opentelemetry.io/collector/component" + otelconfig "go.opentelemetry.io/collector/config" + otelconsumer "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +func TestProcessor(t *testing.T) { + ctx := componenttest.TestContext(t) + + // Create an instance of a fake OpenTelemetry Collector processor which our + // Flow component will wrap around. It is expected + + var ( + consumer otelconsumer.Traces + + waitConsumerTrigger = util.NewWaitTrigger() + onTracesConsumer = func(t otelconsumer.Traces) { + consumer = t + waitConsumerTrigger.Trigger() + } + + waitTracesTrigger = util.NewWaitTrigger() + nextConsumer = &fakeconsumer.Consumer{ + ConsumeTracesFunc: func(context.Context, ptrace.Traces) error { + waitTracesTrigger.Trigger() + return nil + }, + } + + // Our fake processor will wait for a consumer to be registered and then + // pass along data directly to it. + innerProcessor = &fakeProcessor{ + ConsumeTracesFunc: func(ctx context.Context, td ptrace.Traces) error { + require.NoError(t, waitConsumerTrigger.Wait(time.Second), "no next consumer registered") + return consumer.ConsumeTraces(ctx, td) + }, + } + ) + + // Create and start our Flow component. We then wait for it to export a + // consumer that we can send data to. + te := newTestEnvironment(t, innerProcessor, onTracesConsumer) + te.Start(fakeProcessorArgs{ + Output: &otelcol.ConsumerArguments{ + Metrics: []otelcol.Consumer{nextConsumer}, + Logs: []otelcol.Consumer{nextConsumer}, + Traces: []otelcol.Consumer{nextConsumer}, + }, + }) + + require.NoError(t, te.Controller.WaitExports(1*time.Second), "test component did not generate exports") + ce := te.Controller.Exports().(otelcol.ConsumerExports) + + // Create a test set of traces and send it to our consumer in the background. + // We then wait for our channel to receive the traces, indicating that + // everything was wired up correctly. + go func() { + var err error + + for { + err = ce.Input.ConsumeTraces(ctx, ptrace.NewTraces()) + + if errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) { + // Our component may not have been fully initialized yet. Wait a little + // bit before trying again. + time.Sleep(100 * time.Millisecond) + continue + } + + require.NoError(t, err) + break + } + }() + + require.NoError(t, waitTracesTrigger.Wait(time.Second), "consumer did not get invoked") +} + +type testEnvironment struct { + t *testing.T + + Controller *componenttest.Controller +} + +func newTestEnvironment( + t *testing.T, + fp otelcomponent.TracesProcessor, + onTracesConsumer func(t otelconsumer.Traces), +) *testEnvironment { + + t.Helper() + + reg := component.Registration{ + Name: "testcomponent", + Args: fakeProcessorArgs{}, + Exports: otelcol.ConsumerExports{}, + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + // Create a factory which always returns our instance of fakeExporter + // defined above. + factory := otelcomponent.NewProcessorFactory( + "testcomponent", + func() otelconfig.Processor { + return fakeProcessorArgs{}.Convert() + }, + otelcomponent.WithTracesProcessor(func( + _ context.Context, + _ otelcomponent.ProcessorCreateSettings, + _ otelconfig.Processor, + t otelconsumer.Traces, + ) (otelcomponent.TracesProcessor, error) { + + onTracesConsumer(t) + return fp, nil + }, otelcomponent.StabilityLevelUndefined), + ) + + return processor.New(opts, factory, args.(processor.Arguments)) + }, + } + + return &testEnvironment{ + t: t, + Controller: componenttest.NewControllerFromReg(util.TestLogger(t), reg), + } +} + +func (te *testEnvironment) Start(args component.Arguments) { + go func() { + ctx := componenttest.TestContext(te.t) + err := te.Controller.Run(ctx, args) + require.NoError(te.t, err, "failed to run component") + }() +} + +type fakeProcessorArgs struct { + Output *otelcol.ConsumerArguments +} + +var _ processor.Arguments = fakeProcessorArgs{} + +func (fa fakeProcessorArgs) Convert() otelconfig.Processor { + settings := otelconfig.NewProcessorSettings(otelconfig.NewComponentID("testcomponent")) + return &settings +} + +func (fa fakeProcessorArgs) Extensions() map[otelconfig.ComponentID]otelcomponent.Extension { + return nil +} + +func (fa fakeProcessorArgs) Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter { + return nil +} + +func (fa fakeProcessorArgs) NextConsumers() *otelcol.ConsumerArguments { + return fa.Output +} + +type fakeProcessor struct { + StartFunc func(ctx context.Context, host otelcomponent.Host) error + ShutdownFunc func(ctx context.Context) error + CapabilitiesFunc func() otelconsumer.Capabilities + ConsumeTracesFunc func(ctx context.Context, td ptrace.Traces) error +} + +var _ otelcomponent.TracesExporter = (*fakeProcessor)(nil) + +func (fe *fakeProcessor) Start(ctx context.Context, host otelcomponent.Host) error { + if fe.StartFunc != nil { + return fe.StartFunc(ctx, host) + } + return nil +} + +func (fe *fakeProcessor) Shutdown(ctx context.Context) error { + if fe.ShutdownFunc != nil { + return fe.ShutdownFunc(ctx) + } + return nil +} + +func (fe *fakeProcessor) Capabilities() otelconsumer.Capabilities { + if fe.CapabilitiesFunc != nil { + return fe.CapabilitiesFunc() + } + return otelconsumer.Capabilities{} +} + +func (fe *fakeProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + if fe.ConsumeTracesFunc != nil { + return fe.ConsumeTracesFunc(ctx, td) + } + return nil +} diff --git a/component/otelcol/receiver/receiver.go b/component/otelcol/receiver/receiver.go index 8e3d7df8ae6c..aea56448e2ac 100644 --- a/component/otelcol/receiver/receiver.go +++ b/component/otelcol/receiver/receiver.go @@ -24,7 +24,7 @@ import ( type Arguments interface { component.Arguments - // Convert converts the Arguments into an OpenTelemetry Collector exporter + // Convert converts the Arguments into an OpenTelemetry Collector receiver // configuration. Convert() otelconfig.Receiver From 281b441e09e972a3b2aeaaa1577e40f122531c12 Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Tue, 4 Oct 2022 08:31:04 -0400 Subject: [PATCH 2/5] otelcol/processor: use zapadapter --- component/otelcol/processor/processor.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/component/otelcol/processor/processor.go b/component/otelcol/processor/processor.go index c9c6f65224da..47310bf855d6 100644 --- a/component/otelcol/processor/processor.go +++ b/component/otelcol/processor/processor.go @@ -12,12 +12,12 @@ import ( "github.com/grafana/agent/component/otelcol/internal/fanoutconsumer" "github.com/grafana/agent/component/otelcol/internal/lazyconsumer" "github.com/grafana/agent/component/otelcol/internal/scheduler" + "github.com/grafana/agent/component/otelcol/internal/zapadapter" "github.com/grafana/agent/pkg/build" otelcomponent "go.opentelemetry.io/collector/component" otelconfig "go.opentelemetry.io/collector/config" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" - "go.uber.org/zap" ) // Arguments is an extension of component.Arguments which contains necessary @@ -113,8 +113,7 @@ func (e *Processor) Update(args component.Arguments) error { settings := otelcomponent.ProcessorCreateSettings{ TelemetrySettings: otelcomponent.TelemetrySettings{ - // TODO(rfratto): create an adapter from zap -> go-kit/log - Logger: zap.NewNop(), + Logger: zapadapter.New(e.opts.Logger), // TODO(rfratto): expose tracing and logging statistics. // From b51eeace54663fb005531e6828fe6b3b581d7c93 Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Tue, 4 Oct 2022 08:40:47 -0400 Subject: [PATCH 3/5] fix bad copy job --- component/otelcol/processor/processor.go | 36 ++++++++++++------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/component/otelcol/processor/processor.go b/component/otelcol/processor/processor.go index 47310bf855d6..a46aa25bfe26 100644 --- a/component/otelcol/processor/processor.go +++ b/component/otelcol/processor/processor.go @@ -77,7 +77,7 @@ func New(opts component.Options, f otelcomponent.ProcessorFactory, args Argument // otelcol.ConsumerExports. opts.OnStateChange(otelcol.ConsumerExports{Input: consumer}) - e := &Processor{ + p := &Processor{ ctx: ctx, cancel: cancel, @@ -87,33 +87,33 @@ func New(opts component.Options, f otelcomponent.ProcessorFactory, args Argument sched: scheduler.New(opts.Logger), } - if err := e.Update(args); err != nil { + if err := p.Update(args); err != nil { return nil, err } - return e, nil + return p, nil } // Run starts the Processor component. -func (e *Processor) Run(ctx context.Context) error { - defer e.cancel() - return e.sched.Run(ctx) +func (p *Processor) Run(ctx context.Context) error { + defer p.cancel() + return p.sched.Run(ctx) } // Update implements component.Component. It will convert the Arguments into // configuration for OpenTelemetry Collector processor configuration and manage // the underlying OpenTelemetry Collector processor. -func (e *Processor) Update(args component.Arguments) error { +func (p *Processor) Update(args component.Arguments) error { pargs := args.(Arguments) host := scheduler.NewHost( - e.opts.Logger, + p.opts.Logger, scheduler.WithHostExtensions(pargs.Extensions()), scheduler.WithHostExporters(pargs.Exporters()), ) settings := otelcomponent.ProcessorCreateSettings{ TelemetrySettings: otelcomponent.TelemetrySettings{ - Logger: zapadapter.New(e.opts.Logger), + Logger: zapadapter.New(p.opts.Logger), // TODO(rfratto): expose tracing and logging statistics. // @@ -131,7 +131,7 @@ func (e *Processor) Update(args component.Arguments) error { }, } - exporterConfig := pargs.Convert() + processorConfig := pargs.Convert() var ( next = pargs.NextConsumers() @@ -140,25 +140,25 @@ func (e *Processor) Update(args component.Arguments) error { nextLogs = fanoutconsumer.Logs(next.Logs) ) - // Create instances of the exporter from our factory for each of our + // Create instances of the processor from our factory for each of our // supported telemetry signals. var components []otelcomponent.Component - tracesProcessor, err := e.factory.CreateTracesProcessor(e.ctx, settings, exporterConfig, nextTraces) + tracesProcessor, err := p.factory.CreateTracesProcessor(p.ctx, settings, processorConfig, nextTraces) if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) { return err } else if tracesProcessor != nil { components = append(components, tracesProcessor) } - metricsProcessor, err := e.factory.CreateMetricsProcessor(e.ctx, settings, exporterConfig, nextMetrics) + metricsProcessor, err := p.factory.CreateMetricsProcessor(p.ctx, settings, processorConfig, nextMetrics) if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) { return err } else if metricsProcessor != nil { components = append(components, metricsProcessor) } - logsProcessor, err := e.factory.CreateLogsProcessor(e.ctx, settings, exporterConfig, nextLogs) + logsProcessor, err := p.factory.CreateLogsProcessor(p.ctx, settings, processorConfig, nextLogs) if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) { return err } else if logsProcessor != nil { @@ -166,12 +166,12 @@ func (e *Processor) Update(args component.Arguments) error { } // Schedule the components to run once our component is running. - e.sched.Schedule(host, components...) - e.consumer.SetConsumers(tracesProcessor, metricsProcessor, logsProcessor) + p.sched.Schedule(host, components...) + p.consumer.SetConsumers(tracesProcessor, metricsProcessor, logsProcessor) return nil } // CurrentHealth implements component.HealthComponent. -func (e *Processor) CurrentHealth() component.Health { - return e.sched.CurrentHealth() +func (p *Processor) CurrentHealth() component.Health { + return p.sched.CurrentHealth() } From cacbd06620680d7af5e40161497517f224e3f21f Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Tue, 4 Oct 2022 08:42:00 -0400 Subject: [PATCH 4/5] fix incomplete comment in test --- component/otelcol/processor/processor_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/component/otelcol/processor/processor_test.go b/component/otelcol/processor/processor_test.go index fcaf6d6075e8..8aa4718d96fb 100644 --- a/component/otelcol/processor/processor_test.go +++ b/component/otelcol/processor/processor_test.go @@ -23,8 +23,8 @@ func TestProcessor(t *testing.T) { ctx := componenttest.TestContext(t) // Create an instance of a fake OpenTelemetry Collector processor which our - // Flow component will wrap around. It is expected - + // Flow component will wrap around. Our fake processor will immediately + // forward data to the connected consumer once one is made available to it. var ( consumer otelconsumer.Traces From 75191543a42d9252f155bdcb0807e4d87080da52 Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Tue, 4 Oct 2022 08:45:08 -0400 Subject: [PATCH 5/5] s/exporter/processor --- component/otelcol/processor/processor_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/component/otelcol/processor/processor_test.go b/component/otelcol/processor/processor_test.go index 8aa4718d96fb..d448ff515ff5 100644 --- a/component/otelcol/processor/processor_test.go +++ b/component/otelcol/processor/processor_test.go @@ -109,7 +109,7 @@ func newTestEnvironment( Args: fakeProcessorArgs{}, Exports: otelcol.ConsumerExports{}, Build: func(opts component.Options, args component.Arguments) (component.Component, error) { - // Create a factory which always returns our instance of fakeExporter + // Create a factory which always returns our instance of fakeProcessor // defined above. factory := otelcomponent.NewProcessorFactory( "testcomponent", @@ -176,7 +176,7 @@ type fakeProcessor struct { ConsumeTracesFunc func(ctx context.Context, td ptrace.Traces) error } -var _ otelcomponent.TracesExporter = (*fakeProcessor)(nil) +var _ otelcomponent.TracesProcessor = (*fakeProcessor)(nil) func (fe *fakeProcessor) Start(ctx context.Context, host otelcomponent.Host) error { if fe.StartFunc != nil {