Skip to content

Commit

Permalink
E2E test for Channel Tracing (#1858)
Browse files Browse the repository at this point in the history
* WIP

* Simple one works.

* Revert deep vendored stuff.

* Revert upload test images.

* It works!

* Run in parallel.

* pkg determines the hostname.

* Cleanup

* Use GetLog.

* Try installing monitoring and setting tracing to 100%.

* Add a comment about where the expectedNumSpans comes from.

* Create the istio-system ns.

* Update knative.dev/pkg.

* Switch back to knative.dev/pkg's zipkin.

* Rerun dep ensure.

* Switch in main_test.

* Random namespace name.

* Include trace so far in the error case.

* Set the ConfigMap in code.

* Set timeout to 60 seconds.

* Small changes.

* Works on the Mac.

* Update knative/pkg.

* Switch to KNATIVE_MONITORING_RELEASE.

* Add logging for the tracing setup in all the test images that have tracing.

* Only set the ConfigMap in the code, not the shell script.

* Remove setting the ConfigMap again.

* Move the bulk of the test into the helpers directory so that it can be reused by eventing-contrib.
  • Loading branch information
Harwayne authored and knative-prow-robot committed Sep 20, 2019
1 parent 8b54bc6 commit 989058c
Show file tree
Hide file tree
Showing 13 changed files with 323 additions and 47 deletions.
1 change: 1 addition & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 11 additions & 1 deletion pkg/tracing/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ var (
SampleRate: 0.01,
ZipkinEndpoint: "http://zipkin.istio-system.svc.cluster.local:9411/api/v2/spans",
}

// AlwaysSample is a configuration that samples 100% of the requests and sends them to Zipkin.
// It is expected to be used only for testing purposes (e.g. in e2e tests).
// TODO(#1712): Remove this and pull "static" configuration from the environment instead.
AlwaysSample = &tracingconfig.Config{
Backend: tracingconfig.Zipkin,
Debug: true,
SampleRate: 1.0,
ZipkinEndpoint: "http://zipkin.istio-system.svc.cluster.local:9411/api/v2/spans",
}
)

// setupPublishing sets up trace publishing for the process. Note that other pieces
Expand Down Expand Up @@ -111,7 +121,7 @@ func enableZeroSamplingCM(ns string) corev1.ConfigMap {
Namespace: ns,
},
Data: map[string]string{
"enable": "True",
"backend": "zipkin",
"debug": "False",
"sample-rate": "0",
"zipkin-endpoint": "http://zipkin.istio-system.svc.cluster.local:9411/api/v2/spans",
Expand Down
2 changes: 1 addition & 1 deletion test/base/resources/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func EventLoggerPod(name string) *corev1.Pod {
return eventLoggerPod("logevents", name)
}

// EventDetailsPod creates a Pod that vaalidates events received and log details about events.
// EventDetailsPod creates a Pod that validates events received and log details about events.
func EventDetailsPod(name string) *corev1.Pod {
return eventLoggerPod("eventdetails", name)
}
Expand Down
44 changes: 21 additions & 23 deletions test/common/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,38 +34,41 @@ const (
timeout = 4 * time.Minute
)

// GetLog gets the logs from the given Pod in the namespace of this client. It will get the logs
// from the first container, whichever it is.
func (client *Client) GetLog(podName string) (string, error) {
containerName, err := client.getContainerName(podName, client.Namespace)
if err != nil {
return "", err
}
logs, err := client.Kube.PodLogs(podName, containerName, client.Namespace)
if err != nil {
return "", err
}
return string(logs), nil
}

// CheckLog waits until logs for the logger Pod satisfy the checker.
// If the checker does not pass within timeout it returns error.
func (client *Client) CheckLog(podName string, checker func(string) bool) error {
namespace := client.Namespace
containerName, err := client.getContainerName(podName, namespace)
if err != nil {
return err
}
return wait.PollImmediate(interval, timeout, func() (bool, error) {
logs, err := client.Kube.PodLogs(podName, containerName, namespace)
logs, err := client.GetLog(podName)
if err != nil {
return true, err
}
return checker(string(logs)), nil
return checker(logs), nil
})
}

// CheckLogEmpty waits the given amount of time and check the log is empty
func (client *Client) CheckLogEmpty(podName string, timeout time.Duration) error {
time.Sleep(timeout)

namespace := client.Namespace
containerName, err := client.getContainerName(podName, namespace)
logs, err := client.GetLog(podName)
if err != nil {
return err
}
logs, err := client.Kube.PodLogs(podName, containerName, namespace)
if err != nil {
return err
}
if string(logs) != "" {
return fmt.Errorf("expected empty log, got %s", string(logs))
if logs != "" {
return fmt.Errorf("expected empty log, got %s", logs)
}
return nil
}
Expand Down Expand Up @@ -106,16 +109,11 @@ func CheckerContainsAtLeast(content string, count int) func(string) bool {
// FindAnyLogContents attempts to find logs for given Pod/Container that has 'any' of the given contents.
// It returns an error if it couldn't retrieve the logs. In case 'any' of the contents are there, it returns true.
func (client *Client) FindAnyLogContents(podName string, contents []string) (bool, error) {
namespace := client.Namespace
containerName, err := client.getContainerName(podName, namespace)
if err != nil {
return false, err
}
logs, err := client.Kube.PodLogs(podName, containerName, namespace)
logs, err := client.GetLog(podName)
if err != nil {
return false, err
}
eventContentsSet, err := parseEventContentsFromPodLogs(string(logs))
eventContentsSet, err := parseEventContentsFromPodLogs(logs)
if err != nil {
return false, err
}
Expand Down
29 changes: 29 additions & 0 deletions test/conformance/channel_tracing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// +build e2e

/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package conformance

import (
"testing"

"knative.dev/eventing/test/conformance/helpers"
)

func TestChannelTracing(t *testing.T) {
helpers.ChannelTracingTestHelper(t, channelTestRunner)
}
6 changes: 0 additions & 6 deletions test/conformance/header_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,6 @@ import (
"knative.dev/eventing/test/conformance/helpers"
)

const (
userHeaderKey = "this-was-user-set"
userHeaderValue = "a value"
)

// The Channel MUST pass through all tracing information as CloudEvents attributes
func TestMustPassTracingHeaders(t *testing.T) {
t.Logf("Starting channel tracing headers test")
Expand All @@ -38,5 +33,4 @@ func TestMustPassTracingHeaders(t *testing.T) {
resources.CloudEventEncodingBinary,
channelTestRunner,
)

}
157 changes: 157 additions & 0 deletions test/conformance/helpers/channel_tracing_test_helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package helpers

import (
"fmt"
"regexp"
"testing"
"time"

"k8s.io/apimachinery/pkg/util/uuid"
"knative.dev/eventing/test/base/resources"
"knative.dev/eventing/test/common"
tracinghelper "knative.dev/eventing/test/conformance/helpers/tracing"
"knative.dev/pkg/test/zipkin"
)

func ChannelTracingTestHelper(t *testing.T, channelTestRunner common.ChannelTestRunner) {
testCases := map[string]struct {
incomingTraceId bool
istio bool
}{
"includes incoming trace id": {
incomingTraceId: true,
},
}

for n, tc := range testCases {
loggerPodName := "logger"
t.Run(n, func(t *testing.T) {
channelTestRunner.RunTests(t, common.FeatureBasic, func(st *testing.T, channel string) {
// Don't accidentally use t, use st instead. To ensure this, shadow 't' to a useless
// type.
t := struct{}{}
_ = fmt.Sprintf("%s", t)

client := common.Setup(st, true)
defer common.TearDown(client)

// Do NOT call zipkin.CleanupZipkinTracingSetup. That will be called exactly once in
// TestMain.
tracinghelper.Setup(st, client)

mustContain := setupChannelTracing(st, channel, client, loggerPodName, tc.incomingTraceId)
assertLogContents(st, client, loggerPodName, mustContain)

traceID := getTraceID(st, client, loggerPodName)

// We expect the following spans:
// 1. Sending pod sends event to Channel (only if the sending pod generates a span).
// 2. Channel receives event from sending pod.
// 3. Channel sends event to logging pod.
// 4. Logging pod receives event from Channel.
// So we expect 4 spans if the sending pod is generating a span, 3 if not.
expectedNumSpans := 3
if tc.incomingTraceId {
expectedNumSpans = 4
}
trace, err := zipkin.JSONTrace(traceID, expectedNumSpans, 60*time.Second)
if err != nil {
st.Fatalf("Unable to get trace %q: %v. Trace so far %+v", traceID, err, trace)
}
st.Logf("I got the trace, %q!\n%+v", traceID, trace)

// TODO: Assert information in the trace.
})
})
}
}

// setupChannelTracing is the general setup for TestChannelTracing. It creates the following:
// SendEvents (Pod) -> Channel -> Subscription -> K8s Service -> LogEvents (Pod)
// It returns a string that is expected to be sent by the SendEvents Pod and should be present in
// the LogEvents Pod logs.
func setupChannelTracing(t *testing.T, channel string, client *common.Client, loggerPodName string, incomingTraceId bool) string {
// Create the Channel.
channelName := "ch"
channelTypeMeta := common.GetChannelTypeMeta(channel)
client.CreateChannelOrFail(channelName, channelTypeMeta)

// Create the 'sink', a LogEvents Pod and a K8s Service that points to it.
pod := resources.EventDetailsPod(loggerPodName)
client.CreatePodOrFail(pod, common.WithService(loggerPodName))

// Create the Subscription linking the Channel to the LogEvents K8s Service.
client.CreateSubscriptionOrFail(
"sub",
channelName,
channelTypeMeta,
resources.WithSubscriberForSubscription(loggerPodName),
)

// Wait for all test resources to be ready, so that we can start sending events.
if err := client.WaitForAllTestResourcesReady(); err != nil {
t.Fatalf("Failed to get all test resources ready: %v", err)
}

// Everything is setup to receive an event. Generate a CloudEvent.
senderName := "sender"
eventID := fmt.Sprintf("%s", uuid.NewUUID())
body := fmt.Sprintf("TestChannelTracing %s", eventID)
event := &resources.CloudEvent{
ID: eventID,
Source: senderName,
Type: resources.CloudEventDefaultType,
Data: fmt.Sprintf(`{"msg":%q}`, body),
Encoding: resources.CloudEventEncodingBinary,
}

// Send the CloudEvent (either with or without tracing inside the SendEvents Pod).
sendEvent := client.SendFakeEventToAddressable
if incomingTraceId {
sendEvent = client.SendFakeEventWithTracingToAddressable
}
if err := sendEvent(senderName, channelName, channelTypeMeta, event); err != nil {
t.Fatalf("Failed to send fake CloudEvent to the channel %q", channelName)
}
return body
}

// assertLogContents verifies that loggerPodName's logs contains mustContain. It is used to show
// that the expected event was sent to the logger Pod.
func assertLogContents(t *testing.T, client *common.Client, loggerPodName string, mustContain string) {
if err := client.CheckLog(loggerPodName, common.CheckerContains(mustContain)); err != nil {
t.Fatalf("String %q not found in logs of logger pod %q: %v", mustContain, loggerPodName, err)
}
}

// getTraceID gets the TraceID from loggerPodName's logs. It will also assert that body is present.
func getTraceID(t *testing.T, client *common.Client, loggerPodName string) string {
logs, err := client.GetLog(loggerPodName)
if err != nil {
t.Fatalf("Error getting logs: %v", err)
}
// This is the format that the eventdetails image prints headers.
re := regexp.MustCompile("\nGot Header X-B3-Traceid: ([a-zA-Z0-9]{32})\n")
matches := re.FindStringSubmatch(logs)
if len(matches) != 2 {
t.Fatalf("Unable to extract traceID: %q", logs)
}
traceID := matches[1]
return traceID
}
51 changes: 51 additions & 0 deletions test/conformance/helpers/tracing/zipkin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package tracing

import (
"sync"
"testing"

"knative.dev/eventing/test/common"
"knative.dev/pkg/test/zipkin"
)

// Setup sets up port forwarding to Zipkin and sets the knative-eventing tracing config to debug
// mode (everything is sampled).
func Setup(t *testing.T, client *common.Client) {
// Do NOT call zipkin.CleanupZipkinTracingSetup. That will be called exactly once in
// TestMain.
zipkin.SetupZipkinTracing(client.Kube.Kube, t.Logf)
setTracingConfigToDebugMode(t, client)
}

var setTracingConfigOnce = sync.Once{}

// TODO Do we need a tear down method as well?
func setTracingConfigToDebugMode(t *testing.T, client *common.Client) {
setTracingConfigOnce.Do(func() {
err := client.Kube.UpdateConfigMap("knative-eventing", "config-tracing", map[string]string{
"backend": "zipkin",
"zipkin-endpoint": "http://zipkin.istio-system.svc.cluster.local:9411/api/v2/spans",
"debug": "true",
"sample-rate": "1.0",
})
if err != nil {
t.Fatalf("Unable to set the ConfigMap: %v", err)
}
})
}
Loading

0 comments on commit 989058c

Please sign in to comment.