Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

E2E test for Channel Tracing #1858

Merged
merged 35 commits into from
Sep 20, 2019
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
ff72461
WIP
Harwayne Sep 9, 2019
ca5f88a
Simple one works.
Harwayne Sep 9, 2019
9ca4206
Revert deep vendored stuff.
Harwayne Sep 9, 2019
5d4cd00
Revert upload test images.
Harwayne Sep 9, 2019
b442008
It works!
Harwayne Sep 9, 2019
a9a55d1
Run in parallel.
Harwayne Sep 10, 2019
5f536c5
pkg determines the hostname.
Harwayne Sep 10, 2019
7c9b13c
Cleanup
Harwayne Sep 10, 2019
68693df
Use GetLog.
Harwayne Sep 10, 2019
e165ca5
Merge branch 'master' into e2e-tracing
Harwayne Sep 11, 2019
3b33fb9
Try installing monitoring and setting tracing to 100%.
Harwayne Sep 11, 2019
ca04836
Add a comment about where the expectedNumSpans comes from.
Harwayne Sep 12, 2019
3a3fc29
Create the istio-system ns.
Harwayne Sep 12, 2019
e833388
Update knative.dev/pkg.
Harwayne Sep 12, 2019
3a84ed5
Switch back to knative.dev/pkg's zipkin.
Harwayne Sep 12, 2019
793a2f1
Merge branch 'master' into e2e-tracing
Harwayne Sep 12, 2019
dadd66d
Rerun dep ensure.
Harwayne Sep 12, 2019
28ad88d
Switch in main_test.
Harwayne Sep 12, 2019
8eedb51
Random namespace name.
Harwayne Sep 16, 2019
8e96712
Merge branch 'master' into e2e-tracing
Harwayne Sep 16, 2019
ca9f3c3
Include trace so far in the error case.
Harwayne Sep 16, 2019
9c765c7
Set the ConfigMap in code.
Harwayne Sep 16, 2019
5340ad2
Set timeout to 60 seconds.
Harwayne Sep 16, 2019
1a8a05c
Small changes.
Harwayne Sep 16, 2019
0fbd809
Merge branch 'master' into e2e-tracing
Harwayne Sep 16, 2019
a13ed44
Works on the Mac.
Harwayne Sep 18, 2019
30c5b0d
Update knative/pkg.
Harwayne Sep 18, 2019
b7acbfb
Merge branch 'master' into e2e-tracing
Harwayne Sep 18, 2019
1a36cd2
Merge branch 'update-pkg' into e2e-tracing
Harwayne Sep 18, 2019
5ef3845
Merge branch 'master' into e2e-tracing
Harwayne Sep 18, 2019
fd4e160
Switch to KNATIVE_MONITORING_RELEASE.
Harwayne Sep 18, 2019
04f6246
Add logging for the tracing setup in all the test images that have tr…
Harwayne Sep 18, 2019
86fe35a
Only set the ConfigMap in the code, not the shell script.
Harwayne Sep 18, 2019
22d45d3
Remove setting the ConfigMap again.
Harwayne Sep 18, 2019
037fe7b
Move the bulk of the test into the helpers directory so that it can b…
Harwayne Sep 19, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 11 additions & 1 deletion pkg/tracing/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ var (
SampleRate: 0.01,
ZipkinEndpoint: "http://zipkin.istio-system.svc.cluster.local:9411/api/v2/spans",
}

// AlwaysSample is a configuration that samples 100% of the requests and sends them to Zipkin.
// It is expected to be used only for testing purposes (e.g. in e2e tests).
// TODO(#1712): Remove this and pull "static" configuration from the environment instead.
AlwaysSample = &tracingconfig.Config{
Backend: tracingconfig.Zipkin,
Debug: true,
SampleRate: 1.0,
ZipkinEndpoint: "http://zipkin.istio-system.svc.cluster.local:9411/api/v2/spans",
}
)

// setupPublishing sets up trace publishing for the process. Note that other pieces
Expand Down Expand Up @@ -111,7 +121,7 @@ func enableZeroSamplingCM(ns string) corev1.ConfigMap {
Namespace: ns,
},
Data: map[string]string{
"enable": "True",
"backend": "zipkin",
"debug": "False",
"sample-rate": "0",
"zipkin-endpoint": "http://zipkin.istio-system.svc.cluster.local:9411/api/v2/spans",
Expand Down
2 changes: 1 addition & 1 deletion test/base/resources/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func EventLoggerPod(name string) *corev1.Pod {
return eventLoggerPod("logevents", name)
}

// EventDetailsPod creates a Pod that vaalidates events received and log details about events.
// EventDetailsPod creates a Pod that validates events received and log details about events.
func EventDetailsPod(name string) *corev1.Pod {
return eventLoggerPod("eventdetails", name)
}
Expand Down
44 changes: 21 additions & 23 deletions test/common/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,38 +34,41 @@ const (
timeout = 4 * time.Minute
)

// GetLog gets the logs from the given Pod in the namespace of this client. It will get the logs
// from the first container, whichever it is.
func (client *Client) GetLog(podName string) (string, error) {
containerName, err := client.getContainerName(podName, client.Namespace)
if err != nil {
return "", err
}
logs, err := client.Kube.PodLogs(podName, containerName, client.Namespace)
if err != nil {
return "", err
}
return string(logs), nil
}

// CheckLog waits until logs for the logger Pod satisfy the checker.
// If the checker does not pass within timeout it returns error.
func (client *Client) CheckLog(podName string, checker func(string) bool) error {
namespace := client.Namespace
containerName, err := client.getContainerName(podName, namespace)
if err != nil {
return err
}
return wait.PollImmediate(interval, timeout, func() (bool, error) {
logs, err := client.Kube.PodLogs(podName, containerName, namespace)
logs, err := client.GetLog(podName)
if err != nil {
return true, err
}
return checker(string(logs)), nil
return checker(logs), nil
})
}

// CheckLogEmpty waits the given amount of time and check the log is empty
func (client *Client) CheckLogEmpty(podName string, timeout time.Duration) error {
time.Sleep(timeout)

namespace := client.Namespace
containerName, err := client.getContainerName(podName, namespace)
logs, err := client.GetLog(podName)
if err != nil {
return err
}
logs, err := client.Kube.PodLogs(podName, containerName, namespace)
if err != nil {
return err
}
if string(logs) != "" {
return fmt.Errorf("expected empty log, got %s", string(logs))
if logs != "" {
return fmt.Errorf("expected empty log, got %s", logs)
}
return nil
}
Expand Down Expand Up @@ -106,16 +109,11 @@ func CheckerContainsAtLeast(content string, count int) func(string) bool {
// FindAnyLogContents attempts to find logs for given Pod/Container that has 'any' of the given contents.
// It returns an error if it couldn't retrieve the logs. In case 'any' of the contents are there, it returns true.
func (client *Client) FindAnyLogContents(podName string, contents []string) (bool, error) {
namespace := client.Namespace
containerName, err := client.getContainerName(podName, namespace)
if err != nil {
return false, err
}
logs, err := client.Kube.PodLogs(podName, containerName, namespace)
logs, err := client.GetLog(podName)
if err != nil {
return false, err
}
eventContentsSet, err := parseEventContentsFromPodLogs(string(logs))
eventContentsSet, err := parseEventContentsFromPodLogs(logs)
if err != nil {
return false, err
}
Expand Down
173 changes: 173 additions & 0 deletions test/conformance/channel_tracing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// +build e2e

/*
Copyright 2019 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package conformance

import (
"fmt"
"regexp"
"testing"
"time"

"k8s.io/apimachinery/pkg/util/uuid"
"knative.dev/eventing/test/base/resources"
"knative.dev/eventing/test/common"
"knative.dev/pkg/test/zipkin"
)

func TestChannelTracing(t *testing.T) {
testCases := map[string]struct {
incomingTraceId bool
istio bool
}{
"no incoming trace id": {},
"includes incoming trace id": {
incomingTraceId: true,
}, /*
"caller has istio": {
istio: true,
},*/
}

for n, tc := range testCases {
loggerPodName := "logger"
t.Run(n, func(t *testing.T) {
channelTestRunner.RunTests(t, common.FeatureBasic, func(st *testing.T, channel string) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function needs to be defined as a helper function, if you want it to be ported to eventing-contrib to test other channel implementations, example https://github.com/knative/eventing/blob/master/test/conformance/helpers/channel_header_single_event_helper.go

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

I didn't know, thanks for pointing it out.

// Don't accidentally use t, use st instead. To ensure this, shadow 't' to some a
// useless type.
t := struct{}{}
_ = fmt.Sprintf("%s", t)

client := common.Setup(st, true)
defer common.TearDown(client)

// Do NOT call zipkin.CleanupZipkinTracingSetup. That will be called exactly once in
Harwayne marked this conversation as resolved.
Show resolved Hide resolved
// TestMain.
zipkin.SetupZipkinTracing(client.Kube.Kube, st.Logf)

// TODO This should really be upsert.
err := client.Kube.UpdateConfigMap("knative-eventing", "config-tracing", map[string]string{
"backend": "zipkin",
"zipkin-endpoint": "http://zipkin.istio-system.svc.cluster.local:9411/api/v2/spans",
"debug": "true",
"sample-rate": "1.0",
})
if err != nil {
st.Fatalf("Unable to set the ConfigMap: %v", err)
}

mustContain := setupChannelTracing(st, channel, client, loggerPodName, tc.incomingTraceId)
assertLogContents(st, client, loggerPodName, mustContain)

traceID := getTraceID(st, client, loggerPodName)

// We expect the following spans:
// 1. Sending pod sends event to Channel (only if the sending pod generates a span).
// 2. Channel receives event from sending pod.
// 3. Channel sends event to logging pod.
// 4. Logging pod receives event from Channel.
// So we expect 4 spans if the sending pod is generating a span, 3 if not.
expectedNumSpans := 3
if tc.incomingTraceId {
expectedNumSpans = 4
}
Copy link
Member

@daisy-ycguo daisy-ycguo Sep 12, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain why the number of spans is different when the incoming message includes trace Id ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment to the code, does it make sense?

trace, err := zipkin.JSONTrace(traceID, expectedNumSpans, 60*time.Second)
Harwayne marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
st.Fatalf("Unable to get trace %q: %v. Trace so far %+v", traceID, err, trace)
}
st.Logf("I got the trace, %q!\n%+v", traceID, trace)

// TODO: Assert information in the trace.
})
})
}
}

// setupChannelTracing is the general setup for TestChannelTracing. It creates the following:
// SendEvents (Pod) -> Channel -> Subscription -> K8s Service -> LogEvents (Pod)
// It returns a string that is expected to be sent by the SendEvents Pod and should be present in
// the LogEvents Pod logs.
func setupChannelTracing(t *testing.T, channel string, client *common.Client, loggerPodName string, incomingTraceId bool) string {
// Create the Channel.
channelName := "ch"
channelTypeMeta := common.GetChannelTypeMeta(channel)
client.CreateChannelOrFail(channelName, channelTypeMeta)

// Create the 'sink', a LogEvents Pod and a K8s Service that points to it.
pod := resources.EventDetailsPod(loggerPodName)
client.CreatePodOrFail(pod, common.WithService(loggerPodName))

// Create the Subscription linking the Channel to the LogEvents K8s Service.
client.CreateSubscriptionOrFail(
"sub",
channelName,
channelTypeMeta,
resources.WithSubscriberForSubscription(loggerPodName),
)

// Wait for all test resources to be ready, so that we can start sending events.
if err := client.WaitForAllTestResourcesReady(); err != nil {
t.Fatalf("Failed to get all test resources ready: %v", err)
}

// Everything is setup to receive an event. Generate a CloudEvent.
senderName := "sender"
eventID := fmt.Sprintf("%s", uuid.NewUUID())
body := fmt.Sprintf("TestChannelTracing %s", eventID)
event := &resources.CloudEvent{
ID: eventID,
Source: senderName,
Type: resources.CloudEventDefaultType,
Data: fmt.Sprintf(`{"msg":%q}`, body),
Encoding: resources.CloudEventEncodingBinary,
}

// Send the CloudEvent (either with or without tracing inside the SendEvents Pod).
sendEvent := client.SendFakeEventToAddressable
if incomingTraceId {
sendEvent = client.SendFakeEventWithTracingToAddressable
}
if err := sendEvent(senderName, channelName, channelTypeMeta, event); err != nil {
t.Fatalf("Failed to send fake CloudEvent to the channel %q", channelName)
}
return body
}

// assertLogContents verifies that loggerPodName's logs contains mustContain. It is used to show
// that the expected event was sent to the logger Pod.
func assertLogContents(t *testing.T, client *common.Client, loggerPodName string, mustContain string) {
if err := client.CheckLog(loggerPodName, common.CheckerContains(mustContain)); err != nil {
t.Fatalf("String %q not found in logs of logger pod %q: %v", mustContain, loggerPodName, err)
}
}

// getTraceID gets the TraceID from loggerPodName's logs. It will also assert that body is present.
func getTraceID(t *testing.T, client *common.Client, loggerPodName string) string {
logs, err := client.GetLog(loggerPodName)
if err != nil {
t.Fatalf("Error getting logs: %v", err)
}
// This is the format that the eventdetails image prints headers.
re := regexp.MustCompile("\nGot Header X-B3-Traceid: ([a-zA-Z0-9]{32})\n")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit hacky...
We should find a better way to get the header, but can be in a different PR..

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Totally agree, what do you recommend?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can define a library to add and parse headers to/from the logs, each header will be quoted with something like <header>...</header>. We can also use the similar way to log and parse the body of cloud event, if we need more accurate check on it.

This requires bigger changes, so it can be in a different PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#1945 to track.

submatches := re.FindStringSubmatch(logs)
if len(submatches) != 2 {
t.Fatalf("Unable to extract traceID: %q", logs)
}
traceID := submatches[1]
return traceID
}
24 changes: 17 additions & 7 deletions test/conformance/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ limitations under the License.
package conformance

import (
"log"
"os"
"testing"

"knative.dev/eventing/test"
"knative.dev/eventing/test/common"
"knative.dev/pkg/test/zipkin"
)

var setup = common.Setup
Expand All @@ -30,11 +32,19 @@ var channels test.Channels
var channelTestRunner common.ChannelTestRunner

func TestMain(m *testing.M) {
test.InitializeEventingFlags()
channels = test.EventingFlags.Channels
channelTestRunner = common.ChannelTestRunner{
ChannelFeatureMap: common.ChannelFeatureMap,
ChannelsToTest: test.EventingFlags.Channels,
}
os.Exit(m.Run())
os.Exit(func() int {
test.InitializeEventingFlags()
channels = test.EventingFlags.Channels
channelTestRunner = common.ChannelTestRunner{
ChannelFeatureMap: common.ChannelFeatureMap,
ChannelsToTest: test.EventingFlags.Channels,
}

// Any tests may SetupZipkinTracing, it will only actually be done once. This should be the ONLY
// place that cleans it up. If an individual test calls this instead, then it will break other
// tests that need the tracing in place.
defer zipkin.CleanupZipkinTracingSetup(log.Printf)

return m.Run()
}())
}
19 changes: 19 additions & 0 deletions test/e2e-common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,26 @@ function knative_setup() {
echo ">> Starting Knative Eventing"
echo "Installing Knative Eventing"
ko apply -f ${EVENTING_CONFIG} || return 1

# Force tracing to 100%.
kubectl apply -f - << END
Harwayne marked this conversation as resolved.
Show resolved Hide resolved
apiVersion: v1
kind: ConfigMap
metadata:
namespace: knative-eventing
name: config-tracing
data:
backend: stackdriver
zipkin-endpoint: "http://zipkin.istio-system.svc.cluster.local:9411/api/v2/spans"
debug: "true"
sample-rate: "1.0"
END
wait_until_pods_running knative-eventing || fail_test "Knative Eventing did not come up"

echo "Installing Knative Monitoring"
kubectl create namespace istio-system
Harwayne marked this conversation as resolved.
Show resolved Hide resolved
kubectl apply --filename https://github.com/knative/serving/releases/download/v0.8.0/monitoring-tracing-zipkin-in-mem.yaml || return 1
Harwayne marked this conversation as resolved.
Show resolved Hide resolved
wait_until_pods_running istio-system || fail_test "Knative Monitoring did not come up"
}

# Teardown the Knative environment after tests finish.
Expand Down
9 changes: 7 additions & 2 deletions test/test_images/eventdetails/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ import (
"log"

cloudevents "github.com/cloudevents/sdk-go"
"go.uber.org/zap"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/eventing/pkg/tracing"
)

//func handler(event cloudevents.Event) {
func handler(ctx context.Context, event cloudevents.Event) {
fmt.Printf("Got Event Context: %+v\n", event.Context)
tx := cloudevents.HTTPTransportContextFrom(ctx)
Expand All @@ -47,7 +49,10 @@ func handler(ctx context.Context, event cloudevents.Event) {
}

func main() {
c, err := cloudevents.NewDefaultClient()
if err := tracing.SetupStaticPublishing(zap.NewNop().Sugar(), "", tracing.AlwaysSample); err != nil {
log.Fatalf("Unable to setup trace publishing: %v", err)
}
c, err := kncloudevents.NewDefaultClient()
if err != nil {
log.Fatalf("failed to create eventdetails client, %v", err)
}
Expand Down
Loading