diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 7b7a351e202..f95aebc42c5 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -50,8 +50,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
 - Remove deprecated awscloudwatch field from Filebeat. {pull}41089[41089]
 - The performance of ingesting SQS data with the S3 input has improved by up to 60x for queues with many small events. `max_number_of_messages` config for SQS mode is now ignored, as the new design no longer needs a manual cap on messages. Instead, use `number_of_workers` to scale ingestion rate in both S3 and SQS modes. The increased efficiency may increase network bandwidth consumption, which can be throttled by lowering `number_of_workers`. It may also increase the number of events stored in memory, which can be throttled by lowering the configured size of the internal queue. {pull}40699[40699]
 - Fixes filestream logging the error "filestream input with ID 'ID' already exists, this will lead to data duplication[...]" on Kubernetes when using autodiscover. {pull}41585[41585]
-- Add kafka compression support for ZSTD.
+- Filebeat now fails to start if any filestream input has a duplicated ID. It logs the duplicated IDs and the offending input configurations. {pull}41731[41731]
 
 *Heartbeat*
 
diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go
index 815b6fabfde..ceab21aa359 100644
--- a/filebeat/beater/filebeat.go
+++ b/filebeat/beater/filebeat.go
@@ -32,6 +32,7 @@ import (
 	"github.com/elastic/beats/v7/filebeat/fileset"
 	_ "github.com/elastic/beats/v7/filebeat/include"
 	"github.com/elastic/beats/v7/filebeat/input"
+	"github.com/elastic/beats/v7/filebeat/input/filestream"
 	"github.com/elastic/beats/v7/filebeat/input/filestream/takeover"
 	v2 "github.com/elastic/beats/v7/filebeat/input/v2"
 	"github.com/elastic/beats/v7/filebeat/input/v2/compat"
@@ -291,6 +292,11 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
 	}
 	defer stateStore.Close()
 
+	err = filestream.ValidateInputIDs(config.Inputs, logp.NewLogger("input.filestream"))
+	if err != nil {
+		logp.Err("invalid filestream configuration: %+v", err)
+		return err
+	}
 	err = processLogInputTakeOver(stateStore, config)
 	if err != nil {
 		logp.Err("Failed to attempt filestream state take over: %+v", err)
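Note: the hook above runs before `processLogInputTakeOver`, so a duplicated ID aborts startup before any filestream state is touched. The detection idea the next file implements can be sketched in isolation as follows; this is a simplified illustration, not code from the PR, and `findDuplicateIDs` is a hypothetical name:

    package main

    import "fmt"

    // findDuplicateIDs counts each ID and records an ID the first time it
    // is seen twice, so every duplicated ID is reported exactly once --
    // the same strategy ValidateInputIDs (below) uses for filestream inputs.
    func findDuplicateIDs(ids []string) []string {
        seen := map[string]int{}
        var dups []string
        for _, id := range ids {
            seen[id]++
            if seen[id] == 2 {
                dups = append(dups, id)
            }
        }
        return dups
    }

    func main() {
        // An empty ID collides like any other once it appears twice.
        fmt.Printf("%q\n", findDuplicateIDs([]string{"a", "a", "", "b", "a"})) // ["a"]
        fmt.Printf("%q\n", findDuplicateIDs([]string{"", ""}))                 // [""]
    }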
diff --git a/filebeat/input/filestream/config.go b/filebeat/input/filestream/config.go
index 1cb8fa5da97..2860dd673c2 100644
--- a/filebeat/input/filestream/config.go
+++ b/filebeat/input/filestream/config.go
@@ -19,6 +19,7 @@ package filestream
 
 import (
 	"fmt"
+	"strings"
 	"time"
 
 	"github.com/dustin/go-humanize"
@@ -27,6 +28,7 @@ import (
 	"github.com/elastic/beats/v7/libbeat/reader/parser"
 	"github.com/elastic/beats/v7/libbeat/reader/readfile"
 	conf "github.com/elastic/elastic-agent-libs/config"
+	"github.com/elastic/elastic-agent-libs/logp"
 )
 
 // Config stores the options of a file stream.
@@ -142,3 +144,60 @@ func (c *config) Validate() error {
 
 	return nil
 }
+
+// ValidateInputIDs checks all filestream inputs to ensure all input IDs are
+// unique. If there is a duplicated ID, it logs an error containing the
+// offending input configurations and returns an error containing the
+// duplicated IDs. A single empty ID is valid because it is unique; however,
+// multiple empty IDs are not unique and are therefore treated like any other
+// duplicated ID.
+func ValidateInputIDs(inputs []*conf.C, logger *logp.Logger) error {
+	duplicatedConfigs := make(map[string][]*conf.C)
+	var duplicates []string
+	for _, input := range inputs {
+		fsInput := struct {
+			ID   string `config:"id"`
+			Type string `config:"type"`
+		}{}
+		err := input.Unpack(&fsInput)
+		if err != nil {
+			return fmt.Errorf("failed to unpack filestream input configuration: %w", err)
+		}
+		if fsInput.Type == "filestream" {
+			duplicatedConfigs[fsInput.ID] = append(duplicatedConfigs[fsInput.ID], input)
+			// We only need to collect each duplicated ID once, so collect
+			// it the first time we see the ID repeated.
+			if len(duplicatedConfigs[fsInput.ID]) == 2 {
+				duplicates = append(duplicates, fsInput.ID)
+			}
+		}
+	}
+
+	if len(duplicates) != 0 {
+		jsonDupCfg := collectOffendingInputs(duplicates, duplicatedConfigs)
+		logger.Errorw("filestream inputs with duplicated IDs", "inputs", jsonDupCfg)
+		var quotedDuplicates []string
+		for _, dup := range duplicates {
+			quotedDuplicates = append(quotedDuplicates, fmt.Sprintf("%q", dup))
+		}
+		return fmt.Errorf("filestream inputs validation error: filestream inputs with duplicated IDs: %v", strings.Join(quotedDuplicates, ","))
+	}
+
+	return nil
+}
+
+func collectOffendingInputs(duplicates []string, ids map[string][]*conf.C) []map[string]interface{} {
+	var cfgs []map[string]interface{}
+
+	for _, id := range duplicates {
+		for _, dupcfgs := range ids[id] {
+			toJSON := map[string]interface{}{}
+			err := dupcfgs.Unpack(&toJSON)
+			if err != nil {
+				toJSON[id] = fmt.Sprintf("failed to unpack config: %v", err)
+			}
+			cfgs = append(cfgs, toJSON)
+		}
+	}
+
+	return cfgs
+}
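A hedged usage sketch of the new validator (the `main` wrapper and the "example" logger name are illustrative only; `conf.NewConfigFrom` and `logp.NewLogger` are the same helpers used elsewhere in this diff):

    package main

    import (
        "fmt"

        conf "github.com/elastic/elastic-agent-libs/config"
        "github.com/elastic/elastic-agent-libs/logp"

        "github.com/elastic/beats/v7/filebeat/input/filestream"
    )

    func main() {
        // Two filestream inputs sharing an ID should be rejected.
        a, _ := conf.NewConfigFrom("type: filestream\nid: dup")
        b, _ := conf.NewConfigFrom("type: filestream\nid: dup")

        err := filestream.ValidateInputIDs([]*conf.C{a, b}, logp.NewLogger("example"))
        fmt.Println(err)
        // Per the implementation above, this should print:
        // filestream inputs validation error: filestream inputs with duplicated IDs: "dup"
    }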
diff --git a/filebeat/input/filestream/config_test.go b/filebeat/input/filestream/config_test.go
index 6cf045060c9..729b712d58e 100644
--- a/filebeat/input/filestream/config_test.go
+++ b/filebeat/input/filestream/config_test.go
@@ -18,9 +18,16 @@ package filestream
 
 import (
+	"encoding/json"
+	"strings"
 	"testing"
 
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"go.uber.org/zap/zaptest/observer"
+
+	conf "github.com/elastic/elastic-agent-libs/config"
+	"github.com/elastic/elastic-agent-libs/logp"
 )
 
 func TestConfigValidate(t *testing.T) {
@@ -30,3 +37,186 @@ func TestConfigValidate(t *testing.T) {
 		require.Error(t, err)
 	})
 }
+
+func TestValidateInputIDs(t *testing.T) {
+	tcs := []struct {
+		name       string
+		cfg        []string
+		assertErr  func(t *testing.T, err error)
+		assertLogs func(t *testing.T, buff *observer.ObservedLogs)
+	}{
+		{
+			name: "empty config",
+			cfg:  []string{""},
+			assertErr: func(t *testing.T, err error) {
+				assert.NoError(t, err, "empty config should not return an error")
+			},
+		},
+		{
+			name: "one empty ID is allowed",
+			cfg: []string{`
+type: filestream
+`, `
+type: filestream
+id: some-id-1
+`, `
+type: filestream
+id: some-id-2
+`,
+			},
+			assertErr: func(t *testing.T, err error) {
+				assert.NoError(t, err, "one empty id is allowed")
+			},
+		},
+		{
+			name: "duplicated empty ID",
+			cfg: []string{`
+type: filestream
+paths:
+  - "/tmp/empty-1"
+`, `
+type: filestream
+paths:
+  - "/tmp/empty-2"
+`, `
+type: filestream
+id: unique-id-1
+`, `
+type: filestream
+id: unique-id-2
+`, `
+type: filestream
+id: unique-ID
+`,
+			},
+			assertErr: func(t *testing.T, err error) {
+				assert.ErrorContains(t, err, `filestream inputs with duplicated IDs: ""`)
+			},
+			assertLogs: func(t *testing.T, obs *observer.ObservedLogs) {
+				want := `[{"paths":["/tmp/empty-1"],"type":"filestream"},{"paths":["/tmp/empty-2"],"type":"filestream"}]`
+
+				logs := obs.TakeAll()
+				require.Len(t, logs, 1, "there should be only one log entry")
+
+				got, err := json.Marshal(logs[0].ContextMap()["inputs"])
+				require.NoError(t, err, "could not marshal duplicated IDs inputs")
+				assert.Equal(t, want, string(got))
+			},
+		},
+		{
+			name: "duplicated IDs",
+			cfg: []string{`
+type: filestream
+id: duplicated-id-1
+`, `
+type: filestream
+id: duplicated-id-1
+`, `
+type: filestream
+id: duplicated-id-2
+`, `
+type: filestream
+id: duplicated-id-2
+`, `
+type: filestream
+id: duplicated-id-2
+`, `
+type: filestream
+id: unique-ID
+`,
+			},
+			assertErr: func(t *testing.T, err error) {
+				assert.ErrorContains(t, err, "filestream inputs with duplicated IDs")
+				assert.ErrorContains(t, err, "duplicated-id-1")
+				assert.ErrorContains(t, err, "duplicated-id-2")
+				assert.Equal(t, strings.Count(err.Error(), "duplicated-id-1"), 1, "each ID should appear only once")
+				assert.Equal(t, strings.Count(err.Error(), "duplicated-id-2"), 1, "each ID should appear only once")
+			},
+			assertLogs: func(t *testing.T, obs *observer.ObservedLogs) {
+				want := `[{"id":"duplicated-id-1","type":"filestream"},{"id":"duplicated-id-1","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"}]`
+
+				logs := obs.TakeAll()
+				require.Len(t, logs, 1, "there should be only one log entry")
+
+				got, err := json.Marshal(logs[0].ContextMap()["inputs"])
+				require.NoError(t, err, "could not marshal duplicated IDs inputs")
+				assert.Equal(t, want, string(got))
+			},
+		},
+		{
+			name: "duplicated IDs and empty ID",
+			cfg: []string{`
+type: filestream
+`, `
+type: filestream
+`, `
+type: filestream
+id: duplicated-id-1
+`, `
+type: filestream
+id: duplicated-id-1
+`, `
+type: filestream
+id: duplicated-id-2
+`, `
+type: filestream
+id: duplicated-id-2
+`, `
+type: filestream
+id: unique-ID
+`,
+			},
+			assertErr: func(t *testing.T, err error) {
+				assert.ErrorContains(t, err, "filestream inputs with duplicated IDs")
+			},
+			assertLogs: func(t *testing.T, obs *observer.ObservedLogs) {
+				want := `[{"type":"filestream"},{"type":"filestream"},{"id":"duplicated-id-1","type":"filestream"},{"id":"duplicated-id-1","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"}]`
+
+				logs := obs.TakeAll()
+				require.Len(t, logs, 1, "there should be only one log entry")
+
+				got, err := json.Marshal(logs[0].ContextMap()["inputs"])
+				require.NoError(t, err, "could not marshal duplicated IDs inputs")
+				assert.Equal(t, want, string(got))
+			},
+		},
+		{
+			name: "only unique IDs",
+			cfg: []string{`
+type: filestream
+id: unique-id-1
+`, `
+type: filestream
+id: unique-id-2
+`, `
+type: filestream
+id: unique-id-3
+`,
+			},
+			assertErr: func(t *testing.T, err error) {
+				assert.NoError(t, err, "only unique IDs should not return an error")
+			},
+		},
+	}
+
+	for _, tc := range tcs {
+		t.Run(tc.name, func(t *testing.T) {
+			var inputs []*conf.C
+			for _, c := range tc.cfg {
+				cfg, err := conf.NewConfigFrom(c)
+				require.NoError(t, err, "could not create input configuration")
+				inputs = append(inputs, cfg)
+			}
+			err := logp.DevelopmentSetup(logp.ToObserverOutput())
+			require.NoError(t, err, "could not set up logging for development")
+
+			err = ValidateInputIDs(inputs, logp.L())
+			tc.assertErr(t, err)
+			if tc.assertLogs != nil {
+				tc.assertLogs(t, logp.ObserverLogs())
+			}
+		})
+	}
+}
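The unit tests above assert on structured log output through logp's zap observer. A minimal sketch of that pattern on its own, using the same `logp` APIs the tests call (the message and field values here are made up for illustration):

    package main

    import (
        "fmt"

        "github.com/elastic/elastic-agent-libs/logp"
    )

    func main() {
        // Route log output to an in-memory observer instead of stderr.
        if err := logp.DevelopmentSetup(logp.ToObserverOutput()); err != nil {
            panic(err)
        }

        logp.L().Errorw("filestream inputs with duplicated IDs", "inputs", []string{"dup-1"})

        // TakeAll drains the captured entries; ContextMap exposes the
        // structured fields (e.g. "inputs") that the tests marshal to JSON.
        logs := logp.ObserverLogs().TakeAll()
        fmt.Println(logs[0].Message, logs[0].ContextMap()["inputs"])
    }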
diff --git a/filebeat/tests/integration/filestream_test.go b/filebeat/tests/integration/filestream_test.go
index 3ddb04a2c20..45cf99fbfb6 100644
--- a/filebeat/tests/integration/filestream_test.go
+++ b/filebeat/tests/integration/filestream_test.go
@@ -26,6 +26,9 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
 	"github.com/elastic/beats/v7/libbeat/tests/integration"
 )
 
@@ -105,3 +108,166 @@ func TestFilestreamCleanInactive(t *testing.T) {
 	registryFile := filepath.Join(filebeat.TempDir(), "data", "registry", "filebeat", "log.json")
 	filebeat.WaitFileContains(registryFile, `"op":"remove"`, time.Second)
 }
+
+func TestFilestreamValidationPreventsFilebeatStart(t *testing.T) {
+	duplicatedIDs := `
+filebeat.inputs:
+  - type: filestream
+    id: duplicated-id-1
+    enabled: true
+    paths:
+      - /tmp/*.log
+  - type: filestream
+    id: duplicated-id-1
+    enabled: true
+    paths:
+      - /var/log/*.log
+
+output.discard.enabled: true
+logging:
+  level: debug
+  metrics:
+    enabled: false
+`
+	emptyID := `
+filebeat.inputs:
+  - type: filestream
+    enabled: true
+    paths:
+      - /tmp/*.log
+  - type: filestream
+    enabled: true
+    paths:
+      - /var/log/*.log
+
+output.discard.enabled: true
+logging:
+  level: debug
+  metrics:
+    enabled: false
+`
+	multipleDuplicatedIDs := `
+filebeat.inputs:
+  - type: filestream
+    enabled: true
+    paths:
+      - /tmp/*.log
+  - type: filestream
+    enabled: true
+    paths:
+      - /var/log/*.log
+
+  - type: filestream
+    id: duplicated-id-1
+    enabled: true
+    paths:
+      - /tmp/duplicated-id-1.log
+  - type: filestream
+    id: duplicated-id-1
+    enabled: true
+    paths:
+      - /tmp/duplicated-id-1-2.log
+
+  - type: filestream
+    id: unique-id-1
+    enabled: true
+    paths:
+      - /tmp/unique-id-1.log
+  - type: filestream
+    id: unique-id-2
+    enabled: true
+    paths:
+      - /var/log/unique-id-2.log
+
+output.discard.enabled: true
+logging:
+  level: debug
+  metrics:
+    enabled: false
+`
+	tcs := []struct {
+		name string
+		cfg  string
+	}{
+		{
+			name: "duplicated IDs",
+			cfg:  duplicatedIDs,
+		},
+		{
+			name: "duplicated empty ID",
+			cfg:  emptyID,
+		},
+		{
+			name: "two inputs without ID and duplicated IDs",
+			cfg:  multipleDuplicatedIDs,
+		},
+	}
+
+	for _, tc := range tcs {
+		t.Run(tc.name, func(t *testing.T) {
+			filebeat := integration.NewBeat(
+				t,
+				"filebeat",
+				"../../filebeat.test",
+			)
+
+			// Write the configuration file and start Filebeat
+			filebeat.WriteConfigFile(tc.cfg)
+			filebeat.Start()
+
+			// Wait for the validation error log
+			filebeat.WaitForLogs(
+				"filestream inputs validation error",
+				10*time.Second,
+				"Filebeat did not log a filestream input validation error")
+
+			proc, err := filebeat.Process.Wait()
+			require.NoError(t, err, "filebeat process.Wait returned an error")
+			assert.False(t, proc.Success(), "filebeat should have failed to start")
+		})
+	}
+}
+
+func TestFilestreamValidationSucceeds(t *testing.T) {
+	cfg := `
+filebeat.inputs:
+  - type: filestream
+    enabled: true
+    paths:
+      - /var/log/*.log
+
+  - type: filestream
+    id: unique-id-1
+    enabled: true
+    paths:
+      - /tmp/unique-id-1.log
+  - type: filestream
+    id: unique-id-2
+    enabled: true
+    paths:
+      - /var/log/unique-id-2.log
+
+output.discard.enabled: true
+logging:
+  level: debug
+  metrics:
+    enabled: false
+`
+	filebeat := integration.NewBeat(
+		t,
+		"filebeat",
+		"../../filebeat.test",
+	)
+
+	// Write the configuration file and start Filebeat
+	filebeat.WriteConfigFile(cfg)
+	filebeat.Start()
+
+	// Wait for the filestream input to start, which proves validation passed
+	filebeat.WaitForLogs(
+		"Input 'filestream' starting",
+		10*time.Second,
+		"Filebeat did not log that the filestream input started")
+}
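As a usage note (not part of the diff): after this change, a configuration like the following is rejected at startup because both filestream inputs share an ID; giving each input a unique `id` (or leaving at most one of them empty) lets Filebeat start again:

    filebeat.inputs:
      - type: filestream
        id: my-id        # duplicated: rejected at startup
        paths:
          - /tmp/a/*.log
      - type: filestream
        id: my-id        # rename to, e.g., my-id-b to fix
        paths:
          - /tmp/b/*.log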