Skip to content

Commit

Permalink
Move prospector log to its own package with log harvester
Browse files Browse the repository at this point in the history
This is the last step in reorganising the packages related to prospector and harvester. Follow up PR's will mainly focusing on abstracting out common functionality, standardise naming and have proper interfaces.

* Merge log harvester and log prospector config into one config
* Rename Log to prospector as part of the new package structure
* Move log harvester logic to log prospector package
* Keep common harvester logic in its own package
* stdin harvester still heavily depends on log harvester, needs to be split up and simplified at a later stage
* Further cleanup of Prospector interface. `Wait()` only exists as a temporary solution.
  • Loading branch information
ruflin committed May 11, 2017
1 parent 97a549a commit 676e836
Show file tree
Hide file tree
Showing 18 changed files with 268 additions and 247 deletions.
21 changes: 21 additions & 0 deletions filebeat/harvester/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,24 @@ func TestMatchAnyRegexps(t *testing.T) {
assert.Equal(t, MatchAny(matchers, "/var/log/log.gz"), true)

}

func TestExcludeLine(t *testing.T) {
regexp, err := InitMatchers("^DBG")
assert.Nil(t, err)
assert.True(t, MatchAny(regexp, "DBG: a debug message"))
assert.False(t, MatchAny(regexp, "ERR: an error message"))
}

func TestIncludeLine(t *testing.T) {
regexp, err := InitMatchers("^ERR", "^WARN")

assert.Nil(t, err)
assert.False(t, MatchAny(regexp, "DBG: a debug message"))
assert.True(t, MatchAny(regexp, "ERR: an error message"))
assert.True(t, MatchAny(regexp, "WARNING: a simple warning message"))
}

func TestInitRegexp(t *testing.T) {
_, err := InitMatchers("(((((")
assert.NotNil(t, err)
}
44 changes: 4 additions & 40 deletions filebeat/prospector/config.go
Original file line number Diff line number Diff line change
@@ -1,55 +1,19 @@
package prospector

import (
"fmt"
"time"

cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/libbeat/common/match"
)

var (
defaultConfig = prospectorConfig{
Enabled: true,
IgnoreOlder: 0,
ScanFrequency: 10 * time.Second,
InputType: cfg.DefaultInputType,
CleanInactive: 0,
CleanRemoved: true,
HarvesterLimit: 0,
Symlinks: false,
TailFiles: false,
ScanFrequency: 10 * time.Second,
InputType: cfg.DefaultInputType,
}
)

type prospectorConfig struct {
Enabled bool `config:"enabled"`
ExcludeFiles []match.Matcher `config:"exclude_files"`
IgnoreOlder time.Duration `config:"ignore_older"`
Paths []string `config:"paths"`
ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"`
InputType string `config:"input_type"`
CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"`
CleanRemoved bool `config:"clean_removed"`
HarvesterLimit uint64 `config:"harvester_limit" validate:"min=0"`
Symlinks bool `config:"symlinks"`
TailFiles bool `config:"tail_files"`
recursiveGlob bool `config:"recursive_glob.enabled"`
}

func (config *prospectorConfig) Validate() error {

if config.InputType == cfg.LogInputType && len(config.Paths) == 0 {
return fmt.Errorf("No paths were defined for prospector")
}

if config.CleanInactive != 0 && config.IgnoreOlder == 0 {
return fmt.Errorf("ignore_older must be enabled when clean_inactive is used")
}

if config.CleanInactive != 0 && config.CleanInactive <= config.IgnoreOlder+config.ScanFrequency {
return fmt.Errorf("clean_inactive must be > ignore_older + scan_frequency to make sure only files which are not monitored anymore are removed")
}

return nil
ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"`
InputType string `config:"input_type"`
}
73 changes: 57 additions & 16 deletions filebeat/harvester/config.go → filebeat/prospector/log/config.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,34 @@
package harvester
package log

import (
"fmt"
"time"

"github.com/dustin/go-humanize"
cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester/reader"

"github.com/dustin/go-humanize"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/match"
"github.com/elastic/beats/libbeat/processors"
)

var (
defaultConfig = harvesterConfig{
BufferSize: 16 * humanize.KiByte,
defaultConfig = config{
// Common
InputType: cfg.DefaultInputType,
CleanInactive: 0,

// Prospector
Enabled: true,
IgnoreOlder: 0,
ScanFrequency: 10 * time.Second,
CleanRemoved: true,
HarvesterLimit: 0,
Symlinks: false,
TailFiles: false,

// Harvester
BufferSize: 16 * humanize.KiByte,
Backoff: 1 * time.Second,
BackoffFactor: 2,
MaxBackoff: 10 * time.Second,
Expand All @@ -26,15 +38,31 @@ var (
CloseRenamed: false,
CloseEOF: false,
CloseTimeout: 0,
CleanInactive: 0,
}
)

type harvesterConfig struct {
type config struct {

// Common
InputType string `config:"input_type"`
CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"`

// Prospector
Enabled bool `config:"enabled"`
ExcludeFiles []match.Matcher `config:"exclude_files"`
IgnoreOlder time.Duration `config:"ignore_older"`
Paths []string `config:"paths"`
ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"`
CleanRemoved bool `config:"clean_removed"`
HarvesterLimit uint64 `config:"harvester_limit" validate:"min=0"`
Symlinks bool `config:"symlinks"`
TailFiles bool `config:"tail_files"`
recursiveGlob bool `config:"recursive_glob.enabled"`

// Harvester
common.EventMetadata `config:",inline"` // Fields and tags to add to events.
BufferSize int `config:"harvester_buffer_size"`
Encoding string `config:"encoding"`
InputType string `config:"input_type"`
Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"`
BackoffFactor int `config:"backoff_factor" validate:"min=1"`
MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"`
Expand All @@ -48,27 +76,40 @@ type harvesterConfig struct {
MaxBytes int `config:"max_bytes" validate:"min=0,nonzero"`
Multiline *reader.MultilineConfig `config:"multiline"`
JSON *reader.JSONConfig `config:"json"`
CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"`
Pipeline string `config:"pipeline"`
Module string `config:"_module_name"` // hidden option to set the module name
Fileset string `config:"_fileset_name"` // hidden option to set the fileset name
Processors processors.PluginConfig `config:"processors"`
}

func (config *harvesterConfig) Validate() error {
func (c *config) Validate() error {

// Prospector
if c.InputType == cfg.LogInputType && len(c.Paths) == 0 {
return fmt.Errorf("No paths were defined for prospector")
}

if c.CleanInactive != 0 && c.IgnoreOlder == 0 {
return fmt.Errorf("ignore_older must be enabled when clean_inactive is used")
}

if c.CleanInactive != 0 && c.CleanInactive <= c.IgnoreOlder+c.ScanFrequency {
return fmt.Errorf("clean_inactive must be > ignore_older + scan_frequency to make sure only files which are not monitored anymore are removed")
}

// Harvester
// Check input type
if _, ok := cfg.ValidInputType[config.InputType]; !ok {
return fmt.Errorf("Invalid input type: %v", config.InputType)
if _, ok := cfg.ValidInputType[c.InputType]; !ok {
return fmt.Errorf("Invalid input type: %v", c.InputType)
}

if config.JSON != nil && len(config.JSON.MessageKey) == 0 &&
config.Multiline != nil {
if c.JSON != nil && len(c.JSON.MessageKey) == 0 &&
c.Multiline != nil {
return fmt.Errorf("When using the JSON decoder and multiline together, you need to specify a message_key value")
}

if config.JSON != nil && len(config.JSON.MessageKey) == 0 &&
(len(config.IncludeLines) > 0 || len(config.ExcludeLines) > 0) {
if c.JSON != nil && len(c.JSON.MessageKey) == 0 &&
(len(c.IncludeLines) > 0 || len(c.ExcludeLines) > 0) {
return fmt.Errorf("When using the JSON decoder and line filtering together, you need to specify a message_key value")
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// +build !integration

package prospector
package log

import (
"testing"
Expand All @@ -12,7 +12,7 @@ import (

func TestCleanOlderError(t *testing.T) {

config := prospectorConfig{
config := config{
CleanInactive: 10 * time.Hour,
}

Expand All @@ -22,7 +22,7 @@ func TestCleanOlderError(t *testing.T) {

func TestCleanOlderIgnoreOlderError(t *testing.T) {

config := prospectorConfig{
config := config{
CleanInactive: 10 * time.Hour,
IgnoreOlder: 15 * time.Hour,
}
Expand All @@ -33,7 +33,7 @@ func TestCleanOlderIgnoreOlderError(t *testing.T) {

func TestCleanOlderIgnoreOlderErrorEqual(t *testing.T) {

config := prospectorConfig{
config := config{
CleanInactive: 10 * time.Hour,
IgnoreOlder: 10 * time.Hour,
}
Expand All @@ -44,9 +44,11 @@ func TestCleanOlderIgnoreOlderErrorEqual(t *testing.T) {

func TestCleanOlderIgnoreOlder(t *testing.T) {

config := prospectorConfig{
config := config{
CleanInactive: 10*time.Hour + defaultConfig.ScanFrequency + 1*time.Second,
IgnoreOlder: 10 * time.Hour,
InputType: "log",
Paths: []string{"hello"},
}

err := config.Validate()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
// line. As soon as the line is completed, it is read and returned.
//
// The stdin harvesters reads data from stdin.
package harvester
package log

import (
"errors"
Expand All @@ -18,7 +18,7 @@ import (

"github.com/satori/go.uuid"

"github.com/elastic/beats/filebeat/config"
cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester/encoding"
"github.com/elastic/beats/filebeat/harvester/source"
"github.com/elastic/beats/filebeat/input/file"
Expand All @@ -43,7 +43,7 @@ type Outlet interface {
}

type Harvester struct {
config harvesterConfig
config config
state file.State
states *file.States
file source.FileSource /* the file being watched */
Expand All @@ -59,7 +59,7 @@ type Harvester struct {
}

func NewHarvester(
cfg *common.Config,
config *common.Config,
state file.State,
states *file.States,
outlet Outlet,
Expand All @@ -75,7 +75,7 @@ func NewHarvester(
ID: uuid.NewV4(),
}

if err := cfg.Unpack(&h.config); err != nil {
if err := config.Unpack(&h.config); err != nil {
return nil, err
}

Expand Down Expand Up @@ -107,12 +107,12 @@ func NewHarvester(
func (h *Harvester) open() error {

switch h.config.InputType {
case config.StdinInputType:
case cfg.StdinInputType:
return h.openStdin()
case config.LogInputType:
case cfg.LogInputType:
return h.openFile()
default:
return fmt.Errorf("Invalid input type")
return fmt.Errorf("Invalid harvester type: %+v", h.config)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
// +build !integration

package harvester
package log
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package harvester
package log

import (
"testing"
Expand Down
11 changes: 5 additions & 6 deletions filebeat/harvester/log.go → filebeat/prospector/log/log.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package harvester
package log

import (
"bytes"
Expand All @@ -8,7 +8,7 @@ import (
"os"
"time"

"github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/harvester/reader"
"github.com/elastic/beats/filebeat/harvester/source"
"github.com/elastic/beats/filebeat/input/file"
Expand All @@ -28,7 +28,6 @@ var (
harvesterClosed = monitoring.NewInt(harvesterMetrics, "closed")
harvesterRunning = monitoring.NewInt(harvesterMetrics, "running")
harvesterOpenFiles = monitoring.NewInt(harvesterMetrics, "open_files")
filesTruncated = monitoring.NewInt(harvesterMetrics, "files.truncated")
)

// Setup opens the file handler and creates the reader for the harvester
Expand Down Expand Up @@ -226,14 +225,14 @@ func (h *Harvester) SendStateUpdate() {
// the include_lines and exclude_lines options.
func (h *Harvester) shouldExportLine(line string) bool {
if len(h.config.IncludeLines) > 0 {
if !MatchAny(h.config.IncludeLines, line) {
if !harvester.MatchAny(h.config.IncludeLines, line) {
// drop line
logp.Debug("harvester", "Drop line as it does not match any of the include patterns %s", line)
return false
}
}
if len(h.config.ExcludeLines) > 0 {
if MatchAny(h.config.ExcludeLines, line) {
if harvester.MatchAny(h.config.ExcludeLines, line) {
// drop line
logp.Debug("harvester", "Drop line as it does match one of the exclude patterns%s", line)
return false
Expand Down Expand Up @@ -324,7 +323,7 @@ func (h *Harvester) initFileOffset(file *os.File) (int64, error) {
// getState returns an updated copy of the harvester state
func (h *Harvester) getState() file.State {

if h.config.InputType == config.StdinInputType {
if !h.file.HasState() {
return file.State{}
}

Expand Down
Loading

0 comments on commit 676e836

Please sign in to comment.