Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: Renaming prospector to input #6078

Merged
merged 3 commits into from
Jan 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di

*Filebeat*
- Switch to docker prospector in sample manifests for Kubernetes deployment {pull}5963[5963]
- Renaming of the prospector type to the input type and all prospectors are now moved to the input
folder, to maintain backward compatibility type aliasing was used to map the old type to the new
one. This change also affect YAML configuration. {pull}6078[6078]

*Heartbeat*

Expand Down
9 changes: 8 additions & 1 deletion filebeat/_meta/fields.common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,14 @@
- name: prospector.type
required: true
description: >
The prospector type from which the event was generated. This field is set to the value specified for the `type` option in the prospector section of the Filebeat config file.
The input type from which the event was generated. This field is set to the value specified
for the `type` option in the input section of the Filebeat config file. (DEPRECATED: see `event.type`)

- name: event.type
required: true
description: >
The input type from which the event was generated. This field is set to the value specified
for the `type` option in the input section of the Filebeat config file.

- name: read_timestamp
description: >
Expand Down
18 changes: 9 additions & 9 deletions filebeat/beater/autodiscover.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ import (
"github.com/elastic/beats/libbeat/common/bus"
)

// AutodiscoverAdapter for Filebeat modules & prospectors
// AutodiscoverAdapter for Filebeat modules & input
type AutodiscoverAdapter struct {
prospectorFactory cfgfile.RunnerFactory
moduleFactory cfgfile.RunnerFactory
inputFactory cfgfile.RunnerFactory
moduleFactory cfgfile.RunnerFactory
}

// NewAutodiscoverAdapter builds and returns an autodiscover adapter for Filebeat modules & prospectors
func NewAutodiscoverAdapter(prospectorFactory, moduleFactory cfgfile.RunnerFactory) *AutodiscoverAdapter {
// NewAutodiscoverAdapter builds and returns an autodiscover adapter for Filebeat modules & input
func NewAutodiscoverAdapter(inputFactory, moduleFactory cfgfile.RunnerFactory) *AutodiscoverAdapter {
return &AutodiscoverAdapter{
prospectorFactory: prospectorFactory,
moduleFactory: moduleFactory,
inputFactory: inputFactory,
moduleFactory: moduleFactory,
}
}

Expand All @@ -37,12 +37,12 @@ func (m *AutodiscoverAdapter) CheckConfig(c *common.Config) error {
return nil
}

// Create a module or prospector from the given config
// Create a module or input from the given config
func (m *AutodiscoverAdapter) Create(c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) {
if c.HasField("module") {
return m.moduleFactory.Create(c, meta)
}
return m.prospectorFactory.Create(c, meta)
return m.inputFactory.Create(c, meta)
}

// StartFilter returns the bus filter to retrieve runner start triggering events
Expand Down
38 changes: 23 additions & 15 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
return nil, err
}

if len(config.Prospectors) > 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some system tests with the old config options to make sure this works as expected? We have done this for previous renamings and I think it's pretty useful to make sure it actually works and be remembered that we break something, when we remove it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course, I was planning to do it in a dedicated PR, because right now all the tests use the old config options :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM. Didn't realise it's all on the old config. Good to know.

cfgwarn.Deprecate("7.0.0", "prospectors are deprecated, Use `inputs` instead.")
if len(config.Inputs) > 0 {
return nil, fmt.Errorf("prospectors and inputs used in the configuration file, define only inputs not both")
}
config.Inputs = config.Prospectors
}

moduleRegistry, err := fileset.NewModuleRegistry(config.Modules, b.Info.Version, true)
if err != nil {
return nil, err
Expand All @@ -62,7 +70,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
logp.Info("Enabled modules/filesets: %s", moduleRegistry.InfoString())
}

moduleProspectors, err := moduleRegistry.GetProspectorConfigs()
moduleInputs, err := moduleRegistry.GetInputConfigs()
if err != nil {
return nil, err
}
Expand All @@ -71,28 +79,28 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
return nil, err
}

// Add prospectors created by the modules
config.Prospectors = append(config.Prospectors, moduleProspectors...)
// Add inputs created by the modules
config.Inputs = append(config.Inputs, moduleInputs...)

haveEnabledProspectors := false
for _, prospector := range config.Prospectors {
if prospector.Enabled() {
haveEnabledProspectors = true
haveEnabledInputs := false
for _, input := range config.Inputs {
if input.Enabled() {
haveEnabledInputs = true
break
}
}

if !config.ConfigProspector.Enabled() && !config.ConfigModules.Enabled() && !haveEnabledProspectors && config.Autodiscover == nil {
if !config.ConfigInput.Enabled() && !config.ConfigModules.Enabled() && !haveEnabledInputs && config.Autodiscover == nil {
if !b.InSetupCmd {
return nil, errors.New("No modules or prospectors enabled and configuration reloading disabled. What files do you want me to watch?")
return nil, errors.New("no modules or inputs enabled and configuration reloading disabled. What files do you want me to watch?")
}

// in the `setup` command, log this only as a warning
logp.Warn("Setup called, but no modules enabled.")
}

if *once && config.ConfigProspector.Enabled() && config.ConfigModules.Enabled() {
return nil, errors.New("prospector configs and -once cannot be used together")
if *once && config.ConfigInput.Enabled() && config.ConfigModules.Enabled() {
return nil, errors.New("input configs and -once cannot be used together")
}

fb := &Filebeat{
Expand Down Expand Up @@ -220,7 +228,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
outDone := make(chan struct{}) // outDone closes down all active pipeline connections
crawler, err := crawler.New(
channel.NewOutletFactory(outDone, b.Publisher, wgEvents).Create,
config.Prospectors,
config.Inputs,
b.Info.Version,
fb.done,
*once)
Expand Down Expand Up @@ -261,7 +269,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
logp.Warn(pipelinesWarning)
}

err = crawler.Start(registrar, config.ConfigProspector, config.ConfigModules, pipelineLoaderFactory)
err = crawler.Start(registrar, config.ConfigInput, config.ConfigModules, pipelineLoaderFactory)
if err != nil {
crawler.Stop()
return err
Expand All @@ -279,7 +287,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {

var adiscover *autodiscover.Autodiscover
if fb.config.Autodiscover != nil {
adapter := NewAutodiscoverAdapter(crawler.ProspectorsFactory, crawler.ModulesFactory)
adapter := NewAutodiscoverAdapter(crawler.InputsFactory, crawler.ModulesFactory)
adiscover, err = autodiscover.NewAutodiscover("filebeat", adapter, config.Autodiscover)
if err != nil {
return err
Expand All @@ -291,7 +299,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
waitFinished.AddChan(fb.done)
waitFinished.Wait()

// Stop autodiscover -> Stop crawler -> stop prospectors -> stop harvesters
// Stop autodiscover -> Stop crawler -> stop inputs -> stop harvesters
// Note: waiting for crawlers to stop here in order to install wgEvents.Wait
// after all events have been enqueued for publishing. Otherwise wgEvents.Wait
// or publisher might panic due to concurrent updates.
Expand Down
17 changes: 10 additions & 7 deletions filebeat/channel/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ type clientEventer struct {
wgEvents eventCounter
}

// prospectorOutletConfig defines common prospector settings
// inputOutletConfig defines common input settings
// for the publisher pipline.
type prospectorOutletConfig struct {
type inputOutletConfig struct {
// event processing
common.EventMetadata `config:",inline"` // Fields and tags to add to events.
Processors processors.PluginConfig `config:"processors"`

// implicit event fields
Type string `config:"type"` // prospector.type
Type string `config:"type"` // input.type
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case the comment is correct, this is a breaking change of the data format I think.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why its a breaking changes? The name of the private struct changed and the only the comment changed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only indirectly breaking if the comment is correct. I assume when looking at it that you removed prospector.type from the event, but you copied it to input.type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


// hidden filebeat modules settings
Module string `config:"_module_name"` // hidden setting
Expand All @@ -44,7 +44,7 @@ type prospectorOutletConfig struct {
}

// NewOutletFactory creates a new outlet factory for
// connecting a prospector to the publisher pipeline.
// connecting an input to the publisher pipeline.
func NewOutletFactory(
done <-chan struct{},
pipeline beat.Pipeline,
Expand All @@ -63,12 +63,12 @@ func NewOutletFactory(
return o
}

// Create builds a new Outleter, while applying common prospector settings.
// Prospectors and all harvesters use the same pipeline client instance.
// Create builds a new Outleter, while applying common input settings.
// Inputs and all harvesters use the same pipeline client instance.
// This guarantees ordering between events as required by the registrar for
// file.State updates
func (f *OutletFactory) Create(cfg *common.Config, dynFields *common.MapStrPointer) (Outleter, error) {
config := prospectorOutletConfig{}
config := inputOutletConfig{}
if err := cfg.Unpack(&config); err != nil {
return nil, err
}
Expand Down Expand Up @@ -99,6 +99,9 @@ func (f *OutletFactory) Create(cfg *common.Config, dynFields *common.MapStrPoint
fields["prospector"] = common.MapStr{
"type": config.Type,
}
fields["event"] = common.MapStr{
"type": config.Type,
}
}

client, err := f.pipeline.ConnectWith(beat.ClientConfig{
Expand Down
2 changes: 1 addition & 1 deletion filebeat/channel/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
// Factory is used to create a new Outlet instance
type Factory func(*common.Config, *common.MapStrPointer) (Outleter, error)

// Outleter is the outlet for a prospector
// Outleter is the outlet for an input
type Outleter interface {
Close() error
OnEvent(data *util.Data) bool
Expand Down
31 changes: 20 additions & 11 deletions filebeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@ const (
)

type Config struct {
Prospectors []*common.Config `config:"prospectors"`
RegistryFile string `config:"registry_file"`
RegistryFlush time.Duration `config:"registry_flush"`
ConfigDir string `config:"config_dir"`
ShutdownTimeout time.Duration `config:"shutdown_timeout"`
Modules []*common.Config `config:"modules"`
ConfigProspector *common.Config `config:"config.prospectors"`
ConfigModules *common.Config `config:"config.modules"`
Autodiscover *autodiscover.Config `config:"autodiscover"`
Inputs []*common.Config `config:"inputs"`
Prospectors []*common.Config `config:"prospectors"`
RegistryFile string `config:"registry_file"`
RegistryFlush time.Duration `config:"registry_flush"`
ConfigDir string `config:"config_dir"`
ShutdownTimeout time.Duration `config:"shutdown_timeout"`
Modules []*common.Config `config:"modules"`
ConfigInput *common.Config `config:"config.prospectors"`
ConfigModules *common.Config `config:"config.modules"`
Autodiscover *autodiscover.Config `config:"autodiscover"`
}

var (
Expand Down Expand Up @@ -82,7 +83,15 @@ func mergeConfigFiles(configFiles []string, config *Config) error {
return fmt.Errorf("Failed to read %s: %s", file, err)
}

config.Prospectors = append(config.Prospectors, tmpConfig.Filebeat.Prospectors...)
if len(tmpConfig.Filebeat.Prospectors) > 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some system tests for this would also be nice.

cfgwarn.Deprecate("7.0.0", "prospectors are deprecated, Use `inputs` instead.")
if len(tmpConfig.Filebeat.Inputs) > 0 {
return fmt.Errorf("prospectors and inputs used in the configuration file, define only inputs not both")
}
tmpConfig.Filebeat.Inputs = append(tmpConfig.Filebeat.Inputs, tmpConfig.Filebeat.Prospectors...)
}

config.Inputs = append(config.Inputs, tmpConfig.Filebeat.Inputs...)
}

return nil
Expand All @@ -97,7 +106,7 @@ func (config *Config) FetchConfigs() error {
return nil
}

cfgwarn.Deprecate("7.0.0", "config_dir is deprecated. Use `filebeat.config.prospectors` instead.")
cfgwarn.Deprecate("7.0.0", "config_dir is deprecated. Use `filebeat.config.inputs` instead.")

// If configDir is relative, consider it relative to the config path
configDir = paths.Resolve(paths.Config, configDir)
Expand Down
2 changes: 1 addition & 1 deletion filebeat/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,5 @@ func TestMergeConfigFiles(t *testing.T) {
config := &Config{}
mergeConfigFiles(files, config)

assert.Equal(t, 4, len(config.Prospectors))
assert.Equal(t, 4, len(config.Inputs))
}
Loading