From 8e20316b921a5c8117421de50ed15212184b9f12 Mon Sep 17 00:00:00 2001
From: Anderson Queiroz
Date: Tue, 3 Dec 2024 13:50:45 +0100
Subject: [PATCH] filestream: require unique input id on filebeat startup
 (#41731)

During startup, Filebeat now validates the filestream inputs and fails to
start if there are inputs with duplicated IDs. Duplicated IDs might cause
data duplication, therefore it is now mandatory to have a unique ID for
each filestream input.

Disruptive User Impact

Impact: Users who have not configured unique IDs for their filestream
inputs will need to update their configurations to include unique IDs for
each input. Failure to do so will prevent Filebeat from starting.

Previously: Filebeat would only log an error if filestream found inputs
with duplicated IDs, potentially leading to data duplication.

Now: Filebeat will fail to start if any filestream input has a duplicated
ID.
---
 CHANGELOG.next.asciidoc                       |   2 +-
 filebeat/beater/filebeat.go                   |   6 +
 filebeat/input/filestream/config.go           |  59 ++++++
 filebeat/input/filestream/config_test.go      | 190 ++++++++++++++++++
 filebeat/tests/integration/filestream_test.go | 166 +++++++++++++++
 5 files changed, 422 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 7b7a351e202..f95aebc42c5 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -50,8 +50,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
 - Remove deprecated awscloudwatch field from Filebeat. {pull}41089[41089]
 - The performance of ingesting SQS data with the S3 input has improved by up to 60x for queues with many small events. `max_number_of_messages` config for SQS mode is now ignored, as the new design no longer needs a manual cap on messages. Instead, use `number_of_workers` to scale ingestion rate in both S3 and SQS modes. The increased efficiency may increase network bandwidth consumption, which can be throttled by lowering `number_of_workers`. It may also increase number of events stored in memory, which can be throttled by lowering the configured size of the internal queue. {pull}40699[40699]
 - Fixes filestream logging the error "filestream input with ID 'ID' already exists, this will lead to data duplication[...]" on Kubernetes when using autodiscover. {pull}41585[41585]
-- Add kafka compression support for ZSTD.
+- Filebeat fails to start if there is any input with a duplicated ID. It logs the duplicated IDs and the offending inputs configurations. {pull}41731[41731]
 
 *Heartbeat*
 
diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go
index 815b6fabfde..ceab21aa359 100644
--- a/filebeat/beater/filebeat.go
+++ b/filebeat/beater/filebeat.go
@@ -32,6 +32,7 @@ import (
 	"github.com/elastic/beats/v7/filebeat/fileset"
 	_ "github.com/elastic/beats/v7/filebeat/include"
 	"github.com/elastic/beats/v7/filebeat/input"
+	"github.com/elastic/beats/v7/filebeat/input/filestream"
 	"github.com/elastic/beats/v7/filebeat/input/filestream/takeover"
 	v2 "github.com/elastic/beats/v7/filebeat/input/v2"
 	"github.com/elastic/beats/v7/filebeat/input/v2/compat"
@@ -291,6 +292,11 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
 	}
 	defer stateStore.Close()
 
+	err = filestream.ValidateInputIDs(config.Inputs, logp.NewLogger("input.filestream"))
+	if err != nil {
+		logp.Err("invalid filestream configuration: %+v", err)
+		return err
+	}
 	err = processLogInputTakeOver(stateStore, config)
 	if err != nil {
 		logp.Err("Failed to attempt filestream state take over: %+v", err)
diff --git a/filebeat/input/filestream/config.go b/filebeat/input/filestream/config.go
index 1cb8fa5da97..2860dd673c2 100644
--- a/filebeat/input/filestream/config.go
+++ b/filebeat/input/filestream/config.go
@@ -19,6 +19,7 @@ package filestream
 
 import (
 	"fmt"
+	"strings"
 	"time"
 
 	"github.com/dustin/go-humanize"
@@ -27,6 +28,7 @@ import (
 	"github.com/elastic/beats/v7/libbeat/reader/parser"
 	"github.com/elastic/beats/v7/libbeat/reader/readfile"
 	conf "github.com/elastic/elastic-agent-libs/config"
+	"github.com/elastic/elastic-agent-libs/logp"
 )
 
 // Config stores the options of a file stream.
@@ -142,3 +144,60 @@ func (c *config) Validate() error {
 
 	return nil
 }
+
+// ValidateInputIDs checks all filestream inputs to ensure all input IDs are
+// unique. If there is a duplicated ID, it logs an error containing the offending
+// input configurations and returns an error containing the duplicated IDs.
+// A single empty ID is a valid ID as it's unique, however multiple empty IDs
+// are not unique and are therefore treated as any other duplicated ID.
+func ValidateInputIDs(inputs []*conf.C, logger *logp.Logger) error {
+	duplicatedConfigs := make(map[string][]*conf.C)
+	var duplicates []string
+	for _, input := range inputs {
+		fsInput := struct {
+			ID   string `config:"id"`
+			Type string `config:"type"`
+		}{}
+		err := input.Unpack(&fsInput)
+		if err != nil {
+			return fmt.Errorf("failed to unpack filestream input configuration: %w", err)
+		}
+		if fsInput.Type == "filestream" {
+			duplicatedConfigs[fsInput.ID] = append(duplicatedConfigs[fsInput.ID], input)
+			// we just need to collect the duplicated IDs once, therefore collect
+			// it only the first time we see a duplicated ID.
+			if len(duplicatedConfigs[fsInput.ID]) == 2 {
+				duplicates = append(duplicates, fsInput.ID)
+			}
+		}
+	}
+
+	if len(duplicates) != 0 {
+		jsonDupCfg := collectOffendingInputs(duplicates, duplicatedConfigs)
+		logger.Errorw("filestream inputs with duplicated IDs", "inputs", jsonDupCfg)
+		var quotedDuplicates []string
+		for _, dup := range duplicates {
+			quotedDuplicates = append(quotedDuplicates, fmt.Sprintf("%q", dup))
+		}
+		return fmt.Errorf("filestream inputs validation error: filestream inputs with duplicated IDs: %v", strings.Join(quotedDuplicates, ","))
+	}
+
+	return nil
+}
+
+func collectOffendingInputs(duplicates []string, ids map[string][]*conf.C) []map[string]interface{} {
+	var cfgs []map[string]interface{}
+
+	for _, id := range duplicates {
+		for _, dupcfgs := range ids[id] {
+			toJson := map[string]interface{}{}
+			err := dupcfgs.Unpack(&toJson)
+			if err != nil {
+				toJson[id] = fmt.Sprintf("failed to unpack config: %v", err)
+			}
+			cfgs = append(cfgs, toJson)
+		}
+	}
+
+	return cfgs
+}
diff --git a/filebeat/input/filestream/config_test.go b/filebeat/input/filestream/config_test.go
index 6cf045060c9..729b712d58e 100644
--- a/filebeat/input/filestream/config_test.go
+++ b/filebeat/input/filestream/config_test.go
@@ -18,9 +18,16 @@
 package filestream
 
 import (
+	"encoding/json"
+	"strings"
 	"testing"
 
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"go.uber.org/zap/zaptest/observer"
+
+	conf "github.com/elastic/elastic-agent-libs/config"
+	"github.com/elastic/elastic-agent-libs/logp"
 )
 
 func TestConfigValidate(t *testing.T) {
@@ -30,3 +37,186 @@ func TestConfigValidate(t *testing.T) {
 		require.Error(t, err)
 	})
 }
+
+func TestValidateInputIDs(t *testing.T) {
+	tcs := []struct {
+		name       string
+		cfg        []string
+		assertErr  func(t *testing.T, err error)
+		assertLogs func(t *testing.T, buff *observer.ObservedLogs)
+	}{
+		{
+			name: "empty config",
+			cfg:  []string{""},
+			assertErr: func(t *testing.T, err error) {
+				assert.NoError(t, err, "empty config should not return an error")
+			},
+		},
+		{
+			name: "one empty ID is allowed",
+			cfg: []string{`
+type: filestream
+`, `
+type: filestream
+id: some-id-1
+`, `
+type: filestream
+id: some-id-2
+`,
+			},
+			assertErr: func(t *testing.T, err error) {
+				assert.NoError(t, err, "one empty id is allowed")
+			},
+		},
+		{
+			name: "duplicated empty ID",
+			cfg: []string{`
+type: filestream
+paths:
+  - "/tmp/empty-1"
+`, `
+type: filestream
+paths:
+  - "/tmp/empty-2"
+`, `
+type: filestream
+id: unique-id-1
+`, `
+type: filestream
+id: unique-id-2
+`, `
+type: filestream
+id: unique-ID
+`,
+			},
+			assertErr: func(t *testing.T, err error) {
+				assert.ErrorContains(t, err, `filestream inputs with duplicated IDs: ""`)
+			},
+			assertLogs: func(t *testing.T, obs *observer.ObservedLogs) {
+				want := `[{"paths":["/tmp/empty-1"],"type":"filestream"},{"paths":["/tmp/empty-2"],"type":"filestream"}]`
+
+				logs := obs.TakeAll()
+				require.Len(t, logs, 1, "there should be only one log entry")
+
+				got, err := json.Marshal(logs[0].ContextMap()["inputs"])
+				require.NoError(t, err, "could not marshal duplicated IDs inputs")
+				assert.Equal(t, want, string(got))
+			},
+		}, {
+			name: "duplicated IDs",
+			cfg: []string{`
+type: filestream
+id: duplicated-id-1
+`, `
+type: filestream
+id: duplicated-id-1
+`, `
+type: filestream
+id: duplicated-id-2
+`, `
+type: filestream
+id: duplicated-id-2
+`, `
+type: filestream
+id: duplicated-id-2
+`, `
+type: filestream
+id: unique-ID
+`,
+			},
+			assertErr: func(t *testing.T, err error) {
+				assert.ErrorContains(t, err,
"filestream inputs with duplicated IDs") + assert.ErrorContains(t, err, "duplicated-id-1") + assert.ErrorContains(t, err, "duplicated-id-2") + assert.Equal(t, strings.Count(err.Error(), "duplicated-id-1"), 1, "each IDs should appear only once") + assert.Equal(t, strings.Count(err.Error(), "duplicated-id-2"), 1, "each IDs should appear only once") + + }, + assertLogs: func(t *testing.T, obs *observer.ObservedLogs) { + want := `[{"id":"duplicated-id-1","type":"filestream"},{"id":"duplicated-id-1","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"}]` + + logs := obs.TakeAll() + require.Len(t, logs, 1, "there should be only one log entry") + + got, err := json.Marshal(logs[0].ContextMap()["inputs"]) + require.NoError(t, err, "could not marshal duplicated IDs inputs") + assert.Equal(t, want, string(got)) + }, + }, + { + name: "duplicated IDs and empty ID", + cfg: []string{` +type: filestream +`, ` +type: filestream +`, ` +type: filestream +id: duplicated-id-1 +`, ` +type: filestream +id: duplicated-id-1 +`, ` +type: filestream +id: duplicated-id-2 +`, ` +type: filestream +id: duplicated-id-2 +`, ` +type: filestream +id: unique-ID +`, + }, + assertErr: func(t *testing.T, err error) { + assert.ErrorContains(t, err, "filestream inputs with duplicated IDs") + }, + assertLogs: func(t *testing.T, obs *observer.ObservedLogs) { + want := `[{"type":"filestream"},{"type":"filestream"},{"id":"duplicated-id-1","type":"filestream"},{"id":"duplicated-id-1","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"}]` + + logs := obs.TakeAll() + require.Len(t, logs, 1, "there should be only one log entry") + + got, err := json.Marshal(logs[0].ContextMap()["inputs"]) + require.NoError(t, err, "could not marshal duplicated IDs inputs") + assert.Equal(t, want, string(got)) + + }, + }, + { + name: "only unique IDs", + cfg: []string{` +type: filestream +id: unique-id-1 +`, ` +type: filestream +id: unique-id-2 +`, ` +type: filestream +id: unique-id-3 +`, + }, + assertErr: func(t *testing.T, err error) { + assert.NoError(t, err, "only unique IDs should not return an error") + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + var inputs []*conf.C + for _, c := range tc.cfg { + cfg, err := conf.NewConfigFrom(c) + require.NoError(t, err, "could not create input configuration") + inputs = append(inputs, cfg) + } + err := logp.DevelopmentSetup(logp.ToObserverOutput()) + require.NoError(t, err, "could not setup log for development") + + err = ValidateInputIDs(inputs, logp.L()) + tc.assertErr(t, err) + if tc.assertLogs != nil { + tc.assertLogs(t, logp.ObserverLogs()) + } + }) + } +} diff --git a/filebeat/tests/integration/filestream_test.go b/filebeat/tests/integration/filestream_test.go index 3ddb04a2c20..45cf99fbfb6 100644 --- a/filebeat/tests/integration/filestream_test.go +++ b/filebeat/tests/integration/filestream_test.go @@ -26,6 +26,9 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/elastic/beats/v7/libbeat/tests/integration" ) @@ -105,3 +108,166 @@ func TestFilestreamCleanInactive(t *testing.T) { registryFile := filepath.Join(filebeat.TempDir(), "data", "registry", "filebeat", "log.json") filebeat.WaitFileContains(registryFile, `"op":"remove"`, time.Second) } + +func TestFilestreamValidationPreventsFilebeatStart(t *testing.T) { + duplicatedIDs := ` 
+filebeat.inputs:
+  - type: filestream
+    id: duplicated-id-1
+    enabled: true
+    paths:
+      - /tmp/*.log
+  - type: filestream
+    id: duplicated-id-1
+    enabled: true
+    paths:
+      - /var/log/*.log
+
+output.discard.enabled: true
+logging:
+  level: debug
+  metrics:
+    enabled: false
+`
+	emptyID := `
+filebeat.inputs:
+  - type: filestream
+    enabled: true
+    paths:
+      - /tmp/*.log
+  - type: filestream
+    enabled: true
+    paths:
+      - /var/log/*.log
+
+output.discard.enabled: true
+logging:
+  level: debug
+  metrics:
+    enabled: false
+`
+	multipleDuplicatedIDs := `
+filebeat.inputs:
+  - type: filestream
+    enabled: true
+    paths:
+      - /tmp/*.log
+  - type: filestream
+    enabled: true
+    paths:
+      - /var/log/*.log
+
+  - type: filestream
+    id: duplicated-id-1
+    enabled: true
+    paths:
+      - /tmp/duplicated-id-1.log
+  - type: filestream
+    id: duplicated-id-1
+    enabled: true
+    paths:
+      - /tmp/duplicated-id-1-2.log
+
+  - type: filestream
+    id: unique-id-1
+    enabled: true
+    paths:
+      - /tmp/unique-id-1.log
+  - type: filestream
+    id: unique-id-2
+    enabled: true
+    paths:
+      - /var/log/unique-id-2.log
+
+output.discard.enabled: true
+logging:
+  level: debug
+  metrics:
+    enabled: false
+`
+	tcs := []struct {
+		name string
+		cfg  string
+	}{
+		{
+			name: "duplicated IDs",
+			cfg:  duplicatedIDs,
+		},
+		{
+			name: "duplicated empty ID",
+			cfg:  emptyID,
+		},
+		{
+			name: "two inputs without ID and duplicated IDs",
+			cfg:  multipleDuplicatedIDs,
+		},
+	}
+
+	for _, tc := range tcs {
+		t.Run(tc.name, func(t *testing.T) {
+			filebeat := integration.NewBeat(
+				t,
+				"filebeat",
+				"../../filebeat.test",
+			)
+
+			// Write configuration file and start Filebeat
+			filebeat.WriteConfigFile(tc.cfg)
+			filebeat.Start()
+
+			// Wait for the validation error log
+			filebeat.WaitForLogs(
+				"filestream inputs validation error",
+				10*time.Second,
+				"Filebeat did not log a filestream input validation error")
+
+			proc, err := filebeat.Process.Wait()
+			require.NoError(t, err, "filebeat process.Wait returned an error")
+			assert.False(t, proc.Success(), "filebeat should have failed to start")
+		})
+	}
+}
+
+func TestFilestreamValidationSucceeds(t *testing.T) {
+	cfg := `
+filebeat.inputs:
+  - type: filestream
+    enabled: true
+    paths:
+      - /var/log/*.log
+
+  - type: filestream
+    id: unique-id-1
+    enabled: true
+    paths:
+      - /tmp/unique-id-1.log
+  - type: filestream
+    id: unique-id-2
+    enabled: true
+    paths:
+      - /var/log/unique-id-2.log
+
+output.discard.enabled: true
+logging:
+  level: debug
+  metrics:
+    enabled: false
+`
+	filebeat := integration.NewBeat(
+		t,
+		"filebeat",
+		"../../filebeat.test",
+	)
+
+	// Write configuration file and start Filebeat
+	filebeat.WriteConfigFile(cfg)
+	filebeat.Start()
+
+	// Wait for the filestream input to start
+	filebeat.WaitForLogs(
+		"Input 'filestream' starting",
+		10*time.Second,
+		"Filebeat did not log that the filestream input started")
+}
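
For readers updating their configurations to satisfy the new startup validation, below is a minimal sketch of a filestream setup that passes it: each filestream input carries its own unique `id`. The IDs and paths are illustrative only; the keys themselves (`filebeat.inputs`, `type: filestream`, `id`, `paths`) are the same ones exercised by the configurations in this patch.

filebeat.inputs:
  - type: filestream
    id: system-logs        # must be unique across all filestream inputs
    paths:
      - /var/log/*.log
  - type: filestream
    id: app-logs           # a second input needs a different id
    paths:
      - /tmp/app/*.log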