Skip to content

Commit

Permalink
Merge pull request #644 from ruflin/filebeat-prospector
Browse files Browse the repository at this point in the history
Stop filebeat if filebeat is started without any prospectors defined
  • Loading branch information
Steffen Siering committed Jan 6, 2016
2 parents d98476b + c3f485a commit ce1ffd4
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 6 deletions.
5 changes: 4 additions & 1 deletion filebeat/beat/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,10 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
// Start up spooler
go fb.Spooler.Run()

crawl.Start(fb.FbConfig.Filebeat.Prospectors, fb.Spooler.Channel)
err = crawl.Start(fb.FbConfig.Filebeat.Prospectors, fb.Spooler.Channel)
if err != nil {
return err
}

// Publishes event to output
go Publish(b, fb)
Expand Down
14 changes: 10 additions & 4 deletions filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,22 @@ type Crawler struct {
running bool
}

func (crawler *Crawler) Start(files []config.ProspectorConfig, eventChan chan *input.FileEvent) {
func (crawler *Crawler) Start(prospectorConfigs []config.ProspectorConfig, eventChan chan *input.FileEvent) error {

pendingProspectorCnt := 0
crawler.running = true

if len(prospectorConfigs) == 0 {
return fmt.Errorf("No prospectors defined. You must have at least one prospector defined in the config file.")
}

// Prospect the globs/paths given on the command line and launch harvesters
for _, fileconfig := range files {
for _, prospectorConfig := range prospectorConfigs {

logp.Debug("prospector", "File Configs: %v", fileconfig.Paths)
logp.Debug("prospector", "File Configs: %v", prospectorConfig.Paths)

prospector := &Prospector{
ProspectorConfig: fileconfig,
ProspectorConfig: prospectorConfig,
registrar: crawler.Registrar,
}

Expand Down Expand Up @@ -76,6 +80,8 @@ func (crawler *Crawler) Start(files []config.ProspectorConfig, eventChan chan *i
}

logp.Info("All prospectors initialised with %d states to persist", len(crawler.Registrar.State))

return nil
}

func (crawler *Crawler) Stop() {
Expand Down
19 changes: 19 additions & 0 deletions filebeat/crawler/crawler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package crawler

import (
"testing"

"github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/input"
"github.com/stretchr/testify/assert"
)

// TestCrawlerStartError verifies that Crawler.Start returns an error when
// it is given an empty list of prospector configs, instead of running with
// nothing to do.
func TestCrawlerStartError(t *testing.T) {
	crawler := Crawler{}
	channel := make(chan *input.FileEvent, 1)
	prospectorConfigs := []config.ProspectorConfig{}

	// NOTE: renamed from `error` — that name shadows the predeclared
	// Go `error` type; `err` is the idiomatic choice.
	err := crawler.Start(prospectorConfigs, channel)

	assert.Error(t, err)
}
6 changes: 5 additions & 1 deletion filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
############################# Filebeat ######################################
filebeat:
prospectors:
{% if prospectors is not defined %}
{% set prospectors = true %}
{% endif %}
{% if prospectors %}
-
# Paths that should be crawled and fetched
{% if path %}paths:
Expand Down Expand Up @@ -48,7 +52,7 @@ filebeat:
timeout: 1s
max_lines: {{ max_lines|default(500) }}
{% endif %}

{% endif %}
spool_size:
idle_timeout: 0.1s
registry_file: {{ fb.working_dir + '/' }}{{ registryFile|default(".filebeat")}}
Expand Down
18 changes: 18 additions & 0 deletions filebeat/tests/system/test_prospector.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,3 +231,21 @@ def test_rotating_ignore_older_low_write_rate(self):
max_timeout=5)

proc.kill_and_wait()

def test_shutdown_no_prospectors(self):
    """Filebeat started with no prospectors defined must report the
    problem and shut itself down instead of running idle.
    """
# Render a config whose `prospectors` section is omitted entirely
# (the template treats prospectors=False as "emit no prospector block").
self.render_config_template(
prospectors=False,
)

proc = self.start_filebeat(debug_selectors=['*'])

# wait for the "No prospectors defined" error to appear in the log
# (this is the message emitted by crawler.Start for an empty config)
self.wait_until(
lambda: self.log_contains(
"No prospectors defined"),
max_timeout=10)

# the process must then terminate on its own
self.wait_until(
lambda: self.log_contains(
"shutting down"),
max_timeout=10)

0 comments on commit ce1ffd4

Please sign in to comment.