diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index ce5edbb6cd4..5374b3ed8d7 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -40,6 +40,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Expand fields in `decode_json_fields` if target is set. {issue}31712[31712] {pull}32010[32010] - Fix OS name reported by add_host_metadata on Windows 11. {issue}30833[30833] {pull}32259[32259] +- Fix race condition when reloading runners. {pull}32309[32309] *Auditbeat* @@ -49,6 +50,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Fix Cisco AMP rate limit and pagination. {pull}32030[32030] - Fix wrong state ID in states registry for awss3 s3 direct input. {pull}32164[32164] - cisco/asa: fix handling of user names when there are Security Group Tags present. {issue}32009[32009] {pull}32196[32196] +- Fix race conditions when reloading input V2 and filestream input. {pull}32309[32309] *Heartbeat* diff --git a/filebeat/input/filestream/internal/input-logfile/input.go b/filebeat/input/filestream/internal/input-logfile/input.go index b0c5bf1d810..25f9cdac630 100644 --- a/filebeat/input/filestream/internal/input-logfile/input.go +++ b/filebeat/input/filestream/internal/input-logfile/input.go @@ -81,6 +81,10 @@ func (inp *managedInput) Run( inp.prospector.Run(ctx, sourceStore, hg) + // Notify the manager that the input has stopped; this is currently used to + // keep track of duplicated IDs. + inp.manager.StopInput(inp.userID) + return nil } diff --git a/filebeat/input/filestream/internal/input-logfile/manager.go b/filebeat/input/filestream/internal/input-logfile/manager.go index e7aba23a20e..0161c935a9d 100644 --- a/filebeat/input/filestream/internal/input-logfile/manager.go +++ b/filebeat/input/filestream/internal/input-logfile/manager.go @@ -71,6 +71,7 @@ type InputManager struct { store *store ackUpdater *updateWriter ackCH *updateChan + idsMux sync.Mutex ids map[string]struct{} } @@ -176,12 
+177,14 @@ func (cim *InputManager) Create(config *conf.C) (v2.Input, error) { " duplication, please add an ID and restart Filebeat") } + cim.idsMux.Lock() if _, exists := cim.ids[settings.ID]; exists { cim.Logger.Errorf("filestream input with ID '%s' already exists, this "+ "will lead to data duplication, please use a different ID", settings.ID) } cim.ids[settings.ID] = struct{}{} + cim.idsMux.Unlock() prospector, harvester, err := cim.Configure(config) if err != nil { @@ -226,6 +229,13 @@ func (cim *InputManager) Create(config *conf.C) (v2.Input, error) { }, nil } +// StopInput performs all necessary cleanup when an input finishes. +func (cim *InputManager) StopInput(id string) { + cim.idsMux.Lock() + delete(cim.ids, id) + cim.idsMux.Unlock() +} + func (cim *InputManager) getRetainedStore() *store { store := cim.store store.Retain() diff --git a/filebeat/input/v2/compat/compat.go b/filebeat/input/v2/compat/compat.go index 32d89ae7ea0..146b7098994 100644 --- a/filebeat/input/v2/compat/compat.go +++ b/filebeat/input/v2/compat/compat.go @@ -104,11 +104,13 @@ func (f *factory) Create( func (r *runner) String() string { return r.input.Name() } func (r *runner) Start() { + r.wg.Add(1) log := r.log name := r.input.Name() go func() { - log.Infof("Input %v starting", name) + defer r.wg.Done() + log.Infof("Input '%s' starting", name) err := r.input.Run( v2.Context{ ID: r.id, @@ -119,9 +121,9 @@ func (r *runner) Start() { r.connector, ) if err != nil { - log.Errorf("Input '%v' failed with: %+v", name, err) + log.Errorf("Input '%s' failed with: %+v", name, err) } else { - log.Infof("Input '%v' stopped", name) + log.Infof("Input '%s' stopped (goroutine)", name) } }() } @@ -129,7 +131,7 @@ func (r *runner) Start() { func (r *runner) Stop() { r.sig.Cancel() r.wg.Wait() - r.log.Infof("Input '%v' stopped", r.input.Name()) + r.log.Infof("Input '%s' stopped (runner)", r.input.Name()) } func configID(config *conf.C) (string, error) { diff --git 
a/filebeat/tests/system/test_reload_inputs.py b/filebeat/tests/system/test_reload_inputs.py index cfc5a2f8f5f..3daf7c8d9d4 100644 --- a/filebeat/tests/system/test_reload_inputs.py +++ b/filebeat/tests/system/test_reload_inputs.py @@ -281,11 +281,6 @@ def test_reload_same_config(self): with open(self.working_dir + "/configs/input.yml", 'w') as f: f.write(inputConfigTemplate.format(self.working_dir + "/logs/test.log")) - # Make sure error shows up in log file - self.wait_until( - lambda: self.log_contains("Can only start an input when all related states are finished"), - max_timeout=15) - # Wait until old runner is stopped self.wait_until( lambda: self.log_contains("Stopping runner:"), diff --git a/libbeat/cfgfile/list.go b/libbeat/cfgfile/list.go index 88f2b2a5e26..76608bfd8e0 100644 --- a/libbeat/cfgfile/list.go +++ b/libbeat/cfgfile/list.go @@ -81,14 +81,23 @@ func (r *RunnerList) Reload(configs []*reload.ConfigWithMeta) error { r.logger.Debugf("Start list: %d, Stop list: %d", len(startList), len(stopList)) + wg := sync.WaitGroup{} // Stop removed runners for hash, runner := range stopList { + wg.Add(1) r.logger.Debugf("Stopping runner: %s", runner) delete(r.runners, hash) - go runner.Stop() + go func() { + defer wg.Done() + runner.Stop() + r.logger.Debugf("Runner: '%s' has stopped", runner) + }() moduleStops.Add(1) } + // Wait for all runners to stop before starting new ones + wg.Wait() + // Start new runners for hash, config := range startList { runner, err := createRunner(r.factory, r.pipeline, config)