interceptor: add zipkin /v2 interceptor (#111)
* interceptor: add zipkin interceptor

* Zipkin interceptor multiplexing + running on 9411

* Can receive spans from diverse sources and successfully
intercept traces from various serviceNames and IPv* combinations.
Verified by examining the data on the Zipkin UI itself by
resending to a server running on a different port.

* End-to-end serialization tests where we intercept JSON
from a wild Zipkin reporter, transform it into trace-proto
then to OpenCensus-Go SpanData and then export it with the
OpenCensus-Go Zipkin exporter, inspect the JSON and compare
it with the final/expected JSON data.

Fixes #24

* interceptor/zipkin: add remoteEndpoint to Node.Attributes

Zipkin's RemoteEndpoint field is necessary when later on
building the dependency graph for Zipkin services.

This change ensures that that field is added as an attribute
to the Node with keys whose prefix is:
  "zipkin.remoteEndpoint." e.g.

- "zipkin.remoteEndpoint.ipv4"
- "zipkin.remoteEndpoint.serviceName"
- "zipkin.remoteEndpoint.port"
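
As a rough illustration of the key scheme above, here is a minimal sketch that flattens a remote endpoint into prefixed string attributes. The `remoteEndpoint` struct and the `remoteEndpointAttributes` helper are hypothetical stand-ins, not the interceptor's actual types; only the three key names come from this change.

```go
package main

import (
	"fmt"
	"strconv"
)

// remoteEndpoint is a hypothetical stand-in for Zipkin's remote endpoint
// model; the real interceptor works with its own types.
type remoteEndpoint struct {
	ServiceName string
	IPv4        string
	Port        uint16
}

// remoteEndpointPrefix is the key prefix named in the commit message.
const remoteEndpointPrefix = "zipkin.remoteEndpoint."

// remoteEndpointAttributes flattens the endpoint into prefixed string
// attributes, skipping fields that are unset.
func remoteEndpointAttributes(ep *remoteEndpoint) map[string]string {
	attrs := make(map[string]string)
	if ep == nil {
		return attrs
	}
	if ep.ServiceName != "" {
		attrs[remoteEndpointPrefix+"serviceName"] = ep.ServiceName
	}
	if ep.IPv4 != "" {
		attrs[remoteEndpointPrefix+"ipv4"] = ep.IPv4
	}
	if ep.Port != 0 {
		attrs[remoteEndpointPrefix+"port"] = strconv.Itoa(int(ep.Port))
	}
	return attrs
}

func main() {
	ep := &remoteEndpoint{ServiceName: "backend", IPv4: "192.168.99.101", Port: 9000}
	for k, v := range remoteEndpointAttributes(ep) {
		fmt.Printf("%s=%s\n", k, v)
	}
}
```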

* Fail if Zipkin interceptor and exporter run on same address

Fail spectacularly if the Zipkin interceptor and the Zipkin
exporter are configured to run on the same address.
Such a configuration is a self-DoS vector: intercepted spans
are processed by the interceptor and exported by the exporter,
but the exported spans are then sent straight back to the
interceptor. This loop goes on indefinitely and just consumes
memory and other resources.

To perform this check:
a) Compare the HostPorts of the interceptor and the exporter
b) Treat the hosts "127.0.0.1", "localhost" and "" as equivalent
and check whether the ports match
c) Resolve the IPs for the hosts and compare them

If any of these steps finds a match, return a logical conflict
error, which is later reported as a fatal error at startup.
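
The three steps above can be sketched as a standalone function. This is only an illustration under assumptions: `sameListener` and `isLoopbackAlias` are hypothetical names, and the sketch treats unresolvable hosts and endpoints without an explicit port differently from how the agent might.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"strings"
)

// isLoopbackAlias reports whether host is one of the spellings that the
// commit message treats as the local machine.
func isLoopbackAlias(host string) bool {
	switch strings.ToLower(host) {
	case "", "localhost", "127.0.0.1":
		return true
	}
	return false
}

// sameListener is a hypothetical helper: it reports whether the interceptor's
// host:port and the exporter's endpoint URL point at the same address.
func sameListener(interceptorHostPort, exporterEndpoint string) (bool, error) {
	u, err := url.Parse(exporterEndpoint)
	if err != nil {
		return false, err
	}
	// a) Identical host:port strings.
	if u.Host == interceptorHostPort {
		return true, nil
	}
	// Both sides must carry an explicit port in this sketch.
	expHost, expPort, err := net.SplitHostPort(u.Host)
	if err != nil {
		return false, err
	}
	inHost, inPort, err := net.SplitHostPort(interceptorHostPort)
	if err != nil {
		return false, err
	}
	if expPort != inPort {
		return false, nil
	}
	// b) Same host spelling, or both are loopback aliases.
	if expHost == inHost || (isLoopbackAlias(expHost) && isLoopbackAlias(inHost)) {
		return true, nil
	}
	// c) Resolve both hosts and compare the resulting IPs.
	expIP, err1 := net.ResolveIPAddr("ip", expHost)
	inIP, err2 := net.ResolveIPAddr("ip", inHost)
	if err1 != nil || err2 != nil || expIP == nil || inIP == nil {
		return false, nil // assumption: unresolvable hosts count as "no conflict"
	}
	return expIP.IP.Equal(inIP.IP), nil
}

func main() {
	clash, err := sameListener(":9411", "http://localhost:9411/api/v2/spans")
	fmt.Println(clash, err)
}
```

Checking the port first keeps the DNS lookup in step c) as a last resort, so the common loopback spellings are compared without touching the resolver.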

The crash will look something like this
```shell
2018/10/23 18:54:41 Configuration logical error: ZipkinInterceptor address ("127.0.0.1:9411") aka (127.0.0.1 on port 9411)
is the same as the interceptor address ("localhost:9411") aka (127.0.0.1 on port 9411)
```

We can now detect any of these clashing configurations:

```yaml
interceptors:
    zipkin:
        address: "127.0.0.1:9411"
exporters:
    zipkin:
        endpoint: "http://localhost:9411/api/v2/spans"
```

```yaml
interceptors:
    zipkin:
        address: ":9411"

exporters:
    zipkin:
        endpoint: "http://localhost:9411/api/v2/spans"
```

```yaml
interceptors:
    zipkin:
        address: "localhost:9411"

exporters:
    zipkin:
        endpoint: "http://localhost:9411/api/v2/spans"
```

```yaml
interceptors:
    zipkin:
        address: "localhost:9411"

exporters:
    zipkin:
        endpoint: "http://:9411/api/v2/spans"
```

```yaml
interceptors:
    zipkin:
        address: "localhost:9411"

exporters:
    zipkin:
        endpoint: "http://127.0.0.1:9411/api/v2/spans"
```

and in a case where IP resolution helped: "10.0.0.147" is my local
address, and I sneakily tried to put the interceptor on "localhost",
which clashes with my local address, as per:

```yaml
interceptors:
    zipkin:
        address: "localhost:9411"

exporters:
    zipkin:
        endpoint: "http://10.0.0.147:9411/api/v2/spans"
```

we caught it
```shell
2018/10/23 19:01:01 Configuration logical error: ZipkinInterceptor address ("10.0.0.147:9411") aka (10.0.0.147 on port 9411)
is the same as the interceptor address ("localhost:9411") aka (10.0.0.147 on port 9411)
```

* Go fmt with Go1.10 to match TravisCI

Go1.11's gofmt seems to be more conservative when
aligning literal assignments, while Go1.10's seems more
extravagant. This subtle difference was causing TravisCI
failures since TravisCI uses Go1.10.

* interceptor/zipkin: handle compressed HTTP bodies

Some clients such as Zipkin-Java send HTTP bodies
with JSON that's been compressed as "gzip".
This change handles "Content-Encoding" values of:
* "gzip"
* "deflate", "zlib"
and decompresses them accordingly

* interceptor/zipkin: tests use anagram signature

A test that checked shuffled output previously used an
"xorChecksum", but that is not always unique.
A better way is to use an anagram counter and then
create a signature from the serialized map of counted
runes.
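
To illustrate the idea, here is a minimal sketch of an order-insensitive signature built from rune counts. `anagramSignature` is a hypothetical name and the serialization format is an assumption; the test helper in this change may differ. An XOR over the bytes is weaker because, for example, "aa" and "bb" both XOR to 0, while their rune counts differ.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// anagramSignature counts every rune in s and serializes the counts in
// sorted rune order, so any reordering of s yields the same signature.
func anagramSignature(s string) string {
	counts := make(map[rune]int)
	for _, r := range s {
		counts[r]++
	}
	runes := make([]rune, 0, len(counts))
	for r := range counts {
		runes = append(runes, r)
	}
	// Sort the keys so the serialization is deterministic.
	sort.Slice(runes, func(i, j int) bool { return runes[i] < runes[j] })

	var sb strings.Builder
	for _, r := range runes {
		fmt.Fprintf(&sb, "%q:%d,", r, counts[r])
	}
	return sb.String()
}

func main() {
	a := `[{"id":"1"},{"id":"2"}]`
	b := `[{"id":"2"},{"id":"1"}]`
	// The two payloads hold the same spans in a different order.
	fmt.Println(anagramSignature(a) == anagramSignature(b))
}
```

Note that such a signature only shows that two outputs contain the same characters, which is the property a test of unordered output needs; it is not a full structural comparison.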

* interceptor/zipkin: address review feedback

* Rename package "zipkinterceptor" --> "zipkininterceptor"
* Fix error names

* Address feedback from review

* move "zipkinRoute" const to the top to make it easily
discoverable and not be buried deep in code
* set status of zipkin interceptor if we encounter an error
while parsing spans. We set the span status to: StatusInvalidArgument
and the error message

* *zipkin: address feedback from review

* Add a comment to exporter/exporterparse.zipkinExporter
that the mutex "mu" protects all the constituent fields
* Fix and ensure that node uniqueness is clear, with the
proper check from a map lookup but also rename the variable
to store nodes to "uniqueNodes" to make for better code readability

* README.md + default config.yaml: update interceptors docs and information

Added a section in the README.md on the various interceptors
and how to change the Zipkin interceptor address

* sync.Mutex instead of sync.RWMutex for zipkinExporter.mu
Emmanuel T Odeke committed Oct 26, 2018
1 parent f366180 commit 9bcf605
Showing 10 changed files with 1,411 additions and 37 deletions.
32 changes: 28 additions & 4 deletions README.md
@@ -22,6 +22,8 @@
- [Configuration file](#agent-configuration-file)
- [Exporters](#agent-config-exporters)
- [Interceptors](#agent-config-interceptors)
- [OpenCensus](#details-interceptors-opencensus)
- [Zipkin](#details-interceptors-zipkin)
- [End-to-end example](#agent-config-end-to-end-example)
- [Docker image](#agent-docker-image)
- [OpenCensus Collector](#opencensus-collector)
@@ -271,14 +273,36 @@ exporters:
```

#### <a name="agent-config-interceptors"></a>Interceptors
As previously mentioned, the agent provides a couple of interceptors

#### <a name="details-interceptors-opencensus"></a>OpenCensus

This interceptor receives spans from OpenCensus instrumented applications and translates them into the internal span types that
are then sent to the collector/exporters.

Its address can be configured in the YAML configuration file under section "interceptors", subsection "opencensus" and field "address".

To modify the address that the OpenCensus interceptor runs on, please use the
YAML field name `opencensus_interceptor` and it takes fields like `address`.
For example:
```yaml
interceptors:
    opencensus:
        address: "127.0.0.1:55678"
```

By default this interceptor is ALWAYS started since it is the point of the "OpenCensus agent".

#### <a name="details-interceptors-zipkin"></a>Zipkin

This interceptor receives spans from Zipkin "/v2" API HTTP uploads and translates them into the internal span types that are then
sent to the collector/exporters.

Its address can be configured in the YAML configuration file under section "interceptors", subsection "zipkin" and field "address".

For example:
```yaml
opencensus_interceptor:
    address: "localhost:55678"
interceptors:
    zipkin:
        address: "localhost:9411"
```

### <a name="agent-config-end-to-end-example"></a>Running an end-to-end example/demo
98 changes: 94 additions & 4 deletions cmd/ocagent/config.go
@@ -15,7 +15,12 @@
package main

import (
"fmt"
"log"
"net"
"net/url"
"reflect"
"strings"

yaml "gopkg.in/yaml.v2"

@@ -52,7 +57,8 @@ type config struct {
}

type interceptors struct {
OpenCensusInterceptor *interceptorConfig `yaml:"opencensus"`
OpenCensus *interceptorConfig `yaml:"opencensus"`
Zipkin *interceptorConfig `yaml:"zipkin"`
}

type interceptorConfig struct {
@@ -70,10 +76,10 @@ func (c *config) ocInterceptorAddress() string {
return defaultOCInterceptorAddress
}
inCfg := c.Interceptors
if inCfg.OpenCensusInterceptor == nil || inCfg.OpenCensusInterceptor.Address == "" {
if inCfg.OpenCensus == nil || inCfg.OpenCensus.Address == "" {
return defaultOCInterceptorAddress
}
return inCfg.OpenCensusInterceptor.Address
return inCfg.OpenCensus.Address
}

func (c *config) zPagesDisabled() bool {
@@ -94,6 +100,24 @@ func (c *config) zPagesPort() (int, bool) {
return port, true
}

func (c *config) zipkinInterceptorEnabled() bool {
	if c == nil {
		return false
	}
	return c.Interceptors != nil && c.Interceptors.Zipkin != nil
}

func (c *config) zipkinInterceptorAddress() string {
	if c == nil || c.Interceptors == nil {
		return exporterparser.DefaultZipkinEndpointHostPort
	}
	inCfg := c.Interceptors
	if inCfg.Zipkin == nil || inCfg.Zipkin.Address == "" {
		return exporterparser.DefaultZipkinEndpointHostPort
	}
	return inCfg.Zipkin.Address
}

func parseOCAgentConfig(yamlBlob []byte) (*config, error) {
var cfg config
if err := yaml.Unmarshal(yamlBlob, &cfg); err != nil {
@@ -102,7 +126,72 @@ func parseOCAgentConfig(yamlBlob []byte) (*config, error) {
return &cfg, nil
}

type exporterParser func(yamlConfig []byte) (te []exporter.TraceExporter, err error)
// The goal of this function is to catch logical errors such as
// if the Zipkin interceptor port conflicts with that of the exporter
// lest we'll have a self DOS because spans will be exported "out" from
// the exporter, yet be received from the interceptor, then sent back out
// and back in a never ending loop.
func (c *config) checkLogicalConflicts(blob []byte) error {
	var cfg struct {
		Exporters *struct {
			Zipkin *exporterparser.ZipkinConfig `yaml:"zipkin"`
		} `yaml:"exporters"`
	}
	if err := yaml.Unmarshal(blob, &cfg); err != nil {
		return err
	}

	if cfg.Exporters == nil || cfg.Exporters.Zipkin == nil {
		return nil
	}

	zc := cfg.Exporters.Zipkin

	zExporterAddr := zc.EndpointURL()
	zExporterURL, err := url.Parse(zExporterAddr)
	if err != nil {
		return fmt.Errorf("parsing ZipkinExporter address %q got error: %v", zExporterAddr, err)
	}

	zInterceptorHostPort := c.zipkinInterceptorAddress()

	zExporterHostPort := zExporterURL.Host
	if zInterceptorHostPort == zExporterHostPort {
		return fmt.Errorf("ZipkinExporter address (%q) is the same as the interceptor address (%q)",
			zExporterHostPort, zInterceptorHostPort)
	}
	zExpHost, zExpPort, _ := net.SplitHostPort(zExporterHostPort)
	zInterceptorHost, zInterceptorPort, _ := net.SplitHostPort(zInterceptorHostPort)
	if eqHosts(zExpHost, zInterceptorHost) && zExpPort == zInterceptorPort {
		return fmt.Errorf("ZipkinExporter address (%q) aka (%s on port %s)\nis the same as the interceptor address (%q) aka (%s on port %s)",
			zExporterHostPort, zExpHost, zExpPort, zInterceptorHostPort, zInterceptorHost, zInterceptorPort)
	}

	// Otherwise, now let's resolve the IPs and ensure that they aren't the same
	zExpIPAddr, _ := net.ResolveIPAddr("ip", zExpHost)
	zInterceptorIPAddr, _ := net.ResolveIPAddr("ip", zInterceptorHost)
	if zExpIPAddr != nil && zInterceptorIPAddr != nil && reflect.DeepEqual(zExpIPAddr, zInterceptorIPAddr) {
		return fmt.Errorf("ZipkinExporter address (%q) aka (%+v)\nis the same as the\ninterceptor address (%q) aka (%+v)",
			zExporterHostPort, zExpIPAddr, zInterceptorHostPort, zInterceptorIPAddr)
	}
	return nil
}

func eqHosts(host1, host2 string) bool {
	if host1 == host2 {
		return true
	}
	return eqLocalHost(host1) && eqLocalHost(host2)
}

func eqLocalHost(host string) bool {
	switch strings.ToLower(host) {
	case "", "localhost", "127.0.0.1":
		return true
	default:
		return false
	}
}

// exportersFromYAMLConfig parses the config yaml payload and returns the respective exporters
func exportersFromYAMLConfig(config []byte) (traceExporters []exporter.TraceExporter, doneFns []func() error) {
@@ -128,6 +217,7 @@ func exportersFromYAMLConfig(config []byte) (traceExporters []exporter.TraceExpo
nonNilExporters += 1
}
}

if nonNilExporters > 0 {
pluralization := "exporter"
if nonNilExporters > 1 {
3 changes: 3 additions & 0 deletions cmd/ocagent/config.yaml
@@ -2,6 +2,9 @@ interceptors:
    opencensus:
        address: "127.0.0.1:55678"

    zipkin:
        address: "localhost:9411"

exporters:
    stackdriver:
        project: "project-id"
46 changes: 45 additions & 1 deletion cmd/ocagent/main.go
@@ -29,6 +29,7 @@ import (
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
"github.com/census-instrumentation/opencensus-service/exporter"
"github.com/census-instrumentation/opencensus-service/interceptor/opencensus"
"github.com/census-instrumentation/opencensus-service/interceptor/zipkin"
"github.com/census-instrumentation/opencensus-service/internal"
"github.com/census-instrumentation/opencensus-service/spanreceiver"
"go.opencensus.io/plugin/ocgrpc"
@@ -39,6 +40,8 @@ import (
var configYAMLFile string
var ocInterceptorPort int

const zipkinRoute = "/api/v2/spans"

func main() {
if err := rootCmd.Execute(); err != nil {
log.Fatal(err)
@@ -54,6 +57,14 @@ func runOCAgent() {
if err != nil {
log.Fatalf("Failed to parse own configuration %v error: %v", configYAMLFile, err)
}

// Ensure that we check and catch any logical errors with the
// configuration e.g. if an interceptor shares the same address
// as an exporter which would cause a self DOS and waste resources.
if err := agentConfig.checkLogicalConflicts(yamlBlob); err != nil {
log.Fatalf("Configuration logical error: %v", err)
}

ocInterceptorAddr := agentConfig.ocInterceptorAddress()

traceExporters, closeFns := exportersFromYAMLConfig(yamlBlob)
@@ -64,7 +75,6 @@ func runOCAgent() {
if err != nil {
log.Fatal(err)
}

closeFns = append(closeFns, ocInterceptorDoneFn)

// If zPages are enabled, run them
@@ -74,6 +84,16 @@ func runOCAgent() {
closeFns = append(closeFns, zCloseFn)
}

// If the Zipkin interceptor is enabled, then run it
if agentConfig.zipkinInterceptorEnabled() {
zipkinInterceptorAddr := agentConfig.zipkinInterceptorAddress()
zipkinInterceptorDoneFn, err := runZipkinInterceptor(zipkinInterceptorAddr, commonSpanReceiver)
if err != nil {
log.Fatal(err)
}
closeFns = append(closeFns, zipkinInterceptorDoneFn)
}

// Always cleanup finally
defer func() {
for _, closeFn := range closeFns {
@@ -141,3 +161,27 @@ func runOCInterceptor(addr string, sr spanreceiver.SpanReceiver) (doneFn func()
doneFn = ln.Close
return doneFn, nil
}

func runZipkinInterceptor(addr string, sr spanreceiver.SpanReceiver) (doneFn func() error, err error) {
	zi, err := zipkininterceptor.New(sr)
	if err != nil {
		return nil, fmt.Errorf("Failed to create the Zipkin interceptor: %v", err)
	}

	ln, err := net.Listen("tcp", addr)
	if err != nil {
		return nil, fmt.Errorf("Cannot bind Zipkin interceptor to address %q: %v", addr, err)
	}
	mux := http.NewServeMux()
	mux.Handle(zipkinRoute, zi)
	go func() {
		fullAddr := addr + zipkinRoute
		log.Printf("Running the Zipkin interceptor at %q", fullAddr)
		if err := http.Serve(ln, mux); err != nil {
			log.Fatalf("Failed to serve the Zipkin interceptor: %v", err)
		}
	}()

	doneFn = ln.Close
	return doneFn, nil
}