diff --git a/interceptor/opencensus/observability_test.go b/interceptor/opencensus/observability_test.go index bb6833f3af1..aa76da46085 100644 --- a/interceptor/opencensus/observability_test.go +++ b/interceptor/opencensus/observability_test.go @@ -15,6 +15,8 @@ package ocinterceptor_test import ( + "bytes" + "encoding/json" "fmt" "reflect" "strings" @@ -29,6 +31,7 @@ import ( commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" + tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" "github.com/census-instrumentation/opencensus-service/interceptor/opencensus" "github.com/census-instrumentation/opencensus-service/internal" ) @@ -52,9 +55,21 @@ func TestEnsureRecordedMetrics(t *testing.T) { t.Fatalf("Failed to create the ocagent-exporter: %v", err) } trace.RegisterExporter(oce) + + metricsReportingPeriod := 5 * time.Millisecond + view.SetReportingPeriod(metricsReportingPeriod) + // On exit, revert the metrics reporting period. + defer func() { oce.Stop() + + // Pause for a bit before exiting to give OpenCensus-Go trace + // some time to export any remaining traces, before we unregister + // the exporter. + <-time.After(5 * metricsReportingPeriod) + trace.UnregisterExporter(oce) + view.SetReportingPeriod(60 * time.Second) }() // Now for the stats exporter @@ -63,11 +78,6 @@ func TestEnsureRecordedMetrics(t *testing.T) { } defer view.Unregister(internal.AllViews...) - metricsReportingPeriod := 5 * time.Millisecond - view.SetReportingPeriod(metricsReportingPeriod) - // On exit, revert the metrics reporting period. - defer view.SetReportingPeriod(60 * time.Second) - cme := newCountMetricsExporter() view.RegisterExporter(cme) defer view.UnregisterExporter(cme) @@ -149,6 +159,107 @@ func TestEnsureRecordedMetrics_zeroLengthSpansSender(t *testing.T) { checkCountMetricsExporterResults(t, cme, n, 0) } +type testOCTraceExporter struct { + mu sync.Mutex + spanData []*trace.SpanData +} + +func (tote *testOCTraceExporter) ExportSpan(sd *trace.SpanData) { + tote.mu.Lock() + defer tote.mu.Unlock() + + tote.spanData = append(tote.spanData, sd) +} + +func TestExportSpanLinkingMaintainsParentLink(t *testing.T) { + // Always sample for the purpose of examining all the spans in this test. + trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()}) + + // TODO: File an issue with OpenCensus-Go to ask for a method to retrieve + // the default sampler because the current method of blindly changing the + // global sampler makes testing hard. + // Denoise this test by setting the sampler to never sample + defer trace.ApplyConfig(trace.Config{DefaultSampler: trace.NeverSample()}) + + ocSpansSaver := new(testOCTraceExporter) + trace.RegisterExporter(ocSpansSaver) + defer trace.UnregisterExporter(ocSpansSaver) + + spanSink := newSpanAppender() + spansBufferPeriod := 10 * time.Millisecond + _, port, doneFn := ocInterceptorOnGRPCServer(t, spanSink, ocinterceptor.WithSpanBufferPeriod(spansBufferPeriod)) + defer doneFn() + + traceSvcClient, traceSvcDoneFn, err := makeTraceServiceClient(port) + if err != nil { + t.Fatalf("Failed to create the trace service client: %v", err) + } + defer traceSvcDoneFn() + + n := 5 + for i := 0; i <= n; i++ { + sl := []*tracepb.Span{{TraceId: []byte("abcdefghijklmnop"), SpanId: []byte{byte(i + 1), 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}}} + _ = traceSvcClient.Send(&agenttracepb.ExportTraceServiceRequest{Spans: sl, Node: &commonpb.Node{}}) + } + + // Give it enough time to process the streamed spans. + <-time.After(spansBufferPeriod * 2) + + // End the gRPC service to complete the RPC trace so that we + // can examine the RPC trace as well. + traceSvcDoneFn() + + // Give it some more time to complete the RPC trace and export its spanData. + <-time.After(spansBufferPeriod * 2) + + // Inspection time! + ocSpansSaver.mu.Lock() + defer ocSpansSaver.mu.Unlock() + + if len(ocSpansSaver.spanData) == 0 { + t.Fatal("Unfortunately did not receive an exported span data. Please check this library's implementation or go.opencensus.io/trace") + } + + gotSpanData := ocSpansSaver.spanData[:] + if g, w := len(gotSpanData), 2; g != w { + blob, _ := json.MarshalIndent(gotSpanData, " ", " ") + t.Fatalf("Spandata count: Got %d Want %d\n\nData: %s", g, w, blob) + } + + interceptorSpanData := gotSpanData[0] + if g, w := len(interceptorSpanData.Links), 1; g != w { + t.Fatalf("Links count: Got %d Want %d\nGotSpanData: %#v", g, w, interceptorSpanData) + } + + rpcSpanData := gotSpanData[1] + + // Ensure that the link matches up exactly! + wantLink := trace.Link{ + SpanID: rpcSpanData.SpanID, + TraceID: rpcSpanData.TraceID, + Type: trace.LinkTypeParent, + } + if g, w := interceptorSpanData.Links[0], wantLink; !reflect.DeepEqual(g, w) { + t.Errorf("Link:\nGot: %#v\nWant: %#v\n", g, w) + } + if g, w := interceptorSpanData.Name, "OpenCensusInterceptor.Export"; g != w { + t.Errorf("InterceptorExport span's SpanData.Name:\nGot: %q\nWant: %q\n", g, w) + } + + // And then for the interceptorSpanData itself, it SHOULD NOT + // have a ParentID, so let's enforce all the conditions below: + // 1. That it doesn't have the RPC spanID as its ParentSpanID + // 2. That it actually has no ParentSpanID i.e. has a blank SpanID + if g, w := interceptorSpanData.ParentSpanID[:], rpcSpanData.SpanID[:]; bytes.Equal(g, w) { + t.Errorf("InterceptorSpanData.ParentSpanID unfortunately was linked to the RPC span\nGot: %x\nWant: %x", g, w) + } + + var blankSpanID trace.SpanID + if g, w := interceptorSpanData.ParentSpanID[:], blankSpanID[:]; !bytes.Equal(g, w) { + t.Errorf("InterceptorSpanData unfortunately has a parent and isn't NULL\nGot: %x\nWant: %x", g, w) + } +} + func checkCountMetricsExporterResults(t *testing.T, cme *countMetricsExporter, n int, wantAllCountsToBe int64) { cme.mu.Lock() defer cme.mu.Unlock() diff --git a/interceptor/opencensus/opencensus.go b/interceptor/opencensus/opencensus.go index 13fe02d343d..6a16acb0258 100644 --- a/interceptor/opencensus/opencensus.go +++ b/interceptor/opencensus/opencensus.go @@ -21,6 +21,8 @@ import ( "google.golang.org/api/support/bundler" + "go.opencensus.io/trace" + commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" @@ -61,11 +63,18 @@ type spansAndNode struct { var errTraceExportProtocolViolation = errors.New("protocol violation: Export's first message must have a Node") +const interceptorName = "opencensus" + // Export is the gRPC method that receives streamed traces from // OpenCensus-traceproto compatible libraries/applications. func (oci *OCInterceptor) Export(tes agenttracepb.TraceService_ExportServer) error { // The bundler will receive batches of spans i.e. []*tracepb.Span - traceBundler := bundler.NewBundler((*spansAndNode)(nil), oci.batchSpanExporting) + // We need to ensure that it propagates the interceptor name as a tag + ctxWithInterceptorName := internal.ContextWithInterceptorName(tes.Context(), interceptorName) + traceBundler := bundler.NewBundler((*spansAndNode)(nil), func(payload interface{}) { + oci.batchSpanExporting(ctxWithInterceptorName, payload) + }) + spanBufferPeriod := oci.spanBufferPeriod if spanBufferPeriod <= 0 { spanBufferPeriod = 2 * time.Second // Arbitrary value @@ -90,7 +99,7 @@ func (oci *OCInterceptor) Export(tes agenttracepb.TraceService_ExportServer) err return errTraceExportProtocolViolation } - spansMetricsFn := internal.NewReceivedSpansRecorderStreaming(tes.Context(), "opencensus") + spansMetricsFn := internal.NewReceivedSpansRecorderStreaming(tes.Context(), interceptorName) processReceivedSpans := func(ni *commonpb.Node, spans []*tracepb.Span) { // Firstly, we'll add them to the bundler. @@ -120,12 +129,31 @@ func (oci *OCInterceptor) Export(tes agenttracepb.TraceService_ExportServer) err } } -func (oci *OCInterceptor) batchSpanExporting(payload interface{}) { +func (oci *OCInterceptor) batchSpanExporting(longLivedRPCCtx context.Context, payload interface{}) { spnL := payload.([]*spansAndNode) + if len(spnL) == 0 { + return + } + + // Trace this method + ctx, span := trace.StartSpan(context.Background(), "OpenCensusInterceptor.Export") + defer span.End() + // TODO: (@odeke-em) investigate if it is necessary // to group nodes with their respective spans during // spansAndNode list unfurling then send spans grouped per node - ctx := context.Background() + + // If the starting RPC has a parent span, then add it as a parent link. + parentSpanFromRPC := trace.FromContext(longLivedRPCCtx) + if parentSpanFromRPC != nil { + psc := parentSpanFromRPC.SpanContext() + span.AddLink(trace.Link{ + SpanID: psc.SpanID, + TraceID: psc.TraceID, + Type: trace.LinkTypeParent, + }) + } + for _, spn := range spnL { oci.spanSink.ReceiveSpans(ctx, spn.node, spn.spans...) } diff --git a/internal/observability.go b/internal/observability.go index 6a3610114e7..16c782d379b 100644 --- a/internal/observability.go +++ b/internal/observability.go @@ -50,6 +50,13 @@ var AllViews = []*view.View{ ViewReceivedSpansInterceptor, } +// ContextWithInterceptorName adds the tag "opencensus_interceptor" and the name of the +// interceptor as the value, and returns the newly created context. +func ContextWithInterceptorName(ctx context.Context, interceptorName string) context.Context { + ctx, _ = tag.New(ctx, tag.Upsert(tagKeyInterceptorName, interceptorName)) + return ctx +} + // NewReceivedSpansRecorderStreaming creates a function that uses a context created // from the name of the interceptor to record the number of the spans received // by the interceptor. @@ -58,7 +65,7 @@ func NewReceivedSpansRecorderStreaming(lifetimeCtx context.Context, interceptorN // the context doesn't change, so it is more useful for avoid expensively adding // keys on each invocation. We can create the context once and then reuse it // when recording measurements. - ctx, _ := tag.New(lifetimeCtx, tag.Upsert(tagKeyInterceptorName, interceptorName)) + ctx := ContextWithInterceptorName(lifetimeCtx, interceptorName) return func(ni *commonpb.Node, spans []*tracepb.Span) { // TODO: (@odeke-em) perhaps also record information from the node?