Skip to content

Commit

Permalink
x-pack/filebeat/input/httpjson/internal/v2: allow split chains to con…
Browse files Browse the repository at this point in the history
…tinue past empty targets

This adds a configuration option "ignore_error" that allows a split
processor chain to continue if a target field is present but empty.

Updates #26008
  • Loading branch information
efd6 committed Sep 13, 2021
1 parent 6b41742 commit 36360c1
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add base64 Encode functionality to httpjson input. {pull}27681[27681]
- Add `join` and `sprintf` functions to `httpjson` input. {pull}27735[27735]
- Improve memory usage of line reader of `log` and `filestream` input. {pull}27782[27782]
- Add `ignore_error` flag to `httpjson` `split` processor. {pull}27880[27880]


*Heartbeat*
Expand Down
5 changes: 5 additions & 0 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,11 @@ Required if using split type of `string`. This is the sub string used to split

Valid when used with `type: map`. When not empty, defines a new field where the original key value will be stored.

[float]
==== `response.split[].ignore_error`

If set to true, failure to find a non-empty target will pass operation on to the next nested split operation instead of failing with an error. Default: `false`.

[float]
==== `response.split[].split`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type splitConfig struct {
KeepParent bool `config:"keep_parent"`
KeyField string `config:"key_field"`
DelimiterString string `config:"delimiter"`
IgnoreError bool `config:"ignore_error"`
}

func (c *responseConfig) Validate() error {
Expand Down
77 changes: 59 additions & 18 deletions x-pack/filebeat/input/httpjson/internal/v2/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,24 @@ var (
errExpectedSplitString = errors.New("split was expecting field to be a string")
)

// split is a split processor chain element. Split processing is executed
// by applying elements of the chain's linked list to an input until completed
// or an error state is encountered.
type split struct {
log *logp.Logger
targetInfo targetInfo
kind string
transforms []basicTransform
child *split
keepParent bool
keyField string
isRoot bool
delimiter string
log *logp.Logger
targetInfo targetInfo
kind string
transforms []basicTransform
child *split
keepParent bool
ignoreEmptyFieldError bool
keyField string
isRoot bool
delimiter string
}

// newSplitSplit returns a new split based on the provided config and
// logging to the provided logger, tagging the split as the root of the chain.
func newSplitResponse(cfg *splitConfig, log *logp.Logger) (*split, error) {
if cfg == nil {
return nil, nil
Expand All @@ -42,11 +48,13 @@ func newSplitResponse(cfg *splitConfig, log *logp.Logger) (*split, error) {
if err != nil {
return nil, err
}
// we want to be able to identify which split is the root of the chain
// We want to be able to identify which split is the root of the chain.
split.isRoot = true
return split, nil
}

// newSplit returns a new split based on the provided config and
// logging to the provided logger.
func newSplit(c *splitConfig, log *logp.Logger) (*split, error) {
ti, err := getTargetInfo(c.Target)
if err != nil {
Expand All @@ -71,29 +79,40 @@ func newSplit(c *splitConfig, log *logp.Logger) (*split, error) {
}

return &split{
log: log,
targetInfo: ti,
kind: c.Type,
keepParent: c.KeepParent,
keyField: c.KeyField,
delimiter: c.DelimiterString,
transforms: ts,
child: s,
log: log,
targetInfo: ti,
kind: c.Type,
keepParent: c.KeepParent,
ignoreEmptyFieldError: c.IgnoreError,
keyField: c.KeyField,
delimiter: c.DelimiterString,
transforms: ts,
child: s,
}, nil
}

// run runs the split operation on the contents of resp, sending successive
// split results on ch. ctx is passed to transforms that are called during
// the split.
func (s *split) run(ctx *transformContext, resp transformable, ch chan<- maybeMsg) error {
root := resp.body()
return s.split(ctx, root, ch)
}

// split recursively executes the split processor chain.
func (s *split) split(ctx *transformContext, root common.MapStr, ch chan<- maybeMsg) error {
v, err := root.GetValue(s.targetInfo.Name)
if err != nil && err != common.ErrKeyNotFound {
return err
}

if v == nil {
if s.ignoreEmptyFieldError {
if s.child != nil {
return s.child.split(ctx, root, ch)
}
return nil
}
if s.isRoot {
return errEmptyRootField
}
Expand All @@ -109,6 +128,12 @@ func (s *split) split(ctx *transformContext, root common.MapStr, ch chan<- maybe
}

if len(varr) == 0 {
if s.ignoreEmptyFieldError {
if s.child != nil {
return s.child.split(ctx, root, ch)
}
return nil
}
if s.isRoot {
return errEmptyRootField
}
Expand All @@ -130,6 +155,12 @@ func (s *split) split(ctx *transformContext, root common.MapStr, ch chan<- maybe
}

if len(vmap) == 0 {
if s.ignoreEmptyFieldError {
if s.child != nil {
return s.child.split(ctx, root, ch)
}
return nil
}
if s.isRoot {
return errEmptyRootField
}
Expand All @@ -151,6 +182,12 @@ func (s *split) split(ctx *transformContext, root common.MapStr, ch chan<- maybe
}

if len(vstr) == 0 {
if s.ignoreEmptyFieldError {
if s.child != nil {
return s.child.split(ctx, root, ch)
}
return nil
}
if s.isRoot {
return errEmptyRootField
}
Expand All @@ -169,6 +206,8 @@ func (s *split) split(ctx *transformContext, root common.MapStr, ch chan<- maybe
return errors.New("unknown split type")
}

// sendMessage sends an array or map split result value, v, on ch after performing
// any necessary transformations. If key is "", the value is an element of an array.
func (s *split) sendMessage(ctx *transformContext, root common.MapStr, key string, v interface{}, ch chan<- maybeMsg) error {
obj, ok := toMapStr(v)
if !ok {
Expand Down Expand Up @@ -220,6 +259,8 @@ func toMapStr(v interface{}) (common.MapStr, bool) {
return common.MapStr{}, false
}

// sendMessage sends a string split result value, v, on ch after performing any
// necessary transformations. If key is "", the value is an element of an array.
func (s *split) sendMessageSplitString(ctx *transformContext, root common.MapStr, v string, ch chan<- maybeMsg) error {
clone := root.Clone()
_, _ = clone.Put(s.targetInfo.Name, v)
Expand Down
135 changes: 135 additions & 0 deletions x-pack/filebeat/input/httpjson/internal/v2/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,141 @@ func TestSplit(t *testing.T) {
{"@timestamp": "1234567890", "items": "Line 3"},
},
},
{
name: "A missing array in an object",
config: &splitConfig{
Target: "body.response",
Type: "array",
Split: &splitConfig{
Target: "body.Event.Attributes",
IgnoreError: true,
KeepParent: true,
Split: &splitConfig{
Target: "body.Event.OtherAttributes",
KeepParent: true,
},
},
},
ctx: emptyTransformContext(),
resp: transformable{
"body": common.MapStr{
"response": []interface{}{
map[string]interface{}{
"Event": map[string]interface{}{
"timestamp": "1606324417",
"Attributes": []interface{}{},
"OtherAttributes": []interface{}{
map[string]interface{}{
"key": "value",
},
map[string]interface{}{
"key2": "value2",
},
},
},
},
},
},
},
expectedMessages: []common.MapStr{
{
"Event": common.MapStr{
"timestamp": "1606324417",
"Attributes": []interface{}{},
"OtherAttributes": common.MapStr{
"key": "value",
},
},
},
{
"Event": common.MapStr{
"timestamp": "1606324417",
"Attributes": []interface{}{},
"OtherAttributes": common.MapStr{
"key2": "value2",
},
},
},
},
expectedErr: nil,
},
{
name: "A missing map in an object",
config: &splitConfig{
Target: "body.response",
Type: "array",
Split: &splitConfig{
Target: "body.Event.Attributes",
Type: "map",
IgnoreError: true,
KeepParent: true,
Split: &splitConfig{
Type: "map",
Target: "body.Event.OtherAttributes",
KeepParent: true,
},
},
},
ctx: emptyTransformContext(),
resp: transformable{
"body": common.MapStr{
"response": []interface{}{
map[string]interface{}{
"Event": map[string]interface{}{
"timestamp": "1606324417",
"Attributes": map[string]interface{}{},
"OtherAttributes": map[string]interface{}{
// Only include a single item here to avoid
// map iteration order flakes.
"1": map[string]interface{}{
"key": "value",
},
},
},
},
},
},
},
expectedMessages: []common.MapStr{
{
"Event": common.MapStr{
"timestamp": "1606324417",
"Attributes": common.MapStr{},
"OtherAttributes": common.MapStr{
"key": "value",
},
},
},
},
expectedErr: nil,
},
{
name: "A missing string",
config: &splitConfig{
Target: "body.items",
Type: "string",
DelimiterString: "\n",
IgnoreError: true,
Split: &splitConfig{
Target: "body.other_items",
Type: "string",
DelimiterString: "\n",
},
},
ctx: emptyTransformContext(),
resp: transformable{
"body": common.MapStr{
"@timestamp": "1234567890",
"items": "",
"other_items": "Line 1\nLine 2\nLine 3",
},
},
expectedMessages: []common.MapStr{
{"@timestamp": "1234567890", "items": "", "other_items": "Line 1"},
{"@timestamp": "1234567890", "items": "", "other_items": "Line 2"},
{"@timestamp": "1234567890", "items": "", "other_items": "Line 3"},
},
},
}

for _, tc := range cases {
Expand Down

0 comments on commit 36360c1

Please sign in to comment.