Skip to content

Commit

Permalink
interceptor/opencensus: trace instrument batchSpanUploading
Browse files Browse the repository at this point in the history
Trace instrument the section that performs batch uploading
to the spanreceiver. The created span has a parent link
from the initiating RPC's span. Added tests to ensure that
these conditions always hold.

Updates #63
  • Loading branch information
odeke-em committed Oct 9, 2018
1 parent 9fec101 commit 498e498
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 10 deletions.
121 changes: 116 additions & 5 deletions interceptor/opencensus/observability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package ocinterceptor_test

import (
"bytes"
"encoding/json"
"fmt"
"reflect"
"strings"
Expand All @@ -29,6 +31,7 @@ import (

commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/census-instrumentation/opencensus-service/interceptor/opencensus"
"github.com/census-instrumentation/opencensus-service/internal"
)
Expand All @@ -52,9 +55,21 @@ func TestEnsureRecordedMetrics(t *testing.T) {
t.Fatalf("Failed to create the ocagent-exporter: %v", err)
}
trace.RegisterExporter(oce)

metricsReportingPeriod := 5 * time.Millisecond
view.SetReportingPeriod(metricsReportingPeriod)
// On exit, revert the metrics reporting period.

defer func() {
oce.Stop()

// Pause for a bit before exiting to give OpenCensus-Go trace
// some time to export any remaining traces, before we unregister
// the exporter.
<-time.After(5 * metricsReportingPeriod)

trace.UnregisterExporter(oce)
view.SetReportingPeriod(60 * time.Second)
}()

// Now for the stats exporter
Expand All @@ -63,11 +78,6 @@ func TestEnsureRecordedMetrics(t *testing.T) {
}
defer view.Unregister(internal.AllViews...)

metricsReportingPeriod := 5 * time.Millisecond
view.SetReportingPeriod(metricsReportingPeriod)
// On exit, revert the metrics reporting period.
defer view.SetReportingPeriod(60 * time.Second)

cme := newCountMetricsExporter()
view.RegisterExporter(cme)
defer view.UnregisterExporter(cme)
Expand Down Expand Up @@ -149,6 +159,107 @@ func TestEnsureRecordedMetrics_zeroLengthSpansSender(t *testing.T) {
checkCountMetricsExporterResults(t, cme, n, 0)
}

type testOCTraceExporter struct {
mu sync.Mutex
spanData []*trace.SpanData
}

func (tote *testOCTraceExporter) ExportSpan(sd *trace.SpanData) {
tote.mu.Lock()
defer tote.mu.Unlock()

tote.spanData = append(tote.spanData, sd)
}

func TestExportSpanLinkingMaintainsParentLink(t *testing.T) {
// Always sample for the purpose of examining all the spans in this test.
trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()})

// TODO: File an issue with OpenCensus-Go to ask for a method to retrieve
// the default sampler because the current method of blindly changing the
// global sampler makes testing hard.
// Denoise this test by setting the sampler to never sample
defer trace.ApplyConfig(trace.Config{DefaultSampler: trace.NeverSample()})

ocSpansSaver := new(testOCTraceExporter)
trace.RegisterExporter(ocSpansSaver)
defer trace.UnregisterExporter(ocSpansSaver)

spanSink := newSpanAppender()
spansBufferPeriod := 10 * time.Millisecond
_, port, doneFn := ocInterceptorOnGRPCServer(t, spanSink, ocinterceptor.WithSpanBufferPeriod(spansBufferPeriod))
defer doneFn()

traceSvcClient, traceSvcDoneFn, err := makeTraceServiceClient(port)
if err != nil {
t.Fatalf("Failed to create the trace service client: %v", err)
}
defer traceSvcDoneFn()

n := 5
for i := 0; i <= n; i++ {
sl := []*tracepb.Span{{TraceId: []byte("abcdefghijklmnop"), SpanId: []byte{byte(i + 1), 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}}}
_ = traceSvcClient.Send(&agenttracepb.ExportTraceServiceRequest{Spans: sl, Node: &commonpb.Node{}})
}

// Give it enough time to process the streamed spans.
<-time.After(spansBufferPeriod * 2)

// End the gRPC service to complete the RPC trace so that we
// can examine the RPC trace as well.
traceSvcDoneFn()

// Give it some more time to complete the RPC trace and export its spanData.
<-time.After(spansBufferPeriod * 2)

// Inspection time!
ocSpansSaver.mu.Lock()
defer ocSpansSaver.mu.Unlock()

if len(ocSpansSaver.spanData) == 0 {
t.Fatal("Unfortunately did not receive an exported span data. Please check this library's implementation or go.opencensus.io/trace")
}

gotSpanData := ocSpansSaver.spanData[:]
if g, w := len(gotSpanData), 2; g != w {
blob, _ := json.MarshalIndent(gotSpanData, " ", " ")
t.Fatalf("Spandata count: Got %d Want %d\n\nData: %s", g, w, blob)
}

interceptorSpanData := gotSpanData[0]
if g, w := len(interceptorSpanData.Links), 1; g != w {
t.Fatalf("Links count: Got %d Want %d\nGotSpanData: %#v", g, w, interceptorSpanData)
}

rpcSpanData := gotSpanData[1]

// Ensure that the link matches up exactly!
wantLink := trace.Link{
SpanID: rpcSpanData.SpanID,
TraceID: rpcSpanData.TraceID,
Type: trace.LinkTypeParent,
}
if g, w := interceptorSpanData.Links[0], wantLink; !reflect.DeepEqual(g, w) {
t.Errorf("Link:\nGot: %#v\nWant: %#v\n", g, w)
}
if g, w := interceptorSpanData.Name, "OpenCensusInterceptor.Export"; g != w {
t.Errorf("InterceptorExport span's SpanData.Name:\nGot: %q\nWant: %q\n", g, w)
}

// And then for the interceptorSpanData itself, it SHOULD NOT
// have a ParentID, so let's enforce all the conditions below:
// 1. That it doesn't have the RPC spanID as its ParentSpanID
// 2. That it actually has no ParentSpanID i.e. has a blank SpanID
if g, w := interceptorSpanData.ParentSpanID[:], rpcSpanData.SpanID[:]; bytes.Equal(g, w) {
t.Errorf("InterceptorSpanData.ParentSpanID unfortunately was linked to the RPC span\nGot: %x\nWant: %x", g, w)
}

var blankSpanID trace.SpanID
if g, w := interceptorSpanData.ParentSpanID[:], blankSpanID[:]; !bytes.Equal(g, w) {
t.Errorf("InterceptorSpanData unfortunately has a parent and isn't NULL\nGot: %x\nWant: %x", g, w)
}
}

func checkCountMetricsExporterResults(t *testing.T, cme *countMetricsExporter, n int, wantAllCountsToBe int64) {
cme.mu.Lock()
defer cme.mu.Unlock()
Expand Down
36 changes: 32 additions & 4 deletions interceptor/opencensus/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (

"google.golang.org/api/support/bundler"

"go.opencensus.io/trace"

commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
Expand Down Expand Up @@ -61,11 +63,18 @@ type spansAndNode struct {

var errTraceExportProtocolViolation = errors.New("protocol violation: Export's first message must have a Node")

const interceptorName = "opencensus"

// Export is the gRPC method that receives streamed traces from
// OpenCensus-traceproto compatible libraries/applications.
func (oci *OCInterceptor) Export(tes agenttracepb.TraceService_ExportServer) error {
// The bundler will receive batches of spans i.e. []*tracepb.Span
traceBundler := bundler.NewBundler((*spansAndNode)(nil), oci.batchSpanExporting)
// We need to ensure that it propagates the interceptor name as a tag
ctxWithInterceptorName := internal.ContextWithInterceptorName(tes.Context(), interceptorName)
traceBundler := bundler.NewBundler((*spansAndNode)(nil), func(payload interface{}) {
oci.batchSpanExporting(ctxWithInterceptorName, payload)
})

spanBufferPeriod := oci.spanBufferPeriod
if spanBufferPeriod <= 0 {
spanBufferPeriod = 2 * time.Second // Arbitrary value
Expand All @@ -90,7 +99,7 @@ func (oci *OCInterceptor) Export(tes agenttracepb.TraceService_ExportServer) err
return errTraceExportProtocolViolation
}

spansMetricsFn := internal.NewReceivedSpansRecorderStreaming(tes.Context(), "opencensus")
spansMetricsFn := internal.NewReceivedSpansRecorderStreaming(tes.Context(), interceptorName)

processReceivedSpans := func(ni *commonpb.Node, spans []*tracepb.Span) {
// Firstly, we'll add them to the bundler.
Expand Down Expand Up @@ -120,12 +129,31 @@ func (oci *OCInterceptor) Export(tes agenttracepb.TraceService_ExportServer) err
}
}

func (oci *OCInterceptor) batchSpanExporting(payload interface{}) {
func (oci *OCInterceptor) batchSpanExporting(longLivedRPCCtx context.Context, payload interface{}) {
spnL := payload.([]*spansAndNode)
if len(spnL) == 0 {
return
}

// Trace this method
ctx, span := trace.StartSpan(context.Background(), "OpenCensusInterceptor.Export")
defer span.End()

// TODO: (@odeke-em) investigate if it is necessary
// to group nodes with their respective spans during
// spansAndNode list unfurling then send spans grouped per node
ctx := context.Background()

// If the starting RPC has a parent span, then add it as a parent link.
parentSpanFromRPC := trace.FromContext(longLivedRPCCtx)
if parentSpanFromRPC != nil {
psc := parentSpanFromRPC.SpanContext()
span.AddLink(trace.Link{
SpanID: psc.SpanID,
TraceID: psc.TraceID,
Type: trace.LinkTypeParent,
})
}

for _, spn := range spnL {
oci.spanSink.ReceiveSpans(ctx, spn.node, spn.spans...)
}
Expand Down
9 changes: 8 additions & 1 deletion internal/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ var AllViews = []*view.View{
ViewReceivedSpansInterceptor,
}

// ContextWithInterceptorName adds the tag "opencensus_interceptor" and the name of the
// interceptor as the value, and returns the newly created context.
func ContextWithInterceptorName(ctx context.Context, interceptorName string) context.Context {
ctx, _ = tag.New(ctx, tag.Upsert(tagKeyInterceptorName, interceptorName))
return ctx
}

// NewReceivedSpansRecorderStreaming creates a function that uses a context created
// from the name of the interceptor to record the number of the spans received
// by the interceptor.
Expand All @@ -58,7 +65,7 @@ func NewReceivedSpansRecorderStreaming(lifetimeCtx context.Context, interceptorN
// the context doesn't change, so it is more useful for avoid expensively adding
// keys on each invocation. We can create the context once and then reuse it
// when recording measurements.
ctx, _ := tag.New(lifetimeCtx, tag.Upsert(tagKeyInterceptorName, interceptorName))
ctx := ContextWithInterceptorName(lifetimeCtx, interceptorName)

return func(ni *commonpb.Node, spans []*tracepb.Span) {
// TODO: (@odeke-em) perhaps also record information from the node?
Expand Down

0 comments on commit 498e498

Please sign in to comment.