From fd0a8da02239225e4c21fa2dc2d421dbe48f31e4 Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Thu, 11 Feb 2021 18:49:41 -0500 Subject: [PATCH] Simplify error handling and fix race $ benchcmp old.txt new.txt benchmark old ns/op new ns/op delta BenchmarkProcessor_Run/single_object-12 15691 15686 -0.03% BenchmarkProcessor_Run/nested_and_array_object-12 39673 39098 -1.45% benchmark old allocs new allocs delta BenchmarkProcessor_Run/single_object-12 158 158 +0.00% BenchmarkProcessor_Run/nested_and_array_object-12 376 374 -0.53% benchmark old bytes new bytes delta BenchmarkProcessor_Run/single_object-12 8597 8597 +0.00% BenchmarkProcessor_Run/nested_and_array_object-12 20310 19798 -2.52% --- libbeat/common/enc/mxj/mxj.go | 25 ++-- libbeat/processors/decode_xml/config.go | 2 +- libbeat/processors/decode_xml/decode_xml.go | 109 ++++++++---------- .../processors/decode_xml/decode_xml_test.go | 88 ++++++-------- 4 files changed, 104 insertions(+), 120 deletions(-) diff --git a/libbeat/common/enc/mxj/mxj.go b/libbeat/common/enc/mxj/mxj.go index 062c164b784..b83975201ea 100644 --- a/libbeat/common/enc/mxj/mxj.go +++ b/libbeat/common/enc/mxj/mxj.go @@ -18,27 +18,38 @@ package mxj import ( + "sync" + "github.com/clbanning/mxj/v2" ) +// The third-party library uses global options. It is unsafe to use the library +// concurrently. +var mutex sync.Mutex + // UnmarshalXML takes a slice of bytes, and returns a map[string]interface{}. // If the slice is not valid XML, it will return an error. // This uses the MXJ library compared to the built-in encoding/xml since the latter does not // support unmarshalling XML to an unknown or empty struct/interface. -func UnmarshalXML(body []byte, prepend bool, toLower bool) (obj map[string]interface{}, err error) { - var xmlobj mxj.Map - // Disables attribute prefixes and forces all lines to lowercase to meet ECS standards +// +// Beware that this function acquires a mutux to protect against race conditions +// in the third-party library it wraps. +func UnmarshalXML(body []byte, prepend bool, toLower bool) (map[string]interface{}, error) { + mutex.Lock() + defer mutex.Unlock() + + // Disables attribute prefixes and forces all lines to lowercase to meet ECS standards. mxj.PrependAttrWithHyphen(prepend) mxj.CoerceKeysToLower(toLower) - xmlobj, err = mxj.NewMapXml(body) + xmlObj, err := mxj.NewMapXml(body) if err != nil { return nil, err } - err = xmlobj.Struct(&obj) - if err != nil { + var out map[string]interface{} + if err = xmlObj.Struct(&out); err != nil { return nil, err } - return obj, nil + return out, nil } diff --git a/libbeat/processors/decode_xml/config.go b/libbeat/processors/decode_xml/config.go index 3c9c42ddbb4..e5796382c82 100644 --- a/libbeat/processors/decode_xml/config.go +++ b/libbeat/processors/decode_xml/config.go @@ -31,7 +31,7 @@ func defaultConfig() decodeXMLConfig { return decodeXMLConfig{ Field: "message", IgnoreMissing: false, - Target: new(string), + Target: nil, OverwriteKeys: true, IgnoreFailure: false, ToLower: true, diff --git a/libbeat/processors/decode_xml/decode_xml.go b/libbeat/processors/decode_xml/decode_xml.go index 72524174f60..fb1a411df4f 100644 --- a/libbeat/processors/decode_xml/decode_xml.go +++ b/libbeat/processors/decode_xml/decode_xml.go @@ -19,11 +19,9 @@ package decode_xml import ( "encoding/json" + "errors" "fmt" - "github.com/pkg/errors" - "go.uber.org/multierr" - "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/cfgwarn" @@ -36,8 +34,8 @@ import ( ) type decodeXML struct { - config decodeXMLConfig - log *logp.Logger + decodeXMLConfig + log *logp.Logger } var ( @@ -57,7 +55,7 @@ func init() { jsprocessor.RegisterPlugin(procName, New) } -// New construct a new decode_xml processor. +// New constructs a new decode_xml processor. func New(c *common.Config) (processors.Processor, error) { config := defaultConfig() @@ -66,83 +64,74 @@ func New(c *common.Config) (processors.Processor, error) { } return newDecodeXML(config) - } func newDecodeXML(config decodeXMLConfig) (processors.Processor, error) { cfgwarn.Experimental("The " + procName + " processor is experimental.") - log := logp.NewLogger(logName) - - return &decodeXML{config: config, log: log}, nil + // Default target to overwriting field. + if config.Target == nil { + config.Target = &config.Field + } + return &decodeXML{ + decodeXMLConfig: config, + log: logp.NewLogger(logName), + }, nil } func (x *decodeXML) Run(event *beat.Event) (*beat.Event, error) { - var errs []error - var field = x.config.Field - data, err := event.GetValue(field) + if err := x.run(event); err != nil && !x.IgnoreFailure { + err = fmt.Errorf("failed in decode_xml on the %q field: %w", x.Field, err) + event.PutValue("error.message", err.Error()) + return event, err + } + return event, nil +} + +func (x *decodeXML) run(event *beat.Event) error { + data, err := event.GetValue(x.Field) if err != nil { - if x.config.IgnoreMissing && err == common.ErrKeyNotFound { - return event, nil + if x.IgnoreMissing && err == common.ErrKeyNotFound { + return nil } - errs = append(errs, err) + return err } + text, ok := data.(string) if !ok { - errs = append(errs, errFieldIsNotString) - } else { - xmloutput, err := x.decodeField(text) - if err != nil { - errs = append(errs, fmt.Errorf("failed to decode fields in decode_xml processor: %v", err)) - } - - target := field - if x.config.Target != nil { - target = *x.config.Target - } - - var id string - if key := x.config.DocumentID; key != "" { - if tmp, err := common.MapStr(xmloutput).GetValue(key); err == nil { - if v, ok := tmp.(string); ok { - id = v - common.MapStr(xmloutput).Delete(key) - } - } - } + return errFieldIsNotString + } - if target != "" { - _, err = event.PutValue(target, xmloutput) - } else { - if x.config.IgnoreFailure { - jsontransform.WriteJSONKeys(event, xmloutput, false, x.config.OverwriteKeys, false) - } else { - jsontransform.WriteJSONKeys(event, xmloutput, false, x.config.OverwriteKeys, true) - } + xmlOutput, err := x.decodeField(text) + if err != nil { + return err + } + var id string + if tmp, err := common.MapStr(xmlOutput).GetValue(x.DocumentID); err == nil { + if v, ok := tmp.(string); ok { + id = v + common.MapStr(xmlOutput).Delete(x.DocumentID) } + } - if err != nil { - errs = append(errs, fmt.Errorf("Error trying to Put value %v for field: %s. Error: %w", xmloutput, field, err)) - } - if id != "" { - event.SetID(id) + if *x.Target != "" { + if _, err = event.PutValue(*x.Target, xmlOutput); err != nil { + return fmt.Errorf("failed to put value %v into field %q: %w", xmlOutput, *x.Target, err) } + } else { + jsontransform.WriteJSONKeys(event, xmlOutput, false, x.OverwriteKeys, !x.IgnoreFailure) } - // If error has not already been set, add errors if ignore_failure is false. - if len(errs) > 0 { - var combinedErrors = multierr.Combine(errs...) - if !x.config.IgnoreFailure { - event.Fields["error"] = combinedErrors.Error() - } - return event, combinedErrors + + if id != "" { + event.SetID(id) } - return event, nil + return nil } func (x *decodeXML) decodeField(data string) (decodedData map[string]interface{}, err error) { - decodedData, err = mxj.UnmarshalXML([]byte(data), false, x.config.ToLower) + decodedData, err = mxj.UnmarshalXML([]byte(data), false, x.ToLower) if err != nil { return nil, fmt.Errorf("error decoding XML field: %w", err) } @@ -151,6 +140,6 @@ func (x *decodeXML) decodeField(data string) (decodedData map[string]interface{} } func (x *decodeXML) String() string { - json, _ := json.Marshal(x.config) + json, _ := json.Marshal(x.decodeXMLConfig) return procName + "=" + string(json) } diff --git a/libbeat/processors/decode_xml/decode_xml_test.go b/libbeat/processors/decode_xml/decode_xml_test.go index fb6533ff30e..3ef7b12c5d2 100644 --- a/libbeat/processors/decode_xml/decode_xml_test.go +++ b/libbeat/processors/decode_xml/decode_xml_test.go @@ -25,7 +25,6 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/logp" ) var ( @@ -76,8 +75,6 @@ func TestDecodeXML(t *testing.T) { `, }, - error: false, - errorMessage: "", }, { description: "Test with target set to root", @@ -111,8 +108,6 @@ func TestDecodeXML(t *testing.T) { `, }, - error: false, - errorMessage: "", }, { description: "Simple xml decode with xml string to same field name when Target is null", @@ -141,8 +136,6 @@ func TestDecodeXML(t *testing.T) { }, }, }, - error: false, - errorMessage: "", }, { description: "Decoding with array input", @@ -182,8 +175,6 @@ func TestDecodeXML(t *testing.T) { }, }, }, - error: false, - errorMessage: "", }, { description: "Decoding with multiple xml objects", @@ -206,9 +197,7 @@ func TestDecodeXML(t *testing.T) { Ralls, Kim - A former architect battles corporate zombies, - an evil sorceress, and her own childhood to become queen - of the world. + A former architect battles corporate zombies, an evil sorceress, and her own childhood to become queen of the world. `, @@ -230,7 +219,7 @@ func TestDecodeXML(t *testing.T) { }, "secondcategory": map[string]interface{}{ "paper": map[string]interface{}{ - "description": "A former architect battles corporate zombies, \n\t\t\t\t\t\tan evil sorceress, and her own childhood to become queen \n\t\t\t\t\t\tof the world.", + "description": "A former architect battles corporate zombies, an evil sorceress, and her own childhood to become queen of the world.", "id": "bk102", "test2": "Ralls, Kim", }, @@ -238,8 +227,6 @@ func TestDecodeXML(t *testing.T) { }, }, }, - error: false, - errorMessage: "", }, { description: "Decoding with broken XML format, with IgnoreFailure false", @@ -258,11 +245,18 @@ func TestDecodeXML(t *testing.T) { catalog>`, }, Output: common.MapStr{ - "message": (map[string]interface{})(nil), - "error": "failed to decode fields in decode_xml processor: error decoding XML field: xml.Decoder.Token() - XML syntax error on line 7: element closed by ", + "message": ` + + + William H. Gaddis + The Recognitions + One of the great seminal American novels of the 20th century. + + catalog>`, + "error": common.MapStr{"message": "failed in decode_xml on the \"message\" field: error decoding XML field: xml.Decoder.Token() - XML syntax error on line 7: element closed by "}, }, error: true, - errorMessage: "failed to decode fields in decode_xml processor: error decoding XML field: xml.Decoder.Token() - XML syntax error on line 7: element closed by ", + errorMessage: "error decoding XML field:", }, { description: "Decoding with broken XML format, with IgnoreFailure true", @@ -281,10 +275,15 @@ func TestDecodeXML(t *testing.T) { catalog>`, }, Output: common.MapStr{ - "message": (map[string]interface{})(nil), + "message": ` + + + William H. Gaddis + The Recognitions + One of the great seminal American novels of the 20th century. + + catalog>`, }, - error: true, - errorMessage: "failed to decode fields in decode_xml processor: error decoding XML field: xml.Decoder.Token() - XML syntax error on line 7: element closed by ", }, { description: "Test when the XML field is empty, IgnoreMissing false", @@ -297,10 +296,10 @@ func TestDecodeXML(t *testing.T) { }, Output: common.MapStr{ "message": "testing message", - "error": "key not found; field value is not a string", + "error": common.MapStr{"message": "failed in decode_xml on the \"message2\" field: key not found"}, }, error: true, - errorMessage: "key not found; field value is not a string", + errorMessage: "key not found", }, { description: "Test when the XML field is empty IgnoreMissing true", @@ -314,11 +313,9 @@ func TestDecodeXML(t *testing.T) { Output: common.MapStr{ "message": "testing message", }, - error: false, - errorMessage: "", }, { - description: "Test when the XML field not a string, Ignorefailure false", + description: "Test when the XML field not a string, IgnoreFailure false", config: decodeXMLConfig{ Field: "message", IgnoreFailure: false, @@ -328,7 +325,7 @@ func TestDecodeXML(t *testing.T) { }, Output: common.MapStr{ "message": 1, - "error": "field value is not a string", + "error": common.MapStr{"message": "failed in decode_xml on the \"message\" field: field value is not a string"}, }, error: true, errorMessage: "field value is not a string", @@ -345,8 +342,6 @@ func TestDecodeXML(t *testing.T) { Output: common.MapStr{ "message": 1, }, - error: true, - errorMessage: "field value is not a string", }, } @@ -354,10 +349,9 @@ func TestDecodeXML(t *testing.T) { test := test t.Run(test.description, func(t *testing.T) { t.Parallel() - f := &decodeXML{ - log: logp.NewLogger("decode_xml"), - config: test.config, - } + + f, err := newDecodeXML(test.config) + require.NoError(t, err) event := &beat.Event{ Fields: test.Input, @@ -366,8 +360,9 @@ func TestDecodeXML(t *testing.T) { if !test.error { assert.NoError(t, err) } else { - assert.Error(t, err) - assert.EqualError(t, err, test.errorMessage) + if assert.Error(t, err) { + assert.Contains(t, err.Error(), test.errorMessage) + } } assert.Equal(t, test.Output, newEvent.Fields) }) @@ -417,9 +412,7 @@ func BenchmarkProcessor_Run(b *testing.B) { Ralls, Kim - A former architect battles corporate zombies, - an evil sorceress, and her own childhood to become queen - of the world. + A former architect battles corporate zombies, an evil sorceress, and her own childhood to become queen of the world. `, @@ -435,7 +428,11 @@ func BenchmarkProcessor_Run(b *testing.B) { } func TestXMLToDocumentID(t *testing.T) { - log := logp.NewLogger("decode_xml") + p, err := newDecodeXML(decodeXMLConfig{ + Field: "message", + DocumentID: "catalog.book.seq", + }) + require.NoError(t, err) input := common.MapStr{ "message": ` @@ -446,19 +443,6 @@ func TestXMLToDocumentID(t *testing.T) { `, } - - config := common.MustNewConfigFrom(map[string]interface{}{ - "fields": []string{"message"}, - "document_id": "catalog.book.seq", - "target": "message", - }) - - p, err := New(config) - if err != nil { - log.Error("Error initializing decode_xml") - t.Fatal(err) - } - actual, err := p.Run(&beat.Event{Fields: input}) require.NoError(t, err)