From 5d9cc6cc28931bf0065fa8d80e0ee862b4fdbf03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20P=C3=A9rez-Aradros=20Herce?= Date: Mon, 26 Jun 2017 18:21:19 +0200 Subject: [PATCH 1/5] Add filebeat modules reloading config --- CHANGELOG.asciidoc | 1 + filebeat/_meta/common.reference.p2.yml | 16 +- filebeat/beater/filebeat.go | 9 +- filebeat/config/config.go | 1 + filebeat/crawler/crawler.go | 18 ++- filebeat/filebeat.reference.yml | 16 +- filebeat/fileset/factory.go | 86 ++++++++++ filebeat/tests/system/config/filebeat.yml.j2 | 2 +- .../system/module/test/test/config/test.yml | 5 + .../module/test/test/ingest/pipeline.json | 1 + .../system/module/test/test/manifest.yml | 9 ++ filebeat/tests/system/test_reload_modules.py | 152 ++++++++++++++++++ ...t_reload.py => test_reload_prospectors.py} | 6 +- 13 files changed, 302 insertions(+), 20 deletions(-) create mode 100644 filebeat/fileset/factory.go create mode 100644 filebeat/tests/system/module/test/test/config/test.yml create mode 100644 filebeat/tests/system/module/test/test/ingest/pipeline.json create mode 100644 filebeat/tests/system/module/test/test/manifest.yml create mode 100644 filebeat/tests/system/test_reload_modules.py rename filebeat/tests/system/{test_reload.py => test_reload_prospectors.py} (98%) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 28b49fb9814..386d4c0fa4a 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -68,6 +68,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha2...master[Check the HEAD d - Add support for loading Xpack Machine Learning configurations from the modules, and added sample configurations for the Nginx module. {pull}4506[4506] - Add udp prospector type. {pull}4452[4452] - Enabled Cgo which means libc is dynamically compiled. {pull}4546[4546] +- Add module config reloading mechanism {pull}4566[4566] *Heartbeat* diff --git a/filebeat/_meta/common.reference.p2.yml b/filebeat/_meta/common.reference.p2.yml index 2339507e3fb..77b357ba4a2 100644 --- a/filebeat/_meta/common.reference.p2.yml +++ b/filebeat/_meta/common.reference.p2.yml @@ -262,8 +262,14 @@ filebeat.prospectors: #filebeat.shutdown_timeout: 0 # Enable filebeat config reloading -#filebeat.config.prospectors: - #enabled: false - #path: configs/*.yml - #reload.enabled: true - #reload.period: 10s +#filebeat.config: + #prospectors: + #enabled: false + #path: prospectors.d/*.yml + #reload.enabled: true + #reload.period: 10s + #modules: + #enabled: false + #path: modules.d/*.yml + #reload.enabled: true + #reload.period: 10s diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 1d092288a1e..a2f1c3fadad 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -67,7 +67,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { } } - if !config.ConfigProspector.Enabled() && !haveEnabledProspectors { + if !config.ConfigProspector.Enabled() && !config.ConfigModules.Enabled() && !haveEnabledProspectors { if !b.InSetupCmd { return nil, errors.New("No modules or prospectors enabled and configuration reloading disabled. What files do you want me to watch?") } else { @@ -76,7 +76,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { } } - if *once && config.ConfigProspector.Enabled() { + if *once && config.ConfigProspector.Enabled() && config.ConfigModules.Enabled() { return nil, errors.New("prospector configs and -once cannot be used together") } @@ -176,7 +176,8 @@ func (fb *Filebeat) Run(b *beat.Beat) error { return err } - crawler, err := crawler.New(channel.NewOutlet(fb.done, spooler.Channel, wgEvents), config.Prospectors, fb.done, *once) + outlet := channel.NewOutlet(fb.done, spooler.Channel, wgEvents) + crawler, err := crawler.New(outlet, config.Prospectors, b.Info.Version, fb.done, *once) if err != nil { logp.Err("Could not init crawler: %v", err) return err @@ -218,7 +219,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { spooler.Stop() }() - err = crawler.Start(registrar, config.ConfigProspector) + err = crawler.Start(registrar, config.ConfigProspector, config.ConfigModules) if err != nil { crawler.Stop() return err diff --git a/filebeat/config/config.go b/filebeat/config/config.go index e69b6b77f08..eec92518c59 100644 --- a/filebeat/config/config.go +++ b/filebeat/config/config.go @@ -28,6 +28,7 @@ type Config struct { ShutdownTimeout time.Duration `config:"shutdown_timeout"` Modules []*common.Config `config:"modules"` ConfigProspector *common.Config `config:"config.prospectors"` + ConfigModules *common.Config `config:"config.modules"` } var ( diff --git a/filebeat/crawler/crawler.go b/filebeat/crawler/crawler.go index 5a54784085a..96bffe37266 100644 --- a/filebeat/crawler/crawler.go +++ b/filebeat/crawler/crawler.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/elastic/beats/filebeat/channel" + "github.com/elastic/beats/filebeat/fileset" "github.com/elastic/beats/filebeat/input/file" "github.com/elastic/beats/filebeat/prospector" "github.com/elastic/beats/filebeat/registrar" @@ -20,22 +21,24 @@ type Crawler struct { wg sync.WaitGroup reloader *cfgfile.Reloader once bool + beatVersion string beatDone chan struct{} } -func New(out channel.Outleter, prospectorConfigs []*common.Config, beatDone chan struct{}, once bool) (*Crawler, error) { +func New(out channel.Outleter, prospectorConfigs []*common.Config, beatVersion string, beatDone chan struct{}, once bool) (*Crawler, error) { return &Crawler{ out: out, prospectors: map[uint64]*prospector.Prospector{}, prospectorConfigs: prospectorConfigs, once: once, + beatVersion: beatVersion, beatDone: beatDone, }, nil } // Start starts the crawler with all prospectors -func (c *Crawler) Start(r *registrar.Registrar, configProspectors *common.Config) error { +func (c *Crawler) Start(r *registrar.Registrar, configProspectors *common.Config, configModules *common.Config) error { logp.Info("Loading Prospectors: %v", len(c.prospectorConfigs)) @@ -57,6 +60,17 @@ func (c *Crawler) Start(r *registrar.Registrar, configProspectors *common.Config }() } + if configModules.Enabled() { + logp.Beta("Loading separate prospectors is enabled.") + + c.reloader = cfgfile.NewReloader(configModules) + // TODO add beatVersion here + factory := fileset.NewFactory(c.out, r, "", c.beatDone) + go func() { + c.reloader.Run(factory) + }() + } + logp.Info("Loading and starting Prospectors completed. Enabled prospectors: %v", len(c.prospectors)) return nil diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index d4bcf8a60a4..816adeb4ea4 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -431,11 +431,17 @@ filebeat.prospectors: #filebeat.shutdown_timeout: 0 # Enable filebeat config reloading -#filebeat.config.prospectors: - #enabled: false - #path: configs/*.yml - #reload.enabled: true - #reload.period: 10s +#filebeat.config: + #prospectors: + #enabled: false + #path: prospectors.d/*.yml + #reload.enabled: true + #reload.period: 10s + #modules: + #enabled: false + #path: modules.d/*.yml + #reload.enabled: true + #reload.period: 10s #================================ General ====================================== diff --git a/filebeat/fileset/factory.go b/filebeat/fileset/factory.go new file mode 100644 index 00000000000..7b223f69d41 --- /dev/null +++ b/filebeat/fileset/factory.go @@ -0,0 +1,86 @@ +package fileset + +import ( + "github.com/elastic/beats/libbeat/cfgfile" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/mitchellh/hashstructure" + + "github.com/elastic/beats/filebeat/channel" + "github.com/elastic/beats/filebeat/prospector" + "github.com/elastic/beats/filebeat/registrar" +) + +// Factory is a factory for registrars +type Factory struct { + outlet channel.Outleter + registrar *registrar.Registrar + beatVersion string + beatDone chan struct{} +} + +// Wrap an array of prospectors and implements cfgfile.Runner interface +type prospectorsRunner struct { + id uint64 + prospectors []*prospector.Prospector +} + +// NewFactory instantiates a new Factory +func NewFactory(outlet channel.Outleter, registrar *registrar.Registrar, beatVersion string, beatDone chan struct{}) *Factory { + return &Factory{ + outlet: outlet, + registrar: registrar, + beatVersion: beatVersion, + beatDone: beatDone, + } +} + +// Create creates a module based on a config +func (f *Factory) Create(c *common.Config) (cfgfile.Runner, error) { + // Start a registry of one module: + m, err := NewModuleRegistry([]*common.Config{c}, f.beatVersion) + if err != nil { + return nil, err + } + + pConfigs, err := m.GetProspectorConfigs() + if err != nil { + return nil, err + } + + // Hash module ID + var h map[string]interface{} + c.Unpack(&h) + id, err := hashstructure.Hash(h, nil) + if err != nil { + return nil, err + } + + prospectors := make([]*prospector.Prospector, len(pConfigs)) + for i, pConfig := range pConfigs { + prospectors[i], err = prospector.NewProspector(pConfig, f.outlet, f.beatDone, f.registrar.GetStates()) + if err != nil { + logp.Err("Error creating prospector: %s", err) + return nil, err + } + } + + return &prospectorsRunner{ + id: id, + prospectors: prospectors, + }, nil +} + +func (p *prospectorsRunner) Start() { + for _, prospector := range p.prospectors { + prospector.Start() + } +} +func (p *prospectorsRunner) Stop() { + for _, prospector := range p.prospectors { + prospector.Stop() + } +} +func (p *prospectorsRunner) ID() uint64 { + return p.id +} diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2 index cfe5fb64d54..4961a452cf9 100644 --- a/filebeat/tests/system/config/filebeat.yml.j2 +++ b/filebeat/tests/system/config/filebeat.yml.j2 @@ -98,7 +98,7 @@ filebeat.registry_file: {{ beat.working_dir + '/' }}{{ registryFile|default("reg filebeat.publish_async: {{publish_async}} {% if reload or reload_path -%} -filebeat.config.prospectors: +filebeat.config.{{ reload_type|default("prospectors") }}: enabled: true path: {{ reload_path }} {% if reload -%} diff --git a/filebeat/tests/system/module/test/test/config/test.yml b/filebeat/tests/system/module/test/test/config/test.yml new file mode 100644 index 00000000000..21548bbaf93 --- /dev/null +++ b/filebeat/tests/system/module/test/test/config/test.yml @@ -0,0 +1,5 @@ +type: log +paths: +{{ range $i, $path := .paths }} + - {{$path}} +{{ end }} diff --git a/filebeat/tests/system/module/test/test/ingest/pipeline.json b/filebeat/tests/system/module/test/test/ingest/pipeline.json new file mode 100644 index 00000000000..0967ef424bc --- /dev/null +++ b/filebeat/tests/system/module/test/test/ingest/pipeline.json @@ -0,0 +1 @@ +{} diff --git a/filebeat/tests/system/module/test/test/manifest.yml b/filebeat/tests/system/module/test/test/manifest.yml new file mode 100644 index 00000000000..8b90842943f --- /dev/null +++ b/filebeat/tests/system/module/test/test/manifest.yml @@ -0,0 +1,9 @@ +module_version: "1.0" + +var: + - name: paths + default: + - test.log + +ingest_pipeline: ingest/pipeline.json +prospector: config/test.yml diff --git a/filebeat/tests/system/test_reload_modules.py b/filebeat/tests/system/test_reload_modules.py new file mode 100644 index 00000000000..d6b3f183ff5 --- /dev/null +++ b/filebeat/tests/system/test_reload_modules.py @@ -0,0 +1,152 @@ +import re +import sys +import unittest +import os +import shutil +import time +from filebeat import BaseTest + + +moduleConfigTemplate = """ +- module: test + test: + enabled: true + var.paths: + - {} + prospector: + scan_frequency: 1s + auth: + enabled: false +""" + + +class Test(BaseTest): + + def setUp(self): + super(BaseTest, self).setUp() + # Copy system module + shutil.copytree(os.path.join("module", "test"), + os.path.join(self.working_dir, "module", "test")) + + def test_reload(self): + """ + Test modules basic reload + """ + self.render_config_template( + reload=True, + reload_path=self.working_dir + "/configs/*.yml", + reload_type="modules", + prospectors=False, + ) + + proc = self.start_beat() + + os.mkdir(self.working_dir + "/logs/") + logfile = self.working_dir + "/logs/test.log" + os.mkdir(self.working_dir + "/configs/") + + with open(self.working_dir + "/configs/system.yml.test", 'w') as f: + f.write(moduleConfigTemplate.format(self.working_dir + "/logs/*")) + os.rename(self.working_dir + "/configs/system.yml.test", + self.working_dir + "/configs/system.yml") + + with open(logfile, 'w') as f: + f.write("Hello world\n") + + self.wait_until(lambda: self.output_lines() > 0) + assert self.output_has_message("Hello world") + proc.check_kill_and_wait() + + def test_start_stop(self): + """ + Test basic modules start and stop + """ + self.render_config_template( + reload=True, + reload_path=self.working_dir + "/configs/*.yml", + reload_type="modules", + prospectors=False, + ) + + proc = self.start_beat() + + os.mkdir(self.working_dir + "/logs/") + logfile = self.working_dir + "/logs/test.log" + os.mkdir(self.working_dir + "/configs/") + + with open(self.working_dir + "/configs/system.yml.test", 'w') as f: + f.write(moduleConfigTemplate.format(self.working_dir + "/logs/*")) + os.rename(self.working_dir + "/configs/system.yml.test", + self.working_dir + "/configs/system.yml") + + with open(logfile, 'w') as f: + f.write("Hello world\n") + + self.wait_until(lambda: self.output_lines() == 1, max_timeout=10) + print(self.output_lines()) + + # Remove prospector + with open(self.working_dir + "/configs/system.yml", 'w') as f: + f.write("") + + # Wait until prospector is stopped + self.wait_until( + lambda: self.log_contains("Runner stopped:"), + max_timeout=15) + + with open(logfile, 'a') as f: + f.write("Hello world\n") + + # Wait to give a change to pick up the new line (it shouldn't) + time.sleep(1) + + self.wait_until(lambda: self.output_lines() == 1, max_timeout=5) + proc.check_kill_and_wait() + + def test_load_configs(self): + """ + Test loading separate module configs + """ + self.render_config_template( + reload_path=self.working_dir + "/configs/*.yml", + reload_type="modules", + prospectors=False, + ) + + os.mkdir(self.working_dir + "/logs/") + os.mkdir(self.working_dir + "/configs/") + logfile1 = self.working_dir + "/logs/test1.log" + logfile2 = self.working_dir + "/logs/test2.log" + + with open(self.working_dir + "/configs/module1.yml", 'w') as f: + f.write(moduleConfigTemplate.format( + self.working_dir + "/logs/test1.log")) + + with open(self.working_dir + "/configs/module2.yml", 'w') as f: + f.write(moduleConfigTemplate.format( + self.working_dir + "/logs/test2.log")) + + proc = self.start_beat() + + with open(logfile1, 'w') as f: + f.write("Hello 1\n") + + self.wait_until(lambda: self.output_lines() == 1) + + with open(logfile2, 'w') as f: + f.write("Hello 2\n") + + self.wait_until(lambda: self.output_lines() == 2) + + output = self.read_output() + + # Reloading stopped. + self.wait_until( + lambda: self.log_contains("Loading of config files completed."), + max_timeout=15) + + # Make sure the correct lines were picked up + assert self.output_lines() == 2 + assert output[0]["message"] == "Hello 1" + assert output[1]["message"] == "Hello 2" + proc.check_kill_and_wait() diff --git a/filebeat/tests/system/test_reload.py b/filebeat/tests/system/test_reload_prospectors.py similarity index 98% rename from filebeat/tests/system/test_reload.py rename to filebeat/tests/system/test_reload_prospectors.py index 084d2a01eab..d9db94c8c50 100644 --- a/filebeat/tests/system/test_reload.py +++ b/filebeat/tests/system/test_reload_prospectors.py @@ -18,7 +18,7 @@ class Test(BaseTest): def test_reload(self): """ - Test basic reload + Test basic prospectors reload """ self.render_config_template( reload=True, @@ -43,7 +43,7 @@ def test_reload(self): def test_start_stop(self): """ - Test basic start and stop + Test basic prospectors start and stop """ self.render_config_template( reload=True, @@ -86,7 +86,7 @@ def test_start_stop(self): def test_start_stop_replace(self): """ - Test basic start and replace with an other prospecto + Test basic start and replace with another prospector """ self.render_config_template( reload=True, From 094e6bf927fb262654c5f10511ca0767e24bab85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20P=C3=A9rez-Aradros=20Herce?= Date: Mon, 3 Jul 2017 11:46:02 +0200 Subject: [PATCH 2/5] Pass beat version to module factory --- filebeat/crawler/crawler.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/filebeat/crawler/crawler.go b/filebeat/crawler/crawler.go index 96bffe37266..2a36aaba399 100644 --- a/filebeat/crawler/crawler.go +++ b/filebeat/crawler/crawler.go @@ -64,8 +64,7 @@ func (c *Crawler) Start(r *registrar.Registrar, configProspectors *common.Config logp.Beta("Loading separate prospectors is enabled.") c.reloader = cfgfile.NewReloader(configModules) - // TODO add beatVersion here - factory := fileset.NewFactory(c.out, r, "", c.beatDone) + factory := fileset.NewFactory(c.out, r, c.beatVersion, c.beatDone) go func() { c.reloader.Run(factory) }() From acba788ffc1feb5548aa60312df9b0d3f8a3fb8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20P=C3=A9rez-Aradros=20Herce?= Date: Mon, 3 Jul 2017 14:20:22 +0200 Subject: [PATCH 3/5] Mark reloading as beta --- CHANGELOG.asciidoc | 2 +- filebeat/crawler/crawler.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 386d4c0fa4a..f64e07204bf 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -68,7 +68,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha2...master[Check the HEAD d - Add support for loading Xpack Machine Learning configurations from the modules, and added sample configurations for the Nginx module. {pull}4506[4506] - Add udp prospector type. {pull}4452[4452] - Enabled Cgo which means libc is dynamically compiled. {pull}4546[4546] -- Add module config reloading mechanism {pull}4566[4566] +- Add Beta module config reloading mechanism {pull}4566[4566] *Heartbeat* diff --git a/filebeat/crawler/crawler.go b/filebeat/crawler/crawler.go index 2a36aaba399..35db56008b4 100644 --- a/filebeat/crawler/crawler.go +++ b/filebeat/crawler/crawler.go @@ -61,7 +61,7 @@ func (c *Crawler) Start(r *registrar.Registrar, configProspectors *common.Config } if configModules.Enabled() { - logp.Beta("Loading separate prospectors is enabled.") + logp.Beta("Loading separate modules is enabled.") c.reloader = cfgfile.NewReloader(configModules) factory := fileset.NewFactory(c.out, r, c.beatVersion, c.beatDone) From 9ba3d40409da1bfc7b84ffd317662816d65bdf39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20P=C3=A9rez-Aradros=20Herce?= Date: Mon, 3 Jul 2017 16:24:52 +0200 Subject: [PATCH 4/5] Avoid repeating message when modules/prospectors reload is enabled --- filebeat/prospector/log/prospector.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/filebeat/prospector/log/prospector.go b/filebeat/prospector/log/prospector.go index 9d7a84825ee..844e49999a3 100644 --- a/filebeat/prospector/log/prospector.go +++ b/filebeat/prospector/log/prospector.go @@ -104,7 +104,7 @@ func (p *Prospector) loadStates(states []file.State) error { } } - logp.Info("Prospector with previous states loaded: %v", p.states.Count()) + logp.Debug("prospector", "Prospector with previous states loaded: %v", p.states.Count()) return nil } From 798f41b32509d0144a9c9c98ca14cbcc2e1ebd51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20P=C3=A9rez-Aradros=20Herce?= Date: Mon, 3 Jul 2017 16:38:29 +0200 Subject: [PATCH 5/5] Load pipelines on new module instances --- filebeat/beater/filebeat.go | 23 +++++++-- filebeat/crawler/crawler.go | 4 +- filebeat/fileset/factory.go | 50 +++++++++++++------ filebeat/tests/system/config/filebeat.yml.j2 | 4 ++ .../module/test/test/ingest/default.json | 8 +++ .../module/test/test/ingest/pipeline.json | 1 - .../system/module/test/test/manifest.yml | 2 +- filebeat/tests/system/test_reload_modules.py | 36 +++++++++++++ 8 files changed, 105 insertions(+), 23 deletions(-) create mode 100644 filebeat/tests/system/module/test/test/ingest/default.json delete mode 100644 filebeat/tests/system/module/test/test/ingest/pipeline.json diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index a2f1c3fadad..a5927c1a4ef 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -24,6 +24,11 @@ import ( _ "github.com/elastic/beats/filebeat/processor/add_kubernetes_metadata" ) +const pipelinesWarning = "Filebeat is unable to load the Ingest Node pipelines for the configured" + + " modules because the Elasticsearch output is not configured/enabled. If you have" + + " already loaded the Ingest Node pipelines or are using Logstash pipelines, you" + + " can ignore this warning." + var ( once = flag.Bool("once", false, "Run filebeat only once until all harvesters reach EOF") ) @@ -99,10 +104,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { // setup. func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error { if b.Config.Output.Name() != "elasticsearch" { - logp.Warn("Filebeat is unable to load the Ingest Node pipelines for the configured" + - " modules because the Elasticsearch output is not configured/enabled. If you have" + - " already loaded the Ingest Node pipelines or are using Logstash pipelines, you" + - " can ignore this warning.") + logp.Warn(pipelinesWarning) return nil } @@ -219,7 +221,18 @@ func (fb *Filebeat) Run(b *beat.Beat) error { spooler.Stop() }() - err = crawler.Start(registrar, config.ConfigProspector, config.ConfigModules) + var esClient fileset.PipelineLoader + if b.Config.Output.Name() == "elasticsearch" { + esConfig := b.Config.Output.Config() + esClient, err = elasticsearch.NewConnectedClient(esConfig) + if err != nil { + return errors.Wrap(err, "Error creating Elasticsearch client") + } + } else { + logp.Warn(pipelinesWarning) + } + + err = crawler.Start(registrar, config.ConfigProspector, config.ConfigModules, esClient) if err != nil { crawler.Stop() return err diff --git a/filebeat/crawler/crawler.go b/filebeat/crawler/crawler.go index 35db56008b4..7bcecbd6f65 100644 --- a/filebeat/crawler/crawler.go +++ b/filebeat/crawler/crawler.go @@ -38,7 +38,7 @@ func New(out channel.Outleter, prospectorConfigs []*common.Config, beatVersion s } // Start starts the crawler with all prospectors -func (c *Crawler) Start(r *registrar.Registrar, configProspectors *common.Config, configModules *common.Config) error { +func (c *Crawler) Start(r *registrar.Registrar, configProspectors *common.Config, configModules *common.Config, pipelineLoader fileset.PipelineLoader) error { logp.Info("Loading Prospectors: %v", len(c.prospectorConfigs)) @@ -64,7 +64,7 @@ func (c *Crawler) Start(r *registrar.Registrar, configProspectors *common.Config logp.Beta("Loading separate modules is enabled.") c.reloader = cfgfile.NewReloader(configModules) - factory := fileset.NewFactory(c.out, r, c.beatVersion, c.beatDone) + factory := fileset.NewFactory(c.out, r, c.beatVersion, pipelineLoader, c.beatDone) go func() { c.reloader.Run(factory) }() diff --git a/filebeat/fileset/factory.go b/filebeat/fileset/factory.go index 7b223f69d41..3a7791ff152 100644 --- a/filebeat/fileset/factory.go +++ b/filebeat/fileset/factory.go @@ -4,6 +4,7 @@ import ( "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/outputs/elasticsearch" "github.com/mitchellh/hashstructure" "github.com/elastic/beats/filebeat/channel" @@ -11,27 +12,31 @@ import ( "github.com/elastic/beats/filebeat/registrar" ) -// Factory is a factory for registrars +// Factory for modules type Factory struct { - outlet channel.Outleter - registrar *registrar.Registrar - beatVersion string - beatDone chan struct{} + outlet channel.Outleter + registrar *registrar.Registrar + beatVersion string + pipelineLoader PipelineLoader + beatDone chan struct{} } // Wrap an array of prospectors and implements cfgfile.Runner interface type prospectorsRunner struct { - id uint64 - prospectors []*prospector.Prospector + id uint64 + moduleRegistry *ModuleRegistry + prospectors []*prospector.Prospector + pipelineLoader PipelineLoader } // NewFactory instantiates a new Factory -func NewFactory(outlet channel.Outleter, registrar *registrar.Registrar, beatVersion string, beatDone chan struct{}) *Factory { +func NewFactory(outlet channel.Outleter, registrar *registrar.Registrar, beatVersion string, pipelineLoader PipelineLoader, beatDone chan struct{}) *Factory { return &Factory{ - outlet: outlet, - registrar: registrar, - beatVersion: beatVersion, - beatDone: beatDone, + outlet: outlet, + registrar: registrar, + beatVersion: beatVersion, + beatDone: beatDone, + pipelineLoader: pipelineLoader, } } @@ -66,12 +71,29 @@ func (f *Factory) Create(c *common.Config) (cfgfile.Runner, error) { } return &prospectorsRunner{ - id: id, - prospectors: prospectors, + id: id, + moduleRegistry: m, + prospectors: prospectors, + pipelineLoader: f.pipelineLoader, }, nil } func (p *prospectorsRunner) Start() { + // Load pipelines + if p.pipelineLoader != nil { + // Setup a callback & load now too, as we are already connected + callback := func(esClient *elasticsearch.Client) error { + return p.moduleRegistry.LoadPipelines(p.pipelineLoader) + } + elasticsearch.RegisterConnectCallback(callback) + + err := p.moduleRegistry.LoadPipelines(p.pipelineLoader) + if err != nil { + // Log error and continue + logp.Err("Error loading pipeline: %s", err) + } + } + for _, prospector := range p.prospectors { prospector.Start() } diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2 index 4961a452cf9..6073977562f 100644 --- a/filebeat/tests/system/config/filebeat.yml.j2 +++ b/filebeat/tests/system/config/filebeat.yml.j2 @@ -164,8 +164,12 @@ processors: #------------------------------- Elasticsearch output ---------------------------- output.elasticsearch: hosts: ["{{ elasticsearch.host }}"] +{% if elasticsearch.pipeline %} pipeline: {{elasticsearch.pipeline}} +{% endif %} +{% if elasticsearch.index %} index: {{elasticsearch.index}} +{% endif %} {%- elif logstash %} #------------------------------- Logstash output --------------------------------- output.logstash: diff --git a/filebeat/tests/system/module/test/test/ingest/default.json b/filebeat/tests/system/module/test/test/ingest/default.json new file mode 100644 index 00000000000..e011c8e116d --- /dev/null +++ b/filebeat/tests/system/module/test/test/ingest/default.json @@ -0,0 +1,8 @@ +{ + "description": "Test pipeline.", + "processors": [{ + "remove":{ + "field": "message" + } + }] +} diff --git a/filebeat/tests/system/module/test/test/ingest/pipeline.json b/filebeat/tests/system/module/test/test/ingest/pipeline.json deleted file mode 100644 index 0967ef424bc..00000000000 --- a/filebeat/tests/system/module/test/test/ingest/pipeline.json +++ /dev/null @@ -1 +0,0 @@ -{} diff --git a/filebeat/tests/system/module/test/test/manifest.yml b/filebeat/tests/system/module/test/test/manifest.yml index 8b90842943f..d55e76cd2df 100644 --- a/filebeat/tests/system/module/test/test/manifest.yml +++ b/filebeat/tests/system/module/test/test/manifest.yml @@ -5,5 +5,5 @@ var: default: - test.log -ingest_pipeline: ingest/pipeline.json +ingest_pipeline: ingest/default.json prospector: config/test.yml diff --git a/filebeat/tests/system/test_reload_modules.py b/filebeat/tests/system/test_reload_modules.py index d6b3f183ff5..dcba40c967e 100644 --- a/filebeat/tests/system/test_reload_modules.py +++ b/filebeat/tests/system/test_reload_modules.py @@ -4,7 +4,10 @@ import os import shutil import time + from filebeat import BaseTest +from beat.beat import INTEGRATION_TESTS +from elasticsearch import Elasticsearch moduleConfigTemplate = """ @@ -24,6 +27,7 @@ class Test(BaseTest): def setUp(self): super(BaseTest, self).setUp() + self.es = Elasticsearch([self.get_elasticsearch_url()]) # Copy system module shutil.copytree(os.path.join("module", "test"), os.path.join(self.working_dir, "module", "test")) @@ -57,6 +61,38 @@ def test_reload(self): assert self.output_has_message("Hello world") proc.check_kill_and_wait() + @unittest.skipUnless(INTEGRATION_TESTS, "integration test") + def test_reload_writes_pipeline(self): + """ + Test modules reload brings pipelines + """ + self.render_config_template( + reload=True, + reload_path=self.working_dir + "/configs/*.yml", + reload_type="modules", + prospectors=False, + elasticsearch={"host": self.get_elasticsearch_url()} + ) + + proc = self.start_beat() + + os.mkdir(self.working_dir + "/logs/") + logfile = self.working_dir + "/logs/test.log" + os.mkdir(self.working_dir + "/configs/") + + with open(self.working_dir + "/configs/system.yml.test", 'w') as f: + f.write(moduleConfigTemplate.format(self.working_dir + "/logs/*")) + os.rename(self.working_dir + "/configs/system.yml.test", + self.working_dir + "/configs/system.yml") + + with open(logfile, 'w') as f: + f.write("Hello world\n") + + # Check pipeline is present + self.wait_until(lambda: any(re.match("filebeat-.*-test-test-default", key) + for key in self.es.transport.perform_request("GET", "/_ingest/pipeline/").keys())) + proc.check_kill_and_wait() + def test_start_stop(self): """ Test basic modules start and stop