diff --git a/hack/set-span-id.patch b/hack/set-span-id.patch new file mode 100644 index 00000000000..ed9326b0d4b --- /dev/null +++ b/hack/set-span-id.patch @@ -0,0 +1,16 @@ +diff --git a/vendor/go.opencensus.io/trace/trace.go b/vendor/go.opencensus.io/trace/trace.go +index 38ead7bf..9e6fe483 100644 +--- a/vendor/go.opencensus.io/trace/trace.go ++++ b/vendor/go.opencensus.io/trace/trace.go +@@ -261,6 +261,11 @@ func startSpanInternal(name string, hasParent bool, parent SpanContext, remotePa + return span + } + ++func (s *Span) SetSpanID(spanID SpanID) { ++ s.data.SpanID = spanID ++ s.spanContext.SpanID = spanID ++} ++ + // End ends the span. + func (s *Span) End() { + if s == nil { diff --git a/hack/update-deps.sh b/hack/update-deps.sh index 34935881b91..675798d13e2 100755 --- a/hack/update-deps.sh +++ b/hack/update-deps.sh @@ -33,3 +33,9 @@ rm -rf $(find vendor/ -name 'BUILD.bazel') update_licenses third_party/VENDOR-LICENSE \ $(find . -name "*.go" | grep -v vendor | xargs grep "package main" | cut -d: -f1 | xargs -n1 dirname | uniq) +# HACK HACK HACK +# The only way we found to create a consistent Trace tree without any missing Spans is to +# artificially set the SpanId. See pkg/tracing/traceparent.go for more details. +# Produced with: +# git diff origin/master HEAD -- vendor/go.opencensus.io/trace/trace.go > ./hack/set-span-id.patch +git apply ${REPO_ROOT_DIR}/hack/set-span-id.patch diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index f051a9527de..2ab6edfb927 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -33,8 +33,9 @@ import ( eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1" "knative.dev/eventing/pkg/logging" "knative.dev/eventing/pkg/reconciler/trigger/path" + "knative.dev/eventing/pkg/tracing" "knative.dev/eventing/pkg/utils" - "knative.dev/pkg/tracing" + pkgtracing "knative.dev/pkg/tracing" ) const ( @@ -63,7 +64,7 @@ type FilterResult string // NewHandler creates a new Handler and its associated MessageReceiver. The caller is responsible for // Start()ing the returned Handler. func NewHandler(logger *zap.Logger, triggerLister eventinglisters.TriggerNamespaceLister, reporter StatsReporter) (*Handler, error) { - httpTransport, err := cloudevents.NewHTTPTransport(cloudevents.WithBinaryEncoding(), cehttp.WithMiddleware(tracing.HTTPSpanIgnoringPaths(readyz))) + httpTransport, err := cloudevents.NewHTTPTransport(cloudevents.WithBinaryEncoding(), cehttp.WithMiddleware(pkgtracing.HTTPSpanIgnoringPaths(readyz))) if err != nil { return nil, err } @@ -250,8 +251,13 @@ func (r *Handler) sendEvent(ctx context.Context, tctx cloudevents.HTTPTransportC } } - start := time.Now() sendingCTX := utils.ContextFrom(tctx, subscriberURI) + sendingCTX, err = tracing.AddSpanFromTraceparentAttribute(sendingCTX, "name", *event) + if err != nil { + r.logger.Info("Unable to attach trace", zap.Error(err)) + } + + start := time.Now() rctx, replyEvent, err := r.ceClient.Send(sendingCTX, *event) rtctx := cloudevents.HTTPTransportContextFrom(rctx) // Record the dispatch time. diff --git a/pkg/broker/filter/filter_handler_test.go b/pkg/broker/filter/filter_handler_test.go index d36c6ed26d1..9748f4c44f7 100644 --- a/pkg/broker/filter/filter_handler_test.go +++ b/pkg/broker/filter/filter_handler_test.go @@ -271,29 +271,23 @@ func TestReceiver(t *testing.T) { Header: http.Header{ // foo won't pass filtering. "foo": []string{"bar"}, - // X-Request-Id will pass as an exact header match. - "X-Request-Id": []string{"123"}, - // b3 will pass as an exact header match. + // b3 will not pass filtering. "B3": []string{"0"}, - // X-B3-Foo will pass as a prefix match. + // X-B3-Foo will not pass filtering. "X-B3-Foo": []string{"abc"}, + // X-Ot-Foo will not pass filtering. + "X-Ot-Foo": []string{"haden"}, // Knative-Foo will pass as a prefix match. "Knative-Foo": []string{"baz", "qux"}, - // X-Ot-Foo will pass as a prefix match. - "X-Ot-Foo": []string{"haden"}, + // X-Request-Id will pass as an exact header match. + "X-Request-Id": []string{"123"}, }, }, expectedHeaders: http.Header{ // X-Request-Id will pass as an exact header match. "X-Request-Id": []string{"123"}, - // b3 will pass as an exact header match. - "B3": []string{"0"}, - // X-B3-Foo will pass as a prefix match. - "X-B3-Foo": []string{"abc"}, // Knative-Foo will pass as a prefix match. "Knative-Foo": []string{"baz", "qux"}, - // X-Ot-Foo will pass as a prefix match. - "X-Ot-Foo": []string{"haden"}, }, expectedDispatch: true, expectedEventCount: true, diff --git a/pkg/broker/ingress/ingress_handler.go b/pkg/broker/ingress/ingress_handler.go index 50460f4a819..424c55c4355 100644 --- a/pkg/broker/ingress/ingress_handler.go +++ b/pkg/broker/ingress/ingress_handler.go @@ -3,19 +3,15 @@ package ingress import ( "context" "errors" - "fmt" "net/http" "net/url" "reflect" - "strconv" - "strings" "time" cloudevents "github.com/cloudevents/sdk-go" - "go.opencensus.io/plugin/ochttp/propagation/b3" - "go.opencensus.io/trace" "go.uber.org/zap" "knative.dev/eventing/pkg/broker" + "knative.dev/eventing/pkg/tracing" "knative.dev/eventing/pkg/utils" ) @@ -77,7 +73,7 @@ func (h *Handler) serveHTTP(ctx context.Context, event cloudevents.Event, resp * return nil } - tctx = addOutGoingTracing(ctx, event, tctx) + tracing.AddTraceparentAttributeFromContext(ctx, event) reporterArgs := &ReportArgs{ ns: h.Namespace, @@ -133,20 +129,3 @@ func (h *Handler) getTTLToSet(event *cloudevents.Event) int { } return int(ttl) - 1 } - -func addOutGoingTracing(ctx context.Context, event cloudevents.Event, tctx cloudevents.HTTPTransportContext) cloudevents.HTTPTransportContext { - // Inject trace into HTTP header. - spanContext := trace.FromContext(ctx).SpanContext() - tctx.Header.Set(b3.TraceIDHeader, spanContext.TraceID.String()) - tctx.Header.Set(b3.SpanIDHeader, spanContext.SpanID.String()) - sampled := 0 - if spanContext.IsSampled() { - sampled = 1 - } - tctx.Header.Set(b3.SampledHeader, strconv.Itoa(sampled)) - - // Set traceparent, a CloudEvent documented extension attribute for distributed tracing. - traceParent := strings.Join([]string{"00", spanContext.TraceID.String(), spanContext.SpanID.String(), fmt.Sprintf("%02x", spanContext.TraceOptions)}, "-") - event.SetExtension(broker.TraceParent, traceParent) - return tctx -} diff --git a/pkg/broker/metrics.go b/pkg/broker/metrics.go index 349247b6eb5..99eba69a6d1 100644 --- a/pkg/broker/metrics.go +++ b/pkg/broker/metrics.go @@ -22,10 +22,4 @@ const ( // received on a broker and before it is dispatched to the trigger function. // The format is an RFC3339 time in string format. For example: 2019-08-26T23:38:17.834384404Z. EventArrivalTime = "knativearrivaltime" - - // TraceParent is a documented extension for CloudEvent to include traces. - // https://github.com/cloudevents/spec/blob/v0.3/extensions/distributed-tracing.md#traceparent - // The format is: 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01, - // which stands for version("00" is the current version)-traceID-spanID-trace options - TraceParent = "traceparent" ) diff --git a/pkg/channel/event_dispatcher.go b/pkg/channel/event_dispatcher.go index 5ed4ea7218d..b587427714e 100644 --- a/pkg/channel/event_dispatcher.go +++ b/pkg/channel/event_dispatcher.go @@ -25,10 +25,10 @@ import ( cloudevents "github.com/cloudevents/sdk-go" cehttp "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http" "go.opencensus.io/plugin/ochttp/propagation/b3" - "go.opencensus.io/trace" "go.uber.org/zap" "k8s.io/apimachinery/pkg/util/sets" "knative.dev/eventing/pkg/kncloudevents" + "knative.dev/eventing/pkg/tracing" "knative.dev/eventing/pkg/utils" ) @@ -101,42 +101,43 @@ func (d *EventDispatcher) DispatchEvent(ctx context.Context, event cloudevents.E func (d *EventDispatcher) executeRequest(ctx context.Context, url *url.URL, event cloudevents.Event) (context.Context, *cloudevents.Event, error) { d.logger.Debug("Dispatching event", zap.String("event.id", event.ID()), zap.String("url", url.String())) + originalTransportCTX := cloudevents.HTTPTransportContextFrom(ctx) + sendingCTX := d.generateSendingContext(originalTransportCTX, url, event) - tctx := cloudevents.HTTPTransportContextFrom(ctx) - sctx := utils.ContextFrom(tctx, url) - sctx = addOutGoingTracing(sctx, url) + replyCTX, reply, err := d.ceClient.Send(sendingCTX, event) + if err != nil { + return nil, nil, err + } + replyCTX, err = generateReplyContext(replyCTX, originalTransportCTX) + if err != nil { + return nil, nil, err + } + return replyCTX, reply, nil +} - rctx, reply, err := d.ceClient.Send(sctx, event) +func (d *EventDispatcher) generateSendingContext(originalTransportCTX cehttp.TransportContext, url *url.URL, event cloudevents.Event) context.Context { + sctx := utils.ContextFrom(originalTransportCTX, url) + sctx, err := tracing.AddSpanFromTraceparentAttribute(sctx, url.Path, event) if err != nil { - return rctx, nil, err + d.logger.Info("Unable to connect outgoing span", zap.Error(err)) } + return sctx +} + +func generateReplyContext(rctx context.Context, originalTransportCTX cehttp.TransportContext) (context.Context, error) { + // rtctx = Reply transport context rtctx := cloudevents.HTTPTransportContextFrom(rctx) if isFailure(rtctx.StatusCode) { // Reject non-successful responses. - return rctx, nil, fmt.Errorf("unexpected HTTP response, expected 2xx, got %d", rtctx.StatusCode) + return rctx, fmt.Errorf("unexpected HTTP response, expected 2xx, got %d", rtctx.StatusCode) } headers := utils.PassThroughHeaders(rtctx.Header) - if correlationID, ok := tctx.Header[correlationIDHeaderName]; ok { + if correlationID, ok := originalTransportCTX.Header[correlationIDHeaderName]; ok { headers[correlationIDHeaderName] = correlationID } rtctx.Header = http.Header(headers) rctx = cehttp.WithTransportContext(rctx, rtctx) - return rctx, reply, nil -} - -func addOutGoingTracing(ctx context.Context, url *url.URL) context.Context { - tctx := cloudevents.HTTPTransportContextFrom(ctx) - // Creating a dummy request to leverage propagation.SpanContextFromRequest method. - req := &http.Request{ - Header: tctx.Header, - } - // TODO use traceparent header. Issue: https://github.com/knative/eventing/issues/1951 - // Attach the Span context that is currently saved in the request's headers. - if sc, ok := propagation.SpanContextFromRequest(req); ok { - newCtx, _ := trace.StartSpanWithRemoteParent(ctx, url.Path, sc) - return newCtx - } - return ctx + return rctx, nil } // isFailure returns true if the status code is not a successful HTTP status. @@ -146,9 +147,9 @@ func isFailure(statusCode int) bool { } func (d *EventDispatcher) resolveURL(destination string) *url.URL { - if url, err := url.Parse(destination); err == nil && d.supportedSchemes.Has(url.Scheme) { + if u, err := url.Parse(destination); err == nil && d.supportedSchemes.Has(u.Scheme) { // Already a URL with a known scheme. - return url + return u } return &url.URL{ Scheme: "http", diff --git a/pkg/channel/event_receiver.go b/pkg/channel/event_receiver.go index 6348c2503c0..3c96280a0f6 100644 --- a/pkg/channel/event_receiver.go +++ b/pkg/channel/event_receiver.go @@ -27,6 +27,7 @@ import ( cloudevents "github.com/cloudevents/sdk-go" "go.uber.org/zap" "knative.dev/eventing/pkg/kncloudevents" + "knative.dev/eventing/pkg/tracing" "knative.dev/eventing/pkg/utils" ) @@ -160,6 +161,7 @@ func (r *EventReceiver) ServeHTTP(ctx context.Context, event cloudevents.Event, sctx := utils.ContextFrom(tctx, nil) AppendHistory(&event, host) + event = tracing.AddTraceparentAttributeFromContext(ctx, event) err = r.receiverFunc(sctx, channel, event) if err != nil { if _, ok := err.(*UnknownChannelError); ok { diff --git a/pkg/channel/event_receiver_test.go b/pkg/channel/event_receiver_test.go index 1c7d0410812..b1d1d0ac5ce 100644 --- a/pkg/channel/event_receiver_test.go +++ b/pkg/channel/event_receiver_test.go @@ -77,8 +77,8 @@ func TestEventReceiver_ServeHTTP(t *testing.T) { // Ce headers won't pass through our header filtering as they should actually be set in the CloudEvent itself, // as extensions. The SDK then sets them as as Ce- headers when sending them through HTTP. "cE-not-pass-through": {"true"}, - "x-B3-pass": {"true"}, - "x-ot-pass": {"true"}, + "x-B3-pass": {"will not pass"}, + "x-ot-pass": {"will not pass"}, }, body: "event-body", host: "test-name.test-namespace.svc." + utils.GetClusterDomainName(), @@ -95,8 +95,6 @@ func TestEventReceiver_ServeHTTP(t *testing.T) { // Note that only the first value was passed through, the remaining values were // discarded. "knatIve-will-pass-through": "true", - "x-B3-pass": "true", - "x-ot-pass": "true", } tctx := cloudevents.HTTPTransportContextFrom(ctx) actualHeaders := make(map[string]string) diff --git a/pkg/tracing/traceparent.go b/pkg/tracing/traceparent.go new file mode 100644 index 00000000000..581042a125d --- /dev/null +++ b/pkg/tracing/traceparent.go @@ -0,0 +1,110 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tracing + +import ( + "context" + "fmt" + "regexp" + "strconv" + + cloudevents "github.com/cloudevents/sdk-go" + "go.opencensus.io/plugin/ochttp/propagation/b3" + "go.opencensus.io/trace" +) + +const ( + // traceparentAttribute is the name of the CloudEvents attribute that contains the trace state. + // See + // https://github.com/cloudevents/spec/blob/v1.0-rc1/extensions/distributed-tracing.md#traceparent + traceparentAttribute = "traceparent" +) + +// AddTraceparentAttributeFromContext returns a CloudEvent that is identical to the input event, +// with the traceparent CloudEvents extension attribute added. The value for that attribute is the +// Span stored in the context. +// +// The context is expected to have passed through the OpenCensus HTTP Handler, so that the Span has +// been added to it. +func AddTraceparentAttributeFromContext(ctx context.Context, event cloudevents.Event) cloudevents.Event { + span := trace.FromContext(ctx) + if span != nil { + event.SetExtension(traceparentAttribute, traceparentAttributeValue(span)) + } + return event +} + +func traceparentAttributeValue(span *trace.Span) string { + flags := "00" + if span.SpanContext().IsSampled() { + flags = "01" + } + return fmt.Sprintf("00-%s-%s-%s", + span.SpanContext().TraceID.String(), + span.SpanContext().SpanID.String(), + flags) +} + +// AddSpanFromTraceparentAttribute extracts the traceparent extension attribute from the CloudEvent +// and returns a context with that span set. If the traceparent extension attribute is not found or +// cannot be parsed, then an error is returned. +func AddSpanFromTraceparentAttribute(ctx context.Context, name string, event cloudevents.Event) (context.Context, error) { + tp, ok := event.Extensions()[traceparentAttribute] + if !ok { + return ctx, fmt.Errorf("extension attributes did not contain %q", traceparentAttribute) + } + tps, ok := tp.(string) + if !ok { + return ctx, fmt.Errorf("extention attribute %q's value was not a string: %T", traceparentAttribute, tps) + } + sc, err := parseTraceparent(tps) + if err != nil { + return ctx, err + } + // Create a fake Span with the saved information. In order to ensure any requests made with this + // context have a parent of the saved Span, set the SpanID to the one saved in the traceparent. + // Normally, a new SpanID is generated and because this Span is never reported would create a + // hole in the Span tree. + _, span := trace.StartSpanWithRemoteParent(ctx, name, sc) + span.SetSpanID(sc.SpanID) + return trace.NewContext(ctx, span), nil +} + +func parseTraceparent(tp string) (trace.SpanContext, error) { + re := regexp.MustCompile("^00-([a-f0-9]{32})-([a-f0-9]{16})-([a-f0-9]{2})$") + m := re.FindStringSubmatch(tp) + if len(m) == 0 { + return trace.SpanContext{}, fmt.Errorf("could not parse traceparent: %q", tp) + } + traceID, ok := b3.ParseTraceID(m[1]) + if !ok { + return trace.SpanContext{}, fmt.Errorf("could not parse traceID: %q", tp) + } + spanID, ok := b3.ParseSpanID(m[2]) + if !ok { + return trace.SpanContext{}, fmt.Errorf("could not parse spanID: %q", tp) + } + options, err := strconv.ParseUint(m[3], 16, 32) + if err != nil { + return trace.SpanContext{}, fmt.Errorf("could not parse options: %q", tp) + } + return trace.SpanContext{ + TraceID: traceID, + SpanID: spanID, + TraceOptions: trace.TraceOptions(options), + }, nil +} diff --git a/pkg/tracing/traceparent_test.go b/pkg/tracing/traceparent_test.go new file mode 100644 index 00000000000..8fb822d2248 --- /dev/null +++ b/pkg/tracing/traceparent_test.go @@ -0,0 +1,156 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tracing + +import ( + "context" + "fmt" + "testing" + + cloudevents "github.com/cloudevents/sdk-go" + "go.opencensus.io/trace" +) + +func TestAddTraceparentAttributeFromContext(t *testing.T) { + testCases := map[string]struct { + inCTX bool + sampled bool + }{ + "no span in context": {}, + "not sampled": { + inCTX: true, + }, + "sampled": { + inCTX: true, + sampled: true, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + sampler := trace.WithSampler(trace.NeverSample()) + if tc.sampled { + sampler = trace.WithSampler(trace.AlwaysSample()) + } + _, span := trace.StartSpan(context.Background(), "name", sampler) + + ctx := context.Background() + if tc.inCTX { + ctx = trace.NewContext(ctx, span) + } + event := cloudevents.Event{ + Context: &cloudevents.EventContextV03{ + ID: "from-the-test", + }, + } + eventWithTraceparent := AddTraceparentAttributeFromContext(ctx, event) + if tc.inCTX { + sampled := "00" + if tc.sampled { + sampled = "01" + } + tp, ok := eventWithTraceparent.Extensions()["traceparent"] + if !ok { + t.Fatal("traceparent annotation not present.") + } + expected := fmt.Sprintf("00-%s-%s-%s", span.SpanContext().TraceID, span.SpanContext().SpanID, sampled) + if expected != tp.(string) { + t.Fatalf("Unexpected traceparent value. Got %q. Want %q", tp.(string), expected) + } + } else { + if event != eventWithTraceparent { + t.Fatalf("Event changed unexpectedly. Got %v. Want %v", eventWithTraceparent, event) + } + } + }) + } +} + +func TestAddSpanFromTraceparentAttribute(t *testing.T) { + traceID := "1234567890abcdef1234567890abcdef" + spanID := "1234567890abcdef" + options := "01" + testCases := map[string]struct { + present bool + notAString bool + traceparent string + expectError bool + }{ + "not present": { + expectError: true, + }, + "not a string": { + present: true, + notAString: true, + expectError: true, + }, + "bad format": { + present: true, + traceparent: "random-string", + expectError: true, + }, + "bad traceID": { + present: true, + traceparent: fmt.Sprintf("00-%s-%s-%s", "bad", spanID, options), + expectError: true, + }, + "bad spanID": { + present: true, + traceparent: fmt.Sprintf("00-%s-%s-%s", traceID, "bad", options), + expectError: true, + }, + "bad options": { + present: true, + traceparent: fmt.Sprintf("00-%s-%s-%s", traceID, spanID, "bad"), + expectError: true, + }, + "good": { + present: true, + traceparent: fmt.Sprintf("00-%s-%s-%s", traceID, spanID, options), + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + event := cloudevents.Event{ + Context: &cloudevents.EventContextV03{}, + } + if tc.present { + if tc.notAString { + event.SetExtension("traceparent", 1.0) + } else { + event.SetExtension("traceparent", tc.traceparent) + } + } + ctx, err := AddSpanFromTraceparentAttribute(context.Background(), "name", event) + if tc.expectError { + if err == nil { + t.Fatal("Expected an error, actually nil") + } + return + } + span := trace.FromContext(ctx) + if actual := span.SpanContext().TraceID.String(); traceID != actual { + t.Errorf("Incorrect TraceID. Got %q. Want %q", actual, traceID) + } + if actual := span.SpanContext().SpanID.String(); spanID != actual { + t.Errorf("Incorrect SpanID. Got %q. Want %q", actual, spanID) + } + if actual := span.SpanContext().TraceOptions; 1 != actual { + t.Errorf("Incorrect options. Got %q. Want %q", actual, 1) + } + }) + } +} diff --git a/pkg/utils/context.go b/pkg/utils/context.go index bb34c842a3f..a2188fcdb01 100644 --- a/pkg/utils/context.go +++ b/pkg/utils/context.go @@ -34,9 +34,6 @@ var ( forwardHeaders = sets.NewString( // tracing "x-request-id", - // Single header for b3 tracing. See - // https://github.com/openzipkin/b3-propagation#single-header. - "b3", ) // These MUST be lowercase strings, as they will be compared against lowercase strings. // Removing CloudEvents ce- prefixes on purpose as they should be set in the CloudEvent itself as extensions. @@ -45,11 +42,6 @@ var ( forwardPrefixes = []string{ // knative "knative-", - // tracing - // TODO check if we can remove this once we address the issue in ContextFrom. - // Issue: https://github.com/knative/eventing/issues/1953 - "x-b3-", - "x-ot-", } ) diff --git a/test/conformance/channel_tracing_test.go b/test/conformance/channel_tracing_test.go index 7eabf583db2..15634483c0a 100644 --- a/test/conformance/channel_tracing_test.go +++ b/test/conformance/channel_tracing_test.go @@ -27,3 +27,7 @@ import ( func TestChannelTracing(t *testing.T) { helpers.ChannelTracingTestHelper(t, channelTestRunner) } + +func TestChannelTracingWithReply(t *testing.T) { + helpers.ChannelTracingTestHelperWithReply(t, channelTestRunner) +} diff --git a/test/conformance/helpers/channel_tracing_test_helper.go b/test/conformance/helpers/channel_tracing_test_helper.go index f04fb05ab43..99ca60067d3 100644 --- a/test/conformance/helpers/channel_tracing_test_helper.go +++ b/test/conformance/helpers/channel_tracing_test_helper.go @@ -62,16 +62,14 @@ func ChannelTracingTestHelper(t *testing.T, channelTestRunner common.ChannelTest traceID := getTraceID(st, client, loggerPodName) trace, err := zipkin.JSONTrace(traceID, expected.SpanCount(), 60*time.Second) if err != nil { - st.Fatalf("Unable to get trace %q: %v. Trace so far %+v", traceID, err, trace) + st.Fatalf("Unable to get trace %q: %v. Trace so far %+v", traceID, err, tracinghelper.PrettyPrintTrace(trace)) + } + st.Logf("I got the trace, %q!\n%+v", traceID, tracinghelper.PrettyPrintTrace(trace)) + + tree := tracinghelper.GetTraceTree(st, trace) + if err := expected.Matches(tree); err != nil { + st.Fatalf("Trace Tree did not match expected: %v", err) } - st.Logf("I got the trace, %q!\n%+v", traceID, trace) - - // TODO uncomment once we use traceparent in event_dispatcher.addOutGoingTracing method. - // Issue https://github.com/knative/eventing/issues/1951 - //tree := tracinghelper.GetTraceTree(st, trace) - //if err := expected.Matches(tree); err != nil { - // st.Fatalf("Trace Tree did not match expected: %v", err) - //} }) }) } @@ -215,3 +213,230 @@ func getTraceID(t *testing.T, client *common.Client, loggerPodName string) strin traceID := matches[1] return traceID } + +func ChannelTracingTestHelperWithReply(t *testing.T, channelTestRunner common.ChannelTestRunner) { + testCases := map[string]struct { + incomingTraceId bool + istio bool + }{ + "includes incoming trace id": { + incomingTraceId: true, + }, + } + + for n, tc := range testCases { + loggerPodName := "logger" + t.Run(n, func(t *testing.T) { + channelTestRunner.RunTests(t, common.FeatureBasic, func(st *testing.T, channel string) { + // Don't accidentally use t, use st instead. To ensure this, shadow 't' to a useless + // type. + t := struct{}{} + _ = fmt.Sprintf("%s", t) + + client := common.Setup(st, true) + defer common.TearDown(client) + + // Do NOT call zipkin.CleanupZipkinTracingSetup. That will be called exactly once in + // TestMain. + tracinghelper.Setup(st, client) + + expected, mustContain := setupChannelTracingWithReply(st, channel, client, loggerPodName, tc.incomingTraceId) + assertLogContents(st, client, loggerPodName, mustContain) + + traceID := getTraceID(st, client, loggerPodName) + trace, err := zipkin.JSONTrace(traceID, expected.SpanCount(), 60*time.Second) + if err != nil { + st.Fatalf("Unable to get trace %q: %v. Trace so far %+v", traceID, err, tracinghelper.PrettyPrintTrace(trace)) + } + st.Logf("I got the trace, %q!\n%+v", traceID, tracinghelper.PrettyPrintTrace(trace)) + + tree := tracinghelper.GetTraceTree(st, trace) + if err := expected.Matches(tree); err != nil { + st.Fatalf("Trace Tree did not match expected: %v", err) + } + }) + }) + } +} + +// setupChannelTracing is the general setup for TestChannelTracing. It creates the following: +// SendEvents (Pod) -> Channel -> Subscription -> K8s Service -> Mutate (Pod) +// v +// LogEvents (Pod) <- K8s Service <- Subscription <- Channel <- (Reply) Subscription +// It returns the expected trace tree and a string that is expected to be sent by the SendEvents Pod +// and should be present in the LogEvents Pod logs. +func setupChannelTracingWithReply(t *testing.T, channel string, client *common.Client, loggerPodName string, incomingTraceId bool) (tracinghelper.TestSpanTree, string) { + // Create the Channels. + channelName := "ch" + channelTypeMeta := common.GetChannelTypeMeta(channel) + client.CreateChannelOrFail(channelName, channelTypeMeta) + + replyChannelName := "reply-ch" + client.CreateChannelOrFail(replyChannelName, channelTypeMeta) + + // Create the 'sink', a LogEvents Pod and a K8s Service that points to it. + loggerPod := resources.EventDetailsPod(loggerPodName) + client.CreatePodOrFail(loggerPod, common.WithService(loggerPodName)) + + // Create the subscriber, a Pod that mutates the event. + transformerPod := resources.EventTransformationPod("transformer", &resources.CloudEvent{ + Type: "mutated", + }) + client.CreatePodOrFail(transformerPod, common.WithService(transformerPod.Name)) + + // Create the Subscription linking the Channel to the mutator. + client.CreateSubscriptionOrFail( + "sub", + channelName, + channelTypeMeta, + resources.WithSubscriberForSubscription(transformerPod.Name), + resources.WithReplyForSubscription(replyChannelName, channelTypeMeta)) + + // Create the Subscription linking the reply Channel to the LogEvents K8s Service. + client.CreateSubscriptionOrFail( + "reply-sub", + replyChannelName, + channelTypeMeta, + resources.WithSubscriberForSubscription(loggerPodName), + ) + + // Wait for all test resources to be ready, so that we can start sending events. + if err := client.WaitForAllTestResourcesReady(); err != nil { + t.Fatalf("Failed to get all test resources ready: %v", err) + } + + // Everything is setup to receive an event. Generate a CloudEvent. + senderName := "sender" + eventID := fmt.Sprintf("%s", uuid.NewUUID()) + body := fmt.Sprintf("TestChannelTracing %s", eventID) + event := &resources.CloudEvent{ + ID: eventID, + Source: senderName, + Type: resources.CloudEventDefaultType, + Data: fmt.Sprintf(`{"msg":%q}`, body), + Encoding: resources.CloudEventEncodingBinary, + } + + // Send the CloudEvent (either with or without tracing inside the SendEvents Pod). + sendEvent := client.SendFakeEventToAddressable + if incomingTraceId { + sendEvent = client.SendFakeEventWithTracingToAddressable + } + if err := sendEvent(senderName, channelName, channelTypeMeta, event); err != nil { + t.Fatalf("Failed to send fake CloudEvent to the channel %q", channelName) + } + + // We expect the following spans: + // 0. Artificial root span. + // 1. Sending pod sends event to Channel (only if the sending pod generates a span). + // 2. Channel receives event from sending pod. + // 3. Channel sends event to transformer pod. + // 4. Transformer Pod receives event from Channel. + // 5. Channel sends reply from Transformer Pod to the reply Channel. + // 6. Reply Channel receives event from the original Channel's reply. + // 7. Reply Channel sends event to the logging Pod. + // 8. Logging pod receives event from Channel. + expected := tracinghelper.TestSpanTree{ + // 0. Artificial root span. + Root: true, + // 1 is added below if it is needed. + Children: []tracinghelper.TestSpanTree{ + { + // 2. Channel receives event from sending pod. + Kind: model.Server, + Tags: map[string]string{ + "http.method": "POST", + "http.status_code": "202", + "http.host": fmt.Sprintf("%s-kn-channel.%s.svc.cluster.local", channelName, client.Namespace), + "http.path": "/", + }, + Children: []tracinghelper.TestSpanTree{ + { + // 3. Channel sends event to transformer pod. + Kind: model.Client, + Tags: map[string]string{ + "http.method": "POST", + "http.status_code": "200", + "http.url": fmt.Sprintf("http://%s.%s.svc.cluster.local/", transformerPod.Name, client.Namespace), + }, + Children: []tracinghelper.TestSpanTree{ + { + // 4. Transformer Pod receives event from Channel. + Kind: model.Server, + LocalEndpointServiceName: transformerPod.Name, + Tags: map[string]string{ + "http.method": "POST", + "http.path": "/", + "http.status_code": "200", + "http.host": fmt.Sprintf("%s.%s.svc.cluster.local", transformerPod.Name, client.Namespace), + }, + }, + }, + }, + { + // 5. Channel sends reply from Transformer Pod to the reply Channel. + Kind: model.Client, + Tags: map[string]string{ + "http.method": "POST", + "http.status_code": "202", + "http.url": fmt.Sprintf("http://%s-kn-channel.%s.svc.cluster.local", replyChannelName, client.Namespace), + }, + Children: []tracinghelper.TestSpanTree{ + // 6. Reply Channel receives event from the original Channel's reply. + { + Kind: model.Server, + Tags: map[string]string{ + "http.method": "POST", + "http.status_code": "202", + "http.host": fmt.Sprintf("%s-kn-channel.%s.svc.cluster.local", replyChannelName, client.Namespace), + "http.path": "/", + }, + Children: []tracinghelper.TestSpanTree{ + { + // 7. Reply Channel sends event to the logging Pod. + Kind: model.Client, + Tags: map[string]string{ + "http.method": "POST", + "http.status_code": "202", + "http.url": fmt.Sprintf("http://%s.%s.svc.cluster.local/", loggerPod.Name, client.Namespace), + }, + Children: []tracinghelper.TestSpanTree{ + { + // 8. Logging pod receives event from Channel. + Kind: model.Server, + LocalEndpointServiceName: loggerPod.Name, + Tags: map[string]string{ + "http.method": "POST", + "http.path": "/", + "http.status_code": "202", + "http.host": fmt.Sprintf("%s.%s.svc.cluster.local", loggerPod.Name, client.Namespace), + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + if incomingTraceId { + expected.Children = []tracinghelper.TestSpanTree{ + { + // 1. Sending pod sends event to Channel (only if the sending pod generates a span). + Kind: model.Client, + LocalEndpointServiceName: "sender", + Tags: map[string]string{ + "http.method": "POST", + "http.status_code": "202", + "http.url": fmt.Sprintf("http://%s-kn-channel.%s.svc.cluster.local", channelName, client.Namespace), + }, + Children: expected.Children, + }, + } + } + return expected, body +} diff --git a/test/conformance/helpers/tracing/traces.go b/test/conformance/helpers/tracing/traces.go index 77ad0720e6d..62aeeeb1876 100644 --- a/test/conformance/helpers/tracing/traces.go +++ b/test/conformance/helpers/tracing/traces.go @@ -22,6 +22,7 @@ import ( "testing" "github.com/openzipkin/zipkin-go/model" + "k8s.io/apimachinery/pkg/util/sets" ) // PrettyPrintTrace pretty prints a Trace. @@ -36,6 +37,39 @@ type SpanTree struct { Children []SpanTree } +func (t SpanTree) String() string { + b, _ := json.Marshal(t) + return string(b) +} + +func (t SpanTree) ToTestSpanTree() TestSpanTree { + children := make([]TestSpanTree, len(t.Children)) + for i := range t.Children { + children[i] = t.Children[i].toTestSpanTreeHelper() + } + return TestSpanTree{ + Root: true, + Children: children, + } +} + +func (t SpanTree) toTestSpanTreeHelper() TestSpanTree { + name := "" + if t.Span.LocalEndpoint != nil { + name = t.Span.LocalEndpoint.ServiceName + } + children := make([]TestSpanTree, len(t.Children)) + for i := range t.Children { + children[i] = t.Children[i].toTestSpanTreeHelper() + } + return TestSpanTree{ + Kind: t.Span.Kind, + LocalEndpointServiceName: name, + Tags: t.Span.Tags, + Children: children, + } +} + // TestSpanTree is the expected version of SpanTree used for assertions in testing. type TestSpanTree struct { Root bool @@ -46,6 +80,11 @@ type TestSpanTree struct { Children []TestSpanTree } +func (t TestSpanTree) String() string { + b, _ := json.Marshal(t) + return string(b) +} + // GetTraceTree converts a set slice of spans into a SpanTree. func GetTraceTree(t *testing.T, trace []model.SpanModel) SpanTree { var roots []model.SpanModel @@ -85,6 +124,7 @@ func getChildren(parents map[model.ID][]model.SpanModel, current []model.SpanMod }) delete(parents, span.ID) } + return children, nil } @@ -104,7 +144,11 @@ func (t TestSpanTree) SpanCount() int { // Matches checks to see if this TestSpanTree matches an actual SpanTree. It is intended to be used // for assertions while testing. func (t TestSpanTree) Matches(actual SpanTree) error { - return traceTreeMatches(".", t, actual) + err := traceTreeMatches(".", t, actual) + if err != nil { + return fmt.Errorf("spanTree did not match: %v. Actual %v, Expected %v", err, actual.ToTestSpanTree().String(), t.String()) + } + return nil } func traceTreeMatches(pos string, want TestSpanTree, got SpanTree) error { @@ -123,14 +167,36 @@ func traceTreeMatches(pos string, want TestSpanTree, got SpanTree) error { return fmt.Errorf("unexpected tag[%s] value at %q: got %q, want %q", k, pos, g, w) } } - if g, w := len(got.Children), len(want.Children); g != w { + return unorderedTraceTreesMatch(pos, want.Children, got.Children) +} + +// unorderedTraceTreesMatch checks to see if for every TestSpanTree in want, there is a +// corresponding SpanTree in got. It's comparison is done unordered, but slowly. It should not be +// called with too many entries in either slice. +func unorderedTraceTreesMatch(pos string, want []TestSpanTree, got []SpanTree) error { + if g, w := len(got), len(want); g != w { return fmt.Errorf("unexpected number of children at %q: got %v, want %v", pos, g, w) } - // TODO: Children are actually unordered, assert them in an unordered fashion. - for i := range want.Children { - if err := traceTreeMatches(fmt.Sprintf("%s%d.", pos, i), want.Children[i], got.Children[i]); err != nil { - return err + unmatchedGot := sets.NewInt() + for i := range got { + unmatchedGot.Insert(i) + } + // This is an O(n^4) algorithm. It compares every item in want to every item in got, O(n^2). + // Those comparisons do the same recursively O(n^2). We expect there to be not too many traces, + // so n should be small (say 50 in the largest cases). +OuterLoop: + for i, w := range want { + for ug := range unmatchedGot { + err := w.Matches(got[ug]) + // If there is no error, then it matched successfully. + if err == nil { + unmatchedGot.Delete(ug) + continue OuterLoop + } } + // Nothing matched. + return fmt.Errorf("unable to find child match %s[%d]: Want: %s **** Got: %s", pos, i, w.String(), got) } + // Everything matched. return nil } diff --git a/test/test_images/transformevents/main.go b/test/test_images/transformevents/main.go index 13e6f2d1570..1c01cd93ee8 100644 --- a/test/test_images/transformevents/main.go +++ b/test/test_images/transformevents/main.go @@ -22,6 +22,9 @@ import ( "log" cloudevents "github.com/cloudevents/sdk-go" + "go.uber.org/zap" + "knative.dev/eventing/pkg/kncloudevents" + "knative.dev/eventing/pkg/tracing" ) var ( @@ -62,7 +65,7 @@ func gotEvent(event cloudevents.Event, resp *cloudevents.EventResponse) error { } log.Println("Transform the event to: ") - log.Printf("[%s] %s %s: %+v", ctx.Time.String(), ctx.GetSource(), ctx.GetType(), dataBytes) + log.Printf("[%s] %s %s: %s", ctx.Time.String(), ctx.GetSource(), ctx.GetType(), dataBytes) resp.RespondWith(200, &r) return nil @@ -72,7 +75,12 @@ func main() { // parse the command line flags flag.Parse() - c, err := cloudevents.NewDefaultClient() + logger, _ := zap.NewDevelopment() + if err := tracing.SetupStaticPublishing(logger.Sugar(), "", tracing.AlwaysSample); err != nil { + log.Fatalf("Unable to setup trace publishing: %v", err) + } + + c, err := kncloudevents.NewDefaultClient() if err != nil { log.Fatalf("failed to create client, %v", err) } diff --git a/vendor/go.opencensus.io/trace/trace.go b/vendor/go.opencensus.io/trace/trace.go index 38ead7bf0ad..9e6fe483a6e 100644 --- a/vendor/go.opencensus.io/trace/trace.go +++ b/vendor/go.opencensus.io/trace/trace.go @@ -261,6 +261,11 @@ func startSpanInternal(name string, hasParent bool, parent SpanContext, remotePa return span } +func (s *Span) SetSpanID(spanID SpanID) { + s.data.SpanID = spanID + s.spanContext.SpanID = spanID +} + // End ends the span. func (s *Span) End() { if s == nil {