From 9bcf6057bb4626c4712158071d5a8a669b182c7c Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Fri, 26 Oct 2018 11:41:00 -0700 Subject: [PATCH] interceptor: add zipkin /v2 interceptor (#111) * interceptor: add zipkin interceptor * Zipkin interceptor multiplexing + running on 9411 * Can receive spans from diverse sources and successfully intercept traces from various serviceNames and IPv* combinations. Verified by examining the data on the Zipkin UI itself by resending to a server running on a different port. * End-to-end serialization tests where we intercept JSON from a wild Zipkin reporter, transform it into trace-proto then to OpenCensus-Go SpanData and then export it with the OpenCensus-Go Zipkin exporter, inspect the JSON and compare it with the final/expected JSON data. Fixes #24 * interceptor/zipkin: add remoteEndpoint to Node.Attributes Zipkin's RemoteEndpoint field is necessary when later on building the dependency graph for Zipkin services. This change ensures that that field is added as an attribute to the Node with keys whose prefix is: "zipkin.remoteEndpoint." e.g. - "zipkin.remoteEndpoint.ipv4" - "zipkin.remoteEndpoint.serviceName" - "zipkin.remoteEndpoint.port" * Fail if Zipkin interceptor and exporter run on same address Fail spectacularly if the Zipkin interceptor and the Zipkin exporter will all be run on the same address. This is because this is a self DOS vector as intercepted spans will be processed by the interceptor, then exported out by the exporter but instead those exported spans will then be sent back to the interceptor and this goes on indefinitely and will just consume memory and resources. To perform this check: a) Compare HostPorts of interceptor vs exporter b) If say "127.0.0.1", "localhost", "" for host and the ports match c) Resolve the IPs for the hosts if any of those steps match return a logical conflict error which will be thrown later. 
The crash will look something like this ```shell 2018/10/23 18:54:41 Configuration logical error: ZipkinInterceptor address ("127.0.0.1:9411") aka (127.0.0.1 on port 9411) is the same as the interceptor address ("localhost:9411") aka (127.0.0.1 on port 9411) ``` We can now detect any of these clashing configurations: ```yaml interceptors: zipkin: address: "127.0.0.1:9411" exporters: zipkin: endpoint: "http://localhost:9411/api/v2/spans" ``` ```yaml interceptors: zipkin: address: ":9411" exporters: zipkin: endpoint: "http://localhost:9411/api/v2/spans" ``` ```yaml interceptors: zipkin: address: "localhost:9411" exporters: zipkin: endpoint: "http://localhost:9411/api/v2/spans" ``` ```yaml interceptors: zipkin: address: "localhost:9411" exporters: zipkin: endpoint: "http://:9411/api/v2/spans" ``` ```yaml interceptors: zipkin: address: "localhost:9411" exporters: zipkin: endpoint: "http://127.0.0.1:9411/api/v2/spans" ``` and in a case where IP resolution helped, "10.0.0.147" is my local address and I sneakishly tried to get the interceptor on "localhost" which clashes with my local address as per: ```yaml interceptors: zipkin: address: "localhost:9411" exporters: zipkin: endpoint: "http://10.0.0.147:9411/api/v2/spans" ``` we caught it ```shell 2018/10/23 19:01:01 Configuration logical error: ZipkinInterceptor address ("10.0.0.147:9411") aka (10.0.0.147 on port 9411) is the same as the interceptor address ("localhost:9411") aka (10.0.0.147 on port 9411) ``` * Go fmt with Go1.10 to match TravisCI Go1.11's gofmt seems to be more conservating with aligning literal assignments while Go1.10 seems more extravagant. This subtle difference was causing TravisCI failures since TravisCI uses Go1.10. * interceptor/zipkin: handle compressed HTTP bodies Some clients such as Zipkin-Java send HTTP bodies with JSON that's been compressed as "gzip". 
This change handles "Content-Encoding" values of: * "gzip" * "deflate", "zlib" and decompresses them accordingly * interceptor/zipkin: tests use anagram signature A test that checked for raffled output before used an "xorChecksum" but that's not really always unique. The better way is to use an anagram counter, then create a signature from the serialized map of counted runes. * interceptor/zipkin: address review feedback * Package rename to "zipkinterceptor" --> "zipkininterceptor" * Fix error names * Address feedback from review * move "zipkinRoute" const to the top to make it easily discoverable and not be buried deep in code * set status of zipkin interceptor if we encounter an error while parsing spans. We set the span status to: StatusInvalidArgument and the error message * *zipkin: address feedback from review * Add a comment ot exporter/exporterparse.zipkinExporter that the mutex "mu" protects all the constituent fields * Fix and ensure that node uniqueness is clear, with the proper check from a map lookup but also rename the variable to store nodes to "uniqueNodes" to make for better code readability * README.md + default config.yaml: update interceptors docs and information Added a section in the README.md on the various interceptors and how to change the Zipkin interceptor address * sync.Mutex instead of sync.RWMutex for zipkinExporter.mu --- README.md | 32 +- cmd/ocagent/config.go | 98 ++++- cmd/ocagent/config.yaml | 3 + cmd/ocagent/main.go | 46 ++- exporter/exporterparser/zipkin.go | 145 ++++++- interceptor/opencensus/opencensus.go | 10 +- interceptor/zipkin/testdata/sample1.json | 288 +++++++++++++ interceptor/zipkin/trace_interceptor.go | 398 ++++++++++++++++++ interceptor/zipkin/trace_interceptor_test.go | 410 +++++++++++++++++++ internal/observability.go | 18 + 10 files changed, 1411 insertions(+), 37 deletions(-) create mode 100644 interceptor/zipkin/testdata/sample1.json create mode 100644 interceptor/zipkin/trace_interceptor.go create mode 100644 
interceptor/zipkin/trace_interceptor_test.go diff --git a/README.md b/README.md index 1fecb54c4b18..0691dd69bbcd 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,8 @@ - [Configuration file](#agent-configuration-file) - [Exporters](#agent-config-exporters) - [Interceptors](#agent-config-interceptors) + - [OpenCensus](#details-interceptors-opencensus) + - [Zipkin](#details-interceptors-zipkin) - [End-to-end example](#agent-config-end-to-end-example) - [Docker image](#agent-docker-image) - [OpenCensus Collector](#opencensus-collector) @@ -271,14 +273,36 @@ exporters: ``` #### Interceptors +As previously mentioned, the agent provides a couple of interceptors + +#### OpenCensus + +This interceptor receives spans from OpenCensus instrumented applications and translates them into the internal span types that +are then sent to the collector/exporters. + +Its address can be configured in the YAML configuration file under section "interceptors", subsection "opencensus" and field "address". -To modify the address that the OpenCensus interceptor runs on, please use the -YAML field name `opencensus_interceptor` and it takes fields like `address`. For example: +```yaml +interceptors: + opencensus: + address: "127.0.0.1:55678" +``` + +By default this interceptor is ALWAYS started since it is the point of the "OpenCensus agent" +#### Zipkin + +This interceptor receives spans from Zipkin "/v2" API HTTP uploads and translates them into the internal span types that are then +sent to the collector/exporters. + +Its address can be configured in the YAML configuration file under section "interceptors", subsection "zipkin" and field "address". 
+ +For example: ```yaml -opencensus_interceptor: - address: "localhost:55678" +interceptors: + zipkin: + address: "localhost:9411" ``` ### Running an end-to-end example/demo diff --git a/cmd/ocagent/config.go b/cmd/ocagent/config.go index 41d5b7a9bd7b..c21e2eea2ec0 100644 --- a/cmd/ocagent/config.go +++ b/cmd/ocagent/config.go @@ -15,7 +15,12 @@ package main import ( + "fmt" "log" + "net" + "net/url" + "reflect" + "strings" yaml "gopkg.in/yaml.v2" @@ -52,7 +57,8 @@ type config struct { } type interceptors struct { - OpenCensusInterceptor *interceptorConfig `yaml:"opencensus"` + OpenCensus *interceptorConfig `yaml:"opencensus"` + Zipkin *interceptorConfig `yaml:"zipkin"` } type interceptorConfig struct { @@ -70,10 +76,10 @@ func (c *config) ocInterceptorAddress() string { return defaultOCInterceptorAddress } inCfg := c.Interceptors - if inCfg.OpenCensusInterceptor == nil || inCfg.OpenCensusInterceptor.Address == "" { + if inCfg.OpenCensus == nil || inCfg.OpenCensus.Address == "" { return defaultOCInterceptorAddress } - return inCfg.OpenCensusInterceptor.Address + return inCfg.OpenCensus.Address } func (c *config) zPagesDisabled() bool { @@ -94,6 +100,24 @@ func (c *config) zPagesPort() (int, bool) { return port, true } +func (c *config) zipkinInterceptorEnabled() bool { + if c == nil { + return false + } + return c.Interceptors != nil && c.Interceptors.Zipkin != nil +} + +func (c *config) zipkinInterceptorAddress() string { + if c == nil || c.Interceptors == nil { + return exporterparser.DefaultZipkinEndpointHostPort + } + inCfg := c.Interceptors + if inCfg.Zipkin == nil || inCfg.Zipkin.Address == "" { + return exporterparser.DefaultZipkinEndpointHostPort + } + return inCfg.Zipkin.Address +} + func parseOCAgentConfig(yamlBlob []byte) (*config, error) { var cfg config if err := yaml.Unmarshal(yamlBlob, &cfg); err != nil { @@ -102,7 +126,72 @@ func parseOCAgentConfig(yamlBlob []byte) (*config, error) { return &cfg, nil } -type exporterParser func(yamlConfig []byte) 
(te []exporter.TraceExporter, err error) +// The goal of this function is to catch logical errors such as +// if the Zipkin interceptor port conflicts with that of the exporter +// lest we'll have a self DOS because spans will be exported "out" from +// the exporter, yet be received from the interceptor, then sent back out +// and back in a never ending loop. +func (c *config) checkLogicalConflicts(blob []byte) error { + var cfg struct { + Exporters *struct { + Zipkin *exporterparser.ZipkinConfig `yaml:"zipkin"` + } `yaml:"exporters"` + } + if err := yaml.Unmarshal(blob, &cfg); err != nil { + return err + } + + if cfg.Exporters == nil || cfg.Exporters.Zipkin == nil { + return nil + } + + zc := cfg.Exporters.Zipkin + + zExporterAddr := zc.EndpointURL() + zExporterURL, err := url.Parse(zExporterAddr) + if err != nil { + return fmt.Errorf("parsing ZipkinExporter address %q got error: %v", zExporterAddr, err) + } + + zInterceptorHostPort := c.zipkinInterceptorAddress() + + zExporterHostPort := zExporterURL.Host + if zInterceptorHostPort == zExporterHostPort { + return fmt.Errorf("ZipkinExporter address (%q) is the same as the interceptor address (%q)", + zExporterHostPort, zInterceptorHostPort) + } + zExpHost, zExpPort, _ := net.SplitHostPort(zExporterHostPort) + zInterceptorHost, zInterceptorPort, _ := net.SplitHostPort(zInterceptorHostPort) + if eqHosts(zExpHost, zInterceptorHost) && zExpPort == zInterceptorPort { + return fmt.Errorf("ZipkinExporter address (%q) aka (%s on port %s)\nis the same as the interceptor address (%q) aka (%s on port %s)", + zExporterHostPort, zExpHost, zExpPort, zInterceptorHostPort, zInterceptorHost, zInterceptorPort) + } + + // Otherwise, now let's resolve the IPs and ensure that they aren't the same + zExpIPAddr, _ := net.ResolveIPAddr("ip", zExpHost) + zInterceptorIPAddr, _ := net.ResolveIPAddr("ip", zInterceptorHost) + if zExpIPAddr != nil && zInterceptorIPAddr != nil && reflect.DeepEqual(zExpIPAddr, zInterceptorIPAddr) { + return 
fmt.Errorf("ZipkinExporter address (%q) aka (%+v)\nis the same as the\ninterceptor address (%q) aka (%+v)", + zExporterHostPort, zExpIPAddr, zInterceptorHostPort, zInterceptorIPAddr) + } + return nil +} + +func eqHosts(host1, host2 string) bool { + if host1 == host2 { + return true + } + return eqLocalHost(host1) && eqLocalHost(host2) +} + +func eqLocalHost(host string) bool { + switch strings.ToLower(host) { + case "", "localhost", "127.0.0.1": + return true + default: + return false + } +} // exportersFromYAMLConfig parses the config yaml payload and returns the respective exporters func exportersFromYAMLConfig(config []byte) (traceExporters []exporter.TraceExporter, doneFns []func() error) { @@ -128,6 +217,7 @@ func exportersFromYAMLConfig(config []byte) (traceExporters []exporter.TraceExpo nonNilExporters += 1 } } + if nonNilExporters > 0 { pluralization := "exporter" if nonNilExporters > 1 { diff --git a/cmd/ocagent/config.yaml b/cmd/ocagent/config.yaml index a935c48669e1..336c7cd2aee5 100644 --- a/cmd/ocagent/config.yaml +++ b/cmd/ocagent/config.yaml @@ -2,6 +2,9 @@ interceptors: opencensus: address: "127.0.0.1:55678" + zipkin: + address: "localhost:9411" + exporters: stackdriver: project: "project-id" diff --git a/cmd/ocagent/main.go b/cmd/ocagent/main.go index 6e6dc48706b3..f45aaf351728 100644 --- a/cmd/ocagent/main.go +++ b/cmd/ocagent/main.go @@ -29,6 +29,7 @@ import ( agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" "github.com/census-instrumentation/opencensus-service/exporter" "github.com/census-instrumentation/opencensus-service/interceptor/opencensus" + "github.com/census-instrumentation/opencensus-service/interceptor/zipkin" "github.com/census-instrumentation/opencensus-service/internal" "github.com/census-instrumentation/opencensus-service/spanreceiver" "go.opencensus.io/plugin/ocgrpc" @@ -39,6 +40,8 @@ import ( var configYAMLFile string var ocInterceptorPort int +const zipkinRoute = "/api/v2/spans" + func 
main() { if err := rootCmd.Execute(); err != nil { log.Fatal(err) @@ -54,6 +57,14 @@ func runOCAgent() { if err != nil { log.Fatalf("Failed to parse own configuration %v error: %v", configYAMLFile, err) } + + // Ensure that we check and catch any logical errors with the + // configuration e.g. if an interceptor shares the same address + // as an exporter which would cause a self DOS and waste resources. + if err := agentConfig.checkLogicalConflicts(yamlBlob); err != nil { + log.Fatalf("Configuration logical error: %v", err) + } + ocInterceptorAddr := agentConfig.ocInterceptorAddress() traceExporters, closeFns := exportersFromYAMLConfig(yamlBlob) @@ -64,7 +75,6 @@ func runOCAgent() { if err != nil { log.Fatal(err) } - closeFns = append(closeFns, ocInterceptorDoneFn) // If zPages are enabled, run them @@ -74,6 +84,16 @@ func runOCAgent() { closeFns = append(closeFns, zCloseFn) } + // If the Zipkin interceptor is enabled, then run it + if agentConfig.zipkinInterceptorEnabled() { + zipkinInterceptorAddr := agentConfig.zipkinInterceptorAddress() + zipkinInterceptorDoneFn, err := runZipkinInterceptor(zipkinInterceptorAddr, commonSpanReceiver) + if err != nil { + log.Fatal(err) + } + closeFns = append(closeFns, zipkinInterceptorDoneFn) + } + // Always cleanup finally defer func() { for _, closeFn := range closeFns { @@ -141,3 +161,27 @@ func runOCInterceptor(addr string, sr spanreceiver.SpanReceiver) (doneFn func() doneFn = ln.Close return doneFn, nil } + +func runZipkinInterceptor(addr string, sr spanreceiver.SpanReceiver) (doneFn func() error, err error) { + zi, err := zipkininterceptor.New(sr) + if err != nil { + return nil, fmt.Errorf("Failed to create the Zipkin interceptor: %v", err) + } + + ln, err := net.Listen("tcp", addr) + if err != nil { + return nil, fmt.Errorf("Cannot bind Zipkin interceptor to address %q: %v", addr, err) + } + mux := http.NewServeMux() + mux.Handle(zipkinRoute, zi) + go func() { + fullAddr := addr + zipkinRoute + log.Printf("Running the 
Zipkin interceptor at %q", fullAddr) + if err := http.Serve(ln, mux); err != nil { + log.Fatalf("Failed to serve the Zipkin interceptor: %v", err) + } + }() + + doneFn = ln.Close + return doneFn, nil +} diff --git a/exporter/exporterparser/zipkin.go b/exporter/exporterparser/zipkin.go index 9a68844df819..0fdc023d1414 100644 --- a/exporter/exporterparser/zipkin.go +++ b/exporter/exporterparser/zipkin.go @@ -16,7 +16,8 @@ package exporterparser import ( "context" - "log" + "fmt" + "sync" openzipkin "github.com/openzipkin/zipkin-go" "github.com/openzipkin/zipkin-go/reporter/http" @@ -27,20 +28,52 @@ import ( "github.com/census-instrumentation/opencensus-service/exporter" ) -type zipkinConfig struct { +type ZipkinConfig struct { ServiceName string `yaml:"service_name,omitempty"` Endpoint string `yaml:"endpoint,omitempty"` LocalEndpointURI string `yaml:"local_endpoint,omitempty"` } +// zipkinExporter is a multiplexing exporter that spawns a new OpenCensus-Go Zipkin +// exporter per unique node encountered. This is because serviceNames per node define +// unique services, alongside their IPs. Also it is useful to intercept traffic from +// Zipkin servers and then transform them back to the final form when creating an +// OpenCensus spandata. type zipkinExporter struct { - exporter *zipkin.Exporter + // mu protects the fields below + mu sync.Mutex + + defaultExporter *zipkin.Exporter + defaultServiceName string + defaultLocalEndpointURI string + + endpointURI string + + // The goal of this map is to multiplex and route + // different serviceNames with various exporters + serviceNameToExporter map[string]*zipkin.Exporter + + shutdownFns []func() error +} + +const ( + DefaultZipkinEndpointHostPort = "localhost:9411" + DefaultZipkinEndpointURL = "http://" + DefaultZipkinEndpointHostPort + "/api/v2/spans" +) + +func (zc *ZipkinConfig) EndpointURL() string { + // If no endpoint was set, use the default Zipkin reporter URI. 
+ endpoint := DefaultZipkinEndpointURL + if zc != nil && zc.Endpoint != "" { + endpoint = zc.Endpoint + } + return endpoint } func ZipkinExportersFromYAML(config []byte) (tes []exporter.TraceExporter, doneFns []func() error, err error) { var cfg struct { Exporters *struct { - Zipkin *zipkinConfig `yaml:"zipkin"` + Zipkin *ZipkinConfig `yaml:"zipkin"` } `yaml:"exporters"` } if err := yamlUnmarshal(config, &cfg); err != nil { @@ -54,10 +87,7 @@ func ZipkinExportersFromYAML(config []byte) (tes []exporter.TraceExporter, doneF if zc == nil { return nil, nil, nil } - endpoint := "http://localhost:9411/api/v2/spans" - if zc.Endpoint != "" { - endpoint = zc.Endpoint - } + serviceName := "" if zc.ServiceName != "" { serviceName = zc.ServiceName @@ -66,22 +96,99 @@ func ZipkinExportersFromYAML(config []byte) (tes []exporter.TraceExporter, doneF if zc.LocalEndpointURI != "" { localEndpointURI = zc.LocalEndpointURI } - // TODO(jbd): Propagate hostport and more metadata from each node. + endpoint := zc.EndpointURL() + zle, err := newZipkinExporter(endpoint, serviceName, localEndpointURI) + if err != nil { + return nil, nil, fmt.Errorf("Cannot configure Zipkin exporter: %v", err) + } + tes = append(tes, zle) + doneFns = append(doneFns, zle.stop) + return +} + +func newZipkinExporter(finalEndpointURI, defaultServiceName, defaultLocalEndpointURI string) (*zipkinExporter, error) { + localEndpoint, err := openzipkin.NewEndpoint(defaultServiceName, defaultLocalEndpointURI) + if err != nil { + return nil, err + } + reporter := http.NewReporter(finalEndpointURI) + defaultExporter := zipkin.NewExporter(reporter, localEndpoint) + zle := &zipkinExporter{ + endpointURI: finalEndpointURI, + defaultExporter: defaultExporter, + defaultServiceName: defaultServiceName, + defaultLocalEndpointURI: defaultLocalEndpointURI, + serviceNameToExporter: make(map[string]*zipkin.Exporter), + } + + // Ensure that we add the default reporter's Close functions + zle.shutdownFns = append(zle.shutdownFns, 
reporter.Close) + + return zle, nil +} + +// exporterForNode firstly tries to find a memoize OpenCensus-Go Zipkin exporter +// appropriate for this node. If it doesn't find any, it returns the default exporter +// that was created at start-time. +func (ze *zipkinExporter) exporterForNode(node *commonpb.Node) *zipkin.Exporter { + ze.mu.Lock() + defer ze.mu.Unlock() + + if node == nil { + return ze.defaultExporter + } + + serviceName := node.ServiceInfo.GetName() + if serviceName == "" { + serviceName = ze.defaultServiceName + } + + // Make the unique key for this local endpoint/node: "serviceName" + "ipv4" + "ipv6" + "port" + key := serviceName + node.Attributes["ipv4"] + node.Attributes["ipv6"] + node.Attributes["port"] + if key == "" { + return ze.defaultExporter + } + + // Try looking up if we already created the exporter. + if exp, ok := ze.serviceNameToExporter[key]; ok && exp != nil { + return exp + } + + localEndpointURI := ze.defaultLocalEndpointURI + if ipv4 := node.Attributes["ipv4"]; ipv4 != "" { + localEndpointURI = ipv4 + } else if ipv6 := node.Attributes["ipv6"]; ipv6 != "" { + localEndpointURI = ipv6 + } + + // Otherwise freshly create that Zipkin Exporter. localEndpoint, err := openzipkin.NewEndpoint(serviceName, localEndpointURI) if err != nil { - log.Fatalf("Cannot configure Zipkin exporter: %v", err) + return ze.defaultExporter } - reporter := http.NewReporter(endpoint) - ze := zipkin.NewExporter(reporter, localEndpoint) - tes = append(tes, &zipkinExporter{exporter: ze}) - doneFns = append(doneFns, reporter.Close) - return + reporter := http.NewReporter(ze.endpointURI) + exporter := zipkin.NewExporter(reporter, localEndpoint) + + // Now memoize the created exporter for later use. 
+ ze.serviceNameToExporter[key] = exporter + ze.shutdownFns = append(ze.shutdownFns, reporter.Close) + + return exporter +} + +func (ze *zipkinExporter) stop() error { + ze.mu.Lock() + defer ze.mu.Unlock() + + for _, shutdownFn := range ze.shutdownFns { + _ = shutdownFn() + } + + return nil } func (ze *zipkinExporter) ExportSpanData(ctx context.Context, node *commonpb.Node, spandata ...*trace.SpanData) error { - // TODO: Examine "contrib.go.opencensus.io/exporter/zipkin" to see - // if trace.ExportSpan was constraining and if perhaps the Zipkin - // upload can use the context and information from the Node. - return exportSpans(ctx, node, "zipkin", ze.exporter, spandata) + exporter := ze.exporterForNode(node) + return exportSpans(ctx, node, "zipkin", exporter, spandata) } diff --git a/interceptor/opencensus/opencensus.go b/interceptor/opencensus/opencensus.go index fa773340cc67..eb3239cf0e23 100644 --- a/interceptor/opencensus/opencensus.go +++ b/interceptor/opencensus/opencensus.go @@ -144,15 +144,7 @@ func (oci *Interceptor) batchSpanExporting(longLivedRPCCtx context.Context, payl // spansAndNode list unfurling then send spans grouped per node // If the starting RPC has a parent span, then add it as a parent link. 
- parentSpanFromRPC := trace.FromContext(longLivedRPCCtx) - if parentSpanFromRPC != nil { - psc := parentSpanFromRPC.SpanContext() - span.AddLink(trace.Link{ - SpanID: psc.SpanID, - TraceID: psc.TraceID, - Type: trace.LinkTypeParent, - }) - } + internal.SetParentLink(longLivedRPCCtx, span) nSpans := int64(0) for _, spn := range spnL { diff --git a/interceptor/zipkin/testdata/sample1.json b/interceptor/zipkin/testdata/sample1.json new file mode 100644 index 000000000000..5339b471566b --- /dev/null +++ b/interceptor/zipkin/testdata/sample1.json @@ -0,0 +1,288 @@ +[{ + "traceId": "4d1e00c0db9010db86154a4ba6e91385", + "parentId": "86154a4ba6e91385", + "id": "4d1e00c0db9010db", + "kind": "CLIENT", + "name": "get", + "timestamp": 1472470996199000, + "duration": 207000, + "localEndpoint": { + "serviceName": "frontend", + "ipv6": "7::0.128.128.127" + }, + "remoteEndpoint": { + "serviceName": "backend", + "ipv4": "192.168.99.101", + "port": 9000 + }, + "annotations": [ + { + "timestamp": 1472470996238000, + "value": "foo" + }, + { + "timestamp": 1472470996403000, + "value": "bar" + } + ], + "tags": { + "http.path": "/api", + "clnt/finagle.version": "6.45.0" + } +}, +{ + "traceId": "4d1e00c0db9010db86154a4ba6e91385", + "parentId": "86154a4ba6e91385", + "id": "4d1e00c0db9010db", + "kind": "CLIENT", + "name": "get", + "timestamp": 1472470996199000, + "duration": 207000, + "localEndpoint": { + "serviceName": "frontend", + "ipv6": "7::0.128.128.127" + }, + "remoteEndpoint": { + "serviceName": "backend", + "ipv4": "192.168.99.101", + "port": 9000 + }, + "annotations": [ + { + "timestamp": 1472470996238000, + "value": "foo" + }, + { + "timestamp": 1472470996403000, + "value": "bar" + } + ], + "tags": { + "http.path": "/api", + "clnt/finagle.version": "6.45.0" + } +}, +{ + "traceId": "4d1e00c0db9010db86154a4ba6e91385", + "parentId": "86154a4ba6e91385", + "id": "4d1e00c0db9010db", + "kind": "CLIENT", + "name": "get", + "timestamp": 1472470996199000, + "duration": 207000, + 
"localEndpoint": { + "serviceName": "frontend", + "ipv6": "7::0.128.128.127" + }, + "remoteEndpoint": { + "serviceName": "backend", + "ipv4": "192.168.99.101", + "port": 9000 + }, + "annotations": [ + { + "timestamp": 1472470996238000, + "value": "foo" + }, + { + "timestamp": 1472470996403000, + "value": "bar" + } + ], + "tags": { + "http.path": "/api", + "clnt/finagle.version": "6.45.0" + } +}, +{ + "traceId": "4d1e00c0db9010db86154a4ba6e91385", + "parentId": "86154a4ba6e91385", + "id": "4d1e00c0db9010db", + "kind": "CLIENT", + "name": "get", + "timestamp": 1472470996199000, + "duration": 207000, + "localEndpoint": { + "serviceName": "frontend", + "ipv6": "7::0.128.128.127" + }, + "remoteEndpoint": { + "serviceName": "backend", + "ipv4": "192.168.99.101", + "port": 9000 + }, + "annotations": [ + { + "timestamp": 1472470996238000, + "value": "foo" + }, + { + "timestamp": 1472470996403000, + "value": "bar" + } + ], + "tags": { + "http.path": "/api", + "clnt/finagle.version": "6.45.0" + } +}, +{ + "traceId": "4d1e00c0db9010db86154a4ba6e91385", + "parentId": "86154a4ba6e91385", + "id": "4d1e00c0db9010db", + "kind": "CLIENT", + "name": "get", + "timestamp": 1472470996199000, + "duration": 207000, + "localEndpoint": { + "serviceName": "frontend", + "ipv6": "7::0.128.128.127" + }, + "remoteEndpoint": { + "serviceName": "backend", + "ipv4": "192.168.99.101", + "port": 9000 + }, + "annotations": [ + { + "timestamp": 1472470996238000, + "value": "foo" + }, + { + "timestamp": 1472470996403000, + "value": "bar" + } + ], + "tags": { + "http.path": "/api", + "clnt/finagle.version": "6.45.0" + } +}, +{ + "traceId": "4d1e00c0db9010db86154a4ba6e91385", + "parentId": "86154a4ba6e91385", + "id": "4d1e00c0db9010db", + "kind": "CLIENT", + "name": "get", + "timestamp": 1472470996199000, + "duration": 207000, + "localEndpoint": { + "serviceName": "frontend", + "ipv6": "7::0.128.128.127" + }, + "remoteEndpoint": { + "serviceName": "backend", + "ipv4": "192.168.99.101", + "port": 9000 + 
}, + "annotations": [ + { + "timestamp": 1472470996238000, + "value": "foo" + }, + { + "timestamp": 1472470996403000, + "value": "bar" + } + ], + "tags": { + "http.path": "/api", + "clnt/finagle.version": "6.45.0" + } +}, +{ + "traceId": "4d1e00c0db9010db86154a4ba6e91385", + "parentId": "86154a4ba6e91385", + "id": "4d1e00c0db9010db", + "kind": "CLIENT", + "name": "get", + "timestamp": 1472470996199000, + "duration": 207000, + "localEndpoint": { + "serviceName": "frontend", + "ipv6": "7::0.128.128.127" + }, + "remoteEndpoint": { + "serviceName": "backend", + "ipv4": "192.168.99.101", + "port": 9000 + }, + "annotations": [ + { + "timestamp": 1472470996238000, + "value": "foo" + }, + { + "timestamp": 1472470996403000, + "value": "bar" + } + ], + "tags": { + "http.path": "/api", + "clnt/finagle.version": "6.45.0" + } +}, +{ + "traceId": "4d1e00c0db9010db86154a4ba6e91385", + "parentId": "86154a4ba6e91385", + "id": "4d1e00c0db9010db", + "kind": "CLIENT", + "name": "get", + "timestamp": 1472470996199000, + "duration": 207000, + "localEndpoint": { + "serviceName": "frontend", + "ipv6": "7::0.128.128.127" + }, + "remoteEndpoint": { + "serviceName": "backend", + "ipv4": "192.168.99.101", + "port": 9000 + }, + "annotations": [ + { + "timestamp": 1472470996238000, + "value": "foo" + }, + { + "timestamp": 1472470996403000, + "value": "bar" + } + ], + "tags": { + "http.path": "/api", + "clnt/finagle.version": "6.45.0" + } +}, +{ + "traceId": "4d1e00c0db9010db86154a4ba6e91385", + "parentId": "86154a4ba6e91385", + "id": "4d1e00c0db9010db", + "kind": "CLIENT", + "name": "get", + "timestamp": 1472470996199000, + "duration": 207000, + "localEndpoint": { + "serviceName": "frontend", + "ipv6": "7::0.128.128.127" + }, + "remoteEndpoint": { + "serviceName": "backend", + "ipv4": "192.168.99.101", + "port": 9000 + }, + "annotations": [ + { + "timestamp": 1472470996238000, + "value": "foo" + }, + { + "timestamp": 1472470996403000, + "value": "bar" + } + ], + "tags": { + "http.path": "/api", 
+ "clnt/finagle.version": "6.45.0" + } +}] diff --git a/interceptor/zipkin/trace_interceptor.go b/interceptor/zipkin/trace_interceptor.go new file mode 100644 index 000000000000..a63f9d2e4ee6 --- /dev/null +++ b/interceptor/zipkin/trace_interceptor.go @@ -0,0 +1,398 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zipkininterceptor + +import ( + "compress/gzip" + "compress/zlib" + "context" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "net" + "net/http" + "strings" + + "go.opencensus.io/trace" + + zipkinmodel "github.com/openzipkin/zipkin-go/model" + + commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" + agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" + tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" + "github.com/census-instrumentation/opencensus-service/interceptor" + "github.com/census-instrumentation/opencensus-service/internal" + "github.com/census-instrumentation/opencensus-service/spanreceiver" +) + +type ZipkinInterceptor struct { + spanSink spanreceiver.SpanReceiver +} + +var _ interceptor.TraceInterceptor = (*ZipkinInterceptor)(nil) +var _ http.Handler = (*ZipkinInterceptor)(nil) + +func New(sr spanreceiver.SpanReceiver) (*ZipkinInterceptor, error) { + return &ZipkinInterceptor{spanSink: sr}, nil +} + +func (zi *ZipkinInterceptor) StartTraceInterception(ctx 
context.Context, spanSink spanreceiver.SpanReceiver) error { + zi.spanSink = spanSink + return nil +} + +func (zi *ZipkinInterceptor) parseAndConvertToTraceSpans(jsonBlob []byte) (reqs []*agenttracepb.ExportTraceServiceRequest, err error) { + var zipkinSpans []*zipkinmodel.SpanModel + if err = json.Unmarshal(jsonBlob, &zipkinSpans); err != nil { + return nil, err + } + + // *commonpb.Node instances have unique addresses hence + // for grouping within a map, we'll use the .String() value + byNodeGrouping := make(map[string][]*tracepb.Span) + uniqueNodes := make([]*commonpb.Node, 0, len(zipkinSpans)) + // Now translate them into tracepb.Span + for _, zspan := range zipkinSpans { + span, node, err := zipkinSpanToTraceSpan(zspan) + // TODO:(@odeke-em) record errors + if err == nil && span != nil { + key := node.String() + if _, alreadyAdded := byNodeGrouping[key]; !alreadyAdded { + uniqueNodes = append(uniqueNodes, node) + } + byNodeGrouping[key] = append(byNodeGrouping[key], span) + } + } + + for _, node := range uniqueNodes { + key := node.String() + spans := byNodeGrouping[key] + if len(spans) == 0 { + // Should never happen but nonetheless be cautious + // not to send blank spans. + continue + } + reqs = append(reqs, &agenttracepb.ExportTraceServiceRequest{ + Node: node, + Spans: spans, + }) + delete(byNodeGrouping, key) + } + + return reqs, nil +} + +func (zi *ZipkinInterceptor) StopTraceInterception(ctx context.Context) error { + return nil +} + +// processBodyIfNecessary checks the "Content-Encoding" HTTP header and if +// a compression such as "gzip", "deflate", "zlib", is found, the body will +// be uncompressed accordingly or return the body untouched if otherwise. +// Clients such as Zipkin-Java do this behavior e.g. +// send "Content-Encoding":"gzip" of the JSON content. 
+func processBodyIfNecessary(req *http.Request) io.Reader {
+	// Content-coding names are case-insensitive (RFC 7231, Section 3.1.2.1),
+	// so normalize the header value before matching it.
+	encoding := strings.ToLower(strings.TrimSpace(req.Header.Get("Content-Encoding")))
+	switch encoding {
+	case "gzip":
+		return gunzippedBodyIfPossible(req.Body)
+
+	case "deflate", "zlib":
+		return zlibUncompressedbody(req.Body)
+
+	default:
+		return req.Body
+	}
+}
+
+// gunzippedBodyIfPossible wraps r in a gzip reader. If the gzip reader
+// cannot be created, r itself is returned.
+// NOTE(review): gzip.NewReader reads from r while parsing the header, so on
+// failure the returned r has presumably already been partially consumed.
+func gunzippedBodyIfPossible(r io.Reader) io.Reader {
+	gzr, err := gzip.NewReader(r)
+	if err != nil {
+		// Just return the old body as was
+		return r
+	}
+	return gzr
+}
+
+// zlibUncompressedbody wraps r in a zlib reader. If the zlib reader
+// cannot be created, r itself is returned.
+// NOTE(review): as with gzip, r has presumably been partially consumed
+// by the time zlib.NewReader reports a failure.
+func zlibUncompressedbody(r io.Reader) io.Reader {
+	zr, err := zlib.NewReader(r)
+	if err != nil {
+		// Just return the old body as was
+		return r
+	}
+	return zr
+}
+
+// ServeHTTP receives Zipkin spans as JSON (optionally compressed, see
+// processBodyIfNecessary), converts them and sends them along to the span
+// sink, grouped by node.
+//
+// It replies with 400 Bad Request when the body cannot be read or parsed,
+// and with 202 Accepted otherwise.
+func (zi *ZipkinInterceptor) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	// Trace this method
+	ctx, span := trace.StartSpan(context.Background(), "ZipkinInterceptor.Export")
+	defer span.End()
+
+	// If the starting RPC has a parent span, then add it as a parent link.
+	parentCtx := r.Context()
+	internal.SetParentLink(parentCtx, span)
+
+	pr := processBodyIfNecessary(r)
+	slurp, err := ioutil.ReadAll(pr)
+	if c, ok := pr.(io.Closer); ok {
+		_ = c.Close()
+	}
+	_ = r.Body.Close()
+
+	// Only attempt to parse when the body was read completely; a read
+	// failure (e.g. a corrupt compressed stream) is reported as is
+	// instead of being masked by a later parse error.
+	var ereqs []*agenttracepb.ExportTraceServiceRequest
+	if err == nil {
+		ereqs, err = zi.parseAndConvertToTraceSpans(slurp)
+	}
+	if err != nil {
+		span.SetStatus(trace.Status{
+			Code:    trace.StatusCodeInvalidArgument,
+			Message: err.Error(),
+		})
+		http.Error(w, err.Error(), http.StatusBadRequest)
+		return
+	}
+
+	spansMetricsFn := internal.NewReceivedSpansRecorderStreaming(ctx, "zipkin")
+	// Forward each per-node batch to the span sink. The sink's
+	// acknowledgement and error are not inspected here.
+	for _, ereq := range ereqs {
+		zi.spanSink.ReceiveSpans(ctx, ereq.Node, ereq.Spans...)
+		// We MUST unconditionally record metrics from this reception.
+		spansMetricsFn(ereq.Node, ereq.Spans)
+	}
+
+	// Finally send back the response "Accepted" as
+	// required at https://zipkin.io/zipkin-api/#/default/post_spans
+	w.WriteHeader(http.StatusAccepted)
+}
+
+// errNilZipkinSpan is returned by zipkinSpanToTraceSpan for a nil input span.
+var errNilZipkinSpan = errors.New("non-nil Zipkin span expected")
+
+// blankIP is the zero-value IP that endpoint addresses are compared against
+// to detect an unset IPv4/IPv6 field.
+var blankIP net.IP
+
+// zipkinSpanToTraceSpan converts a single Zipkin span into a trace-proto
+// span together with the node derived from the span's endpoints.
+//
+// A nil zs yields errNilZipkinSpan. If any of the trace, span or parent
+// span IDs fails hex decoding, a nil span is returned alongside the node
+// and an error naming the offending field.
+func zipkinSpanToTraceSpan(zs *zipkinmodel.SpanModel) (*tracepb.Span, *commonpb.Node, error) {
+	if zs == nil {
+		return nil, nil, errNilZipkinSpan
+	}
+
+	node := nodeFromZipkinEndpoints(zs)
+
+	traceID, err := hexStrToBytes(zs.TraceID.String())
+	if err != nil {
+		return nil, node, fmt.Errorf("TraceID: %v", err)
+	}
+	spanID, err := hexStrToBytes(zs.ID.String())
+	if err != nil {
+		return nil, node, fmt.Errorf("SpanID: %v", err)
+	}
+	// The parent span ID is optional and stays nil for root spans.
+	var parentSpanID []byte
+	if zs.ParentID != nil {
+		parentSpanID, err = hexStrToBytes(zs.ParentID.String())
+		if err != nil {
+			return nil, node, fmt.Errorf("ParentSpanID: %v", err)
+		}
+	}
+
+	pbs := &tracepb.Span{
+		TraceId:      traceID,
+		SpanId:       spanID,
+		ParentSpanId: parentSpanID,
+		Name:         &tracepb.TruncatableString{Value: zs.Name},
+		StartTime:    internal.TimeToTimestamp(zs.Timestamp),
+		// Zipkin carries a start time plus a duration; the end time is derived.
+		EndTime:    internal.TimeToTimestamp(zs.Timestamp.Add(zs.Duration)),
+		Kind:       zipkinSpanKindToProtoSpanKind(zs.Kind),
+		Status:     extractProtoStatus(zs),
+		Attributes: zipkinTagsToTraceAttributes(zs.Tags),
+		TimeEvents: zipkinAnnotationsToProtoTimeEvents(zs.Annotations),
+	}
+
+	return pbs, node, nil
+}
+
+// nodeFromZipkinEndpoints builds a Node from the local and remote endpoints
+// of zs. It returns nil when the span has neither endpoint.
+func nodeFromZipkinEndpoints(zs *zipkinmodel.SpanModel) *commonpb.Node {
+	if zs.LocalEndpoint == nil && zs.RemoteEndpoint == nil {
+		return nil
+	}
+
+	node := new(commonpb.Node)
+
+	// Retrieve and make use of the local endpoint: its service name becomes
+	// the node's service name and its address fields become unprefixed
+	// attributes.
+	if lep := zs.LocalEndpoint; lep != nil {
+		node.ServiceInfo = &commonpb.ServiceInfo{
+			Name: lep.ServiceName,
+		}
+		node.Attributes = zipkinEndpointIntoAttributes(lep, node.Attributes, func(s string) string { return s })
+	}
+
+	// Retrieve and make use of the remote endpoint
+	if rep := zs.RemoteEndpoint; rep != nil {
+		// For remoteEndpoint, our goal is to prefix its fields with "zipkin.remoteEndpoint."
+		// For example becoming:
+		// {
+		//      "zipkin.remoteEndpoint.ipv4": "192.168.99.101",
+		//      "zipkin.remoteEndpoint.port": "9000"
+		// }
+		// Only the address fields written by zipkinEndpointIntoAttributes
+		// (ipv4, ipv6, port) are recorded; the remote endpoint's
+		// serviceName is currently not stored on the node.
+		node.Attributes = zipkinEndpointIntoAttributes(rep, node.Attributes, func(s string) string {
+			return "zipkin.remoteEndpoint." + s
+		})
+	}
+	return node
+}
+
+// zipkinEndpointIntoAttributes copies the set address fields of ep (IPv4,
+// IPv6 and a positive port) into the map into, allocating the map if it is
+// nil, and returns the map. Each key is passed through prefixFunc before
+// being stored.
+func zipkinEndpointIntoAttributes(ep *zipkinmodel.Endpoint, into map[string]string, prefixFunc func(string) string) map[string]string {
+	if into == nil {
+		into = make(map[string]string)
+	}
+	if !ep.IPv4.Equal(blankIP) {
+		into[prefixFunc("ipv4")] = ep.IPv4.String()
+	}
+	if !ep.IPv6.Equal(blankIP) {
+		into[prefixFunc("ipv6")] = ep.IPv6.String()
+	}
+	if ep.Port > 0 {
+		into[prefixFunc("port")] = fmt.Sprintf("%d", ep.Port)
+	}
+	return into
+}
+
+// statusCodeUnknown is the canonical code for "UNKNOWN" (see canonicalCodesMap).
+const statusCodeUnknown = 2
+
+// extractProtoStatus derives a proto Status from the "error" and
+// "opencensus.status_description" tags of zs. It returns nil when zs is nil,
+// has no tags, or has neither of those two tags set.
+func extractProtoStatus(zs *zipkinmodel.SpanModel) *tracepb.Status {
+	// The status is stored with the "error" key
+	// See https://github.com/census-instrumentation/opencensus-go/blob/1eb9a13c7dd02141e065a665f6bf5c99a090a16a/exporter/zipkin/zipkin.go#L160-L165
+	if zs == nil || len(zs.Tags) == 0 {
+		return nil
+	}
+	canonicalCodeStr := zs.Tags["error"]
+	message := zs.Tags["opencensus.status_description"]
+	if message == "" && canonicalCodeStr == "" {
+		return nil
+	}
+	code, set := canonicalCodesMap[canonicalCodeStr]
+	if !set {
+		// If no recognized status code was set, then we should use UNKNOWN
+		code = statusCodeUnknown
+	}
+	return &tracepb.Status{
+		Message: message,
+		Code:    code,
+	}
+}
+
+// canonicalCodesMap maps canonical status code names to their numeric values.
+var canonicalCodesMap = map[string]int32{
+	// https://github.com/googleapis/googleapis/blob/bee79fbe03254a35db125dc6d2f1e9b752b390fe/google/rpc/code.proto#L33-L186
+	"OK":                  0,
+	"CANCELLED":           1,
+	"UNKNOWN":             2,
+	"INVALID_ARGUMENT":    3,
+	"DEADLINE_EXCEEDED":   4,
+	"NOT_FOUND":           5,
+	"ALREADY_EXISTS":      6,
+	"PERMISSION_DENIED":   7,
+	"RESOURCE_EXHAUSTED":  8,
+	"FAILED_PRECONDITION": 9,
+	"ABORTED":             10,
+	"OUT_OF_RANGE":        11,
+	"UNIMPLEMENTED":       12,
+	"INTERNAL":            13,
+	"UNAVAILABLE":         14,
+	"DATA_LOSS":           15,
+	"UNAUTHENTICATED":     16,
+}
+
+// hexStrToBytes decodes a hexadecimal string into bytes.
+// An empty string yields a nil slice and a nil error.
+func hexStrToBytes(hexStr string) ([]byte, error) {
+	if len(hexStr) == 0 {
+		return nil, nil
+	}
+	return hex.DecodeString(hexStr)
+}
+
+// zipkinSpanKindToProtoSpanKind maps a Zipkin span kind, compared
+// case-insensitively, to the proto span kind. Anything other than
+// CLIENT or SERVER maps to SPAN_KIND_UNSPECIFIED.
+func zipkinSpanKindToProtoSpanKind(skind zipkinmodel.Kind) tracepb.Span_SpanKind {
+	switch strings.ToUpper(string(skind)) {
+	case "CLIENT":
+		return tracepb.Span_CLIENT
+	case "SERVER":
+		return tracepb.Span_SERVER
+	default:
+		return tracepb.Span_SPAN_KIND_UNSPECIFIED
+	}
+}
+
+// zipkinAnnotationsToProtoTimeEvents converts Zipkin annotations into proto
+// time events, skipping blank annotations. It returns nil when there is
+// nothing to report.
+func zipkinAnnotationsToProtoTimeEvents(zas []zipkinmodel.Annotation) *tracepb.Span_TimeEvents {
+	if len(zas) == 0 {
+		return nil
+	}
+	tevs := make([]*tracepb.Span_TimeEvent, 0, len(zas))
+	for _, za := range zas {
+		if tev := zipkinAnnotationToProtoAnnotation(za); tev != nil {
+			tevs = append(tevs, tev)
+		}
+	}
+	if len(tevs) == 0 {
+		return nil
+	}
+	return &tracepb.Span_TimeEvents{
+		TimeEvent: tevs,
+	}
+}
+
+// blankAnnotation is the zero-value annotation used to detect unset entries.
+var blankAnnotation zipkinmodel.Annotation
+
+// zipkinAnnotationToProtoAnnotation converts one Zipkin annotation into a
+// proto time event carrying the annotation's timestamp and value.
+// It returns nil for the zero-value annotation.
+func zipkinAnnotationToProtoAnnotation(zas zipkinmodel.Annotation) *tracepb.Span_TimeEvent {
+	if zas == blankAnnotation {
+		return nil
+	}
+	return &tracepb.Span_TimeEvent{
+		Time: internal.TimeToTimestamp(zas.Timestamp),
+		Value: &tracepb.Span_TimeEvent_Annotation_{
+			Annotation: &tracepb.Span_TimeEvent_Annotation{
+				Description: &tracepb.TruncatableString{Value: zas.Value},
+			},
+		},
+	}
+}
+
+// zipkinTagsToTraceAttributes converts Zipkin tags into proto span
+// attributes. The values "true" and "false" become boolean attributes and
+// every other value becomes a string attribute. It returns nil when tags
+// is empty.
+func zipkinTagsToTraceAttributes(tags map[string]string) *tracepb.Span_Attributes {
+	if len(tags) == 0 {
+		return nil
+	}
+
+	amap := make(map[string]*tracepb.AttributeValue, len(tags))
+	for key, value := range tags {
+		// We did a translation from "boolean" to "string"
+		// in OpenCensus-Go's Zipkin exporter as per
+		// https://github.com/census-instrumentation/opencensus-go/blob/1eb9a13c7dd02141e065a665f6bf5c99a090a16a/exporter/zipkin/zipkin.go#L138-L155
+		// so undo it here.
+		switch value {
+		case "true", "false":
+			amap[key] = &tracepb.AttributeValue{
+				Value: &tracepb.AttributeValue_BoolValue{BoolValue: value == "true"},
+			}
+		default:
+			amap[key] = &tracepb.AttributeValue{
+				Value: &tracepb.AttributeValue_StringValue{
+					StringValue: &tracepb.TruncatableString{Value: value},
+				},
+			}
+		}
+	}
+	return &tracepb.Span_Attributes{AttributeMap: amap}
+}
+
+// setIfNonEmpty stores value under key in dest unless value is empty.
+// NOTE(review): no caller is visible in this file - confirm whether it is
+// still needed.
+func setIfNonEmpty(key, value string, dest map[string]string) {
+	if value != "" {
+		dest[key] = value
+	}
+}
diff --git a/interceptor/zipkin/trace_interceptor_test.go b/interceptor/zipkin/trace_interceptor_test.go
new file mode 100644
index 000000000000..cf2e216feaba
--- /dev/null
+++ b/interceptor/zipkin/trace_interceptor_test.go
@@ -0,0 +1,410 @@
+// Copyright 2018, OpenCensus Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package zipkininterceptor
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"net/http"
+	"net/http/httptest"
+	"reflect"
+	"strings"
+	"testing"
+	"time"
+
+	openzipkin "github.com/openzipkin/zipkin-go"
+	zhttp "github.com/openzipkin/zipkin-go/reporter/http"
+	"go.opencensus.io/exporter/zipkin"
+
+	commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
+	agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
+	tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
+	"github.com/census-instrumentation/opencensus-service/internal"
+	"github.com/census-instrumentation/opencensus-service/spanreceiver"
+	"github.com/census-instrumentation/opencensus-service/translator/trace"
+)
+
+// TestConvertSpansToTraceSpans parses the sample Zipkin JSON in testdata and
+// checks that all of its spans end up in a single request with the expected node.
+func TestConvertSpansToTraceSpans(t *testing.T) {
+	// Using Adrian Cole's sample at https://gist.github.com/adriancole/e8823c19dfed64e2eb71
+	blob, err := ioutil.ReadFile("./testdata/sample1.json")
+	if err != nil {
+		t.Fatalf("Failed to read sample JSON file: %v", err)
+	}
+	zi := new(ZipkinInterceptor)
+	reqs, err := zi.parseAndConvertToTraceSpans(blob)
+	if err != nil {
+		t.Fatalf("Failed to parse convert Zipkin spans in JSON to Trace spans: %v", err)
+	}
+
+	if g, w := len(reqs), 1; g != w {
+		t.Fatalf("Expecting only one request since all spans share same node/localEndpoint: %v", g)
+	}
+
+	req := reqs[0]
+	wantNode := &commonpb.Node{
+		ServiceInfo: &commonpb.ServiceInfo{
+			Name: "frontend",
+		},
+		Attributes: map[string]string{
+			"ipv6":                       "7::80:807f",
+			"zipkin.remoteEndpoint.ipv4": "192.168.99.101",
+			"zipkin.remoteEndpoint.port": "9000",
+		},
+	}
+	if g, w := req.Node, wantNode; !reflect.DeepEqual(g, w) {
+		t.Errorf("GotNode:\n\t%v\nWantNode:\n\t%v", g, w)
+	}
+
+	nonNilSpans := 0
+	for _, span := range req.Spans {
+		if span != nil {
+			nonNilSpans++
+		}
+	}
+	// Expecting 9 non-nil spans
+	if g, w := nonNilSpans, 9; g != w {
+		t.Fatalf("Non-nil spans: Got %d Want %d", g, w)
+	}
+}
+
+// TestConversionRoundtrip feeds Zipkin JSON through the interceptor's
+// conversion, re-exports the result with the OpenCensus-Go Zipkin exporter
+// and compares the JSON that arrives at a test backend with the expected JSON.
+func TestConversionRoundtrip(t *testing.T) {
+	// The goal is to convert from:
+	// 1. Original Zipkin JSON as that's the format that Zipkin interceptors will receive
+	// 2. Into TraceProtoSpans
+	// 3. Into SpanData
+	// 4. Back into Zipkin JSON (in this case the Zipkin exporter has been configured)
+	interceptorInputJSON := []byte(`
+[{
+  "traceId": "4d1e00c0db9010db86154a4ba6e91385",
+  "parentId": "86154a4ba6e91385",
+  "id": "4d1e00c0db9010db",
+  "kind": "CLIENT",
+  "name": "get",
+  "timestamp": 1472470996199000,
+  "duration": 207000,
+  "localEndpoint": {
+    "serviceName": "frontend",
+    "ipv6": "7::0.128.128.127"
+  },
+  "remoteEndpoint": {
+    "serviceName": "backend",
+    "ipv4": "192.168.99.101",
+    "port": 9000
+  },
+  "annotations": [
+    {
+      "timestamp": 1472470996238000,
+      "value": "foo"
+    },
+    {
+      "timestamp": 1472470996403000,
+      "value": "bar"
+    }
+  ],
+  "tags": {
+    "http.path": "/api",
+    "clnt/finagle.version": "6.45.0"
+  }
+},
+{
+  "traceId": "4d1e00c0db9010db86154a4ba6e91385",
+  "parentId": "86154a4ba6e91386",
+  "id": "4d1e00c0db9010db",
+  "kind": "SERVER",
+  "name": "put",
+  "timestamp": 1472470996199000,
+  "duration": 207000,
+  "localEndpoint": {
+    "serviceName": "frontend",
+    "ipv6": "7::0.128.128.127"
+  },
+  "remoteEndpoint": {
+    "serviceName": "frontend",
+    "ipv4": "192.168.99.101",
+    "port": 9000
+  },
+  "annotations": [
+    {
+      "timestamp": 1472470996238000,
+      "value": "foo"
+    },
+    {
+      "timestamp": 1472470996403000,
+      "value": "bar"
+    }
+  ],
+  "tags": {
+    "http.path": "/api",
+    "clnt/finagle.version": "6.45.0"
+  }
+}]`)
+
+	sink := new(noopSink)
+	zi, err := New(sink)
+	if err != nil {
+		t.Fatalf("Failed to create the Zipkin interceptor: %v", err)
+	}
+	ereqs, err := zi.parseAndConvertToTraceSpans(interceptorInputJSON)
+	if err != nil {
+		t.Fatalf("Failed to parse and convert interceptor JSON: %v", err)
+	}
+
+	// Both input spans share the same timing, annotations and tags, so the
+	// expected values are produced by small helpers instead of being
+	// spelled out once per span. Every helper returns fresh values.
+
+	// microsToTime converts a timestamp in microseconds since the Unix
+	// epoch, as used in the JSON above, into a time.Time.
+	microsToTime := func(us int64) time.Time {
+		return time.Unix(us/1e6, 1e3*(us%1e6))
+	}
+	const (
+		startMicros    = int64(1472470996199000)
+		durationMicros = int64(207000)
+	)
+	annotationAt := func(us int64, description string) *tracepb.Span_TimeEvent {
+		return &tracepb.Span_TimeEvent{
+			Time: internal.TimeToTimestamp(microsToTime(us)),
+			Value: &tracepb.Span_TimeEvent_Annotation_{
+				Annotation: &tracepb.Span_TimeEvent_Annotation{
+					Description: &tracepb.TruncatableString{Value: description},
+				},
+			},
+		}
+	}
+	wantTimeEvents := func() *tracepb.Span_TimeEvents {
+		return &tracepb.Span_TimeEvents{
+			TimeEvent: []*tracepb.Span_TimeEvent{
+				annotationAt(1472470996238000, "foo"),
+				annotationAt(1472470996403000, "bar"),
+			},
+		}
+	}
+	stringAttribute := func(value string) *tracepb.AttributeValue {
+		return &tracepb.AttributeValue{
+			Value: &tracepb.AttributeValue_StringValue{
+				StringValue: &tracepb.TruncatableString{Value: value},
+			},
+		}
+	}
+	wantAttributes := func() *tracepb.Span_Attributes {
+		return &tracepb.Span_Attributes{
+			AttributeMap: map[string]*tracepb.AttributeValue{
+				"http.path":            stringAttribute("/api"),
+				"clnt/finagle.version": stringAttribute("6.45.0"),
+			},
+		}
+	}
+
+	wantProtoRequests := []*agenttracepb.ExportTraceServiceRequest{
+		{
+			Node: &commonpb.Node{
+				ServiceInfo: &commonpb.ServiceInfo{Name: "frontend"},
+				Attributes: map[string]string{
+					"ipv6":                       "7::80:807f",
+					"zipkin.remoteEndpoint.ipv4": "192.168.99.101",
+					"zipkin.remoteEndpoint.port": "9000",
+				},
+			},
+
+			Spans: []*tracepb.Span{
+				{
+					TraceId:      []byte{0x4d, 0x1e, 0x00, 0xc0, 0xdb, 0x90, 0x10, 0xdb, 0x86, 0x15, 0x4a, 0x4b, 0xa6, 0xe9, 0x13, 0x85},
+					ParentSpanId: []byte{0x86, 0x15, 0x4a, 0x4b, 0xa6, 0xe9, 0x13, 0x85},
+					SpanId:       []byte{0x4d, 0x1e, 0x00, 0xc0, 0xdb, 0x90, 0x10, 0xdb},
+					Kind:         tracepb.Span_CLIENT,
+					Name:         &tracepb.TruncatableString{Value: "get"},
+					StartTime:    internal.TimeToTimestamp(microsToTime(startMicros)),
+					EndTime:      internal.TimeToTimestamp(microsToTime(startMicros + durationMicros)),
+					TimeEvents:   wantTimeEvents(),
+					Attributes:   wantAttributes(),
+				},
+				{
+					TraceId:      []byte{0x4d, 0x1e, 0x00, 0xc0, 0xdb, 0x90, 0x10, 0xdb, 0x86, 0x15, 0x4a, 0x4b, 0xa6, 0xe9, 0x13, 0x85},
+					SpanId:       []byte{0x4d, 0x1e, 0x00, 0xc0, 0xdb, 0x90, 0x10, 0xdb},
+					ParentSpanId: []byte{0x86, 0x15, 0x4a, 0x4b, 0xa6, 0xe9, 0x13, 0x86},
+					Kind:         tracepb.Span_SERVER,
+					Name:         &tracepb.TruncatableString{Value: "put"},
+					StartTime:    internal.TimeToTimestamp(microsToTime(startMicros)),
+					EndTime:      internal.TimeToTimestamp(microsToTime(startMicros + durationMicros)),
+					TimeEvents:   wantTimeEvents(),
+					Attributes:   wantAttributes(),
+				},
+			},
+		},
+	}
+
+	g, w := ereqs, wantProtoRequests
+	if len(g) != len(w) {
+		t.Errorf("Unmatched lengths:\nGot: %d\nWant: %d", len(g), len(w))
+	}
+	if !reflect.DeepEqual(g, w) {
+		t.Fatalf("Failed to transform the expected ProtoSpans\nGot:\n\t%v\nWant:\n\t%v\n", g, w)
+	}
+
+	// Now the last phase is to transmit them over the wire and then compare the JSONs
+
+	buf := new(bytes.Buffer)
+	// This will act as the final Zipkin server.
+	backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		io.Copy(buf, r.Body)
+		_ = r.Body.Close()
+	}))
+	defer backend.Close()
+
+	var reporterShutdownFns []func() error
+	for _, treq := range ereqs {
+		ipv6 := treq.Node.Attributes["ipv6"]
+		// Convert this ipv6 address to hostport as recommended in:
+		//      https://github.com/openzipkin/zipkin-go/issues/84
+		hostPort := fmt.Sprintf("[%s]:0", ipv6)
+		le, err := openzipkin.NewEndpoint(treq.Node.GetServiceInfo().GetName(), hostPort)
+		if err != nil {
+			t.Errorf("NewEndpoint err: %v", err)
+			continue
+		}
+		re := zhttp.NewReporter(backend.URL)
+		ze := zipkin.NewExporter(re, le)
+		reporterShutdownFns = append(reporterShutdownFns, re.Close)
+
+		for _, span := range treq.Spans {
+			// Don't export a span whose translation failed; report it so
+			// that a later JSON mismatch can be traced to its cause.
+			sd, convErr := tracetranslator.ProtoSpanToOCSpanData(span)
+			if convErr != nil {
+				t.Errorf("ProtoSpanToOCSpanData err: %v", convErr)
+				continue
+			}
+			ze.ExportSpan(sd)
+		}
+	}
+	// Now shut down the respective reporters and the server
+	for _, reporterFn := range reporterShutdownFns {
+		_ = reporterFn()
+	}
+	backend.Close()
+
+	// Give them time to shutdown
+	<-time.After(200 * time.Millisecond)
+
+	// We expect this final JSON: that is
+	wantFinalJSON := `
+[{
+  "traceId": "4d1e00c0db9010db86154a4ba6e91385",
+  "parentId": "86154a4ba6e91385",
+  "id": "4d1e00c0db9010db",
+  "kind": "CLIENT",
+  "name": "get",
+  "timestamp": 1472470996199000,
+  "duration": 207000,
+  "localEndpoint": {
+    "serviceName": "frontend",
+    "ipv4": "0.0.0.0",
+    "ipv6": "7::80:807f"
+  },
+  "annotations": [
+    {
+      "timestamp": 1472470996238000,
+      "value": "foo"
+    },
+    {
+      "timestamp": 1472470996403000,
+      "value": "bar"
+    }
+  ],
+  "tags": {
+    "http.path": "/api",
+    "clnt/finagle.version": "6.45.0"
+  }
+},
+{
+  "traceId": "4d1e00c0db9010db86154a4ba6e91385",
+  "parentId": "86154a4ba6e91386",
+  "id": "4d1e00c0db9010db",
+  "kind": "SERVER",
+  "name": "put",
+  "timestamp": 1472470996199000,
+  "duration": 207000,
+  "localEndpoint": {
+    "serviceName": "frontend",
+    "ipv4": "0.0.0.0",
+    "ipv6": "7::80:807f"
+  },
+  "annotations": [
+    {
+      "timestamp": 1472470996238000,
+      "value": "foo"
+    },
+    {
+      "timestamp": 1472470996403000,
+      "value": "bar"
+    }
+  ],
+  "tags": {
+    "http.path": "/api",
+    "clnt/finagle.version": "6.45.0"
+  }
+}]`
+
+	// fmtForComparison strips newlines and spaces (and nothing else), which
+	// is why the JSON literals above are indented with spaces only.
+	fmtForComparison := func(s string) string {
+		ss := strings.Replace(s, "\n", "", -1)
+		return strings.Replace(ss, " ", "", -1)
+	}
+	// Just compressing the input and output since their JSON serializations vary, i.e. might have spaces or not
+	gj, wj := fmtForComparison(buf.String()), fmtForComparison(wantFinalJSON)
+	if gj != wj {
+		// Since the field might be in ruffled order e.g:
+		//      [{"timestamp":1472470996199000,"duration":207000}]
+		// vs
+		//      [{"duration":207000,"timestamp":1472470996199000}]
+		// we should resort to comparing character-frequency signatures.
+		gs, ws := anagramSignature(gj), anagramSignature(wj)
+		if gs != ws {
+			t.Errorf("The roundtrip JSON doesn't match the JSON that we want\nGot:\n%s\nWant:\n%s", gj, wj)
+		}
+	}
+}
+
+// anagramSignature returns a JSON encoding of how many times each rune
+// occurs in ss, so that two strings made of the same characters in a
+// different order produce the same signature.
+func anagramSignature(ss string) string {
+	mp := make(map[rune]int64)
+	for _, s := range ss {
+		mp[s]++
+	}
+	blob, _ := json.Marshal(mp)
+	return string(blob)
+}
+
+// noopSink is a SpanReceiver that discards every span it is given.
+type noopSink int
+
+var _ spanreceiver.SpanReceiver = (*noopSink)(nil)
+
+// ReceiveSpans ignores its arguments and returns a nil acknowledgement and a nil error.
+func (ns *noopSink) ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*spanreceiver.Acknowledgement, error) {
+	return nil, nil
+}
diff --git a/internal/observability.go b/internal/observability.go
index 28ca72f2727b..6abc53c3885f 100644
--- a/internal/observability.go
+++ b/internal/observability.go
@@ -107,3 +107,21 @@ func GRPCServerWithObservabilityEnabled(extraOpts ...grpc.ServerOption) *grpc.Se
 	opts := append(extraOpts, grpc.StatsHandler(&ocgrpc.ServerHandler{}))
 	return grpc.NewServer(opts...)
 }
+
+// SetParentLink tries to retrieve a span from sideCtx and if one exists
+// sets its SpanID, TraceID as a link in the span provided. It returns
+// true only if it retrieved a parent span from the context.
+func SetParentLink(sideCtx context.Context, span *trace.Span) bool {
+	parent := trace.FromContext(sideCtx)
+	if parent == nil {
+		// sideCtx carries no span, so there is nothing to link to.
+		return false
+	}
+
+	// Record the span found in sideCtx as a parent-typed link on span.
+	parentSC := parent.SpanContext()
+	link := trace.Link{
+		TraceID: parentSC.TraceID,
+		SpanID:  parentSC.SpanID,
+		Type:    trace.LinkTypeParent,
+	}
+	span.AddLink(link)
+	return true
+}