Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filebeat] Select output index based on the source input #14010

Merged
merged 21 commits into from
Nov 7, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions filebeat/channel/wrapped.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package channel

import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
)

// A wrapper around a generic Outleter that applies the given transformation to
// incoming events before forwarding them.
type wrappedOutlet struct {
outlet Outleter
eventTransform func(beat.Event)
}

// WrapOutlet takes an Outleter and an event transformation function and
// returns an Outleter that applies that transformation before forwarding them.
// The transformation operates in place (it modifies its input events).
// The new Outleter uses the same underlying state, e.g. calling Close on the
// wrapped Outleter will close the original as well. If this is not the intent,
// call SubOutlet first.
func WrapOutlet(outlet Outleter, eventTransform func(beat.Event)) Outleter {
return &wrappedOutlet{outlet: outlet, eventTransform: eventTransform}
}

func (o *wrappedOutlet) Close() error {
return o.outlet.Close()
}

func (o *wrappedOutlet) Done() <-chan struct{} {
return o.outlet.Done()
}

func (o *wrappedOutlet) OnEvent(event beat.Event) bool {
// Mutate the event then pass it on.
o.eventTransform(event)
return o.outlet.OnEvent(event)
}

// A wrapper around a generic Outleter that produces Outleters that apply the
// given transformation to incoming events before sending them.
type wrappedConnector struct {
connector Connector
eventTransform func(beat.Event)
}

func (c *wrappedConnector) Connect(conf *common.Config) (Outleter, error) {
outleter, err := c.connector.Connect(conf)
if err != nil {
return outleter, err
}
return WrapOutlet(outleter, c.eventTransform), nil
}

func (c *wrappedConnector) ConnectWith(
conf *common.Config, clientConf beat.ClientConfig,
) (Outleter, error) {
outleter, err := c.connector.ConnectWith(conf, clientConf)
if err != nil {
return outleter, err
}
return WrapOutlet(outleter, c.eventTransform), nil
}

// WrapConnector takes a Connector and an event transformation function and
// returns a new Connector whose generated Outleters apply the given
// transformation to incoming events before forwarding them.
func WrapConnector(
connector Connector, eventTransform func(beat.Event),
) Connector {
return &wrappedConnector{connector: connector, eventTransform: eventTransform}
}
1 change: 1 addition & 0 deletions filebeat/input/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type inputConfig struct {
ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"`
Type string `config:"type"`
InputType string `config:"input_type"`
Index string `config:"index"`
}

func (c *inputConfig) Validate() error {
Expand Down
21 changes: 19 additions & 2 deletions filebeat/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (

"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/fmtstr"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
)
Expand Down Expand Up @@ -60,7 +62,7 @@ type Runner struct {
// New instantiates a new Runner
func New(
conf *common.Config,
outlet channel.Connector,
connector channel.Connector,
beatDone chan struct{},
states []file.State,
dynFields *common.MapStrPointer,
Expand Down Expand Up @@ -91,6 +93,21 @@ func New(
return input, err
}

// If there is an input-level index setting, pass it through to event.Meta
// via the input's Connector.
if input.config.Index != "" {
indexPattern, err := fmtstr.CompileEvent(input.config.Index)
if err != nil {
return input, err
}
connector = channel.WrapConnector(connector, func(event beat.Event) {
if event.Meta == nil {
event.Meta = common.MapStr{}
}
event.Meta["index-pattern"] = indexPattern
})
}

context := Context{
States: states,
Done: input.done,
Expand All @@ -99,7 +116,7 @@ func New(
Meta: nil,
}
var ipt Input
ipt, err = f(conf, outlet, context)
ipt, err = f(conf, connector, context)
if err != nil {
return input, err
}
Expand Down
71 changes: 62 additions & 9 deletions libbeat/idxmgmt/std.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ package idxmgmt
import (
"errors"
"fmt"
"time"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/atomic"
"github.com/elastic/beats/libbeat/common/fmtstr"
"github.com/elastic/beats/libbeat/idxmgmt/ilm"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
Expand Down Expand Up @@ -54,12 +56,16 @@ type indexManager struct {
assets Asseter
}

type indexSelector outil.Selector
type indexSelector struct {
sel outil.Selector
beatInfo beat.Info
}

type ilmIndexSelector struct {
index outil.Selector
alias outil.Selector
st *indexState
index outil.Selector
alias outil.Selector
st *indexState
beatInfo beat.Info
}

type componentType uint8
Expand Down Expand Up @@ -201,7 +207,7 @@ func (s *indexSupport) BuildSelector(cfg *common.Config) (outputs.IndexSelector,
}

if mode != ilm.ModeAuto {
return indexSelector(indexSel), nil
return indexSelector{indexSel, s.info}, nil
}

selCfg.SetString("index", -1, alias)
Expand Down Expand Up @@ -321,7 +327,7 @@ func (m *indexManager) setupWithILM() (bool, error) {
}

func (s *ilmIndexSelector) Select(evt *beat.Event) (string, error) {
if idx := getEventCustomIndex(evt); idx != "" {
if idx := getEventCustomIndex(evt, s.beatInfo); idx != "" {
return idx, nil
}

Expand All @@ -335,13 +341,13 @@ func (s *ilmIndexSelector) Select(evt *beat.Event) (string, error) {
}

func (s indexSelector) Select(evt *beat.Event) (string, error) {
if idx := getEventCustomIndex(evt); idx != "" {
if idx := getEventCustomIndex(evt, s.beatInfo); idx != "" {
return idx, nil
}
return outil.Selector(s).Select(evt)
return s.sel.Select(evt)
}

func getEventCustomIndex(evt *beat.Event) string {
func getEventCustomIndex(evt *beat.Event, beatInfo beat.Info) string {
if len(evt.Meta) == 0 {
return ""
}
Expand All @@ -360,9 +366,56 @@ func getEventCustomIndex(evt *beat.Event) string {
}
}

// index-pattern expands the index as formatted text rather than a fixed
// or dated string. This was introduced to support the `Index` configuration
// field on Filebeat inputs, so it's essentially a workaround to communicate
// a specific output behavior to libbeat based on the originating input
// (which as implemented now is a Filebeat-specific concept).
// If we find ourselves adding additional backchannels like this in Meta,
// we should probably add a more formal (non-MapStr) configuration struct to
// beat.Event that inputs can use to modulate output behavior.
if tmp := evt.Meta["index-pattern"]; tmp != nil {
if pattern, ok := tmp.(*fmtstr.EventFormatString); ok {
idx, err := expandIndexPattern(pattern, evt.Timestamp, beatInfo)
if err == nil {
return idx
}
// Should we log a warning here somehow?
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm... where is alias used? The name looks very specific to some use-case. I guess it was added to work around the timestamp addition when index is used. Should we go with 'alias' or 'raw-index' in general?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know who might be using alias or how, which is why I was reluctant to overload it (that, and because of the potentially confusing name)... my preference is for raw-index or something similar (raw-index-name?) since the functional behavior is independent of whether aliases are being used, though I could be convinced to go with alias instead to avoid proliferating magic Metadata fields...

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we need to grep the source. I guess this comes from Metricbeat or Heartbeat.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is odd I don't remember that?

Copy link

@urso urso Oct 30, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I copied it from the original ILM PR. It was introduced in 7.0. Not sure if anyone still uses it. Removing could break things. We should at least create a follow up issue (technical debt) to investigate and remove it in the future.

+1 on not relying on alias here.


return ""
}

// Expand the given pattern string as a formatted text field with access to
// the event's agent and timestamp.
// This helper mimicks applyStaticFmtstr in ilm.go, creating a placeholder
// event for the restricted set of fields we allow here. It might be worth
// making this a shared helper function or otherwise specializing for this case.
func expandIndexPattern(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have this pattern in a few places. When 7.0 was in the making, devs did forget to adapt one or the other place. Maybe we can provide a common formatter type/instance that can provide these default fields?

pattern *fmtstr.EventFormatString, timestamp time.Time, info beat.Info,
) (string, error) {
return pattern.Run(&beat.Event{
Fields: common.MapStr{
// beat object was left in for backward compatibility reason for older configs.
"beat": common.MapStr{
"name": info.Beat,
"version": info.Version,
},
"agent": common.MapStr{
"name": info.Beat,
"version": info.Version,
},
// For the Beats that have an observer role
"observer": common.MapStr{
"name": info.Beat,
"version": info.Version,
},
},
Timestamp: time.Now(),
})
}

func unpackTemplateConfig(cfg *common.Config) (config template.TemplateConfig, err error) {
config = template.DefaultConfig()
if cfg != nil {
Expand Down