Skip to content

Commit

Permalink
[OTEL-1726] Add OTel span utils to tracetuil (#24270)
Browse files Browse the repository at this point in the history
* [OTEL-1726] Add OTel span utils to tracetuil

* Address review comments

* Fix an import

* Update codeowners
  • Loading branch information
songy23 authored Apr 3, 2024
1 parent a0ead78 commit 8822c5d
Show file tree
Hide file tree
Showing 8 changed files with 725 additions and 7 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@
/pkg/obfuscate/ @DataDog/agent-apm
/pkg/trace/ @DataDog/agent-apm
/pkg/trace/api/otlp*.go @DataDog/opentelemetry
/pkg/trace/traceutil/otel*.go @DataDog/opentelemetry
/pkg/trace/telemetry/ @DataDog/telemetry-and-analytics
/comp/core/autodiscovery/listeners/ @DataDog/container-integrations
/comp/core/autodiscovery/listeners/cloudfoundry*.go @DataDog/platform-integrations
Expand Down
3 changes: 3 additions & 0 deletions comp/core/log/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ require (
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hectane/go-acl v0.0.0-20190604041725-da78bae5fc95 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
Expand All @@ -87,6 +88,8 @@ require (
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/mapstructure v1.5.1-0.20231216201459-8508981c8b6c // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/outcaste-io/ristretto v0.2.1 // indirect
github.com/pelletier/go-toml v1.2.0 // indirect
github.com/philhofer/fwd v1.1.2 // indirect
Expand Down
5 changes: 5 additions & 0 deletions comp/core/log/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 1 addition & 6 deletions pkg/trace/api/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
semconv117 "go.opentelemetry.io/collector/semconv/v1.17.0"
semconv "go.opentelemetry.io/collector/semconv/v1.6.1"
"go.opentelemetry.io/otel/attribute"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
Expand All @@ -44,10 +43,6 @@ import (
// computed for the resource spans.
const keyStatsComputed = "_dd.stats_computed"

var (
signalTypeSet = attribute.NewSet(attribute.String("signal", "traces"))
)

var _ (ptraceotlp.GRPCServer) = (*OTLPReceiver)(nil)

// OTLPReceiver implements an OpenTelemetry Collector receiver which accepts incoming
Expand Down Expand Up @@ -190,7 +185,7 @@ func (o *OTLPReceiver) sample(tid uint64) sampler.SamplingPriority {
func (o *OTLPReceiver) ReceiveResourceSpans(ctx context.Context, rspans ptrace.ResourceSpans, httpHeader http.Header) source.Source {
// each rspans is coming from a different resource and should be considered
// a separate payload; typically there is only one item in this slice
src, srcok := o.conf.OTLPReceiver.AttributesTranslator.ResourceToSource(ctx, rspans.Resource(), signalTypeSet)
src, srcok := o.conf.OTLPReceiver.AttributesTranslator.ResourceToSource(ctx, rspans.Resource(), traceutil.SignalTypeSet)
hostFromMap := func(m map[string]string, key string) {
// hostFromMap sets the hostname to m[key] if it is set.
if v, ok := m[key]; ok {
Expand Down
2 changes: 1 addition & 1 deletion pkg/trace/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ require (
go.opentelemetry.io/collector/pdata v1.0.1
go.opentelemetry.io/collector/semconv v0.93.0
go.opentelemetry.io/otel v1.22.0
go.opentelemetry.io/otel/metric v1.22.0
go.uber.org/atomic v1.11.0
golang.org/x/sys v0.16.0
golang.org/x/time v0.3.0
Expand Down Expand Up @@ -97,7 +98,6 @@ require (
go.opentelemetry.io/collector/confmap v0.93.0 // indirect
go.opentelemetry.io/collector/featuregate v1.0.1 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.45.0 // indirect
go.opentelemetry.io/otel/metric v1.22.0 // indirect
go.opentelemetry.io/otel/sdk v1.22.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.22.0 // indirect
go.opentelemetry.io/otel/trace v1.22.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions pkg/trace/traceutil/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const (
MaxNameLen = 100
// MaxServiceLen the maximum length a service can have
MaxServiceLen = 100
// MaxResourceLen the maximum length a resource can have
MaxResourceLen = 5000
)

var (
Expand Down
283 changes: 283 additions & 0 deletions pkg/trace/traceutil/otel_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,283 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package traceutil

import (
"context"
"strings"

"github.com/DataDog/datadog-agent/pkg/trace/log"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/source"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
semconv117 "go.opentelemetry.io/collector/semconv/v1.17.0"
semconv "go.opentelemetry.io/collector/semconv/v1.6.1"
"go.opentelemetry.io/otel/attribute"
)

// Util functions for converting OTel semantics to DD semantics.
// TODO(OTEL-1726): reuse the same mapping code for ReceiveResourceSpans and Concentrator

var (
// SignalTypeSet is the OTel attribute set for traces.
SignalTypeSet = attribute.NewSet(attribute.String("signal", "traces"))
)

// IndexOTelSpans iterates over the input OTel spans and returns 3 maps:
// OTel spans indexed by span ID, OTel resources indexed by span ID, OTel instrumentation scopes indexed by span ID.
// Skips spans with invalid trace ID or span ID. If there are multiple spans with the same (non-zero) span ID, the last one wins.
func IndexOTelSpans(traces ptrace.Traces) (map[pcommon.SpanID]ptrace.Span, map[pcommon.SpanID]pcommon.Resource, map[pcommon.SpanID]pcommon.InstrumentationScope) {
spanByID := make(map[pcommon.SpanID]ptrace.Span)
resByID := make(map[pcommon.SpanID]pcommon.Resource)
scopeByID := make(map[pcommon.SpanID]pcommon.InstrumentationScope)
rspanss := traces.ResourceSpans()
for i := 0; i < rspanss.Len(); i++ {
rspans := rspanss.At(i)
res := rspans.Resource()
for j := 0; j < rspans.ScopeSpans().Len(); j++ {
libspans := rspans.ScopeSpans().At(j)
for k := 0; k < libspans.Spans().Len(); k++ {
span := libspans.Spans().At(k)
if span.TraceID().IsEmpty() || span.SpanID().IsEmpty() {
continue
}
spanByID[span.SpanID()] = span
resByID[span.SpanID()] = res
scopeByID[span.SpanID()] = libspans.Scope()
}
}
}
return spanByID, resByID, scopeByID
}

// GetTopLevelOTelSpans returns the span IDs of the top level OTel spans.
func GetTopLevelOTelSpans(spanByID map[pcommon.SpanID]ptrace.Span, resByID map[pcommon.SpanID]pcommon.Resource, topLevelByKind bool) map[pcommon.SpanID]struct{} {
topLevelSpans := make(map[pcommon.SpanID]struct{})
for spanID, span := range spanByID {
if span.ParentSpanID().IsEmpty() {
// case 1: root span
topLevelSpans[spanID] = struct{}{}
continue
}

if topLevelByKind {
// New behavior for computing top level OTel spans, see computeTopLevelAndMeasured in pkg/trace/api/otlp.go
spanKind := span.Kind()
if spanKind == ptrace.SpanKindServer || spanKind == ptrace.SpanKindConsumer {
// span is a server-side span, mark as top level
topLevelSpans[spanID] = struct{}{}
}
continue
}

// Otherwise, fall back to old behavior in ComputeTopLevel
parentSpan, ok := spanByID[span.ParentSpanID()]
if !ok {
// case 2: parent span not in the same chunk, presumably it belongs to another service
topLevelSpans[spanID] = struct{}{}
continue
}

svc := GetOTelService(span, resByID[spanID], true)
parentSvc := GetOTelService(parentSpan, resByID[parentSpan.SpanID()], true)
if svc != parentSvc {
// case 3: parent is not in the same service
topLevelSpans[spanID] = struct{}{}
}
}
return topLevelSpans
}

// GetOTelAttrVal returns the matched value as a string in the input map with the given keys.
// If there are multiple keys present, the first matched one is returned.
// If normalize is true, normalize the return value with NormalizeTagValue.
func GetOTelAttrVal(attrs pcommon.Map, normalize bool, keys ...string) string {
val := ""
for _, key := range keys {
attrval, exists := attrs.Get(key)
if exists {
val = attrval.AsString()
}
}

if normalize {
val = NormalizeTagValue(val)
}

return val
}

// GetOTelAttrValInResAndSpanAttrs returns the matched value as a string in the OTel resource attributes and span attributes with the given keys.
// If there are multiple keys present, the first matched one is returned.
// If the key is present in both resource attributes and span attributes, resource attributes take precedence.
// If normalize is true, normalize the return value with NormalizeTagValue.
func GetOTelAttrValInResAndSpanAttrs(span ptrace.Span, res pcommon.Resource, normalize bool, keys ...string) string {
if val := GetOTelAttrVal(res.Attributes(), normalize, keys...); val != "" {
return val
}
return GetOTelAttrVal(span.Attributes(), normalize, keys...)
}

// GetOTelSpanType returns the DD span type based on OTel span kind and attributes.
func GetOTelSpanType(span ptrace.Span, res pcommon.Resource) string {
var typ string
switch span.Kind() {
case ptrace.SpanKindServer:
typ = "web"
case ptrace.SpanKindClient:
db := GetOTelAttrValInResAndSpanAttrs(span, res, true, semconv.AttributeDBSystem)
if db == "redis" || db == "memcached" {
typ = "cache"
} else if db != "" {
typ = "db"
} else {
typ = "http"
}
default:
typ = "custom"
}
return typ
}

// GetOTelService returns the DD service name based on OTel span and resource attributes.
func GetOTelService(span ptrace.Span, res pcommon.Resource, normalize bool) string {
// No need to normalize with NormalizeTagValue since we will do NormalizeService later
svc := GetOTelAttrValInResAndSpanAttrs(span, res, false, semconv.AttributeServiceName)
if svc == "" {
svc = "otlpresourcenoservicename"
}
if normalize {
newsvc, err := NormalizeService(svc, "")
switch err {
case ErrTooLong:
log.Debugf("Fixing malformed trace. Service is too long (reason:service_truncate), truncating span.service to length=%d: %s", MaxServiceLen, svc)
case ErrInvalid:
log.Debugf("Fixing malformed trace. Service is invalid (reason:service_invalid), replacing invalid span.service=%s with fallback span.service=%s", svc, newsvc)
}
svc = newsvc
}
return svc
}

// GetOTelResource returns the DD resource name based on OTel span and resource attributes.
func GetOTelResource(span ptrace.Span, res pcommon.Resource) (resName string) {
resName = GetOTelAttrValInResAndSpanAttrs(span, res, false, "resource.name")
if resName == "" {
if m := GetOTelAttrValInResAndSpanAttrs(span, res, false, semconv.AttributeHTTPMethod); m != "" {
// use the HTTP method + route (if available)
resName = m
if route := GetOTelAttrValInResAndSpanAttrs(span, res, false, semconv.AttributeHTTPRoute); route != "" {
resName = resName + " " + route
}
} else if m := GetOTelAttrValInResAndSpanAttrs(span, res, false, semconv.AttributeMessagingOperation); m != "" {
resName = m
// use the messaging operation
if dest := GetOTelAttrValInResAndSpanAttrs(span, res, false, semconv.AttributeMessagingDestination, semconv117.AttributeMessagingDestinationName); dest != "" {
resName = resName + " " + dest
}
} else if m := GetOTelAttrValInResAndSpanAttrs(span, res, false, semconv.AttributeRPCMethod); m != "" {
resName = m
// use the RPC method
if svc := GetOTelAttrValInResAndSpanAttrs(span, res, false, semconv.AttributeRPCService); m != "" {
// ...and service if available
resName = resName + " " + svc
}
} else {
resName = span.Name()
}
}
if len(resName) > MaxResourceLen {
resName = resName[:MaxResourceLen]
}
return
}

// GetOTelOperationName returns the DD operation name based on OTel span and resource attributes and given configs.
func GetOTelOperationName(
span ptrace.Span,
res pcommon.Resource,
lib pcommon.InstrumentationScope,
spanNameAsResourceName bool,
spanNameRemappings map[string]string,
normalize bool) string {
// No need to normalize with NormalizeTagValue since we will do NormalizeName later
name := GetOTelAttrValInResAndSpanAttrs(span, res, false, "operation.name")
if name == "" {
if spanNameAsResourceName {
name = span.Name()
} else {
name = strings.ToLower(span.Kind().String())
if lib.Name() != "" {
name = lib.Name() + "." + name
} else {
name = "opentelemetry." + name
}
}
}
if v, ok := spanNameRemappings[name]; ok {
name = v
}

if normalize {
normalizeName, err := NormalizeName(name)
switch err {
case ErrEmpty:
log.Debugf("Fixing malformed trace. Name is empty (reason:span_name_empty), setting span.name=%s", normalizeName)
case ErrTooLong:
log.Debugf("Fixing malformed trace. Name is too long (reason:span_name_truncate), truncating span.name to length=%d", MaxServiceLen)
case ErrInvalid:
log.Debugf("Fixing malformed trace. Name is invalid (reason:span_name_invalid), setting span.name=%s", normalizeName)
}
name = normalizeName
}

return name
}

// GetOTelHostname returns the DD hostname based on OTel span and resource attributes.
func GetOTelHostname(span ptrace.Span, res pcommon.Resource, tr *attributes.Translator, fallbackHost string) string {
ctx := context.Background()
src, srcok := tr.ResourceToSource(ctx, res, SignalTypeSet)
if !srcok {
if v := GetOTelAttrValInResAndSpanAttrs(span, res, false, "_dd.hostname"); v != "" {
src = source.Source{Kind: source.HostnameKind, Identifier: v}
srcok = true
}
}
if srcok {
switch src.Kind {
case source.HostnameKind:
return src.Identifier
default:
// We are not on a hostname (serverless), hence the hostname is empty
return ""
}
} else {
// fallback hostname from Agent conf.Hostname
return fallbackHost
}
}

// GetOTelStatusCode returns the DD status code of the OTel span.
func GetOTelStatusCode(span ptrace.Span) uint32 {
if code, ok := span.Attributes().Get(semconv.AttributeHTTPStatusCode); ok {
return uint32(code.Int())
}
return 0
}

// GetOTelContainerTags returns a list of DD container tags in the OTel resource attributes.
// Tags are always normalized.
func GetOTelContainerTags(rattrs pcommon.Map) []string {
var containerTags []string
containerTagsMap := attributes.ContainerTagsFromResourceAttributes(rattrs)
for k, v := range containerTagsMap {
t := NormalizeTag(k + ":" + v)
containerTags = append(containerTags, t)
}
return containerTags
}
Loading

0 comments on commit 8822c5d

Please sign in to comment.