Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

E2E test for Channel Tracing #1858

Merged
merged 35 commits into from
Sep 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
ff72461
WIP
Harwayne Sep 9, 2019
ca5f88a
Simple one works.
Harwayne Sep 9, 2019
9ca4206
Revert deep vendored stuff.
Harwayne Sep 9, 2019
5d4cd00
Revert upload test images.
Harwayne Sep 9, 2019
b442008
It works!
Harwayne Sep 9, 2019
a9a55d1
Run in parallel.
Harwayne Sep 10, 2019
5f536c5
pkg determines the hostname.
Harwayne Sep 10, 2019
7c9b13c
Cleanup
Harwayne Sep 10, 2019
68693df
Use GetLog.
Harwayne Sep 10, 2019
e165ca5
Merge branch 'master' into e2e-tracing
Harwayne Sep 11, 2019
3b33fb9
Try installing monitoring and setting tracing to 100%.
Harwayne Sep 11, 2019
ca04836
Add a comment about where the expectedNumSpans comes from.
Harwayne Sep 12, 2019
3a3fc29
Create the istio-system ns.
Harwayne Sep 12, 2019
e833388
Update knative.dev/pkg.
Harwayne Sep 12, 2019
3a84ed5
Switch back to knative.dev/pkg's zipkin.
Harwayne Sep 12, 2019
793a2f1
Merge branch 'master' into e2e-tracing
Harwayne Sep 12, 2019
dadd66d
Rerun dep ensure.
Harwayne Sep 12, 2019
28ad88d
Switch in main_test.
Harwayne Sep 12, 2019
8eedb51
Random namespace name.
Harwayne Sep 16, 2019
8e96712
Merge branch 'master' into e2e-tracing
Harwayne Sep 16, 2019
ca9f3c3
Include trace so far in the error case.
Harwayne Sep 16, 2019
9c765c7
Set the ConfigMap in code.
Harwayne Sep 16, 2019
5340ad2
Set timeout to 60 seconds.
Harwayne Sep 16, 2019
1a8a05c
Small changes.
Harwayne Sep 16, 2019
0fbd809
Merge branch 'master' into e2e-tracing
Harwayne Sep 16, 2019
a13ed44
Works on the Mac.
Harwayne Sep 18, 2019
30c5b0d
Update knative/pkg.
Harwayne Sep 18, 2019
b7acbfb
Merge branch 'master' into e2e-tracing
Harwayne Sep 18, 2019
1a36cd2
Merge branch 'update-pkg' into e2e-tracing
Harwayne Sep 18, 2019
5ef3845
Merge branch 'master' into e2e-tracing
Harwayne Sep 18, 2019
fd4e160
Switch to KNATIVE_MONITORING_RELEASE.
Harwayne Sep 18, 2019
04f6246
Add logging for the tracing setup in all the test images that have tr…
Harwayne Sep 18, 2019
86fe35a
Only set the ConfigMap in the code, not the shell script.
Harwayne Sep 18, 2019
22d45d3
Remove setting the ConfigMap again.
Harwayne Sep 18, 2019
037fe7b
Move the bulk of the test into the helpers directory so that it can b…
Harwayne Sep 19, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 11 additions & 1 deletion pkg/tracing/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ var (
SampleRate: 0.01,
ZipkinEndpoint: "http://zipkin.istio-system.svc.cluster.local:9411/api/v2/spans",
}

// AlwaysSample is a configuration that samples 100% of the requests and sends them to Zipkin.
// It is expected to be used only for testing purposes (e.g. in e2e tests).
// TODO(#1712): Remove this and pull "static" configuration from the environment instead.
AlwaysSample = &tracingconfig.Config{
Backend: tracingconfig.Zipkin,
Debug: true,
SampleRate: 1.0,
ZipkinEndpoint: "http://zipkin.istio-system.svc.cluster.local:9411/api/v2/spans",
}
)

// setupPublishing sets up trace publishing for the process. Note that other pieces
Expand Down Expand Up @@ -111,7 +121,7 @@ func enableZeroSamplingCM(ns string) corev1.ConfigMap {
Namespace: ns,
},
Data: map[string]string{
"enable": "True",
"backend": "zipkin",
"debug": "False",
"sample-rate": "0",
"zipkin-endpoint": "http://zipkin.istio-system.svc.cluster.local:9411/api/v2/spans",
Expand Down
2 changes: 1 addition & 1 deletion test/base/resources/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func EventLoggerPod(name string) *corev1.Pod {
return eventLoggerPod("logevents", name)
}

// EventDetailsPod creates a Pod that vaalidates events received and log details about events.
// EventDetailsPod creates a Pod that validates events received and log details about events.
func EventDetailsPod(name string) *corev1.Pod {
return eventLoggerPod("eventdetails", name)
}
Expand Down
44 changes: 21 additions & 23 deletions test/common/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,38 +34,41 @@ const (
timeout = 4 * time.Minute
)

// GetLog gets the logs from the given Pod in the namespace of this client. It will get the logs
// from the first container, whichever it is.
func (client *Client) GetLog(podName string) (string, error) {
containerName, err := client.getContainerName(podName, client.Namespace)
if err != nil {
return "", err
}
logs, err := client.Kube.PodLogs(podName, containerName, client.Namespace)
if err != nil {
return "", err
}
return string(logs), nil
}

// CheckLog waits until logs for the logger Pod satisfy the checker.
// If the checker does not pass within timeout it returns error.
func (client *Client) CheckLog(podName string, checker func(string) bool) error {
namespace := client.Namespace
containerName, err := client.getContainerName(podName, namespace)
if err != nil {
return err
}
return wait.PollImmediate(interval, timeout, func() (bool, error) {
logs, err := client.Kube.PodLogs(podName, containerName, namespace)
logs, err := client.GetLog(podName)
if err != nil {
return true, err
}
return checker(string(logs)), nil
return checker(logs), nil
})
}

// CheckLogEmpty waits the given amount of time and check the log is empty
func (client *Client) CheckLogEmpty(podName string, timeout time.Duration) error {
time.Sleep(timeout)

namespace := client.Namespace
containerName, err := client.getContainerName(podName, namespace)
logs, err := client.GetLog(podName)
if err != nil {
return err
}
logs, err := client.Kube.PodLogs(podName, containerName, namespace)
if err != nil {
return err
}
if string(logs) != "" {
return fmt.Errorf("expected empty log, got %s", string(logs))
if logs != "" {
return fmt.Errorf("expected empty log, got %s", logs)
}
return nil
}
Expand Down Expand Up @@ -106,16 +109,11 @@ func CheckerContainsAtLeast(content string, count int) func(string) bool {
// FindAnyLogContents attempts to find logs for given Pod/Container that has 'any' of the given contents.
// It returns an error if it couldn't retrieve the logs. In case 'any' of the contents are there, it returns true.
func (client *Client) FindAnyLogContents(podName string, contents []string) (bool, error) {
namespace := client.Namespace
containerName, err := client.getContainerName(podName, namespace)
if err != nil {
return false, err
}
logs, err := client.Kube.PodLogs(podName, containerName, namespace)
logs, err := client.GetLog(podName)
if err != nil {
return false, err
}
eventContentsSet, err := parseEventContentsFromPodLogs(string(logs))
eventContentsSet, err := parseEventContentsFromPodLogs(logs)
if err != nil {
return false, err
}
Expand Down
29 changes: 29 additions & 0 deletions test/conformance/channel_tracing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// +build e2e

/*
Copyright 2019 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package conformance

import (
"testing"

"knative.dev/eventing/test/conformance/helpers"
)

func TestChannelTracing(t *testing.T) {
helpers.ChannelTracingTestHelper(t, channelTestRunner)
}
6 changes: 0 additions & 6 deletions test/conformance/header_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,6 @@ import (
"knative.dev/eventing/test/conformance/helpers"
)

const (
userHeaderKey = "this-was-user-set"
userHeaderValue = "a value"
)

// The Channel MUST pass through all tracing information as CloudEvents attributes
func TestMustPassTracingHeaders(t *testing.T) {
t.Logf("Starting channel tracing headers test")
Expand All @@ -38,5 +33,4 @@ func TestMustPassTracingHeaders(t *testing.T) {
resources.CloudEventEncodingBinary,
channelTestRunner,
)

}
157 changes: 157 additions & 0 deletions test/conformance/helpers/channel_tracing_test_helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
Copyright 2019 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package helpers

import (
"fmt"
"regexp"
"testing"
"time"

"k8s.io/apimachinery/pkg/util/uuid"
"knative.dev/eventing/test/base/resources"
"knative.dev/eventing/test/common"
tracinghelper "knative.dev/eventing/test/conformance/helpers/tracing"
"knative.dev/pkg/test/zipkin"
)

func ChannelTracingTestHelper(t *testing.T, channelTestRunner common.ChannelTestRunner) {
testCases := map[string]struct {
incomingTraceId bool
istio bool
}{
"includes incoming trace id": {
incomingTraceId: true,
},
}

for n, tc := range testCases {
loggerPodName := "logger"
t.Run(n, func(t *testing.T) {
channelTestRunner.RunTests(t, common.FeatureBasic, func(st *testing.T, channel string) {
// Don't accidentally use t, use st instead. To ensure this, shadow 't' to a useless
// type.
t := struct{}{}
_ = fmt.Sprintf("%s", t)

client := common.Setup(st, true)
defer common.TearDown(client)

// Do NOT call zipkin.CleanupZipkinTracingSetup. That will be called exactly once in
// TestMain.
tracinghelper.Setup(st, client)

mustContain := setupChannelTracing(st, channel, client, loggerPodName, tc.incomingTraceId)
assertLogContents(st, client, loggerPodName, mustContain)

traceID := getTraceID(st, client, loggerPodName)

// We expect the following spans:
// 1. Sending pod sends event to Channel (only if the sending pod generates a span).
// 2. Channel receives event from sending pod.
// 3. Channel sends event to logging pod.
// 4. Logging pod receives event from Channel.
// So we expect 4 spans if the sending pod is generating a span, 3 if not.
expectedNumSpans := 3
if tc.incomingTraceId {
expectedNumSpans = 4
}
trace, err := zipkin.JSONTrace(traceID, expectedNumSpans, 60*time.Second)
if err != nil {
st.Fatalf("Unable to get trace %q: %v. Trace so far %+v", traceID, err, trace)
}
st.Logf("I got the trace, %q!\n%+v", traceID, trace)

// TODO: Assert information in the trace.
})
})
}
}

// setupChannelTracing is the general setup for TestChannelTracing. It creates the following:
// SendEvents (Pod) -> Channel -> Subscription -> K8s Service -> LogEvents (Pod)
// It returns a string that is expected to be sent by the SendEvents Pod and should be present in
// the LogEvents Pod logs.
func setupChannelTracing(t *testing.T, channel string, client *common.Client, loggerPodName string, incomingTraceId bool) string {
// Create the Channel.
channelName := "ch"
channelTypeMeta := common.GetChannelTypeMeta(channel)
client.CreateChannelOrFail(channelName, channelTypeMeta)

// Create the 'sink', a LogEvents Pod and a K8s Service that points to it.
pod := resources.EventDetailsPod(loggerPodName)
client.CreatePodOrFail(pod, common.WithService(loggerPodName))

// Create the Subscription linking the Channel to the LogEvents K8s Service.
client.CreateSubscriptionOrFail(
"sub",
channelName,
channelTypeMeta,
resources.WithSubscriberForSubscription(loggerPodName),
)

// Wait for all test resources to be ready, so that we can start sending events.
if err := client.WaitForAllTestResourcesReady(); err != nil {
t.Fatalf("Failed to get all test resources ready: %v", err)
}

// Everything is setup to receive an event. Generate a CloudEvent.
senderName := "sender"
eventID := fmt.Sprintf("%s", uuid.NewUUID())
body := fmt.Sprintf("TestChannelTracing %s", eventID)
event := &resources.CloudEvent{
ID: eventID,
Source: senderName,
Type: resources.CloudEventDefaultType,
Data: fmt.Sprintf(`{"msg":%q}`, body),
Encoding: resources.CloudEventEncodingBinary,
}

// Send the CloudEvent (either with or without tracing inside the SendEvents Pod).
sendEvent := client.SendFakeEventToAddressable
if incomingTraceId {
sendEvent = client.SendFakeEventWithTracingToAddressable
}
if err := sendEvent(senderName, channelName, channelTypeMeta, event); err != nil {
t.Fatalf("Failed to send fake CloudEvent to the channel %q", channelName)
}
return body
}

// assertLogContents verifies that loggerPodName's logs contains mustContain. It is used to show
// that the expected event was sent to the logger Pod.
func assertLogContents(t *testing.T, client *common.Client, loggerPodName string, mustContain string) {
if err := client.CheckLog(loggerPodName, common.CheckerContains(mustContain)); err != nil {
t.Fatalf("String %q not found in logs of logger pod %q: %v", mustContain, loggerPodName, err)
}
}

// getTraceID gets the TraceID from loggerPodName's logs. It will also assert that body is present.
func getTraceID(t *testing.T, client *common.Client, loggerPodName string) string {
logs, err := client.GetLog(loggerPodName)
if err != nil {
t.Fatalf("Error getting logs: %v", err)
}
// This is the format that the eventdetails image prints headers.
re := regexp.MustCompile("\nGot Header X-B3-Traceid: ([a-zA-Z0-9]{32})\n")
matches := re.FindStringSubmatch(logs)
if len(matches) != 2 {
t.Fatalf("Unable to extract traceID: %q", logs)
}
traceID := matches[1]
return traceID
}
51 changes: 51 additions & 0 deletions test/conformance/helpers/tracing/zipkin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
Copyright 2019 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package tracing

import (
"sync"
"testing"

"knative.dev/eventing/test/common"
"knative.dev/pkg/test/zipkin"
)

// Setup sets up port forwarding to Zipkin and sets the knative-eventing tracing config to debug
// mode (everything is sampled).
func Setup(t *testing.T, client *common.Client) {
// Do NOT call zipkin.CleanupZipkinTracingSetup. That will be called exactly once in
// TestMain.
zipkin.SetupZipkinTracing(client.Kube.Kube, t.Logf)
setTracingConfigToDebugMode(t, client)
}

var setTracingConfigOnce = sync.Once{}

// TODO Do we need a tear down method as well?
func setTracingConfigToDebugMode(t *testing.T, client *common.Client) {
setTracingConfigOnce.Do(func() {
err := client.Kube.UpdateConfigMap("knative-eventing", "config-tracing", map[string]string{
"backend": "zipkin",
"zipkin-endpoint": "http://zipkin.istio-system.svc.cluster.local:9411/api/v2/spans",
"debug": "true",
"sample-rate": "1.0",
})
if err != nil {
t.Fatalf("Unable to set the ConfigMap: %v", err)
}
})
}
Loading