Skip to content

Commit

Permalink
Unify zipkin v1 and v2 annotation/tag parsing logic (#1002)
Browse files Browse the repository at this point in the history
Fixes #975

Please look at the individual commits, most of this is just moving code around.

Moves zipkin v2 trace conversion code into translator/trace/zipkin, previously it was in the receiver
Use the same tag parsing logic for both zipkin v1 and v2
  • Loading branch information
chris-smith-zocdoc authored Jun 23, 2020
1 parent c108cef commit 969a8d4
Show file tree
Hide file tree
Showing 13 changed files with 1,043 additions and 386 deletions.
295 changes: 1 addition & 294 deletions receiver/zipkinreceiver/trace_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,10 @@ import (
"io/ioutil"
"net"
"net/http"
"strconv"
"strings"
"sync"

"github.com/apache/thrift/lib/go/thrift"
commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
zipkinmodel "github.com/openzipkin/zipkin-go/model"
zipkinproto "github.com/openzipkin/zipkin-go/proto/v2"
Expand All @@ -41,9 +38,7 @@ import (
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/internal"
"go.opentelemetry.io/collector/obsreport"
tracetranslator "go.opentelemetry.io/collector/translator/trace"
"go.opentelemetry.io/collector/translator/trace/zipkin"
)

Expand Down Expand Up @@ -201,39 +196,7 @@ func (zr *ZipkinReceiver) v2ToTraceSpans(blob []byte, hdr http.Header) (reqs []c
return nil, err
}

// *commonpb.Node instances have unique addresses hence
// for grouping within a map, we'll use the .String() value
byNodeGrouping := make(map[string][]*tracepb.Span)
uniqueNodes := make([]*commonpb.Node, 0, len(zipkinSpans))
// Now translate them into tracepb.Span
for _, zspan := range zipkinSpans {
if zspan == nil {
continue
}
span, node := zipkinSpanToTraceSpan(zspan)
key := node.String()
if _, alreadyAdded := byNodeGrouping[key]; !alreadyAdded {
uniqueNodes = append(uniqueNodes, node)
}
byNodeGrouping[key] = append(byNodeGrouping[key], span)
}

for _, node := range uniqueNodes {
key := node.String()
spans := byNodeGrouping[key]
if len(spans) == 0 {
// Should never happen but nonetheless be cautious
// not to send blank spans.
continue
}
reqs = append(reqs, consumerdata.TraceData{
Node: node,
Spans: spans,
})
delete(byNodeGrouping, key)
}

return reqs, nil
return zipkin.V2BatchToOCProto(zipkinSpans)
}

func (zr *ZipkinReceiver) deserializeFromJSON(jsonBlob []byte, debugWasSet bool) (zs []*zipkinmodel.SpanModel, err error) {
Expand Down Expand Up @@ -369,262 +332,6 @@ func (zr *ZipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusAccepted)
}

func zipkinSpanToTraceSpan(zs *zipkinmodel.SpanModel) (*tracepb.Span, *commonpb.Node) {
traceID := tracetranslator.UInt64ToByteTraceID(zs.TraceID.High, zs.TraceID.Low)
var parentSpanID []byte
if zs.ParentID != nil {
parentSpanID = tracetranslator.UInt64ToByteSpanID(uint64(*zs.ParentID))
}

pbs := &tracepb.Span{
TraceId: traceID,
SpanId: tracetranslator.UInt64ToByteSpanID(uint64(zs.ID)),
ParentSpanId: parentSpanID,
Name: &tracepb.TruncatableString{Value: zs.Name},
StartTime: internal.TimeToTimestamp(zs.Timestamp),
EndTime: internal.TimeToTimestamp(zs.Timestamp.Add(zs.Duration)),
Kind: zipkinSpanKindToProtoSpanKind(zs.Kind),
Status: extractProtoStatus(zs),
Attributes: zipkinTagsToTraceAttributes(zs.Tags, zs.Kind),
TimeEvents: zipkinAnnotationsToProtoTimeEvents(zs.Annotations),
}

node := nodeFromZipkinEndpoints(zs, pbs)
zipkin.SetTimestampsIfUnset(pbs)

return pbs, node
}

func nodeFromZipkinEndpoints(zs *zipkinmodel.SpanModel, pbs *tracepb.Span) *commonpb.Node {
if zs.LocalEndpoint == nil && zs.RemoteEndpoint == nil {
return nil
}

node := new(commonpb.Node)
var endpointMap map[string]string

// Retrieve and make use of the local endpoint
if lep := zs.LocalEndpoint; lep != nil {
node.ServiceInfo = &commonpb.ServiceInfo{
Name: lep.ServiceName,
}
endpointMap = zipkinEndpointIntoAttributes(lep, endpointMap, isLocalEndpoint)
}

// Retrieve and make use of the remote endpoint
if rep := zs.RemoteEndpoint; rep != nil {
endpointMap = zipkinEndpointIntoAttributes(rep, endpointMap, isRemoteEndpoint)
}

if endpointMap != nil {
if pbs.Attributes == nil {
pbs.Attributes = &tracepb.Span_Attributes{}
}
if pbs.Attributes.AttributeMap == nil {
pbs.Attributes.AttributeMap = make(
map[string]*tracepb.AttributeValue, len(endpointMap))
}

// Delete the redundant serviceName key since it is already on the node.
delete(endpointMap, zipkin.LocalEndpointServiceName)
attrbMap := pbs.Attributes.AttributeMap
for key, value := range endpointMap {
attrbMap[key] = &tracepb.AttributeValue{
Value: &tracepb.AttributeValue_StringValue{
StringValue: &tracepb.TruncatableString{Value: value},
},
}
}
}

return node
}

type zipkinDirection bool

const (
isLocalEndpoint zipkinDirection = true
isRemoteEndpoint zipkinDirection = false
)

var blankIP net.IP

// zipkinEndpointIntoAttributes extracts information from s zipkin endpoint struct
// and puts it into a map with pre-defined keys.
func zipkinEndpointIntoAttributes(
ep *zipkinmodel.Endpoint,
into map[string]string,
endpointType zipkinDirection,
) map[string]string {

if into == nil {
into = make(map[string]string)
}

var ipv4Key, ipv6Key, portKey, serviceNameKey string
if endpointType == isLocalEndpoint {
ipv4Key, ipv6Key = zipkin.LocalEndpointIPv4, zipkin.LocalEndpointIPv6
portKey, serviceNameKey = zipkin.LocalEndpointPort, zipkin.LocalEndpointServiceName
} else {
ipv4Key, ipv6Key = zipkin.RemoteEndpointIPv4, zipkin.RemoteEndpointIPv6
portKey, serviceNameKey = zipkin.RemoteEndpointPort, zipkin.RemoteEndpointServiceName
}
if ep.IPv4 != nil && !ep.IPv4.Equal(blankIP) {
into[ipv4Key] = ep.IPv4.String()
}
if ep.IPv6 != nil && !ep.IPv6.Equal(blankIP) {
into[ipv6Key] = ep.IPv6.String()
}
if ep.Port > 0 {
into[portKey] = strconv.Itoa(int(ep.Port))
}
if serviceName := ep.ServiceName; serviceName != "" {
into[serviceNameKey] = serviceName
}
return into
}

const statusCodeUnknown = 2

func extractProtoStatus(zs *zipkinmodel.SpanModel) *tracepb.Status {
// The status is stored with the "error" key
// See https://github.com/census-instrumentation/opencensus-go/blob/1eb9a13c7dd02141e065a665f6bf5c99a090a16a/exporter/zipkin/zipkin.go#L160-L165
if zs == nil || len(zs.Tags) == 0 {
return nil
}
canonicalCodeStr := zs.Tags["error"]
message := zs.Tags["opencensus.status_description"]
if message == "" && canonicalCodeStr == "" {
return nil
}
code, set := canonicalCodesMap[canonicalCodeStr]
if !set {
// If not status code was set, then we should use UNKNOWN
code = statusCodeUnknown
}
return &tracepb.Status{
Message: message,
Code: code,
}
}

var canonicalCodesMap = map[string]int32{
// https://github.com/googleapis/googleapis/blob/bee79fbe03254a35db125dc6d2f1e9b752b390fe/google/rpc/code.proto#L33-L186
"OK": 0,
"CANCELLED": 1,
"UNKNOWN": 2,
"INVALID_ARGUMENT": 3,
"DEADLINE_EXCEEDED": 4,
"NOT_FOUND": 5,
"ALREADY_EXISTS": 6,
"PERMISSION_DENIED": 7,
"RESOURCE_EXHAUSTED": 8,
"FAILED_PRECONDITION": 9,
"ABORTED": 10,
"OUT_OF_RANGE": 11,
"UNIMPLEMENTED": 12,
"INTERNAL": 13,
"UNAVAILABLE": 14,
"DATA_LOSS": 15,
"UNAUTHENTICATED": 16,
}

func zipkinSpanKindToProtoSpanKind(skind zipkinmodel.Kind) tracepb.Span_SpanKind {
switch strings.ToUpper(string(skind)) {
case "CLIENT":
return tracepb.Span_CLIENT
case "SERVER":
return tracepb.Span_SERVER
default:
return tracepb.Span_SPAN_KIND_UNSPECIFIED
}
}

func zipkinAnnotationsToProtoTimeEvents(zas []zipkinmodel.Annotation) *tracepb.Span_TimeEvents {
if len(zas) == 0 {
return nil
}
tevs := make([]*tracepb.Span_TimeEvent, 0, len(zas))
for _, za := range zas {
if tev := zipkinAnnotationToProtoAnnotation(za); tev != nil {
tevs = append(tevs, tev)
}
}
if len(tevs) == 0 {
return nil
}
return &tracepb.Span_TimeEvents{
TimeEvent: tevs,
}
}

var blankAnnotation zipkinmodel.Annotation

func zipkinAnnotationToProtoAnnotation(zas zipkinmodel.Annotation) *tracepb.Span_TimeEvent {
if zas == blankAnnotation {
return nil
}
return &tracepb.Span_TimeEvent{
Time: internal.TimeToTimestamp(zas.Timestamp),
Value: &tracepb.Span_TimeEvent_Annotation_{
Annotation: &tracepb.Span_TimeEvent_Annotation{
Description: &tracepb.TruncatableString{Value: zas.Value},
},
},
}
}

func zipkinTagsToTraceAttributes(tags map[string]string, skind zipkinmodel.Kind) *tracepb.Span_Attributes {
// Produce and Consumer span kinds are not representable in OpenCensus format.
// We will represent them using TagSpanKind attribute, according to OpenTracing
// conventions. Check if it is one of those span kinds.
var spanKindTagVal tracetranslator.OpenTracingSpanKind
switch skind {
case zipkinmodel.Producer:
spanKindTagVal = tracetranslator.OpenTracingSpanKindProducer
case zipkinmodel.Consumer:
spanKindTagVal = tracetranslator.OpenTracingSpanKindConsumer
}

if len(tags) == 0 && spanKindTagVal == "" {
// No input tags and no need to add a span kind tag. Keep attributes map empty.
return nil
}

amap := make(map[string]*tracepb.AttributeValue, len(tags))
for key, value := range tags {
// We did a translation from "boolean" to "string"
// in OpenCensus-Go's Zipkin exporter as per
// https://github.com/census-instrumentation/opencensus-go/blob/1eb9a13c7dd02141e065a665f6bf5c99a090a16a/exporter/zipkin/zipkin.go#L138-L155
switch value {
case "true", "false":
amap[key] = &tracepb.AttributeValue{
Value: &tracepb.AttributeValue_BoolValue{BoolValue: value == "true"},
}
default:
amap[key] = &tracepb.AttributeValue{
Value: &tracepb.AttributeValue_StringValue{
StringValue: &tracepb.TruncatableString{Value: value},
},
}
}

}

if spanKindTagVal != "" {
// Set the previously translated span kind attribute (see top of this function).
// We do this after the "tags" map is translated so that we will overwrite
// the attribute if it exists.
amap[tracetranslator.TagSpanKind] = &tracepb.AttributeValue{
Value: &tracepb.AttributeValue_StringValue{
StringValue: &tracepb.TruncatableString{Value: string(spanKindTagVal)},
},
}
}

return &tracepb.Span_Attributes{AttributeMap: amap}
}

func transportType(r *http.Request) string {
v1 := r.URL != nil && strings.Contains(r.URL.Path, "api/v1/spans")
if v1 {
Expand Down
Loading

0 comments on commit 969a8d4

Please sign in to comment.