Skip to content

Commit

Permalink
filestream: require unique input id on filebeat startup (#41731)
Browse files Browse the repository at this point in the history
During startup filebeat now validates the filestream inputs and fails to start if there are with duplicated IDs. Duplicated IDs might cause data duplication therefore now it's mandatory to have unique for each filestream input. 

Disruptive User Impact
Impact: Users who have not configured unique IDs for their filestream inputs will need to update their configurations to include unique IDs for each input. Failure to do so will prevent Filebeat from starting.

Previously: Filebeat would only log an error if filestream would find inputs with duplicated IDs, potentially leading to data duplication.

Now: Filebeat will fail to start if any filestream input has a duplicated ID.
  • Loading branch information
AndersonQ authored Dec 3, 2024
1 parent 01cc134 commit 8e20316
Show file tree
Hide file tree
Showing 5 changed files with 422 additions and 1 deletion.
2 changes: 1 addition & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Remove deprecated awscloudwatch field from Filebeat. {pull}41089[41089]
- The performance of ingesting SQS data with the S3 input has improved by up to 60x for queues with many small events. `max_number_of_messages` config for SQS mode is now ignored, as the new design no longer needs a manual cap on messages. Instead, use `number_of_workers` to scale ingestion rate in both S3 and SQS modes. The increased efficiency may increase network bandwidth consumption, which can be throttled by lowering `number_of_workers`. It may also increase number of events stored in memory, which can be throttled by lowering the configured size of the internal queue. {pull}40699[40699]
- Fixes filestream logging the error "filestream input with ID 'ID' already exists, this will lead to data duplication[...]" on Kubernetes when using autodiscover. {pull}41585[41585]

- Add kafka compression support for ZSTD.
- Filebeat fails to start if there is any input with a duplicated ID. It logs the duplicated IDs and the offending inputs configurations. {pull}41731[41731]

*Heartbeat*

Expand Down
6 changes: 6 additions & 0 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/elastic/beats/v7/filebeat/fileset"
_ "github.com/elastic/beats/v7/filebeat/include"
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/filebeat/input/filestream"
"github.com/elastic/beats/v7/filebeat/input/filestream/takeover"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/filebeat/input/v2/compat"
Expand Down Expand Up @@ -291,6 +292,11 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
}
defer stateStore.Close()

err = filestream.ValidateInputIDs(config.Inputs, logp.NewLogger("input.filestream"))
if err != nil {
logp.Err("invalid filestream configuration: %+v", err)
return err
}
err = processLogInputTakeOver(stateStore, config)
if err != nil {
logp.Err("Failed to attempt filestream state take over: %+v", err)
Expand Down
59 changes: 59 additions & 0 deletions filebeat/input/filestream/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package filestream

import (
"fmt"
"strings"
"time"

"github.com/dustin/go-humanize"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/elastic/beats/v7/libbeat/reader/parser"
"github.com/elastic/beats/v7/libbeat/reader/readfile"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)

// Config stores the options of a file stream.
Expand Down Expand Up @@ -142,3 +144,60 @@ func (c *config) Validate() error {

return nil
}

// ValidateInputIDs checks all filestream inputs to ensure all input IDs are
// unique. If there is a duplicated ID, it logs an error containing the offending
// input configurations and returns an error containing the duplicated IDs.
// A single empty ID is a valid ID as it's unique, however multiple empty IDs
// are not unique and are therefore are treated as any other duplicated ID.
func ValidateInputIDs(inputs []*conf.C, logger *logp.Logger) error {
duplicatedConfigs := make(map[string][]*conf.C)
var duplicates []string
for _, input := range inputs {
fsInput := struct {
ID string `config:"id"`
Type string `config:"type"`
}{}
err := input.Unpack(&fsInput)
if err != nil {
return fmt.Errorf("failed to unpack filestream input configuration: %w", err)
}
if fsInput.Type == "filestream" {
duplicatedConfigs[fsInput.ID] = append(duplicatedConfigs[fsInput.ID], input)
// we just need to collect the duplicated IDs once, therefore collect
// it only the first time we see a duplicated ID.
if len(duplicatedConfigs[fsInput.ID]) == 2 {
duplicates = append(duplicates, fsInput.ID)
}
}
}

if len(duplicates) != 0 {
jsonDupCfg := collectOffendingInputs(duplicates, duplicatedConfigs)
logger.Errorw("filestream inputs with duplicated IDs", "inputs", jsonDupCfg)
var quotedDuplicates []string
for _, dup := range duplicates {
quotedDuplicates = append(quotedDuplicates, fmt.Sprintf("%q", dup))
}
return fmt.Errorf("filestream inputs validation error: filestream inputs with duplicated IDs: %v", strings.Join(quotedDuplicates, ","))
}

return nil
}

func collectOffendingInputs(duplicates []string, ids map[string][]*conf.C) []map[string]interface{} {
var cfgs []map[string]interface{}

for _, id := range duplicates {
for _, dupcfgs := range ids[id] {
toJson := map[string]interface{}{}
err := dupcfgs.Unpack(&toJson)
if err != nil {
toJson[id] = fmt.Sprintf("failed to unpack config: %v", err)
}
cfgs = append(cfgs, toJson)
}
}

return cfgs
}
190 changes: 190 additions & 0 deletions filebeat/input/filestream/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,16 @@
package filestream

import (
"encoding/json"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest/observer"

conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)

func TestConfigValidate(t *testing.T) {
Expand All @@ -30,3 +37,186 @@ func TestConfigValidate(t *testing.T) {
require.Error(t, err)
})
}

func TestValidateInputIDs(t *testing.T) {
tcs := []struct {
name string
cfg []string
assertErr func(t *testing.T, err error)
assertLogs func(t *testing.T, buff *observer.ObservedLogs)
}{
{
name: "empty config",
cfg: []string{""},
assertErr: func(t *testing.T, err error) {
assert.NoError(t, err, "empty config should not return an error")
},
},
{
name: "one empty ID is allowed",
cfg: []string{`
type: filestream
`, `
type: filestream
id: some-id-1
`, `
type: filestream
id: some-id-2
`,
},
assertErr: func(t *testing.T, err error) {
assert.NoError(t, err, "one empty id is allowed")
},
},
{
name: "duplicated empty ID",
cfg: []string{`
type: filestream
paths:
- "/tmp/empty-1"
`, `
type: filestream
paths:
- "/tmp/empty-2"
`, `
type: filestream
id: unique-id-1
`, `
type: filestream
id: unique-id-2
`, `
type: filestream
id: unique-ID
`,
},
assertErr: func(t *testing.T, err error) {
assert.ErrorContains(t, err, `filestream inputs with duplicated IDs: ""`)

},
assertLogs: func(t *testing.T, obs *observer.ObservedLogs) {
want := `[{"paths":["/tmp/empty-1"],"type":"filestream"},{"paths":["/tmp/empty-2"],"type":"filestream"}]`

logs := obs.TakeAll()
require.Len(t, logs, 1, "there should be only one log entry")

got, err := json.Marshal(logs[0].ContextMap()["inputs"])
require.NoError(t, err, "could not marshal duplicated IDs inputs")
assert.Equal(t, want, string(got))
},
}, {
name: "duplicated IDs",
cfg: []string{`
type: filestream
id: duplicated-id-1
`, `
type: filestream
id: duplicated-id-1
`, `
type: filestream
id: duplicated-id-2
`, `
type: filestream
id: duplicated-id-2
`, `
type: filestream
id: duplicated-id-2
`, `
type: filestream
id: unique-ID
`,
},
assertErr: func(t *testing.T, err error) {
assert.ErrorContains(t, err, "filestream inputs with duplicated IDs")
assert.ErrorContains(t, err, "duplicated-id-1")
assert.ErrorContains(t, err, "duplicated-id-2")
assert.Equal(t, strings.Count(err.Error(), "duplicated-id-1"), 1, "each IDs should appear only once")
assert.Equal(t, strings.Count(err.Error(), "duplicated-id-2"), 1, "each IDs should appear only once")

},
assertLogs: func(t *testing.T, obs *observer.ObservedLogs) {
want := `[{"id":"duplicated-id-1","type":"filestream"},{"id":"duplicated-id-1","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"}]`

logs := obs.TakeAll()
require.Len(t, logs, 1, "there should be only one log entry")

got, err := json.Marshal(logs[0].ContextMap()["inputs"])
require.NoError(t, err, "could not marshal duplicated IDs inputs")
assert.Equal(t, want, string(got))
},
},
{
name: "duplicated IDs and empty ID",
cfg: []string{`
type: filestream
`, `
type: filestream
`, `
type: filestream
id: duplicated-id-1
`, `
type: filestream
id: duplicated-id-1
`, `
type: filestream
id: duplicated-id-2
`, `
type: filestream
id: duplicated-id-2
`, `
type: filestream
id: unique-ID
`,
},
assertErr: func(t *testing.T, err error) {
assert.ErrorContains(t, err, "filestream inputs with duplicated IDs")
},
assertLogs: func(t *testing.T, obs *observer.ObservedLogs) {
want := `[{"type":"filestream"},{"type":"filestream"},{"id":"duplicated-id-1","type":"filestream"},{"id":"duplicated-id-1","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"}]`

logs := obs.TakeAll()
require.Len(t, logs, 1, "there should be only one log entry")

got, err := json.Marshal(logs[0].ContextMap()["inputs"])
require.NoError(t, err, "could not marshal duplicated IDs inputs")
assert.Equal(t, want, string(got))

},
},
{
name: "only unique IDs",
cfg: []string{`
type: filestream
id: unique-id-1
`, `
type: filestream
id: unique-id-2
`, `
type: filestream
id: unique-id-3
`,
},
assertErr: func(t *testing.T, err error) {
assert.NoError(t, err, "only unique IDs should not return an error")
},
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
var inputs []*conf.C
for _, c := range tc.cfg {
cfg, err := conf.NewConfigFrom(c)
require.NoError(t, err, "could not create input configuration")
inputs = append(inputs, cfg)
}
err := logp.DevelopmentSetup(logp.ToObserverOutput())
require.NoError(t, err, "could not setup log for development")

err = ValidateInputIDs(inputs, logp.L())
tc.assertErr(t, err)
if tc.assertLogs != nil {
tc.assertLogs(t, logp.ObserverLogs())
}
})
}
}
Loading

0 comments on commit 8e20316

Please sign in to comment.