Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 88 additions & 1 deletion internal/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (

"go.elastic.co/apm/v2"

"github.com/elastic/go-ucfg"

"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
Expand Down Expand Up @@ -215,7 +217,7 @@ func New(
if err != nil {
return nil, nil, nil, err
}
configMgr = coordinator.NewConfigPatchManager(managed, PatchAPMConfig(log, rawConfig))
configMgr = coordinator.NewConfigPatchManager(managed, injectOutputOverrides(log, rawConfig), PatchAPMConfig(log, rawConfig))
}
}

Expand Down Expand Up @@ -305,3 +307,88 @@ func mergeFleetConfig(ctx context.Context, rawConfig *config.Config) (storage.St

return store, cfg, nil
}

// injectOutputOverrides takes local configuration for specific outputs and applies them to the configuration.
//
// The name of the output must match or no options will be overwritten.
func injectOutputOverrides(log *logger.Logger, rawConfig *config.Config) func(change coordinator.ConfigChange) coordinator.ConfigChange {
// merging uses no resolving as the AST variable substitution occurs on the outputs
// append the values to arrays (don't allow complete overriding of arrays)
mergeOpts := config.NoResolveOptions
mergeOpts = append(mergeOpts, ucfg.AppendValues)

// parse the outputs defined local in the configuration
// in the case the configuration as no outputs defined (most cases) then noop can be used
var parsed struct {
Outputs map[string]*ucfg.Config `config:"outputs"`
}
err := rawConfig.UnpackTo(&parsed)
if err != nil {
log.Errorf("error decoding raw config, output injection disabled: %v", err)
return noop
}
if len(parsed.Outputs) == 0 {
return noop
}

return func(change coordinator.ConfigChange) coordinator.ConfigChange {
cfg := change.Config()
outputs, err := cfg.Agent.Child("outputs", -1)
if err != nil {
if !isMissingError(err) {
// expecting only ErrMissing
log.Errorf("error getting outputs from config: %v", err)
}
return change
}
for outputName, outputOverrides := range parsed.Outputs {
cfgOutput, err := outputs.Child(outputName, -1)
if err != nil {
// no output with that name; do nothing
continue
}
// the order of merging is important
//
// this merges the ConfigChange on-top of the rawConfig to ensure that the
// ConfigChange options always override local options
//
// meaning that local options are only applied in the case that the ConfigChange
// doesn't provide a different value for those fields
err = func() error {
clone, err := ucfg.NewFrom(outputOverrides, mergeOpts...)
if err != nil {
return fmt.Errorf("failed to clone output overrides: %w", err)
}
err = clone.Merge(cfgOutput, mergeOpts...)
if err != nil {
return fmt.Errorf("failed to merge output over overrides: %w", err)
}
err = outputs.SetChild(outputName, -1, clone, mergeOpts...)
if err != nil {
return fmt.Errorf("failed to re-set output with overrides: %w", err)
}
return nil
}()
if err != nil {
log.Errorf("failed to perform output injection for output %s: %v", outputName, err)
continue
}
log.Infof("successfully injected output overrides for output %s", outputName)
}
return change
}
}

// isMissingError returns true if the error is because the field is missing
//
// Sadly go-ucfg doesn't support Unwrap interface so using `errors.Is(err, ucfg.ErrMissing)` doesn't work
// this specific function is required to ensure its an `ErrMissing` error.
func isMissingError(err error) bool {
//nolint:errorlint // limitation of go-ucfg (read docstring)
switch v := err.(type) {
case ucfg.Error:
//nolint:errorlint // limitation of go-ucfg (read docstring)
return v.Reason() == ucfg.ErrMissing
}
return false
}
215 changes: 215 additions & 0 deletions internal/pkg/agent/application/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,218 @@ func TestLimitsLog(t *testing.T) {
logs := obs.FilterMessageSnippet(expLogLine)
require.Equalf(t, 1, logs.Len(), "expected one log message about limits change")
}

func TestInjectOutputOverrides(t *testing.T) {
scenarios := []struct {
Name string
RawConfig map[string]any
ChangeConfig map[string]any
Result map[string]any
}{
{
Name: "rawConfig no outputs",
RawConfig: map[string]any{
"inputs": []any{},
},
ChangeConfig: map[string]any{
"outputs": map[string]any{
"default": map[string]any{
"type": "elasticsearch",
},
},
},
Result: map[string]any{
"outputs": map[string]any{
"default": map[string]any{
"type": "elasticsearch",
},
},
},
},
{
Name: "change config no outputs",
RawConfig: map[string]any{
"outputs": map[string]any{
"default": map[string]any{
"type": "elasticsearch",
},
},
},
ChangeConfig: map[string]any{
"inputs": []any{},
},
Result: map[string]any{
"inputs": []any{},
},
},
{
Name: "mismatch output",
RawConfig: map[string]any{
"outputs": map[string]any{
"default": map[string]any{
"type": "elasticsearch",
"headers": map[string]any{
"X-App-Auth": "token-123",
},
},
},
},
ChangeConfig: map[string]any{
"outputs": map[string]any{
"elasticsearch": map[string]any{
"type": "elasticsearch",
},
},
},
Result: map[string]any{
"outputs": map[string]any{
"elasticsearch": map[string]any{
"type": "elasticsearch",
},
},
},
},
{
Name: "simple merge",
RawConfig: map[string]any{
"outputs": map[string]any{
"default": map[string]any{
"type": "elasticsearch",
"headers": map[string]any{
"X-App-Auth": "token-123",
},
},
},
},
ChangeConfig: map[string]any{
"outputs": map[string]any{
"default": map[string]any{
"type": "elasticsearch",
},
},
},
Result: map[string]any{
"outputs": map[string]any{
"default": map[string]any{
"type": "elasticsearch",
"headers": map[string]any{
"X-App-Auth": "token-123",
},
},
},
},
},
{
Name: "simple merge array",
RawConfig: map[string]any{
"outputs": map[string]any{
"default": map[string]any{
"type": "elasticsearch",
"headers": map[string]any{
"X-App-Auth": "token-123",
},
},
},
},
ChangeConfig: map[string]any{
"outputs": map[string]any{
"default": map[string]any{
"type": "elasticsearch",
"headers": map[string]any{
"X-Other-Field": "field-123",
},
},
},
},
Result: map[string]any{
"outputs": map[string]any{
"default": map[string]any{
"type": "elasticsearch",
"headers": map[string]any{
"X-App-Auth": "token-123",
"X-Other-Field": "field-123",
},
},
},
},
},
{
Name: "override setting from change",
RawConfig: map[string]any{
"outputs": map[string]any{
"default": map[string]any{
"type": "elasticsearch",
"headers": map[string]any{
"X-App-Auth": "token-123",
},
},
},
},
ChangeConfig: map[string]any{
"outputs": map[string]any{
"default": map[string]any{
"type": "kafka",
"headers": map[string]any{
"X-App-Auth": "token-546",
},
},
},
},
Result: map[string]any{
"outputs": map[string]any{
"default": map[string]any{
"type": "kafka",
"headers": map[string]any{
"X-App-Auth": "token-546",
},
},
},
},
},
{
Name: "setting variables are not expanded",
RawConfig: map[string]any{
"outputs": map[string]any{
"default": map[string]any{
"type": "elasticsearch",
"headers": map[string]any{
"X-App-Auth": "${filesource.app_token}",
},
},
},
},
ChangeConfig: map[string]any{
"outputs": map[string]any{
"default": map[string]any{
"type": "kafka",
"headers": map[string]any{
"X-App-Other": "${filesource.other_token}",
},
},
},
},
Result: map[string]any{
"outputs": map[string]any{
"default": map[string]any{
"type": "kafka",
"headers": map[string]any{
"X-App-Auth": "${filesource.app_token}",
"X-App-Other": "${filesource.other_token}",
},
},
},
},
},
}
for _, scenario := range scenarios {
t.Run(scenario.Name, func(t *testing.T) {
log, _ := loggertest.New(t.Name())
rawConfig := config.MustNewConfigFrom(scenario.RawConfig)
cc := &mockConfigChange{c: config.MustNewConfigFrom(scenario.ChangeConfig)}
observed := injectOutputOverrides(log, rawConfig)(cc).Config()
observedMap, err := observed.ToMapStr()
require.NoError(t, err)
assert.Equal(t, scenario.Result, observedMap)
})
}
}
19 changes: 11 additions & 8 deletions internal/pkg/agent/application/coordinator/config_patcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ type ConfigPatch func(change ConfigChange) ConfigChange

// ConfigPatchManager is a decorator to restore some agent settings from the elastic agent configuration file
type ConfigPatchManager struct {
inner ConfigManager
outCh chan ConfigChange
patchFn ConfigPatch
inner ConfigManager
outCh chan ConfigChange
patchFns []ConfigPatch
}

func (c ConfigPatchManager) Run(ctx context.Context) error {
Expand All @@ -36,14 +36,17 @@ func (c ConfigPatchManager) Watch() <-chan ConfigChange {

func (c ConfigPatchManager) patch(src <-chan ConfigChange, dst chan ConfigChange) {
for ccc := range src {
dst <- c.patchFn(ccc)
for _, patchFn := range c.patchFns {
ccc = patchFn(ccc)
}
dst <- ccc
}
}

func NewConfigPatchManager(inner ConfigManager, pf ConfigPatch) *ConfigPatchManager {
func NewConfigPatchManager(inner ConfigManager, pfs ...ConfigPatch) *ConfigPatchManager {
return &ConfigPatchManager{
inner: inner,
outCh: make(chan ConfigChange),
patchFn: pf,
inner: inner,
outCh: make(chan ConfigChange),
patchFns: pfs,
}
}
Loading