Skip to content

Commit

Permalink
Add ignore_empty_value option to prevent cursor override with empty v…
Browse files Browse the repository at this point in the history
…alues (#25802)
  • Loading branch information
marc-gr authored May 21, 2021
1 parent 094c2a5 commit 4de534f
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix o365 module config when client_secret contains special characters. {issue}25058[25058]
- Fix issue with m365_defender, when parsing incidents that has no alerts attached: {pull}25421[25421]
- Mitigate deadlock is aws-s3 input when SQS visibility timeout is exceeded. {issue}25750[25750]
- Fix httpjson cursor override with empty values by adding `ignore_empty_value` option. {pull}25802[25802]

*Filebeat*

Expand Down
6 changes: 6 additions & 0 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,12 @@ This will output:

Cursor is a list of key value objects where arbitrary values are defined. The values are interpreted as <<value-templates,value templates>> and a default template can be set. Cursor state is kept between input restarts and updated once all the events for a request are published.

Each cursor entry is formed by:

- A `value` template, which will define the value to store when evaluated.
- A `default` template, which will define the value to store when the value template fails or is empty.
- An `ignore_empty_value` flag. When set to `true`, will not store empty values, preserving the previous one, if any. Default: `true`.

Can read state from: [`.last_response.*`, `.first_event.*`, `.last_event.*`].

NOTE: Default templates do not have access to any state, only to functions.
Expand Down
13 changes: 10 additions & 3 deletions x-pack/filebeat/input/httpjson/internal/v2/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,16 @@ type config struct {
Cursor cursorConfig `config:"cursor"`
}

type cursorConfig map[string]struct {
Value *valueTpl `config:"value"`
Default *valueTpl `config:"default"`
type cursorConfig map[string]cursorEntry

type cursorEntry struct {
Value *valueTpl `config:"value"`
Default *valueTpl `config:"default"`
IgnoreEmptyValue *bool `config:"ignore_empty_value"`
}

func (ce cursorEntry) mustIgnoreEmptyValue() bool {
return ce.IgnoreEmptyValue == nil || *ce.IgnoreEmptyValue
}

func (c config) Validate() error {
Expand Down
24 changes: 23 additions & 1 deletion x-pack/filebeat/input/httpjson/internal/v2/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/oauth2/google"

"github.com/elastic/beats/v7/libbeat/common"
Expand Down Expand Up @@ -65,7 +66,6 @@ func TestGetTokenURLWithAzure(t *testing.T) {
const expectedWithTenantID = "https://login.microsoftonline.com/a_tenant_id/oauth2/v2.0/token"

assert.Equal(t, expectedWithTenantID, oauth2.getTokenURL())

}

func TestGetEndpointParams(t *testing.T) {
Expand Down Expand Up @@ -370,3 +370,25 @@ func TestConfigOauth2Validation(t *testing.T) {
})
}
}

func TestCursorEntryConfig(t *testing.T) {
in := map[string]interface{}{
"entry1": map[string]interface{}{
"ignore_empty_value": true,
},
"entry2": map[string]interface{}{
"ignore_empty_value": false,
},
"entry3": map[string]interface{}{
"ignore_empty_value": nil,
},
"entry4": map[string]interface{}{},
}
cfg := common.MustNewConfigFrom(in)
conf := cursorConfig{}
require.NoError(t, cfg.Unpack(&conf))
assert.True(t, conf["entry1"].mustIgnoreEmptyValue())
assert.False(t, conf["entry2"].mustIgnoreEmptyValue())
assert.True(t, conf["entry3"].mustIgnoreEmptyValue())
assert.True(t, conf["entry4"].mustIgnoreEmptyValue())
}
6 changes: 4 additions & 2 deletions x-pack/filebeat/input/httpjson/internal/v2/cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@ func (c *cursor) update(trCtx *transformContext) {

for k, cfg := range c.cfg {
v, _ := cfg.Value.Execute(trCtx, transformable{}, cfg.Default, c.log)
_, _ = c.state.Put(k, v)
c.log.Debugf("cursor.%s stored with %s", k, v)
if v != "" || !cfg.mustIgnoreEmptyValue() {
_, _ = c.state.Put(k, v)
c.log.Debugf("cursor.%s stored with %s", k, v)
}
}
}

Expand Down
118 changes: 118 additions & 0 deletions x-pack/filebeat/input/httpjson/internal/v2/cursor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package v2

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
)

func TestCursorUpdate(t *testing.T) {
testCases := []struct {
name string
baseConfig map[string]interface{}
trCtx *transformContext
initialState common.MapStr
expectedState common.MapStr
}{
{
name: "update an unexisting value",
baseConfig: map[string]interface{}{
"entry1": map[string]interface{}{
"value": "v1",
},
},
trCtx: emptyTransformContext(),
initialState: common.MapStr{},
expectedState: common.MapStr{
"entry1": "v1",
},
},
{
name: "update an existing value with a template",
baseConfig: map[string]interface{}{
"entry1": map[string]interface{}{
"value": "[[ .last_response.body.foo ]]",
},
},
trCtx: func() *transformContext {
trCtx := emptyTransformContext()
trCtx.lastResponse.body = common.MapStr{
"foo": "v2",
}
return trCtx
}(),
initialState: common.MapStr{
"entry1": "v1",
},
expectedState: common.MapStr{
"entry1": "v2",
},
},
{
name: "don't update an existing value if template result is empty",
baseConfig: map[string]interface{}{
"entry1": map[string]interface{}{
"value": "[[ .last_response.body.unknown ]]",
},
"entry2": map[string]interface{}{
"value": "[[ .last_response.body.unknown ]]",
"ignore_empty_value": true,
},
"entry3": map[string]interface{}{
"value": "[[ .last_response.body.unknown ]]",
"ignore_empty_value": nil,
},
},
trCtx: emptyTransformContext(),
initialState: common.MapStr{
"entry1": "v1",
"entry2": "v2",
"entry3": "v3",
},
expectedState: common.MapStr{
"entry1": "v1",
"entry2": "v2",
"entry3": "v3",
},
},
{
name: "update an existing value if template result is empty and ignore_empty_value is false",
baseConfig: map[string]interface{}{
"entry1": map[string]interface{}{
"value": "[[ .last_response.body.unknown ]]",
"ignore_empty_value": false,
},
},
trCtx: emptyTransformContext(),
initialState: common.MapStr{
"entry1": "v1",
},
expectedState: common.MapStr{
"entry1": "",
},
},
}

for _, testCase := range testCases {
tc := testCase
t.Run(tc.name, func(t *testing.T) {
cfg := common.MustNewConfigFrom(tc.baseConfig)

conf := cursorConfig{}
require.NoError(t, cfg.Unpack(&conf))

c := newCursor(conf, logp.NewLogger("cursor-test"))
c.state = tc.initialState
c.update(tc.trCtx)
assert.Equal(t, tc.expectedState, c.state)
})
}
}

0 comments on commit 4de534f

Please sign in to comment.