diff --git a/component/otelcol/auth/auth.go b/component/otelcol/auth/auth.go new file mode 100644 index 000000000000..892a11c187a4 --- /dev/null +++ b/component/otelcol/auth/auth.go @@ -0,0 +1,150 @@ +// Package auth provides utilities to create a Flow component from +// OpenTelemetry Collector authentication extensions. +// +// Other OpenTelemetry Collector extensions are better served as generic Flow +// components rather than being placed in the otelcol namespace. +package auth + +import ( + "context" + "os" + + "github.com/grafana/agent/component" + "github.com/grafana/agent/component/otelcol/internal/scheduler" + "github.com/grafana/agent/component/otelcol/internal/zapadapter" + "github.com/grafana/agent/pkg/build" + otelcomponent "go.opentelemetry.io/collector/component" + otelconfig "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" +) + +// Arguments is an extension of component.Arguments which contains necessary +// settings for OpenTelemetry Collector authentication extensions. +type Arguments interface { + component.Arguments + + // Convert converts the Arguments into an OpenTelemetry Collector + // authentication extension configuration. + Convert() otelconfig.Extension + + // Extensions returns the set of extensions that the configured component is + // allowed to use. + Extensions() map[otelconfig.ComponentID]otelcomponent.Extension + + // Exporters returns the set of exporters that are exposed to the configured + // component. + Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter +} + +// Exports is a common Exports type for Flow components which expose +// OpenTelemetry Collector authentication extensions. +type Exports struct { + // Handler is the managed otelcomponent.Extension. Handler is updated any + // time the extension is updated. + Handler otelcomponent.Extension `river:"handler,attr"` +} + +// Auth is a Flow component shim which manages an OpenTelemetry Collector +// authentication extension. +type Auth struct { + ctx context.Context + cancel context.CancelFunc + + opts component.Options + factory otelcomponent.ExtensionFactory + + sched *scheduler.Scheduler +} + +var ( + _ component.Component = (*Auth)(nil) + _ component.HealthComponent = (*Auth)(nil) +) + +// New creates a new Flow component which encapsulates an OpenTelemetry +// Collector authentication extension. args must hold a value of the argument +// type registered with the Flow component. +// +// The registered component must be registered to export the Exports type from +// this package, otherwise New will panic. +func New(opts component.Options, f otelcomponent.ExtensionFactory, args Arguments) (*Auth, error) { + ctx, cancel := context.WithCancel(context.Background()) + + r := &Auth{ + ctx: ctx, + cancel: cancel, + + opts: opts, + factory: f, + + sched: scheduler.New(opts.Logger), + } + if err := r.Update(args); err != nil { + return nil, err + } + return r, nil +} + +// Run starts the Auth component. +func (r *Auth) Run(ctx context.Context) error { + defer r.cancel() + return r.sched.Run(ctx) +} + +// Update implements component.Component. It will convert the Arguments into +// configuration for OpenTelemetry Collector authentication extension +// configuration and manage the underlying OpenTelemetry Collector extension. +func (r *Auth) Update(args component.Arguments) error { + rargs := args.(Arguments) + + host := scheduler.NewHost( + r.opts.Logger, + scheduler.WithHostExtensions(rargs.Extensions()), + scheduler.WithHostExporters(rargs.Exporters()), + ) + + settings := otelcomponent.ExtensionCreateSettings{ + TelemetrySettings: otelcomponent.TelemetrySettings{ + Logger: zapadapter.New(r.opts.Logger), + + // TODO(rfratto): expose tracing and logging statistics. + // + // We may want to put off tracing until we have native tracing + // instrumentation from Flow, but metrics should come sooner since we're + // already set up for supporting component-specific metrics. + TracerProvider: trace.NewNoopTracerProvider(), + MeterProvider: metric.NewNoopMeterProvider(), + }, + + BuildInfo: otelcomponent.BuildInfo{ + Command: os.Args[0], + Description: "Grafana Agent", + Version: build.Version, + }, + } + + extensionConfig := rargs.Convert() + + // Create instances of the extension from our factory. + var components []otelcomponent.Component + + ext, err := r.factory.CreateExtension(r.ctx, settings, extensionConfig) + if err != nil { + return err + } else if ext != nil { + components = append(components, ext) + } + + // Inform listeners that our handler changed. + r.opts.OnStateChange(Exports{Handler: ext}) + + // Schedule the components to run once our component is running. + r.sched.Schedule(host, components...) + return nil +} + +// CurrentHealth implements component.HealthComponent. +func (r *Auth) CurrentHealth() component.Health { + return r.sched.CurrentHealth() +} diff --git a/component/otelcol/auth/auth_test.go b/component/otelcol/auth/auth_test.go new file mode 100644 index 000000000000..0d0d4a9901b3 --- /dev/null +++ b/component/otelcol/auth/auth_test.go @@ -0,0 +1,96 @@ +package auth_test + +import ( + "context" + "testing" + "time" + + "github.com/grafana/agent/component" + "github.com/grafana/agent/component/otelcol" + "github.com/grafana/agent/component/otelcol/auth" + "github.com/grafana/agent/pkg/flow/componenttest" + "github.com/grafana/agent/pkg/util" + "github.com/stretchr/testify/require" + otelcomponent "go.opentelemetry.io/collector/component" + otelconfig "go.opentelemetry.io/collector/config" +) + +func TestAuth(t *testing.T) { + var ( + waitCreated = util.NewWaitTrigger() + onCreated = func() { + waitCreated.Trigger() + } + ) + + // Create and start our Flow component. We then wait for it to export a + // consumer that we can send data to. + te := newTestEnvironment(t, onCreated) + te.Start(fakeAuthArgs{}) + + require.NoError(t, waitCreated.Wait(time.Second), "extension never created") +} + +type testEnvironment struct { + t *testing.T + + Controller *componenttest.Controller +} + +func newTestEnvironment(t *testing.T, onCreated func()) *testEnvironment { + t.Helper() + + reg := component.Registration{ + Name: "testcomponent", + Args: fakeAuthArgs{}, + Exports: otelcol.ConsumerExports{}, + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + factory := otelcomponent.NewExtensionFactory( + "testcomponent", + func() otelconfig.Extension { return nil }, + func( + _ context.Context, + _ otelcomponent.ExtensionCreateSettings, + _ otelconfig.Extension, + ) (otelcomponent.Extension, error) { + + onCreated() + return nil, nil + }, otelcomponent.StabilityLevelUndefined, + ) + + return auth.New(opts, factory, args.(auth.Arguments)) + }, + } + + return &testEnvironment{ + t: t, + Controller: componenttest.NewControllerFromReg(util.TestLogger(t), reg), + } +} + +func (te *testEnvironment) Start(args component.Arguments) { + go func() { + ctx := componenttest.TestContext(te.t) + err := te.Controller.Run(ctx, args) + require.NoError(te.t, err, "failed to run component") + }() +} + +type fakeAuthArgs struct { +} + +var _ auth.Arguments = fakeAuthArgs{} + +func (fa fakeAuthArgs) Convert() otelconfig.Extension { + settings := otelconfig.NewExtensionSettings(otelconfig.NewComponentID("testcomponent")) + return &settings +} + +func (fa fakeAuthArgs) Extensions() map[otelconfig.ComponentID]otelcomponent.Extension { + return nil +} + +func (fa fakeAuthArgs) Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter { + return nil +}