diff --git a/filebeat/input/v2/compat/compat.go b/filebeat/input/v2/compat/compat.go index 416b6628e19..826c05c3059 100644 --- a/filebeat/input/v2/compat/compat.go +++ b/filebeat/input/v2/compat/compat.go @@ -24,6 +24,8 @@ import ( "context" "errors" "fmt" + "math/rand" // using for better performance + "strconv" "sync" "github.com/mitchellh/hashstructure" @@ -73,12 +75,18 @@ func RunnerFactory( } func (f *factory) CheckConfig(cfg *conf.C) error { - _, err := f.loader.Configure(cfg) + // just check the config, therefore to avoid potential side effects (ID duplication) + // change the ID. + checkCfg, err := f.generateCheckConfig(cfg) + if err != nil { + f.log.Warnw(fmt.Sprintf("input V2 factory.CheckConfig failed to clone config before checking it. Original config will be checked, it might trigger an input duplication warning: %v", err), "original_config", conf.DebugString(cfg, true)) + } + _, err = f.loader.Configure(checkCfg) if err != nil { return fmt.Errorf("runner factory could not check config: %w", err) } - if err = f.loader.Delete(cfg); err != nil { + if err = f.loader.Delete(checkCfg); err != nil { return fmt.Errorf( "runner factory failed to delete an input after config check: %w", err) @@ -176,3 +184,24 @@ func configID(config *conf.C) (string, error) { return fmt.Sprintf("%16X", id), nil } + +func (f *factory) generateCheckConfig(config *conf.C) (*conf.C, error) { + testCfg, err := conf.NewConfigFrom(config) + if err != nil { + return config, fmt.Errorf("failed to create new config: %w", err) + } + + // let's try to override the `inputID` field, if it fails, give up + inputID, err := testCfg.String("inputID", -1) + if err != nil { + return config, fmt.Errorf("failed to get 'inputID': %w", err) + } + + // using math/rand for performance, generate a 0-9 string + err = testCfg.SetString("inputID", -1, inputID+strconv.Itoa(rand.Intn(10))) + if err != nil { + return config, fmt.Errorf("failed to set 'inputID': %w", err) + } + + return testCfg, nil +} diff --git a/filebeat/input/v2/compat/compat_test.go b/filebeat/input/v2/compat/compat_test.go index c5092583c0e..0c52383b781 100644 --- a/filebeat/input/v2/compat/compat_test.go +++ b/filebeat/input/v2/compat/compat_test.go @@ -18,6 +18,8 @@ package compat import ( + "errors" + "fmt" "sync" "testing" @@ -62,6 +64,72 @@ func TestRunnerFactory_CheckConfig(t *testing.T) { assert.Equal(t, 0, countRun) }) + t.Run("does not cause input ID duplication", func(t *testing.T) { + log := logp.NewLogger("test") + var countConfigure, countTest, countRun int + var runWG sync.WaitGroup + var ids = map[string]int{} + var idsMu sync.Mutex + + // setup + plugins := inputest.SinglePlugin("test", &inputest.MockInputManager{ + OnConfigure: func(cfg *conf.C) (v2.Input, error) { + idsMu.Lock() + defer idsMu.Unlock() + id, err := cfg.String("id", -1) + assert.NoError(t, err, "OnConfigure: could not get 'id' fom config") + idsCount := ids[id] + ids[id] = idsCount + 1 + + countConfigure++ + return &inputest.MockInput{ + OnTest: func(_ v2.TestContext) error { countTest++; return nil }, + OnRun: func(_ v2.Context, _ beat.PipelineConnector) error { + runWG.Done() + countRun++ + return nil + }, + }, nil + }, + }) + loader := inputest.MustNewTestLoader(t, plugins, "type", "test") + factory := RunnerFactory(log, beat.Info{}, loader.Loader) + + inputID := "filestream-kubernetes-pod-aee2af1c6365ecdd72416f44aab49cd8bdc7522ab008c39784b7fd9d46f794a4" + inputCfg := fmt.Sprintf(` +id: %s +parsers: + - container: null +paths: + - /var/log/containers/*aee2af1c6365ecdd72416f44aab49cd8bdc7522ab008c39784b7fd9d46f794a4.log +prospector: + scanner: + symlinks: true +take_over: true +type: test +`, inputID) + + runner, err := factory.Create(nil, conf.MustNewConfigFrom(inputCfg)) + require.NoError(t, err, "could not create input") + + runWG.Add(1) + runner.Start() + defer runner.Stop() + // wait input to be running + runWG.Wait() + + err = factory.CheckConfig(conf.MustNewConfigFrom(inputCfg)) + require.NoError(t, err, "unexpected error when calling CheckConfig") + + // validate: configured an input, but do not run test or run + assert.Equal(t, 2, countConfigure, "OnConfigure should be called only 2 times") + assert.Equal(t, 0, countTest, "OnTest should not have been called") + assert.Equal(t, 1, countRun, "OnRun should be called only once") + idsMu.Lock() + assert.Equal(t, 1, ids[inputID]) + idsMu.Unlock() + }) + t.Run("fail if input type is unknown to loader", func(t *testing.T) { log := logp.NewLogger("test") plugins := inputest.SinglePlugin("test", inputest.ConstInputManager(nil)) @@ -118,3 +186,44 @@ func TestRunnerFactory_CreateAndRun(t *testing.T) { assert.Error(t, err) }) } + +func TestGenerateCheckConfig(t *testing.T) { + tcs := []struct { + name string + cfg *conf.C + want *conf.C + wantErr error + assertCfg func(t assert.TestingT, expected interface{}, actual interface{}, msgAndArgs ...interface{}) bool + }{ + { + name: "id is present", + cfg: conf.MustNewConfigFrom("inputID: some-id"), + assertCfg: assert.NotEqual, + }, + { + name: "absent id", + cfg: conf.MustNewConfigFrom(""), + wantErr: errors.New("failed to get 'inputID'"), + assertCfg: assert.Equal, + }, + { + name: "invalid config", + cfg: nil, + wantErr: errors.New("failed to create new config"), + assertCfg: assert.Equal, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + f := factory{} + + got, err := f.generateCheckConfig(tc.cfg) + if tc.wantErr != nil { + assert.ErrorContains(t, err, tc.wantErr.Error()) + } + + tc.assertCfg(t, tc.cfg, got) + }) + } +}