From af160a49078ec6369cd02cfe0dd7734b0b527a91 Mon Sep 17 00:00:00 2001 From: Jose Riguera Date: Thu, 23 May 2024 15:06:18 +0200 Subject: [PATCH] [receiver/cloudfoundryreceiver] WIP: Reimplement function to parse RTR log lines extracting fields and add tests Co-authored-by: Cem Deniz Kabakci --- receiver/cloudfoundryreceiver/converter.go | 134 +++++++++++++----- .../cloudfoundryreceiver/converter_test.go | 50 ++++++- receiver/cloudfoundryreceiver/receiver.go | 2 +- 3 files changed, 144 insertions(+), 42 deletions(-) diff --git a/receiver/cloudfoundryreceiver/converter.go b/receiver/cloudfoundryreceiver/converter.go index 2cd0579d1ab8..c8cf41a34800 100644 --- a/receiver/cloudfoundryreceiver/converter.go +++ b/receiver/cloudfoundryreceiver/converter.go @@ -4,8 +4,10 @@ package cloudfoundryreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/cloudfoundryreceiver" import ( + "fmt" "strings" "time" + "unicode" "code.cloudfoundry.org/go-loggregator/rpc/loggregator_v2" "go.opentelemetry.io/collector/pdata/pcommon" @@ -15,7 +17,11 @@ import ( ) const ( - attributeNamePrefix = "org.cloudfoundry." + attributeNamePrefix = "org.cloudfoundry." 
+ envelopeSourceTypeTag = "org.cloudfoundry.source_type" + envelopeSourceTypeValueRTR = "RTR" + logLineRTRTraceIDKey = "x_b3_traceid" + logLineRTRSpanIDKey = "x_b3_spanid" ) func convertEnvelopeToMetrics(envelope *loggregator_v2.Envelope, metricSlice pmetric.MetricSlice, startTime time.Time) { @@ -43,9 +49,8 @@ func convertEnvelopeToMetrics(envelope *loggregator_v2.Envelope, metricSlice pme } } -func convertEnvelopeToLogs(envelope *loggregator_v2.Envelope, logSlice plog.LogRecordSlice, startTime time.Time) { - switch envelope.Message.(type) { - case *loggregator_v2.Envelope_Log: +func convertEnvelopeToLogs(envelope *loggregator_v2.Envelope, logSlice plog.LogRecordSlice, startTime time.Time) error { + if _, isLog := envelope.Message.(*loggregator_v2.Envelope_Log); isLog { log := logSlice.AppendEmpty() log.SetTimestamp(pcommon.Timestamp(envelope.GetTimestamp())) log.SetObservedTimestamp(pcommon.NewTimestampFromTime(startTime)) @@ -59,9 +64,30 @@ func convertEnvelopeToLogs(envelope *loggregator_v2.Envelope, logSlice plog.LogR log.SetSeverityNumber(plog.SeverityNumberError) } copyEnvelopeAttributes(log.Attributes(), envelope) - _ = parseLogTracingFields(log) - default: + if value, found := log.Attributes().Get(envelopeSourceTypeTag); found && value.AsString() == envelopeSourceTypeValueRTR { + _, wordsMap := parseLogLine(log.Body().AsString()) + traceIDStr, found := wordsMap[logLineRTRTraceIDKey] + if !found { + return fmt.Errorf("traceid key %s not found in log", logLineRTRTraceIDKey) + } + spanIDStr, found := wordsMap[logLineRTRSpanIDKey] + if !found { + return fmt.Errorf("spanid key %s not found in log", logLineRTRSpanIDKey) + } + traceID, err := trace.TraceIDFromHex(traceIDStr) + if err != nil { + return err + } + spanID, err := trace.SpanIDFromHex(spanIDStr) + if err != nil { + return err + } + log.SetTraceID([16]byte(traceID)) + log.SetSpanID([8]byte(spanID)) + } + return nil } + return fmt.Errorf("envelope is not a log") } func copyEnvelopeAttributes(attributes 
pcommon.Map, envelope *loggregator_v2.Envelope) { @@ -78,37 +104,73 @@ func copyEnvelopeAttributes(attributes pcommon.Map, envelope *loggregator_v2.Env } } -func parseLogTracingFields(log plog.LogRecord) error { - if value, found := log.Attributes().Get("org.cloudfoundry.source_type"); !found || value.AsString() != "RTR" { - return nil - } - s := log.Body().AsString() - quoted := false - a := strings.FieldsFunc(s, func(r rune) bool { - if r == '"' { - quoted = !quoted +func parseLogLine(s string) ([]string, map[string]string) { + wordList := make([]string, 0, 20) + sb := &strings.Builder{} + mapValue := &strings.Builder{} + timestamp := &strings.Builder{} + isTimeStamp := false + mapKey := "" + isMap := false + isQuoted := false + wordMap := make(map[string]string) + for _, ch := range s { + if ch == '"' { + isQuoted = !isQuoted + sb.WriteRune(ch) + continue } - return !quoted && r == ' ' - }) - - traceIDStr := strings.Split(a[21], ":")[1] - traceIDStr = strings.Trim(traceIDStr, "\"") - - spanIDStr := strings.Split(a[22], ":")[1] - spanIDStr = strings.Trim(spanIDStr, "\"") - - traceID, err := trace.TraceIDFromHex(traceIDStr) - if err != nil { - return err + if isQuoted { + sb.WriteRune(ch) + if isMap { + mapValue.WriteRune(ch) + } + continue + } + if ch == '[' && sb.Len() == 0 { + // first char after space + isTimeStamp = true + continue + } + if ch == ']' && isTimeStamp { + wordList = append(wordList, timestamp.String()) + timestamp.Reset() + isTimeStamp = false + continue + } + if isTimeStamp { + timestamp.WriteRune(ch) + continue + } + if unicode.IsSpace(ch) { + if sb.Len() > 0 { + word := sb.String() + if isMap { + wordMap[mapKey] = mapValue.String() + } else if strings.HasPrefix(word, `"`) && strings.HasSuffix(word, `"`) { + // remove " if the item is not a keyMap and starts and ends with it + word = strings.Trim(word, `"`) + } + wordList = append(wordList, word) + } + isMap = false + mapValue.Reset() + sb.Reset() + continue + } + if isMap { + 
mapValue.WriteRune(ch) + } else if ch == ':' { + mapKey = sb.String() + isMap = true + } + sb.WriteRune(ch) } - - spanID, err := trace.SpanIDFromHex(spanIDStr) - if err != nil { - return err + if sb.Len() > 0 { + wordList = append(wordList, sb.String()) + if isMap { + wordMap[mapKey] = mapValue.String() + } } - - log.SetTraceID([16]byte(traceID)) - log.SetSpanID([8]byte(spanID)) - - return nil + return wordList, wordMap } diff --git a/receiver/cloudfoundryreceiver/converter_test.go b/receiver/cloudfoundryreceiver/converter_test.go index e666afea9b61..a0c19da6bf6a 100644 --- a/receiver/cloudfoundryreceiver/converter_test.go +++ b/receiver/cloudfoundryreceiver/converter_test.go @@ -143,6 +143,49 @@ func TestConvertGaugeEnvelope(t *testing.T) { assertAttributes(t, dataPoint.Attributes(), expectedAttributes) } +func TestParseLogLine(t *testing.T) { + logLines := []string{ + `www.example.com - [2024-05-21T15:40:13.892179798Z] "GET /articles/ssdfws HTTP/1.1" 200 0 110563 "-" "python-requests/2.26.0" "20.191.2.244:52238" "10.88.195.81:61222" x_forwarded_for:"18.21.57.150, 10.28.45.29, 35.16.25.46, 20.191.2.244" x_forwarded_proto:"https" vcap_request_id:"766afb19-1779-4bb9-65d4-f01306f9f912" response_time:0.191835 gorouter_time:0.000139 app_id:"e3267823-0938-43ce-85ff-003e3e3a5a23" app_index:"4" instance_id:"918dd283-a0ed-48be-7f0c-253b" x_cf_routererror:"-" x_forwarded_host:"www.example.com" x_b3_traceid:"766afb1917794bb965d4f01306f9f912" x_b3_spanid:"65d4f01306f9f912" x_b3_parentspanid:"-" b3:"766afb1917794bb965d4f01306f9f912-65d4f01306f9f912" traceparent:"00-766afb1917794bb965d4f01306f9f912-65d4f01306f9f912-01" tracestate:"gorouter=65d4f01306f9f912"`, + } + wordListExpected := [][]string{ + {"www.example.com", "-", "2024-05-21T15:40:13.892179798Z", "GET /articles/ssdfws HTTP/1.1", "200", "0", "110563", "-", "python-requests/2.26.0", "20.191.2.244:52238", "10.88.195.81:61222", `x_forwarded_for:"18.21.57.150, 10.28.45.29, 35.16.25.46, 20.191.2.244"`, 
`x_forwarded_proto:"https"`, `vcap_request_id:"766afb19-1779-4bb9-65d4-f01306f9f912"`, `response_time:0.191835`, `gorouter_time:0.000139`, `app_id:"e3267823-0938-43ce-85ff-003e3e3a5a23"`, `app_index:"4"`, `instance_id:"918dd283-a0ed-48be-7f0c-253b"`, `x_cf_routererror:"-"`, `x_forwarded_host:"www.example.com"`, `x_b3_traceid:"766afb1917794bb965d4f01306f9f912"`, `x_b3_spanid:"65d4f01306f9f912"`, `x_b3_parentspanid:"-"`, `b3:"766afb1917794bb965d4f01306f9f912-65d4f01306f9f912"`, `traceparent:"00-766afb1917794bb965d4f01306f9f912-65d4f01306f9f912-01"`, `tracestate:"gorouter=65d4f01306f9f912"`}, + } + wordMapExpected := []map[string]string{ + { + "x_forwarded_for": "18.21.57.150, 10.28.45.29, 35.16.25.46, 20.191.2.244", + "x_forwarded_proto": "https", + "vcap_request_id": "766afb19-1779-4bb9-65d4-f01306f9f912", + "response_time": "0.191835", + "gorouter_time": "0.000139", + "app_id": "e3267823-0938-43ce-85ff-003e3e3a5a23", + "app_index": "4", + "instance_id": "918dd283-a0ed-48be-7f0c-253b", + "x_cf_routererror": "-", + "x_forwarded_host": "www.example.com", + "x_b3_traceid": "766afb1917794bb965d4f01306f9f912", + "x_b3_spanid": "65d4f01306f9f912", + "x_b3_parentspanid": "-", + "b3": "766afb1917794bb965d4f01306f9f912-65d4f01306f9f912", + "traceparent": "00-766afb1917794bb965d4f01306f9f912-65d4f01306f9f912-01", + "tracestate": "gorouter=65d4f01306f9f912", + }, + } + for index, logLine := range logLines { + wordList, wordMap := parseLogLine(logLine) + require.Equal(t, len(wordList), len(wordListExpected[index])) + require.Equal(t, len(wordMap), len(wordMapExpected[index])) + + for wordExpectedIndex, wordExpected := range wordListExpected[index] { + assert.Equal(t, wordExpected, wordList[wordExpectedIndex], "List Item %s value", wordList[wordExpectedIndex]) + } + for k, v := range wordMapExpected[index] { + value, present := wordMap[k] + assert.True(t, present, "Map Item %s presence", k) + assert.Equal(t, v, value, "Map Item %s value", v) + } + } +} + func 
TestConvertLogsEnvelope(t *testing.T) { now := time.Now() before := time.Now().Add(-time.Second) @@ -166,17 +209,14 @@ func TestConvertLogsEnvelope(t *testing.T) { } logSlice := plog.NewLogRecordSlice() - - convertEnvelopeToLogs(&envelope, logSlice, now) - + e := convertEnvelopeToLogs(&envelope, logSlice, now) + require.Equal(t, nil, e) require.Equal(t, 1, logSlice.Len()) - log := logSlice.At(0) assert.Equal(t, "log message payload", log.Body().AsString()) assert.Equal(t, plog.SeverityNumberInfo.String(), log.SeverityText()) assert.Equal(t, pcommon.NewTimestampFromTime(before), log.Timestamp()) assert.Equal(t, pcommon.NewTimestampFromTime(now), log.ObservedTimestamp()) - assertAttributes(t, log.Attributes(), map[string]string{ "org.cloudfoundry.source_id": "uaa", "org.cloudfoundry.origin": "gorouter", diff --git a/receiver/cloudfoundryreceiver/receiver.go b/receiver/cloudfoundryreceiver/receiver.go index 8895b0f76d5a..cb94b8c431b9 100644 --- a/receiver/cloudfoundryreceiver/receiver.go +++ b/receiver/cloudfoundryreceiver/receiver.go @@ -218,7 +218,7 @@ func (cfr *cloudFoundryReceiver) streamLogs( observedTime := time.Now() for _, envelope := range envelopes { if envelope != nil { - convertEnvelopeToLogs(envelope, libraryLogs, observedTime) + _ = convertEnvelopeToLogs(envelope, libraryLogs, observedTime) } }