Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.0](backport #30534) [libbeat | filebeat] Log error when parsing config block and disabled input on filebeat #30654

Merged
merged 4 commits into from
Mar 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif
- Fix a logging bug when `ssl.verification_mode` was set to `full` or `certificate`, the command `test output` incorrectly logged that TLS was disabled.
- Fix the ability for subcommands to be ran properly from the beats containers. {pull}30452[30452]
- Update docker/distribution dependency library to fix a security issues concerning OCI Manifest Type Confusion Issue. {pull}30462[30462]
- Log errors when parsing and applying config blocks and if the input is disabled. {pull}30534[30534]

*Auditbeat*

Expand Down
31 changes: 19 additions & 12 deletions filebeat/beater/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,30 +68,28 @@ func (c *crawler) Start(
) error {
log := c.log

log.Infof("Loading Inputs: %v", len(c.inputConfigs))
log.Infof("Loading Inputs: %d", len(c.inputConfigs))

// Prospect the globs/paths given on the command line and launch harvesters
for _, inputConfig := range c.inputConfigs {
err := c.startInput(pipeline, inputConfig)
if err != nil {
return fmt.Errorf("starting input failed: %+v", err)
return fmt.Errorf("starting input failed: %w", err)
}
}

if configInputs.Enabled() {
c.inputReloader = cfgfile.NewReloader(pipeline, configInputs)
if err := c.inputReloader.Check(c.inputsFactory); err != nil {
return fmt.Errorf("creating input reloader failed: %+v", err)
return fmt.Errorf("creating input reloader failed: %w", err)
}

}

if configModules.Enabled() {
c.modulesReloader = cfgfile.NewReloader(pipeline, configModules)
if err := c.modulesReloader.Check(c.modulesFactory); err != nil {
return fmt.Errorf("creating module reloader failed: %+v", err)
return fmt.Errorf("creating module reloader failed: %w", err)
}

}

if c.inputReloader != nil {
Expand All @@ -105,7 +103,7 @@ func (c *crawler) Start(
}()
}

log.Infof("Loading and starting Inputs completed. Enabled inputs: %v", len(c.inputs))
log.Infof("Loading and starting Inputs completed. Enabled inputs: %d", len(c.inputs))

return nil
}
Expand All @@ -114,23 +112,32 @@ func (c *crawler) startInput(
pipeline beat.PipelineConnector,
config *common.Config,
) error {
// TODO: Either use debug or remove it after https://github.com/elastic/beats/pull/30534
// is fixed.
c.log.Infof("starting input, keys present on the config: %v",
config.FlattenedKeys())

if !config.Enabled() {
c.log.Infof("input disabled, skipping it")
return nil
}

var h map[string]interface{}
config.Unpack(&h)
err := config.Unpack(&h)
if err != nil {
return fmt.Errorf("could not unpack config: %w", err)
}
id, err := hashstructure.Hash(h, nil)
if err != nil {
return fmt.Errorf("can not compute id from configuration: %v", err)
return fmt.Errorf("can not compute id from configuration: %w", err)
}
if _, ok := c.inputs[id]; ok {
return fmt.Errorf("input with same ID already exists: %v", id)
return fmt.Errorf("input with same ID already exists: %d", id)
}

runner, err := c.inputsFactory.Create(pipeline, config)
if err != nil {
return fmt.Errorf("Error while initializing input: %+v", err)
return fmt.Errorf("error while initializing input: %w", err)
}
if inputRunner, ok := runner.(*input.Runner); ok {
inputRunner.Once = c.once
Expand All @@ -155,7 +162,7 @@ func (c *crawler) Stop() {
}()
}

logp.Info("Stopping %v inputs", len(c.inputs))
logp.Info("Stopping %d inputs", len(c.inputs))
// Stop inputs in parallel
for id, p := range c.inputs {
id, p := id, p
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func (h *Harvester) Run() error {
}
}(h.state.Source)

logger.Info("Harvester started for file.")
logger.Infof("Harvester started for paths: %v", h.config.Paths)

h.doneWg.Add(1)
go func() {
Expand Down
10 changes: 6 additions & 4 deletions filebeat/tests/system/test_stdin.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ def test_stdin(self):

proc = self.start_beat()

msg = "Harvester started"
self.wait_until(
lambda: self.log_contains(
"Harvester started for file."),
max_timeout=10)
lambda: self.log_contains(msg),
max_timeout=10,
err_msg=f"did not find '{msg}' in the logs")

iterations1 = 5
for n in range(0, iterations1):
Expand Down Expand Up @@ -102,4 +103,5 @@ def test_stdin_is_exclusive(self):

filebeat = self.start_beat()
filebeat.check_wait(exit_code=1)
assert self.log_contains("Exiting: stdin requires to be run in exclusive mode, configured inputs: stdin, udp")
msg = "Exiting: stdin requires to be run in exclusive mode, configured inputs: stdin, udp"
assert self.log_contains(msg), f"did not find '{msg}' in the logs"
6 changes: 3 additions & 3 deletions libbeat/tests/system/beat/beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ def setUp(self):
# running tests in parallel
pass

def wait_until(self, cond, max_timeout=10, poll_interval=0.1, name="cond"):
def wait_until(self, cond, max_timeout=10, poll_interval=0.1, name="cond", err_msg=""):
"""
Waits until the cond function returns true,
or until the max_timeout is reached. Calls the cond
Expand All @@ -360,8 +360,8 @@ def wait_until(self, cond, max_timeout=10, poll_interval=0.1, name="cond"):
start = datetime.now()
while not cond():
if datetime.now() - start > timedelta(seconds=max_timeout):
raise TimeoutError("Timeout waiting for '{}' to be true. ".format(name) +
"Waited {} seconds.".format(max_timeout))
raise WaitTimeoutError(
f"Timeout waiting for condition '{name}'. Waited {max_timeout} seconds: {err_msg}")
time.sleep(poll_interval)

def get_log(self, logfile=None):
Expand Down
6 changes: 4 additions & 2 deletions x-pack/libbeat/management/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func (cm *Manager) OnConfig(s string) {
if errs := cm.apply(blocks); errs != nil {
// `cm.apply` already logs the errors; currently allow beat to run degraded
cm.updateStatusWithError(err)
cm.logger.Errorf("failed applying config blocks: %v", err)
return
}

Expand Down Expand Up @@ -256,8 +257,8 @@ func (cm *Manager) apply(blocks ConfigBlocks) error {
}

// Unset missing configs
for name := range missing {
if missing[name] {
for name, isMissing := range missing {
if isMissing {
if err := cm.reload(name, []*ConfigBlock{}); err != nil {
errors = multierror.Append(errors, err)
}
Expand Down Expand Up @@ -319,6 +320,7 @@ func (cm *Manager) toConfigBlocks(cfg common.MapStr) (ConfigBlocks, error) {
for _, regName := range cm.registry.GetRegisteredNames() {
iBlock, err := cfg.GetValue(regName)
if err != nil {
cm.logger.Errorf("failed to get '%s' from config: %v. Continuing to next one", regName, err)
continue
}

Expand Down