filestream: validate input ids on startup
During startup, Filebeat now validates the filestream inputs and fails to start if there are inputs without an ID or with duplicated IDs.
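For illustration, a configuration like the following (hypothetical paths; IDs chosen for the example) now prevents Filebeat from starting, because both filestream inputs share the same ID:

filebeat.inputs:
  - type: filestream
    id: my-filestream-id
    paths:
      - /var/log/app/*.log
  - type: filestream
    id: my-filestream-id
    paths:
      - /var/log/other/*.log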
AndersonQ committed Nov 21, 2024
1 parent d66f2bf commit b6ecc4e
Showing 5 changed files with 367 additions and 1 deletion.
2 changes: 1 addition & 1 deletion CHANGELOG.next.asciidoc
@@ -50,8 +50,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Remove deprecated awscloudwatch field from Filebeat. {pull}41089[41089]
- The performance of ingesting SQS data with the S3 input has improved by up to 60x for queues with many small events. `max_number_of_messages` config for SQS mode is now ignored, as the new design no longer needs a manual cap on messages. Instead, use `number_of_workers` to scale ingestion rate in both S3 and SQS modes. The increased efficiency may increase network bandwidth consumption, which can be throttled by lowering `number_of_workers`. It may also increase number of events stored in memory, which can be throttled by lowering the configured size of the internal queue. {pull}40699[40699]
- Fixes filestream logging the error "filestream input with ID 'ID' already exists, this will lead to data duplication[...]" on Kubernetes when using autodiscover. {pull}41585[41585]

- Add kafka compression support for ZSTD.
- Filebeat fails to start if there is any filestream input without an ID or with a duplicated ID. {pull}41731[41731]

*Heartbeat*

6 changes: 6 additions & 0 deletions filebeat/beater/filebeat.go
@@ -32,6 +32,7 @@ import (
"github.com/elastic/beats/v7/filebeat/fileset"
_ "github.com/elastic/beats/v7/filebeat/include"
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/filebeat/input/filestream"
"github.com/elastic/beats/v7/filebeat/input/filestream/takeover"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/filebeat/input/v2/compat"
@@ -291,6 +292,11 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
	}
	defer stateStore.Close()

	err = filestream.ValidateInputIDs(config.Inputs, logp.NewLogger("filestream"))
	if err != nil {
		logp.Err("invalid filestream configuration: %+v", err)
		return err
	}
	err = processLogInputTakeOver(stateStore, config)
	if err != nil {
		logp.Err("Failed to attempt filestream state take over: %+v", err)
77 changes: 77 additions & 0 deletions filebeat/input/filestream/config.go
@@ -19,6 +19,7 @@ package filestream

import (
"fmt"
"strings"
"time"

"github.com/dustin/go-humanize"
@@ -27,6 +28,7 @@ import (
"github.com/elastic/beats/v7/libbeat/reader/parser"
"github.com/elastic/beats/v7/libbeat/reader/readfile"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)

// Config stores the options of a file stream.
@@ -142,3 +144,78 @@ func (c *config) Validate() error {

	return nil
}

// ValidateInputIDs checks that all filestream inputs have an ID and that the
// IDs are unique. If any input has an empty or duplicated ID, it logs an error
// containing the offending input configurations and returns an error listing
// the duplicated IDs.
func ValidateInputIDs(inputs []*conf.C, logger *logp.Logger) error {
	ids := make(map[string][]*conf.C)
	var duplicates []string
	var empty []*conf.C
	for _, input := range inputs {
		fsInput := struct {
			ID   string `config:"id"`
			Type string `config:"type"`
		}{}
		err := input.Unpack(&fsInput)
		if err != nil {
			return fmt.Errorf("failed to unpack filestream input configuration: %w", err)
		}
		if fsInput.Type == "filestream" {
			if fsInput.ID == "" {
				empty = append(empty, input)
				continue
			}
			ids[fsInput.ID] = append(ids[fsInput.ID], input)
			if len(ids[fsInput.ID]) > 1 {
				duplicates = append(duplicates, fsInput.ID)
			}
		}
	}

	var errs []string
	if empty != nil {
		errs = append(errs, "input without ID")
	}
	if len(duplicates) != 0 {
		errs = append(errs, fmt.Sprintf("filestream inputs with duplicated IDs: %v", strings.Join(duplicates, ",")))
	}

	if len(errs) != 0 {
		jsonDupCfg := collectOffendingInputs(duplicates, empty, ids)
		logger.Errorw("filestream inputs with invalid IDs", "inputs", jsonDupCfg)

		return fmt.Errorf("filestream inputs validation error: %s", strings.Join(errs, ", "))
	}

	return nil
}

func collectOffendingInputs(duplicates []string, empty []*conf.C, ids map[string][]*conf.C) []map[string]interface{} {
	var cfgs []map[string]interface{}

	if len(empty) > 0 {
		for _, cfg := range empty {
			toJson := map[string]interface{}{}
			err := cfg.Unpack(&toJson)
			if err != nil {
				toJson["emptyID"] = fmt.Sprintf("failed to unpack config: %v", err)
			}
			cfgs = append(cfgs, toJson)
		}
	}

	for _, id := range duplicates {
		for _, dupcfgs := range ids[id] {
			toJson := map[string]interface{}{}
			err := dupcfgs.Unpack(&toJson)
			if err != nil {
				toJson[id] = fmt.Sprintf("failed to unpack config: %v", err)
			}
			cfgs = append(cfgs, toJson)
		}
	}

	return cfgs
}
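As a minimal standalone sketch of the new validation (hypothetical program; it only uses APIs that appear in this diff: conf.NewConfigFrom, logp.NewLogger, and ValidateInputIDs):

package main

import (
	"fmt"

	"github.com/elastic/beats/v7/filebeat/input/filestream"
	conf "github.com/elastic/elastic-agent-libs/config"
	"github.com/elastic/elastic-agent-libs/logp"
)

func main() {
	// One input without an ID and two inputs sharing the ID "dup".
	raw := []string{
		"type: filestream",
		"type: filestream\nid: dup",
		"type: filestream\nid: dup",
	}
	var inputs []*conf.C
	for _, r := range raw {
		c, err := conf.NewConfigFrom(r)
		if err != nil {
			panic(err)
		}
		inputs = append(inputs, c)
	}
	// Expected: "filestream inputs validation error: input without ID,
	// filestream inputs with duplicated IDs: dup"
	err := filestream.ValidateInputIDs(inputs, logp.NewLogger("filestream"))
	fmt.Println(err)
}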
161 changes: 161 additions & 0 deletions filebeat/input/filestream/config_test.go
@@ -18,9 +18,15 @@
package filestream

import (
"encoding/json"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest/observer"

conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)

func TestConfigValidate(t *testing.T) {
@@ -30,3 +36,158 @@ func TestConfigValidate(t *testing.T) {
		require.Error(t, err)
	})
}

func TestValidateInputIDs(t *testing.T) {
	tcs := []struct {
		name       string
		cfg        []string
		assertErr  func(t *testing.T, err error)
		assertLogs func(t *testing.T, buff *observer.ObservedLogs)
	}{
		{
			name: "empty config",
			cfg:  []string{""},
			assertErr: func(t *testing.T, err error) {
				assert.NoError(t, err, "empty config should not return an error")
			},
		},
		{
			name: "empty ID",
			cfg: []string{`
type: filestream
`, `
type: filestream
id: some-id-1
`, `
type: filestream
id: some-id-2
`,
			},
			assertErr: func(t *testing.T, err error) {
				assert.ErrorContains(t, err, "input without ID")
			},
			assertLogs: func(t *testing.T, obs *observer.ObservedLogs) {
				want := `[{"type":"filestream"}]`

				logs := obs.TakeAll()
				require.Len(t, logs, 1, "there should be only one log entry")

				got, err := json.Marshal(logs[0].ContextMap()["inputs"])
				require.NoError(t, err, "could not marshal inputs without ID")
				assert.Equal(t, want, string(got))
			},
		},
		{
			name: "duplicated IDs",
			cfg: []string{`
type: filestream
id: duplicated-id-1
`, `
type: filestream
id: duplicated-id-1
`, `
type: filestream
id: duplicated-id-2
`, `
type: filestream
id: duplicated-id-2
`, `
type: filestream
id: unique-ID
`,
			},
			assertErr: func(t *testing.T, err error) {
				assert.ErrorContains(t, err, "filestream inputs with duplicated IDs")
				assert.ErrorContains(t, err, "duplicated-id-1")
				assert.ErrorContains(t, err, "duplicated-id-2")
			},
			assertLogs: func(t *testing.T, obs *observer.ObservedLogs) {
				want := `[{"id":"duplicated-id-1","type":"filestream"},{"id":"duplicated-id-1","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"}]`

				logs := obs.TakeAll()
				require.Len(t, logs, 1, "there should be only one log entry")

				got, err := json.Marshal(logs[0].ContextMap()["inputs"])
				require.NoError(t, err, "could not marshal duplicated IDs inputs")
				assert.Equal(t, want, string(got))
			},
		},
		{
			name: "duplicated IDs and empty ID",
			cfg: []string{`
type: filestream
`, `
type: filestream
`, `
type: filestream
id: duplicated-id-1
`, `
type: filestream
id: duplicated-id-1
`, `
type: filestream
id: duplicated-id-2
`, `
type: filestream
id: duplicated-id-2
`, `
type: filestream
id: unique-ID
`,
			},
			assertErr: func(t *testing.T, err error) {
				assert.ErrorContains(t, err, "input without ID")
				assert.ErrorContains(t, err, "filestream inputs with duplicated IDs")
			},
			assertLogs: func(t *testing.T, obs *observer.ObservedLogs) {
				want := `[{"type":"filestream"},{"type":"filestream"},{"id":"duplicated-id-1","type":"filestream"},{"id":"duplicated-id-1","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"},{"id":"duplicated-id-2","type":"filestream"}]`

				logs := obs.TakeAll()
				require.Len(t, logs, 1, "there should be only one log entry")

				got, err := json.Marshal(logs[0].ContextMap()["inputs"])
				require.NoError(t, err, "could not marshal duplicated IDs inputs")
				assert.Equal(t, want, string(got))
			},
		},
		{
			name: "only unique IDs",
			cfg: []string{`
type: filestream
id: unique-id-1
`, `
type: filestream
id: unique-id-2
`, `
type: filestream
id: unique-id-3
`,
			},
			assertErr: func(t *testing.T, err error) {
				assert.NoError(t, err, "only unique IDs should not return an error")
			},
		},
	}

	for _, tc := range tcs {
		t.Run(tc.name, func(t *testing.T) {
			var inputs []*conf.C
			for _, c := range tc.cfg {
				cfg, err := conf.NewConfigFrom(c)
				require.NoError(t, err, "could not create input configuration")
				inputs = append(inputs, cfg)
			}
			err := logp.DevelopmentSetup(logp.ToObserverOutput())
			require.NoError(t, err, "could not setup log for development")

			err = ValidateInputIDs(inputs, logp.L())
			tc.assertErr(t, err)
			if tc.assertLogs != nil {
				tc.assertLogs(t, logp.ObserverLogs())
			}
		})
	}
}
