From e243d96d4354e7fc8db8515faefa58fe8f1f8238 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 9 Oct 2019 16:28:48 -0400 Subject: [PATCH 01/19] Define wrapped Connectors and Outleters and use them to pass through input.index config --- filebeat/channel/wrapped.go | 71 +++++++++++++++++++++++++++++++++++++ filebeat/input/config.go | 1 + filebeat/input/input.go | 13 +++++-- 3 files changed, 83 insertions(+), 2 deletions(-) create mode 100644 filebeat/channel/wrapped.go diff --git a/filebeat/channel/wrapped.go b/filebeat/channel/wrapped.go new file mode 100644 index 00000000000..eb689880498 --- /dev/null +++ b/filebeat/channel/wrapped.go @@ -0,0 +1,71 @@ +package channel + +import ( + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" +) + +// A wrapper around a generic Outleter that applies the given transformation to +// incoming events before forwarding them. +type wrappedOutlet struct { + outlet Outleter + eventTransform func(beat.Event) +} + +// WrapOutlet takes an Outleter and an event transformation function and +// returns an Outleter that applies that transformation before forwarding them. +// The transformation operates in place (it modifies its input events). +// The new Outleter uses the same underlying state, e.g. calling Close on the +// wrapped Outleter will close the original as well. If this is not the intent, +// call SubOutlet first. +func WrapOutlet(outlet Outleter, eventTransform func(beat.Event)) Outleter { + return &wrappedOutlet{outlet: outlet, eventTransform: eventTransform} +} + +func (o *wrappedOutlet) Close() error { + return o.outlet.Close() +} + +func (o *wrappedOutlet) Done() <-chan struct{} { + return o.outlet.Done() +} + +func (o *wrappedOutlet) OnEvent(event beat.Event) bool { + // Mutate the event then pass it on. + o.eventTransform(event) + return o.outlet.OnEvent(event) +} + +// A wrapper around a generic Outleter that produces Outleters that apply the +// given transformation to incoming events before sending them. +type wrappedConnector struct { + connector Connector + eventTransform func(beat.Event) +} + +func (c *wrappedConnector) Connect(conf *common.Config) (Outleter, error) { + outleter, err := c.connector.Connect(conf) + if err != nil { + return outleter, err + } + return WrapOutlet(outleter, c.eventTransform), nil +} + +func (c *wrappedConnector) ConnectWith( + conf *common.Config, clientConf beat.ClientConfig, +) (Outleter, error) { + outleter, err := c.connector.ConnectWith(conf, clientConf) + if err != nil { + return outleter, err + } + return WrapOutlet(outleter, c.eventTransform), nil +} + +// WrapConnector takes a Connector and an event transformation function and +// returns a new Connector whose generated Outleters apply the given +// transformation to incoming events before forwarding them. +func WrapConnector( + connector Connector, eventTransform func(beat.Event), +) Connector { + return &wrappedConnector{connector: connector, eventTransform: eventTransform} +} diff --git a/filebeat/input/config.go b/filebeat/input/config.go index 750b370419b..72c107c9737 100644 --- a/filebeat/input/config.go +++ b/filebeat/input/config.go @@ -35,6 +35,7 @@ type inputConfig struct { ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"` Type string `config:"type"` InputType string `config:"input_type"` + Index string `config:"index"` } func (c *inputConfig) Validate() error { diff --git a/filebeat/input/input.go b/filebeat/input/input.go index 98eea51db8a..f624427f4a6 100644 --- a/filebeat/input/input.go +++ b/filebeat/input/input.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/beats/filebeat/channel" "github.com/elastic/beats/filebeat/input/file" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/monitoring" @@ -60,7 +61,7 @@ type Runner struct { // New instantiates a new Runner func New( conf *common.Config, - outlet channel.Connector, + connector channel.Connector, beatDone chan struct{}, states []file.State, dynFields *common.MapStrPointer, @@ -91,6 +92,14 @@ func New( return input, err } + // If there is an input-level index setting, pass it through to event.Meta + // via the input's Connector. + if input.config.Index != "" { + connector = channel.WrapConnector(connector, func(event beat.Event) { + event.Meta["index"] = input.config.Index + }) + } + context := Context{ States: states, Done: input.done, @@ -99,7 +108,7 @@ func New( Meta: nil, } var ipt Input - ipt, err = f(conf, outlet, context) + ipt, err = f(conf, connector, context) if err != nil { return input, err } From de02aeba7eef416c8a30bb6d97f4c0ee22ad2190 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 10 Oct 2019 17:17:50 -0400 Subject: [PATCH 02/19] Add string formatting to the input index field --- filebeat/input/input.go | 10 +++++- libbeat/idxmgmt/std.go | 71 +++++++++++++++++++++++++++++++++++------ 2 files changed, 71 insertions(+), 10 deletions(-) diff --git a/filebeat/input/input.go b/filebeat/input/input.go index f624427f4a6..c31eedde3fd 100644 --- a/filebeat/input/input.go +++ b/filebeat/input/input.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/beats/filebeat/input/file" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/fmtstr" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/monitoring" ) @@ -95,8 +96,15 @@ func New( // If there is an input-level index setting, pass it through to event.Meta // via the input's Connector. if input.config.Index != "" { + indexPattern, err := fmtstr.CompileEvent(input.config.Index) + if err != nil { + return input, err + } connector = channel.WrapConnector(connector, func(event beat.Event) { - event.Meta["index"] = input.config.Index + if event.Meta == nil { + event.Meta = common.MapStr{} + } + event.Meta["index-pattern"] = indexPattern }) } diff --git a/libbeat/idxmgmt/std.go b/libbeat/idxmgmt/std.go index 13d1a201fcf..14fe9b93db0 100644 --- a/libbeat/idxmgmt/std.go +++ b/libbeat/idxmgmt/std.go @@ -20,10 +20,12 @@ package idxmgmt import ( "errors" "fmt" + "time" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/atomic" + "github.com/elastic/beats/libbeat/common/fmtstr" "github.com/elastic/beats/libbeat/idxmgmt/ilm" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs" @@ -54,12 +56,16 @@ type indexManager struct { assets Asseter } -type indexSelector outil.Selector +type indexSelector struct { + sel outil.Selector + beatInfo beat.Info +} type ilmIndexSelector struct { - index outil.Selector - alias outil.Selector - st *indexState + index outil.Selector + alias outil.Selector + st *indexState + beatInfo beat.Info } type componentType uint8 @@ -201,7 +207,7 @@ func (s *indexSupport) BuildSelector(cfg *common.Config) (outputs.IndexSelector, } if mode != ilm.ModeAuto { - return indexSelector(indexSel), nil + return indexSelector{indexSel, s.info}, nil } selCfg.SetString("index", -1, alias) @@ -321,7 +327,7 @@ func (m *indexManager) setupWithILM() (bool, error) { } func (s *ilmIndexSelector) Select(evt *beat.Event) (string, error) { - if idx := getEventCustomIndex(evt); idx != "" { + if idx := getEventCustomIndex(evt, s.beatInfo); idx != "" { return idx, nil } @@ -335,13 +341,13 @@ func (s *ilmIndexSelector) Select(evt *beat.Event) (string, error) { } func (s indexSelector) Select(evt *beat.Event) (string, error) { - if idx := getEventCustomIndex(evt); idx != "" { + if idx := getEventCustomIndex(evt, s.beatInfo); idx != "" { return idx, nil } - return outil.Selector(s).Select(evt) + return s.sel.Select(evt) } -func getEventCustomIndex(evt *beat.Event) string { +func getEventCustomIndex(evt *beat.Event, beatInfo beat.Info) string { if len(evt.Meta) == 0 { return "" } @@ -360,9 +366,56 @@ func getEventCustomIndex(evt *beat.Event) string { } } + // index-pattern expands the index as formatted text rather than a fixed + // or dated string. This was introduced to support the `Index` configuration + // field on Filebeat inputs, so it's essentially a workaround to communicate + // a specific output behavior to libbeat based on the originating input + // (which as implemented now is a Filebeat-specific concept). + // If we find ourselves adding additional backchannels like this in Meta, + // we should probably add a more formal (non-MapStr) configuration struct to + // beat.Event that inputs can use to modulate output behavior. + if tmp := evt.Meta["index-pattern"]; tmp != nil { + if pattern, ok := tmp.(*fmtstr.EventFormatString); ok { + idx, err := expandIndexPattern(pattern, evt.Timestamp, beatInfo) + if err == nil { + return idx + } + // Should we log a warning here somehow? + } + } + return "" } +// Expand the given pattern string as a formatted text field with access to +// the event's agent and timestamp. +// This helper mimicks applyStaticFmtstr in ilm.go, creating a placeholder +// event for the restricted set of fields we allow here. It might be worth +// making this a shared helper function or otherwise specializing for this case. +func expandIndexPattern( + pattern *fmtstr.EventFormatString, timestamp time.Time, info beat.Info, +) (string, error) { + return pattern.Run(&beat.Event{ + Fields: common.MapStr{ + // beat object was left in for backward compatibility reason for older configs. + "beat": common.MapStr{ + "name": info.Beat, + "version": info.Version, + }, + "agent": common.MapStr{ + "name": info.Beat, + "version": info.Version, + }, + // For the Beats that have an observer role + "observer": common.MapStr{ + "name": info.Beat, + "version": info.Version, + }, + }, + Timestamp: time.Now(), + }) +} + func unpackTemplateConfig(cfg *common.Config) (config template.TemplateConfig, err error) { config = template.DefaultConfig() if cfg != nil { From 0515696e0d1238350e0264ac083d4e337357131a Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Fri, 18 Oct 2019 13:06:42 -0400 Subject: [PATCH 03/19] Replace custom wrapped channels with processors --- filebeat/channel/connector.go | 85 ++++++++++++++++++++++++++++++----- filebeat/channel/factory.go | 2 +- filebeat/channel/wrapped.go | 71 ----------------------------- filebeat/input/config.go | 1 - filebeat/input/input.go | 17 ------- libbeat/idxmgmt/std.go | 53 +++------------------- 6 files changed, 82 insertions(+), 147 deletions(-) delete mode 100644 filebeat/channel/wrapped.go diff --git a/filebeat/channel/connector.go b/filebeat/channel/connector.go index af1cf33c06b..adab6be2d72 100644 --- a/filebeat/channel/connector.go +++ b/filebeat/channel/connector.go @@ -18,8 +18,12 @@ package channel import ( + "fmt" + "time" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/fmtstr" "github.com/elastic/beats/libbeat/processors" ) @@ -51,22 +55,24 @@ func (c *pipelineConnector) ConnectWith(cfg *common.Config, clientCfg beat.Clien return nil, err } - var err error - var userProcessors beat.ProcessorList + procs := processors.NewList(nil) - userProcessors, err = processors.New(config.Processors) + userProcessors, err := processors.New(config.Processors) if err != nil { return nil, err } + procs.List = append(procs.List, userProcessors) - if lst := clientCfg.Processing.Processor; lst != nil { - if len(userProcessors.All()) == 0 { - userProcessors = lst - } else if orig := lst.All(); len(orig) > 0 { - newLst := processors.NewList(nil) - newLst.List = append(newLst.List, lst, userProcessors) - userProcessors = newLst + if config.Index != "" { + indexProcessor, err := newAddIndexPattern(config.Index) + if err != nil { + return nil, err } + procs.List = append(procs.List, indexProcessor) + } + + if lst := clientCfg.Processing.Processor; lst != nil { + procs.List = append(procs.List, lst) } setOptional := func(to common.MapStr, key string, value string) { @@ -105,7 +111,7 @@ func (c *pipelineConnector) ConnectWith(cfg *common.Config, clientCfg beat.Clien clientCfg.Processing.EventMetadata = config.EventMetadata clientCfg.Processing.Meta = meta clientCfg.Processing.Fields = fields - clientCfg.Processing.Processor = userProcessors + clientCfg.Processing.Processor = procs client, err := c.pipeline.ConnectWith(clientCfg) if err != nil { return nil, err @@ -117,3 +123,60 @@ func (c *pipelineConnector) ConnectWith(cfg *common.Config, clientCfg beat.Clien } return outlet, nil } + +type addIndexPattern struct { + pattern string + beatInfo beat.Info + formatString *fmtstr.EventFormatString +} + +func newAddIndexPattern( + beatInfo beat.Info, pattern string, +) (processors.Processor, error) { + formatString, err := fmtstr.CompileEvent(pattern) + if err != nil { + return nil, err + } + return &addIndexPattern{ + pattern: pattern, + beatInfo: beatInfo, + formatString: formatString, + }, nil +} + +func (aip *addIndexPattern) Run(event *beat.Event) (*beat.Event, error) { + if event.Meta == nil { + event.Meta = common.MapStr{} + } + index, err := expandIndexPattern( + aip.formatString, event.Timestamp, aip.beatInfo) + if err != nil { + return nil, err + } + + event.Meta["raw-index"] = index + return event, nil +} + +func (aip *addIndexPattern) String() string { + return fmt.Sprintf("add_index_pattern=%s", aip.pattern) +} + +// Expand the given pattern string as a formatted text field with access to +// the event's agent and timestamp. +// This helper mimicks applyStaticFmtstr in ilm.go, creating a placeholder +// event for the restricted set of fields we allow here. It might be worth +// making this a shared helper function or otherwise specializing for this case. +func expandIndexPattern( + pattern *fmtstr.EventFormatString, timestamp time.Time, info beat.Info, +) (string, error) { + return pattern.Run(&beat.Event{ + Fields: common.MapStr{ + "agent": common.MapStr{ + "name": info.Beat, + "version": info.Version, + }, + }, + Timestamp: time.Now(), + }) +} diff --git a/filebeat/channel/factory.go b/filebeat/channel/factory.go index e31c3f5240d..600c7362710 100644 --- a/filebeat/channel/factory.go +++ b/filebeat/channel/factory.go @@ -57,7 +57,7 @@ type inputOutletConfig struct { // Output meta data settings Pipeline string `config:"pipeline"` // ES Ingest pipeline name - + Index string `config:"index"` // ES output index pattern } // NewOutletFactory creates a new outlet factory for diff --git a/filebeat/channel/wrapped.go b/filebeat/channel/wrapped.go deleted file mode 100644 index eb689880498..00000000000 --- a/filebeat/channel/wrapped.go +++ /dev/null @@ -1,71 +0,0 @@ -package channel - -import ( - "github.com/elastic/beats/libbeat/beat" - "github.com/elastic/beats/libbeat/common" -) - -// A wrapper around a generic Outleter that applies the given transformation to -// incoming events before forwarding them. -type wrappedOutlet struct { - outlet Outleter - eventTransform func(beat.Event) -} - -// WrapOutlet takes an Outleter and an event transformation function and -// returns an Outleter that applies that transformation before forwarding them. -// The transformation operates in place (it modifies its input events). -// The new Outleter uses the same underlying state, e.g. calling Close on the -// wrapped Outleter will close the original as well. If this is not the intent, -// call SubOutlet first. -func WrapOutlet(outlet Outleter, eventTransform func(beat.Event)) Outleter { - return &wrappedOutlet{outlet: outlet, eventTransform: eventTransform} -} - -func (o *wrappedOutlet) Close() error { - return o.outlet.Close() -} - -func (o *wrappedOutlet) Done() <-chan struct{} { - return o.outlet.Done() -} - -func (o *wrappedOutlet) OnEvent(event beat.Event) bool { - // Mutate the event then pass it on. - o.eventTransform(event) - return o.outlet.OnEvent(event) -} - -// A wrapper around a generic Outleter that produces Outleters that apply the -// given transformation to incoming events before sending them. -type wrappedConnector struct { - connector Connector - eventTransform func(beat.Event) -} - -func (c *wrappedConnector) Connect(conf *common.Config) (Outleter, error) { - outleter, err := c.connector.Connect(conf) - if err != nil { - return outleter, err - } - return WrapOutlet(outleter, c.eventTransform), nil -} - -func (c *wrappedConnector) ConnectWith( - conf *common.Config, clientConf beat.ClientConfig, -) (Outleter, error) { - outleter, err := c.connector.ConnectWith(conf, clientConf) - if err != nil { - return outleter, err - } - return WrapOutlet(outleter, c.eventTransform), nil -} - -// WrapConnector takes a Connector and an event transformation function and -// returns a new Connector whose generated Outleters apply the given -// transformation to incoming events before forwarding them. -func WrapConnector( - connector Connector, eventTransform func(beat.Event), -) Connector { - return &wrappedConnector{connector: connector, eventTransform: eventTransform} -} diff --git a/filebeat/input/config.go b/filebeat/input/config.go index 72c107c9737..750b370419b 100644 --- a/filebeat/input/config.go +++ b/filebeat/input/config.go @@ -35,7 +35,6 @@ type inputConfig struct { ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"` Type string `config:"type"` InputType string `config:"input_type"` - Index string `config:"index"` } func (c *inputConfig) Validate() error { diff --git a/filebeat/input/input.go b/filebeat/input/input.go index c31eedde3fd..e1931d4e4bf 100644 --- a/filebeat/input/input.go +++ b/filebeat/input/input.go @@ -26,9 +26,7 @@ import ( "github.com/elastic/beats/filebeat/channel" "github.com/elastic/beats/filebeat/input/file" - "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/common/fmtstr" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/monitoring" ) @@ -93,21 +91,6 @@ func New( return input, err } - // If there is an input-level index setting, pass it through to event.Meta - // via the input's Connector. - if input.config.Index != "" { - indexPattern, err := fmtstr.CompileEvent(input.config.Index) - if err != nil { - return input, err - } - connector = channel.WrapConnector(connector, func(event beat.Event) { - if event.Meta == nil { - event.Meta = common.MapStr{} - } - event.Meta["index-pattern"] = indexPattern - }) - } - context := Context{ States: states, Done: input.done, diff --git a/libbeat/idxmgmt/std.go b/libbeat/idxmgmt/std.go index 14fe9b93db0..e3e285f0ced 100644 --- a/libbeat/idxmgmt/std.go +++ b/libbeat/idxmgmt/std.go @@ -20,12 +20,10 @@ package idxmgmt import ( "errors" "fmt" - "time" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/atomic" - "github.com/elastic/beats/libbeat/common/fmtstr" "github.com/elastic/beats/libbeat/idxmgmt/ilm" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs" @@ -366,56 +364,19 @@ func getEventCustomIndex(evt *beat.Event, beatInfo beat.Info) string { } } - // index-pattern expands the index as formatted text rather than a fixed - // or dated string. This was introduced to support the `Index` configuration - // field on Filebeat inputs, so it's essentially a workaround to communicate - // a specific output behavior to libbeat based on the originating input - // (which as implemented now is a Filebeat-specific concept). - // If we find ourselves adding additional backchannels like this in Meta, - // we should probably add a more formal (non-MapStr) configuration struct to - // beat.Event that inputs can use to modulate output behavior. - if tmp := evt.Meta["index-pattern"]; tmp != nil { - if pattern, ok := tmp.(*fmtstr.EventFormatString); ok { - idx, err := expandIndexPattern(pattern, evt.Timestamp, beatInfo) - if err == nil { - return idx - } - // Should we log a warning here somehow? + // This is functionally identical to Meta["alias"], returning the overriding + // metadata as the index name if present. It is currently used by Filebeat + // to send the index for particular inputs to formatted string templates, + // which are then expanded by a processor to the "raw-index" field. + if tmp := evt.Meta["raw-index"]; tmp != nil { + if idx, ok := tmp.(string); ok { + return idx } } return "" } -// Expand the given pattern string as a formatted text field with access to -// the event's agent and timestamp. -// This helper mimicks applyStaticFmtstr in ilm.go, creating a placeholder -// event for the restricted set of fields we allow here. It might be worth -// making this a shared helper function or otherwise specializing for this case. -func expandIndexPattern( - pattern *fmtstr.EventFormatString, timestamp time.Time, info beat.Info, -) (string, error) { - return pattern.Run(&beat.Event{ - Fields: common.MapStr{ - // beat object was left in for backward compatibility reason for older configs. - "beat": common.MapStr{ - "name": info.Beat, - "version": info.Version, - }, - "agent": common.MapStr{ - "name": info.Beat, - "version": info.Version, - }, - // For the Beats that have an observer role - "observer": common.MapStr{ - "name": info.Beat, - "version": info.Version, - }, - }, - Timestamp: time.Now(), - }) -} - func unpackTemplateConfig(cfg *common.Config) (config template.TemplateConfig, err error) { config = template.DefaultConfig() if cfg != nil { From e5104933862de7b45cfc0ebfe6fdea93d4eb7456 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Mon, 21 Oct 2019 14:42:45 -0400 Subject: [PATCH 04/19] Add beat.Info to OutletFactory --- filebeat/channel/connector.go | 2 +- filebeat/channel/factory.go | 3 +++ filebeat/kafka.yml | 8 ++++++++ 3 files changed, 12 insertions(+), 1 deletion(-) create mode 100644 filebeat/kafka.yml diff --git a/filebeat/channel/connector.go b/filebeat/channel/connector.go index adab6be2d72..cd2f9a34e00 100644 --- a/filebeat/channel/connector.go +++ b/filebeat/channel/connector.go @@ -64,7 +64,7 @@ func (c *pipelineConnector) ConnectWith(cfg *common.Config, clientCfg beat.Clien procs.List = append(procs.List, userProcessors) if config.Index != "" { - indexProcessor, err := newAddIndexPattern(config.Index) + indexProcessor, err := newAddIndexPattern(c.parent.beatInfo, config.Index) if err != nil { return nil, err } diff --git a/filebeat/channel/factory.go b/filebeat/channel/factory.go index 600c7362710..d2f735a22a6 100644 --- a/filebeat/channel/factory.go +++ b/filebeat/channel/factory.go @@ -28,6 +28,7 @@ type OutletFactory struct { eventer beat.ClientEventer wgEvents eventCounter + beatInfo beat.Info } type eventCounter interface { @@ -65,10 +66,12 @@ type inputOutletConfig struct { func NewOutletFactory( done <-chan struct{}, wgEvents eventCounter, + beatInfo beat.Info, ) *OutletFactory { o := &OutletFactory{ done: done, wgEvents: wgEvents, + beatInfo: beatInfo, } if wgEvents != nil { diff --git a/filebeat/kafka.yml b/filebeat/kafka.yml new file mode 100644 index 00000000000..81842758971 --- /dev/null +++ b/filebeat/kafka.yml @@ -0,0 +1,8 @@ +version: '2.3' +services: + kafka: + build: ${ES_BEATS}/testing/environments/docker/kafka + expose: + - 9092 + environment: + - ADVERTISED_HOST=kafka From ac46926e867953ab11df610156eec77b4eb5f63b Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 23 Oct 2019 16:08:49 -0400 Subject: [PATCH 05/19] Clean up minimal event string formatting + move to shared location --- filebeat/channel/connector.go | 39 +++++------------- libbeat/common/fmtstr/formatevents.go | 59 +++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 29 deletions(-) diff --git a/filebeat/channel/connector.go b/filebeat/channel/connector.go index cd2f9a34e00..cc492bb35b3 100644 --- a/filebeat/channel/connector.go +++ b/filebeat/channel/connector.go @@ -19,7 +19,6 @@ package channel import ( "fmt" - "time" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" @@ -124,36 +123,37 @@ func (c *pipelineConnector) ConnectWith(cfg *common.Config, clientCfg beat.Clien return outlet, nil } +////////////////////////////// +// addIndexPattern is a Processor to add an index pattern (specified by the +// input configuration) to event metadata in the "raw-index" field. + type addIndexPattern struct { pattern string - beatInfo beat.Info - formatString *fmtstr.EventFormatString + formatString *fmtstr.StaticEventFormatString } func newAddIndexPattern( beatInfo beat.Info, pattern string, ) (processors.Processor, error) { - formatString, err := fmtstr.CompileEvent(pattern) + formatString, err := fmtstr.MinimalStaticEventFormatString(pattern, beatInfo) if err != nil { return nil, err } return &addIndexPattern{ pattern: pattern, - beatInfo: beatInfo, formatString: formatString, }, nil } func (aip *addIndexPattern) Run(event *beat.Event) (*beat.Event, error) { - if event.Meta == nil { - event.Meta = common.MapStr{} - } - index, err := expandIndexPattern( - aip.formatString, event.Timestamp, aip.beatInfo) + index, err := aip.formatString.Run(event) if err != nil { return nil, err } + if event.Meta == nil { + event.Meta = common.MapStr{} + } event.Meta["raw-index"] = index return event, nil } @@ -161,22 +161,3 @@ func (aip *addIndexPattern) Run(event *beat.Event) (*beat.Event, error) { func (aip *addIndexPattern) String() string { return fmt.Sprintf("add_index_pattern=%s", aip.pattern) } - -// Expand the given pattern string as a formatted text field with access to -// the event's agent and timestamp. -// This helper mimicks applyStaticFmtstr in ilm.go, creating a placeholder -// event for the restricted set of fields we allow here. It might be worth -// making this a shared helper function or otherwise specializing for this case. -func expandIndexPattern( - pattern *fmtstr.EventFormatString, timestamp time.Time, info beat.Info, -) (string, error) { - return pattern.Run(&beat.Event{ - Fields: common.MapStr{ - "agent": common.MapStr{ - "name": info.Beat, - "version": info.Version, - }, - }, - Timestamp: time.Now(), - }) -} diff --git a/libbeat/common/fmtstr/formatevents.go b/libbeat/common/fmtstr/formatevents.go index b6a5d54b312..9660e6e68b8 100644 --- a/libbeat/common/fmtstr/formatevents.go +++ b/libbeat/common/fmtstr/formatevents.go @@ -49,6 +49,14 @@ type EventFormatString struct { timestamp bool } +// StaticEventFormatString is a wrapper around EventFormatString for the +// common special case where the format expression should only have access to +// shared static fields (typically agent / version) and the event timestamp. +type StaticEventFormatString struct { + eventFormatString *EventFormatString + cachedFields common.MapStr +} + type eventFieldEvaler struct { index int } @@ -151,6 +159,47 @@ func CompileEvent(in string) (*EventFormatString, error) { return efs, nil } +// NewStaticEventFormatString creates from the given expression a +// StaticEventFormatString that can refer only to the given static fields and +// the event timestamp. +func NewStaticEventFormatString( + expression string, staticFields common.MapStr, +) (*StaticEventFormatString, error) { + efs, err := CompileEvent(expression) + if err != nil { + return nil, err + } + return &StaticEventFormatString{ + eventFormatString: efs, + cachedFields: staticFields.Clone(), + }, nil +} + +// MinimalStaticEventFormatString creates a formatter that has access only +// to the agent name / version and event timestamp. This was adapted from +// applyStaticFmtstr in libbeat/idxmgmt/ilm/ilm.go. +func MinimalStaticEventFormatString( + expression string, beatInfo beat.Info, +) (*StaticEventFormatString, error) { + staticFields := common.MapStr{ + // beat object was left in for backward compatibility reason for older configs. + "beat": common.MapStr{ + "name": beatInfo.Beat, + "version": beatInfo.Version, + }, + "agent": common.MapStr{ + "name": beatInfo.Beat, + "version": beatInfo.Version, + }, + // For the Beats that have an observer role + "observer": common.MapStr{ + "name": beatInfo.Beat, + "version": beatInfo.Version, + }, + } + return NewStaticEventFormatString(expression, staticFields) +} + // Unpack tries to initialize the EventFormatString from provided value // (which must be a string). Unpack method satisfies go-ucfg.Unpacker interface // required by common.Config, in order to use EventFormatString with @@ -274,6 +323,16 @@ func (fs *EventFormatString) IsEmpty() bool { return len(fs.expression) == 0 } +// Run executes the format string returning a new expanded string or an error +// if execution or event field expansion fails. +func (fs *StaticEventFormatString) Run(event *beat.Event) (string, error) { + placeholderEvent := &beat.Event{ + Fields: fs.cachedFields, + Timestamp: event.Timestamp, + } + return fs.eventFormatString.Run(placeholderEvent) +} + func (e *eventFieldCompiler) compileExpression( s string, opts []VariableOp, From 40ae68cca63b520e95d731fcda555d902bc762ad Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 24 Oct 2019 10:30:48 -0400 Subject: [PATCH 06/19] StaticEventFormatString -> TimestampFormatString --- filebeat/channel/connector.go | 6 +- libbeat/common/fmtstr/formatevents.go | 59 ---------- libbeat/common/fmtstr/formattimestamp.go | 84 +++++++++++++++ libbeat/common/fmtstr/formattimestamp_test.go | 101 ++++++++++++++++++ 4 files changed, 188 insertions(+), 62 deletions(-) create mode 100644 libbeat/common/fmtstr/formattimestamp.go create mode 100644 libbeat/common/fmtstr/formattimestamp_test.go diff --git a/filebeat/channel/connector.go b/filebeat/channel/connector.go index cc492bb35b3..5e46cd0b2fa 100644 --- a/filebeat/channel/connector.go +++ b/filebeat/channel/connector.go @@ -129,13 +129,13 @@ func (c *pipelineConnector) ConnectWith(cfg *common.Config, clientCfg beat.Clien type addIndexPattern struct { pattern string - formatString *fmtstr.StaticEventFormatString + formatString *fmtstr.TimestampFormatString } func newAddIndexPattern( beatInfo beat.Info, pattern string, ) (processors.Processor, error) { - formatString, err := fmtstr.MinimalStaticEventFormatString(pattern, beatInfo) + formatString, err := fmtstr.MinimalTimestampFormatString(pattern, beatInfo) if err != nil { return nil, err } @@ -146,7 +146,7 @@ func newAddIndexPattern( } func (aip *addIndexPattern) Run(event *beat.Event) (*beat.Event, error) { - index, err := aip.formatString.Run(event) + index, err := aip.formatString.Run(event.Timestamp) if err != nil { return nil, err } diff --git a/libbeat/common/fmtstr/formatevents.go b/libbeat/common/fmtstr/formatevents.go index 9660e6e68b8..b6a5d54b312 100644 --- a/libbeat/common/fmtstr/formatevents.go +++ b/libbeat/common/fmtstr/formatevents.go @@ -49,14 +49,6 @@ type EventFormatString struct { timestamp bool } -// StaticEventFormatString is a wrapper around EventFormatString for the -// common special case where the format expression should only have access to -// shared static fields (typically agent / version) and the event timestamp. -type StaticEventFormatString struct { - eventFormatString *EventFormatString - cachedFields common.MapStr -} - type eventFieldEvaler struct { index int } @@ -159,47 +151,6 @@ func CompileEvent(in string) (*EventFormatString, error) { return efs, nil } -// NewStaticEventFormatString creates from the given expression a -// StaticEventFormatString that can refer only to the given static fields and -// the event timestamp. -func NewStaticEventFormatString( - expression string, staticFields common.MapStr, -) (*StaticEventFormatString, error) { - efs, err := CompileEvent(expression) - if err != nil { - return nil, err - } - return &StaticEventFormatString{ - eventFormatString: efs, - cachedFields: staticFields.Clone(), - }, nil -} - -// MinimalStaticEventFormatString creates a formatter that has access only -// to the agent name / version and event timestamp. This was adapted from -// applyStaticFmtstr in libbeat/idxmgmt/ilm/ilm.go. -func MinimalStaticEventFormatString( - expression string, beatInfo beat.Info, -) (*StaticEventFormatString, error) { - staticFields := common.MapStr{ - // beat object was left in for backward compatibility reason for older configs. - "beat": common.MapStr{ - "name": beatInfo.Beat, - "version": beatInfo.Version, - }, - "agent": common.MapStr{ - "name": beatInfo.Beat, - "version": beatInfo.Version, - }, - // For the Beats that have an observer role - "observer": common.MapStr{ - "name": beatInfo.Beat, - "version": beatInfo.Version, - }, - } - return NewStaticEventFormatString(expression, staticFields) -} - // Unpack tries to initialize the EventFormatString from provided value // (which must be a string). Unpack method satisfies go-ucfg.Unpacker interface // required by common.Config, in order to use EventFormatString with @@ -323,16 +274,6 @@ func (fs *EventFormatString) IsEmpty() bool { return len(fs.expression) == 0 } -// Run executes the format string returning a new expanded string or an error -// if execution or event field expansion fails. -func (fs *StaticEventFormatString) Run(event *beat.Event) (string, error) { - placeholderEvent := &beat.Event{ - Fields: fs.cachedFields, - Timestamp: event.Timestamp, - } - return fs.eventFormatString.Run(placeholderEvent) -} - func (e *eventFieldCompiler) compileExpression( s string, opts []VariableOp, diff --git a/libbeat/common/fmtstr/formattimestamp.go b/libbeat/common/fmtstr/formattimestamp.go new file mode 100644 index 00000000000..d666c8cd21d --- /dev/null +++ b/libbeat/common/fmtstr/formattimestamp.go @@ -0,0 +1,84 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package fmtstr + +import ( + "time" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" +) + +// TimestampFormatString is a wrapper around EventFormatString for the +// common special case where the format expression should only have access to +// shared static fields (typically agent / version) and the event timestamp. +type TimestampFormatString struct { + eventFormatString *EventFormatString + cachedFields common.MapStr +} + +// NewTimestampFormatString creates from the given expression a +// TimestampFormatString that can refer only to the given static fields and +// the event timestamp. +func NewTimestampFormatString( + expression string, staticFields common.MapStr, +) (*TimestampFormatString, error) { + efs, err := CompileEvent(expression) + if err != nil { + return nil, err + } + return &TimestampFormatString{ + eventFormatString: efs, + cachedFields: staticFields.Clone(), + }, nil +} + +// MinimalTimestampFormatString creates a formatter that has access only +// to the agent name / version and event timestamp. This was adapted from +// applyStaticFmtstr in libbeat/idxmgmt/ilm/ilm.go. +func MinimalTimestampFormatString( + expression string, beatInfo beat.Info, +) (*TimestampFormatString, error) { + staticFields := common.MapStr{ + // beat object was left in for backward compatibility reason for older configs. + "beat": common.MapStr{ + "name": beatInfo.Beat, + "version": beatInfo.Version, + }, + "agent": common.MapStr{ + "name": beatInfo.Beat, + "version": beatInfo.Version, + }, + // For the Beats that have an observer role + "observer": common.MapStr{ + "name": beatInfo.Beat, + "version": beatInfo.Version, + }, + } + return NewTimestampFormatString(expression, staticFields) +} + +// Run executes the format string returning a new expanded string or an error +// if execution or event field expansion fails. +func (fs *TimestampFormatString) Run(timestamp time.Time) (string, error) { + placeholderEvent := &beat.Event{ + Fields: fs.cachedFields, + Timestamp: timestamp, + } + return fs.eventFormatString.Run(placeholderEvent) +} diff --git a/libbeat/common/fmtstr/formattimestamp_test.go b/libbeat/common/fmtstr/formattimestamp_test.go new file mode 100644 index 00000000000..b03c3a29183 --- /dev/null +++ b/libbeat/common/fmtstr/formattimestamp_test.go @@ -0,0 +1,101 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package fmtstr + +import ( + "testing" + "time" + + "github.com/elastic/beats/libbeat/common" + "github.com/stretchr/testify/assert" +) + +func TestTimestampFormatString(t *testing.T) { + tests := []struct { + title string + format string + staticFields common.MapStr + timestamp time.Time + expected string + }{ + { + "empty string", + "", + nil, + time.Time{}, + "", + }, + { + "no fields configured", + "format string", + nil, + time.Time{}, + "format string", + }, + { + "expand field", + "%{[key]}", + common.MapStr{"key": "value"}, + time.Time{}, + "value", + }, + { + "expand with default", + "%{[key]:default}", + nil, + time.Time{}, + "default", + }, + { + "expand nested field", + "%{[nested.key]}", + common.MapStr{"nested": common.MapStr{"key": "value"}}, + time.Time{}, + "value", + }, + { + "test timestamp formatter", + "%{[key]}: %{+YYYY.MM.dd}", + common.MapStr{"key": "timestamp"}, + time.Date(2015, 5, 1, 20, 12, 34, 0, time.Local), + "timestamp: 2015.05.01", + }, + { + "test timestamp formatter", + "%{[@timestamp]}: %{+YYYY.MM.dd}", + common.MapStr{"key": "timestamp"}, + time.Date(2015, 5, 1, 20, 12, 34, 0, time.Local), + "2015-05-01T20:12:34.000Z: 2015.05.01", + }, + } + + for i, test := range tests { + t.Logf("test(%v): %v", i, test.title) + + fs, err := NewTimestampFormatString(test.format, test.staticFields) + if err != nil { + t.Error(err) + continue + } + + actual, err := fs.Run(test.timestamp) + + assert.NoError(t, err) + assert.Equal(t, test.expected, actual) + } +} From 2d5da878ffc45809924d15c693e61b5ac0937198 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 24 Oct 2019 13:20:02 -0400 Subject: [PATCH 07/19] Clean up the api for TimestampFormatString --- filebeat/channel/connector.go | 38 ++++++--------- filebeat/channel/factory.go | 5 +- libbeat/common/fmtstr/formattimestamp.go | 48 +++++++++---------- libbeat/common/fmtstr/formattimestamp_test.go | 8 +++- libbeat/idxmgmt/ilm/ilm.go | 24 ++-------- 5 files changed, 52 insertions(+), 71 deletions(-) diff --git a/filebeat/channel/connector.go b/filebeat/channel/connector.go index 5e46cd0b2fa..8b17a531921 100644 --- a/filebeat/channel/connector.go +++ b/filebeat/channel/connector.go @@ -62,11 +62,15 @@ func (c *pipelineConnector) ConnectWith(cfg *common.Config, clientCfg beat.Clien } procs.List = append(procs.List, userProcessors) - if config.Index != "" { - indexProcessor, err := newAddIndexPattern(c.parent.beatInfo, config.Index) + if !config.Index.IsEmpty() { + staticFields := fmtstr.FieldsForBeat( + c.parent.beatInfo.Beat, c.parent.beatInfo.Version) + timestampFormat, err := + fmtstr.NewTimestampFormatString(&config.Index, staticFields) if err != nil { return nil, err } + indexProcessor := &addFormattedIndex{timestampFormat} procs.List = append(procs.List, indexProcessor) } @@ -124,29 +128,17 @@ func (c *pipelineConnector) ConnectWith(cfg *common.Config, clientCfg beat.Clien } ////////////////////////////// -// addIndexPattern is a Processor to add an index pattern (specified by the -// input configuration) to event metadata in the "raw-index" field. +// addFormattedIndex is a Processor to set an event's "raw-index" metadata field +// with a given TimestampFormatString. The elasticsearch output interprets +// that field as specifying the (raw string) index the event should be sent to; +// in other outputs it is just included in the metadata. -type addIndexPattern struct { - pattern string +type addFormattedIndex struct { formatString *fmtstr.TimestampFormatString } -func newAddIndexPattern( - beatInfo beat.Info, pattern string, -) (processors.Processor, error) { - formatString, err := fmtstr.MinimalTimestampFormatString(pattern, beatInfo) - if err != nil { - return nil, err - } - return &addIndexPattern{ - pattern: pattern, - formatString: formatString, - }, nil -} - -func (aip *addIndexPattern) Run(event *beat.Event) (*beat.Event, error) { - index, err := aip.formatString.Run(event.Timestamp) +func (p *addFormattedIndex) Run(event *beat.Event) (*beat.Event, error) { + index, err := p.formatString.Run(event.Timestamp) if err != nil { return nil, err } @@ -158,6 +150,6 @@ func (aip *addIndexPattern) Run(event *beat.Event) (*beat.Event, error) { return event, nil } -func (aip *addIndexPattern) String() string { - return fmt.Sprintf("add_index_pattern=%s", aip.pattern) +func (p *addFormattedIndex) String() string { + return fmt.Sprintf("add_index_pattern=%v", p.formatString) } diff --git a/filebeat/channel/factory.go b/filebeat/channel/factory.go index d2f735a22a6..52cb4c119c5 100644 --- a/filebeat/channel/factory.go +++ b/filebeat/channel/factory.go @@ -20,6 +20,7 @@ package channel import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/fmtstr" "github.com/elastic/beats/libbeat/processors" ) @@ -57,8 +58,8 @@ type inputOutletConfig struct { Fileset string `config:"_fileset_name"` // hidden setting // Output meta data settings - Pipeline string `config:"pipeline"` // ES Ingest pipeline name - Index string `config:"index"` // ES output index pattern + Pipeline string `config:"pipeline"` // ES Ingest pipeline name + Index fmtstr.EventFormatString `config:"index"` // ES output index pattern } // NewOutletFactory creates a new outlet factory for diff --git a/libbeat/common/fmtstr/formattimestamp.go b/libbeat/common/fmtstr/formattimestamp.go index d666c8cd21d..68c22d63649 100644 --- a/libbeat/common/fmtstr/formattimestamp.go +++ b/libbeat/common/fmtstr/formattimestamp.go @@ -29,56 +29,52 @@ import ( // shared static fields (typically agent / version) and the event timestamp. type TimestampFormatString struct { eventFormatString *EventFormatString - cachedFields common.MapStr + fields common.MapStr } -// NewTimestampFormatString creates from the given expression a -// TimestampFormatString that can refer only to the given static fields and -// the event timestamp. +// NewTimestampFormatString creates from the given event format string a +// TimestampFormatString that includes only the given static fields and +// a timestamp. func NewTimestampFormatString( - expression string, staticFields common.MapStr, + eventFormatString *EventFormatString, staticFields common.MapStr, ) (*TimestampFormatString, error) { - efs, err := CompileEvent(expression) - if err != nil { - return nil, err - } return &TimestampFormatString{ - eventFormatString: efs, - cachedFields: staticFields.Clone(), + eventFormatString: eventFormatString, + fields: staticFields.Clone(), }, nil } -// MinimalTimestampFormatString creates a formatter that has access only -// to the agent name / version and event timestamp. This was adapted from -// applyStaticFmtstr in libbeat/idxmgmt/ilm/ilm.go. -func MinimalTimestampFormatString( - expression string, beatInfo beat.Info, -) (*TimestampFormatString, error) { - staticFields := common.MapStr{ +// FieldsForBeat returns a common.MapStr with the given beat name and +// version assigned to their standard field names. +func FieldsForBeat(beat string, version string) common.MapStr { + return common.MapStr{ // beat object was left in for backward compatibility reason for older configs. "beat": common.MapStr{ - "name": beatInfo.Beat, - "version": beatInfo.Version, + "name": beat, + "version": version, }, "agent": common.MapStr{ - "name": beatInfo.Beat, - "version": beatInfo.Version, + "name": beat, + "version": version, }, // For the Beats that have an observer role "observer": common.MapStr{ - "name": beatInfo.Beat, - "version": beatInfo.Version, + "name": beat, + "version": version, }, } - return NewTimestampFormatString(expression, staticFields) } // Run executes the format string returning a new expanded string or an error // if execution or event field expansion fails. func (fs *TimestampFormatString) Run(timestamp time.Time) (string, error) { placeholderEvent := &beat.Event{ - Fields: fs.cachedFields, + Fields: fs.fields, Timestamp: timestamp, } return fs.eventFormatString.Run(placeholderEvent) } + +func (fs *TimestampFormatString) String() string { + return fs.eventFormatString.expression +} diff --git a/libbeat/common/fmtstr/formattimestamp_test.go b/libbeat/common/fmtstr/formattimestamp_test.go index b03c3a29183..c596cd44616 100644 --- a/libbeat/common/fmtstr/formattimestamp_test.go +++ b/libbeat/common/fmtstr/formattimestamp_test.go @@ -87,7 +87,13 @@ func TestTimestampFormatString(t *testing.T) { for i, test := range tests { t.Logf("test(%v): %v", i, test.title) - fs, err := NewTimestampFormatString(test.format, test.staticFields) + efs, err := CompileEvent(test.format) + if err != nil { + t.Error(err) + continue + } + + fs, err := NewTimestampFormatString(efs, test.staticFields) if err != nil { t.Error(err) continue diff --git a/libbeat/idxmgmt/ilm/ilm.go b/libbeat/idxmgmt/ilm/ilm.go index 07c924214ec..84ca92c432e 100644 --- a/libbeat/idxmgmt/ilm/ilm.go +++ b/libbeat/idxmgmt/ilm/ilm.go @@ -142,23 +142,9 @@ func NoopSupport(_ *logp.Logger, info beat.Info, config *common.Config) (Support } func applyStaticFmtstr(info beat.Info, fmt *fmtstr.EventFormatString) (string, error) { - return fmt.Run(&beat.Event{ - Fields: common.MapStr{ - // beat object was left in for backward compatibility reason for older configs. - "beat": common.MapStr{ - "name": info.Beat, - "version": info.Version, - }, - "agent": common.MapStr{ - "name": info.Beat, - "version": info.Version, - }, - // For the Beats that have an observer role - "observer": common.MapStr{ - "name": info.Beat, - "version": info.Version, - }, - }, - Timestamp: time.Now(), - }) + return fmt.Run( + &beat.Event{ + Fields: fmtstr.FieldsForBeat(info.Beat, info.Version), + Timestamp: time.Now(), + }) } From 0042aaf9afe0039a5d795b3163dc75d64446256c Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 24 Oct 2019 16:25:20 -0400 Subject: [PATCH 08/19] Working on pipelineConnector unit test --- filebeat/channel/connector_test.go | 93 ++++++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) create mode 100644 filebeat/channel/connector_test.go diff --git a/filebeat/channel/connector_test.go b/filebeat/channel/connector_test.go new file mode 100644 index 00000000000..9a7b5b19513 --- /dev/null +++ b/filebeat/channel/connector_test.go @@ -0,0 +1,93 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package channel + +import ( + "testing" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" +) + +func TestThing(t *testing.T) { + done := make(chan struct{}) + beatInfo := beat.Info{Beat: "TestBeat", Version: "3.9.27"} + outletFactory := NewOutletFactory(done, emptyCounter{}, beatInfo) + pipeline := newChannelPipeline() + connector := outletFactory.Create(pipeline) + config, err := common.NewConfigFrom("index: 'test'") + if err != nil { + t.Error(err) + } + //field, _ := config.String("index", -1) + //fmt.Printf("config index: %v\n", field) + //config := common.NewConfig() + outleter, err := connector.ConnectWith( + config, + beat.ClientConfig{}, + ) + outleter.OnEvent(beat.Event{}) + if err != nil { + t.Error(err) + } + processedEvent := <-pipeline.events + if processedEvent.Meta == nil { + //t.Error("Event Meta shouldn't be empty") + } +} + +type emptyCounter struct{} + +func (c emptyCounter) Add(n int) {} +func (c emptyCounter) Done() {} + +// channelPipeline is a Pipeline (and Client) whose connections just echo their +// events to a shared events channel for testing. +type channelPipeline struct { + events chan beat.Event +} + +func newChannelPipeline() *channelPipeline { + return &channelPipeline{make(chan beat.Event, 100)} +} + +func (cp *channelPipeline) SetACKHandler(h beat.PipelineACKHandler) error { + return nil +} + +func (cp *channelPipeline) ConnectWith(conf beat.ClientConfig) (beat.Client, error) { + return cp, nil +} + +func (cp *channelPipeline) Connect() (beat.Client, error) { + return cp, nil +} + +func (cp *channelPipeline) Publish(event beat.Event) { + cp.events <- event +} + +func (cp *channelPipeline) PublishAll(events []beat.Event) { + for _, event := range events { + cp.Publish(event) + } +} + +func (cp *channelPipeline) Close() error { + return nil +} From e9b01e972b516ecc3be8c8824d49d70c17b061c9 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Tue, 29 Oct 2019 15:48:20 -0400 Subject: [PATCH 09/19] Factor out processor list construction, finish unit tests --- filebeat/channel/connector.go | 54 +++++--- filebeat/channel/connector_test.go | 208 +++++++++++++++++++++-------- 2 files changed, 189 insertions(+), 73 deletions(-) diff --git a/filebeat/channel/connector.go b/filebeat/channel/connector.go index 8b17a531921..7089d0f3a19 100644 --- a/filebeat/channel/connector.go +++ b/filebeat/channel/connector.go @@ -54,29 +54,10 @@ func (c *pipelineConnector) ConnectWith(cfg *common.Config, clientCfg beat.Clien return nil, err } - procs := processors.NewList(nil) - - userProcessors, err := processors.New(config.Processors) + procs, err := buildProcessorList(c.parent.beatInfo, config, clientCfg) if err != nil { return nil, err } - procs.List = append(procs.List, userProcessors) - - if !config.Index.IsEmpty() { - staticFields := fmtstr.FieldsForBeat( - c.parent.beatInfo.Beat, c.parent.beatInfo.Version) - timestampFormat, err := - fmtstr.NewTimestampFormatString(&config.Index, staticFields) - if err != nil { - return nil, err - } - indexProcessor := &addFormattedIndex{timestampFormat} - procs.List = append(procs.List, indexProcessor) - } - - if lst := clientCfg.Processing.Processor; lst != nil { - procs.List = append(procs.List, lst) - } setOptional := func(to common.MapStr, key string, value string) { if value != "" { @@ -127,6 +108,39 @@ func (c *pipelineConnector) ConnectWith(cfg *common.Config, clientCfg beat.Clien return outlet, nil } +// buildProcessorList assembles the Processors for a pipelineConnector. +func buildProcessorList( + beatInfo beat.Info, config inputOutletConfig, clientCfg beat.ClientConfig, +) (*processors.Processors, error) { + procs := processors.NewList(nil) + + // Processor ordering is important: + // 1. Index configuration + if !config.Index.IsEmpty() { + staticFields := fmtstr.FieldsForBeat(beatInfo.Beat, beatInfo.Version) + timestampFormat, err := + fmtstr.NewTimestampFormatString(&config.Index, staticFields) + if err != nil { + return nil, err + } + indexProcessor := &addFormattedIndex{timestampFormat} + procs.List = append(procs.List, indexProcessor) + } + + // 2. ClientConfig processors + if lst := clientCfg.Processing.Processor; lst != nil { + procs.List = append(procs.List, lst) + } + + // 3. User processors + userProcessors, err := processors.New(config.Processors) + if err != nil { + return nil, err + } + procs.List = append(procs.List, userProcessors) + return procs, nil +} + ////////////////////////////// // addFormattedIndex is a Processor to set an event's "raw-index" metadata field // with a given TimestampFormatString. The elasticsearch output interprets diff --git a/filebeat/channel/connector_test.go b/filebeat/channel/connector_test.go index 9a7b5b19513..79c0b9f5dac 100644 --- a/filebeat/channel/connector_test.go +++ b/filebeat/channel/connector_test.go @@ -18,76 +18,178 @@ package channel import ( + "fmt" "testing" + "time" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/processors" + "github.com/elastic/beats/libbeat/processors/actions" + "github.com/stretchr/testify/assert" ) -func TestThing(t *testing.T) { - done := make(chan struct{}) - beatInfo := beat.Info{Beat: "TestBeat", Version: "3.9.27"} - outletFactory := NewOutletFactory(done, emptyCounter{}, beatInfo) - pipeline := newChannelPipeline() - connector := outletFactory.Create(pipeline) - config, err := common.NewConfigFrom("index: 'test'") - if err != nil { - t.Error(err) - } - //field, _ := config.String("index", -1) - //fmt.Printf("config index: %v\n", field) - //config := common.NewConfig() - outleter, err := connector.ConnectWith( - config, - beat.ClientConfig{}, - ) - outleter.OnEvent(beat.Event{}) - if err != nil { - t.Error(err) +func TestBuildProcessorList(t *testing.T) { + testCases := []struct { + description string + beatInfo beat.Info + configStr string + clientCfg beat.ClientConfig + event beat.Event + expectedFields map[string]string + }{ + { + description: "Simple static index", + configStr: "index: 'test'", + expectedFields: map[string]string{ + "@metadata.raw-index": "test", + }, + }, + { + description: "Index with agent info + timestamp", + beatInfo: beat.Info{Beat: "TestBeat", Version: "3.9.27"}, + configStr: "index: 'beat-%{[agent.name]}-%{[agent.version]}-%{+yyyy.MM.dd}'", + event: beat.Event{Timestamp: time.Date(1999, time.December, 31, 23, 0, 0, 0, time.UTC)}, + expectedFields: map[string]string{ + "@metadata.raw-index": "beat-TestBeat-3.9.27-1999.12.31", + }, + }, + { + description: "Set index in ClientConfig", + clientCfg: beat.ClientConfig{ + Processing: beat.ProcessingConfig{ + Processor: makeProcessors(&setRawIndex{"clientCfgIndex"}), + }, + }, + expectedFields: map[string]string{ + "@metadata.raw-index": "clientCfgIndex", + }, + }, + { + description: "ClientConfig processor runs after beat input Index", + configStr: "index: 'test'", + clientCfg: beat.ClientConfig{ + Processing: beat.ProcessingConfig{ + Processor: makeProcessors(&setRawIndex{"clientCfgIndex"}), + }, + }, + expectedFields: map[string]string{ + "@metadata.raw-index": "clientCfgIndex", + }, + }, + { + description: "Set field in input config", + configStr: `processors: +- add_fields: {fields: {testField: inputConfig}}`, + expectedFields: map[string]string{ + "fields.testField": "inputConfig", + }, + }, + { + description: "Set field in ClientConfig", + clientCfg: beat.ClientConfig{ + Processing: beat.ProcessingConfig{ + Processor: makeProcessors(actions.NewAddFields(common.MapStr{ + "fields": common.MapStr{"testField": "clientConfig"}, + }, false)), + }, + }, + expectedFields: map[string]string{ + "fields.testField": "clientConfig", + }, + }, + { + description: "Input config processors run after ClientConfig", + configStr: `processors: +- add_fields: {fields: {testField: inputConfig}}`, + clientCfg: beat.ClientConfig{ + Processing: beat.ProcessingConfig{ + Processor: makeProcessors(actions.NewAddFields(common.MapStr{ + "fields": common.MapStr{"testField": "clientConfig"}, + }, false)), + }, + }, + expectedFields: map[string]string{ + "fields.testField": "inputConfig", + }, + }, } - processedEvent := <-pipeline.events - if processedEvent.Meta == nil { - //t.Error("Event Meta shouldn't be empty") + for _, test := range testCases { + if test.event.Fields == nil { + test.event.Fields = common.MapStr{} + } + config, err := outletConfigFromString(test.configStr) + if err != nil { + t.Errorf("[%s] %v", test.description, err) + continue + } + processors, err := buildProcessorList(test.beatInfo, config, test.clientCfg) + if err != nil { + t.Errorf("[%s] %v", test.description, err) + continue + } + processedEvent, err := processors.Run(&test.event) + if err != nil { + t.Error(err) + continue + } + for key, value := range test.expectedFields { + field, err := processedEvent.GetValue(key) + if err != nil { + t.Errorf("[%s] Couldn't get field %s from event: %v", test.description, key, err) + continue + } + assert.Equal(t, field, value) + fieldStr, ok := field.(string) + if !ok { + // Note that requiring a string here is just to simplify the test setup, + // not a requirement of the underlying api. + t.Errorf("[%s] Field [%s] should be a string", test.description, key) + continue + } + if fieldStr != value { + t.Errorf("[%s] Event field [%s]: expected [%s], got [%s]", test.description, key, value, fieldStr) + } + } } } -type emptyCounter struct{} - -func (c emptyCounter) Add(n int) {} -func (c emptyCounter) Done() {} - -// channelPipeline is a Pipeline (and Client) whose connections just echo their -// events to a shared events channel for testing. -type channelPipeline struct { - events chan beat.Event -} - -func newChannelPipeline() *channelPipeline { - return &channelPipeline{make(chan beat.Event, 100)} +// setRawIndex is a bare-bones processor to set the raw-index field to a +// constant string in the event metadata. It is used to test order of operations +// for buildProcessorList. +type setRawIndex struct { + indexStr string } -func (cp *channelPipeline) SetACKHandler(h beat.PipelineACKHandler) error { - return nil -} - -func (cp *channelPipeline) ConnectWith(conf beat.ClientConfig) (beat.Client, error) { - return cp, nil -} - -func (cp *channelPipeline) Connect() (beat.Client, error) { - return cp, nil +func (p *setRawIndex) Run(event *beat.Event) (*beat.Event, error) { + if event.Meta == nil { + event.Meta = common.MapStr{} + } + event.Meta["raw-index"] = p.indexStr + return event, nil } -func (cp *channelPipeline) Publish(event beat.Event) { - cp.events <- event +func (p *setRawIndex) String() string { + return fmt.Sprintf("set_raw_index=%v", p.indexStr) } -func (cp *channelPipeline) PublishAll(events []beat.Event) { - for _, event := range events { - cp.Publish(event) +// Helper function to convert from YML input string to an unpacked +// inputOutletConfig +func outletConfigFromString(s string) (inputOutletConfig, error) { + config := inputOutletConfig{} + cfg, err := common.NewConfigFrom(s) + if err != nil { + return config, err + } + if err := cfg.Unpack(&config); err != nil { + return config, err } + return config, nil } -func (cp *channelPipeline) Close() error { - return nil +// makeProcessors wraps one or more bare Processor objects in Processors. +func makeProcessors(procs ...processors.Processor) *processors.Processors { + procList := processors.NewList(nil) + procList.List = procs + return procList } From 5d735a9f5f23d33c2461265050acf8cf6d795b5e Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 30 Oct 2019 10:14:29 -0400 Subject: [PATCH 10/19] Address review comments --- filebeat/channel/connector.go | 20 +++++----- filebeat/channel/connector_test.go | 64 +++++++++++++----------------- libbeat/idxmgmt/std.go | 4 +- 3 files changed, 38 insertions(+), 50 deletions(-) diff --git a/filebeat/channel/connector.go b/filebeat/channel/connector.go index 7089d0f3a19..cd55fe9e946 100644 --- a/filebeat/channel/connector.go +++ b/filebeat/channel/connector.go @@ -34,6 +34,14 @@ type pipelineConnector struct { pipeline beat.Pipeline } +// addFormattedIndex is a Processor to set an event's "raw_index" metadata field +// with a given TimestampFormatString. The elasticsearch output interprets +// that field as specifying the (raw string) index the event should be sent to; +// in other outputs it is just included in the metadata. +type addFormattedIndex struct { + formatString *fmtstr.TimestampFormatString +} + // Connect passes the cfg and the zero value of beat.ClientConfig to the underlying function. func (fn ConnectorFunc) Connect(cfg *common.Config) (Outleter, error) { return fn(cfg, beat.ClientConfig{}) @@ -141,16 +149,6 @@ func buildProcessorList( return procs, nil } -////////////////////////////// -// addFormattedIndex is a Processor to set an event's "raw-index" metadata field -// with a given TimestampFormatString. The elasticsearch output interprets -// that field as specifying the (raw string) index the event should be sent to; -// in other outputs it is just included in the metadata. - -type addFormattedIndex struct { - formatString *fmtstr.TimestampFormatString -} - func (p *addFormattedIndex) Run(event *beat.Event) (*beat.Event, error) { index, err := p.formatString.Run(event.Timestamp) if err != nil { @@ -160,7 +158,7 @@ func (p *addFormattedIndex) Run(event *beat.Event) (*beat.Event, error) { if event.Meta == nil { event.Meta = common.MapStr{} } - event.Meta["raw-index"] = index + event.Meta["raw_index"] = index return event, nil } diff --git a/filebeat/channel/connector_test.go b/filebeat/channel/connector_test.go index 79c0b9f5dac..a3bf0d4d986 100644 --- a/filebeat/channel/connector_test.go +++ b/filebeat/channel/connector_test.go @@ -30,63 +30,55 @@ import ( ) func TestBuildProcessorList(t *testing.T) { - testCases := []struct { - description string + testCases := map[string]struct { beatInfo beat.Info configStr string clientCfg beat.ClientConfig event beat.Event expectedFields map[string]string }{ - { - description: "Simple static index", - configStr: "index: 'test'", + "Simple static index": { + configStr: "index: 'test'", expectedFields: map[string]string{ - "@metadata.raw-index": "test", + "@metadata.raw_index": "test", }, }, - { - description: "Index with agent info + timestamp", - beatInfo: beat.Info{Beat: "TestBeat", Version: "3.9.27"}, - configStr: "index: 'beat-%{[agent.name]}-%{[agent.version]}-%{+yyyy.MM.dd}'", - event: beat.Event{Timestamp: time.Date(1999, time.December, 31, 23, 0, 0, 0, time.UTC)}, + "Index with agent info + timestamp": { + beatInfo: beat.Info{Beat: "TestBeat", Version: "3.9.27"}, + configStr: "index: 'beat-%{[agent.name]}-%{[agent.version]}-%{+yyyy.MM.dd}'", + event: beat.Event{Timestamp: time.Date(1999, time.December, 31, 23, 0, 0, 0, time.UTC)}, expectedFields: map[string]string{ - "@metadata.raw-index": "beat-TestBeat-3.9.27-1999.12.31", + "@metadata.raw_index": "beat-TestBeat-3.9.27-1999.12.31", }, }, - { - description: "Set index in ClientConfig", + "Set index in ClientConfig": { clientCfg: beat.ClientConfig{ Processing: beat.ProcessingConfig{ Processor: makeProcessors(&setRawIndex{"clientCfgIndex"}), }, }, expectedFields: map[string]string{ - "@metadata.raw-index": "clientCfgIndex", + "@metadata.raw_index": "clientCfgIndex", }, }, - { - description: "ClientConfig processor runs after beat input Index", - configStr: "index: 'test'", + "ClientConfig processor runs after beat input Index": { + configStr: "index: 'test'", clientCfg: beat.ClientConfig{ Processing: beat.ProcessingConfig{ Processor: makeProcessors(&setRawIndex{"clientCfgIndex"}), }, }, expectedFields: map[string]string{ - "@metadata.raw-index": "clientCfgIndex", + "@metadata.raw_index": "clientCfgIndex", }, }, - { - description: "Set field in input config", - configStr: `processors: -- add_fields: {fields: {testField: inputConfig}}`, + "Set field in input config": { + configStr: `processors: [add_fields: {fields: {testField: inputConfig}}]`, expectedFields: map[string]string{ "fields.testField": "inputConfig", }, }, - { - description: "Set field in ClientConfig", + "Set field in ClientConfig": { clientCfg: beat.ClientConfig{ Processing: beat.ProcessingConfig{ Processor: makeProcessors(actions.NewAddFields(common.MapStr{ @@ -98,10 +90,8 @@ func TestBuildProcessorList(t *testing.T) { "fields.testField": "clientConfig", }, }, - { - description: "Input config processors run after ClientConfig", - configStr: `processors: -- add_fields: {fields: {testField: inputConfig}}`, + "Input config processors run after ClientConfig": { + configStr: `processors: [add_fields: {fields: {testField: inputConfig}}]`, clientCfg: beat.ClientConfig{ Processing: beat.ProcessingConfig{ Processor: makeProcessors(actions.NewAddFields(common.MapStr{ @@ -114,18 +104,18 @@ func TestBuildProcessorList(t *testing.T) { }, }, } - for _, test := range testCases { + for description, test := range testCases { if test.event.Fields == nil { test.event.Fields = common.MapStr{} } config, err := outletConfigFromString(test.configStr) if err != nil { - t.Errorf("[%s] %v", test.description, err) + t.Errorf("[%s] %v", description, err) continue } processors, err := buildProcessorList(test.beatInfo, config, test.clientCfg) if err != nil { - t.Errorf("[%s] %v", test.description, err) + t.Errorf("[%s] %v", description, err) continue } processedEvent, err := processors.Run(&test.event) @@ -136,7 +126,7 @@ func TestBuildProcessorList(t *testing.T) { for key, value := range test.expectedFields { field, err := processedEvent.GetValue(key) if err != nil { - t.Errorf("[%s] Couldn't get field %s from event: %v", test.description, key, err) + t.Errorf("[%s] Couldn't get field %s from event: %v", description, key, err) continue } assert.Equal(t, field, value) @@ -144,17 +134,17 @@ func TestBuildProcessorList(t *testing.T) { if !ok { // Note that requiring a string here is just to simplify the test setup, // not a requirement of the underlying api. - t.Errorf("[%s] Field [%s] should be a string", test.description, key) + t.Errorf("[%s] Field [%s] should be a string", description, key) continue } if fieldStr != value { - t.Errorf("[%s] Event field [%s]: expected [%s], got [%s]", test.description, key, value, fieldStr) + t.Errorf("[%s] Event field [%s]: expected [%s], got [%s]", description, key, value, fieldStr) } } } } -// setRawIndex is a bare-bones processor to set the raw-index field to a +// setRawIndex is a bare-bones processor to set the raw_index field to a // constant string in the event metadata. It is used to test order of operations // for buildProcessorList. type setRawIndex struct { @@ -165,7 +155,7 @@ func (p *setRawIndex) Run(event *beat.Event) (*beat.Event, error) { if event.Meta == nil { event.Meta = common.MapStr{} } - event.Meta["raw-index"] = p.indexStr + event.Meta["raw_index"] = p.indexStr return event, nil } diff --git a/libbeat/idxmgmt/std.go b/libbeat/idxmgmt/std.go index e3e285f0ced..f6951d729ad 100644 --- a/libbeat/idxmgmt/std.go +++ b/libbeat/idxmgmt/std.go @@ -367,8 +367,8 @@ func getEventCustomIndex(evt *beat.Event, beatInfo beat.Info) string { // This is functionally identical to Meta["alias"], returning the overriding // metadata as the index name if present. It is currently used by Filebeat // to send the index for particular inputs to formatted string templates, - // which are then expanded by a processor to the "raw-index" field. - if tmp := evt.Meta["raw-index"]; tmp != nil { + // which are then expanded by a processor to the "raw_index" field. + if tmp := evt.Meta["raw_index"]; tmp != nil { if idx, ok := tmp.(string); ok { return idx } From d255d44fd80621551eedf248ca50a0a1bfb94bd8 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 30 Oct 2019 10:47:58 -0400 Subject: [PATCH 11/19] Document the new option --- filebeat/docs/inputs/input-common-options.asciidoc | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/filebeat/docs/inputs/input-common-options.asciidoc b/filebeat/docs/inputs/input-common-options.asciidoc index 53d745e0cad..29a478ee2fe 100644 --- a/filebeat/docs/inputs/input-common-options.asciidoc +++ b/filebeat/docs/inputs/input-common-options.asciidoc @@ -64,7 +64,7 @@ If this option is set to true, the custom <<{beatname_lc}-input-{type}-fields,fields>> are stored as top-level fields in the output document instead of being grouped under a `fields` sub-dictionary. If the custom field names conflict with other field names added by {beatname_uc}, -then the custom fields overwrite the other fields. +then the custom fields overwrite the other fields. [float] ===== `processors` @@ -84,3 +84,14 @@ this option usually results in simpler configuration files. If the pipeline is configured both in the input and output, the option from the input is used. +[float] +===== `index` + +If present, this formatted string sets the index for events from this input +(for elasticsearch outputs), or sets the `raw_index` field of the event's +metadata (for other outputs). This string can only refer to the agent name and +version and the event timestamp; for access to dynamic fields, use +`output.elasticsearch.index` or a processor. + +Example value: `"%{[agent.name]}-myindex-%{+yyyy.MM.dd}"` might +expand to `"filebeat-myindex-2019.11.01"`. From 5badc1407c5020389ec8769d637b96fc36a55168 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 30 Oct 2019 10:54:29 -0400 Subject: [PATCH 12/19] Update changelog --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 13d5b3cf4a3..2cafea8aca1 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -379,6 +379,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add Kibana Dashboard for MISP module. {pull}14147[14147] - Add JSON options to autodiscover hints {pull}14208[14208] - Add more filesets to Zeek module. {pull}14150[14150] +- Add `index` option to all inputs to directly set a per-input index value. {pull}14010[14010] *Heartbeat* - Add non-privileged icmp on linux and darwin(mac). {pull}13795[13795] {issue}11498[11498] From beac274c3f07727b6996a3410162f88892c306ac Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 30 Oct 2019 12:10:52 -0400 Subject: [PATCH 13/19] regenerate formatting --- filebeat/channel/connector_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/filebeat/channel/connector_test.go b/filebeat/channel/connector_test.go index a3bf0d4d986..0522110bd1a 100644 --- a/filebeat/channel/connector_test.go +++ b/filebeat/channel/connector_test.go @@ -22,11 +22,12 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/processors" "github.com/elastic/beats/libbeat/processors/actions" - "github.com/stretchr/testify/assert" ) func TestBuildProcessorList(t *testing.T) { From 486bcc95dc2500302e83483ad058b58600a9bd8e Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 30 Oct 2019 12:36:54 -0400 Subject: [PATCH 14/19] update *more* formatting --- libbeat/common/fmtstr/formattimestamp_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/libbeat/common/fmtstr/formattimestamp_test.go b/libbeat/common/fmtstr/formattimestamp_test.go index c596cd44616..d194f597ad5 100644 --- a/libbeat/common/fmtstr/formattimestamp_test.go +++ b/libbeat/common/fmtstr/formattimestamp_test.go @@ -21,8 +21,9 @@ import ( "testing" "time" - "github.com/elastic/beats/libbeat/common" "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/common" ) func TestTimestampFormatString(t *testing.T) { From 5faafb1331f3c48f1a7fe6dc06fd741c449cae03 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 30 Oct 2019 12:52:07 -0400 Subject: [PATCH 15/19] Use the right arguments in filebeat beater? --- filebeat/beater/filebeat.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index c30203dea16..76e7e8b12f8 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -326,7 +326,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { outDone := make(chan struct{}) // outDone closes down all active pipeline connections crawler, err := crawler.New( - channel.NewOutletFactory(outDone, wgEvents).Create, + channel.NewOutletFactory(outDone, wgEvents, b.Info).Create, config.Inputs, b.Info.Version, fb.done, From 4337b5c68c63ddf89ec4bedc0f3a910e99f2d58f Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 7 Nov 2019 13:38:17 -0500 Subject: [PATCH 16/19] painfully subtle processor ordering fix --- filebeat/channel/connector.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/filebeat/channel/connector.go b/filebeat/channel/connector.go index 99e270a7629..5acc253953a 100644 --- a/filebeat/channel/connector.go +++ b/filebeat/channel/connector.go @@ -146,7 +146,18 @@ func buildProcessorList( if err != nil { return nil, err } - procs.List = append(procs.List, userProcessors) + // Subtlety: it is important here that we append the individual elements of + // userProcessors, rather than userProcessors itself, even though + // userProcessors implements the processors.Processor interface. This is + // because the contents of what we return are later pulled out into a + // processing.group rather than a processors.Processors, and the two have + // different error semantics: processors.Processors aborts processing on + // any error, whereas `processing.group` only aborts on fatal errors. The + // latter is the most common behavior, and the one we are preserving here for + // backwards compatibility. + // We are unhappy about this and have plans to fix this inconsistency at a + // higher level, but for now we need to respect the existing semantics. + procs.List = append(procs.List, userProcessors.List...) return procs, nil } From f74af38188b40eb64b68215202536bc67f9f65ed Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 7 Nov 2019 14:05:13 -0500 Subject: [PATCH 17/19] add unit test for processor semantic fix --- filebeat/channel/connector_test.go | 32 +++++++++++++++++++++++++++--- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/filebeat/channel/connector_test.go b/filebeat/channel/connector_test.go index 0522110bd1a..6b10f035511 100644 --- a/filebeat/channel/connector_test.go +++ b/filebeat/channel/connector_test.go @@ -120,9 +120,11 @@ func TestBuildProcessorList(t *testing.T) { continue } processedEvent, err := processors.Run(&test.event) - if err != nil { - t.Error(err) - continue + // We don't check if err != nil, because we are testing the final outcome + // of running the processors, including when some of them fail. + if processedEvent == nil { + t.Errorf("[%s] Unexpected fatal error running processors: %v\n", + description, err) } for key, value := range test.expectedFields { field, err := processedEvent.GetValue(key) @@ -145,6 +147,30 @@ func TestBuildProcessorList(t *testing.T) { } } +func TestBuildProcessorListIsFlat(t *testing.T) { + // This test is regrettable, and exists because of inconsistencies in + // processor handling between processors.Processors and processing.group + // (which implements beat.ProcessorList) -- see buildProcessorList for + // details. The upshot is that, for now, if the input configuration specifies + // processors, they must be returned as direct children of the resulting + // processors.Processors (rather than being collected in additional tree + // structure). + // This test should be removed once we have a more consistent mechanism for + // collecting and running processors. + configStr := `processors: +- add_fields: {fields: {testField: value}} +- add_fields: {fields: {testField2: stuff}}` + config, err := outletConfigFromString(configStr) + if err != nil { + t.Fatal(err) + } + processors, err := buildProcessorList(beat.Info{}, config, beat.ClientConfig{}) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, 2, len(processors.List)) +} + // setRawIndex is a bare-bones processor to set the raw_index field to a // constant string in the event metadata. It is used to test order of operations // for buildProcessorList. From 88698bcdeddf25697811e952265ccc095baf66ba Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 7 Nov 2019 14:08:14 -0500 Subject: [PATCH 18/19] rename buildProcessorList -> processorsForConfig --- filebeat/channel/connector.go | 6 +++--- filebeat/channel/connector_test.go | 13 +++++++------ 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/filebeat/channel/connector.go b/filebeat/channel/connector.go index 5acc253953a..8556c5635ea 100644 --- a/filebeat/channel/connector.go +++ b/filebeat/channel/connector.go @@ -62,7 +62,7 @@ func (c *pipelineConnector) ConnectWith(cfg *common.Config, clientCfg beat.Clien return nil, err } - procs, err := buildProcessorList(c.parent.beatInfo, config, clientCfg) + procs, err := processorsForConfig(c.parent.beatInfo, config, clientCfg) if err != nil { return nil, err } @@ -117,8 +117,8 @@ func (c *pipelineConnector) ConnectWith(cfg *common.Config, clientCfg beat.Clien return outlet, nil } -// buildProcessorList assembles the Processors for a pipelineConnector. -func buildProcessorList( +// processorsForConfig assembles the Processors for a pipelineConnector. +func processorsForConfig( beatInfo beat.Info, config inputOutletConfig, clientCfg beat.ClientConfig, ) (*processors.Processors, error) { procs := processors.NewList(nil) diff --git a/filebeat/channel/connector_test.go b/filebeat/channel/connector_test.go index 6b10f035511..4708a7e45a5 100644 --- a/filebeat/channel/connector_test.go +++ b/filebeat/channel/connector_test.go @@ -30,7 +30,7 @@ import ( "github.com/elastic/beats/libbeat/processors/actions" ) -func TestBuildProcessorList(t *testing.T) { +func TestProcessorsForConfig(t *testing.T) { testCases := map[string]struct { beatInfo beat.Info configStr string @@ -114,7 +114,7 @@ func TestBuildProcessorList(t *testing.T) { t.Errorf("[%s] %v", description, err) continue } - processors, err := buildProcessorList(test.beatInfo, config, test.clientCfg) + processors, err := processorsForConfig(test.beatInfo, config, test.clientCfg) if err != nil { t.Errorf("[%s] %v", description, err) continue @@ -147,10 +147,10 @@ func TestBuildProcessorList(t *testing.T) { } } -func TestBuildProcessorListIsFlat(t *testing.T) { +func TestProcessorsForConfigIsFlat(t *testing.T) { // This test is regrettable, and exists because of inconsistencies in // processor handling between processors.Processors and processing.group - // (which implements beat.ProcessorList) -- see buildProcessorList for + // (which implements beat.ProcessorList) -- see processorsForConfig for // details. The upshot is that, for now, if the input configuration specifies // processors, they must be returned as direct children of the resulting // processors.Processors (rather than being collected in additional tree @@ -164,7 +164,8 @@ func TestBuildProcessorListIsFlat(t *testing.T) { if err != nil { t.Fatal(err) } - processors, err := buildProcessorList(beat.Info{}, config, beat.ClientConfig{}) + processors, err := processorsForConfig( + beat.Info{}, config, beat.ClientConfig{}) if err != nil { t.Fatal(err) } @@ -173,7 +174,7 @@ func TestBuildProcessorListIsFlat(t *testing.T) { // setRawIndex is a bare-bones processor to set the raw_index field to a // constant string in the event metadata. It is used to test order of operations -// for buildProcessorList. +// for processorsForConfig. type setRawIndex struct { indexStr string } From 406c09ebdb2e488a75503e941252e345756592d5 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 7 Nov 2019 14:23:25 -0500 Subject: [PATCH 19/19] remove vestigial backticks in comment --- filebeat/channel/connector.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/filebeat/channel/connector.go b/filebeat/channel/connector.go index 8556c5635ea..ebd5983a3f7 100644 --- a/filebeat/channel/connector.go +++ b/filebeat/channel/connector.go @@ -152,7 +152,7 @@ func processorsForConfig( // because the contents of what we return are later pulled out into a // processing.group rather than a processors.Processors, and the two have // different error semantics: processors.Processors aborts processing on - // any error, whereas `processing.group` only aborts on fatal errors. The + // any error, whereas processing.group only aborts on fatal errors. The // latter is the most common behavior, and the one we are preserving here for // backwards compatibility. // We are unhappy about this and have plans to fix this inconsistency at a