forked from grafana/agent
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
component/otelcol/auth: create auth extension component abstraction
OpenTelemetry supports many extensions. Extensions are used as a generic way to put "everything else" into OpenTelemetry. There are two types of extensions relevant to us: * [Authentication extensions][auth-ext]: used for both client and server authentication. * [Storage extensions][storage-ext]: used for external storage of state. Other extensions, such as [awsproxy][] are useful but better suited as generic Flow components rather than being shoved in the otelcol namespace, since they are unrelated to telemetry pipelines and aren't referenced by other otelcol components in the upstream configuration. This commit introduces a new package, component/otelcol/auth, which exposes a generic Flow component implementation which can run OpenTelemetry Collector extensions meant for authentication. While storage extensions may end up being Flow components eventually, it's currently marked as experimental upstream. We will reevaluate storage extension components once things have stabilized a little more. Like grafana#2227, grafana#2254, and grafana#2284, it leaves some work unfinished for future PRs: * Component-specific metrics are currently ignored. * Component-specific traces are currently ignored. As of this commit, there are no registered `otelcol.auth.*` components. Implementations for OpenTelemetry Collector Flow components will be done in future PRs. Related to grafana#2213. [auth-ext]: https://pkg.go.dev/go.opentelemetry.io/collector@v0.61.0/config/configauth [storage-ext]: https://pkg.go.dev/go.opentelemetry.io/collector/extension/experimental/storage [awsproxy]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.61.0/extension/awsproxy
- Loading branch information
Showing
2 changed files
with
248 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
// Package auth provides utilities to create a Flow component from | ||
// OpenTelemetry Collector authentication extensions. | ||
// | ||
// Other OpenTelemetry Collector extensions are better served as generic Flow | ||
// components rather than being placed in the otelcol namespace. | ||
package auth | ||
|
||
import ( | ||
"context" | ||
"os" | ||
|
||
"github.com/grafana/agent/component" | ||
"github.com/grafana/agent/component/otelcol/internal/scheduler" | ||
"github.com/grafana/agent/component/otelcol/internal/zapadapter" | ||
"github.com/grafana/agent/pkg/build" | ||
otelcomponent "go.opentelemetry.io/collector/component" | ||
otelconfig "go.opentelemetry.io/collector/config" | ||
"go.opentelemetry.io/otel/metric" | ||
"go.opentelemetry.io/otel/trace" | ||
) | ||
|
||
// Arguments is an extension of component.Arguments which contains necessary | ||
// settings for OpenTelemetry Collector authentication extensions. | ||
type Arguments interface { | ||
component.Arguments | ||
|
||
// Convert converts the Arguments into an OpenTelemetry Collector | ||
// authentication extension configuration. | ||
Convert() otelconfig.Extension | ||
|
||
// Extensions returns the set of extensions that the configured component is | ||
// allowed to use. | ||
Extensions() map[otelconfig.ComponentID]otelcomponent.Extension | ||
|
||
// Exporters returns the set of exporters that are exposed to the configured | ||
// component. | ||
Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter | ||
} | ||
|
||
// Exports is a common Exports type for Flow components which expose | ||
// OpenTelemetry Collector authentication extensions. | ||
type Exports struct { | ||
// Handler is the managed otelcomponent.Extension. Handler is updated any | ||
// time the extension is updated. | ||
Handler otelcomponent.Extension `river:"handler,attr"` | ||
} | ||
|
||
// Auth is a Flow component shim which manages an OpenTelemetry Collector | ||
// authentication extension. | ||
type Auth struct { | ||
ctx context.Context | ||
cancel context.CancelFunc | ||
|
||
opts component.Options | ||
factory otelcomponent.ExtensionFactory | ||
|
||
sched *scheduler.Scheduler | ||
} | ||
|
||
var ( | ||
_ component.Component = (*Auth)(nil) | ||
_ component.HealthComponent = (*Auth)(nil) | ||
) | ||
|
||
// New creates a new Flow component which encapsulates an OpenTelemetry | ||
// Collector authentication extension. args must hold a value of the argument | ||
// type registered with the Flow component. | ||
// | ||
// The registered component must be registered to export the Exports type from | ||
// this package, otherwise New will panic. | ||
func New(opts component.Options, f otelcomponent.ExtensionFactory, args Arguments) (*Auth, error) { | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
|
||
r := &Auth{ | ||
ctx: ctx, | ||
cancel: cancel, | ||
|
||
opts: opts, | ||
factory: f, | ||
|
||
sched: scheduler.New(opts.Logger), | ||
} | ||
if err := r.Update(args); err != nil { | ||
return nil, err | ||
} | ||
return r, nil | ||
} | ||
|
||
// Run starts the Auth component. | ||
func (r *Auth) Run(ctx context.Context) error { | ||
defer r.cancel() | ||
return r.sched.Run(ctx) | ||
} | ||
|
||
// Update implements component.Component. It will convert the Arguments into | ||
// configuration for OpenTelemetry Collector authentication extension | ||
// configuration and manage the underlying OpenTelemetry Collector extension. | ||
func (r *Auth) Update(args component.Arguments) error { | ||
rargs := args.(Arguments) | ||
|
||
host := scheduler.NewHost( | ||
r.opts.Logger, | ||
scheduler.WithHostExtensions(rargs.Extensions()), | ||
scheduler.WithHostExporters(rargs.Exporters()), | ||
) | ||
|
||
settings := otelcomponent.ExtensionCreateSettings{ | ||
TelemetrySettings: otelcomponent.TelemetrySettings{ | ||
Logger: zapadapter.New(r.opts.Logger), | ||
|
||
// TODO(rfratto): expose tracing and logging statistics. | ||
// | ||
// We may want to put off tracing until we have native tracing | ||
// instrumentation from Flow, but metrics should come sooner since we're | ||
// already set up for supporting component-specific metrics. | ||
TracerProvider: trace.NewNoopTracerProvider(), | ||
MeterProvider: metric.NewNoopMeterProvider(), | ||
}, | ||
|
||
BuildInfo: otelcomponent.BuildInfo{ | ||
Command: os.Args[0], | ||
Description: "Grafana Agent", | ||
Version: build.Version, | ||
}, | ||
} | ||
|
||
extensionConfig := rargs.Convert() | ||
|
||
// Create instances of the receiver from our factory for each of our | ||
// supported telemetry signals. | ||
var components []otelcomponent.Component | ||
|
||
ext, err := r.factory.CreateExtension(r.ctx, settings, extensionConfig) | ||
if err != nil { | ||
return err | ||
} else if ext != nil { | ||
components = append(components, ext) | ||
} | ||
|
||
// Inform listeners that our handler changed. | ||
r.opts.OnStateChange(Exports{Handler: ext}) | ||
|
||
// Schedule the components to run once our component is running. | ||
r.sched.Schedule(host, components...) | ||
return nil | ||
} | ||
|
||
// CurrentHealth implements component.HealthComponent. | ||
func (r *Auth) CurrentHealth() component.Health { | ||
return r.sched.CurrentHealth() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
package auth_test | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
"time" | ||
|
||
"github.com/grafana/agent/component" | ||
"github.com/grafana/agent/component/otelcol" | ||
"github.com/grafana/agent/component/otelcol/auth" | ||
"github.com/grafana/agent/pkg/flow/componenttest" | ||
"github.com/grafana/agent/pkg/util" | ||
"github.com/stretchr/testify/require" | ||
otelcomponent "go.opentelemetry.io/collector/component" | ||
otelconfig "go.opentelemetry.io/collector/config" | ||
) | ||
|
||
func TestAuth(t *testing.T) { | ||
var ( | ||
waitCreated = util.NewWaitTrigger() | ||
onCreated = func() { | ||
waitCreated.Trigger() | ||
} | ||
) | ||
|
||
// Create and start our Flow component. We then wait for it to export a | ||
// consumer that we can send data to. | ||
te := newTestEnvironment(t, onCreated) | ||
te.Start(fakeAuthArgs{}) | ||
|
||
require.NoError(t, waitCreated.Wait(time.Second), "extension never created") | ||
} | ||
|
||
type testEnvironment struct { | ||
t *testing.T | ||
|
||
Controller *componenttest.Controller | ||
} | ||
|
||
func newTestEnvironment(t *testing.T, onCreated func()) *testEnvironment { | ||
t.Helper() | ||
|
||
reg := component.Registration{ | ||
Name: "testcomponent", | ||
Args: fakeAuthArgs{}, | ||
Exports: otelcol.ConsumerExports{}, | ||
Build: func(opts component.Options, args component.Arguments) (component.Component, error) { | ||
// Create a factory which always returns our instance of fakeReceiver | ||
// defined above. | ||
factory := otelcomponent.NewExtensionFactory( | ||
"testcomponent", | ||
func() otelconfig.Extension { return nil }, | ||
func( | ||
_ context.Context, | ||
_ otelcomponent.ExtensionCreateSettings, | ||
_ otelconfig.Extension, | ||
) (otelcomponent.Extension, error) { | ||
onCreated() | ||
return nil, nil | ||
}, otelcomponent.StabilityLevelUndefined, | ||
) | ||
|
||
return auth.New(opts, factory, args.(auth.Arguments)) | ||
}, | ||
} | ||
|
||
return &testEnvironment{ | ||
t: t, | ||
Controller: componenttest.NewControllerFromReg(util.TestLogger(t), reg), | ||
} | ||
} | ||
|
||
func (te *testEnvironment) Start(args component.Arguments) { | ||
go func() { | ||
ctx := componenttest.TestContext(te.t) | ||
err := te.Controller.Run(ctx, args) | ||
require.NoError(te.t, err, "failed to run component") | ||
}() | ||
} | ||
|
||
type fakeAuthArgs struct { | ||
} | ||
|
||
var _ auth.Arguments = fakeAuthArgs{} | ||
|
||
func (fa fakeAuthArgs) Convert() otelconfig.Extension { | ||
settings := otelconfig.NewExtensionSettings(otelconfig.NewComponentID("testcomponent")) | ||
return &settings | ||
} | ||
|
||
func (fa fakeAuthArgs) Extensions() map[otelconfig.ComponentID]otelcomponent.Extension { | ||
return nil | ||
} | ||
|
||
func (fa fakeAuthArgs) Exporters() map[otelconfig.DataType]map[otelconfig.ComponentID]otelcomponent.Exporter { | ||
return nil | ||
} |