Skip to content

Commit

Permalink
Add filebeat modules reloading config
Browse files Browse the repository at this point in the history
  • Loading branch information
exekias committed Jun 28, 2017
1 parent 3c385ff commit 5d9cc6c
Show file tree
Hide file tree
Showing 13 changed files with 302 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha2...master[Check the HEAD d
- Add support for loading Xpack Machine Learning configurations from the modules, and added sample configurations for the Nginx module. {pull}4506[4506]
- Add udp prospector type. {pull}4452[4452]
- Enabled Cgo which means libc is dynamically compiled. {pull}4546[4546]
- Add module config reloading mechanism {pull}4566[4566]

*Heartbeat*

Expand Down
16 changes: 11 additions & 5 deletions filebeat/_meta/common.reference.p2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,14 @@ filebeat.prospectors:
#filebeat.shutdown_timeout: 0

# Enable filebeat config reloading
#filebeat.config.prospectors:
#enabled: false
#path: configs/*.yml
#reload.enabled: true
#reload.period: 10s
#filebeat.config:
#prospectors:
#enabled: false
#path: prospectors.d/*.yml
#reload.enabled: true
#reload.period: 10s
#modules:
#enabled: false
#path: modules.d/*.yml
#reload.enabled: true
#reload.period: 10s
9 changes: 5 additions & 4 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
}
}

if !config.ConfigProspector.Enabled() && !haveEnabledProspectors {
if !config.ConfigProspector.Enabled() && !config.ConfigModules.Enabled() && !haveEnabledProspectors {
if !b.InSetupCmd {
return nil, errors.New("No modules or prospectors enabled and configuration reloading disabled. What files do you want me to watch?")
} else {
Expand All @@ -76,7 +76,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
}
}

if *once && config.ConfigProspector.Enabled() {
if *once && config.ConfigProspector.Enabled() && config.ConfigModules.Enabled() {
return nil, errors.New("prospector configs and -once cannot be used together")
}

Expand Down Expand Up @@ -176,7 +176,8 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
return err
}

crawler, err := crawler.New(channel.NewOutlet(fb.done, spooler.Channel, wgEvents), config.Prospectors, fb.done, *once)
outlet := channel.NewOutlet(fb.done, spooler.Channel, wgEvents)
crawler, err := crawler.New(outlet, config.Prospectors, b.Info.Version, fb.done, *once)
if err != nil {
logp.Err("Could not init crawler: %v", err)
return err
Expand Down Expand Up @@ -218,7 +219,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
spooler.Stop()
}()

err = crawler.Start(registrar, config.ConfigProspector)
err = crawler.Start(registrar, config.ConfigProspector, config.ConfigModules)
if err != nil {
crawler.Stop()
return err
Expand Down
1 change: 1 addition & 0 deletions filebeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Config struct {
ShutdownTimeout time.Duration `config:"shutdown_timeout"`
Modules []*common.Config `config:"modules"`
ConfigProspector *common.Config `config:"config.prospectors"`
ConfigModules *common.Config `config:"config.modules"`
}

var (
Expand Down
18 changes: 16 additions & 2 deletions filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"

"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/fileset"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/filebeat/prospector"
"github.com/elastic/beats/filebeat/registrar"
Expand All @@ -20,22 +21,24 @@ type Crawler struct {
wg sync.WaitGroup
reloader *cfgfile.Reloader
once bool
beatVersion string
beatDone chan struct{}
}

func New(out channel.Outleter, prospectorConfigs []*common.Config, beatDone chan struct{}, once bool) (*Crawler, error) {
func New(out channel.Outleter, prospectorConfigs []*common.Config, beatVersion string, beatDone chan struct{}, once bool) (*Crawler, error) {

return &Crawler{
out: out,
prospectors: map[uint64]*prospector.Prospector{},
prospectorConfigs: prospectorConfigs,
once: once,
beatVersion: beatVersion,
beatDone: beatDone,
}, nil
}

// Start starts the crawler with all prospectors
func (c *Crawler) Start(r *registrar.Registrar, configProspectors *common.Config) error {
func (c *Crawler) Start(r *registrar.Registrar, configProspectors *common.Config, configModules *common.Config) error {

logp.Info("Loading Prospectors: %v", len(c.prospectorConfigs))

Expand All @@ -57,6 +60,17 @@ func (c *Crawler) Start(r *registrar.Registrar, configProspectors *common.Config
}()
}

if configModules.Enabled() {
logp.Beta("Loading separate prospectors is enabled.")

c.reloader = cfgfile.NewReloader(configModules)
// TODO add beatVersion here
factory := fileset.NewFactory(c.out, r, "", c.beatDone)
go func() {
c.reloader.Run(factory)
}()
}

logp.Info("Loading and starting Prospectors completed. Enabled prospectors: %v", len(c.prospectors))

return nil
Expand Down
16 changes: 11 additions & 5 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -431,11 +431,17 @@ filebeat.prospectors:
#filebeat.shutdown_timeout: 0

# Enable filebeat config reloading
#filebeat.config.prospectors:
#enabled: false
#path: configs/*.yml
#reload.enabled: true
#reload.period: 10s
#filebeat.config:
#prospectors:
#enabled: false
#path: prospectors.d/*.yml
#reload.enabled: true
#reload.period: 10s
#modules:
#enabled: false
#path: modules.d/*.yml
#reload.enabled: true
#reload.period: 10s

#================================ General ======================================

Expand Down
86 changes: 86 additions & 0 deletions filebeat/fileset/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package fileset

import (
"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/mitchellh/hashstructure"

"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/prospector"
"github.com/elastic/beats/filebeat/registrar"
)

// Factory is a factory for registrars
type Factory struct {
outlet channel.Outleter
registrar *registrar.Registrar
beatVersion string
beatDone chan struct{}
}

// Wrap an array of prospectors and implements cfgfile.Runner interface
type prospectorsRunner struct {
id uint64
prospectors []*prospector.Prospector
}

// NewFactory instantiates a new Factory
func NewFactory(outlet channel.Outleter, registrar *registrar.Registrar, beatVersion string, beatDone chan struct{}) *Factory {
return &Factory{
outlet: outlet,
registrar: registrar,
beatVersion: beatVersion,
beatDone: beatDone,
}
}

// Create creates a module based on a config
func (f *Factory) Create(c *common.Config) (cfgfile.Runner, error) {
// Start a registry of one module:
m, err := NewModuleRegistry([]*common.Config{c}, f.beatVersion)
if err != nil {
return nil, err
}

pConfigs, err := m.GetProspectorConfigs()
if err != nil {
return nil, err
}

// Hash module ID
var h map[string]interface{}
c.Unpack(&h)
id, err := hashstructure.Hash(h, nil)
if err != nil {
return nil, err
}

prospectors := make([]*prospector.Prospector, len(pConfigs))
for i, pConfig := range pConfigs {
prospectors[i], err = prospector.NewProspector(pConfig, f.outlet, f.beatDone, f.registrar.GetStates())
if err != nil {
logp.Err("Error creating prospector: %s", err)
return nil, err
}
}

return &prospectorsRunner{
id: id,
prospectors: prospectors,
}, nil
}

func (p *prospectorsRunner) Start() {
for _, prospector := range p.prospectors {
prospector.Start()
}
}
func (p *prospectorsRunner) Stop() {
for _, prospector := range p.prospectors {
prospector.Stop()
}
}
func (p *prospectorsRunner) ID() uint64 {
return p.id
}
2 changes: 1 addition & 1 deletion filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ filebeat.registry_file: {{ beat.working_dir + '/' }}{{ registryFile|default("reg
filebeat.publish_async: {{publish_async}}

{% if reload or reload_path -%}
filebeat.config.prospectors:
filebeat.config.{{ reload_type|default("prospectors") }}:
enabled: true
path: {{ reload_path }}
{% if reload -%}
Expand Down
5 changes: 5 additions & 0 deletions filebeat/tests/system/module/test/test/config/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type: log
paths:
{{ range $i, $path := .paths }}
- {{$path}}
{{ end }}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{}
9 changes: 9 additions & 0 deletions filebeat/tests/system/module/test/test/manifest.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
module_version: "1.0"

var:
- name: paths
default:
- test.log

ingest_pipeline: ingest/pipeline.json
prospector: config/test.yml
Loading

0 comments on commit 5d9cc6c

Please sign in to comment.