diff --git a/cmd/broker/ingress/main.go b/cmd/broker/ingress/main.go index 45cc42d0b6f..9600621883b 100644 --- a/cmd/broker/ingress/main.go +++ b/cmd/broker/ingress/main.go @@ -34,17 +34,17 @@ import ( "go.opencensus.io/stats/view" "knative.dev/eventing/pkg/broker/ingress" "knative.dev/eventing/pkg/channel" + "knative.dev/eventing/pkg/kncloudevents" "knative.dev/eventing/pkg/tracing" + kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" + "knative.dev/pkg/injection" + "knative.dev/pkg/injection/sharedmain" "knative.dev/pkg/metrics" "knative.dev/pkg/signals" "knative.dev/pkg/system" pkgtracing "knative.dev/pkg/tracing" - - kubeclient "knative.dev/pkg/client/injection/kube/client" - "knative.dev/pkg/injection" - "knative.dev/pkg/injection/sharedmain" ) var ( @@ -124,7 +124,7 @@ func main() { writer.WriteHeader(http.StatusOK) }) - ceClient, err := cloudevents.NewClient(httpTransport, cloudevents.WithTimeNow(), cloudevents.WithUUIDs()) + ceClient, err := kncloudevents.NewDefaultClientGivenHttpTransport(httpTransport) if err != nil { logger.Fatal("Unable to create CE client", zap.Error(err)) } diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index 2ab6edfb927..238a71782df 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -27,13 +27,14 @@ import ( cloudevents "github.com/cloudevents/sdk-go" cehttp "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http" + "go.opencensus.io/trace" "go.uber.org/zap" eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" "knative.dev/eventing/pkg/broker" eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1" + "knative.dev/eventing/pkg/kncloudevents" "knative.dev/eventing/pkg/logging" "knative.dev/eventing/pkg/reconciler/trigger/path" - "knative.dev/eventing/pkg/tracing" "knative.dev/eventing/pkg/utils" pkgtracing "knative.dev/pkg/tracing" ) @@ -69,7 +70,7 @@ func NewHandler(logger *zap.Logger, triggerLister eventinglisters.TriggerNamespa return nil, err } - ceClient, err := cloudevents.NewClient(httpTransport, cloudevents.WithTimeNow(), cloudevents.WithUUIDs()) + ceClient, err := kncloudevents.NewDefaultClientGivenHttpTransport(httpTransport) if err != nil { return nil, err } @@ -252,10 +253,9 @@ func (r *Handler) sendEvent(ctx context.Context, tctx cloudevents.HTTPTransportC } sendingCTX := utils.ContextFrom(tctx, subscriberURI) - sendingCTX, err = tracing.AddSpanFromTraceparentAttribute(sendingCTX, "name", *event) - if err != nil { - r.logger.Info("Unable to attach trace", zap.Error(err)) - } + // Due to an issue in utils.ContextFrom, we don't retain the original trace context from ctx, so + // bring it in manually. + sendingCTX = trace.NewContext(sendingCTX, trace.FromContext(ctx)) start := time.Now() rctx, replyEvent, err := r.ceClient.Send(sendingCTX, *event) diff --git a/pkg/broker/ingress/ingress_handler.go b/pkg/broker/ingress/ingress_handler.go index 424c55c4355..0963eb7f061 100644 --- a/pkg/broker/ingress/ingress_handler.go +++ b/pkg/broker/ingress/ingress_handler.go @@ -9,9 +9,9 @@ import ( "time" cloudevents "github.com/cloudevents/sdk-go" + "go.opencensus.io/trace" "go.uber.org/zap" "knative.dev/eventing/pkg/broker" - "knative.dev/eventing/pkg/tracing" "knative.dev/eventing/pkg/utils" ) @@ -73,8 +73,6 @@ func (h *Handler) serveHTTP(ctx context.Context, event cloudevents.Event, resp * return nil } - tracing.AddTraceparentAttributeFromContext(ctx, event) - reporterArgs := &ReportArgs{ ns: h.Namespace, broker: h.BrokerName, @@ -91,6 +89,10 @@ func (h *Handler) serveHTTP(ctx context.Context, event cloudevents.Event, resp * start := time.Now() sendingCTX := utils.ContextFrom(tctx, h.ChannelURI) + // Due to an issue in utils.ContextFrom, we don't retain the original trace context from ctx, so + // bring it in manually. + sendingCTX = trace.NewContext(sendingCTX, trace.FromContext(ctx)) + rctx, _, err := h.CeClient.Send(sendingCTX, event) rtctx := cloudevents.HTTPTransportContextFrom(rctx) // Record the dispatch time. diff --git a/pkg/kncloudevents/good_client.go b/pkg/kncloudevents/good_client.go index 80af546d452..e19a24aaf20 100644 --- a/pkg/kncloudevents/good_client.go +++ b/pkg/kncloudevents/good_client.go @@ -25,6 +25,12 @@ func NewDefaultClient(target ...string) (cloudevents.Client, error) { if err != nil { return nil, err } + return NewDefaultClientGivenHttpTransport(t) +} + +// NewDefaultClientGivenHttpTransport creates a new CloudEvents client using the provided HTTP +// transport. Note that it does modify the provided HTTP Transport by adding tracing to its Client. +func NewDefaultClientGivenHttpTransport(t *cloudevents.HTTPTransport) (cloudevents.Client, error) { // Add output tracing. t.Client = &gohttp.Client{ Transport: &ochttp.Transport{ diff --git a/pkg/tracing/names.go b/pkg/tracing/names.go index f583530899c..4cb52165a70 100644 --- a/pkg/tracing/names.go +++ b/pkg/tracing/names.go @@ -27,7 +27,7 @@ type BrokerIngressNameArgs struct { // BrokerIngressName creates the service name for Broker Ingresses to use when writing Zipkin // traces. func BrokerIngressName(args BrokerIngressNameArgs) string { - return fmt.Sprintf("%s-broker-ingress.%s", args.BrokerName, args.Namespace) + return fmt.Sprintf("%s-broker.%s", args.BrokerName, args.Namespace) } // BrokerFilterNameArgs are the arguments needed to generate the BrokerFilterName. diff --git a/pkg/tracing/names_test.go b/pkg/tracing/names_test.go index 8304da8bad4..ad24a8c4747 100644 --- a/pkg/tracing/names_test.go +++ b/pkg/tracing/names_test.go @@ -25,7 +25,7 @@ func TestBrokerIngressName(t *testing.T) { Namespace: testNS, BrokerName: broker, } - if got, want := BrokerIngressName(args), "my-broker-broker-ingress.test-namespace"; got != want { + if got, want := BrokerIngressName(args), "my-broker-broker.test-namespace"; got != want { t.Errorf("BrokerIngressName = %q, want %q", got, want) } } diff --git a/test/common/creation.go b/test/common/creation.go index eae37959fa6..7b9b11b90a2 100644 --- a/test/common/creation.go +++ b/test/common/creation.go @@ -23,6 +23,7 @@ import ( rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "knative.dev/eventing/pkg/apis/eventing/v1alpha1" messagingv1alpha1 "knative.dev/eventing/pkg/apis/messaging/v1alpha1" sourcesv1alpha1 "knative.dev/eventing/pkg/apis/sources/v1alpha1" "knative.dev/eventing/test/base" @@ -96,7 +97,7 @@ func (client *Client) CreateSubscriptionsOrFail( } // CreateBrokerOrFail will create a Broker or fail the test if there is an error. -func (client *Client) CreateBrokerOrFail(name string, channelTypeMeta *metav1.TypeMeta) { +func (client *Client) CreateBrokerOrFail(name string, channelTypeMeta *metav1.TypeMeta) *v1alpha1.Broker { namespace := client.Namespace broker := resources.Broker(name, resources.WithChannelTemplateForBroker(*channelTypeMeta)) @@ -107,6 +108,7 @@ func (client *Client) CreateBrokerOrFail(name string, channelTypeMeta *metav1.Ty client.T.Fatalf("Failed to create broker %q: %v", name, err) } client.Tracker.AddObj(broker) + return broker } // CreateBrokersOrFail will create a list of Brokers. @@ -117,7 +119,7 @@ func (client *Client) CreateBrokersOrFail(names []string, channelTypeMeta *metav } // CreateTriggerOrFail will create a Trigger or fail the test if there is an error. -func (client *Client) CreateTriggerOrFail(name string, options ...resources.TriggerOption) { +func (client *Client) CreateTriggerOrFail(name string, options ...resources.TriggerOption) *v1alpha1.Trigger { namespace := client.Namespace trigger := resources.Trigger(name, options...) @@ -128,6 +130,7 @@ func (client *Client) CreateTriggerOrFail(name string, options ...resources.Trig client.T.Fatalf("Failed to create trigger %q: %v", name, err) } client.Tracker.AddObj(trigger) + return trigger } // CreateSequenceOrFail will create a Sequence or fail the test if there is an error. diff --git a/test/common/operation.go b/test/common/operation.go index f92df3e9e2e..c2fe5bfdd15 100644 --- a/test/common/operation.go +++ b/test/common/operation.go @@ -139,7 +139,9 @@ func (client *Client) WaitForResourcesReady(typemeta *metav1.TypeMeta) error { // WaitForAllTestResourcesReady waits until all test resources in the namespace are Ready. func (client *Client) WaitForAllTestResourcesReady() error { // wait for all Knative resources created in this test to become ready. - client.Tracker.WaitForKResourcesReady() + if err := client.Tracker.WaitForKResourcesReady(); err != nil { + return err + } // explicitly wait for all pods to become ready. if err := pkgTest.WaitForAllPodsRunning(client.Kube, client.Namespace); err != nil { return err diff --git a/test/common/tracker.go b/test/common/tracker.go index 0c3746daf6f..160c4fd6672 100644 --- a/test/common/tracker.go +++ b/test/common/tracker.go @@ -21,6 +21,7 @@ package common import ( "encoding/json" + "fmt" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -144,7 +145,7 @@ func (t *Tracker) WaitForKResourcesReady() error { t.logf("Waiting for all KResources to become ready") for _, metaResource := range t.resourcesToCheckStatus { if err := base.WaitForResourceReady(t.dynamicClient, &metaResource); err != nil { - return err + return fmt.Errorf("failed waiting for %+v to become ready: %v", metaResource, err) } } return nil diff --git a/test/conformance/broker_tracing_test.go b/test/conformance/broker_tracing_test.go new file mode 100644 index 00000000000..cdf18cf48ce --- /dev/null +++ b/test/conformance/broker_tracing_test.go @@ -0,0 +1,29 @@ +// +build e2e + +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package conformance + +import ( + "testing" + + "knative.dev/eventing/test/conformance/helpers" +) + +func TestBrokerTracing(t *testing.T) { + helpers.BrokerTracingTestHelper(t, channelTestRunner) +} diff --git a/test/conformance/helpers/broker_tracing_test_helper.go b/test/conformance/helpers/broker_tracing_test_helper.go new file mode 100644 index 00000000000..e40ad1458af --- /dev/null +++ b/test/conformance/helpers/broker_tracing_test_helper.go @@ -0,0 +1,486 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package helpers + +import ( + "fmt" + "net/http" + "testing" + "time" + + "github.com/openzipkin/zipkin-go/model" + "k8s.io/apimachinery/pkg/util/uuid" + "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + "knative.dev/eventing/test/base/resources" + "knative.dev/eventing/test/common" + tracinghelper "knative.dev/eventing/test/conformance/helpers/tracing" + "knative.dev/pkg/test/zipkin" +) + +func BrokerTracingTestHelper(t *testing.T, channelTestRunner common.ChannelTestRunner) { + testCases := map[string]struct { + incomingTraceId bool + istio bool + }{ + "includes incoming trace id": { + incomingTraceId: true, + }, + } + + for n, tc := range testCases { + loggerPodName := "logger" + t.Run(n, func(t *testing.T) { + channelTestRunner.RunTests(t, common.FeatureBasic, func(st *testing.T, channel string) { + // Don't accidentally use t, use st instead. To ensure this, shadow 't' to a useless + // type. + t := struct{}{} + _ = fmt.Sprintf("%s", t) + + client := common.Setup(st, true) + defer common.TearDown(client) + + // Label namespace so that it creates the default broker. + if err := client.LabelNamespace(map[string]string{"knative-eventing-injection": "enabled"}); err != nil { + st.Fatalf("Error annotating namespace: %v", err) + } + + // Do NOT call zipkin.CleanupZipkinTracingSetup. That will be called exactly once in + // TestMain. + tracinghelper.Setup(st, client) + + expected, mustContain := setupBrokerTracing(st, channel, client, loggerPodName, tc.incomingTraceId) + assertLogContents(st, client, loggerPodName, mustContain) + traceID := getTraceID(st, client, loggerPodName) + trace, err := zipkin.JSONTrace(traceID, expected.SpanCount(), 2*time.Minute) + if err != nil { + st.Fatalf("Unable to get trace %q: %v. Trace so far %+v", traceID, err, tracinghelper.PrettyPrintTrace(trace)) + } + + tree := tracinghelper.GetTraceTree(st, trace) + if err := expected.Matches(tree); err != nil { + st.Fatalf("Trace Tree did not match expected: %v", err) + } + }) + }) + } +} + +// setupBrokerTracing is the general setup for TestBrokerTracing. It creates the following: +// 1. Broker. +// 2. Trigger on 'foo' events -> K8s Service -> transformer Pod (which replies with a 'bar' event). +// 3. Trigger on 'bar' events -> K8s Service -> eventdetails Pod. +// 4. Sender Pod which sends a 'foo' event. +// It returns a string that is expected to be sent by the SendEvents Pod and should be present in +// the LogEvents Pod logs. +func setupBrokerTracing(t *testing.T, channel string, client *common.Client, loggerPodName string, incomingTraceId bool) (tracinghelper.TestSpanTree, string) { + // Create the Broker. + const ( + etTransformer = "transformer" + etLogger = "logger" + ) + channelTypeMeta := common.GetChannelTypeMeta(channel) + broker := client.CreateBrokerOrFail("br", channelTypeMeta) + + // TODO Remove this wait once https://github.com/knative/eventing/issues/1998 is fixed. + if err := client.WaitForResourceReady(broker.Name, common.BrokerTypeMeta); err != nil { + t.Fatalf("Broker did not become ready: %v", err) + } + + // Create a logger (EventDetails) Pod and a K8s Service that points to it. + logPod := resources.EventDetailsPod(loggerPodName) + client.CreatePodOrFail(logPod, common.WithService(loggerPodName)) + + // Create a Trigger that receives events (type=bar) and sends them to the logger Pod. + loggerTrigger := client.CreateTriggerOrFail( + "logger", + resources.WithBroker(broker.Name), + resources.WithAttributesTriggerFilter(v1alpha1.TriggerAnyFilter, etLogger, map[string]interface{}{}), + resources.WithSubscriberRefForTrigger(loggerPodName), + ) + + // Create a transformer (EventTransfrmer) Pod that replies with the same event as the input, + // except the reply's event's type is changed to bar. + eventTransformerPod := resources.EventTransformationPod("transformer", &resources.CloudEvent{ + Type: etLogger, + }) + client.CreatePodOrFail(eventTransformerPod, common.WithService(eventTransformerPod.Name)) + + // Create a Trigger that receives events (type=foo) and sends them to the transformer Pod. + transformerTrigger := client.CreateTriggerOrFail( + "transformer", + resources.WithBroker(broker.Name), + resources.WithAttributesTriggerFilter(v1alpha1.TriggerAnyFilter, etTransformer, map[string]interface{}{}), + resources.WithSubscriberRefForTrigger(eventTransformerPod.Name), + ) + + // Wait for all test resources to be ready, so that we can start sending events. + if err := client.WaitForAllTestResourcesReady(); err != nil { + t.Fatalf("Failed to get all test resources ready: %v", err) + } + + // Everything is setup to receive an event. Generate a CloudEvent. + senderName := "sender" + eventID := fmt.Sprintf("%s", uuid.NewUUID()) + body := fmt.Sprintf("TestBrokerTracing %s", eventID) + event := &resources.CloudEvent{ + ID: eventID, + Source: senderName, + Type: etTransformer, + Data: fmt.Sprintf(`{"msg":%q}`, body), + Encoding: resources.CloudEventEncodingBinary, + } + + // Send the CloudEvent (either with or without tracing inside the SendEvents Pod). + sendEvent := client.SendFakeEventToAddressable + if incomingTraceId { + sendEvent = client.SendFakeEventWithTracingToAddressable + } + if err := sendEvent(senderName, broker.Name, common.BrokerTypeMeta, event); err != nil { + t.Fatalf("Failed to send fake CloudEvent to the broker %q", broker.Name) + } + + // TODO Actually determine the cluster's domain, similar to knative.dev/pkg/network/domain.go. + domain := "cluster.local" + + // We expect the following spans: + // 0. Artificial root span. + // 1. Send pod sends event to the Broker Ingress (only if the sending pod generates a span). + // 2. Broker Ingress receives the event from the sending pod. + // 3. Broker Ingress sends the event to the Broker's TrChannel (trigger channel). + // 4. Broker TrChannel receives the event from the Broker Ingress. + // 5. Broker TrChannel sends the event to the Broker Filter for the "logger" trigger. + // 6. Broker Filter for the "logger" trigger receives the event from the Broker TrChannel. + // This does not pass the filter, so this 'branch' ends here. + // 7. Broker TrChannel sends the event to the Broker Filter for the "transformer" trigger. + // 8. Broker Filter for the "transformer" trigger receives the event from the Broker TrChannel. + // 9. Broker Filter for the "transformer" trigger sends the event to the transformer pod. + // 10. Transformer pod receives the event from the Broker Filter for the "transformer" trigger. + // 11. Broker Filter for the "transformer" sends the transformer pod's reply to the Broker + // InChannel (Ingress Channel). + // 12. Broker InChannel receives the event from the Broker Filter for the "transformer" trigger. + // 13. Broker InChannel sends the event to the Broker Ingress. + // 14. Broker Ingress receives the event from the Broker InChannel. + // 15. Broker Ingress sends the event to the Broker's TrChannel. + // 16. Broker TrChannel receives the event from the Broker Ingress. + // 17. Broker TrChannel sends the event to the Broker Filter for the "transformer" trigger. + // 18. Broker Filter for the "transformer" trigger receives the event from the Broker + // TrChannel. This does not pass the filter, so this 'branch' ends here. + // 19. Broker TrChannel sends the event to the Broker Filter for the "logger" trigger. + // 20. Broker Filter for the "logger" trigger receives the event from the Broker TrChannel. + // 21. Broker Filter for the "logger" trigger sends the event to the logger pod. + // 22. Logger pod receives the event from the Broker Filter for the "logger" trigger. + + // Useful constants we will use below. + ingressHost := brokerIngressHost(domain, *broker) + ingressChanHost := brokerIngressChannelHost(domain, *broker) + triggerChanHost := brokerTriggerChannelHost(domain, *broker) + filterHost := brokerFilterHost(domain, *broker) + loggerTriggerPath := triggerPath(*loggerTrigger) + transformerTriggerPath := triggerPath(*transformerTrigger) + loggerSVCHost := k8sServiceHost(domain, client.Namespace, loggerPodName) + transformerSVCHost := k8sServiceHost(domain, client.Namespace, eventTransformerPod.Name) + + // This is very hard to read when written directly, so we will build piece by piece. + + // Steps 17-18: 'logger' event being sent to the 'transformer' Trigger. + loggerEventSentFromTrChannelToTransformer := tracinghelper.TestSpanTree{ + Note: "17. Broker TrChannel sends the event to the Broker Filter for the 'transformer' trigger.", + Kind: model.Client, + Tags: map[string]string{ + "http.method": http.MethodPost, + "http.status_code": "202", + "http.url": fmt.Sprintf("http://%s%s", filterHost, transformerTriggerPath), + }, + Children: []tracinghelper.TestSpanTree{ + { + Note: "18. Broker Filter for the 'transformer' trigger receives the event from the Broker TrChannel. This does not pass the filter, so this 'branch' ends here.", + Kind: model.Server, + Tags: map[string]string{ + "http.method": http.MethodPost, + "http.status_code": "202", + "http.host": filterHost, + "http.path": transformerTriggerPath, + }, + }, + }, + } + + // Steps 19-22: 'logger' event being sent to the 'logger' Trigger. + loggerEventSentFromTrChannelToLogger := tracinghelper.TestSpanTree{ + Note: "19. Broker TrChannel sends the event to the Broker Filter for the 'logger' trigger.", + Kind: model.Client, + Tags: map[string]string{ + "http.method": http.MethodPost, + "http.status_code": "202", + "http.url": fmt.Sprintf("http://%s%s", filterHost, loggerTriggerPath), + }, + Children: []tracinghelper.TestSpanTree{ + { + Note: "20. Broker Filter for the 'logger' trigger receives the event from the Broker TrChannel.", + Kind: model.Server, + Tags: map[string]string{ + "http.method": http.MethodPost, + "http.status_code": "202", + "http.host": filterHost, + "http.path": loggerTriggerPath, + }, + Children: []tracinghelper.TestSpanTree{ + { + Note: "21. Broker Filter for the 'logger' trigger sends the event to the logger pod.", + Kind: model.Client, + Tags: map[string]string{ + "http.method": http.MethodPost, + "http.status_code": "202", + "http.url": fmt.Sprintf("http://%s/", loggerSVCHost), + }, + Children: []tracinghelper.TestSpanTree{ + { + Note: "22. Logger pod receives the event from the Broker Filter for the 'logger' trigger.", + Kind: model.Server, + Tags: map[string]string{ + "http.method": http.MethodPost, + "http.path": "/", + "http.status_code": "202", + "http.host": loggerSVCHost, + }, + }, + }, + }, + }, + }, + }, + } + + // Steps 13-22. Directly steps 13-16. 17-22 are included as children. + // Steps 13-16: Event in the Broker InChannel sent to the Broker Ingress to the Trigger Channel. + loggerEventIngressToTrigger := tracinghelper.TestSpanTree{ + Note: "13. Broker InChannel sends the event to the Broker Ingress.", + Kind: model.Client, + Tags: map[string]string{ + "http.method": http.MethodPost, + "http.status_code": "202", + "http.url": fmt.Sprintf("http://%s/", ingressHost), + }, + Children: []tracinghelper.TestSpanTree{ + { + Note: "14. Broker Ingress receives the event from the Broker InChannel.", + Kind: model.Server, + Tags: map[string]string{ + + "http.method": http.MethodPost, + "http.path": "/", + "http.status_code": "202", + "http.host": ingressHost, + }, + Children: []tracinghelper.TestSpanTree{ + { + Note: "15. Broker Ingress sends the event to the Broker's TrChannel.", + Kind: model.Client, + Tags: map[string]string{ + "http.method": http.MethodPost, + "http.status_code": "202", + "http.url": fmt.Sprintf("http://%s/", triggerChanHost), + }, + Children: []tracinghelper.TestSpanTree{ + { + Note: "16. Broker TrChannel receives the event from the Broker Ingress.", + Kind: model.Server, + Tags: map[string]string{ + "http.method": http.MethodPost, + "http.status_code": "202", + "http.host": triggerChanHost, + "http.path": "/", + }, + Children: []tracinghelper.TestSpanTree{ + // Steps 17-18. + loggerEventSentFromTrChannelToTransformer, + // Steps 19-22. + loggerEventSentFromTrChannelToLogger, + }, + }, + }, + }, + }, + }, + }, + } + + // Steps 7-10: Event from TrChannel sent to transformer Trigger and its reply to the InChannel. + transformerEventSentFromTrChannelToTransformer := tracinghelper.TestSpanTree{ + Note: "7. Broker TrChannel sends the event to the Broker Filter for the 'transformer' trigger.", + Kind: model.Client, + Tags: map[string]string{ + "http.method": http.MethodPost, + "http.url": fmt.Sprintf("http://%s%s", filterHost, transformerTriggerPath), + }, + Children: []tracinghelper.TestSpanTree{ + { + Note: "8. Broker Filter for the 'transformer' trigger receives the event from the Broker TrChannel.", + Kind: model.Server, + Tags: map[string]string{ + "http.method": http.MethodPost, + "http.status_code": "202", + "http.host": filterHost, + "http.path": transformerTriggerPath, + }, + Children: []tracinghelper.TestSpanTree{ + { + Note: "9. Broker Filter for the 'transformer' trigger sends the event to the transformer pod.", + Kind: model.Client, + Tags: map[string]string{ + "http.method": http.MethodPost, + "http.status_code": "200", + "http.url": fmt.Sprintf("http://%s/", transformerSVCHost), + }, + Children: []tracinghelper.TestSpanTree{ + { + Note: "10. Transformer pod receives the event from the Broker Filter for the 'transformer' trigger.", + Kind: model.Server, + Tags: map[string]string{ + "http.method": http.MethodPost, + "http.path": "/", + "http.status_code": "200", + "http.host": transformerSVCHost, + }, + }, + }, + }, + }, + }, + }, + } + + // Step 11-22. Directly steps 11-12. Steps 13-22 are children. + // Steps 11-12 Reply from the 'transformer' is sent by the Broker TrChannel to the Broker + // InChannel. + transformerEventResponseFromTrChannel := tracinghelper.TestSpanTree{ + Note: "11. Broker TrChannel for the 'transformer' sends the transformer pod's reply to the Broker InChannel.", + Kind: model.Client, + Tags: map[string]string{ + "http.method": http.MethodPost, + "http.status_code": "202", + "http.url": fmt.Sprintf("http://%s", ingressChanHost), + }, + Children: []tracinghelper.TestSpanTree{ + { + Note: "12. Broker InChannel receives the event from the Broker TrChannel for the 'transformer' trigger.", + Kind: model.Server, + Tags: map[string]string{ + "http.method": http.MethodPost, + "http.status_code": "202", + "http.host": ingressChanHost, + "http.path": "/", + }, + Children: []tracinghelper.TestSpanTree{ + // Steps 13-22. + loggerEventIngressToTrigger, + }, + }, + }, + } + + // Steps 5-6: Event from TrChannel sent to logger Trigger. + transformerEventSentFromTrChannelToLogger := tracinghelper.TestSpanTree{ + Note: "5. Broker TrChannel sends the event to the Broker Filter for the 'logger' trigger.", + Kind: model.Client, + Tags: map[string]string{ + "http.method": http.MethodPost, + "http.status_code": "202", + "http.url": fmt.Sprintf("http://%s%s", filterHost, loggerTriggerPath), + }, + Children: []tracinghelper.TestSpanTree{ + { + Note: "6. Broker Filter for the 'logger' trigger receives the event from the Broker TrChannel. This does not pass the filter, so this 'branch' ends here.", + Kind: model.Server, + Tags: map[string]string{ + "http.method": http.MethodPost, + "http.status_code": "202", + "http.host": filterHost, + "http.path": loggerTriggerPath, + }, + }, + }, + } + + // Steps 0-22. Directly steps 0-4 (missing 1). + // Steps 0-4 (missing 1, which is optional and added below if present): Event sent to the Broker + // Ingress. + expected := tracinghelper.TestSpanTree{ + Note: "0. Artificial root span.", + Root: true, + Children: []tracinghelper.TestSpanTree{ + { + Note: "2. Broker Ingress receives the event from the sending pod.", + Kind: model.Server, + Tags: map[string]string{ + "http.method": http.MethodPost, + "http.status_code": "202", + "http.host": ingressHost, + "http.path": "/", + }, + Children: []tracinghelper.TestSpanTree{ + { + Note: "3. Broker Ingress sends the event to the Broker's TrChannel (trigger channel).", + Kind: model.Client, + Tags: map[string]string{ + "http.method": http.MethodPost, + "http.status_code": "202", + "http.url": fmt.Sprintf("http://%s/", triggerChanHost), + }, + Children: []tracinghelper.TestSpanTree{ + { + Note: "4. Broker TrChannel receives the event from the Broker Ingress.", + Kind: model.Server, + Tags: map[string]string{ + "http.method": http.MethodPost, + "http.status_code": "202", + "http.host": triggerChanHost, + "http.path": "/", + }, + Children: []tracinghelper.TestSpanTree{ + // Steps 5-6. + transformerEventSentFromTrChannelToLogger, + // Steps 7-10. + transformerEventSentFromTrChannelToTransformer, + // Steps 11-22 + transformerEventResponseFromTrChannel, + }, + }, + }, + }, + }, + }, + }, + } + + if incomingTraceId { + expected.Children = []tracinghelper.TestSpanTree{ + { + Note: "1. Send pod sends event to the Broker Ingress (only if the sending pod generates a span).", + Kind: model.Client, + LocalEndpointServiceName: "sender", + Tags: map[string]string{ + "http.method": http.MethodPost, + "http.status_code": "202", + "http.url": fmt.Sprintf("http://%s", ingressHost), + }, + Children: expected.Children, + }, + } + } + return expected, body +} diff --git a/test/conformance/helpers/channel_tracing_test_helper.go b/test/conformance/helpers/channel_tracing_test_helper.go index 99ca60067d3..ea0d9d9d4a6 100644 --- a/test/conformance/helpers/channel_tracing_test_helper.go +++ b/test/conformance/helpers/channel_tracing_test_helper.go @@ -244,7 +244,7 @@ func ChannelTracingTestHelperWithReply(t *testing.T, channelTestRunner common.Ch assertLogContents(st, client, loggerPodName, mustContain) traceID := getTraceID(st, client, loggerPodName) - trace, err := zipkin.JSONTrace(traceID, expected.SpanCount(), 60*time.Second) + trace, err := zipkin.JSONTrace(traceID, expected.SpanCount(), 2*time.Minute) if err != nil { st.Fatalf("Unable to get trace %q: %v. Trace so far %+v", traceID, err, tracinghelper.PrettyPrintTrace(trace)) } diff --git a/test/conformance/helpers/tracing/traces.go b/test/conformance/helpers/tracing/traces.go index 62aeeeb1876..613b071d61b 100644 --- a/test/conformance/helpers/tracing/traces.go +++ b/test/conformance/helpers/tracing/traces.go @@ -19,6 +19,7 @@ package tracing import ( "encoding/json" "fmt" + "sort" "testing" "github.com/openzipkin/zipkin-go/model" @@ -33,6 +34,7 @@ func PrettyPrintTrace(trace []model.SpanModel) string { // SpanTree is the tree of Spans representation of a Trace. type SpanTree struct { + Root bool Span model.SpanModel Children []SpanTree } @@ -48,7 +50,7 @@ func (t SpanTree) ToTestSpanTree() TestSpanTree { children[i] = t.Children[i].toTestSpanTreeHelper() } return TestSpanTree{ - Root: true, + Root: t.Root, Children: children, } } @@ -62,22 +64,29 @@ func (t SpanTree) toTestSpanTreeHelper() TestSpanTree { for i := range t.Children { children[i] = t.Children[i].toTestSpanTreeHelper() } - return TestSpanTree{ + tst := TestSpanTree{ Kind: t.Span.Kind, LocalEndpointServiceName: name, Tags: t.Span.Tags, Children: children, } + tst.SortChildren() + return tst } // TestSpanTree is the expected version of SpanTree used for assertions in testing. +// +// The JSON names of the fields are weird because we want a specific order when pretty printing +// JSON. The JSON will be printed in alphabetical order, so we are imposing a certain order by +// prefixing the keys with a specific letter. The letter has no mean other than ordering. type TestSpanTree struct { - Root bool - Kind model.Kind - LocalEndpointServiceName string - Tags map[string]string + Note string `json:"a_Note,omitempty"` + Root bool `json:"b_Root,omitempty"` + Kind model.Kind `json:"c_Kind,omitempty"` + LocalEndpointServiceName string `json:"d_Name,omitempty"` + Tags map[string]string `json:"e_Tags,omitempty"` - Children []TestSpanTree + Children []TestSpanTree `json:"z_Children,omitempty"` } func (t TestSpanTree) String() string { @@ -85,6 +94,54 @@ func (t TestSpanTree) String() string { return string(b) } +// SortChildren attempts to sort the children of this TestSpanTree. The children are siblings, order +// does not actually matter. TestSpanTree.Matches() correctly handles this, by matching in any +// order. SortChildren() is most useful before JSON pretty printing the structure and comparing +// manually. +// +// The order it uses: +// 1. Shorter children first. +// 2. Span kind. +// 3. "http.url", "http.host", "http.path" tag presence and values. +// If all of those are equal, then arbitrarily choose the earlier index. +func (t *TestSpanTree) SortChildren() { + for _, child := range t.Children { + child.SortChildren() + } + sort.Slice(t.Children, func(i, j int) bool { + ic := t.Children[i] + jc := t.Children[j] + + if ic.height() != jc.height() { + return ic.height() < jc.height() + } + + if ic.Kind != jc.Kind { + return ic.Kind < jc.Kind + } + it := ic.Tags + jt := jc.Tags + for _, key := range []string{"http.url", "http.host", "http.path"} { + if it[key] != jt[key] { + return it[key] < jt[key] + } + } + // We don't have anything to reliably differentiate by. So this isn't going to really be + // sorted, just leave the existing one first arbitrarily. + return i < j + }) +} + +func (t TestSpanTree) height() int { + height := 0 + for _, child := range t.Children { + if ch := child.height(); ch >= height { + height = ch + 1 + } + } + return height +} + // GetTraceTree converts a set slice of spans into a SpanTree. func GetTraceTree(t *testing.T, trace []model.SpanModel) SpanTree { var roots []model.SpanModel @@ -103,6 +160,7 @@ func GetTraceTree(t *testing.T, trace []model.SpanModel) SpanTree { } tree := SpanTree{ + Root: true, Children: children, } if len(parents) != 0 { @@ -144,9 +202,12 @@ func (t TestSpanTree) SpanCount() int { // Matches checks to see if this TestSpanTree matches an actual SpanTree. It is intended to be used // for assertions while testing. func (t TestSpanTree) Matches(actual SpanTree) error { - err := traceTreeMatches(".", t, actual) - if err != nil { - return fmt.Errorf("spanTree did not match: %v. Actual %v, Expected %v", err, actual.ToTestSpanTree().String(), t.String()) + if g, w := actual.ToTestSpanTree().SpanCount(), t.SpanCount(); g != w { + return fmt.Errorf("unexpected number of spans. got %d want %d", g, w) + } + t.SortChildren() + if err := traceTreeMatches(".", t, actual); err != nil { + return fmt.Errorf("spanTree did not match: %v. \n*****Actual***** %v\n*****Expected***** %v", err, actual.ToTestSpanTree().String(), t.String()) } return nil } @@ -186,16 +247,17 @@ func unorderedTraceTreesMatch(pos string, want []TestSpanTree, got []SpanTree) e // so n should be small (say 50 in the largest cases). OuterLoop: for i, w := range want { + var lastErr error for ug := range unmatchedGot { - err := w.Matches(got[ug]) + lastErr = w.Matches(got[ug]) // If there is no error, then it matched successfully. - if err == nil { + if lastErr == nil { unmatchedGot.Delete(ug) continue OuterLoop } } // Nothing matched. - return fmt.Errorf("unable to find child match %s[%d]: Want: %s **** Got: %s", pos, i, w.String(), got) + return fmt.Errorf("unable to find child match %s[%d]: Last Err %v. Want: %s **** Got: %s", pos, i, lastErr, w.String(), got) } // Everything matched. return nil diff --git a/test/conformance/helpers/uri.go b/test/conformance/helpers/uri.go new file mode 100644 index 00000000000..0ee80d7da3c --- /dev/null +++ b/test/conformance/helpers/uri.go @@ -0,0 +1,51 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package helpers + +import ( + "fmt" + + "knative.dev/eventing/pkg/apis/eventing/v1alpha1" +) + +func brokerIngressHost(domain string, broker v1alpha1.Broker) string { + return fmt.Sprintf("%s-broker.%s.svc.%s", broker.Name, broker.Namespace, domain) +} + +func channelHost(domain, namespace, chanName string) string { + return fmt.Sprintf("%s-kn-channel.%s.svc.%s", chanName, namespace, domain) +} + +func brokerIngressChannelHost(domain string, broker v1alpha1.Broker) string { + return channelHost(domain, broker.Namespace, fmt.Sprintf("%s-kne-ingress", broker.Name)) +} + +func brokerTriggerChannelHost(domain string, broker v1alpha1.Broker) string { + return channelHost(domain, broker.Namespace, fmt.Sprintf("%s-kne-trigger", broker.Name)) +} + +func brokerFilterHost(domain string, broker v1alpha1.Broker) string { + return fmt.Sprintf("%s-broker-filter.%s.svc.%s", broker.Name, broker.Namespace, domain) +} + +func triggerPath(trigger v1alpha1.Trigger) string { + return fmt.Sprintf("/triggers/%s/%s/%s", trigger.Namespace, trigger.Name, trigger.UID) +} + +func k8sServiceHost(domain, namespace, svcName string) string { + return fmt.Sprintf("%s.%s.svc.%s", svcName, namespace, domain) +}