Skip to content

Commit

Permalink
Update allow_older_versions when running under Elastic Agent (#34964)
Browse files Browse the repository at this point in the history
When Beats are running under Elastic Agent their initial output
configuration is empty. Only a few moments later the output
configuration arrives as an update via the control protocol.

On startup Beats register a global Elasticsearch connection
callback to validate the Elasticsearch version. Unfortunately,
this callback didn't account for the later `allow_older_versions`
update via the control protocol and the updated value was not used.

This fixes that broken behaviour and makes an update to the entire in-memory
output configuration on each control protocol update.

(cherry picked from commit 1a9d627)

# Conflicts:
#	libbeat/cmd/instance/beat_test.go
  • Loading branch information
rdner authored and mergify[bot] committed May 25, 2023
1 parent 48648eb commit 2d9e224
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 10 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Fix race condition when stopping runners {pull}32433[32433]
- Fix concurrent map writes when system/process code called from reporter code {pull}32491[32491]
- Fix panics when a processor is closed twice {pull}34647[34647]
- Update elastic-agent-system-metrics to v0.4.6 to allow builds on mips platforms. {pull}34674[34674]
- Fix Beats started by agent do not respect the allow_older_versions: true configuration flag {issue}34227[34227] {pull}34964[34964]

*Auditbeat*

Expand Down
43 changes: 33 additions & 10 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,10 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
logSystemInfo(b.Info)
logp.Info("Setup Beat: %s; Version: %s", b.Info.Beat, b.Info.Version)

b.checkElasticsearchVersion()
err = b.registerESVersionCheckCallback()
if err != nil {
return nil, err
}

err = b.registerESIndexManagement()
if err != nil {
Expand Down Expand Up @@ -907,15 +910,18 @@ func (b *Beat) loadDashboards(ctx context.Context, force bool) error {
return nil
}

// checkElasticsearchVersion registers a global callback to make sure ES instance we are connecting
// registerESVersionCheckCallback registers a global callback to make sure ES instance we are connecting
// to is at least on the same version as the Beat.
// If the check is disabled or the output is not Elasticsearch, nothing happens.
func (b *Beat) checkElasticsearchVersion() {
if b.isConnectionToOlderVersionAllowed() {
return
}
func (b *Beat) registerESVersionCheckCallback() error {
_, err := elasticsearch.RegisterGlobalCallback(func(conn *eslegclient.Connection) error {
if !isElasticsearchOutput(b.Config.Output.Name()) {
return errors.New("Elasticsearch output is not configured")
}
if b.isConnectionToOlderVersionAllowed() {
return nil
}

_, _ = elasticsearch.RegisterGlobalCallback(func(conn *eslegclient.Connection) error {
esVersion := conn.GetVersion()
beatVersion, err := libversion.New(b.Info.Version)
if err != nil {
Expand All @@ -926,6 +932,8 @@ func (b *Beat) checkElasticsearchVersion() {
}
return nil
})

return err
}

func (b *Beat) isConnectionToOlderVersionAllowed() bool {
Expand Down Expand Up @@ -961,13 +969,28 @@ func (b *Beat) indexSetupCallback() elasticsearch.ConnectCallback {
}

func (b *Beat) makeOutputReloader(outReloader pipeline.OutputReloader) reload.Reloadable {
return reload.ReloadableFunc(func(config *reload.ConfigWithMeta) error {
return reload.ReloadableFunc(func(update *reload.ConfigWithMeta) error {
if update == nil {
return nil
}

if b.OutputConfigReloader != nil {
if err := b.OutputConfigReloader.Reload(config); err != nil {
if err := b.OutputConfigReloader.Reload(update); err != nil {
return err
}
}
return outReloader.Reload(config, b.createOutput)

// we need to update the output configuration because
// some callbacks are relying on it to be up to date.
// e.g. the Elasticsearch version validation
if update.Config != nil {
err := b.Config.Output.Unpack(update.Config)
if err != nil {
return err
}
}

return outReloader.Reload(update, b.createOutput)
})
}

Expand Down
111 changes: 111 additions & 0 deletions libbeat/cmd/instance/beat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import (
"testing"

"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/elastic-agent-libs/config"

"github.com/gofrs/uuid"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -160,3 +163,111 @@ func TestMetaJsonWithTimestamp(t *testing.T) {
assert.Equal(t, nil, err, "Unable to load meta file properly")
assert.True(t, firstStart.Equal(secondBeat.Info.FirstStart), "Cannot load first start")
}
<<<<<<< HEAD
=======

func TestSanitizeIPs(t *testing.T) {
cases := []struct {
name string
ips []string
expectedIPs []string
}{
{
name: "does not change valid IPs",
ips: []string{
"127.0.0.1",
"::1",
"fe80::1",
"fe80::6ca6:cdff:fe6a:4f59",
"192.168.1.101",
},
expectedIPs: []string{
"127.0.0.1",
"::1",
"fe80::1",
"fe80::6ca6:cdff:fe6a:4f59",
"192.168.1.101",
},
},
{
name: "cuts the masks",
ips: []string{
"127.0.0.1/8",
"::1/128",
"fe80::1/64",
"fe80::6ca6:cdff:fe6a:4f59/64",
"192.168.1.101/24",
},
expectedIPs: []string{
"127.0.0.1",
"::1",
"fe80::1",
"fe80::6ca6:cdff:fe6a:4f59",
"192.168.1.101",
},
},
{
name: "excludes invalid IPs",
ips: []string{
"",
"fe80::6ca6:cdff:fe6a:4f59",
"invalid",
"192.168.1.101",
},
expectedIPs: []string{
"fe80::6ca6:cdff:fe6a:4f59",
"192.168.1.101",
},
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
require.Equal(t, tc.expectedIPs, sanitizeIPs(tc.ips))
})
}
}

func TestReloader(t *testing.T) {
t.Run("updates the output configuration on the beat", func(t *testing.T) {
b, err := NewBeat("testbeat", "testidx", "0.9", false)
require.NoError(t, err)

cfg := `
elasticsearch:
hosts: ["https://127.0.0.1:9200"]
username: "elastic"
allow_older_versions: true
`
c, err := config.NewConfigWithYAML([]byte(cfg), cfg)
require.NoError(t, err)
outCfg, err := c.Child("elasticsearch", -1)
require.NoError(t, err)

update := &reload.ConfigWithMeta{Config: c}
m := &outputReloaderMock{}
reloader := b.makeOutputReloader(m)

require.False(t, b.Config.Output.IsSet(), "the output should not be set yet")
require.False(t, b.isConnectionToOlderVersionAllowed(), "the flag should not be present in the empty configuration")
err = reloader.Reload(update)
require.NoError(t, err)
require.True(t, b.Config.Output.IsSet(), "now the output should be set")
require.Equal(t, outCfg, b.Config.Output.Config())
require.Same(t, c, m.cfg.Config)
require.True(t, b.isConnectionToOlderVersionAllowed(), "the flag should be present")
})
}

type outputReloaderMock struct {
cfg *reload.ConfigWithMeta
}

func (r *outputReloaderMock) Reload(
cfg *reload.ConfigWithMeta,
factory func(o outputs.Observer, cfg config.Namespace) (outputs.Group, error),
) error {
r.cfg = cfg
return nil
}
>>>>>>> 1a9d627794 (Update `allow_older_versions` when running under Elastic Agent (#34964))

0 comments on commit 2d9e224

Please sign in to comment.