-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
New processor: copy_fields #11303
New processor: copy_fields #11303
Changes from 12 commits
7c56114
4e4da1f
215ebcb
69f555f
ec97c65
207273e
c3cd832
00dd7eb
56d468a
c19dc47
bba6555
4d3c2f5
185f8cc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,108 @@ | ||||||
// Licensed to Elasticsearch B.V. under one or more contributor | ||||||
// license agreements. See the NOTICE file distributed with | ||||||
// this work for additional information regarding copyright | ||||||
// ownership. Elasticsearch B.V. licenses this file to you under | ||||||
// the Apache License, Version 2.0 (the "License"); you may | ||||||
// not use this file except in compliance with the License. | ||||||
// You may obtain a copy of the License at | ||||||
// | ||||||
// http://www.apache.org/licenses/LICENSE-2.0 | ||||||
// | ||||||
// Unless required by applicable law or agreed to in writing, | ||||||
// software distributed under the License is distributed on an | ||||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||||||
// KIND, either express or implied. See the License for the | ||||||
// specific language governing permissions and limitations | ||||||
// under the License. | ||||||
|
||||||
package actions | ||||||
|
||||||
import ( | ||||||
"fmt" | ||||||
|
||||||
"github.com/pkg/errors" | ||||||
|
||||||
"github.com/elastic/beats/libbeat/beat" | ||||||
"github.com/elastic/beats/libbeat/common" | ||||||
"github.com/elastic/beats/libbeat/logp" | ||||||
"github.com/elastic/beats/libbeat/processors" | ||||||
) | ||||||
|
||||||
type copyFields struct { | ||||||
config copyFieldsConfig | ||||||
} | ||||||
|
||||||
type copyFieldsConfig struct { | ||||||
Fields []fromTo `config:"fields"` | ||||||
IgnoreMissing bool `config:"ignore_missing"` | ||||||
FailOnError bool `config:"fail_on_error"` | ||||||
} | ||||||
|
||||||
func init() { | ||||||
processors.RegisterPlugin("copy_fields", | ||||||
configChecked(newCopyFields, | ||||||
requireFields("fields"), | ||||||
), | ||||||
) | ||||||
} | ||||||
|
||||||
func newCopyFields(c *common.Config) (processors.Processor, error) { | ||||||
config := copyFieldsConfig{ | ||||||
IgnoreMissing: false, | ||||||
FailOnError: true, | ||||||
} | ||||||
err := c.Unpack(&config) | ||||||
if err != nil { | ||||||
return nil, fmt.Errorf("failed to unpack the configuration of copy processor: %s", err) | ||||||
} | ||||||
|
||||||
f := ©Fields{ | ||||||
config: config, | ||||||
} | ||||||
return f, nil | ||||||
} | ||||||
|
||||||
func (f *copyFields) Run(event *beat.Event) (*beat.Event, error) { | ||||||
var backup common.MapStr | ||||||
if f.config.FailOnError { | ||||||
backup = event.Fields.Clone() | ||||||
} | ||||||
|
||||||
for _, field := range f.config.Fields { | ||||||
err := f.copyField(field.From, field.To, event.Fields) | ||||||
if err != nil && f.config.FailOnError { | ||||||
errMsg := fmt.Errorf("Failed to copy fields in copy_fields processor: %s", err) | ||||||
logp.Debug("copy_fields", errMsg.Error()) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we have a selector for logger? I like what @andrewkroh's did in the script processor, see
And
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TBH in case of this simple processor, I don't see the added value of letting a user set a debug selector. I doubt that users would need to filter for a specific copy processor in their logs, or if they have to, I think messages let then identify the processor easily. If we want to let user set a selector, it would be much better to let them configure it for all processors. There are other processors which would benefit more from having such setting than this one. What if I open an issue and implement the functionality for all processors in a follow up PR? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am OK with a followup PR. |
||||||
event.Fields = backup | ||||||
event.PutValue("error.message", errMsg.Error()) | ||||||
return event, err | ||||||
} | ||||||
} | ||||||
|
||||||
return event, nil | ||||||
} | ||||||
|
||||||
func (f *copyFields) copyField(from string, to string, fields common.MapStr) error { | ||||||
exists, _ := fields.HasKey(to) | ||||||
ruflin marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
if exists { | ||||||
return fmt.Errorf("target field %s already exists, drop or rename this field first", to) | ||||||
} | ||||||
|
||||||
value, err := fields.GetValue(from) | ||||||
if err != nil { | ||||||
if f.config.IgnoreMissing && errors.Cause(err) == common.ErrKeyNotFound { | ||||||
return nil | ||||||
} | ||||||
return fmt.Errorf("could not fetch value for key: %s, Error: %s", from, err) | ||||||
} | ||||||
|
||||||
_, err = fields.Put(to, value) | ||||||
if err != nil { | ||||||
return fmt.Errorf("could not copy value to %s: %v, %+v", to, value, err) | ||||||
} | ||||||
return nil | ||||||
} | ||||||
|
||||||
func (f *copyFields) String() string { | ||||||
return "copy_fields=" + fmt.Sprintf("%+v", f.config.Fields) | ||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
// Licensed to Elasticsearch B.V. under one or more contributor | ||
// license agreements. See the NOTICE file distributed with | ||
// this work for additional information regarding copyright | ||
// ownership. Elasticsearch B.V. licenses this file to you under | ||
// the Apache License, Version 2.0 (the "License"); you may | ||
// not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
package actions | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
|
||
"github.com/elastic/beats/libbeat/beat" | ||
"github.com/elastic/beats/libbeat/common" | ||
) | ||
|
||
func TestCopyFields(t *testing.T) { | ||
|
||
var tests = map[string]struct { | ||
FromTo fromTo | ||
Input common.MapStr | ||
Expected common.MapStr | ||
}{ | ||
"copy string from message to message_copied": { | ||
FromTo: fromTo{ | ||
From: "message", | ||
To: "message_copied", | ||
}, | ||
Input: common.MapStr{ | ||
kvch marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"message": "please copy this line", | ||
}, | ||
Expected: common.MapStr{ | ||
"message": "please copy this line", | ||
"message_copied": "please copy this line", | ||
}, | ||
}, | ||
"copy string from nested key nested.message to top level field message_copied": { | ||
FromTo: fromTo{ | ||
From: "nested.message", | ||
To: "message_copied", | ||
}, | ||
Input: common.MapStr{ | ||
"nested": common.MapStr{ | ||
"message": "please copy this line", | ||
}, | ||
}, | ||
Expected: common.MapStr{ | ||
"nested": common.MapStr{ | ||
"message": "please copy this line", | ||
}, | ||
"message_copied": "please copy this line", | ||
}, | ||
}, | ||
"copy string from fieldname with dot to message_copied": { | ||
FromTo: fromTo{ | ||
From: "dotted.message", | ||
To: "message_copied", | ||
}, | ||
Input: common.MapStr{ | ||
"dotted.message": "please copy this line", | ||
}, | ||
Expected: common.MapStr{ | ||
"dotted.message": "please copy this line", | ||
"message_copied": "please copy this line", | ||
}, | ||
}, | ||
"copy number from fieldname with dot to dotted message.copied": { | ||
FromTo: fromTo{ | ||
From: "message.original", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What happens if you copy There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I set If the test input was different e.g. the key Input: common.MapStr{
"message": common.MapStr{
"original": 42,
},
} It would fail and tell the user to drop the field This is a more realistic test case, so I am adding it also. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Interesting, good to know. Could you add a test case for both and one of them would fail. The reason I'm asking also for the failing one is because in case we change the behaviour of the above in the future, we will know that it has an impact. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The test with the description |
||
To: "message.copied", | ||
}, | ||
Input: common.MapStr{ | ||
"message.original": 42, | ||
}, | ||
Expected: common.MapStr{ | ||
"message.original": 42, | ||
"message": common.MapStr{ | ||
"copied": 42, | ||
}, | ||
}, | ||
}, | ||
"copy number from hierarchical message.original to top level message which fails": { | ||
FromTo: fromTo{ | ||
From: "message.original", | ||
To: "message", | ||
}, | ||
Input: common.MapStr{ | ||
"message": common.MapStr{ | ||
"original": 42, | ||
}, | ||
}, | ||
Expected: common.MapStr{ | ||
"message": common.MapStr{ | ||
"original": 42, | ||
}, | ||
}, | ||
}, | ||
"copy number from hierarchical message.original to top level message": { | ||
FromTo: fromTo{ | ||
From: "message.original", | ||
To: "message", | ||
}, | ||
Input: common.MapStr{ | ||
"message.original": 42, | ||
}, | ||
Expected: common.MapStr{ | ||
"message.original": 42, | ||
"message": 42, | ||
}, | ||
}, | ||
} | ||
|
||
for name, test := range tests { | ||
t.Run(name, func(t *testing.T) { | ||
p := copyFields{ | ||
copyFieldsConfig{ | ||
Fields: []fromTo{ | ||
test.FromTo, | ||
}, | ||
}, | ||
} | ||
|
||
event := &beat.Event{ | ||
Fields: test.Input, | ||
} | ||
|
||
newEvent, err := p.Run(event) | ||
assert.NoError(t, err) | ||
|
||
assert.Equal(t, test.Expected, newEvent.Fields) | ||
}) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
problems with the rebase?