Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

json_decode_fields processor #2605

Merged
merged 5 commits into from
Nov 21, 2016
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
/build
/*/data
/*/logs
/.vscode

# Files
.DS_Store
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ https://github.com/elastic/beats/compare/v5.0.0...master[Check the HEAD diff]
==== Bugfixes

*Affecting all Beats*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably not be under bug fixes? This is a new feature as far as I understand.

- Added decode_json_fields processor for decoding fields containing JSON strings. {pull}2605[2605]

*Metricbeat*

Expand Down
44 changes: 2 additions & 42 deletions filebeat/harvester/reader/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/jsontransform"
"github.com/elastic/beats/libbeat/logp"
)

Expand Down Expand Up @@ -69,51 +70,10 @@ func unmarshal(text []byte, fields *map[string]interface{}) error {
if err != nil {
return err
}
transformNumbersDict(*fields)
jsontransform.TransformNumbers(*fields)
return nil
}

// transformNumbersDict walks a JSON-decoded tree and replaces every
// json.Number with an int64, float64, or string, in this order of preference
// (i.e. if it parses as an int, use int; if it parses as a float, use float;
// otherwise keep the string form).
// The map is modified in place. Nested map[string]interface{} values are
// visited recursively and []interface{} values are handed to
// transformNumbersArray.
func transformNumbersDict(dict common.MapStr) {
for k, v := range dict {
switch vv := v.(type) {
case json.Number:
dict[k] = transformNumber(vv)
case map[string]interface{}:
transformNumbersDict(vv)
case []interface{}:
transformNumbersArray(vv)
}
}
}

// transformNumber converts a json.Number to an int64 when it parses as one,
// otherwise to a float64, and otherwise returns the number's string form.
func transformNumber(value json.Number) interface{} {
i64, err := value.Int64()
if err == nil {
return i64
}
f64, err := value.Float64()
if err == nil {
return f64
}
return value.String()
}

// transformNumbersArray replaces, in place, every json.Number element of arr
// with the result of transformNumber. Nested maps are handed to
// transformNumbersDict and nested arrays are visited recursively.
func transformNumbersArray(arr []interface{}) {
for i, v := range arr {
switch vv := v.(type) {
case json.Number:
arr[i] = transformNumber(vv)
case map[string]interface{}:
transformNumbersDict(vv)
case []interface{}:
transformNumbersArray(vv)
}
}
}

// Next decodes JSON and returns the filled Line object.
func (r *JSON) Next() (Message, error) {
message, err := r.reader.Next()
Expand Down
4 changes: 2 additions & 2 deletions filebeat/tests/system/test_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ def test_with_generic_filtering(self):
message_key="message",
keys_under_root=True,
overwrite_keys=True,
add_error_key=True,
add_error_key=True
),
processors=[{
"drop_fields": {
Expand Down Expand Up @@ -305,7 +305,7 @@ def test_with_generic_filtering_remove_headers(self):
message_key="message",
keys_under_root=True,
overwrite_keys=True,
add_error_key=True,
add_error_key=True
),
processors=[{
"drop_fields": {
Expand Down
8 changes: 8 additions & 0 deletions filebeat/tests/system/test_registrar.py
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,10 @@ def test_clean_inactive(self):
lambda: self.log_contains_count("Registry file updated") > 1,
max_timeout=15)

if os.name == "nt":
# On windows registry recreation can take a bit longer
time.sleep(1)

data = self.get_registry()
assert len(data) == 2

Expand Down Expand Up @@ -834,6 +838,10 @@ def test_clean_removed(self):
lambda: self.log_contains_count("Registry file updated") > 1,
max_timeout=15)

if os.name == "nt":
# On windows registry recreation can take a bit longer
time.sleep(1)

data = self.get_registry()
assert len(data) == 2

Expand Down
48 changes: 48 additions & 0 deletions libbeat/common/jsontransform/transform.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package jsontransform

import (
"encoding/json"

"github.com/elastic/beats/libbeat/common"
)

// TransformNumbers rewrites, in place, every json.Number found in a
// JSON-decoded tree. Each number becomes an int64 if it parses as one,
// otherwise a float64, otherwise its original string form. Nested
// map[string]interface{} and []interface{} values are visited recursively.
func TransformNumbers(dict common.MapStr) {
	for key, raw := range dict {
		if num, isNumber := raw.(json.Number); isNumber {
			dict[key] = transformNumber(num)
			continue
		}
		if nested, isMap := raw.(map[string]interface{}); isMap {
			TransformNumbers(nested)
			continue
		}
		if list, isArray := raw.([]interface{}); isArray {
			transformNumbersArray(list)
		}
	}
}

// transformNumber picks the most specific Go representation for a
// json.Number: int64 when the literal parses as an integer, float64 when it
// parses as a float, and the raw string otherwise.
func transformNumber(value json.Number) interface{} {
	if asInt, err := value.Int64(); err == nil {
		return asInt
	}
	if asFloat, err := value.Float64(); err == nil {
		return asFloat
	}
	return value.String()
}

// transformNumbersArray is the slice counterpart of TransformNumbers: it
// replaces json.Number elements in place via transformNumber and descends
// into nested maps and arrays.
func transformNumbersArray(arr []interface{}) {
	for idx := range arr {
		switch elem := arr[idx].(type) {
		case json.Number:
			arr[idx] = transformNumber(elem)
		case map[string]interface{}:
			TransformNumbers(elem)
		case []interface{}:
			transformNumbersArray(elem)
		}
	}
}
101 changes: 101 additions & 0 deletions libbeat/processors/actions/decode_json_fields.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package actions

import (
"bytes"
"encoding/json"
"fmt"
"strings"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/jsontransform"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/processors"
"github.com/pkg/errors"
)

// decodeJSONFields is the decode_json_fields processor. Fields lists the
// event fields that the processor's Run method reads and tries to decode.
type decodeJSONFields struct {
Fields []string
}

// debug is the debug log function created for the "filters" selector.
var debug = logp.MakeDebug("filters")

func init() {
processors.RegisterPlugin("decode_json_fields", configChecked(newDecodeJSONFields,
requireFields("fields"), allowedFields("fields", "when")))
}

// newDecodeJSONFields builds a decodeJSONFields processor from c. The
// "fields" setting is unpacked into the processor's Fields. If unpacking
// fails, a warning is logged and an error is returned.
func newDecodeJSONFields(c common.Config) (processors.Processor, error) {
	var config struct {
		Fields []string `config:"fields"`
	}

	if err := c.Unpack(&config); err != nil {
		logp.Warn("Error unpacking config for decode_json_fields")
		return nil, fmt.Errorf("fail to unpack the decode_json_fields configuration: %s", err)
	}

	return decodeJSONFields{Fields: config.Fields}, nil
}

func (f decodeJSONFields) Run(event common.MapStr) (common.MapStr, error) {
var errs []string

for _, field := range f.Fields {
data, err := event.GetValue(field)
if err != nil && errors.Cause(err) != common.ErrKeyNotFound {
debug("Error trying to GetValue for field : %s in event : %v", field, event)
errs = append(errs, err.Error())
continue
}
text, ok := data.(string)
if ok {
var output map[string]interface{}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we want to restrict ourselves to json objects only here? Using var output interface{} we can decode booleans, number, objects and arrays. Any valid json.

err := unmarshal([]byte(text), &output)
if err != nil {
debug("Error trying to unmarshal %s", event[field])
errs = append(errs, err.Error())
continue
}

_, err = event.Put(field, output)
if err != nil {
debug("Error trying to Put value %v for field : %s", output, field)
errs = append(errs, err.Error())
continue
}
}
}

return event, fmt.Errorf(strings.Join(errs, ", "))
}

// unmarshal decodes text as a JSON object into *fields. Numbers are decoded
// as json.Number and then converted by jsontransform.TransformNumbers, so
// integers become int64 instead of always float64.
//
// After decoding, every top-level string value that itself contains a JSON
// object is replaced by the decoded object (recursively, through this same
// function). Strings that do not decode to an object are kept unchanged.
//
// It returns the decoding error for text, if any; failures while decoding
// embedded strings are ignored.
func unmarshal(text []byte, fields *map[string]interface{}) error {
	dec := json.NewDecoder(bytes.NewReader(text))
	dec.UseNumber()
	if err := dec.Decode(fields); err != nil {
		return err
	}

	// Deep parsing: try to decode string values as embedded JSON objects.
	for k, v := range *fields {
		str, isString := v.(string)
		if !isString {
			continue
		}

		var nested map[string]interface{}
		// The JSON literal null decodes without error but leaves the map
		// nil; keep the original string in that case instead of replacing
		// it with a nil map.
		if err := unmarshal([]byte(str), &nested); err == nil && nested != nil {
			(*fields)[k] = nested
		}
	}

	jsontransform.TransformNumbers(*fields)
	return nil
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking a little more about TransformNumbers and unmarshal, I wonder why we need to put TransformNumbers at the end. By moving TransformNumbers before the deep-parsing stuff, the initial decoding would be exactly the same as in the filebeat reader.

func DecodeJson(text []byte, to interface{}) {
  dec := json.NewDecoder(bytes.NewReader(text))
  dec.UseNumber()
  err := dec.Decode(to)
  if err != nil {

  if m, ok := to.(*map[string]interface{}); ok {
    jsontransform.TransformNumbers(common.MapStr(m))
  }
  return err
}

Then (extending unmarshal a little), unmarshal can be generalized a little:

// unmarshal decodes text into fields via DecodeJson and then tries to decode
// string values found in the result as embedded JSON.
//
// maxDepth limits the number of decoding levels: with maxDepth 1 only text
// itself is decoded. fields must be a pointer to the decoding target
// (*map[string]interface{}, *[]interface{} or *interface{}). Array elements
// are only deep-decoded when processArray is true. Strings that fail to
// decode are left unchanged.
func unmarshal(maxDepth int, text []byte, fields interface{}, processArray bool) error {
  if err := DecodeJson(text, fields); err != nil {
    return err
  }

  maxDepth--
  if maxDepth == 0 {
    return nil
  }

  tryUnmarshal := func(v interface{}) (interface{}, bool) {
    str, isString := v.(string)
    if !isString {
      return v, false
    }

    var tmp interface{}
    err := unmarshal(maxDepth, []byte(str), &tmp, processArray)
    if err != nil {
      return v, false
    }

    return tmp, true
  }

  // Look through the pointer so the decoded value itself can be inspected.
  // The recursive call above passes a *interface{}, so that case is needed
  // for nested levels.
  var target interface{}
  switch p := fields.(type) {
  case *map[string]interface{}:
    target = *p
  case *[]interface{}:
    target = *p
  case *interface{}:
    target = *p
  }

  // try to deep unmarshal fields
  switch O := target.(type) {
  case map[string]interface{}:
    for k, v := range O {
      if val, ok := tryUnmarshal(v); ok {
        O[k] = val
      }
    }

  // We want to process arrays here
  case []interface{}:
    if !processArray {
      break
    }

    for i, v := range O {
      if val, ok := tryUnmarshal(v); ok {
        O[i] = val
      }
    }
  }

  return nil
}

As you can see, I added a maximum parsing depth. Not sure we really want to parse fully recursively here. I'd use maxDepth=1 by default (and make it configurable). More parsing can be done by configuring a second processor to continue parsing an already parsed field.
I also added optional array support, plus lifted the requirement for JSON Objects parsing only.

Having DecodeJson, there is no need for exporting jsontransform as is (but I'd keep jsontransform package for now, as other modules in beats might make use of it).

What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the idea of having a maxDepth setup so that we don't recursively parse till the end.
This does give us the ability to configure the processors for different fields with different levels of depth parsing.


// String describes the processor as "decode_json_fields=" followed by the
// configured field names separated by ", ".
func (f decodeJSONFields) String() string {
	joined := strings.Join(f.Fields, ", ")
	return "decode_json_fields=" + joined
}
100 changes: 100 additions & 0 deletions libbeat/processors/actions/decode_json_fields_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package actions

import (
"testing"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/stretchr/testify/assert"
)

// Shared fixtures: every test configures the processor to decode the "msg"
// field. The error returned by common.NewConfigFrom is discarded.
var (
	fields    = [1]string{"msg"}
	config, _ = common.NewConfigFrom(map[string]interface{}{
		"fields": fields,
	})
)

func TestMissingKey(t *testing.T) {
input := common.MapStr{
"pipeline": "us1",
}

actual := getActualValue(t, config, input)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These test cases that expect an error should also assert that an error occurred. You can use assert.Error().


expected := common.MapStr{
"pipeline": "us1",
}

assert.Equal(t, expected.String(), actual.String())
}

func TestFieldNotString(t *testing.T) {
input := common.MapStr{
"msg": 123,
"pipeline": "us1",
}

actual := getActualValue(t, config, input)

expected := common.MapStr{
"msg": 123,
"pipeline": "us1",
}

assert.Equal(t, expected.String(), actual.String())

}

func TestInvalidJSON(t *testing.T) {
input := common.MapStr{
"msg": "{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you create a raw string literal by surrounding the string with back ticks (`) then you don't need to use escaping on those quotes. I don't care if you fix this, it's more of a golang tip to keep you sane.

"pipeline": "us1",
}

actual := getActualValue(t, config, input)

expected := common.MapStr{
"msg": "{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3",
"pipeline": "us1",
}
assert.Equal(t, expected.String(), actual.String())

}

// TestValidJSON checks that a "msg" value holding a JSON object is replaced
// by the decoded object, including the JSON object embedded as a string in
// its "log" key.
func TestValidJSON(t *testing.T) {
	event := common.MapStr{
		"msg":      `{"log":"{\"level\":\"info\"}","stream":"stderr","count":3}`,
		"pipeline": "us1",
	}

	got := getActualValue(t, config, event)

	decodedMsg := map[string]interface{}{
		"log": map[string]interface{}{
			"level": "info",
		},
		"stream": "stderr",
		"count":  3,
	}
	want := common.MapStr{
		"msg":      decodedMsg,
		"pipeline": "us1",
	}

	assert.Equal(t, want.String(), got.String())
}

// getActualValue builds a decode_json_fields processor from config, runs it
// on input and returns the resulting event. The test is aborted if the
// processor cannot be created. Debug logging is initialised when the tests
// run in verbose mode.
func getActualValue(t *testing.T, config *common.Config, input common.MapStr) common.MapStr {
	if testing.Verbose() {
		logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"})
	}

	processor, err := newDecodeJSONFields(*config)
	if err != nil {
		logp.Err("Error initializing decode_json_fields")
		t.Fatal(err)
	}

	// The error returned by Run is not checked here; callers compare only
	// the returned event.
	result, _ := processor.Run(input)
	return result
}
1 change: 1 addition & 0 deletions vendor/github.com/elastic/go-ucfg/CHANGELOG.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.