Skip to content

Commit

Permalink
Fix drop_fields when the first field is unknown (elastic#2685) (elast…
Browse files Browse the repository at this point in the history
  • Loading branch information
monicasarbu authored and tsg committed Oct 5, 2016
1 parent c103738 commit b548f70
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 7 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ https://github.com/elastic/beats/compare/v5.0.0-beta1...master[Check the HEAD di
- Make sure Beats sent always float values when they are defined as float by sending 5.00000 instead of 5.
{pull}2627[2627]

- Fix ignoring all fields from drop_fields in case the first field is unknown. {pull}2685[2685]

*Metricbeat*

- Fix default configuration file on Windows to not enabled the `load` metricset. {pull}2632[2632]
Expand Down
8 changes: 6 additions & 2 deletions libbeat/common/mapstr.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func (m MapStr) Delete(key string) error {
keysLen := len(keyParts)

mapp := m

for i := 0; i < keysLen-1; i++ {
keyPart := keyParts[i]

Expand All @@ -75,8 +76,11 @@ func (m MapStr) Delete(key string) error {
return fmt.Errorf("unknown key %s", keyPart)
}
}
delete(mapp, keyParts[keysLen-1])
return nil
if _, ok := mapp[keyParts[keysLen-1]]; ok {
delete(mapp, keyParts[keysLen-1])
return nil
}
return fmt.Errorf("unknown key %s", keyParts[keysLen-1])
}

func (m MapStr) CopyFieldsTo(to MapStr, key string) error {
Expand Down
6 changes: 4 additions & 2 deletions libbeat/processors/actions/drop_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,16 @@ func newDropFields(c common.Config) (processors.Processor, error) {
}

func (f dropFields) Run(event common.MapStr) (common.MapStr, error) {
errors := []string{}

for _, field := range f.Fields {
err := event.Delete(field)
if err != nil {
return event, fmt.Errorf("Fail to delete key %s: %s", field, err)
errors = append(errors, err.Error())
}

}
return event, nil
return event, fmt.Errorf(strings.Join(errors, ", "))
}

func (f dropFields) String() string {
Expand Down
7 changes: 4 additions & 3 deletions libbeat/processors/actions/include_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,23 @@ func newIncludeFields(c common.Config) (processors.Processor, error) {

func (f includeFields) Run(event common.MapStr) (common.MapStr, error) {
filtered := common.MapStr{}
errors := []string{}

for _, field := range f.Fields {
hasKey, err := event.HasKey(field)
if err != nil {
return filtered, fmt.Errorf("Fail to check the key %s: %s", field, err)
errors = append(errors, err.Error())
}

if hasKey {
errorOnCopy := event.CopyFieldsTo(filtered, field)
if errorOnCopy != nil {
return filtered, fmt.Errorf("Fail to copy key %s: %s", field, err)
errors = append(errors, err.Error())
}
}
}

return filtered, nil
return filtered, fmt.Errorf(strings.Join(errors, ", "))
}

func (f includeFields) String() string {
Expand Down
51 changes: 51 additions & 0 deletions libbeat/processors/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,3 +569,54 @@ func TestBadConditionConfig(t *testing.T) {
assert.NotNil(t, err)

}

func TestDropMissingFields(t *testing.T) {

yml := []map[string]interface{}{
map[string]interface{}{
"drop_fields": map[string]interface{}{
"fields": []string{"foo.bar", "proc.cpu", "proc.sss", "beat", "mem"},
},
},
}

processors := GetProcessors(t, yml)

event := common.MapStr{
"@timestamp": "2016-01-24T18:35:19.308Z",
"beat": common.MapStr{
"hostname": "mar",
"name": "my-shipper-1",
},

"proc": common.MapStr{
"cpu": common.MapStr{
"start_time": "Jan14",
"system": 26027,
"total": 79390,
"total_p": 0,
"user": 53363,
},
"cmdline": "/sbin/launchd",
},
"mem": common.MapStr{
"rss": 11194368,
"rss_p": 0,
"share": 0,
"size": 2555572224,
},
"type": "process",
}

processedEvent := processors.Run(event)

expectedEvent := common.MapStr{
"@timestamp": "2016-01-24T18:35:19.308Z",
"proc": common.MapStr{
"cmdline": "/sbin/launchd",
},
"type": "process",
}

assert.Equal(t, expectedEvent, processedEvent)
}

0 comments on commit b548f70

Please sign in to comment.