Skip to content

Commit

Permalink
Initialize input v2 API (elastic#19158)
Browse files Browse the repository at this point in the history
This change adds the package filebeat/input/v2. It introduces the
interfaces for the v2 input API only. The integration with filebat,
actual InputManagers and input implementations will follow.

This is the first PR introducing the new API. The current state of the full implementation can be seen [here](https://github.com/urso/beats/tree/fb-input-v2-combined/filebeat/input/v2) and [sample inputs based on the new API](https://github.com/urso/beats/tree/fb-input-v2-combined/filebeat/features/input).

The full list of changes will include:
- Introduce v2 API interfaces
- Introduce [compatibility layer](https://github.com/urso/beats/tree/fb-input-v2-combined/filebeat/input/v2/compat) to integrate API with existing functionality 
- Introduce helpers for writing [stateless](https://github.com/urso/beats/blob/fb-input-v2-combined/filebeat/input/v2/input-stateless/stateless.go) inputs.
- Introduce helpers for writing [inputs that store a state](https://github.com/urso/beats/tree/fb-input-v2-combined/filebeat/input/v2/input-cursor) between restarts.
- Integrate new API with [existing inputs and modules](https://github.com/urso/beats/blob/fb-input-v2-combined/filebeat/beater/filebeat.go#L301) in filebeat.

This PR only introduces the v2 API interfaces.
  • Loading branch information
Steffen Siering authored and melchiormoulin committed Oct 14, 2020
1 parent 58ac775 commit 44f1fbf
Show file tree
Hide file tree
Showing 9 changed files with 900 additions and 0 deletions.
105 changes: 105 additions & 0 deletions filebeat/input/v2/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package v2

import (
"errors"
"fmt"
"strings"
)

// LoadError is returned by Loaders in case of failures.
type LoadError struct {
// Name of input/module that failed to load (if applicable)
Name string

// Reason why the loader failed. Can either be the cause reported by the
// Plugin or some other indicator like ErrUnknown
Reason error

// (optional) Message to report in additon.
Message string
}

// SetupError indicates that the loader initialization has detected
// errors in individual plugin configurations or duplicates.
type SetupError struct {
Fails []error
}

// ErrUnknownInput indicates that the plugin type does not exist. Either
// because the 'type' setting name does not match the loaders expectations,
// or because the type is unknown.
var ErrUnknownInput = errors.New("unknown input type")

// ErrNoInputConfigured indicates that the 'type' setting is missing.
var ErrNoInputConfigured = errors.New("no input type configured")

// ErrPluginWithoutName reports that the operation failed because
// the plugin is required to have a Name.
var ErrPluginWithoutName = errors.New("the plugin has no name")

// IsUnknownInputError checks if an error value indicates an input load
// error because there is no existing plugin that can create the input.
func IsUnknownInputError(err error) bool { return errors.Is(err, ErrUnknownInput) }

func failedInputName(err error) string {
switch e := err.(type) {
case *LoadError:
return e.Name
default:
return ""
}
}

// Unwrap returns the reason if present
func (e *LoadError) Unwrap() error { return e.Reason }

// Error returns the errors string repesentation
func (e *LoadError) Error() string {
var buf strings.Builder

if e.Message != "" {
buf.WriteString(e.Message)
} else if e.Name != "" {
buf.WriteString("failed to load ")
buf.WriteString(e.Name)
}

if e.Reason != nil {
if buf.Len() > 0 {
buf.WriteString(": ")
}
fmt.Fprintf(&buf, "%v", e.Reason)
}

if buf.Len() == 0 {
return "<loader error>"
}
return buf.String()
}

// Error returns the errors string repesentation
func (e *SetupError) Error() string {
var buf strings.Builder
buf.WriteString("invalid plugin setup found:")
for _, err := range e.Fails {
fmt.Fprintf(&buf, "\n\t%v", err)
}
return buf.String()
}
113 changes: 113 additions & 0 deletions filebeat/input/v2/input.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package v2

import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"

"github.com/elastic/go-concert/unison"
)

// InputManager creates and maintains actions and background processes for an
// input type.
// The InputManager is used to create inputs. The InputManager can provide
// additional functionality like coordination between input of the same type,
// custom functionality for querying or caching shared information, application
// of common settings not unique to a particular input type, or require a more
// specific Input interface to be implemented by the actual input.
type InputManager interface {
// Init signals to InputManager to initialize internal resources.
// The mode tells the input manager if the Beat is actually running the inputs or
// if inputs are only configured for testing/validation purposes.
Init(grp unison.Group, mode Mode) error

// Creates builds a new Input instance from the given configuation, or returns
// an error if the configuation is invalid.
// The input must establish any connection for data collection yet. The Beat
// will use the Test/Run methods of the input.
Create(*common.Config) (Input, error)
}

// Mode tells the InputManager in which mode it is initialized.
type Mode uint8

//go:generate stringer -type Mode -trimprefix Mode
const (
ModeRun Mode = iota
ModeTest
ModeOther
)

// Input is a configured input object that can be used to test or start
// the actual data collection.
type Input interface {
// Name reports the input name.
//
// XXX: check if/how we can remove this method. Currently it is required for
// compatibility reasons with existing interfaces in libbeat, autodiscovery
// and filebeat.
Name() string

// Test checks the configuaration and runs additional checks if the Input can
// actually collect data for the given configuration (e.g. check if host/port or files are
// accessible).
Test(TestContext) error

// Run starts the data collection. Run must return an error only if the
// error is fatal making it impossible for the input to recover.
Run(Context, beat.PipelineConnector) error
}

// Context provides the Input Run function with common environmental
// information and services.
type Context struct {
// Logger provides a structured logger to inputs. The logger is initialized
// with labels that will identify logs for the input.
Logger *logp.Logger

// The input ID.
ID string

// Agent provides additional Beat info like instance ID or beat name.
Agent beat.Info

// Cancelation is used by Beats to signal the input to shutdown.
Cancelation Canceler
}

// TestContext provides the Input Test function with common environmental
// information and services.
type TestContext struct {
// Logger provides a structured logger to inputs. The logger is initialized
// with labels that will identify logs for the input.
Logger *logp.Logger

// Agent provides additional Beat info like instance ID or beat name.
Agent beat.Info

// Cancelation is used by Beats to signal the input to shutdown.
Cancelation Canceler
}

// Canceler is used to provide shutdown handling to the Context.
type Canceler interface {
Done() <-chan struct{}
Err() error
}
132 changes: 132 additions & 0 deletions filebeat/input/v2/loader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package v2

import (
"fmt"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/go-concert/unison"
)

// Loader can be used to create Inputs from configurations.
// The loader is initialized with a list of plugins, and finds the correct plugin
// when a configuration is passed to Configure.
type Loader struct {
log *logp.Logger
registry map[string]Plugin
typeField string
defaultType string
}

// NewLoader creates a new Loader for configuring inputs from a slice if plugins.
// NewLoader returns a SetupError if invalid plugin configurations or duplicates in the slice are detected.
// The Loader will read the plugin name from the configuration object as is
// configured by typeField. If typeField is empty, it defaults to "type".
func NewLoader(log *logp.Logger, plugins []Plugin, typeField, defaultType string) (*Loader, error) {
if typeField == "" {
typeField = "type"
}

if errs := validatePlugins(plugins); len(errs) > 0 {
return nil, &SetupError{errs}
}

registry := make(map[string]Plugin, len(plugins))
for _, p := range plugins {
registry[p.Name] = p
}

return &Loader{
log: log,
registry: registry,
typeField: typeField,
defaultType: defaultType,
}, nil
}

// Init runs Init on all InputManagers for all plugins known to the loader.
func (l *Loader) Init(group unison.Group, mode Mode) error {
for _, p := range l.registry {
if err := p.Manager.Init(group, mode); err != nil {
return err
}
}
return nil
}

// Configure creates a new input from a Config object.
// The loader reads the input type name from the cfg object and tries to find a
// matching plugin. If a plugin is found, the plugin it's InputManager is used to create
// the input.
// Returns a LoadError if the input name can not be read from the config or if
// the type does not exist. Error values for Ccnfiguration errors do depend on
// the InputManager.
func (l *Loader) Configure(cfg *common.Config) (Input, error) {
name, err := cfg.String(l.typeField, -1)
if err != nil {
if l.defaultType == "" {
return nil, &LoadError{
Reason: ErrNoInputConfigured,
Message: fmt.Sprintf("%v setting is missing", l.typeField),
}
}
name = l.defaultType
}

p, exists := l.registry[name]
if !exists {
return nil, &LoadError{Name: name, Reason: ErrUnknownInput}
}

log := l.log.With("input", name, "stability", p.Stability, "deprecated", p.Deprecated)
switch p.Stability {
case feature.Experimental:
log.Warnf("EXPERIMENTAL: The %v input is experimental", name)
case feature.Beta:
log.Warnf("BETA: The %v input is beta", name)
}
if p.Deprecated {
log.Warnf("DEPRECATED: The %v input is deprecated", name)
}

return p.Manager.Create(cfg)
}

// validatePlugins checks if there are multiple plugins with the same name in
// the registry.
func validatePlugins(plugins []Plugin) []error {
var errs []error

counts := map[string]int{}
for _, p := range plugins {
counts[p.Name]++
if err := p.validate(); err != nil {
errs = append(errs, err)
}
}

for name, count := range counts {
if count > 1 {
errs = append(errs, fmt.Errorf("plugin '%v' found %v times", name, count))
}
}
return errs
}
Loading

0 comments on commit 44f1fbf

Please sign in to comment.