Skip to content

Commit

Permalink
component/otelcol/processor: create processor component abstraction (#…
Browse files Browse the repository at this point in the history
…2284)

This commit introduces a new package, component/otelcol/processor, which
exposes a generic Flow component implementation which can run
OpenTelemetry Collector processor.

Like #2227 and #2254, it leaves some work unfinished for future PRs:

* Component-specific metrics are currently ignored.
* Component-specific traces are currently ignored.

As of this commit, there are no registered `otelcol.processor.*`
components. Implementations for OpenTelemetry Collector Flow components
will be done in future PRs.

Related to #2213.
  • Loading branch information
rfratto authored Oct 4, 2022
1 parent f5c1d0d commit 4bc7c07
Show file tree
Hide file tree
Showing 3 changed files with 385 additions and 1 deletion.
177 changes: 177 additions & 0 deletions component/otelcol/processor/processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Package processor exposes utilities to create a Flow component from
// OpenTelemetry Collector processors.
package processor

import (
"context"
"errors"
"os"

"github.com/grafana/agent/component"
"github.com/grafana/agent/component/otelcol"
"github.com/grafana/agent/component/otelcol/internal/fanoutconsumer"
"github.com/grafana/agent/component/otelcol/internal/lazyconsumer"
"github.com/grafana/agent/component/otelcol/internal/scheduler"
"github.com/grafana/agent/component/otelcol/internal/zapadapter"
"github.com/grafana/agent/pkg/build"
otelcomponent "go.opentelemetry.io/collector/component"
otelconfig "go.opentelemetry.io/collector/config"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
)

// Arguments is an extension of component.Arguments which contains necessary
// settings for OpenTelemetry Collector processors.
type Arguments interface {
component.Arguments

// Convert converts the Arguments into an OpenTelemetry Collector processor
// configuration.
Convert() otelconfig.Processor

// Extensions returns the set of extensions that the configured component is
// allowed to use.
Extensions() map[otelconfig.ComponentID]otelcomponent.Extension

// Exporters returns the set of exporters that are exposed to the configured
// component.
Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter

// NextConsumers returns the set of consumers to send data to.
NextConsumers() *otelcol.ConsumerArguments
}

// Processor is a Flow component shim which manages an OpenTelemetry Collector
// processor component.
type Processor struct {
ctx context.Context
cancel context.CancelFunc

opts component.Options
factory otelcomponent.ProcessorFactory
consumer *lazyconsumer.Consumer

sched *scheduler.Scheduler
}

var (
_ component.Component = (*Processor)(nil)
_ component.HealthComponent = (*Processor)(nil)
)

// New creates a new Flow component which encapsulates an OpenTelemetry
// Collector processor. args must hold a value of the argument type registered
// with the Flow component.
//
// The registered component must be registered to export the
// otelcol.ConsumerExports type, otherwise New will panic.
func New(opts component.Options, f otelcomponent.ProcessorFactory, args Arguments) (*Processor, error) {
ctx, cancel := context.WithCancel(context.Background())

consumer := lazyconsumer.New(ctx)

// Immediately set our state with our consumer. The exports will never change
// throughout the lifetime of our component.
//
// This will panic if the wrapping component is not registered to export
// otelcol.ConsumerExports.
opts.OnStateChange(otelcol.ConsumerExports{Input: consumer})

p := &Processor{
ctx: ctx,
cancel: cancel,

opts: opts,
factory: f,
consumer: consumer,

sched: scheduler.New(opts.Logger),
}
if err := p.Update(args); err != nil {
return nil, err
}
return p, nil
}

// Run starts the Processor component.
func (p *Processor) Run(ctx context.Context) error {
defer p.cancel()
return p.sched.Run(ctx)
}

// Update implements component.Component. It will convert the Arguments into
// configuration for OpenTelemetry Collector processor configuration and manage
// the underlying OpenTelemetry Collector processor.
func (p *Processor) Update(args component.Arguments) error {
pargs := args.(Arguments)

host := scheduler.NewHost(
p.opts.Logger,
scheduler.WithHostExtensions(pargs.Extensions()),
scheduler.WithHostExporters(pargs.Exporters()),
)

settings := otelcomponent.ProcessorCreateSettings{
TelemetrySettings: otelcomponent.TelemetrySettings{
Logger: zapadapter.New(p.opts.Logger),

// TODO(rfratto): expose tracing and logging statistics.
//
// We may want to put off tracing until we have native tracing
// instrumentation from Flow, but metrics should come sooner since we're
// already set up for supporting component-specific metrics.
TracerProvider: trace.NewNoopTracerProvider(),
MeterProvider: metric.NewNoopMeterProvider(),
},

BuildInfo: otelcomponent.BuildInfo{
Command: os.Args[0],
Description: "Grafana Agent",
Version: build.Version,
},
}

processorConfig := pargs.Convert()

var (
next = pargs.NextConsumers()
nextTraces = fanoutconsumer.Traces(next.Traces)
nextMetrics = fanoutconsumer.Metrics(next.Metrics)
nextLogs = fanoutconsumer.Logs(next.Logs)
)

// Create instances of the processor from our factory for each of our
// supported telemetry signals.
var components []otelcomponent.Component

tracesProcessor, err := p.factory.CreateTracesProcessor(p.ctx, settings, processorConfig, nextTraces)
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
return err
} else if tracesProcessor != nil {
components = append(components, tracesProcessor)
}

metricsProcessor, err := p.factory.CreateMetricsProcessor(p.ctx, settings, processorConfig, nextMetrics)
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
return err
} else if metricsProcessor != nil {
components = append(components, metricsProcessor)
}

logsProcessor, err := p.factory.CreateLogsProcessor(p.ctx, settings, processorConfig, nextLogs)
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
return err
} else if logsProcessor != nil {
components = append(components, logsProcessor)
}

// Schedule the components to run once our component is running.
p.sched.Schedule(host, components...)
p.consumer.SetConsumers(tracesProcessor, metricsProcessor, logsProcessor)
return nil
}

// CurrentHealth implements component.HealthComponent.
func (p *Processor) CurrentHealth() component.Health {
return p.sched.CurrentHealth()
}
207 changes: 207 additions & 0 deletions component/otelcol/processor/processor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package processor_test

import (
"context"
"errors"
"testing"
"time"

"github.com/grafana/agent/component"
"github.com/grafana/agent/component/otelcol"
"github.com/grafana/agent/component/otelcol/internal/fakeconsumer"
"github.com/grafana/agent/component/otelcol/processor"
"github.com/grafana/agent/pkg/flow/componenttest"
"github.com/grafana/agent/pkg/util"
"github.com/stretchr/testify/require"
otelcomponent "go.opentelemetry.io/collector/component"
otelconfig "go.opentelemetry.io/collector/config"
otelconsumer "go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/ptrace"
)

func TestProcessor(t *testing.T) {
ctx := componenttest.TestContext(t)

// Create an instance of a fake OpenTelemetry Collector processor which our
// Flow component will wrap around. Our fake processor will immediately
// forward data to the connected consumer once one is made available to it.
var (
consumer otelconsumer.Traces

waitConsumerTrigger = util.NewWaitTrigger()
onTracesConsumer = func(t otelconsumer.Traces) {
consumer = t
waitConsumerTrigger.Trigger()
}

waitTracesTrigger = util.NewWaitTrigger()
nextConsumer = &fakeconsumer.Consumer{
ConsumeTracesFunc: func(context.Context, ptrace.Traces) error {
waitTracesTrigger.Trigger()
return nil
},
}

// Our fake processor will wait for a consumer to be registered and then
// pass along data directly to it.
innerProcessor = &fakeProcessor{
ConsumeTracesFunc: func(ctx context.Context, td ptrace.Traces) error {
require.NoError(t, waitConsumerTrigger.Wait(time.Second), "no next consumer registered")
return consumer.ConsumeTraces(ctx, td)
},
}
)

// Create and start our Flow component. We then wait for it to export a
// consumer that we can send data to.
te := newTestEnvironment(t, innerProcessor, onTracesConsumer)
te.Start(fakeProcessorArgs{
Output: &otelcol.ConsumerArguments{
Metrics: []otelcol.Consumer{nextConsumer},
Logs: []otelcol.Consumer{nextConsumer},
Traces: []otelcol.Consumer{nextConsumer},
},
})

require.NoError(t, te.Controller.WaitExports(1*time.Second), "test component did not generate exports")
ce := te.Controller.Exports().(otelcol.ConsumerExports)

// Create a test set of traces and send it to our consumer in the background.
// We then wait for our channel to receive the traces, indicating that
// everything was wired up correctly.
go func() {
var err error

for {
err = ce.Input.ConsumeTraces(ctx, ptrace.NewTraces())

if errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
// Our component may not have been fully initialized yet. Wait a little
// bit before trying again.
time.Sleep(100 * time.Millisecond)
continue
}

require.NoError(t, err)
break
}
}()

require.NoError(t, waitTracesTrigger.Wait(time.Second), "consumer did not get invoked")
}

type testEnvironment struct {
t *testing.T

Controller *componenttest.Controller
}

func newTestEnvironment(
t *testing.T,
fp otelcomponent.TracesProcessor,
onTracesConsumer func(t otelconsumer.Traces),
) *testEnvironment {

t.Helper()

reg := component.Registration{
Name: "testcomponent",
Args: fakeProcessorArgs{},
Exports: otelcol.ConsumerExports{},
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
// Create a factory which always returns our instance of fakeProcessor
// defined above.
factory := otelcomponent.NewProcessorFactory(
"testcomponent",
func() otelconfig.Processor {
return fakeProcessorArgs{}.Convert()
},
otelcomponent.WithTracesProcessor(func(
_ context.Context,
_ otelcomponent.ProcessorCreateSettings,
_ otelconfig.Processor,
t otelconsumer.Traces,
) (otelcomponent.TracesProcessor, error) {

onTracesConsumer(t)
return fp, nil
}, otelcomponent.StabilityLevelUndefined),
)

return processor.New(opts, factory, args.(processor.Arguments))
},
}

return &testEnvironment{
t: t,
Controller: componenttest.NewControllerFromReg(util.TestLogger(t), reg),
}
}

func (te *testEnvironment) Start(args component.Arguments) {
go func() {
ctx := componenttest.TestContext(te.t)
err := te.Controller.Run(ctx, args)
require.NoError(te.t, err, "failed to run component")
}()
}

type fakeProcessorArgs struct {
Output *otelcol.ConsumerArguments
}

var _ processor.Arguments = fakeProcessorArgs{}

func (fa fakeProcessorArgs) Convert() otelconfig.Processor {
settings := otelconfig.NewProcessorSettings(otelconfig.NewComponentID("testcomponent"))
return &settings
}

func (fa fakeProcessorArgs) Extensions() map[otelconfig.ComponentID]otelcomponent.Extension {
return nil
}

func (fa fakeProcessorArgs) Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter {
return nil
}

func (fa fakeProcessorArgs) NextConsumers() *otelcol.ConsumerArguments {
return fa.Output
}

type fakeProcessor struct {
StartFunc func(ctx context.Context, host otelcomponent.Host) error
ShutdownFunc func(ctx context.Context) error
CapabilitiesFunc func() otelconsumer.Capabilities
ConsumeTracesFunc func(ctx context.Context, td ptrace.Traces) error
}

var _ otelcomponent.TracesProcessor = (*fakeProcessor)(nil)

func (fe *fakeProcessor) Start(ctx context.Context, host otelcomponent.Host) error {
if fe.StartFunc != nil {
return fe.StartFunc(ctx, host)
}
return nil
}

func (fe *fakeProcessor) Shutdown(ctx context.Context) error {
if fe.ShutdownFunc != nil {
return fe.ShutdownFunc(ctx)
}
return nil
}

func (fe *fakeProcessor) Capabilities() otelconsumer.Capabilities {
if fe.CapabilitiesFunc != nil {
return fe.CapabilitiesFunc()
}
return otelconsumer.Capabilities{}
}

func (fe *fakeProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
if fe.ConsumeTracesFunc != nil {
return fe.ConsumeTracesFunc(ctx, td)
}
return nil
}
2 changes: 1 addition & 1 deletion component/otelcol/receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
type Arguments interface {
component.Arguments

// Convert converts the Arguments into an OpenTelemetry Collector exporter
// Convert converts the Arguments into an OpenTelemetry Collector receiver
// configuration.
Convert() otelconfig.Receiver

Expand Down

0 comments on commit 4bc7c07

Please sign in to comment.