diff --git a/enrichments/trace/internal/elastic/span.go b/enrichments/trace/internal/elastic/span.go index d1fdb0f..ce8e25d 100644 --- a/enrichments/trace/internal/elastic/span.go +++ b/enrichments/trace/internal/elastic/span.go @@ -33,7 +33,8 @@ import ( "github.com/elastic/opentelemetry-lib/enrichments/trace/config" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" - semconv "go.opentelemetry.io/collector/semconv/v1.25.0" + semconv25 "go.opentelemetry.io/collector/semconv/v1.25.0" + semconv27 "go.opentelemetry.io/collector/semconv/v1.27.0" tracepb "go.opentelemetry.io/proto/otlp/trace/v1" "google.golang.org/grpc/codes" ) @@ -68,6 +69,7 @@ type spanEnrichmentContext struct { dbSystem string messagingSystem string messagingDestinationName string + genAiSystem string serverPort int64 urlPort int64 @@ -82,6 +84,7 @@ type spanEnrichmentContext struct { isHTTP bool isDB bool messagingDestinationTemp bool + isGenAi bool } func (s *spanEnrichmentContext) Enrich(span ptrace.Span, cfg config.Config) { @@ -91,86 +94,89 @@ func (s *spanEnrichmentContext) Enrich(span ptrace.Span, cfg config.Config) { // Extract information from span attributes. span.Attributes().Range(func(k string, v pcommon.Value) bool { switch k { - case semconv.AttributePeerService: + case semconv25.AttributePeerService: s.peerService = v.Str() - case semconv.AttributeServerAddress: + case semconv25.AttributeServerAddress: s.serverAddress = v.Str() - case semconv.AttributeServerPort: + case semconv25.AttributeServerPort: s.serverPort = v.Int() - case semconv.AttributeNetPeerName: + case semconv25.AttributeNetPeerName: if s.serverAddress == "" { // net.peer.name is deprecated, so has lower priority // only set when not already set with server.address // and allowed to be overridden by server.address. s.serverAddress = v.Str() } - case semconv.AttributeNetPeerPort: + case semconv25.AttributeNetPeerPort: if s.serverPort == 0 { // net.peer.port is deprecated, so has lower priority // only set when not already set with server.port and // allowed to be overridden by server.port. s.serverPort = v.Int() } - case semconv.AttributeMessagingDestinationName: + case semconv25.AttributeMessagingDestinationName: s.isMessaging = true s.messagingDestinationName = v.Str() - case semconv.AttributeMessagingOperation: + case semconv25.AttributeMessagingOperation: s.isMessaging = true - case semconv.AttributeMessagingSystem: + case semconv25.AttributeMessagingSystem: s.isMessaging = true s.messagingSystem = v.Str() - case semconv.AttributeMessagingDestinationTemporary: + case semconv25.AttributeMessagingDestinationTemporary: s.isMessaging = true s.messagingDestinationTemp = true - case semconv.AttributeHTTPStatusCode, - semconv.AttributeHTTPResponseStatusCode: + case semconv25.AttributeHTTPStatusCode, + semconv25.AttributeHTTPResponseStatusCode: s.isHTTP = true s.httpStatusCode = v.Int() - case semconv.AttributeHTTPMethod, - semconv.AttributeHTTPRequestMethod, - semconv.AttributeHTTPTarget, - semconv.AttributeHTTPScheme, - semconv.AttributeHTTPFlavor, - semconv.AttributeNetHostName: + case semconv25.AttributeHTTPMethod, + semconv25.AttributeHTTPRequestMethod, + semconv25.AttributeHTTPTarget, + semconv25.AttributeHTTPScheme, + semconv25.AttributeHTTPFlavor, + semconv25.AttributeNetHostName: s.isHTTP = true - case semconv.AttributeURLFull, - semconv.AttributeHTTPURL: + case semconv25.AttributeURLFull, + semconv25.AttributeHTTPURL: s.isHTTP = true // ignoring error as if parse fails then we don't want the url anyway s.urlFull, _ = url.Parse(v.Str()) - case semconv.AttributeURLScheme: + case semconv25.AttributeURLScheme: s.isHTTP = true s.urlScheme = v.Str() - case semconv.AttributeURLDomain: + case semconv25.AttributeURLDomain: s.isHTTP = true s.urlDomain = v.Str() - case semconv.AttributeURLPort: + case semconv25.AttributeURLPort: s.isHTTP = true s.urlPort = v.Int() - case semconv.AttributeURLPath: + case semconv25.AttributeURLPath: s.isHTTP = true s.urlPath = v.Str() - case semconv.AttributeURLQuery: + case semconv25.AttributeURLQuery: s.isHTTP = true s.urlQuery = v.Str() - case semconv.AttributeRPCGRPCStatusCode: + case semconv25.AttributeRPCGRPCStatusCode: s.isRPC = true s.grpcStatus = codes.Code(v.Int()).String() - case semconv.AttributeRPCSystem: + case semconv25.AttributeRPCSystem: s.isRPC = true s.rpcSystem = v.Str() - case semconv.AttributeRPCService: + case semconv25.AttributeRPCService: s.isRPC = true s.rpcService = v.Str() - case semconv.AttributeDBStatement, - semconv.AttributeDBUser: + case semconv25.AttributeDBStatement, + semconv25.AttributeDBUser: s.isDB = true - case semconv.AttributeDBName: + case semconv25.AttributeDBName: s.isDB = true s.dbName = v.Str() - case semconv.AttributeDBSystem: + case semconv25.AttributeDBSystem: s.isDB = true s.dbSystem = v.Str() + case semconv27.AttributeGenAiSystem: + s.isGenAi = true + s.genAiSystem = v.Str() } return true }) @@ -361,6 +367,9 @@ func (s *spanEnrichmentContext) setSpanTypeSubtype(span ptrace.Span) { case s.isHTTP: spanType = "external" spanSubtype = "http" + case s.isGenAi: + spanType = "genai" + spanSubtype = s.genAiSystem default: switch span.Kind() { case ptrace.SpanKindInternal: @@ -504,11 +513,11 @@ func (s *spanEventEnrichmentContext) enrich( if s.exception { se.Attributes().Range(func(k string, v pcommon.Value) bool { switch k { - case semconv.AttributeExceptionEscaped: + case semconv25.AttributeExceptionEscaped: s.exceptionEscaped = v.Bool() - case semconv.AttributeExceptionType: + case semconv25.AttributeExceptionType: s.exceptionType = v.Str() - case semconv.AttributeExceptionMessage: + case semconv25.AttributeExceptionMessage: s.exceptionMessage = v.Str() } return true diff --git a/enrichments/trace/internal/elastic/span_test.go b/enrichments/trace/internal/elastic/span_test.go index 694346f..3dff12c 100644 --- a/enrichments/trace/internal/elastic/span_test.go +++ b/enrichments/trace/internal/elastic/span_test.go @@ -30,7 +30,8 @@ import ( "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" - semconv "go.opentelemetry.io/collector/semconv/v1.25.0" + semconv25 "go.opentelemetry.io/collector/semconv/v1.25.0" + semconv27 "go.opentelemetry.io/collector/semconv/v1.27.0" tracepb "go.opentelemetry.io/proto/otlp/trace/v1" "google.golang.org/grpc/codes" ) @@ -108,7 +109,7 @@ func TestElasticTransactionEnrich(t *testing.T) { input: func() ptrace.Span { span := getElasticTxn() span.SetName("testtxn") - span.Attributes().PutInt(semconv.AttributeHTTPStatusCode, http.StatusOK) + span.Attributes().PutInt(semconv25.AttributeHTTPStatusCode, http.StatusOK) return span }(), config: config.Enabled().Transaction, @@ -136,7 +137,7 @@ func TestElasticTransactionEnrich(t *testing.T) { // attributes should be preferred over span status for txn result span.Status().SetCode(ptrace.StatusCodeOk) span.Attributes().PutInt( - semconv.AttributeHTTPResponseStatusCode, + semconv25.AttributeHTTPResponseStatusCode, http.StatusProcessing, ) return span @@ -167,7 +168,7 @@ func TestElasticTransactionEnrich(t *testing.T) { // for setting event.outcome span.Status().SetCode(ptrace.StatusCodeOk) span.Attributes().PutInt( - semconv.AttributeHTTPStatusCode, http.StatusInternalServerError) + semconv25.AttributeHTTPStatusCode, http.StatusInternalServerError) return span }(), config: config.Enabled().Transaction, @@ -195,7 +196,7 @@ func TestElasticTransactionEnrich(t *testing.T) { // attributes should be preferred over span status for txn result span.Status().SetCode(ptrace.StatusCodeOk) span.Attributes().PutInt( - semconv.AttributeRPCGRPCStatusCode, + semconv25.AttributeRPCGRPCStatusCode, int64(codes.OK), ) return span @@ -225,7 +226,7 @@ func TestElasticTransactionEnrich(t *testing.T) { // attributes should be preferred over span status for txn result span.Status().SetCode(ptrace.StatusCodeOk) span.Attributes().PutInt( - semconv.AttributeRPCGRPCStatusCode, + semconv25.AttributeRPCGRPCStatusCode, int64(codes.Internal), ) return span @@ -302,7 +303,7 @@ func TestElasticTransactionEnrich(t *testing.T) { span := getElasticTxn() span.SetName("testtxn") span.SetSpanID([8]byte{1}) - span.Attributes().PutStr(semconv.AttributeMessagingSystem, "kafka") + span.Attributes().PutStr(semconv25.AttributeMessagingSystem, "kafka") return span }(), config: config.Enabled().Transaction, @@ -457,7 +458,7 @@ func TestElasticSpanEnrich(t *testing.T) { input: func() ptrace.Span { span := getElasticSpan() span.SetName("testspan") - span.Attributes().PutStr(semconv.AttributePeerService, "testsvc") + span.Attributes().PutStr(semconv25.AttributePeerService, "testsvc") return span }(), config: config.Enabled().Span, @@ -480,9 +481,9 @@ func TestElasticSpanEnrich(t *testing.T) { input: func() ptrace.Span { span := getElasticSpan() span.SetName("testspan") - span.Attributes().PutStr(semconv.AttributePeerService, "testsvc") + span.Attributes().PutStr(semconv25.AttributePeerService, "testsvc") span.Attributes().PutInt( - semconv.AttributeHTTPResponseStatusCode, + semconv25.AttributeHTTPResponseStatusCode, http.StatusOK, ) return span @@ -510,13 +511,13 @@ func TestElasticSpanEnrich(t *testing.T) { span.SetName("testspan") // peer.service should be ignored if more specific deductions // can be made about the service target. - span.Attributes().PutStr(semconv.AttributePeerService, "testsvc") + span.Attributes().PutStr(semconv25.AttributePeerService, "testsvc") span.Attributes().PutInt( - semconv.AttributeHTTPResponseStatusCode, + semconv25.AttributeHTTPResponseStatusCode, http.StatusOK, ) span.Attributes().PutStr( - semconv.AttributeURLFull, + semconv25.AttributeURLFull, "https://www.foo.bar:443/search?q=OpenTelemetry#SemConv", ) return span @@ -544,13 +545,13 @@ func TestElasticSpanEnrich(t *testing.T) { span.SetName("testspan") // peer.service should be ignored if more specific deductions // can be made about the service target. - span.Attributes().PutStr(semconv.AttributePeerService, "testsvc") + span.Attributes().PutStr(semconv25.AttributePeerService, "testsvc") span.Attributes().PutInt( - semconv.AttributeHTTPResponseStatusCode, + semconv25.AttributeHTTPResponseStatusCode, http.StatusOK, ) span.Attributes().PutStr( - semconv.AttributeHTTPURL, + semconv25.AttributeHTTPURL, "https://www.foo.bar:443/search?q=OpenTelemetry#SemConv", ) return span @@ -578,13 +579,13 @@ func TestElasticSpanEnrich(t *testing.T) { span.SetName("testspan") // peer.service should be ignored if more specific deductions // can be made about the service target. - span.Attributes().PutStr(semconv.AttributePeerService, "testsvc") + span.Attributes().PutStr(semconv25.AttributePeerService, "testsvc") span.Attributes().PutInt( - semconv.AttributeHTTPResponseStatusCode, + semconv25.AttributeHTTPResponseStatusCode, http.StatusOK, ) - span.Attributes().PutStr(semconv.AttributeURLDomain, "www.foo.bar") - span.Attributes().PutInt(semconv.AttributeURLPort, 443) + span.Attributes().PutStr(semconv25.AttributeURLDomain, "www.foo.bar") + span.Attributes().PutInt(semconv25.AttributeURLPort, 443) return span }(), config: config.Enabled().Span, @@ -608,9 +609,9 @@ func TestElasticSpanEnrich(t *testing.T) { input: func() ptrace.Span { span := getElasticSpan() span.SetName("testspan") - span.Attributes().PutStr(semconv.AttributePeerService, "testsvc") + span.Attributes().PutStr(semconv25.AttributePeerService, "testsvc") span.Attributes().PutInt( - semconv.AttributeRPCGRPCStatusCode, + semconv25.AttributeRPCGRPCStatusCode, int64(codes.OK), ) return span @@ -636,8 +637,8 @@ func TestElasticSpanEnrich(t *testing.T) { input: func() ptrace.Span { span := getElasticSpan() span.SetName("testspan") - span.Attributes().PutStr(semconv.AttributePeerService, "testsvc") - span.Attributes().PutStr(semconv.AttributeRPCSystem, "xmlrpc") + span.Attributes().PutStr(semconv25.AttributePeerService, "testsvc") + span.Attributes().PutStr(semconv25.AttributeRPCSystem, "xmlrpc") return span }(), config: config.Enabled().Span, @@ -663,8 +664,8 @@ func TestElasticSpanEnrich(t *testing.T) { span.SetName("testspan") // peer.service should be ignored if more specific deductions // can be made about the service target. - span.Attributes().PutStr(semconv.AttributePeerService, "testsvc") - span.Attributes().PutStr(semconv.AttributeRPCService, "service.Test") + span.Attributes().PutStr(semconv25.AttributePeerService, "testsvc") + span.Attributes().PutStr(semconv25.AttributeRPCService, "service.Test") return span }(), config: config.Enabled().Span, @@ -688,9 +689,9 @@ func TestElasticSpanEnrich(t *testing.T) { span := getElasticSpan() span.SetName("testspan") // No peer.service is set - span.Attributes().PutStr(semconv.AttributeRPCService, "service.Test") - span.Attributes().PutStr(semconv.AttributeServerAddress, "10.2.20.18") - span.Attributes().PutInt(semconv.AttributeServerPort, 8081) + span.Attributes().PutStr(semconv25.AttributeRPCService, "service.Test") + span.Attributes().PutStr(semconv25.AttributeServerAddress, "10.2.20.18") + span.Attributes().PutInt(semconv25.AttributeServerPort, 8081) return span }(), config: config.Enabled().Span, @@ -714,9 +715,9 @@ func TestElasticSpanEnrich(t *testing.T) { span := getElasticSpan() span.SetName("testspan") // No peer.service is set - span.Attributes().PutStr(semconv.AttributeRPCService, "service.Test") - span.Attributes().PutStr(semconv.AttributeNetPeerName, "10.2.20.18") - span.Attributes().PutInt(semconv.AttributeNetPeerPort, 8081) + span.Attributes().PutStr(semconv25.AttributeRPCService, "service.Test") + span.Attributes().PutStr(semconv25.AttributeNetPeerName, "10.2.20.18") + span.Attributes().PutInt(semconv25.AttributeNetPeerPort, 8081) return span }(), config: config.Enabled().Span, @@ -739,8 +740,8 @@ func TestElasticSpanEnrich(t *testing.T) { input: func() ptrace.Span { span := getElasticSpan() span.SetName("testspan") - span.Attributes().PutStr(semconv.AttributePeerService, "testsvc") - span.Attributes().PutStr(semconv.AttributeMessagingSystem, "kafka") + span.Attributes().PutStr(semconv25.AttributePeerService, "testsvc") + span.Attributes().PutStr(semconv25.AttributeMessagingSystem, "kafka") return span }(), config: config.Enabled().Span, @@ -764,8 +765,8 @@ func TestElasticSpanEnrich(t *testing.T) { input: func() ptrace.Span { span := getElasticSpan() span.SetName("testspan") - span.Attributes().PutStr(semconv.AttributePeerService, "testsvc") - span.Attributes().PutStr(semconv.AttributeMessagingDestinationName, "t1") + span.Attributes().PutStr(semconv25.AttributePeerService, "testsvc") + span.Attributes().PutStr(semconv25.AttributeMessagingDestinationName, "t1") return span }(), config: config.Enabled().Span, @@ -788,9 +789,9 @@ func TestElasticSpanEnrich(t *testing.T) { input: func() ptrace.Span { span := getElasticSpan() span.SetName("testspan") - span.Attributes().PutStr(semconv.AttributePeerService, "testsvc") - span.Attributes().PutBool(semconv.AttributeMessagingDestinationTemporary, true) - span.Attributes().PutStr(semconv.AttributeMessagingDestinationName, "t1") + span.Attributes().PutStr(semconv25.AttributePeerService, "testsvc") + span.Attributes().PutBool(semconv25.AttributeMessagingDestinationTemporary, true) + span.Attributes().PutStr(semconv25.AttributeMessagingDestinationName, "t1") return span }(), config: config.Enabled().Span, @@ -813,14 +814,14 @@ func TestElasticSpanEnrich(t *testing.T) { input: func() ptrace.Span { span := getElasticSpan() span.SetName("testspan") - span.Attributes().PutStr(semconv.AttributePeerService, "testsvc") + span.Attributes().PutStr(semconv25.AttributePeerService, "testsvc") span.Attributes().PutStr( - semconv.AttributeURLFull, + semconv25.AttributeURLFull, "https://localhost:9200/index/_search?q=user.id:kimchy", ) span.Attributes().PutStr( - semconv.AttributeDBSystem, - semconv.AttributeDBSystemElasticsearch, + semconv25.AttributeDBSystem, + semconv25.AttributeDBSystemElasticsearch, ) return span }(), @@ -845,19 +846,19 @@ func TestElasticSpanEnrich(t *testing.T) { input: func() ptrace.Span { span := getElasticSpan() span.SetName("testspan") - span.Attributes().PutStr(semconv.AttributePeerService, "testsvc") + span.Attributes().PutStr(semconv25.AttributePeerService, "testsvc") span.Attributes().PutStr( - semconv.AttributeRPCSystem, - semconv.AttributeRPCSystemGRPC, + semconv25.AttributeRPCSystem, + semconv25.AttributeRPCSystemGRPC, ) - span.Attributes().PutStr(semconv.AttributeRPCService, "cassandra.API") + span.Attributes().PutStr(semconv25.AttributeRPCService, "cassandra.API") span.Attributes().PutStr( - semconv.AttributeRPCGRPCStatusCode, - semconv.AttributeRPCGRPCStatusCodeOk, + semconv25.AttributeRPCGRPCStatusCode, + semconv25.AttributeRPCGRPCStatusCodeOk, ) span.Attributes().PutStr( - semconv.AttributeDBSystem, - semconv.AttributeDBSystemCassandra, + semconv25.AttributeDBSystem, + semconv25.AttributeDBSystemCassandra, ) return span }(), @@ -914,6 +915,28 @@ func TestElasticSpanEnrich(t *testing.T) { return &spanLinks }(), }, + { + name: "genai_with_system", + input: func() ptrace.Span { + span := getElasticSpan() + span.SetName("testspan") + span.SetSpanID([8]byte{1}) + span.Attributes().PutStr(semconv27.AttributeGenAiSystem, "openai") + return span + }(), + config: config.Enabled().Span, + enrichedAttrs: map[string]any{ + AttributeTimestampUs: startTs.AsTime().UnixMicro(), + AttributeSpanName: "testspan", + AttributeProcessorEvent: "span", + AttributeSpanRepresentativeCount: float64(1), + AttributeSpanType: "genai", + AttributeSpanSubtype: "openai", + AttributeSpanDurationUs: expectedDuration.Microseconds(), + AttributeEventOutcome: "success", + AttributeSuccessCount: int64(1), + }, + }, } { t.Run(tc.name, func(t *testing.T) { expectedSpan := ptrace.NewSpan() @@ -974,9 +997,9 @@ func TestSpanEventEnrich(t *testing.T) { event := ptrace.NewSpanEvent() event.SetName("exception") event.SetTimestamp(ts) - event.Attributes().PutStr(semconv.AttributeExceptionType, "java.net.ConnectionError") - event.Attributes().PutStr(semconv.AttributeExceptionMessage, "something is wrong") - event.Attributes().PutStr(semconv.AttributeExceptionStacktrace, `Exception in thread "main" java.lang.RuntimeException: Test exception\\n at com.example.GenerateTrace.methodB(GenerateTrace.java:13)\\n at com.example.GenerateTrace.methodA(GenerateTrace.java:9)\\n at com.example.GenerateTrace.main(GenerateTrace.java:5)`) + event.Attributes().PutStr(semconv25.AttributeExceptionType, "java.net.ConnectionError") + event.Attributes().PutStr(semconv25.AttributeExceptionMessage, "something is wrong") + event.Attributes().PutStr(semconv25.AttributeExceptionStacktrace, `Exception in thread "main" java.lang.RuntimeException: Test exception\\n at com.example.GenerateTrace.methodB(GenerateTrace.java:13)\\n at com.example.GenerateTrace.methodA(GenerateTrace.java:9)\\n at com.example.GenerateTrace.main(GenerateTrace.java:5)`) return event }(), config: config.Enabled().SpanEvent, @@ -1007,9 +1030,9 @@ func TestSpanEventEnrich(t *testing.T) { event := ptrace.NewSpanEvent() event.SetName("exception") event.SetTimestamp(ts) - event.Attributes().PutStr(semconv.AttributeExceptionType, "java.net.ConnectionError") - event.Attributes().PutStr(semconv.AttributeExceptionMessage, "something is wrong") - event.Attributes().PutStr(semconv.AttributeExceptionStacktrace, `Exception in thread "main" java.lang.RuntimeException: Test exception\\n at com.example.GenerateTrace.methodB(GenerateTrace.java:13)\\n at com.example.GenerateTrace.methodA(GenerateTrace.java:9)\\n at com.example.GenerateTrace.main(GenerateTrace.java:5)`) + event.Attributes().PutStr(semconv25.AttributeExceptionType, "java.net.ConnectionError") + event.Attributes().PutStr(semconv25.AttributeExceptionMessage, "something is wrong") + event.Attributes().PutStr(semconv25.AttributeExceptionStacktrace, `Exception in thread "main" java.lang.RuntimeException: Test exception\\n at com.example.GenerateTrace.methodB(GenerateTrace.java:13)\\n at com.example.GenerateTrace.methodA(GenerateTrace.java:9)\\n at com.example.GenerateTrace.main(GenerateTrace.java:5)`) return event }(), config: config.Enabled().SpanEvent,