Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add flush timeout setting to filebeat registrar #5146

Merged
merged 1 commit into from
Sep 11, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Remove error log from runnerfactory as error is returned by API. {pull}5085[5085]
- Changed the number of shards in the default configuration to 3. {issue}5095[5095]
- Remove error log from runnerfactory as error is returned by API. {pull}5085[5085]
- Add `filebeat.registry_flush` setting, to delay the registry updates. {pull}5146[5146]

*Heartbeat*

Expand Down
2 changes: 1 addition & 1 deletion filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
finishedLogger := newFinishedLogger(wgEvents)

// Setup registrar to persist state
registrar, err := registrar.New(config.RegistryFile, finishedLogger)
registrar, err := registrar.New(config.RegistryFile, config.RegistryFlush, finishedLogger)
if err != nil {
logp.Err("Could not init registrar: %v", err)
return err
Expand Down
1 change: 1 addition & 0 deletions filebeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
type Config struct {
Prospectors []*common.Config `config:"prospectors"`
RegistryFile string `config:"registry_file"`
RegistryFlush time.Duration `config:"registry_flush"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would prefer registry.flush and registry.file but too late to change I think.

ConfigDir string `config:"config_dir"`
ShutdownTimeout time.Duration `config:"shutdown_timeout"`
Modules []*common.Config `config:"modules"`
Expand Down
47 changes: 36 additions & 11 deletions filebeat/registrar/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"path/filepath"
"sync"
"time"

"github.com/elastic/beats/filebeat/input/file"
helper "github.com/elastic/beats/libbeat/common/file"
Expand All @@ -18,9 +19,12 @@ type Registrar struct {
Channel chan []file.State
out successLogger
done chan struct{}
registryFile string // Path to the Registry File
states *file.States // Map with all file paths inside and the corresponding state
registryFile string // Path to the Registry File
wg sync.WaitGroup

states *file.States // Map with all file paths inside and the corresponding state
flushTimeout time.Duration
bufferedStateUpdates int
}

type successLogger interface {
Expand All @@ -34,12 +38,13 @@ var (
registryWrites = monitoring.NewInt(nil, "registrar.writes")
)

func New(registryFile string, out successLogger) (*Registrar, error) {
func New(registryFile string, flushTimeout time.Duration, out successLogger) (*Registrar, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we call the config also flush_timeout? Would make it more explicit.

Copy link
Author

@urso urso Sep 11, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you mean filebeat.registry_flush_timeout? Well, actually it's more like a 'delay' setting...

r := &Registrar{
registryFile: registryFile,
done: make(chan struct{}),
states: file.NewStates(),
Channel: make(chan []file.State, 1),
flushTimeout: flushTimeout,
out: out,
wg: sync.WaitGroup{},
}
Expand Down Expand Up @@ -149,13 +154,28 @@ func (r *Registrar) Run() {
r.wg.Done()
}()

var (
timer *time.Timer
flushC <-chan time.Time
)

for {
select {
case <-r.done:
logp.Info("Ending Registrar")
return
case <-flushC:
flushC = nil
timer.Stop()
r.flushRegistry()
case states := <-r.Channel:
r.onEvents(states)
if r.flushTimeout <= 0 {
r.flushRegistry()
} else if flushC == nil {
timer = time.NewTimer(r.flushTimeout)
flushC = timer.C
}
}
}
}
Expand All @@ -168,17 +188,11 @@ func (r *Registrar) onEvents(states []file.State) {
cleanedStates := r.states.Cleanup()
statesCleanup.Add(int64(cleanedStates))

r.bufferedStateUpdates += len(states)

logp.Debug("registrar",
"Registrar states cleaned up. Before: %d, After: %d",
beforeCount, beforeCount-cleanedStates)

if err := r.writeRegistry(); err != nil {
logp.Err("Writing of registry returned error: %v. Continuing...", err)
}

if r.out != nil {
r.out.Published(len(states))
}
}

// processEventStates gets the states from the events and writes them to the registrar state
Expand All @@ -198,6 +212,17 @@ func (r *Registrar) Stop() {
r.wg.Wait()
}

func (r *Registrar) flushRegistry() {
if err := r.writeRegistry(); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a check here that if bufferedStateUpdates==0 it just returns? Like this flushTimeout could be set very low and still not have frequent writes in case there are no updates.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timer is only started after an event has been added.

logp.Err("Writing of registry returned error: %v. Continuing...", err)
}

if r.out != nil {
r.out.Published(r.bufferedStateUpdates)
}
r.bufferedStateUpdates = 0
}

// writeRegistry writes the new json registry file to disk.
func (r *Registrar) writeRegistry() error {
logp.Debug("registrar", "Write registry file: %s", r.registryFile)
Expand Down