Skip to content

Commit

Permalink
Update allow_older_versions when running under Elastic Agent (#34964)
Browse files Browse the repository at this point in the history
When Beats are running under Elastic Agent their initial output
configuration is empty. Only a few moments later the output
configuration arrives as an update via the control protocol.

On startup Beats register a global Elasticsearch connection
callback to validate the Elasticsearch version. Unfortunately,
this callback didn't account for the later `allow_older_versions`
update via the control protocol and the updated value was not used.

This fixes that broken behaviour and makes an update to the entire in-memory
output configuration on each control protocol update.

(cherry picked from commit 1a9d627)
  • Loading branch information
rdner authored and mergify[bot] committed Mar 30, 2023
1 parent 333454a commit b7d8887
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 10 deletions.
18 changes: 18 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,24 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]

*Affecting all Beats*
- Support for multiline zookeeper logs {issue}2496[2496]
- Allow `clock_nanosleep` in the default seccomp profiles for amd64 and 386. Newer versions of glibc (e.g. 2.31) require it. {issue}33792[33792]
- Disable lockfile when running under elastic-agent. {pull}33988[33988]
- Fix lockfile logic, retry locking {pull}34194[34194]
- Add checks to ensure reloading of units if the configuration actually changed. {pull}34346[34346]
- Fix namespacing on self-monitoring {pull}32336[32336]
- Fix race condition when stopping runners {pull}32433[32433]
- Fix concurrent map writes when system/process code called from reporter code {pull}32491[32491]
- Log errors from the Elastic Agent V2 client errors channel. Avoids blocking when error occurs communicating with the Elastic Agent. {pull}34392[34392]
- Only log publish event messages in trace log level under elastic-agent. {pull}34391[34391]
- Fix issue where updating a single Elastic Agent configuration unit results in other units being turned off. {pull}34504[34504]
- Fix dropped events when monitor a beat under the agent and send its `Host info` log entry. {pull}34599[34599]

- Fix namespacing on self-monitoring {pull}32336[32336]
- Fix race condition when stopping runners {pull}32433[32433]
- Fix concurrent map writes when system/process code called from reporter code {pull}32491[32491]
- Fix panics when a processor is closed twice {pull}34647[34647]
- Update elastic-agent-system-metrics to v0.4.6 to allow builds on mips platforms. {pull}34674[34674]
- Fix Beats started by agent do not respect the allow_older_versions: true configuration flag {issue}34227[34227] {pull}34964[34964]

*Auditbeat*

Expand Down
43 changes: 33 additions & 10 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,10 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
logSystemInfo(b.Info)
logp.Info("Setup Beat: %s; Version: %s", b.Info.Beat, b.Info.Version)

b.checkElasticsearchVersion()
err = b.registerESVersionCheckCallback()
if err != nil {
return nil, err
}

err = b.registerESIndexManagement()
if err != nil {
Expand Down Expand Up @@ -907,15 +910,18 @@ func (b *Beat) loadDashboards(ctx context.Context, force bool) error {
return nil
}

// checkElasticsearchVersion registers a global callback to make sure ES instance we are connecting
// registerESVersionCheckCallback registers a global callback to make sure ES instance we are connecting
// to is at least on the same version as the Beat.
// If the check is disabled or the output is not Elasticsearch, nothing happens.
func (b *Beat) checkElasticsearchVersion() {
if b.isConnectionToOlderVersionAllowed() {
return
}
func (b *Beat) registerESVersionCheckCallback() error {
_, err := elasticsearch.RegisterGlobalCallback(func(conn *eslegclient.Connection) error {
if !isElasticsearchOutput(b.Config.Output.Name()) {
return errors.New("Elasticsearch output is not configured")
}
if b.isConnectionToOlderVersionAllowed() {
return nil
}

_, _ = elasticsearch.RegisterGlobalCallback(func(conn *eslegclient.Connection) error {
esVersion := conn.GetVersion()
beatVersion, err := libversion.New(b.Info.Version)
if err != nil {
Expand All @@ -926,6 +932,8 @@ func (b *Beat) checkElasticsearchVersion() {
}
return nil
})

return err
}

func (b *Beat) isConnectionToOlderVersionAllowed() bool {
Expand Down Expand Up @@ -961,13 +969,28 @@ func (b *Beat) indexSetupCallback() elasticsearch.ConnectCallback {
}

func (b *Beat) makeOutputReloader(outReloader pipeline.OutputReloader) reload.Reloadable {
return reload.ReloadableFunc(func(config *reload.ConfigWithMeta) error {
return reload.ReloadableFunc(func(update *reload.ConfigWithMeta) error {
if update == nil {
return nil
}

if b.OutputConfigReloader != nil {
if err := b.OutputConfigReloader.Reload(config); err != nil {
if err := b.OutputConfigReloader.Reload(update); err != nil {
return err
}
}
return outReloader.Reload(config, b.createOutput)

// we need to update the output configuration because
// some callbacks are relying on it to be up to date.
// e.g. the Elasticsearch version validation
if update.Config != nil {
err := b.Config.Output.Unpack(update.Config)
if err != nil {
return err
}
}

return outReloader.Reload(update, b.createOutput)
})
}

Expand Down
46 changes: 46 additions & 0 deletions libbeat/cmd/instance/beat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import (
"testing"

"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/elastic-agent-libs/config"

"github.com/gofrs/uuid"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -224,3 +227,46 @@ func TestSanitizeIPs(t *testing.T) {
})
}
}

func TestReloader(t *testing.T) {
t.Run("updates the output configuration on the beat", func(t *testing.T) {
b, err := NewBeat("testbeat", "testidx", "0.9", false)
require.NoError(t, err)

cfg := `
elasticsearch:
hosts: ["https://127.0.0.1:9200"]
username: "elastic"
allow_older_versions: true
`
c, err := config.NewConfigWithYAML([]byte(cfg), cfg)
require.NoError(t, err)
outCfg, err := c.Child("elasticsearch", -1)
require.NoError(t, err)

update := &reload.ConfigWithMeta{Config: c}
m := &outputReloaderMock{}
reloader := b.makeOutputReloader(m)

require.False(t, b.Config.Output.IsSet(), "the output should not be set yet")
require.False(t, b.isConnectionToOlderVersionAllowed(), "the flag should not be present in the empty configuration")
err = reloader.Reload(update)
require.NoError(t, err)
require.True(t, b.Config.Output.IsSet(), "now the output should be set")
require.Equal(t, outCfg, b.Config.Output.Config())
require.Same(t, c, m.cfg.Config)
require.True(t, b.isConnectionToOlderVersionAllowed(), "the flag should be present")
})
}

type outputReloaderMock struct {
cfg *reload.ConfigWithMeta
}

func (r *outputReloaderMock) Reload(
cfg *reload.ConfigWithMeta,
factory func(o outputs.Observer, cfg config.Namespace) (outputs.Group, error),
) error {
r.cfg = cfg
return nil
}

0 comments on commit b7d8887

Please sign in to comment.