From afa61c3026ec7d7e840cbcfff41fd1cba84878e6 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Thu, 2 Aug 2018 16:43:50 +0200 Subject: [PATCH] Filebeat: Fix goroutine leak in SubOutlet (#7820) This fixes a goroutine leaking every time that a harvester is closed. (cherry picked from commit b84e083625ec1039fab4fc6fb4b7f33efa90be22) --- CHANGELOG.asciidoc | 2 ++ filebeat/channel/util.go | 40 ++++++++++++++++++++++++---------------- 2 files changed, 26 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 9ff2b7065c5..aebab87733c 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -38,6 +38,8 @@ https://github.com/elastic/beats/compare/v6.4.0...6.4[Check the HEAD diff] *Filebeat* +- Fixed a memory leak when harvesters are closed. {pull}7820[7820] + *Heartbeat* *Metricbeat* diff --git a/filebeat/channel/util.go b/filebeat/channel/util.go index 8111bc2a011..134765c4cd8 100644 --- a/filebeat/channel/util.go +++ b/filebeat/channel/util.go @@ -18,17 +18,19 @@ package channel import ( + "sync" + "github.com/elastic/beats/filebeat/util" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/common/atomic" ) type subOutlet struct { - isOpen atomic.Bool - done chan struct{} - ch chan *util.Data - res chan bool + done chan struct{} + ch chan *util.Data + res chan bool + mutex sync.Mutex + closeOnce sync.Once } // ConnectTo creates a new Connector, combining a beat.Pipeline with an outlet Factory. @@ -42,10 +44,9 @@ func ConnectTo(pipeline beat.Pipeline, factory Factory) Connector { // underlying outlet. func SubOutlet(out Outleter) Outleter { s := &subOutlet{ - isOpen: atomic.MakeBool(true), - done: make(chan struct{}), - ch: make(chan *util.Data), - res: make(chan bool, 1), + done: make(chan struct{}), + ch: make(chan *util.Data), + res: make(chan bool, 1), } go func() { @@ -58,21 +59,30 @@ func SubOutlet(out Outleter) Outleter { } func (o *subOutlet) Close() error { - isOpen := o.isOpen.Swap(false) - if isOpen { + o.closeOnce.Do(func() { + // Signal OnEvent() to terminate close(o.done) - } + // This mutex prevents the event channel to be closed if OnEvent is + // still running. + o.mutex.Lock() + defer o.mutex.Unlock() + close(o.ch) + }) return nil } func (o *subOutlet) OnEvent(d *util.Data) bool { - if !o.isOpen.Load() { + + o.mutex.Lock() + defer o.mutex.Unlock() + select { + case <-o.done: return false + default: } select { case <-o.done: - close(o.ch) return false case o.ch <- d: @@ -92,8 +102,6 @@ func (o *subOutlet) OnEvent(d *util.Data) bool { // Once all messages are in the publisher pipeline, in correct order, // it depends on registrar/publisher pipeline if state is finally updated // in the registrar. - - close(o.ch) return true case ret := <-o.res: