From d75a2f967267da1aeddf7d62b427cf73329436eb Mon Sep 17 00:00:00 2001 From: Jay Camp Date: Mon, 21 Dec 2020 17:18:24 -0500 Subject: [PATCH] [receiver/fluentforward] Fix string conversions (#2314) * [receiver/fluentforward] Fix string conversions - Sometimes strings are received as []uint8 - fluentd uses "message", fluent-bit uses "log" for the log message * lint fix --- receiver/fluentforwardreceiver/conversion.go | 21 +++++++++++-- .../fluentforwardreceiver/conversion_test.go | 10 +++++-- .../fluentforwardreceiver/receiver_test.go | 30 +++++++------------ 3 files changed, 35 insertions(+), 26 deletions(-) diff --git a/receiver/fluentforwardreceiver/conversion.go b/receiver/fluentforwardreceiver/conversion.go index 74e3b0febe2..45fd8147144 100644 --- a/receiver/fluentforwardreceiver/conversion.go +++ b/receiver/fluentforwardreceiver/conversion.go @@ -147,15 +147,30 @@ func parseRecordToLogRecord(dc *msgp.Reader, lr pdata.LogRecord) error { recordLen-- key, err := dc.ReadString() if err != nil { - return msgp.WrapError(err, "Record") + // The protocol doesn't specify this but apparently some map keys + // can be binary type instead of string + keyBytes, keyBytesErr := dc.ReadBytes(nil) + if keyBytesErr != nil { + return msgp.WrapError(keyBytesErr, "Record") + } + key = string(keyBytes) } val, err := dc.ReadIntf() if err != nil { return msgp.WrapError(err, "Record", key) } - if s, ok := val.(string); ok && key == "log" { - lr.Body().SetStringVal(s) + // fluentd uses message, fluentbit log. + if key == "message" || key == "log" { + switch v := val.(type) { + case string: + lr.Body().SetStringVal(v) + case []uint8: + // Sometimes strings come in as uint8's. + lr.Body().SetStringVal(string(v)) + default: + return fmt.Errorf("cannot convert message type %T to string", val) + } } else { insertToAttributeMap(key, val, &attrs) } diff --git a/receiver/fluentforwardreceiver/conversion_test.go b/receiver/fluentforwardreceiver/conversion_test.go index 5681895c272..740e30219d3 100644 --- a/receiver/fluentforwardreceiver/conversion_test.go +++ b/receiver/fluentforwardreceiver/conversion_test.go @@ -53,13 +53,12 @@ func TestMessageEventConversion(t *testing.T) { } func TestAttributeTypeConversion(t *testing.T) { - var b []byte b = msgp.AppendArrayHeader(b, 3) b = msgp.AppendString(b, "my-tag") b = msgp.AppendInt(b, 5000) - b = msgp.AppendMapHeader(b, 14) + b = msgp.AppendMapHeader(b, 15) b = msgp.AppendString(b, "a") b = msgp.AppendFloat64(b, 5.0) b = msgp.AppendString(b, "b") @@ -90,11 +89,15 @@ func TestAttributeTypeConversion(t *testing.T) { b = msgp.AppendArrayHeader(b, 2) b = msgp.AppendString(b, "first") b = msgp.AppendString(b, "second") + b = msgp.AppendString(b, "o") + b, err := msgp.AppendIntf(b, []uint8{99, 100, 101}) + + require.NoError(t, err) reader := msgp.NewReader(bytes.NewReader(b)) var event MessageEventLogRecord - err := event.DecodeMsg(reader) + err = event.DecodeMsg(reader) require.Nil(t, err) le := event.LogRecords().At(0) @@ -119,6 +122,7 @@ func TestAttributeTypeConversion(t *testing.T) { "l": pdata.NewAttributeValueString("(0+0i)"), "m": pdata.NewAttributeValueString("\001e\002"), "n": pdata.NewAttributeValueString(`["first","second"]`), + "o": pdata.NewAttributeValueString("cde"), }, }, ).ResourceLogs().At(0).InstrumentationLibraryLogs().At(0).Logs().At(0), le) diff --git a/receiver/fluentforwardreceiver/receiver_test.go b/receiver/fluentforwardreceiver/receiver_test.go index a7122825f1f..44afaa00511 100644 --- a/receiver/fluentforwardreceiver/receiver_test.go +++ b/receiver/fluentforwardreceiver/receiver_test.go @@ -237,10 +237,9 @@ func TestForwardPackedEvent(t *testing.T) { require.EqualValues(t, logstest.Logs( logstest.Log{ Timestamp: 1593032517024597622, - Body: pdata.NewAttributeValueNull(), + Body: pdata.NewAttributeValueString("starting fluentd worker pid=17 ppid=7 worker=0"), Attributes: map[string]pdata.AttributeValue{ "fluent.tag": pdata.NewAttributeValueString("fluent.info"), - "message": pdata.NewAttributeValueString("starting fluentd worker pid=17 ppid=7 worker=0"), "pid": pdata.NewAttributeValueInt(17), "ppid": pdata.NewAttributeValueInt(7), "worker": pdata.NewAttributeValueInt(0), @@ -248,35 +247,31 @@ func TestForwardPackedEvent(t *testing.T) { }, logstest.Log{ Timestamp: 1593032517028573686, - Body: pdata.NewAttributeValueNull(), + Body: pdata.NewAttributeValueString("delayed_commit_timeout is overwritten by ack_response_timeout"), Attributes: map[string]pdata.AttributeValue{ "fluent.tag": pdata.NewAttributeValueString("fluent.info"), - "message": pdata.NewAttributeValueString("delayed_commit_timeout is overwritten by ack_response_timeout"), }, }, logstest.Log{ Timestamp: 1593032517028815948, - Body: pdata.NewAttributeValueNull(), + Body: pdata.NewAttributeValueString("following tail of /var/log/kern.log"), Attributes: map[string]pdata.AttributeValue{ "fluent.tag": pdata.NewAttributeValueString("fluent.info"), - "message": pdata.NewAttributeValueString("following tail of /var/log/kern.log"), }, }, logstest.Log{ Timestamp: 1593032517031174229, - Body: pdata.NewAttributeValueNull(), + Body: pdata.NewAttributeValueString("fluentd worker is now running worker=0"), Attributes: map[string]pdata.AttributeValue{ "fluent.tag": pdata.NewAttributeValueString("fluent.info"), - "message": pdata.NewAttributeValueString("fluentd worker is now running worker=0"), "worker": pdata.NewAttributeValueInt(0), }, }, logstest.Log{ Timestamp: 1593032522187382822, - Body: pdata.NewAttributeValueNull(), + Body: pdata.NewAttributeValueString("fluentd worker is now stopping worker=0"), Attributes: map[string]pdata.AttributeValue{ "fluent.tag": pdata.NewAttributeValueString("fluent.info"), - "message": pdata.NewAttributeValueString("fluentd worker is now stopping worker=0"), "worker": pdata.NewAttributeValueInt(0), }, }, @@ -308,10 +303,9 @@ func TestForwardPackedCompressedEvent(t *testing.T) { require.EqualValues(t, logstest.Logs( logstest.Log{ Timestamp: 1593032426012197420, - Body: pdata.NewAttributeValueNull(), + Body: pdata.NewAttributeValueString("starting fluentd worker pid=17 ppid=7 worker=0"), Attributes: map[string]pdata.AttributeValue{ "fluent.tag": pdata.NewAttributeValueString("fluent.info"), - "message": pdata.NewAttributeValueString("starting fluentd worker pid=17 ppid=7 worker=0"), "pid": pdata.NewAttributeValueInt(17), "ppid": pdata.NewAttributeValueInt(7), "worker": pdata.NewAttributeValueInt(0), @@ -319,35 +313,31 @@ func TestForwardPackedCompressedEvent(t *testing.T) { }, logstest.Log{ Timestamp: 1593032426013724933, - Body: pdata.NewAttributeValueNull(), + Body: pdata.NewAttributeValueString("delayed_commit_timeout is overwritten by ack_response_timeout"), Attributes: map[string]pdata.AttributeValue{ "fluent.tag": pdata.NewAttributeValueString("fluent.info"), - "message": pdata.NewAttributeValueString("delayed_commit_timeout is overwritten by ack_response_timeout"), }, }, logstest.Log{ Timestamp: 1593032426020510455, - Body: pdata.NewAttributeValueNull(), + Body: pdata.NewAttributeValueString("following tail of /var/log/kern.log"), Attributes: map[string]pdata.AttributeValue{ "fluent.tag": pdata.NewAttributeValueString("fluent.info"), - "message": pdata.NewAttributeValueString("following tail of /var/log/kern.log"), }, }, logstest.Log{ Timestamp: 1593032426024346580, - Body: pdata.NewAttributeValueNull(), + Body: pdata.NewAttributeValueString("fluentd worker is now running worker=0"), Attributes: map[string]pdata.AttributeValue{ "fluent.tag": pdata.NewAttributeValueString("fluent.info"), - "message": pdata.NewAttributeValueString("fluentd worker is now running worker=0"), "worker": pdata.NewAttributeValueInt(0), }, }, logstest.Log{ Timestamp: 1593032434346935532, - Body: pdata.NewAttributeValueNull(), + Body: pdata.NewAttributeValueString("fluentd worker is now stopping worker=0"), Attributes: map[string]pdata.AttributeValue{ "fluent.tag": pdata.NewAttributeValueString("fluent.info"), - "message": pdata.NewAttributeValueString("fluentd worker is now stopping worker=0"), "worker": pdata.NewAttributeValueInt(0), }, },