From 5251b712803373121a8dae3129f0c385edc1d85e Mon Sep 17 00:00:00 2001 From: joeyyy09 Date: Mon, 26 Aug 2024 10:48:38 +0530 Subject: [PATCH 1/9] Add otlp_json support in kafka receiver Signed-off-by: joeyyy09 --- .../kafkareceiver/otlp_json_unmarshaler.go | 159 ++++++++++++ .../otlp_json_unmarshaler_test.go | 241 ++++++++++++++++++ receiver/kafkareceiver/unmarshaler.go | 2 + 3 files changed, 402 insertions(+) create mode 100644 receiver/kafkareceiver/otlp_json_unmarshaler.go create mode 100644 receiver/kafkareceiver/otlp_json_unmarshaler_test.go diff --git a/receiver/kafkareceiver/otlp_json_unmarshaler.go b/receiver/kafkareceiver/otlp_json_unmarshaler.go new file mode 100644 index 000000000000..89a3121f7fae --- /dev/null +++ b/receiver/kafkareceiver/otlp_json_unmarshaler.go @@ -0,0 +1,159 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package kafkareceiver + +import ( + "encoding/hex" + "encoding/json" + "fmt" + "strconv" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +type otlpJSONTracesUnmarshaler struct { + encoding string +} + +func (o otlpJSONTracesUnmarshaler) Unmarshal(buf []byte) (ptrace.Traces, error) { + var jsonData map[string]interface{} + if err := json.Unmarshal(buf, &jsonData); err != nil { + return ptrace.NewTraces(), fmt.Errorf("failed to unmarshal JSON: %w", err) + } + + traces := ptrace.NewTraces() + resourceSpans := traces.ResourceSpans() + + // Parse resourceSpans + if resourceSpansArr, ok := jsonData["resourceSpans"].([]interface{}); ok { + for _, rs := range resourceSpansArr { + if rsMap, ok := rs.(map[string]interface{}); ok { + parseResourceSpan(rsMap, resourceSpans.AppendEmpty()) + } + } + } + + return traces, nil +} + +func (o otlpJSONTracesUnmarshaler) Encoding() string { + return o.encoding +} + +func newOTLPJSONTracesUnmarshaler() TracesUnmarshaler { + return &otlpJSONTracesUnmarshaler{ + encoding: "otlp_json", + } +} + +func parseResourceSpan(rsMap map[string]interface{}, rs ptrace.ResourceSpans) { + // Parse resource + if resource, ok := rsMap["resource"].(map[string]interface{}); ok { + parseResource(resource, rs.Resource()) + } + + // Parse scopeSpans + if scopeSpans, ok := rsMap["scopeSpans"].([]interface{}); ok { + for _, ss := range scopeSpans { + if ssMap, ok := ss.(map[string]interface{}); ok { + parseScopeSpan(ssMap, rs.ScopeSpans().AppendEmpty()) + } + } + } +} + +func parseResource(resource map[string]interface{}, r pcommon.Resource) { + if attrs, ok := resource["attributes"].([]interface{}); ok { + parseAttributes(attrs, r.Attributes()) + } +} + +func parseScopeSpan(ssMap map[string]interface{}, ss ptrace.ScopeSpans) { + if scope, ok := ssMap["scope"].(map[string]interface{}); ok { + parseScope(scope, ss.Scope()) + } + + if spans, ok := ssMap["spans"].([]interface{}); ok { + for _, span := range spans { + if spanMap, ok := span.(map[string]interface{}); ok { + parseSpan(spanMap, ss.Spans().AppendEmpty()) + } + } + } +} + +func parseScope(scope map[string]interface{}, s pcommon.InstrumentationScope) { + if name, ok := scope["name"].(string); ok { + s.SetName(name) + } + if version, ok := scope["version"].(string); ok { + s.SetVersion(version) + } + if attrs, ok := scope["attributes"].([]interface{}); ok { + parseAttributes(attrs, s.Attributes()) + } +} + +func parseStatus(status map[string]interface{}, spanStatus ptrace.Status) { + if code, ok := status["code"].(float64); ok { + spanStatus.SetCode(ptrace.StatusCode(int32(code))) + } + if message, ok := 
status["message"].(string); ok { + spanStatus.SetMessage(message) + } +} + +func parseSpan(spanMap map[string]interface{}, span ptrace.Span) { + if name, ok := spanMap["name"].(string); ok { + span.SetName(name) + } + if traceIDStr, ok := spanMap["traceId"].(string); ok { + if traceID, err := hex.DecodeString(traceIDStr); err == nil && len(traceID) == 16 { + span.SetTraceID(pcommon.TraceID(traceID)) + } + } + if spanIDStr, ok := spanMap["spanId"].(string); ok { + if spanID, err := hex.DecodeString(spanIDStr); err == nil && len(spanID) == 8 { + span.SetSpanID(pcommon.SpanID(spanID)) + } + } + if parentSpanIDStr, ok := spanMap["parentSpanId"].(string); ok { + if parentSpanID, err := hex.DecodeString(parentSpanIDStr); err == nil && len(parentSpanID) == 8 { + span.SetParentSpanID(pcommon.SpanID(parentSpanID)) + } + } + if kindVal, ok := spanMap["kind"].(float64); ok { + span.SetKind(ptrace.SpanKind(int32(kindVal))) + } + if startTime, ok := spanMap["startTimeUnixNano"].(string); ok { + if startTimeInt, err := strconv.ParseUint(startTime, 10, 64); err == nil { + span.SetStartTimestamp(pcommon.Timestamp(startTimeInt)) + } + } + if endTime, ok := spanMap["endTimeUnixNano"].(string); ok { + if endTimeInt, err := strconv.ParseUint(endTime, 10, 64); err == nil { + span.SetEndTimestamp(pcommon.Timestamp(endTimeInt)) + } + } + if status, ok := spanMap["status"].(map[string]interface{}); ok { + parseStatus(status, span.Status()) + } + if attrs, ok := spanMap["attributes"].([]interface{}); ok { + parseAttributes(attrs, span.Attributes()) + } +} + +func parseAttributes(attrs []interface{}, dest pcommon.Map) { + for _, attr := range attrs { + if attrMap, ok := attr.(map[string]interface{}); ok { + key, _ := attrMap["key"].(string) + value, _ := attrMap["value"].(map[string]interface{}) + if strVal, ok := value["stringValue"].(string); ok { + dest.PutStr(key, strVal) + } + //TODO: WIll be adding more value types as needed + } + } +} \ No newline at end of file diff --git a/receiver/kafkareceiver/otlp_json_unmarshaler_test.go b/receiver/kafkareceiver/otlp_json_unmarshaler_test.go new file mode 100644 index 000000000000..fbe24458717b --- /dev/null +++ b/receiver/kafkareceiver/otlp_json_unmarshaler_test.go @@ -0,0 +1,241 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package kafkareceiver + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +func TestOTLPJSONTracesUnmarshalerEncoding(t *testing.T) { + unmarshaler := newOTLPJSONTracesUnmarshaler() + assert.Equal(t, "otlp_json", unmarshaler.Encoding()) +} + +func TestOTLPJSONTracesUnmarshal(t *testing.T) { + unmarshaler := newOTLPJSONTracesUnmarshaler() + + tests := []struct { + name string + json string + wantErr bool + validate func(t *testing.T, traces ptrace.Traces) + }{ + { + name: "valid_single_span", + json: `{ + "resourceSpans": [{ + "resource": { + "attributes": [{ + "key": "service.name", + "value": {"stringValue": "test_service"} + }] + }, + "scopeSpans": [{ + "scope": { + "name": "test_scope", + "version": "1.0.0" + }, + "spans": [{ + "traceId": "5B8EFFF798038103D269B633813FC60C", + "spanId": "EEE19B7EC3C1B173", + "parentSpanId": "FFFFFFFFFFFFFFFF", + "name": "test_span", + "kind": 1, + "startTimeUnixNano": "1544712660300000000", + "endTimeUnixNano": "1544712661300000000", + "attributes": [{ + "key": "attr1", + "value": {"stringValue": "val1"} + }] + }] + }] + 
}] + }`, + wantErr: false, + validate: func(t *testing.T, traces ptrace.Traces) { + assert.Equal(t, 1, traces.ResourceSpans().Len()) + rs := traces.ResourceSpans().At(0) + v, ok := rs.Resource().Attributes().Get("service.name") + assert.True(t, ok) + assert.Equal(t, "test_service", v.Str()) + assert.Equal(t, 1, rs.ScopeSpans().Len()) + ss := rs.ScopeSpans().At(0) + assert.Equal(t, "test_scope", ss.Scope().Name()) + assert.Equal(t, "1.0.0", ss.Scope().Version()) + assert.Equal(t, 1, ss.Spans().Len()) + span := ss.Spans().At(0) + assert.Equal(t, "test_span", span.Name()) + assert.Equal(t, pcommon.TraceID([16]byte{0x5B, 0x8E, 0xFF, 0xF7, 0x98, 0x03, 0x81, 0x03, 0xD2, 0x69, 0xB6, 0x33, 0x81, 0x3F, 0xC6, 0x0C}), span.TraceID()) + assert.Equal(t, pcommon.SpanID([8]byte{0xEE, 0xE1, 0x9B, 0x7E, 0xC3, 0xC1, 0xB1, 0x73}), span.SpanID()) + v, ok = span.Attributes().Get("attr1") + assert.True(t, ok) + assert.Equal(t, "val1", v.Str()) + }, + }, + { + name: "invalid_json", + json: `{invalid_json`, + wantErr: true, + }, + { + name: "empty_json", + json: `{}`, + wantErr: false, + validate: func(t *testing.T, traces ptrace.Traces) { + assert.Equal(t, 0, traces.ResourceSpans().Len()) + }, + }, + { + name: "missing_resource_spans", + json: `{"someOtherField": []}`, + wantErr: false, + validate: func(t *testing.T, traces ptrace.Traces) { + assert.Equal(t, 0, traces.ResourceSpans().Len()) + }, + }, + { + name: "empty_resource_spans", + json: `{"resourceSpans": []}`, + wantErr: false, + validate: func(t *testing.T, traces ptrace.Traces) { + assert.Equal(t, 0, traces.ResourceSpans().Len()) + }, + }, + { + name: "multiple_resource_spans", + json: `{ + "resourceSpans": [ + { + "resource": { + "attributes": [{ + "key": "service.name", + "value": {"stringValue": "service1"} + }] + }, + "scopeSpans": [{ + "scope": {"name": "scope1"}, + "spans": [{"name": "span1"}] + }] + }, + { + "resource": { + "attributes": [{ + "key": "service.name", + "value": {"stringValue": "service2"} + }] + }, + "scopeSpans": [{ + "scope": {"name": "scope2"}, + "spans": [{"name": "span2"}] + }] + } + ] + }`, + wantErr: false, + validate: func(t *testing.T, traces ptrace.Traces) { + assert.Equal(t, 2, traces.ResourceSpans().Len()) + v, ok := traces.ResourceSpans().At(0).Resource().Attributes().Get("service.name") + assert.True(t, ok) + assert.Equal(t, "service1", v.Str()) + v, ok = traces.ResourceSpans().At(1).Resource().Attributes().Get("service.name") + assert.True(t, ok) + assert.Equal(t, "service2", v.Str()) + assert.Equal(t, "span1", traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Name()) + assert.Equal(t, "span2", traces.ResourceSpans().At(1).ScopeSpans().At(0).Spans().At(0).Name()) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + traces, err := unmarshaler.Unmarshal([]byte(tt.json)) + if tt.wantErr { + assert.Error(t, err) + return + } + require.NoError(t, err) + tt.validate(t, traces) + }) + } +} + +func TestParseAttributes(t *testing.T) { + attrs := []interface{}{ + map[string]interface{}{ + "key": "stringAttr", + "value": map[string]interface{}{"stringValue": "test"}, + }, + map[string]interface{}{ + "key": "intAttr", + "value": map[string]interface{}{"intValue": float64(123)}, // JSON numbers are float64 + }, + map[string]interface{}{ + "key": "doubleAttr", + "value": map[string]interface{}{"doubleValue": 123.456}, + }, + map[string]interface{}{ + "key": "boolAttr", + "value": map[string]interface{}{"boolValue": true}, + }, + } + + dest := pcommon.NewMap() + parseAttributes(attrs, 
dest) + + v, ok := dest.Get("stringAttr") + assert.True(t, ok) + assert.Equal(t, "test", v.Str()) + + v, ok = dest.Get("intAttr") + assert.True(t, ok) + assert.Equal(t, int64(123), v.Int()) + + v, ok = dest.Get("doubleAttr") + assert.True(t, ok) + assert.Equal(t, 123.456, v.Double()) + + v, ok = dest.Get("boolAttr") + assert.True(t, ok) + assert.True(t, v.Bool()) +} + +func TestParseSpan(t *testing.T) { + spanMap := map[string]interface{}{ + "name": "test_span", + "traceId": "5B8EFFF798038103D269B633813FC60C", + "spanId": "EEE19B7EC3C1B173", + "parentSpanId": "FFFFFFFFFFFFFFFF", + "kind": 2, + "startTimeUnixNano": "1544712660300000000", + "endTimeUnixNano": "1544712661300000000", + "status": map[string]interface{}{"code": 1, "message": "OK"}, + "attributes": []interface{}{ + map[string]interface{}{ + "key": "attr1", + "value": map[string]interface{}{"stringValue": "val1"}, + }, + }, + } + + span := ptrace.NewSpan() + parseSpan(spanMap, span) + + assert.Equal(t, "test_span", span.Name()) + assert.Equal(t, pcommon.TraceID([16]byte{0x5B, 0x8E, 0xFF, 0xF7, 0x98, 0x03, 0x81, 0x03, 0xD2, 0x69, 0xB6, 0x33, 0x81, 0x3F, 0xC6, 0x0C}), span.TraceID()) + assert.Equal(t, pcommon.SpanID([8]byte{0xEE, 0xE1, 0x9B, 0x7E, 0xC3, 0xC1, 0xB1, 0x73}), span.SpanID()) + assert.Equal(t, pcommon.SpanID([8]byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}), span.ParentSpanID()) + assert.Equal(t, ptrace.SpanKindServer, span.Kind()) + assert.Equal(t, uint64(1544712660300000000), span.StartTimestamp()) + assert.Equal(t, uint64(1544712661300000000), span.EndTimestamp()) + assert.Equal(t, ptrace.StatusCodeOk, span.Status().Code()) + assert.Equal(t, "OK", span.Status().Message()) + v, ok := span.Attributes().Get("attr1") + assert.True(t, ok) + assert.Equal(t, "val1", v.Str()) +} \ No newline at end of file diff --git a/receiver/kafkareceiver/unmarshaler.go b/receiver/kafkareceiver/unmarshaler.go index bf44be7b496e..32dc1fd2a69a 100644 --- a/receiver/kafkareceiver/unmarshaler.go +++ b/receiver/kafkareceiver/unmarshaler.go @@ -50,6 +50,7 @@ type LogsUnmarshalerWithEnc interface { // defaultTracesUnmarshalers returns map of supported encodings with TracesUnmarshaler. 
func defaultTracesUnmarshalers() map[string]TracesUnmarshaler { otlpPb := newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding) + otlpJSON := newOTLPJSONTracesUnmarshaler() jaegerProto := jaegerProtoSpanUnmarshaler{} jaegerJSON := jaegerJSONSpanUnmarshaler{} zipkinProto := newPdataTracesUnmarshaler(zipkinv2.NewProtobufTracesUnmarshaler(false, false), "zipkin_proto") @@ -57,6 +58,7 @@ func defaultTracesUnmarshalers() map[string]TracesUnmarshaler { zipkinThrift := newPdataTracesUnmarshaler(zipkinv1.NewThriftTracesUnmarshaler(), "zipkin_thrift") return map[string]TracesUnmarshaler{ otlpPb.Encoding(): otlpPb, + otlpJSON.Encoding(): otlpJSON, jaegerProto.Encoding(): jaegerProto, jaegerJSON.Encoding(): jaegerJSON, zipkinProto.Encoding(): zipkinProto, From 237c0a5ddcd12f85511b9ac6fe0852b5e3420993 Mon Sep 17 00:00:00 2001 From: joeyyy09 Date: Mon, 26 Aug 2024 11:59:06 +0530 Subject: [PATCH 2/9] Add otlp_json support in kafka receiver Signed-off-by: joeyyy09 --- .../kafkareceiver/otlp_json_unmarshaler.go | 149 +---------- .../otlp_json_unmarshaler_test.go | 244 ++---------------- 2 files changed, 26 insertions(+), 367 deletions(-) diff --git a/receiver/kafkareceiver/otlp_json_unmarshaler.go b/receiver/kafkareceiver/otlp_json_unmarshaler.go index 89a3121f7fae..9264527175bf 100644 --- a/receiver/kafkareceiver/otlp_json_unmarshaler.go +++ b/receiver/kafkareceiver/otlp_json_unmarshaler.go @@ -1,159 +1,32 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package kafkareceiver +package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver" import ( - "encoding/hex" - "encoding/json" - "fmt" - "strconv" + "go.opentelemetry.io/collector/pdata/ptrace" +) - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/ptrace" +const ( + otlpJSONEncoding = "otlp_json" ) +// otlpJSONTracesUnmarshaler unmarshals OTLP JSON-encoded traces. type otlpJSONTracesUnmarshaler struct { - encoding string + ptrace.Unmarshaler } func (o otlpJSONTracesUnmarshaler) Unmarshal(buf []byte) (ptrace.Traces, error) { - var jsonData map[string]interface{} - if err := json.Unmarshal(buf, &jsonData); err != nil { - return ptrace.NewTraces(), fmt.Errorf("failed to unmarshal JSON: %w", err) - } - - traces := ptrace.NewTraces() - resourceSpans := traces.ResourceSpans() - - // Parse resourceSpans - if resourceSpansArr, ok := jsonData["resourceSpans"].([]interface{}); ok { - for _, rs := range resourceSpansArr { - if rsMap, ok := rs.(map[string]interface{}); ok { - parseResourceSpan(rsMap, resourceSpans.AppendEmpty()) - } - } - } - - return traces, nil + return o.Unmarshaler.UnmarshalTraces(buf) } func (o otlpJSONTracesUnmarshaler) Encoding() string { - return o.encoding + return otlpJSONEncoding } +// newOTLPJSONTracesUnmarshaler creates a new OTLP JSON traces unmarshaler. 
func newOTLPJSONTracesUnmarshaler() TracesUnmarshaler { return &otlpJSONTracesUnmarshaler{ - encoding: "otlp_json", - } -} - -func parseResourceSpan(rsMap map[string]interface{}, rs ptrace.ResourceSpans) { - // Parse resource - if resource, ok := rsMap["resource"].(map[string]interface{}); ok { - parseResource(resource, rs.Resource()) - } - - // Parse scopeSpans - if scopeSpans, ok := rsMap["scopeSpans"].([]interface{}); ok { - for _, ss := range scopeSpans { - if ssMap, ok := ss.(map[string]interface{}); ok { - parseScopeSpan(ssMap, rs.ScopeSpans().AppendEmpty()) - } - } - } -} - -func parseResource(resource map[string]interface{}, r pcommon.Resource) { - if attrs, ok := resource["attributes"].([]interface{}); ok { - parseAttributes(attrs, r.Attributes()) - } -} - -func parseScopeSpan(ssMap map[string]interface{}, ss ptrace.ScopeSpans) { - if scope, ok := ssMap["scope"].(map[string]interface{}); ok { - parseScope(scope, ss.Scope()) - } - - if spans, ok := ssMap["spans"].([]interface{}); ok { - for _, span := range spans { - if spanMap, ok := span.(map[string]interface{}); ok { - parseSpan(spanMap, ss.Spans().AppendEmpty()) - } - } - } -} - -func parseScope(scope map[string]interface{}, s pcommon.InstrumentationScope) { - if name, ok := scope["name"].(string); ok { - s.SetName(name) - } - if version, ok := scope["version"].(string); ok { - s.SetVersion(version) - } - if attrs, ok := scope["attributes"].([]interface{}); ok { - parseAttributes(attrs, s.Attributes()) - } -} - -func parseStatus(status map[string]interface{}, spanStatus ptrace.Status) { - if code, ok := status["code"].(float64); ok { - spanStatus.SetCode(ptrace.StatusCode(int32(code))) - } - if message, ok := status["message"].(string); ok { - spanStatus.SetMessage(message) - } -} - -func parseSpan(spanMap map[string]interface{}, span ptrace.Span) { - if name, ok := spanMap["name"].(string); ok { - span.SetName(name) - } - if traceIDStr, ok := spanMap["traceId"].(string); ok { - if traceID, err := hex.DecodeString(traceIDStr); err == nil && len(traceID) == 16 { - span.SetTraceID(pcommon.TraceID(traceID)) - } - } - if spanIDStr, ok := spanMap["spanId"].(string); ok { - if spanID, err := hex.DecodeString(spanIDStr); err == nil && len(spanID) == 8 { - span.SetSpanID(pcommon.SpanID(spanID)) - } - } - if parentSpanIDStr, ok := spanMap["parentSpanId"].(string); ok { - if parentSpanID, err := hex.DecodeString(parentSpanIDStr); err == nil && len(parentSpanID) == 8 { - span.SetParentSpanID(pcommon.SpanID(parentSpanID)) - } - } - if kindVal, ok := spanMap["kind"].(float64); ok { - span.SetKind(ptrace.SpanKind(int32(kindVal))) - } - if startTime, ok := spanMap["startTimeUnixNano"].(string); ok { - if startTimeInt, err := strconv.ParseUint(startTime, 10, 64); err == nil { - span.SetStartTimestamp(pcommon.Timestamp(startTimeInt)) - } - } - if endTime, ok := spanMap["endTimeUnixNano"].(string); ok { - if endTimeInt, err := strconv.ParseUint(endTime, 10, 64); err == nil { - span.SetEndTimestamp(pcommon.Timestamp(endTimeInt)) - } - } - if status, ok := spanMap["status"].(map[string]interface{}); ok { - parseStatus(status, span.Status()) - } - if attrs, ok := spanMap["attributes"].([]interface{}); ok { - parseAttributes(attrs, span.Attributes()) - } -} - -func parseAttributes(attrs []interface{}, dest pcommon.Map) { - for _, attr := range attrs { - if attrMap, ok := attr.(map[string]interface{}); ok { - key, _ := attrMap["key"].(string) - value, _ := attrMap["value"].(map[string]interface{}) - if strVal, ok := value["stringValue"].(string); ok 
{ - dest.PutStr(key, strVal) - } - //TODO: WIll be adding more value types as needed - } + Unmarshaler: &ptrace.JSONUnmarshaler{}, } } \ No newline at end of file diff --git a/receiver/kafkareceiver/otlp_json_unmarshaler_test.go b/receiver/kafkareceiver/otlp_json_unmarshaler_test.go index fbe24458717b..ba28d4b58b70 100644 --- a/receiver/kafkareceiver/otlp_json_unmarshaler_test.go +++ b/receiver/kafkareceiver/otlp_json_unmarshaler_test.go @@ -4,238 +4,24 @@ package kafkareceiver import ( - "testing" + "testing" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/ptrace" + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/ptrace" ) -func TestOTLPJSONTracesUnmarshalerEncoding(t *testing.T) { - unmarshaler := newOTLPJSONTracesUnmarshaler() - assert.Equal(t, "otlp_json", unmarshaler.Encoding()) +func TestNewOTLPJSONTracesUnmarshaler(t *testing.T) { + t.Parallel() + um := newOTLPJSONTracesUnmarshaler() + assert.Equal(t, otlpJSONEncoding, um.Encoding()) } -func TestOTLPJSONTracesUnmarshal(t *testing.T) { - unmarshaler := newOTLPJSONTracesUnmarshaler() - - tests := []struct { - name string - json string - wantErr bool - validate func(t *testing.T, traces ptrace.Traces) - }{ - { - name: "valid_single_span", - json: `{ - "resourceSpans": [{ - "resource": { - "attributes": [{ - "key": "service.name", - "value": {"stringValue": "test_service"} - }] - }, - "scopeSpans": [{ - "scope": { - "name": "test_scope", - "version": "1.0.0" - }, - "spans": [{ - "traceId": "5B8EFFF798038103D269B633813FC60C", - "spanId": "EEE19B7EC3C1B173", - "parentSpanId": "FFFFFFFFFFFFFFFF", - "name": "test_span", - "kind": 1, - "startTimeUnixNano": "1544712660300000000", - "endTimeUnixNano": "1544712661300000000", - "attributes": [{ - "key": "attr1", - "value": {"stringValue": "val1"} - }] - }] - }] - }] - }`, - wantErr: false, - validate: func(t *testing.T, traces ptrace.Traces) { - assert.Equal(t, 1, traces.ResourceSpans().Len()) - rs := traces.ResourceSpans().At(0) - v, ok := rs.Resource().Attributes().Get("service.name") - assert.True(t, ok) - assert.Equal(t, "test_service", v.Str()) - assert.Equal(t, 1, rs.ScopeSpans().Len()) - ss := rs.ScopeSpans().At(0) - assert.Equal(t, "test_scope", ss.Scope().Name()) - assert.Equal(t, "1.0.0", ss.Scope().Version()) - assert.Equal(t, 1, ss.Spans().Len()) - span := ss.Spans().At(0) - assert.Equal(t, "test_span", span.Name()) - assert.Equal(t, pcommon.TraceID([16]byte{0x5B, 0x8E, 0xFF, 0xF7, 0x98, 0x03, 0x81, 0x03, 0xD2, 0x69, 0xB6, 0x33, 0x81, 0x3F, 0xC6, 0x0C}), span.TraceID()) - assert.Equal(t, pcommon.SpanID([8]byte{0xEE, 0xE1, 0x9B, 0x7E, 0xC3, 0xC1, 0xB1, 0x73}), span.SpanID()) - v, ok = span.Attributes().Get("attr1") - assert.True(t, ok) - assert.Equal(t, "val1", v.Str()) - }, - }, - { - name: "invalid_json", - json: `{invalid_json`, - wantErr: true, - }, - { - name: "empty_json", - json: `{}`, - wantErr: false, - validate: func(t *testing.T, traces ptrace.Traces) { - assert.Equal(t, 0, traces.ResourceSpans().Len()) - }, - }, - { - name: "missing_resource_spans", - json: `{"someOtherField": []}`, - wantErr: false, - validate: func(t *testing.T, traces ptrace.Traces) { - assert.Equal(t, 0, traces.ResourceSpans().Len()) - }, - }, - { - name: "empty_resource_spans", - json: `{"resourceSpans": []}`, - wantErr: false, - validate: func(t *testing.T, traces ptrace.Traces) { - assert.Equal(t, 0, traces.ResourceSpans().Len()) - }, - }, - { - name: 
"multiple_resource_spans", - json: `{ - "resourceSpans": [ - { - "resource": { - "attributes": [{ - "key": "service.name", - "value": {"stringValue": "service1"} - }] - }, - "scopeSpans": [{ - "scope": {"name": "scope1"}, - "spans": [{"name": "span1"}] - }] - }, - { - "resource": { - "attributes": [{ - "key": "service.name", - "value": {"stringValue": "service2"} - }] - }, - "scopeSpans": [{ - "scope": {"name": "scope2"}, - "spans": [{"name": "span2"}] - }] - } - ] - }`, - wantErr: false, - validate: func(t *testing.T, traces ptrace.Traces) { - assert.Equal(t, 2, traces.ResourceSpans().Len()) - v, ok := traces.ResourceSpans().At(0).Resource().Attributes().Get("service.name") - assert.True(t, ok) - assert.Equal(t, "service1", v.Str()) - v, ok = traces.ResourceSpans().At(1).Resource().Attributes().Get("service.name") - assert.True(t, ok) - assert.Equal(t, "service2", v.Str()) - assert.Equal(t, "span1", traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Name()) - assert.Equal(t, "span2", traces.ResourceSpans().At(1).ScopeSpans().At(0).Spans().At(0).Name()) - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - traces, err := unmarshaler.Unmarshal([]byte(tt.json)) - if tt.wantErr { - assert.Error(t, err) - return - } - require.NoError(t, err) - tt.validate(t, traces) - }) - } -} - -func TestParseAttributes(t *testing.T) { - attrs := []interface{}{ - map[string]interface{}{ - "key": "stringAttr", - "value": map[string]interface{}{"stringValue": "test"}, - }, - map[string]interface{}{ - "key": "intAttr", - "value": map[string]interface{}{"intValue": float64(123)}, // JSON numbers are float64 - }, - map[string]interface{}{ - "key": "doubleAttr", - "value": map[string]interface{}{"doubleValue": 123.456}, - }, - map[string]interface{}{ - "key": "boolAttr", - "value": map[string]interface{}{"boolValue": true}, - }, - } - - dest := pcommon.NewMap() - parseAttributes(attrs, dest) - - v, ok := dest.Get("stringAttr") - assert.True(t, ok) - assert.Equal(t, "test", v.Str()) - - v, ok = dest.Get("intAttr") - assert.True(t, ok) - assert.Equal(t, int64(123), v.Int()) - - v, ok = dest.Get("doubleAttr") - assert.True(t, ok) - assert.Equal(t, 123.456, v.Double()) - - v, ok = dest.Get("boolAttr") - assert.True(t, ok) - assert.True(t, v.Bool()) -} - -func TestParseSpan(t *testing.T) { - spanMap := map[string]interface{}{ - "name": "test_span", - "traceId": "5B8EFFF798038103D269B633813FC60C", - "spanId": "EEE19B7EC3C1B173", - "parentSpanId": "FFFFFFFFFFFFFFFF", - "kind": 2, - "startTimeUnixNano": "1544712660300000000", - "endTimeUnixNano": "1544712661300000000", - "status": map[string]interface{}{"code": 1, "message": "OK"}, - "attributes": []interface{}{ - map[string]interface{}{ - "key": "attr1", - "value": map[string]interface{}{"stringValue": "val1"}, - }, - }, - } - - span := ptrace.NewSpan() - parseSpan(spanMap, span) - - assert.Equal(t, "test_span", span.Name()) - assert.Equal(t, pcommon.TraceID([16]byte{0x5B, 0x8E, 0xFF, 0xF7, 0x98, 0x03, 0x81, 0x03, 0xD2, 0x69, 0xB6, 0x33, 0x81, 0x3F, 0xC6, 0x0C}), span.TraceID()) - assert.Equal(t, pcommon.SpanID([8]byte{0xEE, 0xE1, 0x9B, 0x7E, 0xC3, 0xC1, 0xB1, 0x73}), span.SpanID()) - assert.Equal(t, pcommon.SpanID([8]byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}), span.ParentSpanID()) - assert.Equal(t, ptrace.SpanKindServer, span.Kind()) - assert.Equal(t, uint64(1544712660300000000), span.StartTimestamp()) - assert.Equal(t, uint64(1544712661300000000), span.EndTimestamp()) - assert.Equal(t, ptrace.StatusCodeOk, 
span.Status().Code()) - assert.Equal(t, "OK", span.Status().Message()) - v, ok := span.Attributes().Get("attr1") - assert.True(t, ok) - assert.Equal(t, "val1", v.Str()) +func TestOTLPJSONTracesUnmarshalerReturnType(t *testing.T) { + t.Parallel() + um := newOTLPJSONTracesUnmarshaler() + json := `{"resourceSpans":[{"resource":{},"scopeSpans":[{"scope":{},"spans":[]}]}]}` + unmarshaledTraces, err := um.Unmarshal([]byte(json)) + assert.NoError(t, err) + var expectedType ptrace.Traces + assert.IsType(t, expectedType, unmarshaledTraces) } \ No newline at end of file From 96a41ed38d789b4230ad7ab4fe3000981e8c151b Mon Sep 17 00:00:00 2001 From: joeyyy09 Date: Mon, 26 Aug 2024 12:06:07 +0530 Subject: [PATCH 3/9] Update tests and documentation Signed-off-by: joeyyy09 --- receiver/kafkareceiver/README.md | 1 + receiver/kafkareceiver/unmarshaler_test.go | 1 + 2 files changed, 2 insertions(+) diff --git a/receiver/kafkareceiver/README.md b/receiver/kafkareceiver/README.md index 84dc5a8dcc9e..751c552723f2 100644 --- a/receiver/kafkareceiver/README.md +++ b/receiver/kafkareceiver/README.md @@ -31,6 +31,7 @@ The following settings can be optionally configured: Only one telemetry type may be used for a given topic. - `encoding` (default = otlp_proto): The encoding of the payload received from kafka. Available encodings: - `otlp_proto`: the payload is deserialized to `ExportTraceServiceRequest`, `ExportLogsServiceRequest` or `ExportMetricsServiceRequest` respectively. + - `otlp_json`: The payload is deserialized into OpenTelemetry traces using JSON format.This encoding uses the `ptrace.JSONUnmarshaler` to convert the JSON-encoded trace data into the internal OpenTelemetry format. The expected JSON structure includes resourceSpans containing trace data. - `jaeger_proto`: the payload is deserialized to a single Jaeger proto `Span`. - `jaeger_json`: the payload is deserialized to a single Jaeger JSON Span using `jsonpb`. - `zipkin_proto`: the payload is deserialized into a list of Zipkin proto spans. diff --git a/receiver/kafkareceiver/unmarshaler_test.go b/receiver/kafkareceiver/unmarshaler_test.go index fd1f998ee0a7..8fb500169322 100644 --- a/receiver/kafkareceiver/unmarshaler_test.go +++ b/receiver/kafkareceiver/unmarshaler_test.go @@ -14,6 +14,7 @@ import ( func TestDefaultTracesUnMarshaler(t *testing.T) { expectedEncodings := []string{ "otlp_proto", + "otlp_json", "jaeger_proto", "jaeger_json", "zipkin_proto", From 2f3cef08baf99e4068a7aeb68558f59973c9e6da Mon Sep 17 00:00:00 2001 From: joeyyy09 Date: Mon, 26 Aug 2024 19:59:21 +0530 Subject: [PATCH 4/9] Add support for metrics, logs Signed-off-by: joeyyy09 --- receiver/kafkareceiver/README.md | 2 +- .../kafkareceiver/otlp_json_unmarshaler.go | 25 ++++++-------- .../otlp_json_unmarshaler_test.go | 34 +++++++++++++++++++ receiver/kafkareceiver/unmarshaler.go | 4 +++ receiver/kafkareceiver/unmarshaler_test.go | 2 ++ 5 files changed, 52 insertions(+), 15 deletions(-) diff --git a/receiver/kafkareceiver/README.md b/receiver/kafkareceiver/README.md index 751c552723f2..4bfa192dad6e 100644 --- a/receiver/kafkareceiver/README.md +++ b/receiver/kafkareceiver/README.md @@ -31,7 +31,7 @@ The following settings can be optionally configured: Only one telemetry type may be used for a given topic. - `encoding` (default = otlp_proto): The encoding of the payload received from kafka. Available encodings: - `otlp_proto`: the payload is deserialized to `ExportTraceServiceRequest`, `ExportLogsServiceRequest` or `ExportMetricsServiceRequest` respectively. 
- - `otlp_json`: The payload is deserialized into OpenTelemetry traces using JSON format.This encoding uses the `ptrace.JSONUnmarshaler` to convert the JSON-encoded trace data into the internal OpenTelemetry format. The expected JSON structure includes resourceSpans containing trace data. + - `otlp_json`: the payload is deserialized to `ExportTraceServiceRequest` `ExportLogsServiceRequest` or `ExportMetricsServiceRequest` respectively using JSON encoding - `jaeger_proto`: the payload is deserialized to a single Jaeger proto `Span`. - `jaeger_json`: the payload is deserialized to a single Jaeger JSON Span using `jsonpb`. - `zipkin_proto`: the payload is deserialized into a list of Zipkin proto spans. diff --git a/receiver/kafkareceiver/otlp_json_unmarshaler.go b/receiver/kafkareceiver/otlp_json_unmarshaler.go index 9264527175bf..a96d472a5fe6 100644 --- a/receiver/kafkareceiver/otlp_json_unmarshaler.go +++ b/receiver/kafkareceiver/otlp_json_unmarshaler.go @@ -4,6 +4,8 @@ package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver" import ( + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" ) @@ -11,22 +13,17 @@ const ( otlpJSONEncoding = "otlp_json" ) -// otlpJSONTracesUnmarshaler unmarshals OTLP JSON-encoded traces. -type otlpJSONTracesUnmarshaler struct { - ptrace.Unmarshaler -} - -func (o otlpJSONTracesUnmarshaler) Unmarshal(buf []byte) (ptrace.Traces, error) { - return o.Unmarshaler.UnmarshalTraces(buf) +// newOTLPJSONTracesUnmarshaler creates a new OTLP JSON traces unmarshaler. +func newOTLPJSONTracesUnmarshaler() TracesUnmarshaler { + return newPdataTracesUnmarshaler(&ptrace.JSONUnmarshaler{}, otlpJSONEncoding) } -func (o otlpJSONTracesUnmarshaler) Encoding() string { - return otlpJSONEncoding +// newOTLPJSONMetricsUnmarshaler creates a new OTLP JSON metrics unmarshaler. +func newOTLPJSONMetricsUnmarshaler() MetricsUnmarshaler { + return newPdataMetricsUnmarshaler(&pmetric.JSONUnmarshaler{}, otlpJSONEncoding) } -// newOTLPJSONTracesUnmarshaler creates a new OTLP JSON traces unmarshaler. -func newOTLPJSONTracesUnmarshaler() TracesUnmarshaler { - return &otlpJSONTracesUnmarshaler{ - Unmarshaler: &ptrace.JSONUnmarshaler{}, - } +// newOTLPJSONLogsUnmarshaler creates a new OTLP JSON logs unmarshaler. 
+func newOTLPJSONLogsUnmarshaler() LogsUnmarshaler { + return newPdataLogsUnmarshaler(&plog.JSONUnmarshaler{}, otlpJSONEncoding) } \ No newline at end of file diff --git a/receiver/kafkareceiver/otlp_json_unmarshaler_test.go b/receiver/kafkareceiver/otlp_json_unmarshaler_test.go index ba28d4b58b70..cc3f0c1db897 100644 --- a/receiver/kafkareceiver/otlp_json_unmarshaler_test.go +++ b/receiver/kafkareceiver/otlp_json_unmarshaler_test.go @@ -7,6 +7,8 @@ import ( "testing" "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" ) @@ -24,4 +26,36 @@ func TestOTLPJSONTracesUnmarshalerReturnType(t *testing.T) { assert.NoError(t, err) var expectedType ptrace.Traces assert.IsType(t, expectedType, unmarshaledTraces) +} + +func TestNewOTLPJSONMetricsUnmarshaler(t *testing.T) { + t.Parallel() + um := newOTLPJSONMetricsUnmarshaler() + assert.Equal(t, otlpJSONEncoding, um.Encoding()) +} + +func TestOTLPJSONMetricsUnmarshalerReturnType(t *testing.T) { + t.Parallel() + um := newOTLPJSONMetricsUnmarshaler() + json := `{"resourceMetrics":[{"resource":{},"scopeMetrics":[{"scope":{},"metrics":[]}]}]}` + unmarshaledMetrics, err := um.Unmarshal([]byte(json)) + assert.NoError(t, err) + var expectedType pmetric.Metrics + assert.IsType(t, expectedType, unmarshaledMetrics) +} + +func TestNewOTLPJSONLogsUnmarshaler(t *testing.T) { + t.Parallel() + um := newOTLPJSONLogsUnmarshaler() + assert.Equal(t, otlpJSONEncoding, um.Encoding()) +} + +func TestOTLPJSONLogsUnmarshalerReturnType(t *testing.T) { + t.Parallel() + um := newOTLPJSONLogsUnmarshaler() + json := `{"resourceLogs":[{"resource":{},"scopeLogs":[{"scope":{},"logRecords":[]}]}]}` + unmarshaledLogs, err := um.Unmarshal([]byte(json)) + assert.NoError(t, err) + var expectedType plog.Logs + assert.IsType(t, expectedType, unmarshaledLogs) } \ No newline at end of file diff --git a/receiver/kafkareceiver/unmarshaler.go b/receiver/kafkareceiver/unmarshaler.go index 32dc1fd2a69a..a772e056e036 100644 --- a/receiver/kafkareceiver/unmarshaler.go +++ b/receiver/kafkareceiver/unmarshaler.go @@ -69,19 +69,23 @@ func defaultTracesUnmarshalers() map[string]TracesUnmarshaler { func defaultMetricsUnmarshalers() map[string]MetricsUnmarshaler { otlpPb := newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding) + otlpJSON := newOTLPJSONMetricsUnmarshaler() return map[string]MetricsUnmarshaler{ otlpPb.Encoding(): otlpPb, + otlpJSON.Encoding(): otlpJSON, } } func defaultLogsUnmarshalers(version string, logger *zap.Logger) map[string]LogsUnmarshaler { azureResourceLogs := newAzureResourceLogsUnmarshaler(version, logger) + otlpJSON := newOTLPJSONLogsUnmarshaler() otlpPb := newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding) raw := newRawLogsUnmarshaler() text := newTextLogsUnmarshaler() json := newJSONLogsUnmarshaler() return map[string]LogsUnmarshaler{ azureResourceLogs.Encoding(): azureResourceLogs, + otlpJSON.Encoding(): otlpJSON, otlpPb.Encoding(): otlpPb, raw.Encoding(): raw, text.Encoding(): text, diff --git a/receiver/kafkareceiver/unmarshaler_test.go b/receiver/kafkareceiver/unmarshaler_test.go index 8fb500169322..bb86ab8dfcd5 100644 --- a/receiver/kafkareceiver/unmarshaler_test.go +++ b/receiver/kafkareceiver/unmarshaler_test.go @@ -35,6 +35,7 @@ func TestDefaultTracesUnMarshaler(t *testing.T) { func TestDefaultMetricsUnMarshaler(t *testing.T) { expectedEncodings := []string{ "otlp_proto", + "otlp_json", } marshalers := 
defaultMetricsUnmarshalers() assert.Equal(t, len(expectedEncodings), len(marshalers)) @@ -50,6 +51,7 @@ func TestDefaultMetricsUnMarshaler(t *testing.T) { func TestDefaultLogsUnMarshaler(t *testing.T) { expectedEncodings := []string{ "otlp_proto", + "otlp_json", "raw", "text", "json", From a6a5d571536d7744440acf85597f4282d0c02220 Mon Sep 17 00:00:00 2001 From: joeyyy09 Date: Mon, 26 Aug 2024 20:07:50 +0530 Subject: [PATCH 5/9] Add changelog entry for otlp_json support in Kafka receiver --- .chloggen/otlp_logs.yaml | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) create mode 100644 .chloggen/otlp_logs.yaml diff --git a/.chloggen/otlp_logs.yaml b/.chloggen/otlp_logs.yaml new file mode 100644 index 000000000000..239dacb1b21e --- /dev/null +++ b/.chloggen/otlp_logs.yaml @@ -0,0 +1,29 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: kafkareceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Add support for `otlp_json` encoding to Kafka receiver. The payload is deserialized into OpenTelemetry traces using JSON format." + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [33627] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + This encoding allows the Kafka receiver to handle trace data in JSON format, + enabling integration with systems that export traces as JSON-encoded data. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [user] From 5d0b74291d4adb0af44ce7ad6c321a8d9b2285b9 Mon Sep 17 00:00:00 2001 From: joeyyy09 Date: Mon, 26 Aug 2024 20:19:30 +0530 Subject: [PATCH 6/9] Maintain consistency Signed-off-by: joeyyy09 --- receiver/kafkareceiver/unmarshaler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/receiver/kafkareceiver/unmarshaler.go b/receiver/kafkareceiver/unmarshaler.go index a772e056e036..0c6f7ddd859f 100644 --- a/receiver/kafkareceiver/unmarshaler.go +++ b/receiver/kafkareceiver/unmarshaler.go @@ -78,15 +78,15 @@ func defaultMetricsUnmarshalers() map[string]MetricsUnmarshaler { func defaultLogsUnmarshalers(version string, logger *zap.Logger) map[string]LogsUnmarshaler { azureResourceLogs := newAzureResourceLogsUnmarshaler(version, logger) - otlpJSON := newOTLPJSONLogsUnmarshaler() otlpPb := newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding) + otlpJSON := newOTLPJSONLogsUnmarshaler() raw := newRawLogsUnmarshaler() text := newTextLogsUnmarshaler() json := newJSONLogsUnmarshaler() return map[string]LogsUnmarshaler{ azureResourceLogs.Encoding(): azureResourceLogs, - otlpJSON.Encoding(): otlpJSON, otlpPb.Encoding(): otlpPb, + otlpJSON.Encoding(): otlpJSON, raw.Encoding(): raw, text.Encoding(): text, json.Encoding(): json, From ba7e5a0b04377d0102f4d51630b5ebd3c00804bf Mon Sep 17 00:00:00 2001 From: joeyyy09 Date: Tue, 3 Sep 2024 16:31:53 +0530 Subject: [PATCH 7/9] Instantiate inline Signed-off-by: joeyyy09 --- .../kafkareceiver/otlp_json_unmarshaler.go | 29 --------- .../otlp_json_unmarshaler_test.go | 61 ------------------- receiver/kafkareceiver/unmarshaler.go | 6 +- 3 files changed, 3 insertions(+), 93 deletions(-) delete mode 100644 receiver/kafkareceiver/otlp_json_unmarshaler.go delete mode 100644 receiver/kafkareceiver/otlp_json_unmarshaler_test.go diff --git a/receiver/kafkareceiver/otlp_json_unmarshaler.go b/receiver/kafkareceiver/otlp_json_unmarshaler.go deleted file mode 100644 index a96d472a5fe6..000000000000 --- a/receiver/kafkareceiver/otlp_json_unmarshaler.go +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver" - -import ( - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/pdata/ptrace" -) - -const ( - otlpJSONEncoding = "otlp_json" -) - -// newOTLPJSONTracesUnmarshaler creates a new OTLP JSON traces unmarshaler. -func newOTLPJSONTracesUnmarshaler() TracesUnmarshaler { - return newPdataTracesUnmarshaler(&ptrace.JSONUnmarshaler{}, otlpJSONEncoding) -} - -// newOTLPJSONMetricsUnmarshaler creates a new OTLP JSON metrics unmarshaler. -func newOTLPJSONMetricsUnmarshaler() MetricsUnmarshaler { - return newPdataMetricsUnmarshaler(&pmetric.JSONUnmarshaler{}, otlpJSONEncoding) -} - -// newOTLPJSONLogsUnmarshaler creates a new OTLP JSON logs unmarshaler. 
-func newOTLPJSONLogsUnmarshaler() LogsUnmarshaler { - return newPdataLogsUnmarshaler(&plog.JSONUnmarshaler{}, otlpJSONEncoding) -} \ No newline at end of file diff --git a/receiver/kafkareceiver/otlp_json_unmarshaler_test.go b/receiver/kafkareceiver/otlp_json_unmarshaler_test.go deleted file mode 100644 index cc3f0c1db897..000000000000 --- a/receiver/kafkareceiver/otlp_json_unmarshaler_test.go +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package kafkareceiver - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/pdata/ptrace" -) - -func TestNewOTLPJSONTracesUnmarshaler(t *testing.T) { - t.Parallel() - um := newOTLPJSONTracesUnmarshaler() - assert.Equal(t, otlpJSONEncoding, um.Encoding()) -} - -func TestOTLPJSONTracesUnmarshalerReturnType(t *testing.T) { - t.Parallel() - um := newOTLPJSONTracesUnmarshaler() - json := `{"resourceSpans":[{"resource":{},"scopeSpans":[{"scope":{},"spans":[]}]}]}` - unmarshaledTraces, err := um.Unmarshal([]byte(json)) - assert.NoError(t, err) - var expectedType ptrace.Traces - assert.IsType(t, expectedType, unmarshaledTraces) -} - -func TestNewOTLPJSONMetricsUnmarshaler(t *testing.T) { - t.Parallel() - um := newOTLPJSONMetricsUnmarshaler() - assert.Equal(t, otlpJSONEncoding, um.Encoding()) -} - -func TestOTLPJSONMetricsUnmarshalerReturnType(t *testing.T) { - t.Parallel() - um := newOTLPJSONMetricsUnmarshaler() - json := `{"resourceMetrics":[{"resource":{},"scopeMetrics":[{"scope":{},"metrics":[]}]}]}` - unmarshaledMetrics, err := um.Unmarshal([]byte(json)) - assert.NoError(t, err) - var expectedType pmetric.Metrics - assert.IsType(t, expectedType, unmarshaledMetrics) -} - -func TestNewOTLPJSONLogsUnmarshaler(t *testing.T) { - t.Parallel() - um := newOTLPJSONLogsUnmarshaler() - assert.Equal(t, otlpJSONEncoding, um.Encoding()) -} - -func TestOTLPJSONLogsUnmarshalerReturnType(t *testing.T) { - t.Parallel() - um := newOTLPJSONLogsUnmarshaler() - json := `{"resourceLogs":[{"resource":{},"scopeLogs":[{"scope":{},"logRecords":[]}]}]}` - unmarshaledLogs, err := um.Unmarshal([]byte(json)) - assert.NoError(t, err) - var expectedType plog.Logs - assert.IsType(t, expectedType, unmarshaledLogs) -} \ No newline at end of file diff --git a/receiver/kafkareceiver/unmarshaler.go b/receiver/kafkareceiver/unmarshaler.go index 0c6f7ddd859f..9963e6637ec1 100644 --- a/receiver/kafkareceiver/unmarshaler.go +++ b/receiver/kafkareceiver/unmarshaler.go @@ -50,7 +50,7 @@ type LogsUnmarshalerWithEnc interface { // defaultTracesUnmarshalers returns map of supported encodings with TracesUnmarshaler. 
func defaultTracesUnmarshalers() map[string]TracesUnmarshaler { otlpPb := newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding) - otlpJSON := newOTLPJSONTracesUnmarshaler() + otlpJSON := newPdataTracesUnmarshaler(&ptrace.JSONUnmarshaler{}, "otlp_json") jaegerProto := jaegerProtoSpanUnmarshaler{} jaegerJSON := jaegerJSONSpanUnmarshaler{} zipkinProto := newPdataTracesUnmarshaler(zipkinv2.NewProtobufTracesUnmarshaler(false, false), "zipkin_proto") @@ -69,7 +69,7 @@ func defaultTracesUnmarshalers() map[string]TracesUnmarshaler { func defaultMetricsUnmarshalers() map[string]MetricsUnmarshaler { otlpPb := newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding) - otlpJSON := newOTLPJSONMetricsUnmarshaler() + otlpJSON := newPdataMetricsUnmarshaler(&pmetric.JSONUnmarshaler{}, "otlp_json") return map[string]MetricsUnmarshaler{ otlpPb.Encoding(): otlpPb, otlpJSON.Encoding(): otlpJSON, @@ -79,7 +79,7 @@ func defaultMetricsUnmarshalers() map[string]MetricsUnmarshaler { func defaultLogsUnmarshalers(version string, logger *zap.Logger) map[string]LogsUnmarshaler { azureResourceLogs := newAzureResourceLogsUnmarshaler(version, logger) otlpPb := newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding) - otlpJSON := newOTLPJSONLogsUnmarshaler() + otlpJSON := newPdataLogsUnmarshaler(&plog.JSONUnmarshaler{}, "otlp_json") raw := newRawLogsUnmarshaler() text := newTextLogsUnmarshaler() json := newJSONLogsUnmarshaler() From 4d2ca6760b15a60462fa1f870e832149754ea24a Mon Sep 17 00:00:00 2001 From: joeyyy09 Date: Wed, 4 Sep 2024 00:24:03 +0530 Subject: [PATCH 8/9] Lint fix Signed-off-by: joeyyy09 --- receiver/kafkareceiver/unmarshaler.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/receiver/kafkareceiver/unmarshaler.go b/receiver/kafkareceiver/unmarshaler.go index 9963e6637ec1..a60459111a3c 100644 --- a/receiver/kafkareceiver/unmarshaler.go +++ b/receiver/kafkareceiver/unmarshaler.go @@ -17,7 +17,6 @@ import ( type TracesUnmarshaler interface { // Unmarshal deserializes the message body into traces. Unmarshal([]byte) (ptrace.Traces, error) - // Encoding of the serialized messages. Encoding() string } @@ -26,7 +25,6 @@ type TracesUnmarshaler interface { type MetricsUnmarshaler interface { // Unmarshal deserializes the message body into traces Unmarshal([]byte) (pmetric.Metrics, error) - // Encoding of the serialized messages Encoding() string } @@ -35,14 +33,12 @@ type MetricsUnmarshaler interface { type LogsUnmarshaler interface { // Unmarshal deserializes the message body into traces. Unmarshal([]byte) (plog.Logs, error) - // Encoding of the serialized messages. Encoding() string } type LogsUnmarshalerWithEnc interface { LogsUnmarshaler - // WithEnc sets the character encoding (UTF-8, GBK, etc.) of the unmarshaler. 
WithEnc(string) (LogsUnmarshalerWithEnc, error) } @@ -58,7 +54,7 @@ func defaultTracesUnmarshalers() map[string]TracesUnmarshaler { zipkinThrift := newPdataTracesUnmarshaler(zipkinv1.NewThriftTracesUnmarshaler(), "zipkin_thrift") return map[string]TracesUnmarshaler{ otlpPb.Encoding(): otlpPb, - otlpJSON.Encoding(): otlpJSON, + otlpJSON.Encoding(): otlpJSON, jaegerProto.Encoding(): jaegerProto, jaegerJSON.Encoding(): jaegerJSON, zipkinProto.Encoding(): zipkinProto, @@ -71,7 +67,7 @@ func defaultMetricsUnmarshalers() map[string]MetricsUnmarshaler { otlpPb := newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding) otlpJSON := newPdataMetricsUnmarshaler(&pmetric.JSONUnmarshaler{}, "otlp_json") return map[string]MetricsUnmarshaler{ - otlpPb.Encoding(): otlpPb, + otlpPb.Encoding(): otlpPb, otlpJSON.Encoding(): otlpJSON, } } @@ -86,7 +82,7 @@ func defaultLogsUnmarshalers(version string, logger *zap.Logger) map[string]Logs return map[string]LogsUnmarshaler{ azureResourceLogs.Encoding(): azureResourceLogs, otlpPb.Encoding(): otlpPb, - otlpJSON.Encoding(): otlpJSON, + otlpJSON.Encoding(): otlpJSON, raw.Encoding(): raw, text.Encoding(): text, json.Encoding(): json, From 40ac6c3adbdfa7f8a0334aaf700e03eff7e98d41 Mon Sep 17 00:00:00 2001 From: Harshith Mente <109957201+joeyyy09@users.noreply.github.com> Date: Mon, 9 Sep 2024 18:32:48 +0530 Subject: [PATCH 9/9] nit fix --- receiver/kafkareceiver/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/kafkareceiver/README.md b/receiver/kafkareceiver/README.md index 49a7afa79d3a..38c014fe8937 100644 --- a/receiver/kafkareceiver/README.md +++ b/receiver/kafkareceiver/README.md @@ -31,7 +31,7 @@ The following settings can be optionally configured: Only one telemetry type may be used for a given topic. - `encoding` (default = otlp_proto): The encoding of the payload received from kafka. Supports encoding extensions. Tries to load an encoding extension and falls back to internal encodings if no extension was loaded. Available internal encodings: - `otlp_proto`: the payload is deserialized to `ExportTraceServiceRequest`, `ExportLogsServiceRequest` or `ExportMetricsServiceRequest` respectively. - - `otlp_json`: the payload is deserialized to `ExportTraceServiceRequest` `ExportLogsServiceRequest` or `ExportMetricsServiceRequest` respectively using JSON encoding + - `otlp_json`: the payload is deserialized to `ExportTraceServiceRequest` `ExportLogsServiceRequest` or `ExportMetricsServiceRequest` respectively using JSON encoding. - `jaeger_proto`: the payload is deserialized to a single Jaeger proto `Span`. - `jaeger_json`: the payload is deserialized to a single Jaeger JSON Span using `jsonpb`. - `zipkin_proto`: the payload is deserialized into a list of Zipkin proto spans.
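
---

For context, the sketch below shows how the `otlp_json` encoding introduced in this series can be exercised end to end from the producer side: a trace is serialized with `ptrace.JSONMarshaler` (the counterpart of the `ptrace.JSONUnmarshaler` now registered under `otlp_json`) and published to the topic the Kafka receiver consumes. This is an illustrative example only, not part of the patches; it assumes a producer built against `github.com/IBM/sarama` and `go.opentelemetry.io/collector/pdata`, and the broker address, topic name, and span contents are placeholders.

```go
// Illustrative producer sketch (not part of this patch series): builds one
// span, serializes it as OTLP/JSON, and publishes it to Kafka so that a
// collector with `encoding: otlp_json` on the kafka receiver can decode it.
package main

import (
	"log"
	"time"

	"github.com/IBM/sarama"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/ptrace"
)

func main() {
	// Build a minimal trace: one resource, one scope, one span.
	td := ptrace.NewTraces()
	rs := td.ResourceSpans().AppendEmpty()
	rs.Resource().Attributes().PutStr("service.name", "example-service")
	span := rs.ScopeSpans().AppendEmpty().Spans().AppendEmpty()
	span.SetName("example-span")
	span.SetTraceID(pcommon.TraceID([16]byte{
		0x5B, 0x8E, 0xFF, 0xF7, 0x98, 0x03, 0x81, 0x03,
		0xD2, 0x69, 0xB6, 0x33, 0x81, 0x3F, 0xC6, 0x0C,
	}))
	span.SetSpanID(pcommon.SpanID([8]byte{0xEE, 0xE1, 0x9B, 0x7E, 0xC3, 0xC1, 0xB1, 0x73}))
	start := time.Now()
	span.SetStartTimestamp(pcommon.NewTimestampFromTime(start))
	span.SetEndTimestamp(pcommon.NewTimestampFromTime(start.Add(time.Second)))

	// Serialize to OTLP/JSON, the wire format the receiver expects when
	// configured with `encoding: otlp_json`.
	payload, err := (&ptrace.JSONMarshaler{}).MarshalTraces(td)
	if err != nil {
		log.Fatalf("marshal traces: %v", err)
	}

	// Publish the payload. "localhost:9092" and "otlp_spans" are placeholder
	// values; the topic must match the receiver's `topic` setting.
	cfg := sarama.NewConfig()
	cfg.Producer.Return.Successes = true // required by SyncProducer
	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatalf("create producer: %v", err)
	}
	defer producer.Close()

	if _, _, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: "otlp_spans",
		Value: sarama.ByteEncoder(payload),
	}); err != nil {
		log.Fatalf("send message: %v", err)
	}
	log.Println("published OTLP/JSON trace payload")
}
```

On the collector side the only change needed is `encoding: otlp_json` on the `kafka` receiver; the equivalent `pmetric.JSONMarshaler` and `plog.JSONMarshaler` payloads are handled by the metrics and logs unmarshalers added for those signals in patch 4.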