Skip to content

Commit

Permalink
Update allow_older_versions when running under Elastic Agent (#34964)
Browse files Browse the repository at this point in the history
When Beats are running under Elastic Agent their initial output
configuration is empty. Only a few moments later the output
configuration arrives as an update via the control protocol.

On startup Beats register a global Elasticsearch connection
callback to validate the Elasticsearch version. Unfortunately,
this callback didn't account for the later `allow_older_versions`
update via the control protocol and the updated value was not used.

This fixes that broken behaviour and makes an update to the entire in-memory
output configuration on each control protocol update.
  • Loading branch information
rdner authored and chrisberkhout committed Jun 1, 2023
1 parent ad950c5 commit 51faa24
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Fix concurrent map writes when system/process code called from reporter code {pull}32491[32491]
- Fix panics when a processor is closed twice {pull}34647[34647]
- Update elastic-agent-system-metrics to v0.4.6 to allow builds on mips platforms. {pull}34674[34674]
- Fix Beats started by agent do not respect the allow_older_versions: true configuration flag {issue}34227[34227] {pull}34964[34964]

*Auditbeat*

Expand Down
43 changes: 33 additions & 10 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,10 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
logSystemInfo(b.Info)
logp.Info("Setup Beat: %s; Version: %s", b.Info.Beat, b.Info.Version)

b.checkElasticsearchVersion()
err = b.registerESVersionCheckCallback()
if err != nil {
return nil, err
}

err = b.registerESIndexManagement()
if err != nil {
Expand Down Expand Up @@ -981,15 +984,18 @@ func (b *Beat) loadDashboards(ctx context.Context, force bool) error {
return nil
}

// checkElasticsearchVersion registers a global callback to make sure ES instance we are connecting
// registerESVersionCheckCallback registers a global callback to make sure ES instance we are connecting
// to is at least on the same version as the Beat.
// If the check is disabled or the output is not Elasticsearch, nothing happens.
func (b *Beat) checkElasticsearchVersion() {
if b.isConnectionToOlderVersionAllowed() {
return
}
func (b *Beat) registerESVersionCheckCallback() error {
_, err := elasticsearch.RegisterGlobalCallback(func(conn *eslegclient.Connection) error {
if !isElasticsearchOutput(b.Config.Output.Name()) {
return errors.New("Elasticsearch output is not configured")
}
if b.isConnectionToOlderVersionAllowed() {
return nil
}

_, _ = elasticsearch.RegisterGlobalCallback(func(conn *eslegclient.Connection) error {
esVersion := conn.GetVersion()
beatVersion, err := libversion.New(b.Info.Version)
if err != nil {
Expand All @@ -1000,6 +1006,8 @@ func (b *Beat) checkElasticsearchVersion() {
}
return nil
})

return err
}

func (b *Beat) isConnectionToOlderVersionAllowed() bool {
Expand Down Expand Up @@ -1035,13 +1043,28 @@ func (b *Beat) indexSetupCallback() elasticsearch.ConnectCallback {
}

func (b *Beat) makeOutputReloader(outReloader pipeline.OutputReloader) reload.Reloadable {
return reload.ReloadableFunc(func(config *reload.ConfigWithMeta) error {
return reload.ReloadableFunc(func(update *reload.ConfigWithMeta) error {
if update == nil {
return nil
}

if b.OutputConfigReloader != nil {
if err := b.OutputConfigReloader.Reload(config); err != nil {
if err := b.OutputConfigReloader.Reload(update); err != nil {
return err
}
}
return outReloader.Reload(config, b.createOutput)

// we need to update the output configuration because
// some callbacks are relying on it to be up to date.
// e.g. the Elasticsearch version validation
if update.Config != nil {
err := b.Config.Output.Unpack(update.Config)
if err != nil {
return err
}
}

return outReloader.Reload(update, b.createOutput)
})
}

Expand Down
46 changes: 46 additions & 0 deletions libbeat/cmd/instance/beat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import (
"testing"

"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/elastic-agent-libs/config"

"github.com/gofrs/uuid"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -224,3 +227,46 @@ func TestSanitizeIPs(t *testing.T) {
})
}
}

func TestReloader(t *testing.T) {
t.Run("updates the output configuration on the beat", func(t *testing.T) {
b, err := NewBeat("testbeat", "testidx", "0.9", false)
require.NoError(t, err)

cfg := `
elasticsearch:
hosts: ["https://127.0.0.1:9200"]
username: "elastic"
allow_older_versions: true
`
c, err := config.NewConfigWithYAML([]byte(cfg), cfg)
require.NoError(t, err)
outCfg, err := c.Child("elasticsearch", -1)
require.NoError(t, err)

update := &reload.ConfigWithMeta{Config: c}
m := &outputReloaderMock{}
reloader := b.makeOutputReloader(m)

require.False(t, b.Config.Output.IsSet(), "the output should not be set yet")
require.False(t, b.isConnectionToOlderVersionAllowed(), "the flag should not be present in the empty configuration")
err = reloader.Reload(update)
require.NoError(t, err)
require.True(t, b.Config.Output.IsSet(), "now the output should be set")
require.Equal(t, outCfg, b.Config.Output.Config())
require.Same(t, c, m.cfg.Config)
require.True(t, b.isConnectionToOlderVersionAllowed(), "the flag should be present")
})
}

type outputReloaderMock struct {
cfg *reload.ConfigWithMeta
}

func (r *outputReloaderMock) Reload(
cfg *reload.ConfigWithMeta,
factory func(o outputs.Observer, cfg config.Namespace) (outputs.Group, error),
) error {
r.cfg = cfg
return nil
}

0 comments on commit 51faa24

Please sign in to comment.