Skip to content

Commit

Permalink
[receiver/fluentforward] Fix string conversions (#2314)
Browse files Browse the repository at this point in the history
* [receiver/fluentforward] Fix string conversions

- Sometimes strings are received as []uint8
- fluentd uses "message", fluent-bit uses "log" for the log message

* lint fix
  • Loading branch information
jrcamp authored Dec 21, 2020
1 parent 36c22b9 commit d75a2f9
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 26 deletions.
21 changes: 18 additions & 3 deletions receiver/fluentforwardreceiver/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,30 @@ func parseRecordToLogRecord(dc *msgp.Reader, lr pdata.LogRecord) error {
recordLen--
key, err := dc.ReadString()
if err != nil {
return msgp.WrapError(err, "Record")
// The protocol doesn't specify this but apparently some map keys
// can be binary type instead of string
keyBytes, keyBytesErr := dc.ReadBytes(nil)
if keyBytesErr != nil {
return msgp.WrapError(keyBytesErr, "Record")
}
key = string(keyBytes)
}
val, err := dc.ReadIntf()
if err != nil {
return msgp.WrapError(err, "Record", key)
}

if s, ok := val.(string); ok && key == "log" {
lr.Body().SetStringVal(s)
// fluentd uses message, fluentbit log.
if key == "message" || key == "log" {
switch v := val.(type) {
case string:
lr.Body().SetStringVal(v)
case []uint8:
// Sometimes strings come in as uint8's.
lr.Body().SetStringVal(string(v))
default:
return fmt.Errorf("cannot convert message type %T to string", val)
}
} else {
insertToAttributeMap(key, val, &attrs)
}
Expand Down
10 changes: 7 additions & 3 deletions receiver/fluentforwardreceiver/conversion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,12 @@ func TestMessageEventConversion(t *testing.T) {
}

func TestAttributeTypeConversion(t *testing.T) {

var b []byte

b = msgp.AppendArrayHeader(b, 3)
b = msgp.AppendString(b, "my-tag")
b = msgp.AppendInt(b, 5000)
b = msgp.AppendMapHeader(b, 14)
b = msgp.AppendMapHeader(b, 15)
b = msgp.AppendString(b, "a")
b = msgp.AppendFloat64(b, 5.0)
b = msgp.AppendString(b, "b")
Expand Down Expand Up @@ -90,11 +89,15 @@ func TestAttributeTypeConversion(t *testing.T) {
b = msgp.AppendArrayHeader(b, 2)
b = msgp.AppendString(b, "first")
b = msgp.AppendString(b, "second")
b = msgp.AppendString(b, "o")
b, err := msgp.AppendIntf(b, []uint8{99, 100, 101})

require.NoError(t, err)

reader := msgp.NewReader(bytes.NewReader(b))

var event MessageEventLogRecord
err := event.DecodeMsg(reader)
err = event.DecodeMsg(reader)
require.Nil(t, err)

le := event.LogRecords().At(0)
Expand All @@ -119,6 +122,7 @@ func TestAttributeTypeConversion(t *testing.T) {
"l": pdata.NewAttributeValueString("(0+0i)"),
"m": pdata.NewAttributeValueString("\001e\002"),
"n": pdata.NewAttributeValueString(`["first","second"]`),
"o": pdata.NewAttributeValueString("cde"),
},
},
).ResourceLogs().At(0).InstrumentationLibraryLogs().At(0).Logs().At(0), le)
Expand Down
30 changes: 10 additions & 20 deletions receiver/fluentforwardreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,46 +237,41 @@ func TestForwardPackedEvent(t *testing.T) {
require.EqualValues(t, logstest.Logs(
logstest.Log{
Timestamp: 1593032517024597622,
Body: pdata.NewAttributeValueNull(),
Body: pdata.NewAttributeValueString("starting fluentd worker pid=17 ppid=7 worker=0"),
Attributes: map[string]pdata.AttributeValue{
"fluent.tag": pdata.NewAttributeValueString("fluent.info"),
"message": pdata.NewAttributeValueString("starting fluentd worker pid=17 ppid=7 worker=0"),
"pid": pdata.NewAttributeValueInt(17),
"ppid": pdata.NewAttributeValueInt(7),
"worker": pdata.NewAttributeValueInt(0),
},
},
logstest.Log{
Timestamp: 1593032517028573686,
Body: pdata.NewAttributeValueNull(),
Body: pdata.NewAttributeValueString("delayed_commit_timeout is overwritten by ack_response_timeout"),
Attributes: map[string]pdata.AttributeValue{
"fluent.tag": pdata.NewAttributeValueString("fluent.info"),
"message": pdata.NewAttributeValueString("delayed_commit_timeout is overwritten by ack_response_timeout"),
},
},
logstest.Log{
Timestamp: 1593032517028815948,
Body: pdata.NewAttributeValueNull(),
Body: pdata.NewAttributeValueString("following tail of /var/log/kern.log"),
Attributes: map[string]pdata.AttributeValue{
"fluent.tag": pdata.NewAttributeValueString("fluent.info"),
"message": pdata.NewAttributeValueString("following tail of /var/log/kern.log"),
},
},
logstest.Log{
Timestamp: 1593032517031174229,
Body: pdata.NewAttributeValueNull(),
Body: pdata.NewAttributeValueString("fluentd worker is now running worker=0"),
Attributes: map[string]pdata.AttributeValue{
"fluent.tag": pdata.NewAttributeValueString("fluent.info"),
"message": pdata.NewAttributeValueString("fluentd worker is now running worker=0"),
"worker": pdata.NewAttributeValueInt(0),
},
},
logstest.Log{
Timestamp: 1593032522187382822,
Body: pdata.NewAttributeValueNull(),
Body: pdata.NewAttributeValueString("fluentd worker is now stopping worker=0"),
Attributes: map[string]pdata.AttributeValue{
"fluent.tag": pdata.NewAttributeValueString("fluent.info"),
"message": pdata.NewAttributeValueString("fluentd worker is now stopping worker=0"),
"worker": pdata.NewAttributeValueInt(0),
},
},
Expand Down Expand Up @@ -308,46 +303,41 @@ func TestForwardPackedCompressedEvent(t *testing.T) {
require.EqualValues(t, logstest.Logs(
logstest.Log{
Timestamp: 1593032426012197420,
Body: pdata.NewAttributeValueNull(),
Body: pdata.NewAttributeValueString("starting fluentd worker pid=17 ppid=7 worker=0"),
Attributes: map[string]pdata.AttributeValue{
"fluent.tag": pdata.NewAttributeValueString("fluent.info"),
"message": pdata.NewAttributeValueString("starting fluentd worker pid=17 ppid=7 worker=0"),
"pid": pdata.NewAttributeValueInt(17),
"ppid": pdata.NewAttributeValueInt(7),
"worker": pdata.NewAttributeValueInt(0),
},
},
logstest.Log{
Timestamp: 1593032426013724933,
Body: pdata.NewAttributeValueNull(),
Body: pdata.NewAttributeValueString("delayed_commit_timeout is overwritten by ack_response_timeout"),
Attributes: map[string]pdata.AttributeValue{
"fluent.tag": pdata.NewAttributeValueString("fluent.info"),
"message": pdata.NewAttributeValueString("delayed_commit_timeout is overwritten by ack_response_timeout"),
},
},
logstest.Log{
Timestamp: 1593032426020510455,
Body: pdata.NewAttributeValueNull(),
Body: pdata.NewAttributeValueString("following tail of /var/log/kern.log"),
Attributes: map[string]pdata.AttributeValue{
"fluent.tag": pdata.NewAttributeValueString("fluent.info"),
"message": pdata.NewAttributeValueString("following tail of /var/log/kern.log"),
},
},
logstest.Log{
Timestamp: 1593032426024346580,
Body: pdata.NewAttributeValueNull(),
Body: pdata.NewAttributeValueString("fluentd worker is now running worker=0"),
Attributes: map[string]pdata.AttributeValue{
"fluent.tag": pdata.NewAttributeValueString("fluent.info"),
"message": pdata.NewAttributeValueString("fluentd worker is now running worker=0"),
"worker": pdata.NewAttributeValueInt(0),
},
},
logstest.Log{
Timestamp: 1593032434346935532,
Body: pdata.NewAttributeValueNull(),
Body: pdata.NewAttributeValueString("fluentd worker is now stopping worker=0"),
Attributes: map[string]pdata.AttributeValue{
"fluent.tag": pdata.NewAttributeValueString("fluent.info"),
"message": pdata.NewAttributeValueString("fluentd worker is now stopping worker=0"),
"worker": pdata.NewAttributeValueInt(0),
},
},
Expand Down

0 comments on commit d75a2f9

Please sign in to comment.