Skip to content

Commit

Permalink
component/otelcol/auth: create auth extension component abstraction
Browse files Browse the repository at this point in the history
OpenTelemetry supports many extensions. Extensions are used as a generic
way to put "everything else" into OpenTelemetry. To quote their
[documentation][ext-docs]:

> Extension is the interface for objects hosted by the OpenTelemetry
> Collector that don't participate directly on data pipelines but provide
> some functionality to the service, examples: health check endpoint,
> z-pages, etc.

There are two types of extensions relevant to us:

* [Authentication extensions][auth-ext]: used for both client and server
  authentication.
* [Storage extensions][storage-ext]: used for external storage of state.

Other extensions, such as [awsproxy][] are useful but better suited as
generic Flow components rather than being shoved in the otelcol
namespace, since they are unrelated to telemetry pipelines and aren't
referenced by other otelcol components in the upstream configuration.

This commit introduces a new package, component/otelcol/auth, which
exposes a generic Flow component implementation which can run
OpenTelemetry Collector extensions meant for authentication.

While storage extensions may end up being Flow components eventually,
it's currently marked as experimental upstream. We will reevaluate
storage extension components once things have stabilized a little more.

Like grafana#2227, grafana#2254, and grafana#2284, it leaves some work unfinished for future
PRs:

* Component-specific metrics are currently ignored.
* Component-specific traces are currently ignored.

As of this commit, there are no registered `otelcol.auth.*` components.
Implementations for OpenTelemetry Collector Flow components will be done
in future PRs.

Related to grafana#2213.

[ext-docs]: https://pkg.go.dev/go.opentelemetry.io/collector@v0.61.0/component#Extension
[auth-ext]: https://pkg.go.dev/go.opentelemetry.io/collector@v0.61.0/config/configauth
[storage-ext]: https://pkg.go.dev/go.opentelemetry.io/collector/extension/experimental/storage
[awsproxy]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.61.0/extension/awsproxy
  • Loading branch information
rfratto committed Oct 12, 2022
1 parent e099fac commit 0a17ad1
Show file tree
Hide file tree
Showing 2 changed files with 249 additions and 0 deletions.
151 changes: 151 additions & 0 deletions component/otelcol/auth/auth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// Package auth provides utilities to create a Flow component from
// OpenTelemetry Collector authentication extensions.
//
// Other OpenTelemetry Collector extensions are better served as generic Flow
// components rather than being placed in the otelcol namespace.
package auth

import (
"context"
"os"

"github.com/grafana/agent/component"
"github.com/grafana/agent/component/otelcol/internal/scheduler"
"github.com/grafana/agent/component/otelcol/internal/zapadapter"
"github.com/grafana/agent/pkg/build"
otelcomponent "go.opentelemetry.io/collector/component"
otelconfig "go.opentelemetry.io/collector/config"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
)

// Arguments is an extension of component.Arguments which contains necessary
// settings for OpenTelemetry Collector authentication extensions.
type Arguments interface {
component.Arguments

// Convert converts the Arguments into an OpenTelemetry Collector
// authentication extension configuration.
Convert() otelconfig.Extension

// Extensions returns the set of extensions that the configured component is
// allowed to use.
Extensions() map[otelconfig.ComponentID]otelcomponent.Extension

// Exporters returns the set of exporters that are exposed to the configured
// component.
Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter
}

// Exports is a common Exports type for Flow components which expose
// OpenTelemetry Collector authentication extensions.
type Exports struct {
// Handler is the managed otelcomponent.Extension. Handler is updated any
// time the extension is updated.
Handler otelcomponent.Extension `river:"handler,attr"`
}

// Auth is a Flow component shim which manages an OpenTelemetry Collector
// authentication extension.
type Auth struct {
ctx context.Context
cancel context.CancelFunc

opts component.Options
factory otelcomponent.ExtensionFactory

sched *scheduler.Scheduler
}

var (
_ component.Component = (*Auth)(nil)
_ component.HealthComponent = (*Auth)(nil)
)

// New creates a new Flow component which encapsulates an OpenTelemetry
// Collector authentication extension. args must hold a value of the argument
// type registered with the Flow component.
//
// The registered component must be registered to export the Exports type from
// this package, otherwise New will panic.
func New(opts component.Options, f otelcomponent.ExtensionFactory, args Arguments) (*Auth, error) {
ctx, cancel := context.WithCancel(context.Background())

r := &Auth{
ctx: ctx,
cancel: cancel,

opts: opts,
factory: f,

sched: scheduler.New(opts.Logger),
}
if err := r.Update(args); err != nil {
return nil, err
}
return r, nil
}

// Run starts the Auth component.
func (r *Auth) Run(ctx context.Context) error {
defer r.cancel()
return r.sched.Run(ctx)
}

// Update implements component.Component. It will convert the Arguments into
// configuration for OpenTelemetry Collector authentication extension
// configuration and manage the underlying OpenTelemetry Collector extension.
func (r *Auth) Update(args component.Arguments) error {
rargs := args.(Arguments)

host := scheduler.NewHost(
r.opts.Logger,
scheduler.WithHostExtensions(rargs.Extensions()),
scheduler.WithHostExporters(rargs.Exporters()),
)

settings := otelcomponent.ExtensionCreateSettings{
TelemetrySettings: otelcomponent.TelemetrySettings{
Logger: zapadapter.New(r.opts.Logger),

// TODO(rfratto): expose tracing and logging statistics.
//
// We may want to put off tracing until we have native tracing
// instrumentation from Flow, but metrics should come sooner since we're
// already set up for supporting component-specific metrics.
TracerProvider: trace.NewNoopTracerProvider(),
MeterProvider: metric.NewNoopMeterProvider(),
},

BuildInfo: otelcomponent.BuildInfo{
Command: os.Args[0],
Description: "Grafana Agent",
Version: build.Version,
},
}

extensionConfig := rargs.Convert()

// Create instances of the receiver from our factory for each of our
// supported telemetry signals.
var components []otelcomponent.Component

ext, err := r.factory.CreateExtension(r.ctx, settings, extensionConfig)
if err != nil {
return err
} else if ext != nil {
components = append(components, ext)
}

// Inform listeners that our handler changed.
r.opts.OnStateChange(Exports{Handler: ext})

// Schedule the components to run once our component is running.
r.sched.Schedule(host, components...)
return nil
}

// CurrentHealth implements component.HealthComponent.
func (r *Auth) CurrentHealth() component.Health {
return r.sched.CurrentHealth()
}
98 changes: 98 additions & 0 deletions component/otelcol/auth/auth_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package auth_test

import (
"context"
"testing"
"time"

"github.com/grafana/agent/component"
"github.com/grafana/agent/component/otelcol"
"github.com/grafana/agent/component/otelcol/auth"
"github.com/grafana/agent/pkg/flow/componenttest"
"github.com/grafana/agent/pkg/util"
"github.com/stretchr/testify/require"
otelcomponent "go.opentelemetry.io/collector/component"
otelconfig "go.opentelemetry.io/collector/config"
)

func TestAuth(t *testing.T) {
var (
waitCreated = util.NewWaitTrigger()
onCreated = func() {
waitCreated.Trigger()
}
)

// Create and start our Flow component. We then wait for it to export a
// consumer that we can send data to.
te := newTestEnvironment(t, onCreated)
te.Start(fakeAuthArgs{})

require.NoError(t, waitCreated.Wait(time.Second), "extension never created")
}

type testEnvironment struct {
t *testing.T

Controller *componenttest.Controller
}

func newTestEnvironment(t *testing.T, onCreated func()) *testEnvironment {
t.Helper()

reg := component.Registration{
Name: "testcomponent",
Args: fakeAuthArgs{},
Exports: otelcol.ConsumerExports{},
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
// Create a factory which always returns our instance of fakeReceiver
// defined above.
factory := otelcomponent.NewExtensionFactory(
"testcomponent",
func() otelconfig.Extension { return nil },
func(
_ context.Context,
_ otelcomponent.ExtensionCreateSettings,
_ otelconfig.Extension,
) (otelcomponent.Extension, error) {

onCreated()
return nil, nil
}, otelcomponent.StabilityLevelUndefined,
)

return auth.New(opts, factory, args.(auth.Arguments))
},
}

return &testEnvironment{
t: t,
Controller: componenttest.NewControllerFromReg(util.TestLogger(t), reg),
}
}

func (te *testEnvironment) Start(args component.Arguments) {
go func() {
ctx := componenttest.TestContext(te.t)
err := te.Controller.Run(ctx, args)
require.NoError(te.t, err, "failed to run component")
}()
}

type fakeAuthArgs struct {
}

var _ auth.Arguments = fakeAuthArgs{}

func (fa fakeAuthArgs) Convert() otelconfig.Extension {
settings := otelconfig.NewExtensionSettings(otelconfig.NewComponentID("testcomponent"))
return &settings
}

func (fa fakeAuthArgs) Extensions() map[otelconfig.ComponentID]otelcomponent.Extension {
return nil
}

func (fa fakeAuthArgs) Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter {
return nil
}

0 comments on commit 0a17ad1

Please sign in to comment.