From b6ecc4e34e0b0cb3139312697b0e157a6bb9528d Mon Sep 17 00:00:00 2001
From: Anderson Queiroz
Date: Thu, 21 Nov 2024 11:01:25 +0100
Subject: [PATCH 1/7] filestream: validate input ids on startup

During startup, Filebeat now validates the filestream inputs and fails
to start if there are inputs without an ID or with duplicated IDs.
---
 CHANGELOG.next.asciidoc                       |   2 +-
 filebeat/beater/filebeat.go                   |   6 +
 filebeat/input/filestream/config.go           |  77 +++++++++
 filebeat/input/filestream/config_test.go      | 161 ++++++++++++++++++
 filebeat/tests/integration/filestream_test.go | 122 +++++++++++++
 5 files changed, 367 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 3685a377e3f6..a06ccaf5bc68 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -50,8 +50,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
 - Remove deprecated awscloudwatch field from Filebeat. {pull}41089[41089]
 - The performance of ingesting SQS data with the S3 input has improved by up to 60x for queues with many small events. `max_number_of_messages` config for SQS mode is now ignored, as the new design no longer needs a manual cap on messages. Instead, use `number_of_workers` to scale ingestion rate in both S3 and SQS modes. The increased efficiency may increase network bandwidth consumption, which can be throttled by lowering `number_of_workers`. It may also increase number of events stored in memory, which can be throttled by lowering the configured size of the internal queue. {pull}40699[40699]
 - Fixes filestream logging the error "filestream input with ID 'ID' already exists, this will lead to data duplication[...]" on Kubernetes when using autodiscover. {pull}41585[41585]
-
 - Add kafka compression support for ZSTD.
+- Filebeat fails to start if there is any input without ID or with a duplicated ID {pull}41731[41731]
 
 *Heartbeat*
 

diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go
index 815b6fabfde2..5eb25f71c4d0 100644
--- a/filebeat/beater/filebeat.go
+++ b/filebeat/beater/filebeat.go
@@ -32,6 +32,7 @@ import (
 	"github.com/elastic/beats/v7/filebeat/fileset"
 	_ "github.com/elastic/beats/v7/filebeat/include"
 	"github.com/elastic/beats/v7/filebeat/input"
+	"github.com/elastic/beats/v7/filebeat/input/filestream"
 	"github.com/elastic/beats/v7/filebeat/input/filestream/takeover"
 	v2 "github.com/elastic/beats/v7/filebeat/input/v2"
 	"github.com/elastic/beats/v7/filebeat/input/v2/compat"
@@ -291,6 +292,11 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
 	}
 	defer stateStore.Close()
 
+	err = filestream.ValidateInputIDs(config.Inputs, logp.NewLogger("filestream"))
+	if err != nil {
+		logp.Err("invalid filestream configuration: %+v", err)
+		return err
+	}
 	err = processLogInputTakeOver(stateStore, config)
 	if err != nil {
 		logp.Err("Failed to attempt filestream state take over: %+v", err)

diff --git a/filebeat/input/filestream/config.go b/filebeat/input/filestream/config.go
index 1cb8fa5da979..658961cafc7d 100644
--- a/filebeat/input/filestream/config.go
+++ b/filebeat/input/filestream/config.go
@@ -19,6 +19,7 @@ package filestream
 
 import (
 	"fmt"
+	"strings"
 	"time"
 
 	"github.com/dustin/go-humanize"
@@ -27,6 +28,7 @@ import (
 	"github.com/elastic/beats/v7/libbeat/reader/parser"
 	"github.com/elastic/beats/v7/libbeat/reader/readfile"
 	conf "github.com/elastic/elastic-agent-libs/config"
+	"github.com/elastic/elastic-agent-libs/logp"
 )
 
 // Config stores the options of a file stream.
@@ -142,3 +144,78 @@ func (c *config) Validate() error { return nil } + +// ValidateInputIDs checks all filestream inputs to ensure all have an ID and +// the ID is unique. If there is any empty or duplicated ID, it logs an error +// containing the offending input configurations and returns an error containing +// the duplicated IDs. +func ValidateInputIDs(inputs []*conf.C, logger *logp.Logger) error { + ids := make(map[string][]*conf.C) + var duplicates []string + var empty []*conf.C + for _, input := range inputs { + fsInput := struct { + ID string `config:"id"` + Type string `config:"type"` + }{} + err := input.Unpack(&fsInput) + if err != nil { + return fmt.Errorf("failed to unpack filestream input configuration: %w", err) + } + if fsInput.Type == "filestream" { + if fsInput.ID == "" { + empty = append(empty, input) + continue + } + ids[fsInput.ID] = append(ids[fsInput.ID], input) + if len(ids[fsInput.ID]) > 1 { + duplicates = append(duplicates, fsInput.ID) + } + } + } + + var errs []string + if empty != nil { + errs = append(errs, "input without ID") + } + if len(duplicates) != 0 { + errs = append(errs, fmt.Sprintf("filestream inputs with duplicated IDs: %v", strings.Join(duplicates, ","))) + } + + if len(errs) != 0 { + jsonDupCfg := collectOffendingInputs(duplicates, empty, ids) + logger.Errorw("filestream inputs with invalid IDs", "inputs", jsonDupCfg) + + return fmt.Errorf("filestream inputs validation error: %s", strings.Join(errs, ", ")) + } + + return nil +} + +func collectOffendingInputs(duplicates []string, empty []*conf.C, ids map[string][]*conf.C) []map[string]interface{} { + var cfgs []map[string]interface{} + + if len(empty) > 0 { + for _, cfg := range empty { + toJson := map[string]interface{}{} + err := cfg.Unpack(&toJson) + if err != nil { + toJson["emptyID"] = fmt.Sprintf("failes to umpack config: %v", err) + } + cfgs = append(cfgs, toJson) + } + } + + for _, id := range duplicates { + for _, dupcfgs := range ids[id] { + toJson := map[string]interface{}{} + err := dupcfgs.Unpack(&toJson) + if err != nil { + toJson[id] = fmt.Sprintf("failes to umpack config: %v", err) + } + cfgs = append(cfgs, toJson) + } + } + + return cfgs +} diff --git a/filebeat/input/filestream/config_test.go b/filebeat/input/filestream/config_test.go index 6cf045060c94..41bb11906057 100644 --- a/filebeat/input/filestream/config_test.go +++ b/filebeat/input/filestream/config_test.go @@ -18,9 +18,15 @@ package filestream import ( + "encoding/json" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest/observer" + + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" ) func TestConfigValidate(t *testing.T) { @@ -30,3 +36,158 @@ func TestConfigValidate(t *testing.T) { require.Error(t, err) }) } + +func TestValidateInputIDs(t *testing.T) { + tcs := []struct { + name string + cfg []string + assertErr func(t *testing.T, err error) + assertLogs func(t *testing.T, buff *observer.ObservedLogs) + }{ + { + name: "empty config", + cfg: []string{""}, + assertErr: func(t *testing.T, err error) { + assert.NoError(t, err, "empty config should not return an error") + }, + }, + { + name: "empty ID", + cfg: []string{` +type: filestream +`, ` +type: filestream +id: some-id-1 +`, ` +type: filestream +id: some-id-2 +`, + }, + assertErr: func(t *testing.T, err error) { + assert.ErrorContains(t, err, "input without ID") + }, + assertLogs: func(t *testing.T, obs *observer.ObservedLogs) { + want := 
`[{"type":"filestream"}]` + + logs := obs.TakeAll() + require.Len(t, logs, 1, "there should be only one log entry") + + got, err := json.Marshal(logs[0].ContextMap()["inputs"]) + require.NoError(t, err, "could not marshal duplicated IDs inputs") + assert.Equal(t, want, string(got)) + + }, + }, + { + name: "duplicated IDs", + cfg: []string{` +type: filestream +id: duplicated-id-1 +`, ` +type: filestream +id: duplicated-id-1 +`, ` +type: filestream +id: duplicated-id-2 +`, ` +type: filestream +id: duplicated-id-2 +`, ` +type: filestream +id: unique-ID +`, + }, + assertErr: func(t *testing.T, err error) { + assert.ErrorContains(t, err, "filestream inputs with duplicated IDs") + assert.ErrorContains(t, err, "filestream inputs with duplicated IDs") + assert.ErrorContains(t, err, "duplicated-id-1") + assert.ErrorContains(t, err, "duplicated-id-2") + }, + assertLogs: func(t *testing.T, obs *observer.ObservedLogs) { + want := `[{"id":"duplicated-id-1","type":"filestream"},{"id":"duplicated-id-1","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"}]` + + logs := obs.TakeAll() + require.Len(t, logs, 1, "there should be only one log entry") + + got, err := json.Marshal(logs[0].ContextMap()["inputs"]) + require.NoError(t, err, "could not marshal duplicated IDs inputs") + assert.Equal(t, want, string(got)) + }, + }, + { + name: "duplicated IDs and empty ID", + cfg: []string{` +type: filestream +`, ` +type: filestream +`, ` +type: filestream +id: duplicated-id-1 +`, ` +type: filestream +id: duplicated-id-1 +`, ` +type: filestream +id: duplicated-id-2 +`, ` +type: filestream +id: duplicated-id-2 +`, ` +type: filestream +id: unique-ID +`, + }, + assertErr: func(t *testing.T, err error) { + assert.ErrorContains(t, err, "input without ID") + assert.ErrorContains(t, err, "filestream inputs with duplicated IDs") + }, + assertLogs: func(t *testing.T, obs *observer.ObservedLogs) { + want := `[{"type":"filestream"},{"type":"filestream"},{"id":"duplicated-id-1","type":"filestream"},{"id":"duplicated-id-1","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"}]` + + logs := obs.TakeAll() + require.Len(t, logs, 1, "there should be only one log entry") + + got, err := json.Marshal(logs[0].ContextMap()["inputs"]) + require.NoError(t, err, "could not marshal duplicated IDs inputs") + assert.Equal(t, want, string(got)) + + }, + }, + { + name: "only unique IDs", + cfg: []string{` +type: filestream +id: unique-id-1 +`, ` +type: filestream +id: unique-id-2 +`, ` +type: filestream +id: unique-id-3 +`, + }, + assertErr: func(t *testing.T, err error) { + assert.NoError(t, err, "only unique IDs should not return an error") + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + var inputs []*conf.C + for _, c := range tc.cfg { + cfg, err := conf.NewConfigFrom(c) + require.NoError(t, err, "could not create input configuration") + inputs = append(inputs, cfg) + } + err := logp.DevelopmentSetup(logp.ToObserverOutput()) + require.NoError(t, err, "could not setup log for development") + + err = ValidateInputIDs(inputs, logp.L()) + tc.assertErr(t, err) + if tc.assertLogs != nil { + tc.assertLogs(t, logp.ObserverLogs()) + } + }) + } +} diff --git a/filebeat/tests/integration/filestream_test.go b/filebeat/tests/integration/filestream_test.go index 3ddb04a2c20c..100b60e08448 100644 --- a/filebeat/tests/integration/filestream_test.go +++ b/filebeat/tests/integration/filestream_test.go @@ -26,6 
+26,9 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
 	"github.com/elastic/beats/v7/libbeat/tests/integration"
 )
 
@@ -105,3 +108,122 @@ func TestFilestreamCleanInactive(t *testing.T) {
 	registryFile := filepath.Join(filebeat.TempDir(), "data", "registry", "filebeat", "log.json")
 	filebeat.WaitFileContains(registryFile, `"op":"remove"`, time.Second)
 }
+
+func TestFilestreamValidationPreventsFilebeatStart(t *testing.T) {
+	duplicatedIDs := `
+filebeat.inputs:
+  - type: filestream
+    id: duplicated-id-1
+    enabled: true
+    paths:
+      - /tmp/*.log
+  - type: filestream
+    id: duplicated-id-1
+    enabled: true
+    paths:
+      - /var/log/*.log
+
+output.discard.enabled: true
+logging:
+  level: debug
+  metrics:
+    enabled: false
+`
+	emptyID := `
+filebeat.inputs:
+  - type: filestream
+    enabled: true
+    paths:
+      - /tmp/*.log
+  - type: filestream
+    id: unique-id-1
+    enabled: true
+    paths:
+      - /var/log/*.log
+
+output.discard.enabled: true
+logging:
+  level: debug
+  metrics:
+    enabled: false
+`
+	tcs := []struct {
+		name       string
+		cfg        string
+		wantErrLog string
+	}{
+		{
+			name:       "duplicated IDs",
+			cfg:        duplicatedIDs,
+			wantErrLog: "filestream inputs with duplicated IDs",
+		},
+		{
+			name:       "empty ID",
+			cfg:        emptyID,
+			wantErrLog: "input without ID",
+		},
+	}
+
+	for _, tc := range tcs {
+		t.Run(tc.name, func(t *testing.T) {
+			filebeat := integration.NewBeat(
+				t,
+				"filebeat",
+				"../../filebeat.test",
+			)
+
+			// Write configuration file and start Filebeat
+			filebeat.WriteConfigFile(tc.cfg)
+			filebeat.Start()
+
+			// Wait for error log
+			filebeat.WaitForLogs(
+				"filestream inputs validation error",
+				10*time.Second,
+				"Filebeat did log a validation error")
+
+			filebeat.WaitForLogs(
+				tc.wantErrLog,
+				10*time.Second, "Filebeat did not log there are issues with input ids")
+
+			proc, err := filebeat.Process.Wait()
+			require.NoError(t, err, "filebeat process.Wait returned an error")
+			assert.False(t, proc.Success(), "filebeat should have failed to start")
+		})
+	}
+}
+
+func TestFilestreamValidationSucceeds(t *testing.T) {
+	cfg := `
+filebeat.inputs:
+  - type: filestream
+    id: unique-id-1
+    enabled: true
+    paths:
+      - /tmp/*.log
+  - type: filestream
+    id: unique-id-2
+    enabled: true
+    paths:
+      - /var/log/*.log
+
+output.discard.enabled: true
+logging:
+  level: debug
+`
+	filebeat := integration.NewBeat(
+		t,
+		"filebeat",
+		"../../filebeat.test",
+	)
+
+	// Write configuration file and start Filebeat
+	filebeat.WriteConfigFile(cfg)
+	filebeat.Start()
+
+	// Wait for the filestream input to start
+	filebeat.WaitForLogs(
+		"Input 'filestream' starting",
+		10*time.Second,
+		"Filebeat did not log that the filestream input started")
+}

From 70c35694fe6a21c9a64b3a87bbd0f9bcdf77fe6b Mon Sep 17 00:00:00 2001
From: Anderson Queiroz
Date: Fri, 22 Nov 2024 15:41:52 +0100
Subject: [PATCH 2/7] fix typos

---
 filebeat/input/filestream/config.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/filebeat/input/filestream/config.go b/filebeat/input/filestream/config.go
index 658961cafc7d..c06127633491 100644
--- a/filebeat/input/filestream/config.go
+++ b/filebeat/input/filestream/config.go
@@ -200,7 +200,7 @@ func collectOffendingInputs(duplicates []string, empty []*conf.C, ids map[string
 			toJson := map[string]interface{}{}
 			err := cfg.Unpack(&toJson)
 			if err != nil {
-				toJson["emptyID"] = fmt.Sprintf("failes to umpack config: %v", err)
+				toJson["emptyID"] = fmt.Sprintf("failed to unpack config: %v", err)
 			}
 			cfgs = append(cfgs, toJson)
 		}
@@ -211,7 +211,7 @@ func collectOffendingInputs(duplicates
[]string, empty []*conf.C, ids map[string
 			toJson := map[string]interface{}{}
 			err := dupcfgs.Unpack(&toJson)
 			if err != nil {
-				toJson[id] = fmt.Sprintf("failes to umpack config: %v", err)
+				toJson[id] = fmt.Sprintf("failed to unpack config: %v", err)
 			}
 			cfgs = append(cfgs, toJson)
 		}

From 0bc1392828b10d3993c4aa4ee4b3bc237df2f854 Mon Sep 17 00:00:00 2001
From: Anderson Queiroz
Date: Fri, 22 Nov 2024 18:40:14 +0100
Subject: [PATCH 3/7] pr changes

---
 filebeat/input/filestream/config_test.go | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/filebeat/input/filestream/config_test.go b/filebeat/input/filestream/config_test.go
index 41bb11906057..cf05fe680d19 100644
--- a/filebeat/input/filestream/config_test.go
+++ b/filebeat/input/filestream/config_test.go
@@ -19,6 +19,7 @@ package filestream
 
 import (
 	"encoding/json"
+	"strings"
 	"testing"
 
 	"github.com/stretchr/testify/assert"
@@ -94,17 +95,22 @@ type: filestream
 id: duplicated-id-2
 `, `
 type: filestream
+id: duplicated-id-2
+`, `
+type: filestream
 id: unique-ID
 `,
 		},
 		assertErr: func(t *testing.T, err error) {
-			assert.ErrorContains(t, err, "filestream inputs with duplicated IDs")
 			assert.ErrorContains(t, err, "filestream inputs with duplicated IDs")
 			assert.ErrorContains(t, err, "duplicated-id-1")
 			assert.ErrorContains(t, err, "duplicated-id-2")
+			assert.Equal(t, strings.Count(err.Error(), "duplicated-id-1"), 1, "each ID should appear only once")
+			assert.Equal(t, strings.Count(err.Error(), "duplicated-id-2"), 1, "each ID should appear only once")
+
 		},
 		assertLogs: func(t *testing.T, obs *observer.ObservedLogs) {
-			want := `[{"id":"duplicated-id-1","type":"filestream"},{"id":"duplicated-id-1","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"}]`
+			want := `[{"id":"duplicated-id-1","type":"filestream"},{"id":"duplicated-id-1","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"}]`
 
 			logs := obs.TakeAll()
 			require.Len(t, logs, 1, "there should be only one log entry")

From e05c888127bc17d3cdf4471ab47f19f5115dd86f Mon Sep 17 00:00:00 2001
From: Anderson Queiroz
Date: Fri, 22 Nov 2024 18:41:06 +0100
Subject: [PATCH 4/7] pr changes

---
 filebeat/input/filestream/config.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/filebeat/input/filestream/config.go b/filebeat/input/filestream/config.go
index c06127633491..5fc73c112020 100644
--- a/filebeat/input/filestream/config.go
+++ b/filebeat/input/filestream/config.go
@@ -168,7 +168,7 @@ func ValidateInputIDs(inputs []*conf.C, logger *logp.Logger) error {
 			continue
 		}
 		ids[fsInput.ID] = append(ids[fsInput.ID], input)
-		if len(ids[fsInput.ID]) > 1 {
+		if len(ids[fsInput.ID]) == 2 {
 			duplicates = append(duplicates, fsInput.ID)
 		}

From 885799e5fe61b45d12420eb45854ff5333eed656 Mon Sep 17 00:00:00 2001
From: Anderson Queiroz
Date: Mon, 25 Nov 2024 10:47:50 +0100
Subject: [PATCH 5/7] allow empty IDs

---
 CHANGELOG.next.asciidoc                  |  2 +-
 filebeat/input/filestream/config.go      | 37 +++++-------------------
 filebeat/input/filestream/config_test.go | 37 +++++++++++++++++++-----
 3 files changed, 39 insertions(+), 37 deletions(-)

diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index a06ccaf5bc68..acb81e5a8956 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -51,7 +51,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
 - The performance of ingesting SQS data with the S3
input has improved by up to 60x for queues with many small events. `max_number_of_messages` config for SQS mode is now ignored, as the new design no longer needs a manual cap on messages. Instead, use `number_of_workers` to scale ingestion rate in both S3 and SQS modes. The increased efficiency may increase network bandwidth consumption, which can be throttled by lowering `number_of_workers`. It may also increase number of events stored in memory, which can be throttled by lowering the configured size of the internal queue. {pull}40699[40699]
 - Fixes filestream logging the error "filestream input with ID 'ID' already exists, this will lead to data duplication[...]" on Kubernetes when using autodiscover. {pull}41585[41585]
 - Add kafka compression support for ZSTD.
-- Filebeat fails to start if there is any input without ID or with a duplicated ID {pull}41731[41731]
+- Filebeat fails to start if there is any input with a duplicated ID. It logs the duplicated IDs and the offending input configurations. {pull}41731[41731]
 
 *Heartbeat*
 

diff --git a/filebeat/input/filestream/config.go b/filebeat/input/filestream/config.go
index 5fc73c112020..c73492608b06 100644
--- a/filebeat/input/filestream/config.go
+++ b/filebeat/input/filestream/config.go
@@ -152,7 +152,6 @@ func (c *config) Validate() error {
 func ValidateInputIDs(inputs []*conf.C, logger *logp.Logger) error {
 	ids := make(map[string][]*conf.C)
 	var duplicates []string
-	var empty []*conf.C
 	for _, input := range inputs {
 		fsInput := struct {
 			ID   string `config:"id"`
@@ -163,10 +162,6 @@ func ValidateInputIDs(inputs []*conf.C, logger *logp.Logger) error {
 			return fmt.Errorf("failed to unpack filestream input configuration: %w", err)
 		}
 		if fsInput.Type == "filestream" {
-			if fsInput.ID == "" {
-				empty = append(empty, input)
-				continue
-			}
 			ids[fsInput.ID] = append(ids[fsInput.ID], input)
 			if len(ids[fsInput.ID]) == 2 {
 				duplicates = append(duplicates, fsInput.ID)
@@ -174,38 +169,22 @@ func ValidateInputIDs(inputs []*conf.C, logger *logp.Logger) error {
 		}
 	}
 
-	var errs []string
-	if empty != nil {
-		errs = append(errs, "input without ID")
-	}
 	if len(duplicates) != 0 {
-		errs = append(errs, fmt.Sprintf("filestream inputs with duplicated IDs: %v", strings.Join(duplicates, ",")))
-	}
-
-	if len(errs) != 0 {
-		jsonDupCfg := collectOffendingInputs(duplicates, empty, ids)
-		logger.Errorw("filestream inputs with invalid IDs", "inputs", jsonDupCfg)
-
-		return fmt.Errorf("filestream inputs validation error: %s", strings.Join(errs, ", "))
+		jsonDupCfg := collectOffendingInputs(duplicates, ids)
+		logger.Errorw("filestream inputs with duplicated IDs", "inputs", jsonDupCfg)
+		var quotedDuplicates []string
+		for _, dup := range duplicates {
+			quotedDuplicates = append(quotedDuplicates, fmt.Sprintf("%q", dup))
+		}
+		return fmt.Errorf("filestream inputs validation error: filestream inputs with duplicated IDs: %v", strings.Join(quotedDuplicates, ","))
 	}
 
 	return nil
 }
 
-func collectOffendingInputs(duplicates []string, empty []*conf.C, ids map[string][]*conf.C) []map[string]interface{} {
+func collectOffendingInputs(duplicates []string, ids map[string][]*conf.C) []map[string]interface{} {
 	var cfgs []map[string]interface{}
 
-	if len(empty) > 0 {
-		for _, cfg := range empty {
-			toJson := map[string]interface{}{}
-			err := cfg.Unpack(&toJson)
-			if err != nil {
-				toJson["emptyID"] = fmt.Sprintf("failed to unpack config: %v", err)
-			}
-			cfgs = append(cfgs, toJson)
-		}
-	}
-
 	for _, id := range duplicates {
 		for _, dupcfgs := range ids[id] {
 			toJson := map[string]interface{}{}
diff
--git a/filebeat/input/filestream/config_test.go b/filebeat/input/filestream/config_test.go index cf05fe680d19..729b712d58ea 100644 --- a/filebeat/input/filestream/config_test.go +++ b/filebeat/input/filestream/config_test.go @@ -53,7 +53,7 @@ func TestValidateInputIDs(t *testing.T) { }, }, { - name: "empty ID", + name: "one empty ID is allowed", cfg: []string{` type: filestream `, ` @@ -65,10 +65,36 @@ id: some-id-2 `, }, assertErr: func(t *testing.T, err error) { - assert.ErrorContains(t, err, "input without ID") + assert.NoError(t, err, "one empty id is allowed") + }, + }, + { + name: "duplicated empty ID", + cfg: []string{` +type: filestream +paths: + - "/tmp/empty-1" +`, ` +type: filestream +paths: + - "/tmp/empty-2" +`, ` +type: filestream +id: unique-id-1 +`, ` +type: filestream +id: unique-id-2 +`, ` +type: filestream +id: unique-ID +`, + }, + assertErr: func(t *testing.T, err error) { + assert.ErrorContains(t, err, `filestream inputs with duplicated IDs: ""`) + }, assertLogs: func(t *testing.T, obs *observer.ObservedLogs) { - want := `[{"type":"filestream"}]` + want := `[{"paths":["/tmp/empty-1"],"type":"filestream"},{"paths":["/tmp/empty-2"],"type":"filestream"}]` logs := obs.TakeAll() require.Len(t, logs, 1, "there should be only one log entry") @@ -76,10 +102,8 @@ id: some-id-2 got, err := json.Marshal(logs[0].ContextMap()["inputs"]) require.NoError(t, err, "could not marshal duplicated IDs inputs") assert.Equal(t, want, string(got)) - }, - }, - { + }, { name: "duplicated IDs", cfg: []string{` type: filestream @@ -144,7 +168,6 @@ id: unique-ID `, }, assertErr: func(t *testing.T, err error) { - assert.ErrorContains(t, err, "input without ID") assert.ErrorContains(t, err, "filestream inputs with duplicated IDs") }, assertLogs: func(t *testing.T, obs *observer.ObservedLogs) { From 100e0f5c7918ded2620d7f0017d49b521c731cec Mon Sep 17 00:00:00 2001 From: Anderson Queiroz Date: Mon, 25 Nov 2024 18:41:43 +0100 Subject: [PATCH 6/7] adjust integration tests --- filebeat/beater/filebeat.go | 2 +- filebeat/tests/integration/filestream_test.go | 78 +++++++++++++++---- 2 files changed, 62 insertions(+), 18 deletions(-) diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 5eb25f71c4d0..ceab21aa3590 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -292,7 +292,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { } defer stateStore.Close() - err = filestream.ValidateInputIDs(config.Inputs, logp.NewLogger("filestream")) + err = filestream.ValidateInputIDs(config.Inputs, logp.NewLogger("input.filestream")) if err != nil { logp.Err("invalid filestream configuration: %+v", err) return err diff --git a/filebeat/tests/integration/filestream_test.go b/filebeat/tests/integration/filestream_test.go index 100b60e08448..45cf99fbfb61 100644 --- a/filebeat/tests/integration/filestream_test.go +++ b/filebeat/tests/integration/filestream_test.go @@ -136,11 +136,50 @@ filebeat.inputs: paths: - /tmp/*.log - type: filestream - id: unique-id-1 enabled: true paths: - /var/log/*.log +output.discard.enabled: true +logging: + level: debug + metrics: + enabled: false +` + multipleDuplicatedIDs := ` +filebeat.inputs: + - type: filestream + enabled: true + paths: + - /tmp/*.log + - type: filestream + enabled: true + paths: + - /var/log/*.log + + - type: filestream + id: duplicated-id-1 + enabled: true + paths: + - /tmp/duplicated-id-1.log + - type: filestream + id: duplicated-id-1 + enabled: true + paths: + - /tmp/duplicated-id-1-2.log + + + - type: filestream + id: 
unique-id-1
+    enabled: true
+    paths:
+      - /tmp/unique-id-1.log
+  - type: filestream
+    id: unique-id-2
+    enabled: true
+    paths:
+      - /var/log/unique-id-2.log
+
+output.discard.enabled: true
+logging:
+  level: debug
@@ -148,19 +187,20 @@ logging:
     enabled: false
 `
 	tcs := []struct {
-		name       string
-		cfg        string
-		wantErrLog string
+		name string
+		cfg  string
 	}{
 		{
-			name:       "duplicated IDs",
-			cfg:        duplicatedIDs,
-			wantErrLog: "filestream inputs with duplicated IDs",
+			name: "duplicated IDs",
+			cfg:  duplicatedIDs,
+		},
+		{
+			name: "duplicated empty ID",
+			cfg:  emptyID,
 		},
 		{
-			name:       "empty ID",
-			cfg:        emptyID,
-			wantErrLog: "input without ID",
+			name: "two inputs without ID and duplicated IDs",
+			cfg:  multipleDuplicatedIDs,
 		},
 	}
@@ -180,15 +220,12 @@ logging:
 			filebeat.WriteConfigFile(tc.cfg)
 			filebeat.Start()
 
 			// Wait for error log
 			filebeat.WaitForLogs(
 				"filestream inputs validation error",
 				10*time.Second,
-				"Filebeat did log a validation error")
-
-			filebeat.WaitForLogs(
-				tc.wantErrLog,
-				10*time.Second, "Filebeat did not log there are issues with input ids")
+				"Filebeat did not log a filestream input validation error")
 
 			proc, err := filebeat.Process.Wait()
 			require.NoError(t, err, "filebeat process.Wait returned an error")
 			assert.False(t, proc.Success(), "filebeat should have failed to start")
+
 		})
 	}
 }
@@ -196,20 +233,27 @@ logging:
 
 func TestFilestreamValidationSucceeds(t *testing.T) {
 	cfg := `
 filebeat.inputs:
+  - type: filestream
+    enabled: true
+    paths:
+      - /var/log/*.log
+
   - type: filestream
     id: unique-id-1
     enabled: true
     paths:
-      - /tmp/*.log
+      - /tmp/unique-id-1.log
   - type: filestream
     id: unique-id-2
     enabled: true
     paths:
-      - /var/log/*.log
+      - /var/log/unique-id-2.log
 
 output.discard.enabled: true
 logging:
   level: debug
+  metrics:
+    enabled: false
 `
 	filebeat := integration.NewBeat(
 		t,
 		"filebeat",
 		"../../filebeat.test",
 	)

From cec09f37df2237e64f80c3547a19036b007f8bc7 Mon Sep 17 00:00:00 2001
From: Anderson Queiroz
Date: Tue, 3 Dec 2024 11:44:17 +0100
Subject: [PATCH 7/7] wee refactor on docs and names

---
 filebeat/input/filestream/config.go | 19 +++++++++++--------
 1 file changed, 11 insertions(+), 8 deletions(-)

diff --git a/filebeat/input/filestream/config.go b/filebeat/input/filestream/config.go
index c73492608b06..2860dd673c23 100644
--- a/filebeat/input/filestream/config.go
+++ b/filebeat/input/filestream/config.go
@@ -145,12 +145,13 @@ func (c *config) Validate() error {
 	return nil
 }
 
-// ValidateInputIDs checks all filestream inputs to ensure all have an ID and
-// the ID is unique. If there is any empty or duplicated ID, it logs an error
-// containing the offending input configurations and returns an error containing
-// the duplicated IDs.
+// ValidateInputIDs checks all filestream inputs to ensure all input IDs are
+// unique. If there is a duplicated ID, it logs an error containing the offending
+// input configurations and returns an error containing the duplicated IDs.
+// A single empty ID is a valid ID as it's unique; however, multiple empty IDs
+// are not unique and are therefore treated as any other duplicated ID.
 func ValidateInputIDs(inputs []*conf.C, logger *logp.Logger) error {
-	ids := make(map[string][]*conf.C)
+	duplicatedConfigs := make(map[string][]*conf.C)
 	var duplicates []string
 	for _, input := range inputs {
 		fsInput := struct {
@@ -162,15 +163,17 @@ func ValidateInputIDs(inputs []*conf.C, logger *logp.Logger) error {
 			return fmt.Errorf("failed to unpack filestream input configuration: %w", err)
 		}
 		if fsInput.Type == "filestream" {
-			ids[fsInput.ID] = append(ids[fsInput.ID], input)
-			if len(ids[fsInput.ID]) == 2 {
+			duplicatedConfigs[fsInput.ID] = append(duplicatedConfigs[fsInput.ID], input)
+			// we only need to collect each duplicated ID once, therefore collect
+			// it only the first time we see it.
+			if len(duplicatedConfigs[fsInput.ID]) == 2 {
 				duplicates = append(duplicates, fsInput.ID)
 			}
 		}
 	}
 
 	if len(duplicates) != 0 {
-		jsonDupCfg := collectOffendingInputs(duplicates, ids)
+		jsonDupCfg := collectOffendingInputs(duplicates, duplicatedConfigs)
 		logger.Errorw("filestream inputs with duplicated IDs", "inputs", jsonDupCfg)
 		var quotedDuplicates []string
 		for _, dup := range duplicates {