Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

component/otelcol/processor: create processor component abstraction #2284

Merged
merged 5 commits into from
Oct 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 177 additions & 0 deletions component/otelcol/processor/processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Package processor exposes utilities to create a Flow component from
// OpenTelemetry Collector processors.
package processor

import (
"context"
"errors"
"os"

"github.com/grafana/agent/component"
"github.com/grafana/agent/component/otelcol"
"github.com/grafana/agent/component/otelcol/internal/fanoutconsumer"
"github.com/grafana/agent/component/otelcol/internal/lazyconsumer"
"github.com/grafana/agent/component/otelcol/internal/scheduler"
"github.com/grafana/agent/component/otelcol/internal/zapadapter"
"github.com/grafana/agent/pkg/build"
otelcomponent "go.opentelemetry.io/collector/component"
otelconfig "go.opentelemetry.io/collector/config"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
)

// Arguments is an extension of component.Arguments which contains necessary
// settings for OpenTelemetry Collector processors.
type Arguments interface {
component.Arguments

// Convert converts the Arguments into an OpenTelemetry Collector processor
// configuration.
Convert() otelconfig.Processor

// Extensions returns the set of extensions that the configured component is
// allowed to use.
Extensions() map[otelconfig.ComponentID]otelcomponent.Extension

// Exporters returns the set of exporters that are exposed to the configured
// component.
Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter

// NextConsumers returns the set of consumers to send data to.
NextConsumers() *otelcol.ConsumerArguments
}

// Processor is a Flow component shim which manages an OpenTelemetry Collector
// processor component.
type Processor struct {
ctx context.Context
cancel context.CancelFunc

opts component.Options
factory otelcomponent.ProcessorFactory
consumer *lazyconsumer.Consumer

sched *scheduler.Scheduler
}

var (
_ component.Component = (*Processor)(nil)
_ component.HealthComponent = (*Processor)(nil)
)

// New creates a new Flow component which encapsulates an OpenTelemetry
// Collector processor. args must hold a value of the argument type registered
// with the Flow component.
//
// The registered component must be registered to export the
// otelcol.ConsumerExports type, otherwise New will panic.
func New(opts component.Options, f otelcomponent.ProcessorFactory, args Arguments) (*Processor, error) {
ctx, cancel := context.WithCancel(context.Background())

consumer := lazyconsumer.New(ctx)

// Immediately set our state with our consumer. The exports will never change
// throughout the lifetime of our component.
//
// This will panic if the wrapping component is not registered to export
// otelcol.ConsumerExports.
opts.OnStateChange(otelcol.ConsumerExports{Input: consumer})

p := &Processor{
ctx: ctx,
cancel: cancel,

opts: opts,
factory: f,
consumer: consumer,

sched: scheduler.New(opts.Logger),
}
if err := p.Update(args); err != nil {
return nil, err
}
return p, nil
}

// Run starts the Processor component.
func (p *Processor) Run(ctx context.Context) error {
defer p.cancel()
return p.sched.Run(ctx)
}

// Update implements component.Component. It will convert the Arguments into
// configuration for OpenTelemetry Collector processor configuration and manage
// the underlying OpenTelemetry Collector processor.
func (p *Processor) Update(args component.Arguments) error {
pargs := args.(Arguments)

host := scheduler.NewHost(
p.opts.Logger,
scheduler.WithHostExtensions(pargs.Extensions()),
scheduler.WithHostExporters(pargs.Exporters()),
)

settings := otelcomponent.ProcessorCreateSettings{
TelemetrySettings: otelcomponent.TelemetrySettings{
Logger: zapadapter.New(p.opts.Logger),

// TODO(rfratto): expose tracing and logging statistics.
//
// We may want to put off tracing until we have native tracing
// instrumentation from Flow, but metrics should come sooner since we're
// already set up for supporting component-specific metrics.
TracerProvider: trace.NewNoopTracerProvider(),
MeterProvider: metric.NewNoopMeterProvider(),
},

BuildInfo: otelcomponent.BuildInfo{
Command: os.Args[0],
Description: "Grafana Agent",
Version: build.Version,
},
}

processorConfig := pargs.Convert()

var (
next = pargs.NextConsumers()
nextTraces = fanoutconsumer.Traces(next.Traces)
nextMetrics = fanoutconsumer.Metrics(next.Metrics)
nextLogs = fanoutconsumer.Logs(next.Logs)
)

// Create instances of the processor from our factory for each of our
// supported telemetry signals.
var components []otelcomponent.Component

tracesProcessor, err := p.factory.CreateTracesProcessor(p.ctx, settings, processorConfig, nextTraces)
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
return err
} else if tracesProcessor != nil {
components = append(components, tracesProcessor)
}

metricsProcessor, err := p.factory.CreateMetricsProcessor(p.ctx, settings, processorConfig, nextMetrics)
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
return err
} else if metricsProcessor != nil {
components = append(components, metricsProcessor)
}

logsProcessor, err := p.factory.CreateLogsProcessor(p.ctx, settings, processorConfig, nextLogs)
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
return err
} else if logsProcessor != nil {
components = append(components, logsProcessor)
}

// Schedule the components to run once our component is running.
p.sched.Schedule(host, components...)
p.consumer.SetConsumers(tracesProcessor, metricsProcessor, logsProcessor)
return nil
}

// CurrentHealth implements component.HealthComponent.
func (p *Processor) CurrentHealth() component.Health {
return p.sched.CurrentHealth()
}
207 changes: 207 additions & 0 deletions component/otelcol/processor/processor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package processor_test

import (
"context"
"errors"
"testing"
"time"

"github.com/grafana/agent/component"
"github.com/grafana/agent/component/otelcol"
"github.com/grafana/agent/component/otelcol/internal/fakeconsumer"
"github.com/grafana/agent/component/otelcol/processor"
"github.com/grafana/agent/pkg/flow/componenttest"
"github.com/grafana/agent/pkg/util"
"github.com/stretchr/testify/require"
otelcomponent "go.opentelemetry.io/collector/component"
otelconfig "go.opentelemetry.io/collector/config"
otelconsumer "go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/ptrace"
)

func TestProcessor(t *testing.T) {
ctx := componenttest.TestContext(t)

// Create an instance of a fake OpenTelemetry Collector processor which our
// Flow component will wrap around. Our fake processor will immediately
// forward data to the connected consumer once one is made available to it.
var (
consumer otelconsumer.Traces

waitConsumerTrigger = util.NewWaitTrigger()
onTracesConsumer = func(t otelconsumer.Traces) {
consumer = t
waitConsumerTrigger.Trigger()
}

waitTracesTrigger = util.NewWaitTrigger()
nextConsumer = &fakeconsumer.Consumer{
ConsumeTracesFunc: func(context.Context, ptrace.Traces) error {
waitTracesTrigger.Trigger()
return nil
},
}

// Our fake processor will wait for a consumer to be registered and then
// pass along data directly to it.
innerProcessor = &fakeProcessor{
ConsumeTracesFunc: func(ctx context.Context, td ptrace.Traces) error {
require.NoError(t, waitConsumerTrigger.Wait(time.Second), "no next consumer registered")
return consumer.ConsumeTraces(ctx, td)
},
}
)

// Create and start our Flow component. We then wait for it to export a
// consumer that we can send data to.
te := newTestEnvironment(t, innerProcessor, onTracesConsumer)
te.Start(fakeProcessorArgs{
Output: &otelcol.ConsumerArguments{
Metrics: []otelcol.Consumer{nextConsumer},
Logs: []otelcol.Consumer{nextConsumer},
Traces: []otelcol.Consumer{nextConsumer},
},
})

require.NoError(t, te.Controller.WaitExports(1*time.Second), "test component did not generate exports")
ce := te.Controller.Exports().(otelcol.ConsumerExports)

// Create a test set of traces and send it to our consumer in the background.
// We then wait for our channel to receive the traces, indicating that
// everything was wired up correctly.
go func() {
var err error

for {
err = ce.Input.ConsumeTraces(ctx, ptrace.NewTraces())

if errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
// Our component may not have been fully initialized yet. Wait a little
// bit before trying again.
time.Sleep(100 * time.Millisecond)
continue
}

require.NoError(t, err)
break
}
}()

require.NoError(t, waitTracesTrigger.Wait(time.Second), "consumer did not get invoked")
}

type testEnvironment struct {
t *testing.T

Controller *componenttest.Controller
}

func newTestEnvironment(
t *testing.T,
fp otelcomponent.TracesProcessor,
onTracesConsumer func(t otelconsumer.Traces),
) *testEnvironment {

t.Helper()

reg := component.Registration{
Name: "testcomponent",
Args: fakeProcessorArgs{},
Exports: otelcol.ConsumerExports{},
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
// Create a factory which always returns our instance of fakeProcessor
// defined above.
factory := otelcomponent.NewProcessorFactory(
"testcomponent",
func() otelconfig.Processor {
return fakeProcessorArgs{}.Convert()
},
otelcomponent.WithTracesProcessor(func(
_ context.Context,
_ otelcomponent.ProcessorCreateSettings,
_ otelconfig.Processor,
t otelconsumer.Traces,
) (otelcomponent.TracesProcessor, error) {

onTracesConsumer(t)
return fp, nil
}, otelcomponent.StabilityLevelUndefined),
)

return processor.New(opts, factory, args.(processor.Arguments))
},
}

return &testEnvironment{
t: t,
Controller: componenttest.NewControllerFromReg(util.TestLogger(t), reg),
}
}

func (te *testEnvironment) Start(args component.Arguments) {
go func() {
ctx := componenttest.TestContext(te.t)
err := te.Controller.Run(ctx, args)
require.NoError(te.t, err, "failed to run component")
}()
}

type fakeProcessorArgs struct {
Output *otelcol.ConsumerArguments
}

var _ processor.Arguments = fakeProcessorArgs{}

func (fa fakeProcessorArgs) Convert() otelconfig.Processor {
settings := otelconfig.NewProcessorSettings(otelconfig.NewComponentID("testcomponent"))
return &settings
}

func (fa fakeProcessorArgs) Extensions() map[otelconfig.ComponentID]otelcomponent.Extension {
return nil
}

func (fa fakeProcessorArgs) Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter {
return nil
}

func (fa fakeProcessorArgs) NextConsumers() *otelcol.ConsumerArguments {
return fa.Output
}

type fakeProcessor struct {
StartFunc func(ctx context.Context, host otelcomponent.Host) error
ShutdownFunc func(ctx context.Context) error
CapabilitiesFunc func() otelconsumer.Capabilities
ConsumeTracesFunc func(ctx context.Context, td ptrace.Traces) error
}

var _ otelcomponent.TracesProcessor = (*fakeProcessor)(nil)

func (fe *fakeProcessor) Start(ctx context.Context, host otelcomponent.Host) error {
if fe.StartFunc != nil {
return fe.StartFunc(ctx, host)
}
return nil
}

func (fe *fakeProcessor) Shutdown(ctx context.Context) error {
if fe.ShutdownFunc != nil {
return fe.ShutdownFunc(ctx)
}
return nil
}

func (fe *fakeProcessor) Capabilities() otelconsumer.Capabilities {
if fe.CapabilitiesFunc != nil {
return fe.CapabilitiesFunc()
}
return otelconsumer.Capabilities{}
}

func (fe *fakeProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
if fe.ConsumeTracesFunc != nil {
return fe.ConsumeTracesFunc(ctx, td)
}
return nil
}
2 changes: 1 addition & 1 deletion component/otelcol/receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
type Arguments interface {
component.Arguments

// Convert converts the Arguments into an OpenTelemetry Collector exporter
// Convert converts the Arguments into an OpenTelemetry Collector receiver
// configuration.
Convert() otelconfig.Receiver

Expand Down