Skip to content

Commit

Permalink
Introduce harvester_limit to limit number of harvesters (#2417)
Browse files Browse the repository at this point in the history
This limits the number of harvesters that are started per prospector. Closes #2236.
  • Loading branch information
ruflin authored and tsg committed Aug 30, 2016
1 parent eafb684 commit 45202c5
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 29 deletions.
8 changes: 5 additions & 3 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,16 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d

*Packetbeat*

- Add cassandra protocol analyzer to packetbeat. {pull}1959[1959]
- Match connections with IPv6 addresses to processes {pull}2254[2254]
- Add IP address to -devices command output {pull}2327[2327]
- Add cassandra protocol analyzer to packetbeat. {pull}1959[1959]
- Match connections with IPv6 addresses to processes {pull}2254[2254]
- Add IP address to -devices command output {pull}2327[2327]

*Topbeat*

*Filebeat*

- Add harvester_limit option {pull}2417[2417]

*Winlogbeat*


Expand Down
19 changes: 18 additions & 1 deletion filebeat/docs/reference/configuration/filebeat-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ The timestamp for closing a file does not depend on the modification time of the

You can use time strings like 2h (2 hours) and 5m (5 minutes). The default is 5m.


===== close_renamed

WARNING: Only use this option if you understand that data loss is a potential side effect.
Expand Down Expand Up @@ -410,6 +409,24 @@ the backoff algorithm is disabled, and the `backoff` value is used for waiting f
lines. The `backoff` value will be multiplied each time with the `backoff_factor` until
`max_backoff` is reached. The default is 2.

===== harvester_limit

EXPERIMENTAL

The `harvester_limit` option limits the number of harvesters that are started in parallel for one prospector. This directly relates
to the maximum number of file handlers that are opened. The default is 0, which means there is no limit. This configuration
is useful if the number of files to be harvested exceeds the open file handler limit of the operating system.

Because setting a limit on harvesters means that potentially not all files are opened in parallel, it is recommended to use
this option in combination with the close_* options to make sure harvesters are stopped more often so that new files can be
picked up.

Currently, when a new harvester can be started again, the file it is started for is picked randomly. This means that
a harvester may be started for a file that was just closed and then updated again, instead of for a file that
has not been harvested for a longer period of time.

This configuration option applies per prospector. It can be used indirectly to give certain prospectors a higher priority
by assigning them a higher limit of harvesters.

[[configuration-global-options]]
=== Filebeat Global Configuration
Expand Down
4 changes: 4 additions & 0 deletions filebeat/etc/beat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ filebeat.prospectors:
# The backoff value will be multiplied each time with the backoff_factor until max_backoff is reached
#backoff_factor: 2

# Experimental: Max number of harvesters that are started in parallel.
# Default is 0 which means unlimited
#harvester_limit: 0

### Harvester closing options

# Close inactive closes the file handler after the predefined period.
Expand Down
4 changes: 4 additions & 0 deletions filebeat/filebeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ filebeat.prospectors:
# The backoff value will be multiplied each time with the backoff_factor until max_backoff is reached
#backoff_factor: 2

# Experimental: Max number of harvesters that are started in parallel.
# Default is 0 which means unlimited
#harvester_limit: 0

### Harvester closing options

# Close inactive closes the file handler after the predefined period.
Expand Down
26 changes: 14 additions & 12 deletions filebeat/prospector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,24 @@ import (

var (
defaultConfig = prospectorConfig{
IgnoreOlder: 0,
ScanFrequency: 10 * time.Second,
InputType: cfg.DefaultInputType,
CleanInactive: 0,
CleanRemoved: false,
IgnoreOlder: 0,
ScanFrequency: 10 * time.Second,
InputType: cfg.DefaultInputType,
CleanInactive: 0,
CleanRemoved: false,
HarvesterLimit: 0,
}
)

type prospectorConfig struct {
ExcludeFiles []*regexp.Regexp `config:"exclude_files"`
IgnoreOlder time.Duration `config:"ignore_older"`
Paths []string `config:"paths"`
ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"`
InputType string `config:"input_type"`
CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"`
CleanRemoved bool `config:"clean_removed"`
ExcludeFiles []*regexp.Regexp `config:"exclude_files"`
IgnoreOlder time.Duration `config:"ignore_older"`
Paths []string `config:"paths"`
ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"`
InputType string `config:"input_type"`
CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"`
CleanRemoved bool `config:"clean_removed"`
HarvesterLimit uint64 `config:"harvester_limit" validate:"min=0"`
}

func (config *prospectorConfig) Validate() error {
Expand Down
34 changes: 25 additions & 9 deletions filebeat/prospector/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"sync"
"time"

"sync/atomic"

cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/input"
Expand All @@ -14,14 +16,15 @@ import (
)

type Prospector struct {
cfg *common.Config // Raw config
config prospectorConfig
prospectorer Prospectorer
spoolerChan chan *input.Event
harvesterChan chan *input.Event
done chan struct{}
states *file.States
wg sync.WaitGroup
cfg *common.Config // Raw config
config prospectorConfig
prospectorer Prospectorer
spoolerChan chan *input.Event
harvesterChan chan *input.Event
done chan struct{}
states *file.States
wg sync.WaitGroup
harvesterCounter uint64
}

type Prospectorer interface {
Expand Down Expand Up @@ -154,7 +157,14 @@ func (p *Prospector) createHarvester(state file.State) (*harvester.Harvester, er
return h, err
}

// startHarvester starts a new harvester with the given offset
// In case the HarvesterLimit is reached, an error is returned
func (p *Prospector) startHarvester(state file.State, offset int64) error {

if p.config.HarvesterLimit > 0 && atomic.LoadUint64(&p.harvesterCounter) >= p.config.HarvesterLimit {
return fmt.Errorf("Harvester limit reached.")
}

state.Offset = offset
// Create harvester with state
h, err := p.createHarvester(state)
Expand All @@ -163,8 +173,14 @@ func (p *Prospector) startHarvester(state file.State, offset int64) error {
}

p.wg.Add(1)
// startHarvester is not run concurrently, but atomic operations are needed because the counter is decremented
// inside the following goroutine
atomic.AddUint64(&p.harvesterCounter, 1)
go func() {
defer p.wg.Done()
defer func() {
atomic.AddUint64(&p.harvesterCounter, ^uint64(0))
p.wg.Done()
}()
// Starts harvester and picks the right type. In case type is not set, set it to default (log)
h.Harvest()
}()
Expand Down
6 changes: 3 additions & 3 deletions filebeat/prospector/prospector_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (p *ProspectorLog) scan() {
logp.Debug("prospector", "Start harvester for new file: %s", newState.Source)
err := p.Prospector.startHarvester(newState, 0)
if err != nil {
logp.Err("Harvester could not be started on new file: %s", err)
logp.Err("Harvester could not be started on new file: %s, Err: %s", newState.Source, err)
}
} else {
p.harvestExistingFile(newState, lastState)
Expand All @@ -182,7 +182,7 @@ func (p *ProspectorLog) harvestExistingFile(newState file.State, oldState file.S
logp.Debug("prospector", "Resuming harvesting of file: %s, offset: %v", newState.Source, oldState.Offset)
err := p.Prospector.startHarvester(newState, oldState.Offset)
if err != nil {
logp.Err("Harvester could not be started on existing file: %s", err)
logp.Err("Harvester could not be started on existing file: %s, Err: %s", newState.Source, err)
}
return
}
Expand All @@ -192,7 +192,7 @@ func (p *ProspectorLog) harvestExistingFile(newState file.State, oldState file.S
logp.Debug("prospector", "Old file was truncated. Starting from the beginning: %s", newState.Source)
err := p.Prospector.startHarvester(newState, 0)
if err != nil {
logp.Err("Harvester could not be started on truncated file: %s", err)
logp.Err("Harvester could not be started on truncated file: %s, Err: %s", newState.Source, err)
}

filesTrucated.Add(1)
Expand Down
1 change: 1 addition & 0 deletions filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ filebeat.prospectors:
force_close_files: {{force_close_files}}
clean_inactive: {{clean_inactive}}
clean_removed: {{clean_removed}}
harvester_limit: {{harvester_limit | default(0) }}

{% if fields %}
fields:
Expand Down
49 changes: 48 additions & 1 deletion filebeat/tests/system/test_prospector.py
Original file line number Diff line number Diff line change
Expand Up @@ -574,10 +574,57 @@ def test_skip_symlinks(self):
lambda: self.output_has(lines=1),
max_timeout=15)

time.sleep(5)
filebeat.check_kill_and_wait()

data = self.read_output()

# Make sure there is only one entry, means it didn't follow the symlink
assert len(data) == 1

def test_harvester_limit(self):
"""
Test that harvester_limit caps the number of harvesters started in parallel.

Three log files with one line each are created while the prospector is
configured with harvester_limit=1. The test expects the "Harvester limit
reached" message in the log, fewer than three events in the output once
the registry has been written, and eventually all three events.
"""
# Allow a single harvester at a time; close_inactive and scan_frequency
# are both set to 1s for this test.
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
harvester_limit=1,
close_inactive="1s",
scan_frequency="1s",
)

# Create three files that all match the prospector path glob.
os.mkdir(self.working_dir + "/log/")
testfile1 = self.working_dir + "/log/test1.log"
testfile2 = self.working_dir + "/log/test2.log"
testfile3 = self.working_dir + "/log/test3.log"

with open(testfile1, 'w') as file:
file.write("Line1\n")

with open(testfile2, 'w') as file:
file.write("Line2\n")

with open(testfile3, 'w') as file:
file.write("Line3\n")

filebeat = self.start_beat()

# Check that not all harvesters were started: the limit must be reported in the log.
self.wait_until(
lambda: self.log_contains("Harvester limit reached"),
max_timeout=10)

# Wait for the registry to be written.
self.wait_until(
lambda: self.log_contains("Registry file updated"),
max_timeout=10)

# Make sure not all events were written so far (fewer than the three files created).
data = self.read_output()
assert len(data) < 3

# Eventually every file must be harvested, giving one event per file.
self.wait_until(lambda: self.output_has(lines=3), max_timeout=15)

data = self.read_output()
assert len(data) == 3

filebeat.check_kill_and_wait()

0 comments on commit 45202c5

Please sign in to comment.