diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index da046d14e00..9056508d849 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -2,6 +2,7 @@ package beater import ( "fmt" + "sync" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" @@ -16,8 +17,9 @@ import ( // Filebeat is a beater object. Contains all objects needed to run the beat type Filebeat struct { - config *cfg.Config - done chan struct{} + config *cfg.Config + sigWait *signalWait + done chan struct{} } // New creates a new Filebeat pointer instance. @@ -31,8 +33,9 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { } fb := &Filebeat{ - done: make(chan struct{}), - config: &config, + done: make(chan struct{}), + sigWait: newSignalWait(), + config: &config, } return fb, nil } @@ -42,8 +45,16 @@ func (fb *Filebeat) Run(b *beat.Beat) error { var err error config := fb.config + var wgEvents *sync.WaitGroup // count active events for waiting on shutdown + var finishedLogger publisher.SuccessLogger + + if fb.config.ShutdownTimeout > 0 { + wgEvents = &sync.WaitGroup{} + finishedLogger = newFinishedLogger(wgEvents) + } + // Setup registrar to persist state - registrar, err := registrar.New(config.RegistryFile, nil) + registrar, err := registrar.New(config.RegistryFile, finishedLogger) if err != nil { logp.Err("Could not init registrar: %v", err) return err @@ -65,7 +76,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { } crawler, err := crawler.New( - newSpoolerOutlet(fb.done, spooler, nil), + newSpoolerOutlet(fb.done, spooler, wgEvents), config.Prospectors) if err != nil { logp.Err("Could not init crawler: %v", err) @@ -92,9 +103,17 @@ func (fb *Filebeat) Run(b *beat.Beat) error { // Starting spooler spooler.Start() + // Stopping spooler will flush items - defer spooler.Stop() - defer publisherChan.Close() + defer func() { + // With harvesters being stopped, optionally wait for all enqueued events being + // published and written by registrar before continuing shutdown. + fb.sigWait.Wait() + + // continue shutdown + publisherChan.Close() + spooler.Stop() + }() err = crawler.Start(registrar.GetStates()) if err != nil { @@ -106,6 +125,12 @@ func (fb *Filebeat) Run(b *beat.Beat) error { // Blocks progressing. As soon as channel is closed, all defer statements come into play <-fb.done + if fb.config.ShutdownTimeout > 0 { + // Wait for either timeout or all events having been ACKed by outputs. + fb.sigWait.Add(wgEvents.Wait) + fb.sigWait.AddTimeout(fb.config.ShutdownTimeout) + } + return nil } diff --git a/filebeat/beater/signalwait.go b/filebeat/beater/signalwait.go new file mode 100644 index 00000000000..56611ca6c80 --- /dev/null +++ b/filebeat/beater/signalwait.go @@ -0,0 +1,48 @@ +package beater + +import "time" + +type signalWait struct { + count int // number of potential 'alive' signals + signals chan struct{} +} + +func newSignalWait() *signalWait { + return &signalWait{ + signals: make(chan struct{}, 1), + } +} + +func (s *signalWait) Wait() { + if s.count == 0 { + return + } + + <-s.signals + s.count-- +} + +func (s *signalWait) Add(fn func()) { + s.count++ + go func() { + fn() + var v struct{} + s.signals <- v + }() +} + +func (s *signalWait) AddChan(c <-chan struct{}) { + s.Add(func() { <-c }) +} + +func (s *signalWait) AddTimer(t *time.Timer) { + s.Add(func() { <-t.C }) +} + +func (s *signalWait) AddTimeout(d time.Duration) { + s.AddTimer(time.NewTimer(d)) +} + +func (s *signalWait) Signal() { + s.Add(func() {}) +} diff --git a/filebeat/config/config.go b/filebeat/config/config.go index 8d8d2439c87..0b8cb1920f5 100644 --- a/filebeat/config/config.go +++ b/filebeat/config/config.go @@ -19,19 +19,21 @@ const ( ) type Config struct { - Prospectors []*common.Config `config:"prospectors"` - SpoolSize uint64 `config:"spool_size" validate:"min=1"` - PublishAsync bool `config:"publish_async"` - IdleTimeout time.Duration `config:"idle_timeout" validate:"nonzero,min=0s"` - RegistryFile string `config:"registry_file"` - ConfigDir string `config:"config_dir"` + Prospectors []*common.Config `config:"prospectors"` + SpoolSize uint64 `config:"spool_size" validate:"min=1"` + PublishAsync bool `config:"publish_async"` + IdleTimeout time.Duration `config:"idle_timeout" validate:"nonzero,min=0s"` + RegistryFile string `config:"registry_file"` + ConfigDir string `config:"config_dir"` + ShutdownTimeout time.Duration `config:"shutdown_timeout"` } var ( DefaultConfig = Config{ - RegistryFile: "registry", - SpoolSize: 2048, - IdleTimeout: 5 * time.Second, + RegistryFile: "registry", + SpoolSize: 2048, + IdleTimeout: 5 * time.Second, + ShutdownTimeout: 0, } ) diff --git a/filebeat/etc/beat.full.yml b/filebeat/etc/beat.full.yml index b760baf2d26..bf0b27bd5d6 100644 --- a/filebeat/etc/beat.full.yml +++ b/filebeat/etc/beat.full.yml @@ -231,3 +231,7 @@ filebeat.prospectors: # the prospector part is processed. All global options like spool_size are ignored. # The config_dir MUST point to a different directory then where the main filebeat config file is in. #filebeat.config_dir: + +# How long filebeat waits on shutdown for the publisher to finish. +# Default is 0, not waiting. +#filebeat.shutdown_timeout: 0 diff --git a/filebeat/filebeat.full.yml b/filebeat/filebeat.full.yml index 52bb9029c4d..12856794c26 100644 --- a/filebeat/filebeat.full.yml +++ b/filebeat/filebeat.full.yml @@ -232,6 +232,10 @@ filebeat.prospectors: # The config_dir MUST point to a different directory then where the main filebeat config file is in. #filebeat.config_dir: +# How long filebeat waits on shutdown for the publisher to finish. +# Default is 0, not waiting. +#filebeat.shutdown_timeout: 0 + #================================ General ===================================== # The name of the shipper that publishes the network data. It can be used to group