Skip to content

Commit

Permalink
[recevier/cloudfoundryreceiver] WIP: Reimplement function to parse RT…
Browse files Browse the repository at this point in the history
…R log lines extracting fields and add tests

Co-authored-by: Cem Deniz Kabakci <cem.kabakci@springer.com>
  • Loading branch information
jriguera and CemDK committed May 23, 2024
1 parent 6dcf1fc commit af160a4
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 42 deletions.
134 changes: 98 additions & 36 deletions receiver/cloudfoundryreceiver/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
package cloudfoundryreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/cloudfoundryreceiver"

import (
"fmt"
"strings"
"time"
"unicode"

"code.cloudfoundry.org/go-loggregator/rpc/loggregator_v2"
"go.opentelemetry.io/collector/pdata/pcommon"
Expand All @@ -15,7 +17,11 @@ import (
)

const (
attributeNamePrefix = "org.cloudfoundry."
attributeNamePrefix = "org.cloudfoundry."
envelopeSourceTypeTag = "org.cloudfoundry.source_type"
envelopeSourceTypeValueRTR = "RTR"
logLineRTRTraceIDKey = "x_b3_traceid"
logLineRTRSpanIDKey = "x_b3_spanid"
)

func convertEnvelopeToMetrics(envelope *loggregator_v2.Envelope, metricSlice pmetric.MetricSlice, startTime time.Time) {
Expand Down Expand Up @@ -43,9 +49,8 @@ func convertEnvelopeToMetrics(envelope *loggregator_v2.Envelope, metricSlice pme
}
}

func convertEnvelopeToLogs(envelope *loggregator_v2.Envelope, logSlice plog.LogRecordSlice, startTime time.Time) {
switch envelope.Message.(type) {
case *loggregator_v2.Envelope_Log:
func convertEnvelopeToLogs(envelope *loggregator_v2.Envelope, logSlice plog.LogRecordSlice, startTime time.Time) error {
if _, isLog := envelope.Message.(*loggregator_v2.Envelope_Log); isLog {
log := logSlice.AppendEmpty()
log.SetTimestamp(pcommon.Timestamp(envelope.GetTimestamp()))
log.SetObservedTimestamp(pcommon.NewTimestampFromTime(startTime))
Expand All @@ -59,9 +64,30 @@ func convertEnvelopeToLogs(envelope *loggregator_v2.Envelope, logSlice plog.LogR
log.SetSeverityNumber(plog.SeverityNumberError)
}
copyEnvelopeAttributes(log.Attributes(), envelope)
_ = parseLogTracingFields(log)
default:
if value, found := log.Attributes().Get(envelopeSourceTypeTag); found && value.AsString() == envelopeSourceTypeValueRTR {
_, wordsMap := parseLogLine(log.Body().AsString())
traceIDStr, found := wordsMap[logLineRTRTraceIDKey]
if !found {
return fmt.Errorf("traceid key %s not found in log", logLineRTRTraceIDKey)
}
spanIDStr, found := wordsMap[logLineRTRSpanIDKey]
if !found {
return fmt.Errorf("spanid key %s not found in log", logLineRTRSpanIDKey)
}
traceID, err := trace.TraceIDFromHex(traceIDStr)
if err != nil {
return err
}
spanID, err := trace.SpanIDFromHex(spanIDStr)
if err != nil {
return err
}
log.SetTraceID([16]byte(traceID))
log.SetSpanID([8]byte(spanID))
}
return nil
}
return fmt.Errorf("envelope is not a log")
}

func copyEnvelopeAttributes(attributes pcommon.Map, envelope *loggregator_v2.Envelope) {
Expand All @@ -78,37 +104,73 @@ func copyEnvelopeAttributes(attributes pcommon.Map, envelope *loggregator_v2.Env
}
}

func parseLogTracingFields(log plog.LogRecord) error {
if value, found := log.Attributes().Get("org.cloudfoundry.source_type"); !found || value.AsString() != "RTR" {
return nil
}
s := log.Body().AsString()
quoted := false
a := strings.FieldsFunc(s, func(r rune) bool {
if r == '"' {
quoted = !quoted
func parseLogLine(s string) ([]string, map[string]string) {
wordList := make([]string, 0, 20)
sb := &strings.Builder{}
mapValue := &strings.Builder{}
timestamp := &strings.Builder{}
isTimeStamp := false
mapKey := ""
isMap := false
isQuoted := false
wordMap := make(map[string]string)
for _, ch := range s {
if ch == '"' {
isQuoted = !isQuoted
sb.WriteRune(ch)
continue
}
return !quoted && r == ' '
})

traceIDStr := strings.Split(a[21], ":")[1]
traceIDStr = strings.Trim(traceIDStr, "\"")

spanIDStr := strings.Split(a[22], ":")[1]
spanIDStr = strings.Trim(spanIDStr, "\"")

traceID, err := trace.TraceIDFromHex(traceIDStr)
if err != nil {
return err
if isQuoted {
sb.WriteRune(ch)
if isMap {
mapValue.WriteRune(ch)
}
continue
}
if ch == '[' && sb.Len() == 0 {
// first char after space
isTimeStamp = true
continue
}
if ch == ']' && isTimeStamp {
wordList = append(wordList, timestamp.String())
timestamp.Reset()
isTimeStamp = false
continue
}
if isTimeStamp {
timestamp.WriteRune(ch)
continue
}
if unicode.IsSpace(ch) {
if sb.Len() > 0 {
word := sb.String()
if isMap {
wordMap[mapKey] = mapValue.String()
} else if strings.HasPrefix(word, `"`) && strings.HasSuffix(word, `"`) {
// remove " if the item is not a keyMap and starts and ends with it
word = strings.Trim(word, `"`)
}
wordList = append(wordList, word)
}
isMap = false
mapValue.Reset()
sb.Reset()
continue
}
if isMap {
mapValue.WriteRune(ch)
} else if ch == ':' {
mapKey = sb.String()
isMap = true
}
sb.WriteRune(ch)
}

spanID, err := trace.SpanIDFromHex(spanIDStr)
if err != nil {
return err
if sb.Len() > 0 {
wordList = append(wordList, sb.String())
if isMap {
wordMap[mapKey] = mapValue.String()
}
}

log.SetTraceID([16]byte(traceID))
log.SetSpanID([8]byte(spanID))

return nil
return wordList, wordMap
}
50 changes: 45 additions & 5 deletions receiver/cloudfoundryreceiver/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,49 @@ func TestConvertGaugeEnvelope(t *testing.T) {
assertAttributes(t, dataPoint.Attributes(), expectedAttributes)
}

func TestParseLogLine(t *testing.T) {
logLines := []string{
`www.example.com - [2024-05-21T15:40:13.892179798Z] "GET /articles/ssdfws HTTP/1.1" 200 0 110563 "-" "python-requests/2.26.0" "20.191.2.244:52238" "10.88.195.81:61222" x_forwarded_for:"18.21.57.150, 10.28.45.29, 35.16.25.46, 20.191.2.244" x_forwarded_proto:"https" vcap_request_id:"766afb19-1779-4bb9-65d4-f01306f9f912" response_time:0.191835 gorouter_time:0.000139 app_id:"e3267823-0938-43ce-85ff-003e3e3a5a23" app_index:"4" instance_id:"918dd283-a0ed-48be-7f0c-253b" x_cf_routererror:"-" x_forwarded_host:"www.example.com" x_b3_traceid:"766afb1917794bb965d4f01306f9f912" x_b3_spanid:"65d4f01306f9f912" x_b3_parentspanid:"-" b3:"766afb1917794bb965d4f01306f9f912-65d4f01306f9f912" traceparent:"00-766afb1917794bb965d4f01306f9f912-65d4f01306f9f912-01" tracestate:"gorouter=65d4f01306f9f912"`,
}
wordListExpected := [][]string{
{"www.example.com", "-", "2024-05-21T15:40:13.892179798Z", "GET /articles/ssdfws HTTP/1.1", "200", "0", "110563", "-", "python-requests/2.26.0", "20.191.2.244:52238", "10.88.195.81:61222", `x_forwarded_for:"18.21.57.150, 10.28.45.29, 35.16.25.46, 20.191.2.244"`, `x_forwarded_proto:"https"`, `vcap_request_id:"766afb19-1779-4bb9-65d4-f01306f9f912"`, `response_time:0.191835`, `gorouter_time:0.000139`, `app_id:"e3267823-0938-43ce-85ff-003e3e3a5a23"`, `app_index:"4"`, `instance_id:"918dd283-a0ed-48be-7f0c-253b"`, `x_cf_routererror:"-"`, `x_forwarded_host:"www.example.com"`, `x_b3_traceid:"766afb1917794bb965d4f01306f9f912"`, `x_b3_spanid:"65d4f01306f9f912"`, `x_b3_parentspanid:"-"`, `b3:"766afb1917794bb965d4f01306f9f912-65d4f01306f9f912"`, `traceparent:"00-766afb1917794bb965d4f01306f9f912-65d4f01306f9f912-01"`, `tracestate:"gorouter=65d4f01306f9f912"`},
}
wordMapExpected := []map[string]string{
{
"x_forwarded_for": "18.21.57.150, 10.28.45.29, 35.16.25.46, 20.191.2.244",
"x_forwarded_proto": "https",
"vcap_request_id": "766afb19-1779-4bb9-65d4-f01306f9f912",
"response_time": "0.191835",
"gorouter_time": "0.000139",
"app_id": "e3267823-0938-43ce-85ff-003e3e3a5a23",
"app_index": "4",
"instance_id": "918dd283-a0ed-48be-7f0c-253b",
"x_cf_routererror": "-",
"x_forwarded_host": "www.example.com",
"x_b3_traceid": "766afb1917794bb965d4f01306f9f912",
"x_b3_spanid": "65d4f01306f9f912",
"x_b3_parentspanid": "-",
"b3": "766afb1917794bb965d4f01306f9f912-65d4f01306f9f912",
"traceparent": "00-766afb1917794bb965d4f01306f9f912-65d4f01306f9f912-01",
"tracestate": "gorouter=65d4f01306f9f912",
},
}
for index, logLine := range logLines {
wordList, wordMap := parseLogLine(logLine)
require.Equal(t, len(wordList), len(wordListExpected[index]))
require.Equal(t, len(wordMap), len(wordMapExpected[index]))

for wordExpectedIndex, wordExpected := range wordListExpected[index] {
assert.Equal(t, wordExpected, wordList[wordExpectedIndex], "List Item %s value", wordList[wordExpectedIndex])
}
for k, v := range wordMapExpected[index] {
value, present := wordMap[k]
assert.True(t, present, "Map Item %s presence", k)
assert.Equal(t, v, value, "Map Item %s value", v)
}
}
}

func TestConvertLogsEnvelope(t *testing.T) {
now := time.Now()
before := time.Now().Add(-time.Second)
Expand All @@ -166,17 +209,14 @@ func TestConvertLogsEnvelope(t *testing.T) {
}

logSlice := plog.NewLogRecordSlice()

convertEnvelopeToLogs(&envelope, logSlice, now)

e := convertEnvelopeToLogs(&envelope, logSlice, now)
require.Equal(t, nil, e)
require.Equal(t, 1, logSlice.Len())

log := logSlice.At(0)
assert.Equal(t, "log message payload", log.Body().AsString())
assert.Equal(t, plog.SeverityNumberInfo.String(), log.SeverityText())
assert.Equal(t, pcommon.NewTimestampFromTime(before), log.Timestamp())
assert.Equal(t, pcommon.NewTimestampFromTime(now), log.ObservedTimestamp())

assertAttributes(t, log.Attributes(), map[string]string{
"org.cloudfoundry.source_id": "uaa",
"org.cloudfoundry.origin": "gorouter",
Expand Down
2 changes: 1 addition & 1 deletion receiver/cloudfoundryreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (cfr *cloudFoundryReceiver) streamLogs(
observedTime := time.Now()
for _, envelope := range envelopes {
if envelope != nil {
convertEnvelopeToLogs(envelope, libraryLogs, observedTime)
_ = convertEnvelopeToLogs(envelope, libraryLogs, observedTime)
}
}

Expand Down

0 comments on commit af160a4

Please sign in to comment.