Skip to content

Commit

Permalink
chore: adds initAsAdditional functionality to support coexistence wit…
Browse files Browse the repository at this point in the history
…h other tracers (#161)

* chore: adds initAsAdditional functionality to support coexistence with other OTel tracers.

* chore: makes the attrs removal extendable by allowing own attributes.
  • Loading branch information
jcchavezs authored Dec 3, 2021
1 parent 90414f1 commit 58d9331
Show file tree
Hide file tree
Showing 6 changed files with 317 additions and 105 deletions.
59 changes: 59 additions & 0 deletions instrumentation/opentelemetry/examples/init_additional_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package examples

import (
"context"
"log"
"net/http"

"github.com/gorilla/mux"
"github.com/hypertrace/goagent/config"
hyperotel "github.com/hypertrace/goagent/instrumentation/opentelemetry"
"github.com/hypertrace/goagent/instrumentation/opentelemetry/net/hyperhttp"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
)

var otherSpanExporter trace.SpanExporter = nil

func ExampleInitAdditional() {
hyperSpanProcessor, shutdown := hyperotel.InitAsAdditional(config.Load())
defer shutdown()

ctx := context.Background()
resources, _ := resource.New(
ctx,
resource.WithAttributes(
semconv.ServiceNameKey.String("my-server"),
),
)

otherSpanProcessor := sdktrace.NewBatchSpanProcessor(
hyperotel.RemoveGoAgentAttrs(otherSpanExporter),
)

tp := sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithSpanProcessor(hyperSpanProcessor),
sdktrace.WithSpanProcessor(otherSpanProcessor),
sdktrace.WithResource(resources),
)

defer func() { _ = tp.Shutdown(ctx) }()

otel.SetTracerProvider(tp)

r := mux.NewRouter()
r.Handle("/foo", otelhttp.NewHandler(
hyperhttp.WrapHandler(
http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {}),
nil,
),
"/foo",
))

log.Fatal(http.ListenAndServe(":8081", r))
}
144 changes: 144 additions & 0 deletions instrumentation/opentelemetry/init_additional.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package opentelemetry

import (
"context"
"log"
"strings"

config "github.com/hypertrace/agent-config/gen/go/v1"
sdkconfig "github.com/hypertrace/goagent/sdk/config"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
)

// InitAsAdditional initializes opentelemetry tracing and returns a span processor and a shutdown
// function to flush data immediately on a termination signal.
// This is ideal for when we use goagent along with other opentelemetry setups.
func InitAsAdditional(cfg *config.AgentConfig) (trace.SpanProcessor, func()) {
mu.Lock()
defer mu.Unlock()
if initialized {
return nil, func() {}
}
sdkconfig.InitConfig(cfg)

exporterFactory = makeExporterFactory(cfg)

exporter, err := exporterFactory()
if err != nil {
log.Fatal(err)
}

if cfg.GetServiceName().GetValue() != "" {
resource, err := resource.New(
context.Background(),
resource.WithAttributes(createResources(cfg.GetServiceName().GetValue(), cfg.ResourceAttributes)...),
)
if err != nil {
log.Fatal(err)
}

exporter = addResourceToSpans(exporter, resource)
}

return trace.NewBatchSpanProcessor(exporter, trace.WithBatchTimeout(batchTimeout)), func() {
exporter.Shutdown(context.Background())
sdkconfig.ResetConfig()
}
}

type shieldResourceSpan struct {
trace.ReadOnlySpan
resource *resource.Resource
}

var _ trace.ReadOnlySpan = (*shieldResourceSpan)(nil)

func (s *shieldResourceSpan) Resource() *resource.Resource {
return s.resource
}

type resourcePutter struct {
trace.SpanExporter
resource *resource.Resource
}

func (e *resourcePutter) ExportSpans(ctx context.Context, spans []trace.ReadOnlySpan) error {
newSpans := []trace.ReadOnlySpan{}
for _, span := range spans {
newSpans = append(newSpans, &shieldResourceSpan{span, e.resource})
}

return e.SpanExporter.ExportSpans(ctx, newSpans)
}

func addResourceToSpans(e trace.SpanExporter, r *resource.Resource) trace.SpanExporter {
return &resourcePutter{SpanExporter: e, resource: r}
}

// shieldAttrsSpan is a wrapper around a span that removes all attributes added by goagent as not
// all the the tracing servers can handle the load of big attributes like body or headers.
type shieldAttrsSpan struct {
trace.ReadOnlySpan
prefixes []string
}

func (s *shieldAttrsSpan) Attributes() []attribute.KeyValue {
attrs := []attribute.KeyValue{}
for _, attr := range s.ReadOnlySpan.Attributes() {
key := string(attr.Key)
hasPrefix := false
for _, prefix := range s.prefixes {
if strings.HasPrefix(key, prefix) {
hasPrefix = true
break
}
}

if !hasPrefix {
attrs = append(attrs, attr)
}
}

return attrs
}

type attrsRemover struct {
trace.SpanExporter
prefixes []string
}

func (e *attrsRemover) ExportSpans(ctx context.Context, spans []trace.ReadOnlySpan) error {
newSpans := []trace.ReadOnlySpan{}
for _, span := range spans {
newSpans = append(newSpans, &shieldAttrsSpan{span, e.prefixes})
}

return e.SpanExporter.ExportSpans(ctx, newSpans)
}

var attrsRemovalPrefixes = []string{
"http.request.header.",
"http.response.header.",
"http.request.body",
"http.response.body",
"rpc.request.metadata.",
"rpc.response.metadata.",
"rpc.request.body",
"rpc.response.body",
}

var RemoveGoAgentAttrs = MakeRemoveGoAgentAttrs(attrsRemovalPrefixes)

// RemoveGoAgentAttrs removes custom goagent attributes from the spans so that other tracing servers
// don't receive them and don't have to handle the load.
func MakeRemoveGoAgentAttrs(attrsRemovalPrefixes []string) func(sp trace.SpanExporter) trace.SpanExporter {
return func(sp trace.SpanExporter) trace.SpanExporter {
if sp == nil {
return sp
}

return &attrsRemover{sp, attrsRemovalPrefixes}
}
}
100 changes: 100 additions & 0 deletions instrumentation/opentelemetry/init_additional_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package opentelemetry

import (
"context"
"encoding/json"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"

"github.com/hypertrace/goagent/config"
"github.com/hypertrace/goagent/instrumentation/opentelemetry/internal/tracetesting"
"github.com/openzipkin/zipkin-go/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
)

func TestInitAdditional(t *testing.T) {
zipkinSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
zs := []model.SpanModel{}
b, err := ioutil.ReadAll(r.Body)
require.NoError(t, err)
defer r.Body.Close()

err = json.Unmarshal(b, &zs)
require.NoError(t, err)

assert.Equal(t, 1, len(zs))
assert.Equal(t, "test-span", zs[0].Name)
assert.Equal(t, "another-name", zs[0].LocalEndpoint.ServiceName)

w.WriteHeader(http.StatusAccepted)
}))
defer zipkinSrv.Close()

cfg := config.Load()
cfg.ServiceName = config.String("another-name")
cfg.Reporting.TraceReporterType = config.TraceReporterType_ZIPKIN
cfg.Reporting.Endpoint = config.String(zipkinSrv.URL)

hyperSpanProcessor, shutdown := InitAsAdditional(cfg)
defer shutdown()

ctx := context.Background()
resources, _ := resource.New(
ctx,
resource.WithAttributes(
semconv.ServiceNameKey.String("one-name"),
),
)

rec := tracetesting.NewRecorder()

recSpanProcessor := sdktrace.NewSimpleSpanProcessor(RemoveGoAgentAttrs(rec))

tp := sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithSpanProcessor(hyperSpanProcessor),
sdktrace.WithSpanProcessor(recSpanProcessor),
sdktrace.WithResource(resources),
)

defer func() { _ = tp.Shutdown(ctx) }()

ctx, s := tp.Tracer("test").Start(context.Background(), "test-span")
s.SetStatus(codes.Ok, "OK")
s.SetAttributes(
attribute.String("http.status_code", "200"),
attribute.String("http.request.header.x-forwarded-for", "1.1.1.1"),
)
s.End()

recSpanProcessor.ForceFlush(context.Background())

recSpans := rec.Flush()
assert.Len(t, recSpans, 1)

recSpan := recSpans[0]
for _, attr := range recSpan.Attributes() {
if attr.Key == "http.status_code" {
assert.Equal(t, "200", attr.Value.AsString())
}

if attr.Key == "http.request.header.x-forwarded-for" {
// This attribute should be filtered out by the RemoveGoAgentAttrs
t.FailNow()
}
}

for _, attr := range recSpan.Resource().Attributes() {
if attr.Key == semconv.ServiceNameKey {
assert.Equal(t, "one-name", attr.Value.AsString())
}
}
}
18 changes: 11 additions & 7 deletions instrumentation/opentelemetry/internal/tracetesting/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,31 @@ import (
"go.opentelemetry.io/otel/sdk/trace"
)

var _ trace.SpanExporter = &recorder{}

// recorder records spans being synced through the SpanSyncer interface.
type recorder struct {
// Recorder records spans being synced through the SpanSyncer interface.
type Recorder struct {
spans []trace.ReadOnlySpan
}

var _ trace.SpanExporter = &Recorder{}

func NewRecorder() *Recorder {
return &Recorder{}
}

// ExportSpans records spans into the internal buffer
func (r *recorder) ExportSpans(_ context.Context, s []trace.ReadOnlySpan) error {
func (r *Recorder) ExportSpans(_ context.Context, s []trace.ReadOnlySpan) error {
r.spans = append(r.spans, s...)
return nil
}

// Shutdown flushes the buffer
func (r *recorder) Shutdown(_ context.Context) error {
func (r *Recorder) Shutdown(_ context.Context) error {
_ = r.Flush()
return nil
}

// Flush returns the current recorded spans and reset the recordings
func (r *recorder) Flush() []trace.ReadOnlySpan {
func (r *Recorder) Flush() []trace.ReadOnlySpan {
spans := r.spans
r.spans = nil
return spans
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,19 @@ package tracetesting
import (
"context"

"github.com/hypertrace/goagent/instrumentation/opentelemetry"
"go.opentelemetry.io/contrib/propagators/b3"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/semconv/v1.4.0"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
apitrace "go.opentelemetry.io/otel/trace"
)

// InitTracer initializes the tracer and returns a flusher of the reported
// spans for further inspection. Its main purpose is to declare a tracer
// for TESTING.
func InitTracer() (apitrace.Tracer, func() []sdktrace.ReadOnlySpan) {
exporter := &recorder{}
exporter := &Recorder{}

resources, _ := resource.New(context.Background(), resource.WithAttributes(semconv.ServiceNameKey.String("TestService")))

Expand All @@ -29,7 +28,7 @@ func InitTracer() (apitrace.Tracer, func() []sdktrace.ReadOnlySpan) {
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(b3.New())

return tp.Tracer(opentelemetry.TracerDomain), func() []sdktrace.ReadOnlySpan {
return tp.Tracer("goagent-test"), func() []sdktrace.ReadOnlySpan {
return exporter.Flush()
}
}
Loading

0 comments on commit 58d9331

Please sign in to comment.