From 57c98914eacd78160ee1a9ac75894ebdf32d4fb2 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Tue, 19 Mar 2019 21:24:47 +0100 Subject: [PATCH] Stop waiting for signals on closed outleters (#11263) Outleters start a goroutine to handle the finalization of filebeat. If the outleter is closed by other means the goroutine will be kept running even if it has nothing to do, leaking goroutines. Stop this goroutine if the outleter is closed. --- CHANGELOG.next.asciidoc | 1 + filebeat/channel/interface.go | 1 + filebeat/channel/outlet.go | 7 +++++++ filebeat/channel/util.go | 12 ++++++++++-- filebeat/input/log/input_other_test.go | 1 + 5 files changed, 20 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 8d75baa11a2..9c0571ae1e1 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -170,6 +170,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix errors in filebeat Zeek dashboard and README files. Add notice.log support. {pull}10916[10916] - Fix a bug when converting NetFlow fields to snake_case. {pull}10950[10950] - Add on_failure handler for Zeek ingest pipelines. Fix one field name error for notice and add an additional test case. {issue}11004[11004] {pull}11105[11105] +- Fix goroutine leak happening when harvesters are dynamically stopped. {pull}11263[11263] - Fix issue preventing docker container events to be stored if the container has a network interface without ip address. {issue}11225[11225] {pull}11247[11247] - Add on_failure handler for Zeek ingest pipelines. Fix one field name error for notice and add an additional test case. {issue}11004[11004] {pull}11105[11105] diff --git a/filebeat/channel/interface.go b/filebeat/channel/interface.go index 82e5a82af37..877b818870a 100644 --- a/filebeat/channel/interface.go +++ b/filebeat/channel/interface.go @@ -32,5 +32,6 @@ type Connector func(*common.Config, *common.MapStrPointer) (Outleter, error) // Outleter is the outlet for an input type Outleter interface { Close() error + Done() <-chan struct{} OnEvent(data *util.Data) bool } diff --git a/filebeat/channel/outlet.go b/filebeat/channel/outlet.go index d130cf9ceeb..c0fe2b0c9e3 100644 --- a/filebeat/channel/outlet.go +++ b/filebeat/channel/outlet.go @@ -27,6 +27,7 @@ type outlet struct { wg eventCounter client beat.Client isOpen atomic.Bool + done chan struct{} } func newOutlet(client beat.Client, wg eventCounter) *outlet { @@ -34,6 +35,7 @@ func newOutlet(client beat.Client, wg eventCounter) *outlet { wg: wg, client: client, isOpen: atomic.MakeBool(true), + done: make(chan struct{}), } return o } @@ -41,11 +43,16 @@ func newOutlet(client beat.Client, wg eventCounter) *outlet { func (o *outlet) Close() error { isOpen := o.isOpen.Swap(false) if isOpen { + close(o.done) return o.client.Close() } return nil } +func (o *outlet) Done() <-chan struct{} { + return o.done +} + func (o *outlet) OnEvent(d *util.Data) bool { if !o.isOpen.Load() { return false diff --git a/filebeat/channel/util.go b/filebeat/channel/util.go index 134765c4cd8..aec2132fa20 100644 --- a/filebeat/channel/util.go +++ b/filebeat/channel/util.go @@ -71,6 +71,10 @@ func (o *subOutlet) Close() error { return nil } +func (o *subOutlet) Done() <-chan struct{} { + return o.done +} + func (o *subOutlet) OnEvent(d *util.Data) bool { o.mutex.Lock() @@ -114,8 +118,12 @@ func (o *subOutlet) OnEvent(d *util.Data) bool { func CloseOnSignal(outlet Outleter, sig <-chan struct{}) Outleter { if sig != nil { go func() { - <-sig - outlet.Close() + select { + case <-outlet.Done(): + return + case <-sig: + outlet.Close() + } }() } return outlet diff --git a/filebeat/input/log/input_other_test.go b/filebeat/input/log/input_other_test.go index 3a36cb6040d..bdaba0c7d2a 100644 --- a/filebeat/input/log/input_other_test.go +++ b/filebeat/input/log/input_other_test.go @@ -169,3 +169,4 @@ type TestOutlet struct{} func (o TestOutlet) OnEvent(event *util.Data) bool { return true } func (o TestOutlet) Close() error { return nil } +func (o TestOutlet) Done() <-chan struct{} { return nil }