From 44f1fbfb958a85b29e94111768e148af019ceb2e Mon Sep 17 00:00:00 2001 From: Steffen Siering Date: Thu, 18 Jun 2020 13:03:41 +0200 Subject: [PATCH] Initialize input v2 API (#19158) This change adds the package filebeat/input/v2. It introduces the interfaces for the v2 input API only. The integration with filebat, actual InputManagers and input implementations will follow. This is the first PR introducing the new API. The current state of the full implementation can be seen [here](https://github.com/urso/beats/tree/fb-input-v2-combined/filebeat/input/v2) and [sample inputs based on the new API](https://github.com/urso/beats/tree/fb-input-v2-combined/filebeat/features/input). The full list of changes will include: - Introduce v2 API interfaces - Introduce [compatibility layer](https://github.com/urso/beats/tree/fb-input-v2-combined/filebeat/input/v2/compat) to integrate API with existing functionality - Introduce helpers for writing [stateless](https://github.com/urso/beats/blob/fb-input-v2-combined/filebeat/input/v2/input-stateless/stateless.go) inputs. - Introduce helpers for writing [inputs that store a state](https://github.com/urso/beats/tree/fb-input-v2-combined/filebeat/input/v2/input-cursor) between restarts. - Integrate new API with [existing inputs and modules](https://github.com/urso/beats/blob/fb-input-v2-combined/filebeat/beater/filebeat.go#L301) in filebeat. This PR only introduces the v2 API interfaces. --- filebeat/input/v2/error.go | 105 +++++++++++++++ filebeat/input/v2/input.go | 113 ++++++++++++++++ filebeat/input/v2/loader.go | 132 +++++++++++++++++++ filebeat/input/v2/loader_test.go | 203 +++++++++++++++++++++++++++++ filebeat/input/v2/mode_string.go | 42 ++++++ filebeat/input/v2/plugin.go | 92 +++++++++++++ filebeat/input/v2/plugin_test.go | 84 ++++++++++++ filebeat/input/v2/simplemanager.go | 44 +++++++ filebeat/input/v2/util_test.go | 85 ++++++++++++ 9 files changed, 900 insertions(+) create mode 100644 filebeat/input/v2/error.go create mode 100644 filebeat/input/v2/input.go create mode 100644 filebeat/input/v2/loader.go create mode 100644 filebeat/input/v2/loader_test.go create mode 100644 filebeat/input/v2/mode_string.go create mode 100644 filebeat/input/v2/plugin.go create mode 100644 filebeat/input/v2/plugin_test.go create mode 100644 filebeat/input/v2/simplemanager.go create mode 100644 filebeat/input/v2/util_test.go diff --git a/filebeat/input/v2/error.go b/filebeat/input/v2/error.go new file mode 100644 index 00000000000..4fe7a6ffa49 --- /dev/null +++ b/filebeat/input/v2/error.go @@ -0,0 +1,105 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package v2 + +import ( + "errors" + "fmt" + "strings" +) + +// LoadError is returned by Loaders in case of failures. +type LoadError struct { + // Name of input/module that failed to load (if applicable) + Name string + + // Reason why the loader failed. Can either be the cause reported by the + // Plugin or some other indicator like ErrUnknown + Reason error + + // (optional) Message to report in additon. + Message string +} + +// SetupError indicates that the loader initialization has detected +// errors in individual plugin configurations or duplicates. +type SetupError struct { + Fails []error +} + +// ErrUnknownInput indicates that the plugin type does not exist. Either +// because the 'type' setting name does not match the loaders expectations, +// or because the type is unknown. +var ErrUnknownInput = errors.New("unknown input type") + +// ErrNoInputConfigured indicates that the 'type' setting is missing. +var ErrNoInputConfigured = errors.New("no input type configured") + +// ErrPluginWithoutName reports that the operation failed because +// the plugin is required to have a Name. +var ErrPluginWithoutName = errors.New("the plugin has no name") + +// IsUnknownInputError checks if an error value indicates an input load +// error because there is no existing plugin that can create the input. +func IsUnknownInputError(err error) bool { return errors.Is(err, ErrUnknownInput) } + +func failedInputName(err error) string { + switch e := err.(type) { + case *LoadError: + return e.Name + default: + return "" + } +} + +// Unwrap returns the reason if present +func (e *LoadError) Unwrap() error { return e.Reason } + +// Error returns the errors string repesentation +func (e *LoadError) Error() string { + var buf strings.Builder + + if e.Message != "" { + buf.WriteString(e.Message) + } else if e.Name != "" { + buf.WriteString("failed to load ") + buf.WriteString(e.Name) + } + + if e.Reason != nil { + if buf.Len() > 0 { + buf.WriteString(": ") + } + fmt.Fprintf(&buf, "%v", e.Reason) + } + + if buf.Len() == 0 { + return "" + } + return buf.String() +} + +// Error returns the errors string repesentation +func (e *SetupError) Error() string { + var buf strings.Builder + buf.WriteString("invalid plugin setup found:") + for _, err := range e.Fails { + fmt.Fprintf(&buf, "\n\t%v", err) + } + return buf.String() +} diff --git a/filebeat/input/v2/input.go b/filebeat/input/v2/input.go new file mode 100644 index 00000000000..9b66856a54f --- /dev/null +++ b/filebeat/input/v2/input.go @@ -0,0 +1,113 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package v2 + +import ( + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + + "github.com/elastic/go-concert/unison" +) + +// InputManager creates and maintains actions and background processes for an +// input type. +// The InputManager is used to create inputs. The InputManager can provide +// additional functionality like coordination between input of the same type, +// custom functionality for querying or caching shared information, application +// of common settings not unique to a particular input type, or require a more +// specific Input interface to be implemented by the actual input. +type InputManager interface { + // Init signals to InputManager to initialize internal resources. + // The mode tells the input manager if the Beat is actually running the inputs or + // if inputs are only configured for testing/validation purposes. + Init(grp unison.Group, mode Mode) error + + // Creates builds a new Input instance from the given configuation, or returns + // an error if the configuation is invalid. + // The input must establish any connection for data collection yet. The Beat + // will use the Test/Run methods of the input. + Create(*common.Config) (Input, error) +} + +// Mode tells the InputManager in which mode it is initialized. +type Mode uint8 + +//go:generate stringer -type Mode -trimprefix Mode +const ( + ModeRun Mode = iota + ModeTest + ModeOther +) + +// Input is a configured input object that can be used to test or start +// the actual data collection. +type Input interface { + // Name reports the input name. + // + // XXX: check if/how we can remove this method. Currently it is required for + // compatibility reasons with existing interfaces in libbeat, autodiscovery + // and filebeat. + Name() string + + // Test checks the configuaration and runs additional checks if the Input can + // actually collect data for the given configuration (e.g. check if host/port or files are + // accessible). + Test(TestContext) error + + // Run starts the data collection. Run must return an error only if the + // error is fatal making it impossible for the input to recover. + Run(Context, beat.PipelineConnector) error +} + +// Context provides the Input Run function with common environmental +// information and services. +type Context struct { + // Logger provides a structured logger to inputs. The logger is initialized + // with labels that will identify logs for the input. + Logger *logp.Logger + + // The input ID. + ID string + + // Agent provides additional Beat info like instance ID or beat name. + Agent beat.Info + + // Cancelation is used by Beats to signal the input to shutdown. + Cancelation Canceler +} + +// TestContext provides the Input Test function with common environmental +// information and services. +type TestContext struct { + // Logger provides a structured logger to inputs. The logger is initialized + // with labels that will identify logs for the input. + Logger *logp.Logger + + // Agent provides additional Beat info like instance ID or beat name. + Agent beat.Info + + // Cancelation is used by Beats to signal the input to shutdown. + Cancelation Canceler +} + +// Canceler is used to provide shutdown handling to the Context. +type Canceler interface { + Done() <-chan struct{} + Err() error +} diff --git a/filebeat/input/v2/loader.go b/filebeat/input/v2/loader.go new file mode 100644 index 00000000000..9035e7283eb --- /dev/null +++ b/filebeat/input/v2/loader.go @@ -0,0 +1,132 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package v2 + +import ( + "fmt" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/go-concert/unison" +) + +// Loader can be used to create Inputs from configurations. +// The loader is initialized with a list of plugins, and finds the correct plugin +// when a configuration is passed to Configure. +type Loader struct { + log *logp.Logger + registry map[string]Plugin + typeField string + defaultType string +} + +// NewLoader creates a new Loader for configuring inputs from a slice if plugins. +// NewLoader returns a SetupError if invalid plugin configurations or duplicates in the slice are detected. +// The Loader will read the plugin name from the configuration object as is +// configured by typeField. If typeField is empty, it defaults to "type". +func NewLoader(log *logp.Logger, plugins []Plugin, typeField, defaultType string) (*Loader, error) { + if typeField == "" { + typeField = "type" + } + + if errs := validatePlugins(plugins); len(errs) > 0 { + return nil, &SetupError{errs} + } + + registry := make(map[string]Plugin, len(plugins)) + for _, p := range plugins { + registry[p.Name] = p + } + + return &Loader{ + log: log, + registry: registry, + typeField: typeField, + defaultType: defaultType, + }, nil +} + +// Init runs Init on all InputManagers for all plugins known to the loader. +func (l *Loader) Init(group unison.Group, mode Mode) error { + for _, p := range l.registry { + if err := p.Manager.Init(group, mode); err != nil { + return err + } + } + return nil +} + +// Configure creates a new input from a Config object. +// The loader reads the input type name from the cfg object and tries to find a +// matching plugin. If a plugin is found, the plugin it's InputManager is used to create +// the input. +// Returns a LoadError if the input name can not be read from the config or if +// the type does not exist. Error values for Ccnfiguration errors do depend on +// the InputManager. +func (l *Loader) Configure(cfg *common.Config) (Input, error) { + name, err := cfg.String(l.typeField, -1) + if err != nil { + if l.defaultType == "" { + return nil, &LoadError{ + Reason: ErrNoInputConfigured, + Message: fmt.Sprintf("%v setting is missing", l.typeField), + } + } + name = l.defaultType + } + + p, exists := l.registry[name] + if !exists { + return nil, &LoadError{Name: name, Reason: ErrUnknownInput} + } + + log := l.log.With("input", name, "stability", p.Stability, "deprecated", p.Deprecated) + switch p.Stability { + case feature.Experimental: + log.Warnf("EXPERIMENTAL: The %v input is experimental", name) + case feature.Beta: + log.Warnf("BETA: The %v input is beta", name) + } + if p.Deprecated { + log.Warnf("DEPRECATED: The %v input is deprecated", name) + } + + return p.Manager.Create(cfg) +} + +// validatePlugins checks if there are multiple plugins with the same name in +// the registry. +func validatePlugins(plugins []Plugin) []error { + var errs []error + + counts := map[string]int{} + for _, p := range plugins { + counts[p.Name]++ + if err := p.validate(); err != nil { + errs = append(errs, err) + } + } + + for name, count := range counts { + if count > 1 { + errs = append(errs, fmt.Errorf("plugin '%v' found %v times", name, count)) + } + } + return errs +} diff --git a/filebeat/input/v2/loader_test.go b/filebeat/input/v2/loader_test.go new file mode 100644 index 00000000000..72f8013c4db --- /dev/null +++ b/filebeat/input/v2/loader_test.go @@ -0,0 +1,203 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package v2 + +import ( + "errors" + "testing" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/beats/v7/libbeat/logp" +) + +type loaderConfig struct { + Plugins []Plugin + TypeField string + DefaultType string +} + +type inputCheck func(t *testing.T, input Input, err error) + +func TestLoader_New(t *testing.T) { + cases := map[string]struct { + setup loaderConfig + check func(*testing.T, error) + }{ + "ok": { + setup: loaderConfig{ + Plugins: []Plugin{ + {Name: "a", Stability: feature.Stable, Manager: ConfigureWith(nil)}, + {Name: "b", Stability: feature.Stable, Manager: ConfigureWith(nil)}, + {Name: "c", Stability: feature.Stable, Manager: ConfigureWith(nil)}, + }, + }, + check: expectNoError, + }, + "duplicate": { + setup: loaderConfig{ + Plugins: []Plugin{ + {Name: "a", Stability: feature.Stable, Manager: ConfigureWith(nil)}, + {Name: "a", Stability: feature.Stable, Manager: ConfigureWith(nil)}, + }, + }, + check: expectError, + }, + "fail with invalid plugin": { + setup: loaderConfig{ + Plugins: []Plugin{{Name: "", Manager: nil}}, + }, + check: expectError, + }, + } + + for name, test := range cases { + t.Run(name, func(t *testing.T) { + _, err := test.setup.NewLoader() + test.check(t, err) + }) + } +} + +func TestLoader_Init(t *testing.T) { + pluginWithInit := func(name string, fn func(Mode) error) Plugin { + return Plugin{ + Name: name, + Stability: feature.Stable, + Manager: &fakeInputManager{OnInit: fn}, + } + } + + t.Run("calls all input managers", func(t *testing.T) { + count := 0 + incCountOnInit := func(_ Mode) error { count++; return nil } + + setup := loaderConfig{ + Plugins: []Plugin{ + pluginWithInit("a", incCountOnInit), + pluginWithInit("b", incCountOnInit), + }, + } + loader := setup.MustNewLoader() + err := loader.Init(nil, ModeRun) + expectNoError(t, err) + if count != 2 { + t.Errorf("expected init count 2, but got %v", count) + } + }) + + t.Run("stop init on error", func(t *testing.T) { + count := 0 + incCountOnInit := func(_ Mode) error { count++; return errors.New("oops") } + setup := loaderConfig{ + Plugins: []Plugin{ + pluginWithInit("a", incCountOnInit), + pluginWithInit("b", incCountOnInit), + }, + } + loader := setup.MustNewLoader() + err := loader.Init(nil, ModeRun) + expectError(t, err) + if count != 1 { + t.Errorf("expected init count 1, but got %v", count) + } + }) +} + +func TestLoader_Configure(t *testing.T) { + createManager := func(name string) InputManager { + return ConfigureWith(makeConfigFakeInput(fakeInput{Type: name})) + } + createPlugin := func(name string) Plugin { + return Plugin{Name: name, Stability: feature.Stable, Manager: createManager(name)} + } + plugins := []Plugin{ + createPlugin("a"), + createPlugin("b"), + createPlugin("c"), + } + defaultSetup := loaderConfig{Plugins: plugins, TypeField: "type"} + + cases := map[string]struct { + setup loaderConfig + config map[string]interface{} + check inputCheck + }{ + "success": { + setup: defaultSetup, + config: map[string]interface{}{"type": "a"}, + check: okSetup, + }, + "load default": { + setup: defaultSetup.WithDefaultType("a"), + config: map[string]interface{}{}, + check: okSetup, + }, + "type is missing": { + setup: defaultSetup, + config: map[string]interface{}{}, + check: failSetup, + }, + "unknown type": { + setup: defaultSetup, + config: map[string]interface{}{"type": "unknown"}, + check: failSetup, + }, + "input config fails": { + setup: defaultSetup.WithPlugins(Plugin{ + Name: "a", + Stability: feature.Beta, + Manager: ConfigureWith(func(_ *common.Config) (Input, error) { + return nil, errors.New("oops") + }), + }), + config: map[string]interface{}{"type": "a"}, + check: failSetup, + }, + } + + for name, test := range cases { + t.Run(name, func(t *testing.T) { + loader := test.setup.MustNewLoader() + input, err := loader.Configure(common.MustNewConfigFrom(test.config)) + test.check(t, input, err) + }) + } +} + +func (b loaderConfig) MustNewLoader() *Loader { + l, err := b.NewLoader() + if err != nil { + panic(err) + } + return l +} +func (b loaderConfig) NewLoader() (*Loader, error) { + return NewLoader(logp.NewLogger("test"), b.Plugins, b.TypeField, b.DefaultType) +} +func (b loaderConfig) WithPlugins(p ...Plugin) loaderConfig { b.Plugins = p; return b } +func (b loaderConfig) WithTypeField(name string) loaderConfig { b.TypeField = name; return b } +func (b loaderConfig) WithDefaultType(name string) loaderConfig { b.DefaultType = name; return b } + +func failSetup(t *testing.T, _ Input, err error) { + expectError(t, err) +} + +func okSetup(t *testing.T, _ Input, err error) { + expectNoError(t, err) +} diff --git a/filebeat/input/v2/mode_string.go b/filebeat/input/v2/mode_string.go new file mode 100644 index 00000000000..330b15f3a80 --- /dev/null +++ b/filebeat/input/v2/mode_string.go @@ -0,0 +1,42 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Code generated by "stringer -type Mode -trimprefix Mode"; DO NOT EDIT. + +package v2 + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[ModeRun-0] + _ = x[ModeTest-1] + _ = x[ModeOther-2] +} + +const _Mode_name = "RunTestOther" + +var _Mode_index = [...]uint8{0, 3, 7, 12} + +func (i Mode) String() string { + if i >= Mode(len(_Mode_index)-1) { + return "Mode(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _Mode_name[_Mode_index[i]:_Mode_index[i+1]] +} diff --git a/filebeat/input/v2/plugin.go b/filebeat/input/v2/plugin.go new file mode 100644 index 00000000000..81084976ebd --- /dev/null +++ b/filebeat/input/v2/plugin.go @@ -0,0 +1,92 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package v2 + +import ( + "fmt" + + "github.com/elastic/beats/v7/libbeat/feature" +) + +// Plugin describes an input type. Input types should provide a constructor +// function that requires dependencies to be passed and fills out the Plugin structure. +// The Manager is used to finally create and manage inputs of the same type. +// The input-stateless and input-cursor packages, as well as the ConfigureWith function provide +// sample input managers. +// +// Example (stateless input): +// +// func Plugin() input.Plugin { +// return input.Plugin{ +// Name: "myservice", +// Stability: feature.Stable, +// Deprecated: false, +// Info: "collect data from myservice", +// Manager: stateless.NewInputManager(configure), +// } +// } +// +type Plugin struct { + // Name of the input type. + Name string + + // Configure the input stability. If the stability is not 'Stable' a message + // is logged when the input type is configured. + Stability feature.Stability + + // Deprecated marks the plugin as deprecated. If set a deprecation message is logged if + // an input is configured. + Deprecated bool + + // Info contains a short description of the input type. + Info string + + // Doc contains an optional longer description. + Doc string + + // Manager MUST be configured. The manager is used to create the inputs. + Manager InputManager +} + +// Details returns a generic feature description that is compatible with the +// feature package. +func (p Plugin) Details() feature.Details { + return feature.Details{ + Name: p.Name, + Stability: p.Stability, + Deprecated: p.Deprecated, + Info: p.Info, + Doc: p.Doc, + } +} + +func (p Plugin) validate() error { + if p.Name == "" { + return fmt.Errorf("input plugin without name found") + } + switch p.Stability { + case feature.Beta, feature.Experimental, feature.Stable: + break + default: + return fmt.Errorf("plugin '%v' has stability not set", p.Name) + } + if p.Manager == nil { + return fmt.Errorf("invalid plugin (%v) structure detected", p.Name) + } + return nil +} diff --git a/filebeat/input/v2/plugin_test.go b/filebeat/input/v2/plugin_test.go new file mode 100644 index 00000000000..6a111736db2 --- /dev/null +++ b/filebeat/input/v2/plugin_test.go @@ -0,0 +1,84 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package v2 + +import ( + "testing" + + "github.com/elastic/beats/v7/libbeat/feature" +) + +func TestPlugin_Validate(t *testing.T) { + cases := map[string]struct { + valid bool + plugin Plugin + }{ + "valid": { + valid: true, + plugin: Plugin{ + Name: "test", + Stability: feature.Stable, + Deprecated: false, + Info: "test", + Doc: "doc string", + Manager: ConfigureWith(nil), + }, + }, + "missing name": { + valid: false, + plugin: Plugin{ + Stability: feature.Stable, + Deprecated: false, + Info: "test", + Doc: "doc string", + Manager: ConfigureWith(nil), + }, + }, + "invalid stability": { + valid: false, + plugin: Plugin{ + Name: "test", + Deprecated: false, + Info: "test", + Doc: "doc string", + Manager: ConfigureWith(nil), + }, + }, + "missing manager": { + valid: false, + plugin: Plugin{ + Name: "test", + Stability: feature.Stable, + Deprecated: false, + Info: "test", + Doc: "doc string", + }, + }, + } + + for name, test := range cases { + t.Run(name, func(t *testing.T) { + err := test.plugin.validate() + if test.valid { + expectNoError(t, err) + } else { + expectError(t, err) + } + }) + } +} diff --git a/filebeat/input/v2/simplemanager.go b/filebeat/input/v2/simplemanager.go new file mode 100644 index 00000000000..76ade85c9f5 --- /dev/null +++ b/filebeat/input/v2/simplemanager.go @@ -0,0 +1,44 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package v2 + +import ( + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/go-concert/unison" +) + +type simpleInputManager struct { + configure func(*common.Config) (Input, error) +} + +// ConfigureWith creates an InputManager that provides no extra logic and +// allows each input to fully control event collection and publishing in +// isolation. The function fn will be called for every input to be configured. +func ConfigureWith(fn func(*common.Config) (Input, error)) InputManager { + return &simpleInputManager{configure: fn} +} + +// Init is required to fullfil the input.InputManager interface. +// For the kafka input no special initialization is required. +func (*simpleInputManager) Init(grp unison.Group, m Mode) error { return nil } + +// Creates builds a new Input instance from the given configuation, or returns +// an error if the configuation is invalid. +func (manager *simpleInputManager) Create(cfg *common.Config) (Input, error) { + return manager.configure(cfg) +} diff --git a/filebeat/input/v2/util_test.go b/filebeat/input/v2/util_test.go new file mode 100644 index 00000000000..6fbe55b99d1 --- /dev/null +++ b/filebeat/input/v2/util_test.go @@ -0,0 +1,85 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package v2 + +import ( + "errors" + "testing" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/go-concert/unison" +) + +type fakeInputManager struct { + OnInit func(Mode) error + OnConfigure func(*common.Config) (Input, error) +} + +type fakeInput struct { + Type string + OnTest func(TestContext) error + OnRun func(Context, beat.PipelineConnector) error +} + +func makeConfigFakeInput(prototype fakeInput) func(*common.Config) (Input, error) { + return func(cfg *common.Config) (Input, error) { + tmp := prototype + return &tmp, nil + } +} + +func (m *fakeInputManager) Init(_ unison.Group, mode Mode) error { + if m.OnInit != nil { + return m.OnInit(mode) + } + return nil +} + +func (m *fakeInputManager) Create(cfg *common.Config) (Input, error) { + if m.OnConfigure != nil { + return m.OnConfigure(cfg) + } + return nil, errors.New("oops") +} + +func (f *fakeInput) Name() string { return f.Type } +func (f *fakeInput) Test(ctx TestContext) error { + if f.OnTest != nil { + return f.OnTest(ctx) + } + return nil +} +func (f *fakeInput) Run(ctx Context, pipeline beat.PipelineConnector) error { + if f.OnRun != nil { + return f.OnRun(ctx, pipeline) + } + return nil +} + +func expectError(t *testing.T, err error) { + if err == nil { + t.Errorf("expected error") + } +} + +func expectNoError(t *testing.T, err error) { + if err != nil { + t.Errorf("unexpected error: %v", err) + } +}