From 2ba483228fd48cb8256074a14d93a17224bb7322 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Mon, 11 Jul 2022 17:24:11 +0200 Subject: [PATCH 1/5] fix race condition when stopping inputs filestream ID bookkeeper This commit fixes three race conditions: 1. When reloading inputs the Reload method would not wait for the runner to stop before starting a new input. 2. The runner's Start method was not incrementing the WaitGroup, hence Stop would finish before the runner had stopped. 3. The input manager that keeps track of filestream input IDs did not syncronise access to the IDs map, allowing for some race conditions when issuing the log error about possible data duplucation. Some log messages have been modified to contain more details like the filestream ID. --- CHANGELOG.next.asciidoc | 2 ++ .../input/filestream/internal/input-logfile/input.go | 4 ++++ .../filestream/internal/input-logfile/manager.go | 12 ++++++++++++ filebeat/input/v2/compat/compat.go | 10 ++++++---- libbeat/cfgfile/list.go | 2 +- 5 files changed, 25 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index ce5edbb6cd4a..5374b3ed8d79 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -40,6 +40,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Expand fields in `decode_json_fields` if target is set. {issue}31712[31712] {pull}32010[32010] - Fix OS name reported by add_host_metadata on Windows 11. {issue}30833[30833] {pull}32259[32259] +- Fix race condition when reloading runners {pull}32309[32309] *Auditbeat* @@ -49,6 +50,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Fix Cisco AMP rate limit and pagination. {pull}32030[32030] - Fix wrong state ID in states registry for awss3 s3 direct input. {pull}32164[32164] - cisco/asa: fix handling of user names when there are Security Group Tags present. {issue}32009[32009] {pull}32196[32196] +- Fix race conditions when reloading input V2 and filestream input {pull}32309[32309] *Heartbeat* diff --git a/filebeat/input/filestream/internal/input-logfile/input.go b/filebeat/input/filestream/internal/input-logfile/input.go index b0c5bf1d8104..25f9cdac6303 100644 --- a/filebeat/input/filestream/internal/input-logfile/input.go +++ b/filebeat/input/filestream/internal/input-logfile/input.go @@ -81,6 +81,10 @@ func (inp *managedInput) Run( inp.prospector.Run(ctx, sourceStore, hg) + // Notify the manager the input has stopped, currently that is used to + // keep track of duplicated IDs + inp.manager.StopInput(inp.userID) + return nil } diff --git a/filebeat/input/filestream/internal/input-logfile/manager.go b/filebeat/input/filestream/internal/input-logfile/manager.go index e7aba23a20ee..7aa635047981 100644 --- a/filebeat/input/filestream/internal/input-logfile/manager.go +++ b/filebeat/input/filestream/internal/input-logfile/manager.go @@ -71,6 +71,7 @@ type InputManager struct { store *store ackUpdater *updateWriter ackCH *updateChan + idsMux sync.Mutex ids map[string]struct{} } @@ -176,12 +177,14 @@ func (cim *InputManager) Create(config *conf.C) (v2.Input, error) { " duplication, please add an ID and restart Filebeat") } + cim.idsMux.Lock() if _, exists := cim.ids[settings.ID]; exists { cim.Logger.Errorf("filestream input with ID '%s' already exists, this "+ "will lead to data duplication, please use a different ID", settings.ID) } cim.ids[settings.ID] = struct{}{} + cim.idsMux.Unlock() prospector, harvester, err := cim.Configure(config) if err != nil { @@ -226,6 +229,15 @@ func (cim *InputManager) Create(config *conf.C) (v2.Input, error) { }, nil } +// StopInput peforms all necessary clean up when an input finishes. +func (cim *InputManager) StopInput(id string) { + cim.idsMux.Lock() + if _, exists := cim.ids[id]; exists { + delete(cim.ids, id) + } + cim.idsMux.Unlock() +} + func (cim *InputManager) getRetainedStore() *store { store := cim.store store.Retain() diff --git a/filebeat/input/v2/compat/compat.go b/filebeat/input/v2/compat/compat.go index 32d89ae7ea0f..1dedf808b25a 100644 --- a/filebeat/input/v2/compat/compat.go +++ b/filebeat/input/v2/compat/compat.go @@ -104,11 +104,13 @@ func (f *factory) Create( func (r *runner) String() string { return r.input.Name() } func (r *runner) Start() { + r.wg.Add(1) log := r.log name := r.input.Name() go func() { - log.Infof("Input %v starting", name) + defer r.wg.Done() + log.Infof("Input '%s' - '%s' starting", name, r.id) err := r.input.Run( v2.Context{ ID: r.id, @@ -119,9 +121,9 @@ func (r *runner) Start() { r.connector, ) if err != nil { - log.Errorf("Input '%v' failed with: %+v", name, err) + log.Errorf("Input '%s' - '%s' failed with: %+v", name, r.id, err) } else { - log.Infof("Input '%v' stopped", name) + log.Infof("Input '%s' - '%s' stopped (goroutine)", name, r.id) } }() } @@ -129,7 +131,7 @@ func (r *runner) Start() { func (r *runner) Stop() { r.sig.Cancel() r.wg.Wait() - r.log.Infof("Input '%v' stopped", r.input.Name()) + r.log.Infof("Input '%s' - '%s' stopped (runner)", r.input.Name(), r.id) } func configID(config *conf.C) (string, error) { diff --git a/libbeat/cfgfile/list.go b/libbeat/cfgfile/list.go index 88f2b2a5e268..7fe38755a824 100644 --- a/libbeat/cfgfile/list.go +++ b/libbeat/cfgfile/list.go @@ -85,7 +85,7 @@ func (r *RunnerList) Reload(configs []*reload.ConfigWithMeta) error { for hash, runner := range stopList { r.logger.Debugf("Stopping runner: %s", runner) delete(r.runners, hash) - go runner.Stop() + runner.Stop() moduleStops.Add(1) } From 0045b98223cacb4323e40bfa3e22f6c1ee59900b Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Tue, 12 Jul 2022 12:00:15 +0200 Subject: [PATCH 2/5] PR review inprovements --- filebeat/input/v2/compat/compat.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/filebeat/input/v2/compat/compat.go b/filebeat/input/v2/compat/compat.go index 1dedf808b25a..146b7098994d 100644 --- a/filebeat/input/v2/compat/compat.go +++ b/filebeat/input/v2/compat/compat.go @@ -110,7 +110,7 @@ func (r *runner) Start() { go func() { defer r.wg.Done() - log.Infof("Input '%s' - '%s' starting", name, r.id) + log.Infof("Input '%s' starting", name) err := r.input.Run( v2.Context{ ID: r.id, @@ -121,9 +121,9 @@ func (r *runner) Start() { r.connector, ) if err != nil { - log.Errorf("Input '%s' - '%s' failed with: %+v", name, r.id, err) + log.Errorf("Input '%s' failed with: %+v", name, err) } else { - log.Infof("Input '%s' - '%s' stopped (goroutine)", name, r.id) + log.Infof("Input '%s' stopped (goroutine)", name) } }() } @@ -131,7 +131,7 @@ func (r *runner) Start() { func (r *runner) Stop() { r.sig.Cancel() r.wg.Wait() - r.log.Infof("Input '%s' - '%s' stopped (runner)", r.input.Name(), r.id) + r.log.Infof("Input '%s' stopped (runner)", r.input.Name()) } func configID(config *conf.C) (string, error) { From 838cf5f0d8a048532e500a46beb8b35e64f6b19d Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Tue, 12 Jul 2022 12:00:25 +0200 Subject: [PATCH 3/5] fix tests --- filebeat/tests/system/test_reload_inputs.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/filebeat/tests/system/test_reload_inputs.py b/filebeat/tests/system/test_reload_inputs.py index cfc5a2f8f5f4..3daf7c8d9d45 100644 --- a/filebeat/tests/system/test_reload_inputs.py +++ b/filebeat/tests/system/test_reload_inputs.py @@ -281,11 +281,6 @@ def test_reload_same_config(self): with open(self.working_dir + "/configs/input.yml", 'w') as f: f.write(inputConfigTemplate.format(self.working_dir + "/logs/test.log")) - # Make sure error shows up in log file - self.wait_until( - lambda: self.log_contains("Can only start an input when all related states are finished"), - max_timeout=15) - # Wait until old runner is stopped self.wait_until( lambda: self.log_contains("Stopping runner:"), From c7b4608bfb7279036d0c9089b6fe7be5d9085368 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Tue, 12 Jul 2022 15:28:51 +0200 Subject: [PATCH 4/5] Fix lint issues on my code --- filebeat/input/filestream/internal/input-logfile/manager.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/filebeat/input/filestream/internal/input-logfile/manager.go b/filebeat/input/filestream/internal/input-logfile/manager.go index 7aa635047981..0161c935a9d0 100644 --- a/filebeat/input/filestream/internal/input-logfile/manager.go +++ b/filebeat/input/filestream/internal/input-logfile/manager.go @@ -232,9 +232,7 @@ func (cim *InputManager) Create(config *conf.C) (v2.Input, error) { // StopInput peforms all necessary clean up when an input finishes. func (cim *InputManager) StopInput(id string) { cim.idsMux.Lock() - if _, exists := cim.ids[id]; exists { - delete(cim.ids, id) - } + delete(cim.ids, id) cim.idsMux.Unlock() } From 6588ace128a10f2720c7ef8e50503934bc439e7d Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Tue, 12 Jul 2022 15:32:26 +0200 Subject: [PATCH 5/5] use sync.WaitGroup when stopping runners --- libbeat/cfgfile/list.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/libbeat/cfgfile/list.go b/libbeat/cfgfile/list.go index 7fe38755a824..76608bfd8e07 100644 --- a/libbeat/cfgfile/list.go +++ b/libbeat/cfgfile/list.go @@ -81,14 +81,23 @@ func (r *RunnerList) Reload(configs []*reload.ConfigWithMeta) error { r.logger.Debugf("Start list: %d, Stop list: %d", len(startList), len(stopList)) + wg := sync.WaitGroup{} // Stop removed runners for hash, runner := range stopList { + wg.Add(1) r.logger.Debugf("Stopping runner: %s", runner) delete(r.runners, hash) - runner.Stop() + go func() { + defer wg.Done() + runner.Stop() + r.logger.Debugf("Runner: '%s' has stopped", runner) + }() moduleStops.Add(1) } + // Wait for all runners to stop before starting new ones + wg.Wait() + // Start new runners for hash, config := range startList { runner, err := createRunner(r.factory, r.pipeline, config)