diff --git a/.buildkite/filebeat/filebeat-pipeline.yml b/.buildkite/filebeat/filebeat-pipeline.yml index b5aa15f6605..2c9fe05b3b2 100644 --- a/.buildkite/filebeat/filebeat-pipeline.yml +++ b/.buildkite/filebeat/filebeat-pipeline.yml @@ -105,6 +105,9 @@ steps: artifact_paths: - "filebeat/build/*.xml" - "filebeat/build/*.json" + - "filebeat/build/integration-tests/*" + - "filebeat/build/integration-tests/Test*/*" + - "filebeat/build/integration-tests/Test*/data/**/*" notify: - github_commit_status: context: "filebeat: Go Integration Tests" diff --git a/filebeat/input/journald/config.go b/filebeat/input/journald/config.go index d354baaacf5..56f9388f503 100644 --- a/filebeat/input/journald/config.go +++ b/filebeat/input/journald/config.go @@ -38,6 +38,9 @@ var includeMatchesWarnOnce sync.Once // Config stores the options of a journald input. type config struct { + // ID is the input ID, each instance must have a unique ID + ID string `config:"id"` + // Paths stores the paths to the journal files to be read. Paths []string `config:"paths"` diff --git a/filebeat/input/journald/input.go b/filebeat/input/journald/input.go index 20e46bd0cc2..0ab3c548177 100644 --- a/filebeat/input/journald/input.go +++ b/filebeat/input/journald/input.go @@ -42,6 +42,7 @@ type journalReader interface { } type journald struct { + ID string Backoff time.Duration MaxBackoff time.Duration Since time.Duration @@ -108,6 +109,7 @@ func Configure(cfg *conf.C) ([]cursor.Source, cursor.Input, error) { } return sources, &journald{ + ID: config.ID, Since: config.Since, Seek: config.Seek, Matches: journalfield.IncludeMatches(config.Matches), @@ -124,7 +126,7 @@ func (inp *journald) Name() string { return pluginName } func (inp *journald) Test(src cursor.Source, ctx input.TestContext) error { reader, err := journalctl.New( - ctx.Logger, + ctx.Logger.With("input_id", inp.ID), ctx.Cancelation, inp.Units, inp.Identifiers, @@ -149,7 +151,9 @@ func (inp *journald) Run( cursor cursor.Cursor, publisher cursor.Publisher, ) error { - logger := ctx.Logger.With("path", src.Name()) + logger := ctx.Logger. + With("path", src.Name()). + With("input_id", inp.ID) currentCheckpoint := initCheckpoint(logger, cursor) mode := inp.Seek diff --git a/filebeat/input/journald/pkg/journalctl/jctlmock_test.go b/filebeat/input/journald/pkg/journalctl/jctlmock_test.go index 4f113d36f10..9fed391de5e 100644 --- a/filebeat/input/journald/pkg/journalctl/jctlmock_test.go +++ b/filebeat/input/journald/pkg/journalctl/jctlmock_test.go @@ -18,6 +18,8 @@ // Code generated by moq; DO NOT EDIT. // github.com/matryer/moq +//go:build linux + package journalctl import ( diff --git a/filebeat/input/journald/pkg/journalctl/journalctl.go b/filebeat/input/journald/pkg/journalctl/journalctl.go index c0c21332965..b015b896e3d 100644 --- a/filebeat/input/journald/pkg/journalctl/journalctl.go +++ b/filebeat/input/journald/pkg/journalctl/journalctl.go @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +//go:build linux + package journalctl import ( @@ -22,6 +24,7 @@ import ( "errors" "fmt" "io" + "io/fs" "os/exec" "strings" "sync" @@ -97,7 +100,31 @@ func Factory(canceller input.Canceler, logger *logp.Logger, binary string, args data, err := reader.ReadBytes('\n') if err != nil { if !errors.Is(err, io.EOF) { - logger.Errorf("cannot read from journalctl stdout: '%s'", err) + var logError = false + var pathError *fs.PathError + if errors.As(err, &pathError) { + // Because we're reading from the stdout from a process that will + // eventually exit, it can happen that when reading we get the + // fs.PathError below instead of an io.EOF. This is expected, + // it only means the process has exited, its stdout has been + // closed and there is nothing else for us to read. + // This is expected and does not cause any data loss. + // So we log at level debug to have it in our logs if ever needed + // while avoiding adding error level logs on user's deployments + // for situations that are well handled. + if pathError.Op == "read" && + pathError.Path == "|0" && + pathError.Err.Error() == "file already closed" { + logger.Debugf("cannot read from journalctl stdout: '%s'", err) + } else { + logError = true + } + } else { + logError = true + } + if logError { + logger.Errorf("cannot read from journalctl stdout: '%s'", err) + } } return } diff --git a/filebeat/input/journald/pkg/journalctl/mode.go b/filebeat/input/journald/pkg/journalctl/mode.go index 5f0c60386b2..ac61bb55458 100644 --- a/filebeat/input/journald/pkg/journalctl/mode.go +++ b/filebeat/input/journald/pkg/journalctl/mode.go @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +//go:build linux + package journalctl import "fmt" diff --git a/filebeat/input/journald/pkg/journalctl/mode_test.go b/filebeat/input/journald/pkg/journalctl/mode_test.go index 9e63a3169d0..545ff08207f 100644 --- a/filebeat/input/journald/pkg/journalctl/mode_test.go +++ b/filebeat/input/journald/pkg/journalctl/mode_test.go @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +//go:build linux + package journalctl import ( diff --git a/filebeat/input/journald/pkg/journalctl/reader.go b/filebeat/input/journald/pkg/journalctl/reader.go index 5e8ef54f543..c654a17dfdf 100644 --- a/filebeat/input/journald/pkg/journalctl/reader.go +++ b/filebeat/input/journald/pkg/journalctl/reader.go @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +//go:build linux + package journalctl import ( @@ -113,9 +115,8 @@ type Reader struct { jctl Jctl jctlFactory JctlFactory - backoff backoff.Backoff - seekMode SeekMode - state readerState + backoff backoff.Backoff + state readerState } // handleSeekAndCursor returns the correct arguments for seek and cursor. @@ -311,7 +312,7 @@ func (r *Reader) Next(cancel input.Canceler) (JournalEntry, error) { // We recreate the backoff so r.backoff.Last().IsZero() // will return true next time it's called making us to // wait in case jouranlctl crashes in less than 5s. - if !r.backoff.Last().IsZero() && time.Now().Sub(r.backoff.Last()) > 5*time.Second { + if !r.backoff.Last().IsZero() && time.Since(r.backoff.Last()) > 5*time.Second { r.backoff = backoff.NewExpBackoff(cancel.Done(), 100*time.Millisecond, 2*time.Second) } else { r.backoff.Wait() diff --git a/filebeat/input/journald/pkg/journalctl/reader_test.go b/filebeat/input/journald/pkg/journalctl/reader_test.go index f1c5f3bf4bc..4690d9cdf83 100644 --- a/filebeat/input/journald/pkg/journalctl/reader_test.go +++ b/filebeat/input/journald/pkg/journalctl/reader_test.go @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +//go:build linux + package journalctl import ( diff --git a/filebeat/input/journald/pkg/journalfield/conv.go b/filebeat/input/journald/pkg/journalfield/conv.go index a7a6994c0bb..4c7575114d2 100644 --- a/filebeat/input/journald/pkg/journalfield/conv.go +++ b/filebeat/input/journald/pkg/journalfield/conv.go @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +//go:build linux + package journalfield import ( diff --git a/filebeat/input/journald/pkg/journalfield/default_other.go b/filebeat/input/journald/pkg/journalfield/default_other.go deleted file mode 100644 index 1d645e162a0..00000000000 --- a/filebeat/input/journald/pkg/journalfield/default_other.go +++ /dev/null @@ -1,46 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//go:build !linux - -package journalfield - -// journaldEventFields provides default field mappings and conversions rules. -var journaldEventFields = FieldConversion{ - // provided by systemd journal - "COREDUMP_UNIT": text("journald.coredump.unit"), - "COREDUMP_USER_UNIT": text("journald.coredump.user_unit"), - "OBJECT_AUDIT_LOGINUID": integer("journald.object.audit.login_uid"), - "OBJECT_AUDIT_SESSION": integer("journald.object.audit.session"), - "OBJECT_CMDLINE": text("journald.object.process.command_line"), - "OBJECT_COMM": text("journald.object.process.name"), - "OBJECT_EXE": text("journald.object.process.executable"), - "OBJECT_GID": integer("journald.object.gid"), - "OBJECT_PID": integer("journald.object.pid"), - "OBJECT_SYSTEMD_OWNER_UID": integer("journald.object.systemd.owner_uid"), - "OBJECT_SYSTEMD_SESSION": text("journald.object.systemd.session"), - "OBJECT_SYSTEMD_UNIT": text("journald.object.systemd.unit"), - "OBJECT_SYSTEMD_USER_UNIT": text("journald.object.systemd.user_unit"), - "OBJECT_UID": integer("journald.object.uid"), - "_KERNEL_DEVICE": text("journald.kernel.device"), - "_KERNEL_SUBSYSTEM": text("journald.kernel.subsystem"), - "_SYSTEMD_INVOCATION_ID": text("systemd.invocation_id"), - "_SYSTEMD_USER_SLICE": text("systemd.user_slice"), - "_UDEV_DEVLINK": text("journald.kernel.device_symlinks"), - "_UDEV_DEVNODE": text("journald.kernel.device_node_path"), - "_UDEV_SYSNAME": text("journald.kernel.device_name"), -} diff --git a/filebeat/input/journald/pkg/journalfield/matcher.go b/filebeat/input/journald/pkg/journalfield/matcher.go index 07d4e6ba753..8f44579f263 100644 --- a/filebeat/input/journald/pkg/journalfield/matcher.go +++ b/filebeat/input/journald/pkg/journalfield/matcher.go @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +//go:build linux + package journalfield import ( diff --git a/filebeat/input/systemlogs/input.go b/filebeat/input/systemlogs/input.go index 27570587110..eadc5a8565a 100644 --- a/filebeat/input/systemlogs/input.go +++ b/filebeat/input/systemlogs/input.go @@ -172,47 +172,6 @@ func useJournald(c *conf.C) (bool, error) { return true, nil } -func toJournaldConfig(cfg *conf.C) (*conf.C, error) { //nolint:unused // It's used on Linux - newCfg, err := cfg.Child("journald", -1) - if err != nil { - return nil, fmt.Errorf("cannot extract 'journald' block: %w", err) - } - - if _, err := cfg.Remove("journald", -1); err != nil { - return nil, err - } - - if _, err := cfg.Remove("type", -1); err != nil { - return nil, err - } - - if _, err := cfg.Remove("files", -1); err != nil { - return nil, err - } - - if _, err := cfg.Remove("use_journald", -1); err != nil { - return nil, err - } - - if _, err := cfg.Remove("use_files", -1); err != nil { - return nil, err - } - - if err := newCfg.Merge(cfg); err != nil { - return nil, err - } - - if err := newCfg.SetString("type", -1, "journald"); err != nil { - return nil, fmt.Errorf("cannot set 'type': %w", err) - } - - if err := cfg.SetString("type", -1, pluginName); err != nil { - return nil, fmt.Errorf("cannot set type back to '%s': %w", pluginName, err) - } - - return newCfg, nil -} - func toFilesConfig(cfg *conf.C) (*conf.C, error) { newCfg, err := cfg.Child("files", -1) if err != nil { diff --git a/filebeat/input/systemlogs/input_linux.go b/filebeat/input/systemlogs/input_linux.go index 5a98c270b97..98a59361c0b 100644 --- a/filebeat/input/systemlogs/input_linux.go +++ b/filebeat/input/systemlogs/input_linux.go @@ -47,3 +47,44 @@ func configure(cfg *conf.C) ([]cursor.Source, cursor.Input, error) { return journald.Configure(journaldCfg) } + +func toJournaldConfig(cfg *conf.C) (*conf.C, error) { + newCfg, err := cfg.Child("journald", -1) + if err != nil { + return nil, fmt.Errorf("cannot extract 'journald' block: %w", err) + } + + if _, err := cfg.Remove("journald", -1); err != nil { + return nil, err + } + + if _, err := cfg.Remove("type", -1); err != nil { + return nil, err + } + + if _, err := cfg.Remove("files", -1); err != nil { + return nil, err + } + + if _, err := cfg.Remove("use_journald", -1); err != nil { + return nil, err + } + + if _, err := cfg.Remove("use_files", -1); err != nil { + return nil, err + } + + if err := newCfg.Merge(cfg); err != nil { + return nil, err + } + + if err := newCfg.SetString("type", -1, "journald"); err != nil { + return nil, fmt.Errorf("cannot set 'type': %w", err) + } + + if err := cfg.SetString("type", -1, pluginName); err != nil { + return nil, fmt.Errorf("cannot set type back to '%s': %w", pluginName, err) + } + + return newCfg, nil +} diff --git a/filebeat/tests/integration/systemlogs_all_test.go b/filebeat/tests/integration/systemlogs_all_test.go new file mode 100644 index 00000000000..cbc0c50c129 --- /dev/null +++ b/filebeat/tests/integration/systemlogs_all_test.go @@ -0,0 +1,126 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package integration + +import ( + "bufio" + _ "embed" + "encoding/json" + "errors" + "io" + "os" + "path/filepath" + "testing" + "time" + + cp "github.com/otiai10/copy" + "github.com/stretchr/testify/require" +) + +//go:embed testdata/filebeat_system_module.yml +var systemModuleCfg string + +func copyModulesDir(t *testing.T, dst string) { + pwd, err := os.Getwd() + if err != nil { + t.Fatalf("cannot get the current directory: %s", err) + } + localModules := filepath.Join(pwd, "../", "../", "module") + localModulesD := filepath.Join(pwd, "../", "../", "modules.d") + + if err := cp.Copy(localModules, filepath.Join(dst, "module")); err != nil { + t.Fatalf("cannot copy 'module' folder to test folder: %s", err) + } + if err := cp.Copy(localModulesD, filepath.Join(dst, "modules.d")); err != nil { + t.Fatalf("cannot copy 'modules.d' folder to test folder: %s", err) + } +} + +//nolint:unused,nolintlint // necessary on Linux +func waitForAllFilesets(t *testing.T, outputGlob string, msgAndArgs ...any) { + require.Eventually( + t, + findFilesetNames(t, outputGlob), + time.Minute, + 10*time.Millisecond, + msgAndArgs...) +} + +//nolint:unused,nolintlint // necessary on Linux +func findFilesetNames(t *testing.T, outputGlob string) func() bool { + f := func() bool { + files, err := filepath.Glob(outputGlob) + if err != nil { + t.Fatalf("cannot get files list for glob '%s': '%s'", outputGlob, err) + } + + if len(files) > 1 { + t.Fatalf( + "only a single output file is supported, found: %d. Files: %s", + len(files), + files, + ) + } + + foundSyslog := false + foundAuth := false + + file, err := os.Open(files[0]) + if err != nil { + t.Fatalf("cannot open '%s': '%s'", files[0], err) + } + defer file.Close() + + r := bufio.NewReader(file) + for { + line, err := r.ReadBytes('\n') + if err != nil { + if errors.Is(err, io.EOF) { + break + } else { + t.Fatalf("cannot read '%s': '%s", file.Name(), err) + } + } + + data := struct { + Fileset struct { + Name string `json:"name"` + } `json:"fileset"` + }{} + + if err := json.Unmarshal(line, &data); err != nil { + t.Fatalf("cannot parse output line as JSON: %s", err) + } + + switch data.Fileset.Name { + case "syslog": + foundSyslog = true + case "auth": + foundAuth = true + } + + if foundAuth && foundSyslog { + return true + } + } + + return false + } + + return f +} diff --git a/filebeat/tests/integration/systemlogs_test.go b/filebeat/tests/integration/systemlogs_linux_test.go similarity index 55% rename from filebeat/tests/integration/systemlogs_test.go rename to filebeat/tests/integration/systemlogs_linux_test.go index 8cd390a0019..88af84734af 100644 --- a/filebeat/tests/integration/systemlogs_test.go +++ b/filebeat/tests/integration/systemlogs_linux_test.go @@ -20,22 +20,14 @@ package integration import ( - _ "embed" "fmt" - "os" - "path" "path/filepath" "testing" "time" - cp "github.com/otiai10/copy" - "github.com/elastic/beats/v7/libbeat/tests/integration" ) -//go:embed testdata/filebeat_system_module.yml -var systemModuleCfg string - // TestSystemLogsCanUseJournald aims to ensure the system-logs input can // correctly choose and start a journald input when the globs defined in // var.paths do not resolve to any file. @@ -51,7 +43,7 @@ func TestSystemModuleCanUseJournaldInput(t *testing.T) { // As the name says, we want this folder to exist bu t be empty globWithoutFiles := filepath.Join(filebeat.TempDir(), "this-folder-does-not-exist") - yamlCfg := fmt.Sprintf(systemModuleCfg, globWithoutFiles, workDir) + yamlCfg := fmt.Sprintf(systemModuleCfg, globWithoutFiles, globWithoutFiles, workDir) filebeat.WriteConfigFile(yamlCfg) filebeat.Start() @@ -64,47 +56,12 @@ func TestSystemModuleCanUseJournaldInput(t *testing.T) { "journalctl started with PID", 10*time.Second, "system-logs did not start journald input") -} -func TestSystemLogsCanUseLogInput(t *testing.T) { - t.Skip("The system module is not using the system-logs input at the moment") - filebeat := integration.NewBeat( + // Scan every event in the output until at least one from + // each fileset (auth, syslog) is found. + waitForAllFilesets( t, - "filebeat", - "../../filebeat.test", + filepath.Join(workDir, "output*.ndjson"), + "did not find events from both filesets: 'auth' and 'syslog'", ) - workDir := filebeat.TempDir() - copyModulesDir(t, workDir) - - logFilePath := path.Join(workDir, "syslog") - integration.GenerateLogFile(t, logFilePath, 5, false) - yamlCfg := fmt.Sprintf(systemModuleCfg, logFilePath, workDir) - - filebeat.WriteConfigFile(yamlCfg) - filebeat.Start() - - filebeat.WaitForLogs( - "using log input because file(s) was(were) found", - 10*time.Second, - "system-logs did not select the log input") - filebeat.WaitForLogs( - "Harvester started for paths:", - 10*time.Second, - "system-logs did not start the log input") -} - -func copyModulesDir(t *testing.T, dst string) { - pwd, err := os.Getwd() - if err != nil { - t.Fatalf("cannot get the current directory: %s", err) - } - localModules := filepath.Join(pwd, "../", "../", "module") - localModulesD := filepath.Join(pwd, "../", "../", "modules.d") - - if err := cp.Copy(localModules, filepath.Join(dst, "module")); err != nil { - t.Fatalf("cannot copy 'module' folder to test folder: %s", err) - } - if err := cp.Copy(localModulesD, filepath.Join(dst, "modules.d")); err != nil { - t.Fatalf("cannot copy 'modules.d' folder to test folder: %s", err) - } } diff --git a/filebeat/tests/integration/systemlogs_other_test.go b/filebeat/tests/integration/systemlogs_other_test.go new file mode 100644 index 00000000000..42fc61b426d --- /dev/null +++ b/filebeat/tests/integration/systemlogs_other_test.go @@ -0,0 +1,56 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//go:build integration + +package integration + +import ( + "fmt" + "path" + "testing" + "time" + + "github.com/elastic/beats/v7/libbeat/tests/integration" +) + +func TestSystemLogsCanUseLogInput(t *testing.T) { + t.Skip("The system module is not using the system-logs input at the moment") + filebeat := integration.NewBeat( + t, + "filebeat", + "../../filebeat.test", + ) + workDir := filebeat.TempDir() + copyModulesDir(t, workDir) + + logFilePath := path.Join(workDir, "syslog") + integration.GenerateLogFile(t, logFilePath, 5, false) + yamlCfg := fmt.Sprintf(systemModuleCfg, logFilePath, logFilePath, workDir) + + filebeat.WriteConfigFile(yamlCfg) + filebeat.Start() + + filebeat.WaitForLogs( + "using log input because file(s) was(were) found", + 10*time.Second, + "system-logs did not select the log input") + filebeat.WaitForLogs( + "Harvester started for paths:", + 10*time.Second, + "system-logs did not start the log input") +} diff --git a/filebeat/tests/integration/testdata/filebeat_system_module.yml b/filebeat/tests/integration/testdata/filebeat_system_module.yml index 27de8f2a414..d781aa1590a 100644 --- a/filebeat/tests/integration/testdata/filebeat_system_module.yml +++ b/filebeat/tests/integration/testdata/filebeat_system_module.yml @@ -4,6 +4,10 @@ filebeat.modules: enabled: true var.paths: - "%s" + auth: + enabled: true + var.paths: + - "%s" path.home: %s @@ -14,3 +18,12 @@ output: file: path: ${path.home} filename: "output" + rotate_every_kb: 500000 # 500mb + +logging: + level: debug + selectors: + - input + - input.journald + - input.journald.reader + - input.journald.reader.journalctl-runner diff --git a/libbeat/tests/integration/framework.go b/libbeat/tests/integration/framework.go index 8adbc18959d..904fc1e302a 100644 --- a/libbeat/tests/integration/framework.go +++ b/libbeat/tests/integration/framework.go @@ -283,8 +283,6 @@ func (b *BeatProc) waitBeatToExit() { b.t.Fatalf("error waiting for %q to finish: %s. Exit code: %s", b.beatName, err, exitCode) } - - return } // Stop stops the Beat process @@ -313,10 +311,10 @@ func (b *BeatProc) stopNonsynced() { defer b.waitingMutex.Unlock() ps, err := b.Process.Wait() if err != nil { - b.t.Logf("[WARN] got an error waiting mockbeat to top: %v", err) + b.t.Logf("[WARN] got an error waiting %s to top: %v", b.beatName, err) } if !ps.Success() { - b.t.Logf("[WARN] mockbeat did not stopped successfully: %v", ps.String()) + b.t.Logf("[WARN] %s did not stopped successfully: %v", b.beatName, ps.String()) } }