diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 19436f709779..88bcc38e0f41 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -67,6 +67,9 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] *Affecting all Beats* +- Improve performance of disk queue by coalescing writes. {pull}31935[31935] +- Update `elastic/go-structform` from `v0.0.9` to `v0.0.10` to reduce memory usage. {pull}32536[32536] +- Added `--enable-all-filesets` to the `setup` command to simplify loading all ingest pipelines. {issue}30916[30916] {pull}33114[33114] *Auditbeat* diff --git a/filebeat/autodiscover/builder/hints/logs.go b/filebeat/autodiscover/builder/hints/logs.go index e999f1d481e7..59d2eea70ae1 100644 --- a/filebeat/autodiscover/builder/hints/logs.go +++ b/filebeat/autodiscover/builder/hints/logs.go @@ -69,7 +69,7 @@ func NewLogHints(cfg *conf.C) (autodiscover.Builder, error) { return nil, fmt.Errorf("unable to unpack hints config due to error: %w", err) } - moduleRegistry, err := fileset.NewModuleRegistry(nil, beat.Info{}, false) + moduleRegistry, err := fileset.NewModuleRegistry(nil, beat.Info{}, false, false) if err != nil { return nil, err } diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index fcb04a67ab9b..af8cf4e4374b 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -105,7 +105,7 @@ func newBeater(b *beat.Beat, plugins PluginFactory, rawConfig *conf.C) (beat.Bea return nil, err } - moduleRegistry, err := fileset.NewModuleRegistry(config.Modules, b.Info, true) + moduleRegistry, err := fileset.NewModuleRegistry(config.Modules, b.Info, true, false) if err != nil { return nil, err } @@ -177,8 +177,17 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error { // When running the subcommand setup, configuration from modules.d directories // have to be loaded using cfg.Reloader. Otherwise those configurations are skipped. pipelineLoaderFactory := newPipelineLoaderFactory(b.Config.Output.Config()) - modulesFactory := fileset.NewSetupFactory(b.Info, pipelineLoaderFactory) + enableAllFilesets, _ := b.BeatConfig.Bool("config.modules.enable_all_filesets", -1) + modulesFactory := fileset.NewSetupFactory(b.Info, pipelineLoaderFactory, enableAllFilesets) if fb.config.ConfigModules.Enabled() { + if enableAllFilesets { + //All module configs need to be loaded to enable all the filesets + //contained in the modules. The default glob just loads the enabled + //ones. Switching the glob pattern from *.yml to * achieves this. + origPath, _ := fb.config.ConfigModules.String("path", -1) + newPath := strings.TrimSuffix(origPath, ".yml") + _ = fb.config.ConfigModules.SetString("path", -1, newPath) + } modulesLoader := cfgfile.NewReloader(fb.pipeline, fb.config.ConfigModules) modulesLoader.Load(modulesFactory) } diff --git a/filebeat/docs/howto/load-ingest-pipelines.asciidoc b/filebeat/docs/howto/load-ingest-pipelines.asciidoc index 68471845a3a3..a6ec44e7ea28 100644 --- a/filebeat/docs/howto/load-ingest-pipelines.asciidoc +++ b/filebeat/docs/howto/load-ingest-pipelines.asciidoc @@ -4,11 +4,45 @@ The ingest pipelines used to parse log lines are set up automatically the first time you run {beatname_uc}, assuming the {es} output is enabled. If you're sending events to {ls} you need to load the ingest pipelines manually. To do this, run the -`setup` command with the `--pipelines` option specified. If you used the -<> command to enable modules in the `modules.d` -directory, also specify the `--modules` flag. For example, the following command -loads the ingest pipelines used by all filesets enabled in the system, nginx, -and mysql modules: +`setup` command with the `--pipelines` option specified. You also need to enable +the modules and filesets, this can be accomplished one of two ways. + +First you can use the `--modules` option to enable the module, and the +`-M` option to enable the fileset. For example, the following command +loads the access pipeline from the nginx module. + +*deb and rpm:* + +["source","sh",subs="attributes"] +---- +{beatname_lc} setup --pipelines --modules nginx -M "nginx.access.enabled=true" +---- + +*mac:* + +["source","sh",subs="attributes"] +---- +./{beatname_lc} setup --pipelines --modules nginx -M "nginx.access.enabled=true" +---- + +*linux:* + +["source","sh",subs="attributes"] +---- +./{beatname_lc} setup --pipelines --modules nginx -M "nginx.access.enabled=true" +---- + +*win:* + +["source","sh",subs="attributes"] +---- +PS > .{backslash}{beatname_lc}.exe setup --pipelines --modules nginx -M "nginx.access.enabled=true" +---- + +The second option is to use the `--enable-all-filesets` option to +enable all the modules and all the filesets so all of the ingest +pipelines are loaded. For example, the following command loads all +the ingest pipelines. //TODO: Replace with the platform tab widget. @@ -16,28 +50,28 @@ and mysql modules: ["source","sh",subs="attributes"] ---- -{beatname_lc} setup --pipelines --modules system,nginx,mysql +{beatname_lc} setup --pipelines --enable-all-filesets ---- *mac:* ["source","sh",subs="attributes"] ---- -./{beatname_lc} setup --pipelines --modules system,nginx,mysql +./{beatname_lc} setup --pipelines --enable-all-filesets ---- *linux:* ["source","sh",subs="attributes"] ---- -./{beatname_lc} setup --pipelines --modules system,nginx,mysql +./{beatname_lc} setup --pipelines --enable-all-filesets ---- *win:* ["source","sh",subs="attributes"] ---- -PS > .{backslash}{beatname_lc}.exe setup --pipelines --modules system,nginx,mysql +PS > .{backslash}{beatname_lc}.exe setup --pipelines --enable-all-filesets ---- TIP: If you're loading ingest pipelines manually because you want to send events diff --git a/filebeat/fileset/factory.go b/filebeat/fileset/factory.go index 7dff0996dddd..bdc6350618e2 100644 --- a/filebeat/fileset/factory.go +++ b/filebeat/fileset/factory.go @@ -129,7 +129,7 @@ func (f *Factory) CheckConfig(c *conf.C) error { // createRegistry starts a registry for a set of filesets, it returns the registry and // its input configurations func (f *Factory) createRegistry(c *conf.C) (*ModuleRegistry, []*conf.C, error) { - m, err := NewModuleRegistry([]*conf.C{c}, f.beatInfo, false) + m, err := NewModuleRegistry([]*conf.C{c}, f.beatInfo, false, false) if err != nil { return nil, nil, err } diff --git a/filebeat/fileset/modules.go b/filebeat/fileset/modules.go index 79d34ce609e8..158b45f56d39 100644 --- a/filebeat/fileset/modules.go +++ b/filebeat/fileset/modules.go @@ -19,13 +19,13 @@ package fileset import ( "encoding/json" + "errors" "fmt" "io/ioutil" "os" "path/filepath" "strings" - "github.com/pkg/errors" "gopkg.in/yaml.v2" "github.com/elastic/beats/v7/libbeat/beat" @@ -51,12 +51,19 @@ func newModuleRegistry(modulesPath string, moduleConfigs []*ModuleConfig, overrides *ModuleOverrides, beatInfo beat.Info, + enableAllFilesets bool, ) (*ModuleRegistry, error) { reg := ModuleRegistry{ registry: []Module{}, log: logp.NewLogger(logName), } + for _, mcfg := range moduleConfigs { + // an empty ModuleConfig can reach this so we only force enable a + // config if the Module name is set and Enabled pointer is valid. + if enableAllFilesets && mcfg.Module != "" && mcfg.Enabled != nil { + *mcfg.Enabled = true + } if mcfg.Module == "" || (mcfg.Enabled != nil && !(*mcfg.Enabled)) { continue } @@ -67,7 +74,7 @@ func newModuleRegistry(modulesPath string, } moduleFilesets, err := getModuleFilesets(modulesPath, mcfg.Module) if err != nil { - return nil, fmt.Errorf("error getting filesets for module %s: %v", mcfg.Module, err) + return nil, fmt.Errorf("error getting filesets for module %s: %w", mcfg.Module, err) } module := Module{ config: *mcfg, @@ -77,9 +84,14 @@ func newModuleRegistry(modulesPath string, fcfg, err = applyOverrides(fcfg, mcfg.Module, filesetName, overrides) if err != nil { - return nil, fmt.Errorf("error applying overrides on fileset %s/%s: %v", mcfg.Module, filesetName, err) + return nil, fmt.Errorf("error applying overrides on fileset %s/%s: %w", mcfg.Module, filesetName, err) } + // ModuleConfig can have empty Filesets so we only force + // enable if the Enabled pointer is valid + if enableAllFilesets && fcfg.Enabled != nil { + *fcfg.Enabled = true + } if fcfg.Enabled != nil && !(*fcfg.Enabled) { continue } @@ -98,7 +110,7 @@ func newModuleRegistry(modulesPath string, return nil, err } if err = fileset.Read(beatInfo); err != nil { - return nil, fmt.Errorf("error reading fileset %s/%s: %v", mcfg.Module, filesetName, err) + return nil, fmt.Errorf("error reading fileset %s/%s: %w", mcfg.Module, filesetName, err) } module.filesets = append(module.filesets, *fileset) } @@ -109,14 +121,14 @@ func newModuleRegistry(modulesPath string, for _, mod := range reg.registry { filesets := reg.ModuleConfiguredFilesets(mod) if len(filesets) == 0 { - return nil, errors.Errorf("module %s is configured but has no enabled filesets", mod.config.Module) + return nil, fmt.Errorf("module %s is configured but has no enabled filesets", mod.config.Module) } } return ®, nil } // NewModuleRegistry reads and loads the configured module into the registry. -func NewModuleRegistry(moduleConfigs []*conf.C, beatInfo beat.Info, init bool) (*ModuleRegistry, error) { +func NewModuleRegistry(moduleConfigs []*conf.C, beatInfo beat.Info, init bool, enableAllFilesets bool) (*ModuleRegistry, error) { modulesPath := paths.Resolve(paths.Home, "module") stat, err := os.Stat(modulesPath) @@ -143,7 +155,7 @@ func NewModuleRegistry(moduleConfigs []*conf.C, beatInfo beat.Info, init bool) ( moduleConfig, err := mcfgFromConfig(cfg) if err != nil { - return nil, errors.Wrap(err, "error unpacking module config") + return nil, fmt.Errorf("error unpacking module config :%w", err) } mcfgs = append(mcfgs, moduleConfig) } @@ -154,7 +166,7 @@ func NewModuleRegistry(moduleConfigs []*conf.C, beatInfo beat.Info, init bool) ( } enableFilesetsFromOverrides(mcfgs, modulesOverrides) - return newModuleRegistry(modulesPath, mcfgs, modulesOverrides, beatInfo) + return newModuleRegistry(modulesPath, mcfgs, modulesOverrides, beatInfo, enableAllFilesets) } // enableFilesetsFromOverrides enables in mcfgs the filesets mentioned in overrides, @@ -189,7 +201,7 @@ func mcfgFromConfig(cfg *conf.C) (*ModuleConfig, error) { err = cfg.Unpack(&dict) if err != nil { - return nil, fmt.Errorf("error unpacking module %s in a dict: %v", mcfg.Module, err) + return nil, fmt.Errorf("error unpacking module %s in a dict: %w", mcfg.Module, err) } mcfg.Filesets = map[string]*FilesetConfig{} @@ -203,16 +215,16 @@ func mcfgFromConfig(cfg *conf.C) (*ModuleConfig, error) { continue } - filesetConfig, _ := dict[name] // Nil config if name is not present. + filesetConfig := dict[name] // Nil config if name is not present. tmpCfg, err := conf.NewConfigFrom(filesetConfig) if err != nil { - return nil, fmt.Errorf("error creating config from fileset %s/%s: %v", mcfg.Module, name, err) + return nil, fmt.Errorf("error creating config from fileset %s/%s: %w", mcfg.Module, name, err) } fcfg, err := NewFilesetConfig(tmpCfg) if err != nil { - return nil, fmt.Errorf("error creating config from fileset %s/%s: %v", mcfg.Module, name, err) + return nil, fmt.Errorf("error creating config from fileset %s/%s: %w", mcfg.Module, name, err) } mcfg.Filesets[name] = fcfg } @@ -274,7 +286,7 @@ func applyOverrides(fcfg *FilesetConfig, config, err := conf.NewConfigFrom(fcfg) if err != nil { - return nil, fmt.Errorf("error creating vars config object: %v", err) + return nil, fmt.Errorf("error creating vars config object: %w", err) } toMerge := []*conf.C{config} @@ -282,12 +294,12 @@ func applyOverrides(fcfg *FilesetConfig, resultConfig, err := conf.MergeConfigs(toMerge...) if err != nil { - return nil, fmt.Errorf("error merging configs: %v", err) + return nil, fmt.Errorf("error merging configs: %w", err) } res, err := NewFilesetConfig(resultConfig) if err != nil { - return nil, fmt.Errorf("error unpacking configs: %v", err) + return nil, fmt.Errorf("error unpacking configs: %w", err) } return res, nil @@ -324,7 +336,7 @@ func (reg *ModuleRegistry) GetInputConfigs() ([]*conf.C, error) { for _, fileset := range module.filesets { fcfg, err := fileset.getInputConfig() if err != nil { - return result, fmt.Errorf("error getting config for fileset %s/%s: %v", module.config.Module, fileset.name, err) + return result, fmt.Errorf("error getting config for fileset %s/%s: %w", module.config.Module, fileset.name, err) } result = append(result, fcfg) } @@ -367,7 +379,7 @@ func checkAvailableProcessors(esClient PipelineLoader, requiredProcessors []Proc } status, body, err := esClient.Request("GET", "/_nodes/ingest", "", nil, nil) if err != nil { - return fmt.Errorf("error querying _nodes/ingest: %v", err) + return fmt.Errorf("error querying _nodes/ingest: %w", err) } if status > 299 { return fmt.Errorf("error querying _nodes/ingest. Status: %d. Response body: %s", status, body) diff --git a/filebeat/fileset/modules_integration_test.go b/filebeat/fileset/modules_integration_test.go index d51192f50fab..0fb6428b2e18 100644 --- a/filebeat/fileset/modules_integration_test.go +++ b/filebeat/fileset/modules_integration_test.go @@ -115,7 +115,7 @@ func TestSetupNginx(t *testing.T) { }, } - reg, err := newModuleRegistry(modulesPath, configs, nil, makeTestInfo("5.2.0")) + reg, err := newModuleRegistry(modulesPath, configs, nil, makeTestInfo("5.2.0"), false) if err != nil { t.Fatal(err) } @@ -194,7 +194,7 @@ func TestLoadMultiplePipelines(t *testing.T) { {"foo", &enabled, filesetConfigs}, } - reg, err := newModuleRegistry(modulesPath, configs, nil, makeTestInfo("6.6.0")) + reg, err := newModuleRegistry(modulesPath, configs, nil, makeTestInfo("6.6.0"), false) if err != nil { t.Fatal(err) } @@ -239,7 +239,7 @@ func TestLoadMultiplePipelinesWithRollback(t *testing.T) { {"foo", &enabled, filesetConfigs}, } - reg, err := newModuleRegistry(modulesPath, configs, nil, makeTestInfo("6.6.0")) + reg, err := newModuleRegistry(modulesPath, configs, nil, makeTestInfo("6.6.0"), false) if err != nil { t.Fatal(err) } diff --git a/filebeat/fileset/modules_test.go b/filebeat/fileset/modules_test.go index 77242b2bb8e2..e3c2853ffb20 100644 --- a/filebeat/fileset/modules_test.go +++ b/filebeat/fileset/modules_test.go @@ -82,7 +82,7 @@ func TestNewModuleRegistry(t *testing.T) { }, } - reg, err := newModuleRegistry(modulesPath, configs, nil, beat.Info{Version: "5.2.0"}) + reg, err := newModuleRegistry(modulesPath, configs, nil, beat.Info{Version: "5.2.0"}, false) require.NoError(t, err) assert.NotNil(t, reg) @@ -149,7 +149,7 @@ func TestNewModuleRegistryConfig(t *testing.T) { }, } - reg, err := newModuleRegistry(modulesPath, configs, nil, beat.Info{Version: "5.2.0"}) + reg, err := newModuleRegistry(modulesPath, configs, nil, beat.Info{Version: "5.2.0"}, false) require.NoError(t, err) assert.NotNil(t, reg) @@ -175,7 +175,7 @@ func TestMovedModule(t *testing.T) { }, } - reg, err := newModuleRegistry(modulesPath, configs, nil, beat.Info{Version: "5.2.0"}) + reg, err := newModuleRegistry(modulesPath, configs, nil, beat.Info{Version: "5.2.0"}, false) require.NoError(t, err) assert.NotNil(t, reg) } @@ -446,7 +446,7 @@ func TestMissingModuleFolder(t *testing.T) { load(t, map[string]interface{}{"module": "nginx"}), } - reg, err := NewModuleRegistry(configs, beat.Info{Version: "5.2.0"}, true) + reg, err := NewModuleRegistry(configs, beat.Info{Version: "5.2.0"}, true, false) require.NoError(t, err) assert.NotNil(t, reg) diff --git a/filebeat/fileset/setup.go b/filebeat/fileset/setup.go index e773ba9fe996..3ef5a6f2997e 100644 --- a/filebeat/fileset/setup.go +++ b/filebeat/fileset/setup.go @@ -29,20 +29,22 @@ type SetupFactory struct { beatInfo beat.Info pipelineLoaderFactory PipelineLoaderFactory overwritePipelines bool + enableAllFilesets bool } // NewSetupFactory creates a SetupFactory -func NewSetupFactory(beatInfo beat.Info, pipelineLoaderFactory PipelineLoaderFactory) *SetupFactory { +func NewSetupFactory(beatInfo beat.Info, pipelineLoaderFactory PipelineLoaderFactory, enableAllFilesets bool) *SetupFactory { return &SetupFactory{ beatInfo: beatInfo, pipelineLoaderFactory: pipelineLoaderFactory, overwritePipelines: true, + enableAllFilesets: enableAllFilesets, } } // Create creates a new SetupCfgRunner to setup module configuration. func (sf *SetupFactory) Create(_ beat.PipelineConnector, c *conf.C) (cfgfile.Runner, error) { - m, err := NewModuleRegistry([]*conf.C{c}, sf.beatInfo, false) + m, err := NewModuleRegistry([]*conf.C{c}, sf.beatInfo, false, sf.enableAllFilesets) if err != nil { return nil, err } diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 30a960c3288c..683d10a8959b 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -549,7 +549,8 @@ type SetupSettings struct { //Deprecated: use IndexManagementKey instead Template bool //Deprecated: use IndexManagementKey instead - ILMPolicy bool + ILMPolicy bool + EnableAllFilesets bool } // Setup registers ES index template, kibana dashboards, ml jobs and pipelines. @@ -614,12 +615,16 @@ func (b *Beat) Setup(settings Settings, bt beat.Creator, setup SetupSettings) er } if setup.Pipeline && b.OverwritePipelinesCallback != nil { + if setup.EnableAllFilesets { + if err := b.Beat.BeatConfig.SetBool("config.modules.enable_all_filesets", -1, true); err != nil { + return fmt.Errorf("error setting enable_all_filesets config option %w", err) + } + } esConfig := b.Config.Output.Config() err = b.OverwritePipelinesCallback(esConfig) if err != nil { return err } - fmt.Println("Loaded Ingest pipelines") } diff --git a/libbeat/cmd/setup.go b/libbeat/cmd/setup.go index 01c2b110a6b5..f1d2432ead94 100644 --- a/libbeat/cmd/setup.go +++ b/libbeat/cmd/setup.go @@ -34,6 +34,8 @@ const ( PipelineKey = "pipelines" //IndexManagementKey used for loading all components related to ES index management in setup cmd IndexManagementKey = "index-management" + //EnableAllFilesetsKey enables all modules and filesets regardless of config + EnableAllFilesetsKey = "enable-all-filesets" ) func genSetupCmd(settings instance.Settings, beatCreator beat.Creator) *cobra.Command { @@ -55,9 +57,10 @@ func genSetupCmd(settings instance.Settings, beatCreator beat.Creator) *cobra.Co } var registeredFlags = map[string]bool{ - DashboardKey: false, - PipelineKey: false, - IndexManagementKey: false, + DashboardKey: false, + PipelineKey: false, + IndexManagementKey: false, + EnableAllFilesetsKey: false, } var setupAll = true @@ -88,6 +91,8 @@ func genSetupCmd(settings instance.Settings, beatCreator beat.Creator) *cobra.Co s.Pipeline = true case IndexManagementKey: s.IndexManagement = true + case EnableAllFilesetsKey: + s.EnableAllFilesets = true } } } @@ -102,6 +107,7 @@ func genSetupCmd(settings instance.Settings, beatCreator beat.Creator) *cobra.Co setup.Flags().Bool(PipelineKey, false, "Setup Ingest pipelines") setup.Flags().Bool(IndexManagementKey, false, "Setup all components related to Elasticsearch index management, including template, ilm policy and rollover alias") + setup.Flags().Bool("enable-all-filesets", false, "Behave as if all modules and filesets had been enabled") return &setup } diff --git a/libbeat/docs/command-reference.asciidoc b/libbeat/docs/command-reference.asciidoc index fb722ccb08c0..d3d88a57ab8c 100644 --- a/libbeat/docs/command-reference.asciidoc +++ b/libbeat/docs/command-reference.asciidoc @@ -834,6 +834,14 @@ Sets up ingest pipelines for configured filesets. {beatname_uc} looks for enabled modules in the +{beatname_lc}.yml+ file. If you used the <> command to enable modules in the `modules.d` directory, also specify the `--modules` flag. + +*`--enable-all-filesets`*:: +Enables all modules and filesets. This is useful with `--pipelines` +if you want to load all ingest pipelines. Without this option you +would have to list every module with the <> +command and enable every fileset within the module with a `-M` option, +to load all of the ingest pipelines. + endif::[] *`--index-management`*::