diff --git a/filebeat/input/v2/error.go b/filebeat/input/v2/error.go new file mode 100644 index 00000000000..4fe7a6ffa49 --- /dev/null +++ b/filebeat/input/v2/error.go @@ -0,0 +1,105 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package v2 + +import ( + "errors" + "fmt" + "strings" +) + +// LoadError is returned by Loaders in case of failures. +type LoadError struct { + // Name of input/module that failed to load (if applicable) + Name string + + // Reason why the loader failed. Can either be the cause reported by the + // Plugin or some other indicator like ErrUnknown + Reason error + + // (optional) Message to report in additon. + Message string +} + +// SetupError indicates that the loader initialization has detected +// errors in individual plugin configurations or duplicates. +type SetupError struct { + Fails []error +} + +// ErrUnknownInput indicates that the plugin type does not exist. Either +// because the 'type' setting name does not match the loaders expectations, +// or because the type is unknown. +var ErrUnknownInput = errors.New("unknown input type") + +// ErrNoInputConfigured indicates that the 'type' setting is missing. +var ErrNoInputConfigured = errors.New("no input type configured") + +// ErrPluginWithoutName reports that the operation failed because +// the plugin is required to have a Name. +var ErrPluginWithoutName = errors.New("the plugin has no name") + +// IsUnknownInputError checks if an error value indicates an input load +// error because there is no existing plugin that can create the input. +func IsUnknownInputError(err error) bool { return errors.Is(err, ErrUnknownInput) } + +func failedInputName(err error) string { + switch e := err.(type) { + case *LoadError: + return e.Name + default: + return "" + } +} + +// Unwrap returns the reason if present +func (e *LoadError) Unwrap() error { return e.Reason } + +// Error returns the errors string repesentation +func (e *LoadError) Error() string { + var buf strings.Builder + + if e.Message != "" { + buf.WriteString(e.Message) + } else if e.Name != "" { + buf.WriteString("failed to load ") + buf.WriteString(e.Name) + } + + if e.Reason != nil { + if buf.Len() > 0 { + buf.WriteString(": ") + } + fmt.Fprintf(&buf, "%v", e.Reason) + } + + if buf.Len() == 0 { + return "" + } + return buf.String() +} + +// Error returns the errors string repesentation +func (e *SetupError) Error() string { + var buf strings.Builder + buf.WriteString("invalid plugin setup found:") + for _, err := range e.Fails { + fmt.Fprintf(&buf, "\n\t%v", err) + } + return buf.String() +} diff --git a/filebeat/input/v2/input.go b/filebeat/input/v2/input.go new file mode 100644 index 00000000000..9b66856a54f --- /dev/null +++ b/filebeat/input/v2/input.go @@ -0,0 +1,113 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package v2 + +import ( + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + + "github.com/elastic/go-concert/unison" +) + +// InputManager creates and maintains actions and background processes for an +// input type. +// The InputManager is used to create inputs. The InputManager can provide +// additional functionality like coordination between input of the same type, +// custom functionality for querying or caching shared information, application +// of common settings not unique to a particular input type, or require a more +// specific Input interface to be implemented by the actual input. +type InputManager interface { + // Init signals to InputManager to initialize internal resources. + // The mode tells the input manager if the Beat is actually running the inputs or + // if inputs are only configured for testing/validation purposes. + Init(grp unison.Group, mode Mode) error + + // Creates builds a new Input instance from the given configuation, or returns + // an error if the configuation is invalid. + // The input must establish any connection for data collection yet. The Beat + // will use the Test/Run methods of the input. + Create(*common.Config) (Input, error) +} + +// Mode tells the InputManager in which mode it is initialized. +type Mode uint8 + +//go:generate stringer -type Mode -trimprefix Mode +const ( + ModeRun Mode = iota + ModeTest + ModeOther +) + +// Input is a configured input object that can be used to test or start +// the actual data collection. +type Input interface { + // Name reports the input name. + // + // XXX: check if/how we can remove this method. Currently it is required for + // compatibility reasons with existing interfaces in libbeat, autodiscovery + // and filebeat. + Name() string + + // Test checks the configuaration and runs additional checks if the Input can + // actually collect data for the given configuration (e.g. check if host/port or files are + // accessible). + Test(TestContext) error + + // Run starts the data collection. Run must return an error only if the + // error is fatal making it impossible for the input to recover. + Run(Context, beat.PipelineConnector) error +} + +// Context provides the Input Run function with common environmental +// information and services. +type Context struct { + // Logger provides a structured logger to inputs. The logger is initialized + // with labels that will identify logs for the input. + Logger *logp.Logger + + // The input ID. + ID string + + // Agent provides additional Beat info like instance ID or beat name. + Agent beat.Info + + // Cancelation is used by Beats to signal the input to shutdown. + Cancelation Canceler +} + +// TestContext provides the Input Test function with common environmental +// information and services. +type TestContext struct { + // Logger provides a structured logger to inputs. The logger is initialized + // with labels that will identify logs for the input. + Logger *logp.Logger + + // Agent provides additional Beat info like instance ID or beat name. + Agent beat.Info + + // Cancelation is used by Beats to signal the input to shutdown. + Cancelation Canceler +} + +// Canceler is used to provide shutdown handling to the Context. +type Canceler interface { + Done() <-chan struct{} + Err() error +} diff --git a/filebeat/input/v2/loader.go b/filebeat/input/v2/loader.go new file mode 100644 index 00000000000..9035e7283eb --- /dev/null +++ b/filebeat/input/v2/loader.go @@ -0,0 +1,132 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package v2 + +import ( + "fmt" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/go-concert/unison" +) + +// Loader can be used to create Inputs from configurations. +// The loader is initialized with a list of plugins, and finds the correct plugin +// when a configuration is passed to Configure. +type Loader struct { + log *logp.Logger + registry map[string]Plugin + typeField string + defaultType string +} + +// NewLoader creates a new Loader for configuring inputs from a slice if plugins. +// NewLoader returns a SetupError if invalid plugin configurations or duplicates in the slice are detected. +// The Loader will read the plugin name from the configuration object as is +// configured by typeField. If typeField is empty, it defaults to "type". +func NewLoader(log *logp.Logger, plugins []Plugin, typeField, defaultType string) (*Loader, error) { + if typeField == "" { + typeField = "type" + } + + if errs := validatePlugins(plugins); len(errs) > 0 { + return nil, &SetupError{errs} + } + + registry := make(map[string]Plugin, len(plugins)) + for _, p := range plugins { + registry[p.Name] = p + } + + return &Loader{ + log: log, + registry: registry, + typeField: typeField, + defaultType: defaultType, + }, nil +} + +// Init runs Init on all InputManagers for all plugins known to the loader. +func (l *Loader) Init(group unison.Group, mode Mode) error { + for _, p := range l.registry { + if err := p.Manager.Init(group, mode); err != nil { + return err + } + } + return nil +} + +// Configure creates a new input from a Config object. +// The loader reads the input type name from the cfg object and tries to find a +// matching plugin. If a plugin is found, the plugin it's InputManager is used to create +// the input. +// Returns a LoadError if the input name can not be read from the config or if +// the type does not exist. Error values for Ccnfiguration errors do depend on +// the InputManager. +func (l *Loader) Configure(cfg *common.Config) (Input, error) { + name, err := cfg.String(l.typeField, -1) + if err != nil { + if l.defaultType == "" { + return nil, &LoadError{ + Reason: ErrNoInputConfigured, + Message: fmt.Sprintf("%v setting is missing", l.typeField), + } + } + name = l.defaultType + } + + p, exists := l.registry[name] + if !exists { + return nil, &LoadError{Name: name, Reason: ErrUnknownInput} + } + + log := l.log.With("input", name, "stability", p.Stability, "deprecated", p.Deprecated) + switch p.Stability { + case feature.Experimental: + log.Warnf("EXPERIMENTAL: The %v input is experimental", name) + case feature.Beta: + log.Warnf("BETA: The %v input is beta", name) + } + if p.Deprecated { + log.Warnf("DEPRECATED: The %v input is deprecated", name) + } + + return p.Manager.Create(cfg) +} + +// validatePlugins checks if there are multiple plugins with the same name in +// the registry. +func validatePlugins(plugins []Plugin) []error { + var errs []error + + counts := map[string]int{} + for _, p := range plugins { + counts[p.Name]++ + if err := p.validate(); err != nil { + errs = append(errs, err) + } + } + + for name, count := range counts { + if count > 1 { + errs = append(errs, fmt.Errorf("plugin '%v' found %v times", name, count)) + } + } + return errs +} diff --git a/filebeat/input/v2/loader_test.go b/filebeat/input/v2/loader_test.go new file mode 100644 index 00000000000..72f8013c4db --- /dev/null +++ b/filebeat/input/v2/loader_test.go @@ -0,0 +1,203 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package v2 + +import ( + "errors" + "testing" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/beats/v7/libbeat/logp" +) + +type loaderConfig struct { + Plugins []Plugin + TypeField string + DefaultType string +} + +type inputCheck func(t *testing.T, input Input, err error) + +func TestLoader_New(t *testing.T) { + cases := map[string]struct { + setup loaderConfig + check func(*testing.T, error) + }{ + "ok": { + setup: loaderConfig{ + Plugins: []Plugin{ + {Name: "a", Stability: feature.Stable, Manager: ConfigureWith(nil)}, + {Name: "b", Stability: feature.Stable, Manager: ConfigureWith(nil)}, + {Name: "c", Stability: feature.Stable, Manager: ConfigureWith(nil)}, + }, + }, + check: expectNoError, + }, + "duplicate": { + setup: loaderConfig{ + Plugins: []Plugin{ + {Name: "a", Stability: feature.Stable, Manager: ConfigureWith(nil)}, + {Name: "a", Stability: feature.Stable, Manager: ConfigureWith(nil)}, + }, + }, + check: expectError, + }, + "fail with invalid plugin": { + setup: loaderConfig{ + Plugins: []Plugin{{Name: "", Manager: nil}}, + }, + check: expectError, + }, + } + + for name, test := range cases { + t.Run(name, func(t *testing.T) { + _, err := test.setup.NewLoader() + test.check(t, err) + }) + } +} + +func TestLoader_Init(t *testing.T) { + pluginWithInit := func(name string, fn func(Mode) error) Plugin { + return Plugin{ + Name: name, + Stability: feature.Stable, + Manager: &fakeInputManager{OnInit: fn}, + } + } + + t.Run("calls all input managers", func(t *testing.T) { + count := 0 + incCountOnInit := func(_ Mode) error { count++; return nil } + + setup := loaderConfig{ + Plugins: []Plugin{ + pluginWithInit("a", incCountOnInit), + pluginWithInit("b", incCountOnInit), + }, + } + loader := setup.MustNewLoader() + err := loader.Init(nil, ModeRun) + expectNoError(t, err) + if count != 2 { + t.Errorf("expected init count 2, but got %v", count) + } + }) + + t.Run("stop init on error", func(t *testing.T) { + count := 0 + incCountOnInit := func(_ Mode) error { count++; return errors.New("oops") } + setup := loaderConfig{ + Plugins: []Plugin{ + pluginWithInit("a", incCountOnInit), + pluginWithInit("b", incCountOnInit), + }, + } + loader := setup.MustNewLoader() + err := loader.Init(nil, ModeRun) + expectError(t, err) + if count != 1 { + t.Errorf("expected init count 1, but got %v", count) + } + }) +} + +func TestLoader_Configure(t *testing.T) { + createManager := func(name string) InputManager { + return ConfigureWith(makeConfigFakeInput(fakeInput{Type: name})) + } + createPlugin := func(name string) Plugin { + return Plugin{Name: name, Stability: feature.Stable, Manager: createManager(name)} + } + plugins := []Plugin{ + createPlugin("a"), + createPlugin("b"), + createPlugin("c"), + } + defaultSetup := loaderConfig{Plugins: plugins, TypeField: "type"} + + cases := map[string]struct { + setup loaderConfig + config map[string]interface{} + check inputCheck + }{ + "success": { + setup: defaultSetup, + config: map[string]interface{}{"type": "a"}, + check: okSetup, + }, + "load default": { + setup: defaultSetup.WithDefaultType("a"), + config: map[string]interface{}{}, + check: okSetup, + }, + "type is missing": { + setup: defaultSetup, + config: map[string]interface{}{}, + check: failSetup, + }, + "unknown type": { + setup: defaultSetup, + config: map[string]interface{}{"type": "unknown"}, + check: failSetup, + }, + "input config fails": { + setup: defaultSetup.WithPlugins(Plugin{ + Name: "a", + Stability: feature.Beta, + Manager: ConfigureWith(func(_ *common.Config) (Input, error) { + return nil, errors.New("oops") + }), + }), + config: map[string]interface{}{"type": "a"}, + check: failSetup, + }, + } + + for name, test := range cases { + t.Run(name, func(t *testing.T) { + loader := test.setup.MustNewLoader() + input, err := loader.Configure(common.MustNewConfigFrom(test.config)) + test.check(t, input, err) + }) + } +} + +func (b loaderConfig) MustNewLoader() *Loader { + l, err := b.NewLoader() + if err != nil { + panic(err) + } + return l +} +func (b loaderConfig) NewLoader() (*Loader, error) { + return NewLoader(logp.NewLogger("test"), b.Plugins, b.TypeField, b.DefaultType) +} +func (b loaderConfig) WithPlugins(p ...Plugin) loaderConfig { b.Plugins = p; return b } +func (b loaderConfig) WithTypeField(name string) loaderConfig { b.TypeField = name; return b } +func (b loaderConfig) WithDefaultType(name string) loaderConfig { b.DefaultType = name; return b } + +func failSetup(t *testing.T, _ Input, err error) { + expectError(t, err) +} + +func okSetup(t *testing.T, _ Input, err error) { + expectNoError(t, err) +} diff --git a/filebeat/input/v2/mode_string.go b/filebeat/input/v2/mode_string.go new file mode 100644 index 00000000000..330b15f3a80 --- /dev/null +++ b/filebeat/input/v2/mode_string.go @@ -0,0 +1,42 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Code generated by "stringer -type Mode -trimprefix Mode"; DO NOT EDIT. + +package v2 + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[ModeRun-0] + _ = x[ModeTest-1] + _ = x[ModeOther-2] +} + +const _Mode_name = "RunTestOther" + +var _Mode_index = [...]uint8{0, 3, 7, 12} + +func (i Mode) String() string { + if i >= Mode(len(_Mode_index)-1) { + return "Mode(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _Mode_name[_Mode_index[i]:_Mode_index[i+1]] +} diff --git a/filebeat/input/v2/plugin.go b/filebeat/input/v2/plugin.go new file mode 100644 index 00000000000..81084976ebd --- /dev/null +++ b/filebeat/input/v2/plugin.go @@ -0,0 +1,92 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package v2 + +import ( + "fmt" + + "github.com/elastic/beats/v7/libbeat/feature" +) + +// Plugin describes an input type. Input types should provide a constructor +// function that requires dependencies to be passed and fills out the Plugin structure. +// The Manager is used to finally create and manage inputs of the same type. +// The input-stateless and input-cursor packages, as well as the ConfigureWith function provide +// sample input managers. +// +// Example (stateless input): +// +// func Plugin() input.Plugin { +// return input.Plugin{ +// Name: "myservice", +// Stability: feature.Stable, +// Deprecated: false, +// Info: "collect data from myservice", +// Manager: stateless.NewInputManager(configure), +// } +// } +// +type Plugin struct { + // Name of the input type. + Name string + + // Configure the input stability. If the stability is not 'Stable' a message + // is logged when the input type is configured. + Stability feature.Stability + + // Deprecated marks the plugin as deprecated. If set a deprecation message is logged if + // an input is configured. + Deprecated bool + + // Info contains a short description of the input type. + Info string + + // Doc contains an optional longer description. + Doc string + + // Manager MUST be configured. The manager is used to create the inputs. + Manager InputManager +} + +// Details returns a generic feature description that is compatible with the +// feature package. +func (p Plugin) Details() feature.Details { + return feature.Details{ + Name: p.Name, + Stability: p.Stability, + Deprecated: p.Deprecated, + Info: p.Info, + Doc: p.Doc, + } +} + +func (p Plugin) validate() error { + if p.Name == "" { + return fmt.Errorf("input plugin without name found") + } + switch p.Stability { + case feature.Beta, feature.Experimental, feature.Stable: + break + default: + return fmt.Errorf("plugin '%v' has stability not set", p.Name) + } + if p.Manager == nil { + return fmt.Errorf("invalid plugin (%v) structure detected", p.Name) + } + return nil +} diff --git a/filebeat/input/v2/plugin_test.go b/filebeat/input/v2/plugin_test.go new file mode 100644 index 00000000000..6a111736db2 --- /dev/null +++ b/filebeat/input/v2/plugin_test.go @@ -0,0 +1,84 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package v2 + +import ( + "testing" + + "github.com/elastic/beats/v7/libbeat/feature" +) + +func TestPlugin_Validate(t *testing.T) { + cases := map[string]struct { + valid bool + plugin Plugin + }{ + "valid": { + valid: true, + plugin: Plugin{ + Name: "test", + Stability: feature.Stable, + Deprecated: false, + Info: "test", + Doc: "doc string", + Manager: ConfigureWith(nil), + }, + }, + "missing name": { + valid: false, + plugin: Plugin{ + Stability: feature.Stable, + Deprecated: false, + Info: "test", + Doc: "doc string", + Manager: ConfigureWith(nil), + }, + }, + "invalid stability": { + valid: false, + plugin: Plugin{ + Name: "test", + Deprecated: false, + Info: "test", + Doc: "doc string", + Manager: ConfigureWith(nil), + }, + }, + "missing manager": { + valid: false, + plugin: Plugin{ + Name: "test", + Stability: feature.Stable, + Deprecated: false, + Info: "test", + Doc: "doc string", + }, + }, + } + + for name, test := range cases { + t.Run(name, func(t *testing.T) { + err := test.plugin.validate() + if test.valid { + expectNoError(t, err) + } else { + expectError(t, err) + } + }) + } +} diff --git a/filebeat/input/v2/simplemanager.go b/filebeat/input/v2/simplemanager.go new file mode 100644 index 00000000000..76ade85c9f5 --- /dev/null +++ b/filebeat/input/v2/simplemanager.go @@ -0,0 +1,44 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package v2 + +import ( + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/go-concert/unison" +) + +type simpleInputManager struct { + configure func(*common.Config) (Input, error) +} + +// ConfigureWith creates an InputManager that provides no extra logic and +// allows each input to fully control event collection and publishing in +// isolation. The function fn will be called for every input to be configured. +func ConfigureWith(fn func(*common.Config) (Input, error)) InputManager { + return &simpleInputManager{configure: fn} +} + +// Init is required to fullfil the input.InputManager interface. +// For the kafka input no special initialization is required. +func (*simpleInputManager) Init(grp unison.Group, m Mode) error { return nil } + +// Creates builds a new Input instance from the given configuation, or returns +// an error if the configuation is invalid. +func (manager *simpleInputManager) Create(cfg *common.Config) (Input, error) { + return manager.configure(cfg) +} diff --git a/filebeat/input/v2/util_test.go b/filebeat/input/v2/util_test.go new file mode 100644 index 00000000000..6fbe55b99d1 --- /dev/null +++ b/filebeat/input/v2/util_test.go @@ -0,0 +1,85 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package v2 + +import ( + "errors" + "testing" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/go-concert/unison" +) + +type fakeInputManager struct { + OnInit func(Mode) error + OnConfigure func(*common.Config) (Input, error) +} + +type fakeInput struct { + Type string + OnTest func(TestContext) error + OnRun func(Context, beat.PipelineConnector) error +} + +func makeConfigFakeInput(prototype fakeInput) func(*common.Config) (Input, error) { + return func(cfg *common.Config) (Input, error) { + tmp := prototype + return &tmp, nil + } +} + +func (m *fakeInputManager) Init(_ unison.Group, mode Mode) error { + if m.OnInit != nil { + return m.OnInit(mode) + } + return nil +} + +func (m *fakeInputManager) Create(cfg *common.Config) (Input, error) { + if m.OnConfigure != nil { + return m.OnConfigure(cfg) + } + return nil, errors.New("oops") +} + +func (f *fakeInput) Name() string { return f.Type } +func (f *fakeInput) Test(ctx TestContext) error { + if f.OnTest != nil { + return f.OnTest(ctx) + } + return nil +} +func (f *fakeInput) Run(ctx Context, pipeline beat.PipelineConnector) error { + if f.OnRun != nil { + return f.OnRun(ctx, pipeline) + } + return nil +} + +func expectError(t *testing.T, err error) { + if err == nil { + t.Errorf("expected error") + } +} + +func expectNoError(t *testing.T, err error) { + if err != nil { + t.Errorf("unexpected error: %v", err) + } +}