-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
json_decode_fields processor #2605
Conversation
Jenkins standing by to test this. If you aren't a maintainer, you can ignore this comment. Someone with commit access, please review this and clear it for Jenkins to run. |
1 similar comment
Jenkins standing by to test this. If you aren't a maintainer, you can ignore this comment. Someone with commit access, please review this and clear it for Jenkins to run. |
@suraj-soni Thanks a lot for the PR! I'd like to ask for a system test that implements the use case that you have (e.g. log file + configuration). Also not sure why the PR seems to contain 110 commits, when the diff is actually small, but we can figure this out later. |
@tsg : For some reason git wouldn't allow me to check in without doing a rebase on my branch. The 110 commits are mostly just rolling forward to the then commit on master branch. I'll share the log file and configuration used for testing the fix soon. |
json_harvester_test.zip |
@@ -177,6 +177,25 @@ func (m MapStr) GetValue(key string) (interface{}, error) { | |||
return mapp[keyParts[keyPartsLen-1]], nil | |||
} | |||
|
|||
func (m MapStr) AddValue(value MapStr, field string) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you use the MapStr.Put()
method that was recently added?
} | ||
|
||
func init() { | ||
processors.RegisterPlugin("json_decode_fields", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Naming-wise I'd make it decode_json_fields
. The other processors put the verb first.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense. Will make that change.
|
||
data, err := event.GetValue(field) | ||
if err != nil { | ||
return event, fmt.Errorf("no %s field found: %s", field, err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should the processor short-circuit on error or save the error and continue processing the remaining fields? The other processors save the error, continue, then when done they return all the errors that occurred.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I looked at the drop_field processor and looks like it takes the same approach as well.
err := event.Delete(field) if err != nil { return event, fmt.Errorf("Fail to delete key %s: %s", field, err) }
mapstr.GetValue() returns an error when it doesn't find a key. I'm hesitant to use a hasKey + getValue since it will involve parsing the data twice.
Any suggestions on how to fix this ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You must be looking at a stale copy of the drop_fields.go. Here's the latest version that puts the errors into a slice and continues.
mapstr.GetValue() returns an error when it doesn't find a key. I'm hesitant to use a hasKey + getValue since it will involve parsing the data twice. Any suggestions on how to fix this ?
If you want to ignore errors caused by the key being absent, you can check if the cause is an common.ErrKeyNotFound
like the include_fields action does.
@andrewkroh : Have incorporated the changes that you suggested. However the build fails with an error : Not sure how to fix this since I've updated the filebeat.full.yml to include an example of the new processor. |
Also I think you need to rebase your changes. There should only be 1 commit in the PR. |
b54fdae
to
eaff781
Compare
@andrewkroh : |
jenkins, test it |
@@ -9,6 +9,7 @@ | |||
|
|||
=== Beats version HEAD | |||
https://github.com/elastic/beats/compare/v5.0.0-rc1...master[Check the HEAD diff] | |||
https://github.com/elastic/beats/compare/v5.0.0-beta1...master[Check the HEAD diff] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That should be removed (merge error probably)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please add tests for this new processor? I'd like to have tests for a missing key, field value is not a string, JSON is invalid, and JSON is valid.
|
||
data, err := event.GetValue(field) | ||
if err != nil && errors.Cause(err) != common.ErrKeyNotFound { | ||
errs = append(errs, err.Error()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should continue
the for loop after the error.
err := unmarshal([]byte(text), &output) | ||
if err != nil { | ||
errs = append(errs, err.Error()) | ||
event.Put(field, text) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why Put
the value into the event on error? The value is already associated with the key in the event.
} | ||
|
||
func (f decodeJSONFields) Run(event common.MapStr) (common.MapStr, error) { | ||
errs := []string{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use var errs []string
. https://github.com/golang/go/wiki/CodeReviewComments#declaring-empty-slices
errs := []string{} | ||
|
||
for _, field := range f.Fields { | ||
var output map[string]interface{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't need to be declared until you are actually going to use it. So move it inside of the if ok { unmarshal ...
block.
continue | ||
} | ||
|
||
_, addErr := event.Put(field, output) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can reuse err
here. _, err = event.Put(field, output)
.
|
||
_, addErr := event.Put(field, output) | ||
if addErr != nil { | ||
errs = append(errs, addErr.Error()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be consistent and prevent future bugs this should also continue
after the error. It doesn't really matter in this case, but if in the future some logic gets added after the location where the error occurred it could cause a bug.
e8af13c
to
35b174e
Compare
Thank you for your feedback. Have made the suggested changes and added test cases as well. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for adding the test cases. Can you please fix the CHANGELOG then we can get this merged.
|
||
func TestInvalidJSON(t *testing.T) { | ||
input := common.MapStr{ | ||
"msg": "{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you create a raw string literal by surrounding the string with back ticks (`) then you don't need to use escaping on those quotes. I don't care if you fix this, it's more of a golang tip to keep you sane.
"pipeline": "us1", | ||
} | ||
|
||
actual := getActualValue(t, config, input) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These test cases that expect an error should also assert that an error occurred. You can use assert.Error()
.
@@ -8,6 +8,7 @@ | |||
// Template, add newest changes here | |||
|
|||
=== Beats version HEAD | |||
https://github.com/elastic/beats/compare/v5.0.0-rc1...master[Check the HEAD diff] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like you made no changes to this file, so can you reset this file so that there are no changes in the diff.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or better yet, reset it and then add a line in the Added / Affecting all Beats section that contains something like:
- Added decode_json_fields processor for decoding fields containing JSON strings. {pull}2605[2605]
@andrewkroh Updated the changelog.asciidoc file to incorporate your suggestions. |
@@ -0,0 +1,46 @@ | |||
package common |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this file moves helpers and exposes the json specific helpers with generic names. I'd rather see some json parsing utility function/module doing the right thing instead of exposing internal helpers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@urso : Apologies for missing to see this comment.
Does wrapping it into something like this make sense?
package libbeat/common/jsontransform
func TransformNumbers(dict MapStr) //exposed to consume
func transformValue(value json.Number) //not exposed
func transformArray(arr []interface{}) //not exposed
@suraj-soni Can you please address the most recent comments, before we merge the PR? |
Sorry for having you wait, but I wanted to give this feature a little more thought. See my comments. |
} | ||
text, ok := data.(string) | ||
if ok { | ||
var output map[string]interface{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we want to restrict ourselves to json objects only here? Using var output interface{}
we can decode booleans, number, objects and arrays. Any valid json.
|
||
jsontransform.TransformNumbers(*fields) | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thinking a little more about TransformNumbers
and unmarshal
, I wonder why wen need to put TransformNumbers
at the end. By moving TransformNumber
before the deep-parsing stuff, the initial decoding would be exactly similar to the filebeat reader.
func DecodeJson(text []byte, to interface{}) {
dec := json.NewDecoder(bytes.NewReader(text))
dec.UseNumber()
err := dec.Decode(to)
if err != nil {
if m, ok := to.(*map[string]interface{}); ok {
jsontransform.TransformNumbers(common.MapStr(m))
}
return err
}
Then (extending unmarshal a little), unmarshal can be generalized a little:
func unmarshal(maxDepth int, text []byte, fields interface{}, processArray bool) error {
if err := DecodeJson(text, fields); err != nil {
return err
}
maxDepth--
if maxDepth == 0 {
return nil
}
tryUnmarshal := func(v interface{}) (interface{}, bool) {
str, isString := v.(string)
if !isString {
return v, false
}
var tmp interface{}
err := unmarshal(maxDepth, []byte(str), &tmp, processArray)
if err != nil {
return v, false
}
return tmp, true
}
// try to deep unmarshal fields
switch O := fields.(type); {
case *map[string]interface{}:
for k, v := range O {
if decoded, ok := tryUnmarshal(v); ok {
O[k] = decoded
}
}
// We want to process arrays here
case []interface{}:
if !processArray {
break
}
for i, v := range O {
if decoded, ok := tryUnmarshal(v); ok {
O[i] = decoded
}
}
}
}
As you can see, I added a maximum parsing depth. Not sure we really want to parse fully recursively here. I'd use maxDepth=1
by default (Make it configurable). More parsing can be done by configuring a second processor continue parsing on already parsed field.
I also added optional array support, plus lifted the requirement for JSON Objects parsing only.
Having DecodeJson, there is no need for exporting jsontransform as is (but I'd keep jsontransform package for now, as other modules in beats might make use of it).
What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the idea of having a maxDepth setup so that we don't recursively parse till the end.
This does give us the ability to configure the processors for different fields with different levels of depth parsing.
I like your suggestions and will work towards incorporating them in the changes and share it for further review. |
Hi @urso , Have made the suggested changes and added a couple of test cases to verify the maxDepth feature as well. Thanks once again for your help in refining this. |
@suraj-soni LGTM. Unfortunately you've got a merge conflict on |
@urso : I've merged the changes from the master and also removed the conflict with .gitignore. |
Merged. Thanks! |
Thanks for helping to get this in @urso and @andrewkroh ! |
@@ -120,6 +120,7 @@ This project adheres to [Semantic Versioning](http://semver.org/). | |||
|
|||
[Unreleased]: https://github.com/elastic/go-ucfg/compare/v0.3.7...HEAD | |||
[0.3.7]: https://github.com/elastic/go-ucfg/compare/v0.3.6...v0.3.7 | |||
[Unreleased]: https://github.com/elastic/go-ucfg/compare/v0.3.6...HEAD |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -33,6 +33,7 @@ https://github.com/elastic/beats/compare/v5.0.0...master[Check the HEAD diff] | |||
==== Bugfixes | |||
|
|||
*Affecting all Beats* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should probably not be under bug fixes? This is a new feature as far as I understand.
@dedemorton @urso There are no docs for this yet and also no entries yet. As this will probably make it into 5.1 we should definitively add docs to it with examples on how to use it. @suraj-soni In case you can help here, all docs are in asciidoc and can be found here for general processors: https://github.com/elastic/beats/tree/master/libbeat/docs As this is mostly filebeat related, it probably makes sense to add examples to filebeat docs: https://github.com/elastic/beats/tree/master/filebeat/docs |
Here is a fix for the two issues mentioned: #3047 |
@ruflin : Sure, I'd be happy to help with the documentation for this. |
This pull request captures a feature to enable/disable nested json parsing in filebeat. It's picked up from where the conversations were left off in #2435.
The commit includes :