Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

E2E Tracing test - Strong trace assertions #1972

Merged
merged 11 commits into from
Sep 30, 2019
16 changes: 16 additions & 0 deletions hack/set-span-id.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
diff --git a/vendor/go.opencensus.io/trace/trace.go b/vendor/go.opencensus.io/trace/trace.go
index 38ead7bf..9e6fe483 100644
--- a/vendor/go.opencensus.io/trace/trace.go
+++ b/vendor/go.opencensus.io/trace/trace.go
@@ -261,6 +261,11 @@ func startSpanInternal(name string, hasParent bool, parent SpanContext, remotePa
return span
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is that until some update?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope. I just couldn't find a better way to do it. There probably is one, but I wasn't able to find it.

In theory, we will move to some OpenTelemetry libraries when they go 1.0, at which point I hope we don't need a similar hack :)

}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove trailing whitespace:

Suggested change

+func (s *Span) SetSpanID(spanID SpanID) {
+ s.data.SpanID = spanID
+ s.spanContext.SpanID = spanID
+}
+
// End ends the span.
func (s *Span) End() {
if s == nil {
6 changes: 6 additions & 0 deletions hack/update-deps.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,9 @@ rm -rf $(find vendor/ -name 'BUILD.bazel')
update_licenses third_party/VENDOR-LICENSE \
$(find . -name "*.go" | grep -v vendor | xargs grep "package main" | cut -d: -f1 | xargs -n1 dirname | uniq)

# HACK HACK HACK
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:-)

# The only way we found to create a consistent Trace tree without any missing Spans is to
# artificially set the SpanId. See pkg/tracing/traceparent.go for more details.
# Produced with:
# git diff origin/master HEAD -- vendor/go.opencensus.io/trace/trace.go > ./hack/set-span-id.patch
git apply ${REPO_ROOT_DIR}/hack/set-span-id.patch
12 changes: 9 additions & 3 deletions pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ import (
eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1"
"knative.dev/eventing/pkg/logging"
"knative.dev/eventing/pkg/reconciler/trigger/path"
"knative.dev/eventing/pkg/tracing"
"knative.dev/eventing/pkg/utils"
"knative.dev/pkg/tracing"
pkgtracing "knative.dev/pkg/tracing"
)

const (
Expand Down Expand Up @@ -63,7 +64,7 @@ type FilterResult string
// NewHandler creates a new Handler and its associated MessageReceiver. The caller is responsible for
// Start()ing the returned Handler.
func NewHandler(logger *zap.Logger, triggerLister eventinglisters.TriggerNamespaceLister, reporter StatsReporter) (*Handler, error) {
httpTransport, err := cloudevents.NewHTTPTransport(cloudevents.WithBinaryEncoding(), cehttp.WithMiddleware(tracing.HTTPSpanIgnoringPaths(readyz)))
httpTransport, err := cloudevents.NewHTTPTransport(cloudevents.WithBinaryEncoding(), cehttp.WithMiddleware(pkgtracing.HTTPSpanIgnoringPaths(readyz)))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -250,8 +251,13 @@ func (r *Handler) sendEvent(ctx context.Context, tctx cloudevents.HTTPTransportC
}
}

start := time.Now()
sendingCTX := utils.ContextFrom(tctx, subscriberURI)
sendingCTX, err = tracing.AddSpanFromTraceparentAttribute(sendingCTX, "name", *event)
if err != nil {
r.logger.Info("Unable to attach trace", zap.Error(err))
}

start := time.Now()
rctx, replyEvent, err := r.ceClient.Send(sendingCTX, *event)
rtctx := cloudevents.HTTPTransportContextFrom(rctx)
// Record the dispatch time.
Expand Down
18 changes: 6 additions & 12 deletions pkg/broker/filter/filter_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,29 +271,23 @@ func TestReceiver(t *testing.T) {
Header: http.Header{
// foo won't pass filtering.
"foo": []string{"bar"},
// X-Request-Id will pass as an exact header match.
"X-Request-Id": []string{"123"},
// b3 will pass as an exact header match.
// b3 will not pass filtering.
"B3": []string{"0"},
// X-B3-Foo will pass as a prefix match.
// X-B3-Foo will not pass filtering.
"X-B3-Foo": []string{"abc"},
// X-Ot-Foo will not pass filtering.
"X-Ot-Foo": []string{"haden"},
// Knative-Foo will pass as a prefix match.
"Knative-Foo": []string{"baz", "qux"},
// X-Ot-Foo will pass as a prefix match.
"X-Ot-Foo": []string{"haden"},
// X-Request-Id will pass as an exact header match.
"X-Request-Id": []string{"123"},
},
},
expectedHeaders: http.Header{
// X-Request-Id will pass as an exact header match.
"X-Request-Id": []string{"123"},
// b3 will pass as an exact header match.
"B3": []string{"0"},
// X-B3-Foo will pass as a prefix match.
"X-B3-Foo": []string{"abc"},
// Knative-Foo will pass as a prefix match.
"Knative-Foo": []string{"baz", "qux"},
// X-Ot-Foo will pass as a prefix match.
"X-Ot-Foo": []string{"haden"},
},
expectedDispatch: true,
expectedEventCount: true,
Expand Down
25 changes: 2 additions & 23 deletions pkg/broker/ingress/ingress_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,15 @@ package ingress
import (
"context"
"errors"
"fmt"
"net/http"
"net/url"
"reflect"
"strconv"
"strings"
"time"

cloudevents "github.com/cloudevents/sdk-go"
"go.opencensus.io/plugin/ochttp/propagation/b3"
"go.opencensus.io/trace"
"go.uber.org/zap"
"knative.dev/eventing/pkg/broker"
"knative.dev/eventing/pkg/tracing"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pkgtracing ? (to be consistent)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nevermind :-)

I see eventing (I feel eventingtracing is odd)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/resolved

"knative.dev/eventing/pkg/utils"
)

Expand Down Expand Up @@ -77,7 +73,7 @@ func (h *Handler) serveHTTP(ctx context.Context, event cloudevents.Event, resp *
return nil
}

tctx = addOutGoingTracing(ctx, event, tctx)
tracing.AddTraceparentAttributeFromContext(ctx, event)

reporterArgs := &ReportArgs{
ns: h.Namespace,
Expand Down Expand Up @@ -133,20 +129,3 @@ func (h *Handler) getTTLToSet(event *cloudevents.Event) int {
}
return int(ttl) - 1
}

func addOutGoingTracing(ctx context.Context, event cloudevents.Event, tctx cloudevents.HTTPTransportContext) cloudevents.HTTPTransportContext {
// Inject trace into HTTP header.
spanContext := trace.FromContext(ctx).SpanContext()
tctx.Header.Set(b3.TraceIDHeader, spanContext.TraceID.String())
tctx.Header.Set(b3.SpanIDHeader, spanContext.SpanID.String())
sampled := 0
if spanContext.IsSampled() {
sampled = 1
}
tctx.Header.Set(b3.SampledHeader, strconv.Itoa(sampled))

// Set traceparent, a CloudEvent documented extension attribute for distributed tracing.
traceParent := strings.Join([]string{"00", spanContext.TraceID.String(), spanContext.SpanID.String(), fmt.Sprintf("%02x", spanContext.TraceOptions)}, "-")
event.SetExtension(broker.TraceParent, traceParent)
return tctx
}
6 changes: 0 additions & 6 deletions pkg/broker/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,4 @@ const (
// received on a broker and before it is dispatched to the trigger function.
// The format is an RFC3339 time in string format. For example: 2019-08-26T23:38:17.834384404Z.
EventArrivalTime = "knativearrivaltime"

// TraceParent is a documented extension for CloudEvent to include traces.
// https://github.com/cloudevents/spec/blob/v0.3/extensions/distributed-tracing.md#traceparent
// The format is: 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01,
// which stands for version("00" is the current version)-traceID-spanID-trace options
TraceParent = "traceparent"
)
53 changes: 27 additions & 26 deletions pkg/channel/event_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ import (
cloudevents "github.com/cloudevents/sdk-go"
cehttp "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http"
"go.opencensus.io/plugin/ochttp/propagation/b3"
"go.opencensus.io/trace"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/sets"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/eventing/pkg/tracing"
"knative.dev/eventing/pkg/utils"
)

Expand Down Expand Up @@ -101,42 +101,43 @@ func (d *EventDispatcher) DispatchEvent(ctx context.Context, event cloudevents.E

func (d *EventDispatcher) executeRequest(ctx context.Context, url *url.URL, event cloudevents.Event) (context.Context, *cloudevents.Event, error) {
d.logger.Debug("Dispatching event", zap.String("event.id", event.ID()), zap.String("url", url.String()))
originalTransportCTX := cloudevents.HTTPTransportContextFrom(ctx)
sendingCTX := d.generateSendingContext(originalTransportCTX, url, event)

tctx := cloudevents.HTTPTransportContextFrom(ctx)
sctx := utils.ContextFrom(tctx, url)
sctx = addOutGoingTracing(sctx, url)
replyCTX, reply, err := d.ceClient.Send(sendingCTX, event)
if err != nil {
return nil, nil, err
}
replyCTX, err = generateReplyContext(replyCTX, originalTransportCTX)
if err != nil {
return nil, nil, err
}
return replyCTX, reply, nil
}

rctx, reply, err := d.ceClient.Send(sctx, event)
func (d *EventDispatcher) generateSendingContext(originalTransportCTX cehttp.TransportContext, url *url.URL, event cloudevents.Event) context.Context {
sctx := utils.ContextFrom(originalTransportCTX, url)
sctx, err := tracing.AddSpanFromTraceparentAttribute(sctx, url.Path, event)
if err != nil {
return rctx, nil, err
d.logger.Info("Unable to connect outgoing span", zap.Error(err))
}
return sctx
}

func generateReplyContext(rctx context.Context, originalTransportCTX cehttp.TransportContext) (context.Context, error) {
// rtctx = Reply transport context
rtctx := cloudevents.HTTPTransportContextFrom(rctx)
if isFailure(rtctx.StatusCode) {
// Reject non-successful responses.
return rctx, nil, fmt.Errorf("unexpected HTTP response, expected 2xx, got %d", rtctx.StatusCode)
return rctx, fmt.Errorf("unexpected HTTP response, expected 2xx, got %d", rtctx.StatusCode)
}
headers := utils.PassThroughHeaders(rtctx.Header)
if correlationID, ok := tctx.Header[correlationIDHeaderName]; ok {
if correlationID, ok := originalTransportCTX.Header[correlationIDHeaderName]; ok {
headers[correlationIDHeaderName] = correlationID
}
rtctx.Header = http.Header(headers)
rctx = cehttp.WithTransportContext(rctx, rtctx)
return rctx, reply, nil
}

func addOutGoingTracing(ctx context.Context, url *url.URL) context.Context {
tctx := cloudevents.HTTPTransportContextFrom(ctx)
// Creating a dummy request to leverage propagation.SpanContextFromRequest method.
req := &http.Request{
Header: tctx.Header,
}
// TODO use traceparent header. Issue: https://github.com/knative/eventing/issues/1951
// Attach the Span context that is currently saved in the request's headers.
if sc, ok := propagation.SpanContextFromRequest(req); ok {
newCtx, _ := trace.StartSpanWithRemoteParent(ctx, url.Path, sc)
return newCtx
}
return ctx
return rctx, nil
}

// isFailure returns true if the status code is not a successful HTTP status.
Expand All @@ -146,9 +147,9 @@ func isFailure(statusCode int) bool {
}

func (d *EventDispatcher) resolveURL(destination string) *url.URL {
if url, err := url.Parse(destination); err == nil && d.supportedSchemes.Has(url.Scheme) {
if u, err := url.Parse(destination); err == nil && d.supportedSchemes.Has(u.Scheme) {
// Already a URL with a known scheme.
return url
return u
}
return &url.URL{
Scheme: "http",
Expand Down
2 changes: 2 additions & 0 deletions pkg/channel/event_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
cloudevents "github.com/cloudevents/sdk-go"
"go.uber.org/zap"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/eventing/pkg/tracing"
"knative.dev/eventing/pkg/utils"
)

Expand Down Expand Up @@ -160,6 +161,7 @@ func (r *EventReceiver) ServeHTTP(ctx context.Context, event cloudevents.Event,
sctx := utils.ContextFrom(tctx, nil)
AppendHistory(&event, host)

event = tracing.AddTraceparentAttributeFromContext(ctx, event)
err = r.receiverFunc(sctx, channel, event)
if err != nil {
if _, ok := err.(*UnknownChannelError); ok {
Expand Down
6 changes: 2 additions & 4 deletions pkg/channel/event_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ func TestEventReceiver_ServeHTTP(t *testing.T) {
// Ce headers won't pass through our header filtering as they should actually be set in the CloudEvent itself,
// as extensions. The SDK then sets them as Ce- headers when sending them through HTTP.
"cE-not-pass-through": {"true"},
"x-B3-pass": {"true"},
"x-ot-pass": {"true"},
"x-B3-pass": {"will not pass"},
"x-ot-pass": {"will not pass"},
},
body: "event-body",
host: "test-name.test-namespace.svc." + utils.GetClusterDomainName(),
Expand All @@ -95,8 +95,6 @@ func TestEventReceiver_ServeHTTP(t *testing.T) {
// Note that only the first value was passed through, the remaining values were
// discarded.
"knatIve-will-pass-through": "true",
"x-B3-pass": "true",
"x-ot-pass": "true",
}
tctx := cloudevents.HTTPTransportContextFrom(ctx)
actualHeaders := make(map[string]string)
Expand Down
110 changes: 110 additions & 0 deletions pkg/tracing/traceparent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
Copyright 2019 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package tracing

import (
"context"
"fmt"
"regexp"
"strconv"

cloudevents "github.com/cloudevents/sdk-go"
"go.opencensus.io/plugin/ochttp/propagation/b3"
"go.opencensus.io/trace"
)

const (
	// traceparentAttribute is the name of the CloudEvents extension attribute that contains the
	// trace state. See:
	// https://github.com/cloudevents/spec/blob/v1.0-rc1/extensions/distributed-tracing.md#traceparent
	traceparentAttribute = "traceparent"
)

// AddTraceparentAttributeFromContext stores the Span found in ctx in the event's traceparent
// CloudEvents extension attribute and returns the event. If ctx holds no Span, the event is
// returned without the attribute being set.
//
// The context is expected to have passed through the OpenCensus HTTP Handler, which is what puts
// the Span into it.
func AddTraceparentAttributeFromContext(ctx context.Context, event cloudevents.Event) cloudevents.Event {
	span := trace.FromContext(ctx)
	if span == nil {
		// Nothing to record; hand the event back as it was given.
		return event
	}
	event.SetExtension(traceparentAttribute, traceparentAttributeValue(span))
	return event
}

// traceparentAttributeValue formats the span's context as "00-<trace ID>-<span ID>-<flags>",
// where flags is "01" when the span context is sampled and "00" otherwise.
func traceparentAttributeValue(span *trace.Span) string {
	sc := span.SpanContext()
	sampledFlag := "00"
	if sc.IsSampled() {
		sampledFlag = "01"
	}
	return fmt.Sprintf("00-%s-%s-%s", sc.TraceID.String(), sc.SpanID.String(), sampledFlag)
}

// AddSpanFromTraceparentAttribute extracts the traceparent extension attribute from the CloudEvent
// and returns a context with that span set.
//
// name is used as the name of the Span that is started. If the traceparent extension attribute is
// not found, is not a string, or cannot be parsed, then the original ctx is returned together with
// an error describing the problem.
func AddSpanFromTraceparentAttribute(ctx context.Context, name string, event cloudevents.Event) (context.Context, error) {
	tp, ok := event.Extensions()[traceparentAttribute]
	if !ok {
		return ctx, fmt.Errorf("extension attributes did not contain %q", traceparentAttribute)
	}
	tps, ok := tp.(string)
	if !ok {
		// Report the dynamic type of the stored value (tp). After a failed assertion tps is just
		// the zero string, so formatting it with %T would always print "string".
		return ctx, fmt.Errorf("extension attribute %q's value was not a string: %T", traceparentAttribute, tp)
	}
	sc, err := parseTraceparent(tps)
	if err != nil {
		return ctx, err
	}
	// Create a fake Span with the saved information. In order to ensure any requests made with this
	// context have a parent of the saved Span, set the SpanID to the one saved in the traceparent.
	// Normally, a new SpanID is generated and because this Span is never reported would create a
	// hole in the Span tree.
	_, span := trace.StartSpanWithRemoteParent(ctx, name, sc)
	span.SetSpanID(sc.SpanID)
	return trace.NewContext(ctx, span), nil
}

// traceparentRE matches a version "00" traceparent value and captures, in order, the 32 hex digit
// trace ID, the 16 hex digit span ID and the 2 hex digit trace options. It is compiled once at
// package initialization instead of on every parseTraceparent call.
var traceparentRE = regexp.MustCompile("^00-([a-f0-9]{32})-([a-f0-9]{16})-([a-f0-9]{2})$")

// parseTraceparent converts a traceparent string of the form
// "00-<32 hex trace ID>-<16 hex span ID>-<2 hex options>" into a trace.SpanContext.
//
// An error is returned, along with an empty SpanContext, when tp does not match that format or
// when any of its components cannot be parsed.
func parseTraceparent(tp string) (trace.SpanContext, error) {
	m := traceparentRE.FindStringSubmatch(tp)
	if len(m) == 0 {
		return trace.SpanContext{}, fmt.Errorf("could not parse traceparent: %q", tp)
	}
	traceID, ok := b3.ParseTraceID(m[1])
	if !ok {
		return trace.SpanContext{}, fmt.Errorf("could not parse traceID: %q", tp)
	}
	spanID, ok := b3.ParseSpanID(m[2])
	if !ok {
		return trace.SpanContext{}, fmt.Errorf("could not parse spanID: %q", tp)
	}
	// The options field is hexadecimal, e.g. "01" when the trace is sampled.
	options, err := strconv.ParseUint(m[3], 16, 32)
	if err != nil {
		return trace.SpanContext{}, fmt.Errorf("could not parse options: %q: %v", tp, err)
	}
	return trace.SpanContext{
		TraceID:      traceID,
		SpanID:       spanID,
		TraceOptions: trace.TraceOptions(options),
	}, nil
}
Loading