Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

json_decode_fields processor #2605

Merged
merged 5 commits into from
Nov 21, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ https://github.com/elastic/beats/compare/v5.0.0...master[Check the HEAD diff]
==== Bugfixes

*Affecting all Beats*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably not be under bug fixes? This is a new feature as far as I understand.

- Added decode_json_fields processor for decoding fields containing JSON strings. {pull}2605[2605]

*Metricbeat*

Expand Down
44 changes: 2 additions & 42 deletions filebeat/harvester/reader/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/jsontransform"
"github.com/elastic/beats/libbeat/logp"
)

Expand Down Expand Up @@ -69,51 +70,10 @@ func unmarshal(text []byte, fields *map[string]interface{}) error {
if err != nil {
return err
}
transformNumbersDict(*fields)
jsontransform.TransformNumbers(*fields)
return nil
}

// transformNumbersDict walks a json decoded tree an replaces json.Number
// with int64, float64, or string, in this order of preference (i.e. if it
// parses as an int, use int. if it parses as a float, use float. etc).
func transformNumbersDict(dict common.MapStr) {
for k, v := range dict {
switch vv := v.(type) {
case json.Number:
dict[k] = transformNumber(vv)
case map[string]interface{}:
transformNumbersDict(vv)
case []interface{}:
transformNumbersArray(vv)
}
}
}

func transformNumber(value json.Number) interface{} {
i64, err := value.Int64()
if err == nil {
return i64
}
f64, err := value.Float64()
if err == nil {
return f64
}
return value.String()
}

func transformNumbersArray(arr []interface{}) {
for i, v := range arr {
switch vv := v.(type) {
case json.Number:
arr[i] = transformNumber(vv)
case map[string]interface{}:
transformNumbersDict(vv)
case []interface{}:
transformNumbersArray(vv)
}
}
}

// Next decodes JSON and returns the filled Line object.
func (r *JSON) Next() (Message, error) {
message, err := r.reader.Next()
Expand Down
4 changes: 2 additions & 2 deletions filebeat/tests/system/test_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ def test_with_generic_filtering(self):
message_key="message",
keys_under_root=True,
overwrite_keys=True,
add_error_key=True,
add_error_key=True
),
processors=[{
"drop_fields": {
Expand Down Expand Up @@ -305,7 +305,7 @@ def test_with_generic_filtering_remove_headers(self):
message_key="message",
keys_under_root=True,
overwrite_keys=True,
add_error_key=True,
add_error_key=True
),
processors=[{
"drop_fields": {
Expand Down
8 changes: 8 additions & 0 deletions filebeat/tests/system/test_registrar.py
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,10 @@ def test_clean_inactive(self):
lambda: self.log_contains_count("Registry file updated") > 1,
max_timeout=15)

if os.name == "nt":
# On windows registry recreation can take a bit longer
time.sleep(1)

data = self.get_registry()
assert len(data) == 2

Expand Down Expand Up @@ -834,6 +838,10 @@ def test_clean_removed(self):
lambda: self.log_contains_count("Registry file updated") > 1,
max_timeout=15)

if os.name == "nt":
# On windows registry recration can take a bit longer
time.sleep(1)

data = self.get_registry()
assert len(data) == 2

Expand Down
48 changes: 48 additions & 0 deletions libbeat/common/jsontransform/transform.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package jsontransform

import (
"encoding/json"

"github.com/elastic/beats/libbeat/common"
)

// TransformNumbers walks a json decoded tree an replaces json.Number
// with int64, float64, or string, in this order of preference (i.e. if it
// parses as an int, use int. if it parses as a float, use float. etc).
func TransformNumbers(dict common.MapStr) {
for k, v := range dict {
switch vv := v.(type) {
case json.Number:
dict[k] = transformNumber(vv)
case map[string]interface{}:
TransformNumbers(vv)
case []interface{}:
transformNumbersArray(vv)
}
}
}

func transformNumber(value json.Number) interface{} {
i64, err := value.Int64()
if err == nil {
return i64
}
f64, err := value.Float64()
if err == nil {
return f64
}
return value.String()
}

func transformNumbersArray(arr []interface{}) {
for i, v := range arr {
switch vv := v.(type) {
case json.Number:
arr[i] = transformNumber(vv)
case map[string]interface{}:
TransformNumbers(vv)
case []interface{}:
transformNumbersArray(vv)
}
}
}
156 changes: 156 additions & 0 deletions libbeat/processors/actions/decode_json_fields.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package actions

import (
"bytes"
"encoding/json"
"fmt"
"strings"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/jsontransform"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/processors"
"github.com/pkg/errors"
)

type decodeJSONFields struct {
fields []string
maxDepth int
processArray bool
}

type config struct {
Fields []string `config:"fields"`
MaxDepth int `config:"maxDepth" validate:"min=1"`
ProcessArray bool `config:"processArray"`
}

var (
defaultConfig = config{
MaxDepth: 1,
ProcessArray: false,
}
)

var debug = logp.MakeDebug("filters")

func init() {
processors.RegisterPlugin("decode_json_fields",
configChecked(newDecodeJSONFields,
requireFields("fields"),
allowedFields("fields", "maxDepth", "processArray")))
}

func newDecodeJSONFields(c common.Config) (processors.Processor, error) {
config := defaultConfig

err := c.Unpack(&config)

if err != nil {
logp.Warn("Error unpacking config for decode_json_fields")
return nil, fmt.Errorf("fail to unpack the decode_json_fields configuration: %s", err)
}

f := decodeJSONFields{fields: config.Fields, maxDepth: config.MaxDepth, processArray: config.ProcessArray}
return f, nil
}

func (f decodeJSONFields) Run(event common.MapStr) (common.MapStr, error) {
var errs []string

for _, field := range f.fields {
data, err := event.GetValue(field)
if err != nil && errors.Cause(err) != common.ErrKeyNotFound {
debug("Error trying to GetValue for field : %s in event : %v", field, event)
errs = append(errs, err.Error())
continue
}
text, ok := data.(string)
if ok {
var output interface{}
err := unmarshal(f.maxDepth, []byte(text), &output, f.processArray)
if err != nil {
debug("Error trying to unmarshal %s", event[field])
errs = append(errs, err.Error())
continue
}

_, err = event.Put(field, output)
if err != nil {
debug("Error trying to Put value %v for field : %s", output, field)
errs = append(errs, err.Error())
continue
}
}
}

return event, fmt.Errorf(strings.Join(errs, ", "))
}

func unmarshal(maxDepth int, text []byte, fields *interface{}, processArray bool) error {
if err := DecodeJSON(text, fields); err != nil {
return err
}

maxDepth--
if maxDepth == 0 {
return nil
}

tryUnmarshal := func(v interface{}) (interface{}, bool) {
str, isString := v.(string)
if !isString {
return v, false
}

var tmp interface{}
err := unmarshal(maxDepth, []byte(str), &tmp, processArray)
if err != nil {
return v, false
}

return tmp, true
}

// try to deep unmarshal fields
switch O := interface{}(*fields).(type) {
case map[string]interface{}:
for k, v := range O {
if decoded, ok := tryUnmarshal(v); ok {
O[k] = decoded
}
}
// We want to process arrays here
case []interface{}:
if !processArray {
break
}

for i, v := range O {
if decoded, ok := tryUnmarshal(v); ok {
O[i] = decoded
}
}
}
return nil
}

func DecodeJSON(text []byte, to *interface{}) error {
dec := json.NewDecoder(bytes.NewReader(text))
dec.UseNumber()
err := dec.Decode(to)

if err != nil {
return err
}

switch O := interface{}(*to).(type) {
case map[string]interface{}:
jsontransform.TransformNumbers(O)
}
return nil
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thinking a little more about TransformNumbers and unmarshal, I wonder why wen need to put TransformNumbers at the end. By moving TransformNumber before the deep-parsing stuff, the initial decoding would be exactly similar to the filebeat reader.

func DecodeJson(text []byte, to interface{}) {
  dec := json.NewDecoder(bytes.NewReader(text))
  dec.UseNumber()
  err := dec.Decode(to)
  if err != nil {

  if m, ok := to.(*map[string]interface{}); ok {
    jsontransform.TransformNumbers(common.MapStr(m))
  }
  return err
}

Then (extending unmarshal a little), unmarshal can be generalized a little:

func unmarshal(maxDepth int, text []byte, fields interface{}, processArray bool) error {
  if err := DecodeJson(text, fields); err != nil {
    return err
  }

  maxDepth--
  if maxDepth == 0 {
    return nil
  }

  tryUnmarshal := func(v interface{}) (interface{}, bool) {
    str, isString := v.(string)
    if !isString {
      return v, false
    }

    var tmp interface{}
    err := unmarshal(maxDepth, []byte(str), &tmp, processArray)
    if err != nil {
      return v, false
    }

    return tmp, true
  }

  // try to deep unmarshal fields
  switch O := fields.(type); {
  case *map[string]interface{}:
    for k, v := range O {
      if decoded, ok := tryUnmarshal(v); ok {
        O[k] = decoded
      }
    }

  // We want to process arrays here
  case []interface{}:
    if !processArray {
      break
    }

    for i, v := range O {
      if decoded, ok := tryUnmarshal(v); ok {
        O[i] = decoded
      }
    }
  }
}

As you can see, I added a maximum parsing depth. Not sure we really want to parse fully recursively here. I'd use maxDepth=1 by default (Make it configurable). More parsing can be done by configuring a second processor continue parsing on already parsed field.
I also added optional array support, plus lifted the requirement for JSON Objects parsing only.

Having DecodeJson, there is no need for exporting jsontransform as is (but I'd keep jsontransform package for now, as other modules in beats might make use of it).

What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the idea of having a maxDepth setup so that we don't recursively parse till the end.
This does give us the ability to configure the processors for different fields with different levels of depth parsing.


func (f decodeJSONFields) String() string {
return "decode_json_fields=" + strings.Join(f.fields, ", ")
}
Loading