From 1cae31da32fcc7f1ba72dd2972a7cb681fc2a605 Mon Sep 17 00:00:00 2001 From: Dan Kortschak <90160302+efd6@users.noreply.github.com> Date: Tue, 14 Sep 2021 18:43:06 +0930 Subject: [PATCH] x-pack/filebeat/input/httpjson/internal/v2: allow split chains to continue past empty targets (#27880) This adds a configuration option "ignore_empty_value" that allows a split processor chain to continue if a target field is missing or empty. Updates #26008 (cherry picked from commit 2036ad8c590a8bac1750d9e16978f1dfbbbc75b1) --- CHANGELOG.next.asciidoc | 2 + .../docs/inputs/input-httpjson.asciidoc | 5 + .../httpjson/internal/v2/config_response.go | 15 +- .../input/httpjson/internal/v2/split.go | 77 +++-- .../input/httpjson/internal/v2/split_test.go | 264 ++++++++++++++++++ 5 files changed, 338 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 532b694dab8..c5ab0764847 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -431,6 +431,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Added support for parsing syslog dates containing a leading 0 (e.g. `Sep 01`) rather than a space. {pull}27775[27775] - Add base64 Encode functionality to httpjson input. {pull}27681[27681] - Add `join` and `sprintf` functions to `httpjson` input. {pull}27735[27735] +- Improve memory usage of line reader of `log` and `filestream` input. {pull}27782[27782] +- Add `ignore_empty_value` flag to `httpjson` `split` processor. {pull}27880[27880] *Heartbeat* diff --git a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc index 6b5444be055..0fd64311ef4 100644 --- a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc @@ -566,6 +566,11 @@ Required if using split type of `string`. This is the sub string used to split Valid when used with `type: map`. When not empty, defines a new field where the original key value will be stored. +[float] +==== `response.split[].ignore_empty_value` + +If set to true, empty or missing value will be ignored and processing will pass on to the next nested split operation instead of failing with an error. Default: `false`. + [float] ==== `response.split[].split` diff --git a/x-pack/filebeat/input/httpjson/internal/v2/config_response.go b/x-pack/filebeat/input/httpjson/internal/v2/config_response.go index 0bb51910387..1bc3056ab17 100644 --- a/x-pack/filebeat/input/httpjson/internal/v2/config_response.go +++ b/x-pack/filebeat/input/httpjson/internal/v2/config_response.go @@ -24,13 +24,14 @@ type responseConfig struct { } type splitConfig struct { - Target string `config:"target" validation:"required"` - Type string `config:"type"` - Transforms transformsConfig `config:"transforms"` - Split *splitConfig `config:"split"` - KeepParent bool `config:"keep_parent"` - KeyField string `config:"key_field"` - DelimiterString string `config:"delimiter"` + Target string `config:"target" validation:"required"` + Type string `config:"type"` + Transforms transformsConfig `config:"transforms"` + Split *splitConfig `config:"split"` + KeepParent bool `config:"keep_parent"` + KeyField string `config:"key_field"` + DelimiterString string `config:"delimiter"` + IgnoreEmptyValue bool `config:"ignore_empty_value"` } func (c *responseConfig) Validate() error { diff --git a/x-pack/filebeat/input/httpjson/internal/v2/split.go b/x-pack/filebeat/input/httpjson/internal/v2/split.go index 9cb686e63ad..56c89f7f9ef 100644 --- a/x-pack/filebeat/input/httpjson/internal/v2/split.go +++ b/x-pack/filebeat/input/httpjson/internal/v2/split.go @@ -21,18 +21,24 @@ var ( errExpectedSplitString = errors.New("split was expecting field to be a string") ) +// split is a split processor chain element. Split processing is executed +// by applying elements of the chain's linked list to an input until completed +// or an error state is encountered. type split struct { - log *logp.Logger - targetInfo targetInfo - kind string - transforms []basicTransform - child *split - keepParent bool - keyField string - isRoot bool - delimiter string + log *logp.Logger + targetInfo targetInfo + kind string + transforms []basicTransform + child *split + keepParent bool + ignoreEmptyValue bool + keyField string + isRoot bool + delimiter string } +// newSplitResponse returns a new split based on the provided config and +// logging to the provided logger, tagging the split as the root of the chain. func newSplitResponse(cfg *splitConfig, log *logp.Logger) (*split, error) { if cfg == nil { return nil, nil @@ -42,11 +48,13 @@ func newSplitResponse(cfg *splitConfig, log *logp.Logger) (*split, error) { if err != nil { return nil, err } - // we want to be able to identify which split is the root of the chain + // We want to be able to identify which split is the root of the chain. split.isRoot = true return split, nil } +// newSplit returns a new split based on the provided config and +// logging to the provided logger. func newSplit(c *splitConfig, log *logp.Logger) (*split, error) { ti, err := getTargetInfo(c.Target) if err != nil { @@ -71,22 +79,27 @@ func newSplit(c *splitConfig, log *logp.Logger) (*split, error) { } return &split{ - log: log, - targetInfo: ti, - kind: c.Type, - keepParent: c.KeepParent, - keyField: c.KeyField, - delimiter: c.DelimiterString, - transforms: ts, - child: s, + log: log, + targetInfo: ti, + kind: c.Type, + keepParent: c.KeepParent, + ignoreEmptyValue: c.IgnoreEmptyValue, + keyField: c.KeyField, + delimiter: c.DelimiterString, + transforms: ts, + child: s, }, nil } +// run runs the split operation on the contents of resp, sending successive +// split results on ch. ctx is passed to transforms that are called during +// the split. func (s *split) run(ctx *transformContext, resp transformable, ch chan<- maybeMsg) error { root := resp.body() return s.split(ctx, root, ch) } +// split recursively executes the split processor chain. func (s *split) split(ctx *transformContext, root common.MapStr, ch chan<- maybeMsg) error { v, err := root.GetValue(s.targetInfo.Name) if err != nil && err != common.ErrKeyNotFound { @@ -94,6 +107,12 @@ func (s *split) split(ctx *transformContext, root common.MapStr, ch chan<- maybe } if v == nil { + if s.ignoreEmptyValue { + if s.child != nil { + return s.child.split(ctx, root, ch) + } + return nil + } if s.isRoot { return errEmptyRootField } @@ -109,6 +128,12 @@ func (s *split) split(ctx *transformContext, root common.MapStr, ch chan<- maybe } if len(varr) == 0 { + if s.ignoreEmptyValue { + if s.child != nil { + return s.child.split(ctx, root, ch) + } + return nil + } if s.isRoot { return errEmptyRootField } @@ -130,6 +155,12 @@ func (s *split) split(ctx *transformContext, root common.MapStr, ch chan<- maybe } if len(vmap) == 0 { + if s.ignoreEmptyValue { + if s.child != nil { + return s.child.split(ctx, root, ch) + } + return nil + } if s.isRoot { return errEmptyRootField } @@ -151,6 +182,12 @@ func (s *split) split(ctx *transformContext, root common.MapStr, ch chan<- maybe } if len(vstr) == 0 { + if s.ignoreEmptyValue { + if s.child != nil { + return s.child.split(ctx, root, ch) + } + return nil + } if s.isRoot { return errEmptyRootField } @@ -169,6 +206,8 @@ func (s *split) split(ctx *transformContext, root common.MapStr, ch chan<- maybe return errors.New("unknown split type") } +// sendMessage sends an array or map split result value, v, on ch after performing +// any necessary transformations. If key is "", the value is an element of an array. func (s *split) sendMessage(ctx *transformContext, root common.MapStr, key string, v interface{}, ch chan<- maybeMsg) error { obj, ok := toMapStr(v) if !ok { @@ -220,6 +259,8 @@ func toMapStr(v interface{}) (common.MapStr, bool) { return common.MapStr{}, false } +// sendMessage sends a string split result value, v, on ch after performing any +// necessary transformations. If key is "", the value is an element of an array. func (s *split) sendMessageSplitString(ctx *transformContext, root common.MapStr, v string, ch chan<- maybeMsg) error { clone := root.Clone() _, _ = clone.Put(s.targetInfo.Name, v) diff --git a/x-pack/filebeat/input/httpjson/internal/v2/split_test.go b/x-pack/filebeat/input/httpjson/internal/v2/split_test.go index 2c53d0fcbe1..c385771667b 100644 --- a/x-pack/filebeat/input/httpjson/internal/v2/split_test.go +++ b/x-pack/filebeat/input/httpjson/internal/v2/split_test.go @@ -354,6 +354,270 @@ func TestSplit(t *testing.T) { {"@timestamp": "1234567890", "items": "Line 3"}, }, }, + { + name: "An empty array in an object", + config: &splitConfig{ + Target: "body.response", + Type: "array", + Split: &splitConfig{ + Target: "body.Event.Attributes", + IgnoreEmptyValue: true, + KeepParent: true, + Split: &splitConfig{ + Target: "body.Event.OtherAttributes", + KeepParent: true, + }, + }, + }, + ctx: emptyTransformContext(), + resp: transformable{ + "body": common.MapStr{ + "response": []interface{}{ + map[string]interface{}{ + "Event": map[string]interface{}{ + "timestamp": "1606324417", + "Attributes": []interface{}{}, + "OtherAttributes": []interface{}{ + map[string]interface{}{ + "key": "value", + }, + map[string]interface{}{ + "key2": "value2", + }, + }, + }, + }, + }, + }, + }, + expectedMessages: []common.MapStr{ + { + "Event": common.MapStr{ + "timestamp": "1606324417", + "Attributes": []interface{}{}, + "OtherAttributes": common.MapStr{ + "key": "value", + }, + }, + }, + { + "Event": common.MapStr{ + "timestamp": "1606324417", + "Attributes": []interface{}{}, + "OtherAttributes": common.MapStr{ + "key2": "value2", + }, + }, + }, + }, + expectedErr: nil, + }, + { + name: "A missing array in an object", + config: &splitConfig{ + Target: "body.response", + Type: "array", + Split: &splitConfig{ + Target: "body.Event.Attributes", + IgnoreEmptyValue: true, + KeepParent: true, + Split: &splitConfig{ + Target: "body.Event.OtherAttributes", + KeepParent: true, + }, + }, + }, + ctx: emptyTransformContext(), + resp: transformable{ + "body": common.MapStr{ + "response": []interface{}{ + map[string]interface{}{ + "Event": map[string]interface{}{ + "timestamp": "1606324417", + "OtherAttributes": []interface{}{ + map[string]interface{}{ + "key": "value", + }, + map[string]interface{}{ + "key2": "value2", + }, + }, + }, + }, + }, + }, + }, + expectedMessages: []common.MapStr{ + { + "Event": common.MapStr{ + "timestamp": "1606324417", + "OtherAttributes": common.MapStr{ + "key": "value", + }, + }, + }, + { + "Event": common.MapStr{ + "timestamp": "1606324417", + "OtherAttributes": common.MapStr{ + "key2": "value2", + }, + }, + }, + }, + expectedErr: nil, + }, + { + name: "An empty map in an object", + config: &splitConfig{ + Target: "body.response", + Type: "array", + Split: &splitConfig{ + Target: "body.Event.Attributes", + Type: "map", + IgnoreEmptyValue: true, + KeepParent: true, + Split: &splitConfig{ + Type: "map", + Target: "body.Event.OtherAttributes", + KeepParent: true, + }, + }, + }, + ctx: emptyTransformContext(), + resp: transformable{ + "body": common.MapStr{ + "response": []interface{}{ + map[string]interface{}{ + "Event": map[string]interface{}{ + "timestamp": "1606324417", + "Attributes": map[string]interface{}{}, + "OtherAttributes": map[string]interface{}{ + // Only include a single item here to avoid + // map iteration order flakes. + "1": map[string]interface{}{ + "key": "value", + }, + }, + }, + }, + }, + }, + }, + expectedMessages: []common.MapStr{ + { + "Event": common.MapStr{ + "timestamp": "1606324417", + "Attributes": common.MapStr{}, + "OtherAttributes": common.MapStr{ + "key": "value", + }, + }, + }, + }, + expectedErr: nil, + }, + { + name: "A missing map in an object", + config: &splitConfig{ + Target: "body.response", + Type: "array", + Split: &splitConfig{ + Target: "body.Event.Attributes", + Type: "map", + IgnoreEmptyValue: true, + KeepParent: true, + Split: &splitConfig{ + Type: "map", + Target: "body.Event.OtherAttributes", + KeepParent: true, + }, + }, + }, + ctx: emptyTransformContext(), + resp: transformable{ + "body": common.MapStr{ + "response": []interface{}{ + map[string]interface{}{ + "Event": map[string]interface{}{ + "timestamp": "1606324417", + "OtherAttributes": map[string]interface{}{ + // Only include a single item here to avoid + // map iteration order flakes. + "1": map[string]interface{}{ + "key": "value", + }, + }, + }, + }, + }, + }, + }, + expectedMessages: []common.MapStr{ + { + "Event": common.MapStr{ + "timestamp": "1606324417", + "OtherAttributes": common.MapStr{ + "key": "value", + }, + }, + }, + }, + expectedErr: nil, + }, + { + name: "An empty string", + config: &splitConfig{ + Target: "body.items", + Type: "string", + DelimiterString: "\n", + IgnoreEmptyValue: true, + Split: &splitConfig{ + Target: "body.other_items", + Type: "string", + DelimiterString: "\n", + }, + }, + ctx: emptyTransformContext(), + resp: transformable{ + "body": common.MapStr{ + "@timestamp": "1234567890", + "items": "", + "other_items": "Line 1\nLine 2\nLine 3", + }, + }, + expectedMessages: []common.MapStr{ + {"@timestamp": "1234567890", "items": "", "other_items": "Line 1"}, + {"@timestamp": "1234567890", "items": "", "other_items": "Line 2"}, + {"@timestamp": "1234567890", "items": "", "other_items": "Line 3"}, + }, + }, + { + name: "A missing string", + config: &splitConfig{ + Target: "body.items", + Type: "string", + DelimiterString: "\n", + IgnoreEmptyValue: true, + Split: &splitConfig{ + Target: "body.other_items", + Type: "string", + DelimiterString: "\n", + }, + }, + ctx: emptyTransformContext(), + resp: transformable{ + "body": common.MapStr{ + "@timestamp": "1234567890", + "other_items": "Line 1\nLine 2\nLine 3", + }, + }, + expectedMessages: []common.MapStr{ + {"@timestamp": "1234567890", "other_items": "Line 1"}, + {"@timestamp": "1234567890", "other_items": "Line 2"}, + {"@timestamp": "1234567890", "other_items": "Line 3"}, + }, + }, } for _, tc := range cases {