Skip to content

Commit

Permalink
Simplify error handling and fix race
Browse files Browse the repository at this point in the history
$ benchcmp old.txt new.txt
benchmark                                             old ns/op     new ns/op     delta
BenchmarkProcessor_Run/single_object-12               15691         15686         -0.03%
BenchmarkProcessor_Run/nested_and_array_object-12     39673         39098         -1.45%

benchmark                                             old allocs     new allocs     delta
BenchmarkProcessor_Run/single_object-12               158            158            +0.00%
BenchmarkProcessor_Run/nested_and_array_object-12     376            374            -0.53%

benchmark                                             old bytes     new bytes     delta
BenchmarkProcessor_Run/single_object-12               8597          8597          +0.00%
BenchmarkProcessor_Run/nested_and_array_object-12     20310         19798         -2.52%
  • Loading branch information
andrewkroh committed Feb 14, 2021
1 parent 3078147 commit fd0a8da
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 120 deletions.
25 changes: 18 additions & 7 deletions libbeat/common/enc/mxj/mxj.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,38 @@
package mxj

import (
"sync"

"github.com/clbanning/mxj/v2"
)

// The third-party library uses global options. It is unsafe to use the library
// concurrently.
var mutex sync.Mutex

// UnmarshalXML takes a slice of bytes, and returns a map[string]interface{}.
// If the slice is not valid XML, it will return an error.
// This uses the MXJ library instead of the built-in encoding/xml because the latter does not
// support unmarshalling XML into an unknown or empty struct/interface.
func UnmarshalXML(body []byte, prepend bool, toLower bool) (obj map[string]interface{}, err error) {
var xmlobj mxj.Map
// Disables attribute prefixes and forces all lines to lowercase to meet ECS standards
//
// Beware that this function acquires a mutex to protect against race conditions
// in the third-party library it wraps.
func UnmarshalXML(body []byte, prepend bool, toLower bool) (map[string]interface{}, error) {
mutex.Lock()
defer mutex.Unlock()

// Sets whether attributes are prefixed with a hyphen and whether keys are coerced to lowercase (lowercase keys meet ECS standards).
mxj.PrependAttrWithHyphen(prepend)
mxj.CoerceKeysToLower(toLower)

xmlobj, err = mxj.NewMapXml(body)
xmlObj, err := mxj.NewMapXml(body)
if err != nil {
return nil, err
}

err = xmlobj.Struct(&obj)
if err != nil {
var out map[string]interface{}
if err = xmlObj.Struct(&out); err != nil {
return nil, err
}
return obj, nil
return out, nil
}
2 changes: 1 addition & 1 deletion libbeat/processors/decode_xml/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func defaultConfig() decodeXMLConfig {
return decodeXMLConfig{
Field: "message",
IgnoreMissing: false,
Target: new(string),
Target: nil,
OverwriteKeys: true,
IgnoreFailure: false,
ToLower: true,
Expand Down
109 changes: 49 additions & 60 deletions libbeat/processors/decode_xml/decode_xml.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ package decode_xml

import (
"encoding/json"
"errors"
"fmt"

"github.com/pkg/errors"
"go.uber.org/multierr"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
Expand All @@ -36,8 +34,8 @@ import (
)

type decodeXML struct {
config decodeXMLConfig
log *logp.Logger
decodeXMLConfig
log *logp.Logger
}

var (
Expand All @@ -57,7 +55,7 @@ func init() {
jsprocessor.RegisterPlugin(procName, New)
}

// New construct a new decode_xml processor.
// New constructs a new decode_xml processor.
func New(c *common.Config) (processors.Processor, error) {
config := defaultConfig()

Expand All @@ -66,83 +64,74 @@ func New(c *common.Config) (processors.Processor, error) {
}

return newDecodeXML(config)

}

func newDecodeXML(config decodeXMLConfig) (processors.Processor, error) {
cfgwarn.Experimental("The " + procName + " processor is experimental.")

log := logp.NewLogger(logName)

return &decodeXML{config: config, log: log}, nil
// Default target to overwriting field.
if config.Target == nil {
config.Target = &config.Field
}

return &decodeXML{
decodeXMLConfig: config,
log: logp.NewLogger(logName),
}, nil
}

func (x *decodeXML) Run(event *beat.Event) (*beat.Event, error) {
var errs []error
var field = x.config.Field
data, err := event.GetValue(field)
if err := x.run(event); err != nil && !x.IgnoreFailure {
err = fmt.Errorf("failed in decode_xml on the %q field: %w", x.Field, err)
event.PutValue("error.message", err.Error())
return event, err
}
return event, nil
}

func (x *decodeXML) run(event *beat.Event) error {
data, err := event.GetValue(x.Field)
if err != nil {
if x.config.IgnoreMissing && err == common.ErrKeyNotFound {
return event, nil
if x.IgnoreMissing && err == common.ErrKeyNotFound {
return nil
}
errs = append(errs, err)
return err
}

text, ok := data.(string)
if !ok {
errs = append(errs, errFieldIsNotString)
} else {
xmloutput, err := x.decodeField(text)
if err != nil {
errs = append(errs, fmt.Errorf("failed to decode fields in decode_xml processor: %v", err))
}

target := field
if x.config.Target != nil {
target = *x.config.Target
}

var id string
if key := x.config.DocumentID; key != "" {
if tmp, err := common.MapStr(xmloutput).GetValue(key); err == nil {
if v, ok := tmp.(string); ok {
id = v
common.MapStr(xmloutput).Delete(key)
}
}
}
return errFieldIsNotString
}

if target != "" {
_, err = event.PutValue(target, xmloutput)
} else {
if x.config.IgnoreFailure {
jsontransform.WriteJSONKeys(event, xmloutput, false, x.config.OverwriteKeys, false)
} else {
jsontransform.WriteJSONKeys(event, xmloutput, false, x.config.OverwriteKeys, true)
}
xmlOutput, err := x.decodeField(text)
if err != nil {
return err
}

var id string
if tmp, err := common.MapStr(xmlOutput).GetValue(x.DocumentID); err == nil {
if v, ok := tmp.(string); ok {
id = v
common.MapStr(xmlOutput).Delete(x.DocumentID)
}
}

if err != nil {
errs = append(errs, fmt.Errorf("Error trying to Put value %v for field: %s. Error: %w", xmloutput, field, err))
}
if id != "" {
event.SetID(id)
if *x.Target != "" {
if _, err = event.PutValue(*x.Target, xmlOutput); err != nil {
return fmt.Errorf("failed to put value %v into field %q: %w", xmlOutput, *x.Target, err)
}
} else {
jsontransform.WriteJSONKeys(event, xmlOutput, false, x.OverwriteKeys, !x.IgnoreFailure)
}
// If error has not already been set, add errors if ignore_failure is false.
if len(errs) > 0 {
var combinedErrors = multierr.Combine(errs...)
if !x.config.IgnoreFailure {
event.Fields["error"] = combinedErrors.Error()
}
return event, combinedErrors

if id != "" {
event.SetID(id)
}
return event, nil
return nil
}

func (x *decodeXML) decodeField(data string) (decodedData map[string]interface{}, err error) {
decodedData, err = mxj.UnmarshalXML([]byte(data), false, x.config.ToLower)
decodedData, err = mxj.UnmarshalXML([]byte(data), false, x.ToLower)
if err != nil {
return nil, fmt.Errorf("error decoding XML field: %w", err)
}
Expand All @@ -151,6 +140,6 @@ func (x *decodeXML) decodeField(data string) (decodedData map[string]interface{}
}

func (x *decodeXML) String() string {
json, _ := json.Marshal(x.config)
json, _ := json.Marshal(x.decodeXMLConfig)
return procName + "=" + string(json)
}
Loading

0 comments on commit fd0a8da

Please sign in to comment.