Skip to content

Commit

Permalink
E2E test for broker tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
Daisy Guo committed Sep 26, 2019
1 parent 6b2d850 commit c5744fb
Show file tree
Hide file tree
Showing 4 changed files with 270 additions and 0 deletions.
23 changes: 23 additions & 0 deletions test/base/resources/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,29 @@ func EventTransformationPod(name string, event *CloudEvent) *corev1.Pod {
}
}

// EventMutatorPod creates a Pod that response an event with new type.
func EventMutatorPod(name string, eventType string) *corev1.Pod {
const imageName = "eventmutator"
return &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: map[string]string{"e2etest": string(uuid.NewUUID())},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: imageName,
Image: pkgTest.ImagePath(imageName),
ImagePullPolicy: corev1.PullAlways,
Args: []string{
"-event-type-resp",
eventType,
},
}},
RestartPolicy: corev1.RestartPolicyAlways,
},
}
}

// HelloWorldPod creates a Pod that logs "Hello, World!".
func HelloWorldPod(name string, options ...PodOption) *corev1.Pod {
const imageName = "helloworld"
Expand Down
29 changes: 29 additions & 0 deletions test/conformance/broker_tracing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// +build e2e

/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package conformance

import (
"testing"

"knative.dev/eventing/test/conformance/helpers"
)

func TestBrokerTracing(t *testing.T) {
helpers.BrokerTracingTestHelper(t, channelTestRunner)
}
146 changes: 146 additions & 0 deletions test/conformance/helpers/broker_tracing_test_helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package helpers

import (
"fmt"
"testing"
"time"

"k8s.io/apimachinery/pkg/util/uuid"
"knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/test/base/resources"
"knative.dev/eventing/test/common"
"knative.dev/pkg/test/zipkin"
)

func BrokerTracingTestHelper(t *testing.T, channelTestRunner common.ChannelTestRunner) {
testCases := map[string]struct {
incomingTraceId bool
istio bool
}{
"no incoming trace id": {},
"includes incoming trace id": {
incomingTraceId: true,
}, /*
"caller has istio": {
istio: true,
},*/
}

for n, tc := range testCases {
loggerPodName := "logger"
t.Run(n, func(t *testing.T) {
channelTestRunner.RunTests(t, common.FeatureBasic, func(st *testing.T, channel string) {
// Don't accidentally use t, use st instead. To ensure this, shadow 't' to some a
// useless type.
t := struct{}{}
_ = fmt.Sprintf("%s", t)

client := common.Setup(st, true)
defer common.TearDown(client)

// Do NOT call zipkin.CleanupZipkinTracingSetup. That will be called exactly once in
// TestMain.
zipkin.SetupZipkinTracing(client.Kube.Kube, st.Logf)

mustContain := setupBrokerTracing(st, channel, client, loggerPodName, tc.incomingTraceId)
assertLogContents(st, client, loggerPodName, mustContain)

traceID := getTraceID(st, client, loggerPodName)
expectedNumSpans := 11
if tc.incomingTraceId {
expectedNumSpans = 12
}
trace, err := zipkin.JSONTrace(traceID, expectedNumSpans, 15*time.Second)
if err != nil {
st.Fatalf("Unable to get trace %q: %v", traceID, err)
}
st.Logf("I got the trace, %q!\n%+v", traceID, trace)

// TODO: Assert information in the trace.
})
})
}
}

// setupBrokerTracing is the general setup for TestBrokerTracing. It creates the following:
// SendEvents (Pod) -> Broker -> Subscription -> K8s Service -> LogEvents (Pod)
// It returns a string that is expected to be sent by the SendEvents Pod and should be present in
// the LogEvents Pod logs.
func setupBrokerTracing(t *testing.T, channel string, client *common.Client, loggerPodName string, mutatorPodName string, incomingTraceId bool) string {
// Create the Broker.
const (
brokerName = "br"
eventTypeFoo = "foo"
eventTypeBar = "bar"
any = v1alpha1.TriggerAnyFilter
)
channelTypeMeta := common.GetChannelTypeMeta(channel)
client.CreateBrokerOrFail(brokerName, channelTypeMeta)

// Create a LogEvents Pod and a K8s Service that points to it.
logPod := resources.EventDetailsPod(loggerPodName)
client.CreatePodOrFail(logPod, common.WithService(loggerPodName))

// Create a Trigger that receive events (type=bar) and send to logger Pod.
client.CreateTriggerOrFail(
"trigger-trace",
resources.WithBroker(brokerName),
resources.WithDeprecatedSourceAndTypeTriggerFilter(any, eventTypeBar),
resources.WithSubscriberRefForTrigger(loggerPodName),
)

// Create an event mutator to response an event with type bar
eventMutatorPod := resources.EventMutatorPod(mutatorPodName, eventTypeBar)
client.CreatePodOrFail(eventMutatorPod, common.WithService(mutatorPodName))

// Create a Trigger that receive events (type=foo) and send to event mutator Pod.
client.CreateTriggerOrFail(
"trigger-trace",
resources.WithBroker(brokerName),
resources.WithDeprecatedSourceAndTypeTriggerFilter(any, eventTypeBar),
resources.WithSubscriberRefForTrigger(loggerPodName),
)

// Wait for all test resources to be ready, so that we can start sending events.
if err := client.WaitForAllTestResourcesReady(); err != nil {
t.Fatalf("Failed to get all test resources ready: %v", err)
}

// Everything is setup to receive an event. Generate a CloudEvent.
senderName := "sender"
eventID := fmt.Sprintf("%s", uuid.NewUUID())
body := fmt.Sprintf("TestBrokerTracing %s", eventID)
event := &resources.CloudEvent{
ID: eventID,
Source: senderName,
Type: eventTypeFoo,
Data: fmt.Sprintf(`{"msg":%q}`, body),
Encoding: resources.CloudEventEncodingBinary,
}

// Send the CloudEvent (either with or without tracing inside the SendEvents Pod).
sendEvent := client.SendFakeEventToAddressable
if incomingTraceId {
sendEvent = client.SendFakeEventWithTracingToAddressable
}
if err := sendEvent(senderName, brokerName, common.BrokerTypeMeta, event); err != nil {
t.Fatalf("Failed to send fake CloudEvent to the broker %q", brokerName)
}
return body
}
72 changes: 72 additions & 0 deletions test/test_images/eventmutator/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"context"
"flag"
"log"

cloudevents "github.com/cloudevents/sdk-go"
"go.uber.org/zap"
"knative.dev/eventing/pkg/tracing"
)

var (
eventTypeResp string
)

func init() {
flag.StringVar(&eventTypeResp, "event-type-resp", "", "The Event Type to response.")
}

func gotEvent(event cloudevents.Event, resp *cloudevents.EventResponse) error {
ctx := event.Context.AsV03()

dataBytes, err := event.DataBytes()
if err != nil {
log.Printf("Got Data Error: %s\n", err.Error())
return err
}
log.Println("Received an event: ")
log.Printf("[%s] %s %s: %s", ctx.Time.String(), ctx.GetSource(), ctx.GetType(), dataBytes)

event.SetType(eventTypeResp)

log.Println("Change the event to: ")
log.Printf("[%s] %s %s: %+v", ctx.Time.String(), ctx.GetSource(), ctx.GetType(), dataBytes)

resp.RespondWith(200, &event)
return nil
}

func main() {
// parse the command line flags
flag.Parse()

if err := tracing.SetupStaticPublishing(zap.NewNop().Sugar(), "", tracing.AlwaysSample); err != nil {
log.Fatalf("Unable to setup trace publishing: %v", err)
}

c, err := cloudevents.NewDefaultClient()
if err != nil {
log.Fatalf("failed to create client, %v", err)
}

log.Printf("listening on 8080")
log.Fatalf("failed to start receiver: %s", c.StartReceiver(context.Background(), gotEvent))
}

0 comments on commit c5744fb

Please sign in to comment.