Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Notify extensions of the Collector's effective configuration #6833

Merged
Merged
16 changes: 16 additions & 0 deletions .chloggen/configwatcher-interface.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: extension

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add optional `ConfigWatcher` interface

# One or more tracking issues or pull requests related to the change
issues: [6596]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: Extensions implementing this interface will be notified of the Collector's effective config.
18 changes: 18 additions & 0 deletions .chloggen/extension-effective-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: otelcol

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add optional `ConfmapProvider` interface for Config Providers

# One or more tracking issues or pull requests related to the change
issues: [6596]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
This allows providing the Collector's configuration as a marshaled confmap.Conf object
from a ConfigProvider
17 changes: 17 additions & 0 deletions .chloggen/servicesettings-confmap.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: service

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add `CollectorConf` field to `service.Settings`

# One or more tracking issues or pull requests related to the change
issues: [6596]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
This field is intended to be used by the Collector to pass its effective configuration to the service.
8 changes: 8 additions & 0 deletions extension/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
)

// Extension is the interface for objects hosted by the OpenTelemetry Collector that
Expand All @@ -32,6 +33,13 @@ type PipelineWatcher interface {
NotReady() error
}

// ConfigWatcher is an interface that should be implemented by an extension that
// wishes to be notified of the Collector's effective configuration.
type ConfigWatcher interface {
// NotifyConfig notifies the extension of the Collector's current effective configuration.
NotifyConfig(ctx context.Context, conf *confmap.Conf) error
}

// CreateSettings is passed to Factory.Create(...) function.
type CreateSettings struct {
// ID returns the ID of the component that will be created.
Expand Down
14 changes: 14 additions & 0 deletions otelcol/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/extension"
Expand Down Expand Up @@ -143,6 +144,17 @@ func (col *Collector) Shutdown() {
func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
col.setCollectorState(StateStarting)

var conf *confmap.Conf

if cp, ok := col.set.ConfigProvider.(ConfmapProvider); ok {
var err error
conf, err = cp.GetConfmap(ctx)

if err != nil {
return fmt.Errorf("failed to resolve config: %w", err)
}
}

cfg, err := col.set.ConfigProvider.Get(ctx, col.set.Factories)
if err != nil {
return fmt.Errorf("failed to get config: %w", err)
Expand All @@ -154,6 +166,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {

col.service, err = service.New(ctx, service.Settings{
BuildInfo: col.set.BuildInfo,
CollectorConf: conf,
Receivers: receiver.NewBuilder(cfg.Receivers, col.set.Factories.Receivers),
Processors: processor.NewBuilder(cfg.Processors, col.set.Factories.Processors),
Exporters: exporter.NewBuilder(cfg.Exporters, col.set.Factories.Exporters),
Expand All @@ -174,6 +187,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
return multierr.Combine(err, col.service.Shutdown(ctx))
}
col.setCollectorState(StateRunning)

return nil
}

Expand Down
45 changes: 45 additions & 0 deletions otelcol/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/confmap/converter/expandconverter"
)

func TestStateString(t *testing.T) {
Expand Down Expand Up @@ -369,6 +371,31 @@ func TestCollectorDryRun(t *testing.T) {
require.Error(t, col.DryRun(context.Background()))
}

func TestPassConfmapToServiceFailure(t *testing.T) {
factories, err := nopFactories()
require.NoError(t, err)

cfgProvider, err := NewConfigProvider(ConfigProviderSettings{
ResolverSettings: confmap.ResolverSettings{
URIs: []string{filepath.Join("testdata", "otelcol-invalid.yaml")},
Providers: makeMapProvidersMap(newFailureProvider()),
Converters: []confmap.Converter{expandconverter.New()},
},
})
require.NoError(t, err)

set := CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
ConfigProvider: cfgProvider,
}
col, err := NewCollector(set)
require.NoError(t, err)

err = col.Run(context.Background())
require.Error(t, err)
}

func startCollector(ctx context.Context, t *testing.T, col *Collector) *sync.WaitGroup {
wg := &sync.WaitGroup{}
wg.Add(1)
Expand All @@ -378,3 +405,21 @@ func startCollector(ctx context.Context, t *testing.T, col *Collector) *sync.Wai
}()
return wg
}

type failureProvider struct{}

func newFailureProvider() confmap.Provider {
return &failureProvider{}
}

func (fmp *failureProvider) Retrieve(_ context.Context, _ string, _ confmap.WatcherFunc) (*confmap.Retrieved, error) {
return nil, errors.New("a failure occurred during configuration retrieval")
}

func (*failureProvider) Scheme() string {
return "file"
}

func (*failureProvider) Shutdown(context.Context) error {
return nil
}
25 changes: 25 additions & 0 deletions otelcol/configprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,26 @@ type ConfigProvider interface {
Shutdown(ctx context.Context) error
}

// ConfmapProvider is an optional interface to be implemented by ConfigProviders
// to provide confmap.Conf objects representing a marshaled version of the
// Collector's configuration.
//
// The purpose of this interface is that otelcol.ConfigProvider structs do not
// necessarily need to use confmap.Conf as their underlying config structure.
type ConfmapProvider interface {
// GetConfmap resolves the Collector's configuration and provides it as a confmap.Conf object.
//
// Should never be called concurrently with itself or any ConfigProvider method.
GetConfmap(ctx context.Context) (*confmap.Conf, error)
}

type configProvider struct {
mapResolver *confmap.Resolver
}

var _ ConfigProvider = &configProvider{}
var _ ConfmapProvider = &configProvider{}

// ConfigProviderSettings are the settings to configure the behavior of the ConfigProvider.
type ConfigProviderSettings struct {
// ResolverSettings are the settings to configure the behavior of the confmap.Resolver.
Expand Down Expand Up @@ -106,6 +122,15 @@ func (cm *configProvider) Shutdown(ctx context.Context) error {
return cm.mapResolver.Shutdown(ctx)
}

func (cm *configProvider) GetConfmap(ctx context.Context) (*confmap.Conf, error) {
conf, err := cm.mapResolver.Resolve(ctx)
if err != nil {
return nil, fmt.Errorf("cannot resolve the configuration: %w", err)
}

return conf, nil
}

func newDefaultConfigProviderSettings(uris []string) ConfigProviderSettings {
return ConfigProviderSettings{
ResolverSettings: confmap.ResolverSettings{
Expand Down
121 changes: 64 additions & 57 deletions otelcol/configprovider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,69 +11,36 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"
"gopkg.in/yaml.v3"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/confmap/provider/fileprovider"
"go.opentelemetry.io/collector/confmap/provider/yamlprovider"
"go.opentelemetry.io/collector/connector/connectortest"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/extension/extensiontest"
"go.opentelemetry.io/collector/processor/processortest"
"go.opentelemetry.io/collector/receiver/receivertest"
"go.opentelemetry.io/collector/service"
"go.opentelemetry.io/collector/service/pipelines"
"go.opentelemetry.io/collector/service/telemetry"
)

var configNop = &Config{
Receivers: map[component.ID]component.Config{component.NewID("nop"): receivertest.NewNopFactory().CreateDefaultConfig()},
Processors: map[component.ID]component.Config{component.NewID("nop"): processortest.NewNopFactory().CreateDefaultConfig()},
Exporters: map[component.ID]component.Config{component.NewID("nop"): exportertest.NewNopFactory().CreateDefaultConfig()},
Connectors: map[component.ID]component.Config{component.NewIDWithName("nop", "con"): connectortest.NewNopFactory().CreateDefaultConfig()},
Extensions: map[component.ID]component.Config{component.NewID("nop"): extensiontest.NewNopFactory().CreateDefaultConfig()},
Service: service.Config{
Extensions: []component.ID{component.NewID("nop")},
Pipelines: pipelines.Config{
component.NewID("traces"): {
Receivers: []component.ID{component.NewID("nop")},
Processors: []component.ID{component.NewID("nop")},
Exporters: []component.ID{component.NewID("nop"), component.NewIDWithName("nop", "con")},
},
component.NewID("metrics"): {
Receivers: []component.ID{component.NewID("nop")},
Processors: []component.ID{component.NewID("nop")},
Exporters: []component.ID{component.NewID("nop")},
},
component.NewID("logs"): {
Receivers: []component.ID{component.NewID("nop"), component.NewIDWithName("nop", "con")},
Processors: []component.ID{component.NewID("nop")},
Exporters: []component.ID{component.NewID("nop")},
},
},
Telemetry: telemetry.Config{
Logs: telemetry.LogsConfig{
Level: zapcore.InfoLevel,
Development: false,
Encoding: "console",
Sampling: &telemetry.LogsSamplingConfig{
Initial: 100,
Thereafter: 100,
},
OutputPaths: []string{"stderr"},
ErrorOutputPaths: []string{"stderr"},
DisableCaller: false,
DisableStacktrace: false,
InitialFields: map[string]any(nil),
},
Metrics: telemetry.MetricsConfig{
Level: configtelemetry.LevelBasic,
Address: "localhost:8888",
},
},
},
func newConfig(yamlBytes []byte, factories Factories) (*Config, error) {
var stringMap = map[string]interface{}{}
err := yaml.Unmarshal(yamlBytes, stringMap)

if err != nil {
return nil, err
}

conf := confmap.NewFromStringMap(stringMap)

cfg, err := unmarshal(conf, factories)
if err != nil {
return nil, err
}

return &Config{
Receivers: cfg.Receivers.Configs(),
Processors: cfg.Processors.Configs(),
Exporters: cfg.Exporters.Configs(),
Connectors: cfg.Connectors.Configs(),
Extensions: cfg.Extensions.Configs(),
Service: cfg.Service,
}, nil
}

func TestConfigProviderYaml(t *testing.T) {
Expand All @@ -97,6 +64,10 @@ func TestConfigProviderYaml(t *testing.T) {

cfg, err := cp.Get(context.Background(), factories)
require.NoError(t, err)

configNop, err := newConfig(yamlBytes, factories)
require.NoError(t, err)

assert.EqualValues(t, configNop, cfg)
}

Expand All @@ -118,5 +89,41 @@ func TestConfigProviderFile(t *testing.T) {

cfg, err := cp.Get(context.Background(), factories)
require.NoError(t, err)

yamlBytes, err := os.ReadFile(filepath.Join("testdata", "otelcol-nop.yaml"))
require.NoError(t, err)

configNop, err := newConfig(yamlBytes, factories)
require.NoError(t, err)

assert.EqualValues(t, configNop, cfg)
}

func TestGetConfmap(t *testing.T) {
uriLocation := "file:" + filepath.Join("testdata", "otelcol-nop.yaml")
provider := fileprovider.New()
set := ConfigProviderSettings{
ResolverSettings: confmap.ResolverSettings{
URIs: []string{uriLocation},
Providers: map[string]confmap.Provider{provider.Scheme(): provider},
},
}

configBytes, err := os.ReadFile(filepath.Join("testdata", "otelcol-nop.yaml"))
require.NoError(t, err)

yamlMap := map[string]any{}
err = yaml.Unmarshal(configBytes, yamlMap)
require.NoError(t, err)

cp, err := NewConfigProvider(set)
require.NoError(t, err)

cmp, ok := cp.(ConfmapProvider)
require.True(t, ok)

cmap, err := cmp.GetConfmap(context.Background())
require.NoError(t, err)

assert.EqualValues(t, yamlMap, cmap.ToStringMap())
}
12 changes: 12 additions & 0 deletions service/extensions/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"go.uber.org/multierr"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/service/internal/components"
"go.opentelemetry.io/collector/service/internal/zpages"
Expand Down Expand Up @@ -72,6 +73,17 @@ func (bes *Extensions) NotifyPipelineNotReady() error {
return errs
}

func (bes *Extensions) NotifyConfig(ctx context.Context, conf *confmap.Conf) error {
dmitryax marked this conversation as resolved.
Show resolved Hide resolved
var errs error
for _, ext := range bes.extMap {
if cw, ok := ext.(extension.ConfigWatcher); ok {
clonedConf := confmap.NewFromStringMap(conf.ToStringMap())
errs = multierr.Append(errs, cw.NotifyConfig(ctx, clonedConf))
}
}
return errs
}

func (bes *Extensions) GetExtensions() map[component.ID]component.Component {
result := make(map[component.ID]component.Component, len(bes.extMap))
for extID, v := range bes.extMap {
Expand Down
Loading