From 0d89cc70d41a4beae76c84dd3766325f0d891e74 Mon Sep 17 00:00:00 2001 From: Steffen Siering Date: Mon, 11 Sep 2017 20:39:20 +0200 Subject: [PATCH] Add flush timeout setting to filebeat registrar (#5146) --- CHANGELOG.asciidoc | 1 + filebeat/beater/filebeat.go | 2 +- filebeat/config/config.go | 1 + filebeat/registrar/registrar.go | 47 +++++++++++++++++++++++++-------- 4 files changed, 39 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index f2efb4799f74..4135deeaa3c0 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -81,6 +81,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di - Remove error log from runnerfactory as error is returned by API. {pull}5085[5085] - Changed the number of shards in the default configuration to 3. {issue}5095[5095] - Remove error log from runnerfactory as error is returned by API. {pull}5085[5085] +- Add `filebeat.registry_flush` setting, to delay the registry updates. {pull}5146[5146] *Heartbeat* diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index bf4559d2e677..dc67b059a349 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -167,7 +167,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { finishedLogger := newFinishedLogger(wgEvents) // Setup registrar to persist state - registrar, err := registrar.New(config.RegistryFile, finishedLogger) + registrar, err := registrar.New(config.RegistryFile, config.RegistryFlush, finishedLogger) if err != nil { logp.Err("Could not init registrar: %v", err) return err diff --git a/filebeat/config/config.go b/filebeat/config/config.go index 9e79203be5d0..b8dcc1f5ba9a 100644 --- a/filebeat/config/config.go +++ b/filebeat/config/config.go @@ -21,6 +21,7 @@ const ( type Config struct { Prospectors []*common.Config `config:"prospectors"` RegistryFile string `config:"registry_file"` + RegistryFlush time.Duration `config:"registry_flush"` ConfigDir string `config:"config_dir"` ShutdownTimeout time.Duration `config:"shutdown_timeout"` Modules []*common.Config `config:"modules"` diff --git a/filebeat/registrar/registrar.go b/filebeat/registrar/registrar.go index 4b535e221eaf..5cbd05de5fe4 100644 --- a/filebeat/registrar/registrar.go +++ b/filebeat/registrar/registrar.go @@ -6,6 +6,7 @@ import ( "os" "path/filepath" "sync" + "time" "github.com/elastic/beats/filebeat/input/file" helper "github.com/elastic/beats/libbeat/common/file" @@ -18,9 +19,12 @@ type Registrar struct { Channel chan []file.State out successLogger done chan struct{} - registryFile string // Path to the Registry File - states *file.States // Map with all file paths inside and the corresponding state + registryFile string // Path to the Registry File wg sync.WaitGroup + + states *file.States // Map with all file paths inside and the corresponding state + flushTimeout time.Duration + bufferedStateUpdates int } type successLogger interface { @@ -34,12 +38,13 @@ var ( registryWrites = monitoring.NewInt(nil, "registrar.writes") ) -func New(registryFile string, out successLogger) (*Registrar, error) { +func New(registryFile string, flushTimeout time.Duration, out successLogger) (*Registrar, error) { r := &Registrar{ registryFile: registryFile, done: make(chan struct{}), states: file.NewStates(), Channel: make(chan []file.State, 1), + flushTimeout: flushTimeout, out: out, wg: sync.WaitGroup{}, } @@ -149,13 +154,28 @@ func (r *Registrar) Run() { r.wg.Done() }() + var ( + timer *time.Timer + flushC <-chan time.Time + ) + for { select { case <-r.done: logp.Info("Ending Registrar") return + case <-flushC: + flushC = nil + timer.Stop() + r.flushRegistry() case states := <-r.Channel: r.onEvents(states) + if r.flushTimeout <= 0 { + r.flushRegistry() + } else if flushC == nil { + timer = time.NewTimer(r.flushTimeout) + flushC = timer.C + } } } } @@ -168,17 +188,11 @@ func (r *Registrar) onEvents(states []file.State) { cleanedStates := r.states.Cleanup() statesCleanup.Add(int64(cleanedStates)) + r.bufferedStateUpdates += len(states) + logp.Debug("registrar", "Registrar states cleaned up. Before: %d, After: %d", beforeCount, beforeCount-cleanedStates) - - if err := r.writeRegistry(); err != nil { - logp.Err("Writing of registry returned error: %v. Continuing...", err) - } - - if r.out != nil { - r.out.Published(len(states)) - } } // processEventStates gets the states from the events and writes them to the registrar state @@ -198,6 +212,17 @@ func (r *Registrar) Stop() { r.wg.Wait() } +func (r *Registrar) flushRegistry() { + if err := r.writeRegistry(); err != nil { + logp.Err("Writing of registry returned error: %v. Continuing...", err) + } + + if r.out != nil { + r.out.Published(r.bufferedStateUpdates) + } + r.bufferedStateUpdates = 0 +} + // writeRegistry writes the new json registry file to disk. func (r *Registrar) writeRegistry() error { logp.Debug("registrar", "Write registry file: %s", r.registryFile)