Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix race condition when stopping inputs filestream ID bookkeeper #32309

Merged
merged 5 commits into from
Jul 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]

- Expand fields in `decode_json_fields` if target is set. {issue}31712[31712] {pull}32010[32010]
- Fix OS name reported by add_host_metadata on Windows 11. {issue}30833[30833] {pull}32259[32259]
- Fix race condition when reloading runners {pull}32309[32309]

*Auditbeat*

Expand All @@ -49,6 +50,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Fix Cisco AMP rate limit and pagination. {pull}32030[32030]
- Fix wrong state ID in states registry for awss3 s3 direct input. {pull}32164[32164]
- cisco/asa: fix handling of user names when there are Security Group Tags present. {issue}32009[32009] {pull}32196[32196]
- Fix race conditions when reloading input V2 and filestream input {pull}32309[32309]

*Heartbeat*

Expand Down
4 changes: 4 additions & 0 deletions filebeat/input/filestream/internal/input-logfile/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ func (inp *managedInput) Run(

inp.prospector.Run(ctx, sourceStore, hg)

// Notify the manager the input has stopped, currently that is used to
// keep track of duplicated IDs
inp.manager.StopInput(inp.userID)

return nil
}

Expand Down
10 changes: 10 additions & 0 deletions filebeat/input/filestream/internal/input-logfile/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type InputManager struct {
store *store
ackUpdater *updateWriter
ackCH *updateChan
idsMux sync.Mutex
ids map[string]struct{}
}

Expand Down Expand Up @@ -176,12 +177,14 @@ func (cim *InputManager) Create(config *conf.C) (v2.Input, error) {
" duplication, please add an ID and restart Filebeat")
}

cim.idsMux.Lock()
if _, exists := cim.ids[settings.ID]; exists {
cim.Logger.Errorf("filestream input with ID '%s' already exists, this "+
"will lead to data duplication, please use a different ID", settings.ID)
}

cim.ids[settings.ID] = struct{}{}
cim.idsMux.Unlock()

prospector, harvester, err := cim.Configure(config)
if err != nil {
Expand Down Expand Up @@ -226,6 +229,13 @@ func (cim *InputManager) Create(config *conf.C) (v2.Input, error) {
}, nil
}

// StopInput peforms all necessary clean up when an input finishes.
func (cim *InputManager) StopInput(id string) {
cim.idsMux.Lock()
delete(cim.ids, id)
cim.idsMux.Unlock()
}

func (cim *InputManager) getRetainedStore() *store {
store := cim.store
store.Retain()
Expand Down
10 changes: 6 additions & 4 deletions filebeat/input/v2/compat/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,13 @@ func (f *factory) Create(
func (r *runner) String() string { return r.input.Name() }

func (r *runner) Start() {
r.wg.Add(1)
log := r.log
name := r.input.Name()

go func() {
log.Infof("Input %v starting", name)
defer r.wg.Done()
log.Infof("Input '%s' starting", name)
err := r.input.Run(
v2.Context{
ID: r.id,
Expand All @@ -119,17 +121,17 @@ func (r *runner) Start() {
r.connector,
)
if err != nil {
log.Errorf("Input '%v' failed with: %+v", name, err)
log.Errorf("Input '%s' failed with: %+v", name, err)
} else {
log.Infof("Input '%v' stopped", name)
log.Infof("Input '%s' stopped (goroutine)", name)
}
}()
}

func (r *runner) Stop() {
r.sig.Cancel()
r.wg.Wait()
r.log.Infof("Input '%v' stopped", r.input.Name())
r.log.Infof("Input '%s' stopped (runner)", r.input.Name())
}

func configID(config *conf.C) (string, error) {
Expand Down
5 changes: 0 additions & 5 deletions filebeat/tests/system/test_reload_inputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,11 +281,6 @@ def test_reload_same_config(self):
with open(self.working_dir + "/configs/input.yml", 'w') as f:
f.write(inputConfigTemplate.format(self.working_dir + "/logs/test.log"))

# Make sure error shows up in log file
self.wait_until(
lambda: self.log_contains("Can only start an input when all related states are finished"),
max_timeout=15)

# Wait until old runner is stopped
self.wait_until(
lambda: self.log_contains("Stopping runner:"),
Expand Down
11 changes: 10 additions & 1 deletion libbeat/cfgfile/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,23 @@ func (r *RunnerList) Reload(configs []*reload.ConfigWithMeta) error {

r.logger.Debugf("Start list: %d, Stop list: %d", len(startList), len(stopList))

wg := sync.WaitGroup{}
// Stop removed runners
for hash, runner := range stopList {
wg.Add(1)
r.logger.Debugf("Stopping runner: %s", runner)
delete(r.runners, hash)
go runner.Stop()
go func() {
defer wg.Done()
runner.Stop()
r.logger.Debugf("Runner: '%s' has stopped", runner)
}()
moduleStops.Add(1)
}

// Wait for all runners to stop before starting new ones
wg.Wait()

// Start new runners
for hash, config := range startList {
runner, err := createRunner(r.factory, r.pipeline, config)
Expand Down