Skip to content

Commit

Permalink
filebeat: input v2 compat uses random ID for CheckConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
AndersonQ committed Nov 11, 2024
1 parent bfde79f commit 12cc1ab
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 2 deletions.
33 changes: 31 additions & 2 deletions filebeat/input/v2/compat/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"context"
"errors"
"fmt"
"math/rand" // using for better performance
"strconv"
"sync"

"github.com/mitchellh/hashstructure"
Expand Down Expand Up @@ -73,12 +75,18 @@ func RunnerFactory(
}

func (f *factory) CheckConfig(cfg *conf.C) error {
_, err := f.loader.Configure(cfg)
// just check the config, therefore to avoid potential side effects (ID duplication)
// change the ID.
checkCfg, err := f.generateCheckConfig(cfg)
if err != nil {
f.log.Warnw(fmt.Sprintf("input V2 factory.CheckConfig failed to clone config before checking it. Original config will be checked, it might trigger an input duplication warning: %v", err), "original_config", conf.DebugString(cfg, true))
}
_, err = f.loader.Configure(checkCfg)
if err != nil {
return fmt.Errorf("runner factory could not check config: %w", err)
}

if err = f.loader.Delete(cfg); err != nil {
if err = f.loader.Delete(checkCfg); err != nil {
return fmt.Errorf(
"runner factory failed to delete an input after config check: %w",
err)
Expand Down Expand Up @@ -176,3 +184,24 @@ func configID(config *conf.C) (string, error) {

return fmt.Sprintf("%16X", id), nil
}

func (f *factory) generateCheckConfig(config *conf.C) (*conf.C, error) {
testCfg, err := conf.NewConfigFrom(config)
if err != nil {
return config, fmt.Errorf("failed to create new config: %w", err)
}

// let's try to override the `inputID` field, if it fails, give up
inputID, err := testCfg.String("inputID", -1)
if err != nil {
return config, fmt.Errorf("failed to get 'inputID': %w", err)
}

// using math/rand for performance, generate a 0-9 string
err = testCfg.SetString("inputID", -1, inputID+strconv.Itoa(rand.Intn(10)))
if err != nil {
return config, fmt.Errorf("failed to set 'inputID': %w", err)
}

return testCfg, nil
}
109 changes: 109 additions & 0 deletions filebeat/input/v2/compat/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package compat

import (
"errors"
"fmt"
"sync"
"testing"

Expand Down Expand Up @@ -62,6 +64,72 @@ func TestRunnerFactory_CheckConfig(t *testing.T) {
assert.Equal(t, 0, countRun)
})

t.Run("does not cause input ID duplication", func(t *testing.T) {
log := logp.NewLogger("test")
var countConfigure, countTest, countRun int
var runWG sync.WaitGroup
var ids = map[string]int{}
var idsMu sync.Mutex

// setup
plugins := inputest.SinglePlugin("test", &inputest.MockInputManager{
OnConfigure: func(cfg *conf.C) (v2.Input, error) {
idsMu.Lock()
defer idsMu.Unlock()
id, err := cfg.String("id", -1)
assert.NoError(t, err, "OnConfigure: could not get 'id' fom config")
idsCount := ids[id]
ids[id] = idsCount + 1

countConfigure++
return &inputest.MockInput{
OnTest: func(_ v2.TestContext) error { countTest++; return nil },
OnRun: func(_ v2.Context, _ beat.PipelineConnector) error {
runWG.Done()
countRun++
return nil
},
}, nil
},
})
loader := inputest.MustNewTestLoader(t, plugins, "type", "test")
factory := RunnerFactory(log, beat.Info{}, loader.Loader)

inputID := "filestream-kubernetes-pod-aee2af1c6365ecdd72416f44aab49cd8bdc7522ab008c39784b7fd9d46f794a4"
inputCfg := fmt.Sprintf(`
id: %s
parsers:
- container: null
paths:
- /var/log/containers/*aee2af1c6365ecdd72416f44aab49cd8bdc7522ab008c39784b7fd9d46f794a4.log
prospector:
scanner:
symlinks: true
take_over: true
type: test
`, inputID)

runner, err := factory.Create(nil, conf.MustNewConfigFrom(inputCfg))
require.NoError(t, err, "could not create input")

runWG.Add(1)
runner.Start()
defer runner.Stop()
// wait input to be running
runWG.Wait()

err = factory.CheckConfig(conf.MustNewConfigFrom(inputCfg))
require.NoError(t, err, "unexpected error when calling CheckConfig")

// validate: configured an input, but do not run test or run
assert.Equal(t, 2, countConfigure, "OnConfigure should be called only 2 times")
assert.Equal(t, 0, countTest, "OnTest should not have been called")
assert.Equal(t, 1, countRun, "OnRun should be called only once")
idsMu.Lock()
assert.Equal(t, 1, ids[inputID])
idsMu.Unlock()
})

t.Run("fail if input type is unknown to loader", func(t *testing.T) {
log := logp.NewLogger("test")
plugins := inputest.SinglePlugin("test", inputest.ConstInputManager(nil))
Expand Down Expand Up @@ -118,3 +186,44 @@ func TestRunnerFactory_CreateAndRun(t *testing.T) {
assert.Error(t, err)
})
}

func TestGenerateCheckConfig(t *testing.T) {
tcs := []struct {
name string
cfg *conf.C
want *conf.C
wantErr error
assertCfg func(t assert.TestingT, expected interface{}, actual interface{}, msgAndArgs ...interface{}) bool
}{
{
name: "id is present",
cfg: conf.MustNewConfigFrom("inputID: some-id"),
assertCfg: assert.NotEqual,
},
{
name: "absent id",
cfg: conf.MustNewConfigFrom(""),
wantErr: errors.New("failed to get 'inputID'"),
assertCfg: assert.Equal,
},
{
name: "invalid config",
cfg: nil,
wantErr: errors.New("failed to create new config"),
assertCfg: assert.Equal,
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
f := factory{}

got, err := f.generateCheckConfig(tc.cfg)
if tc.wantErr != nil {
assert.ErrorContains(t, err, tc.wantErr.Error())
}

tc.assertCfg(t, tc.cfg, got)
})
}
}

0 comments on commit 12cc1ab

Please sign in to comment.