Add structured logging to logs input #25299

Merged (6 commits) on Apr 28, 2021
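In essence, the PR replaces printf-style calls to the package-level `logp` helpers with a per-harvester `*logp.Logger` that carries structured context. A minimal before/after sketch of the pattern, assuming only the `logp` API visible in this diff (the file path and ID literals are illustrative):

```go
package main

import "github.com/elastic/beats/v7/libbeat/logp"

func main() {
	// Before: a global printf-style call; the file path is baked into the
	// message string, so log consumers cannot filter on it as a field.
	logp.Info("Harvester started for file: %s", "/var/log/app.log")

	// After: a named logger that attaches harvester_id as a structured
	// key/value pair to every entry it emits.
	logger := logp.NewLogger("harvester").With("harvester_id", "0f3d2a60-example-id")
	logger.Info("Harvester started for file.")
}
```

This is why several messages in the diff below drop their `%s` path argument: the identifying context moves from the message text into logger fields.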
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -107,6 +107,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Possible values for Netflow's locality fields (source.locality, destination.locality and flow.locality) are now `internal` and `external`, instead of `private` and `public`. {issue}24272[24272] {pull}24295[24295]
- Add User Agent Parser for Azure Sign In Logs Ingest Pipeline {pull}23201[23201]
- Changes filebeat httpjson input's append transform to create a list even with only a single value{pull}25074[25074]
+ - Change logging in logs input to structured logging. Some log message formats have changed. {pull}25299[25299]
- All url.* fields apart from url.original in the Apache, Nginx, IIS, Traefik, S3Access, Cisco, F5, Fortinet, Google Workspace, Imperva, Microsoft, Netscout, O365, Sophos, Squid, Suricata, Zeek, Zia, Zoom, and ZScaler modules are now url unescaped due to using the Elasticsearch uri_parts processor. {pull}24699[24699]

*Heartbeat*
61 changes: 35 additions & 26 deletions filebeat/input/log/harvester.go
@@ -77,6 +77,8 @@ type OutletFactory func() channel.Outleter

// Harvester contains all harvester related data
type Harvester struct {
+ logger *logp.Logger
+
id uuid.UUID
config config
source harvester.Source // the source being watched
@@ -120,6 +122,7 @@ type harvesterProgressMetrics struct {

// NewHarvester creates a new harvester
func NewHarvester(
+ logger *logp.Logger,
config *common.Config,
state file.State,
states *file.States,
@@ -132,7 +135,9 @@ func NewHarvester(
return nil, err
}

+ logger = logger.Named("harvester").With("harvester_id", id)
h := &Harvester{
+ logger: logger,
config: defaultConfig(),
state: state,
states: states,
@@ -204,7 +209,7 @@ func (h *Harvester) Setup() error {
return err
}

logp.Debug("harvester", "Harvester setup successful. Line terminator: %d", h.config.LineTerminator)
h.logger.Debugf("Harvester setup successful. Line terminator: %d", h.config.LineTerminator)

return nil
}
@@ -234,6 +239,8 @@ func (h *Harvester) updateCurrentSize() error {

// Run start the harvester and reads files line by line and sends events to the defined output
func (h *Harvester) Run() error {
+ logger := h.logger
+
// Allow for some cleanup on termination
if h.onTerminate != nil {
defer h.onTerminate()
@@ -287,19 +294,19 @@ func (h *Harvester) Run() error {
select {
// Applies when timeout is reached
case <-closeTimeout:
logp.Info("Closing harvester because close_timeout was reached: %s", source)
logger.Infof("Closing harvester because close_timeout was reached: %s", source)
// Required when reader loop returns and reader finished
case <-h.done:
}

h.stop()
err := h.reader.Close()
if err != nil {
logp.Err("Failed to stop harvester for file %s: %v", h.state.Source, err)
logger.Errorf("Failed to stop harvester for file: %v", err)
}
}(h.state.Source)

logp.Info("Harvester started for file: %s", h.state.Source)
logger.Info("Harvester started for file.")

h.doneWg.Add(1)
go func() {
@@ -318,21 +325,21 @@
if err != nil {
switch err {
case ErrFileTruncate:
logp.Info("File was truncated. Begin reading file from offset 0: %s", h.state.Source)
logger.Info("File was truncated. Begin reading file from offset 0.")
h.state.Offset = 0
filesTruncated.Add(1)
case ErrRemoved:
logp.Info("File was removed: %s. Closing because close_removed is enabled.", h.state.Source)
logger.Info("File was removed. Closing because close_removed is enabled.")
case ErrRenamed:
logp.Info("File was renamed: %s. Closing because close_renamed is enabled.", h.state.Source)
logger.Info("File was renamed. Closing because close_renamed is enabled.")
case ErrClosed:
logp.Info("Reader was closed: %s. Closing.", h.state.Source)
logger.Info("Reader was closed. Closing.")
case io.EOF:
logp.Info("End of file reached: %s. Closing because close_eof is enabled.", h.state.Source)
logger.Info("End of file reached. Closing because close_eof is enabled.")
case ErrInactive:
logp.Info("File is inactive: %s. Closing because close_inactive of %v reached.", h.state.Source, h.config.CloseInactive)
logger.Infof("File is inactive. Closing because close_inactive of %v reached.", h.config.CloseInactive)
default:
logp.Err("Read line error: %v; File: %v", err, h.state.Source)
logger.Errorf("Read line error: %v", err)
}
return nil
}
@@ -370,7 +377,7 @@ func (h *Harvester) monitorFileSize() {
case <-ticker.C:
err := h.updateCurrentSize()
if err != nil {
logp.Err("Error updating file size: %v; File: %v", err, h.state.Source)
h.logger.Errorf("Error updating file size: %v", err)
}
}
}
@@ -481,7 +488,7 @@ func (h *Harvester) SendStateUpdate() {

h.publishState(h.state)

logp.Debug("harvester", "Update state: %s, offset: %v", h.state.Source, h.state.Offset)
h.logger.Debugf("Update state (offset: %v).", h.state.Offset)
h.states.Update(h.state)
}

@@ -491,14 +498,14 @@ func (h *Harvester) shouldExportLine(line string) bool {
if len(h.config.IncludeLines) > 0 {
if !harvester.MatchAny(h.config.IncludeLines, line) {
// drop line
logp.Debug("harvester", "Drop line as it does not match any of the include patterns %s", line)
h.logger.Debugf("Drop line as it does not match any of the include patterns %s", line)
return false
}
}
if len(h.config.ExcludeLines) > 0 {
if harvester.MatchAny(h.config.ExcludeLines, line) {
// drop line
logp.Debug("harvester", "Drop line as it does match one of the exclude patterns%s", line)
h.logger.Debugf("Drop line as it does match one of the exclude patterns%s", line)
return false
}
}
@@ -539,6 +546,8 @@ func (h *Harvester) openFile() error {
}

func (h *Harvester) validateFile(f *os.File) error {
+ logger := h.logger
+
info, err := f.Stat()
if err != nil {
return fmt.Errorf("Failed getting stats for file %s: %s", h.state.Source, err)
@@ -557,9 +566,9 @@ func (h *Harvester) validateFile(f *os.File) error {
if err != nil {

if err == transform.ErrShortSrc {
logp.Info("Initialising encoding for '%v' failed due to file being too short", f)
logger.Infof("Initialising encoding for '%v' failed due to file being too short", f)
} else {
logp.Err("Initialising encoding for '%v' failed: %v", f, err)
logger.Errorf("Initialising encoding for '%v' failed: %v", f, err)
}
return err
}
@@ -570,7 +579,7 @@ func (h *Harvester) validateFile(f *os.File) error {
return err
}

logp.Debug("harvester", "Setting offset for file: %s. Offset: %d ", h.state.Source, offset)
logger.Debugf("Setting offset: %d ", offset)
h.state.Offset = offset

return nil
@@ -579,12 +588,12 @@ func (h *Harvester) validateFile(f *os.File) error {
func (h *Harvester) initFileOffset(file *os.File) (int64, error) {
// continue from last known offset
if h.state.Offset > 0 {
logp.Debug("harvester", "Set previous offset for file: %s. Offset: %d ", h.state.Source, h.state.Offset)
h.logger.Debugf("Set previous offset: %d ", h.state.Offset)
return file.Seek(h.state.Offset, os.SEEK_SET)
}

// get offset from file in case of encoding factory was required to read some data.
logp.Debug("harvester", "Setting offset for file based on seek: %s", h.state.Source)
h.logger.Debug("Setting offset to: 0")
return file.Seek(0, os.SEEK_CUR)
}

@@ -605,8 +614,8 @@ func (h *Harvester) cleanup() {
// Mark harvester as finished
h.state.Finished = true

logp.Debug("harvester", "Stopping harvester for file: %s", h.state.Source)
defer logp.Debug("harvester", "harvester cleanup finished for file: %s", h.state.Source)
h.logger.Debugf("Stopping harvester.")
defer h.logger.Debugf("harvester cleanup finished.")

// Make sure file is closed as soon as harvester exits
// If file was never opened, it can't be closed
@@ -615,14 +624,14 @@
// close file handler
h.source.Close()

logp.Debug("harvester", "Closing file: %s", h.state.Source)
h.logger.Debugf("Closing file")
harvesterOpenFiles.Add(-1)

// On completion, push offset so we can continue where we left off if we relaunch on the same file
// Only send offset if file object was created successfully
h.SendStateUpdate()
} else {
logp.Warn("Stopping harvester, NOT closing file as file info not available: %s", h.state.Source)
h.logger.Warn("Stopping harvester, NOT closing file as file info not available.")
}

harvesterClosed.Add(1)
@@ -645,12 +654,12 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {
var r reader.Reader
var err error

logp.Debug("harvester", "newLogFileReader with config.MaxBytes: %d", h.config.MaxBytes)
h.logger.Debugf("newLogFileReader with config.MaxBytes: %d", h.config.MaxBytes)

// TODO: NewLineReader uses additional buffering to deal with encoding and testing
// for new lines in input stream. Simple 8-bit based encodings, or plain
// don't require 'complicated' logic.
- h.log, err = NewLog(h.source, h.config.LogConfig)
+ h.log, err = NewLog(h.logger, h.source, h.config.LogConfig)
if err != nil {
return nil, err
}
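The constructor change above is the crux of the PR: `NewHarvester` no longer reaches for package-level globals but receives a parent logger and derives a scoped child from it. A short sketch of that composition, assuming `logp`'s zap-backed `Named`/`With` semantics (the "input" selector is an illustrative stand-in for whatever the caller passes):

```go
package main

import (
	"github.com/elastic/beats/v7/libbeat/logp"
	"github.com/gofrs/uuid"
)

func main() {
	// Stand-in for the logger the owning input would pass to NewHarvester.
	parent := logp.NewLogger("input")

	id := uuid.Must(uuid.NewV4())

	// Named appends a dot-separated scope to the logger's name; With pins
	// harvester_id so no call site has to repeat it in the message.
	logger := parent.Named("harvester").With("harvester_id", id)

	logger.Info("Harvester started for file.")
}
```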
2 changes: 2 additions & 0 deletions filebeat/input/log/harvester_test.go
@@ -30,6 +30,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/reader"
"github.com/elastic/beats/v7/libbeat/reader/readfile"
"github.com/elastic/beats/v7/libbeat/reader/readfile/encoding"
@@ -75,6 +76,7 @@ func TestReadLine(t *testing.T) {
source := File{File: readFile}

h := Harvester{
+ logger: logp.NewLogger("harvester"),
config: config{
LogConfig: LogConfig{
CloseInactive: 500 * time.Millisecond,
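The test change above only needs a non-nil logger so the harvester can run. A test that wanted to assert on the structured fields themselves could capture entries in memory with libbeat's observer output; a hedged sketch (this test and its assertions are illustrative, not part of the PR):

```go
package log

import (
	"testing"

	"github.com/elastic/beats/v7/libbeat/logp"
)

// Illustrative only: asserts that the harvester_id field attached via
// With() is present on an emitted entry.
func TestStructuredFields(t *testing.T) {
	// Route log output to an in-memory observer instead of stderr.
	if err := logp.DevelopmentSetup(logp.ToObserverOutput()); err != nil {
		t.Fatal(err)
	}

	logger := logp.NewLogger("harvester").With("harvester_id", "test-id")
	logger.Info("Harvester started for file.")

	entries := logp.ObserverLogs().TakeAll()
	if len(entries) != 1 {
		t.Fatalf("expected 1 log entry, got %d", len(entries))
	}
	if got := entries[0].ContextMap()["harvester_id"]; got != "test-id" {
		t.Errorf("harvester_id = %v, want %q", got, "test-id")
	}
}
```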