Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Broker Data Plane Conformance tests for the Ingress and Consumer #3531

Merged
merged 5 commits into from
Jul 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions test/conformance/broker_data_plane_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// +build e2e

/*
Copyright 2020 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package conformance

import (
"testing"

"knative.dev/eventing/test/conformance/helpers"
testlib "knative.dev/eventing/test/lib"
)

func TestBrokerV1Beta1DataPlaneIngress(t *testing.T) {
client := testlib.Setup(t, true, testlib.SetupClientOptionNoop)
defer testlib.TearDown(client)

broker := helpers.BrokerDataPlaneSetupHelper(client, brokerName, brokerNamespace, brokerClass)
helpers.BrokerV1Beta1IngressDataPlaneTestHelper(t, client, broker)
}
func TestBrokerV1Beta1DataPlaneConsumer(t *testing.T) {
client := testlib.Setup(t, true, testlib.SetupClientOptionNoop)
defer testlib.TearDown(client)

broker := helpers.BrokerDataPlaneSetupHelper(client, brokerName, brokerNamespace, brokerClass)
helpers.BrokerV1Beta1ConsumerDataPlaneTestHelper(t, client, broker)
}
353 changes: 353 additions & 0 deletions test/conformance/helpers/broker_data_plane_test_helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,353 @@
/*
Copyright 2020 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package helpers

import (
"fmt"
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
eventingv1beta1 "knative.dev/eventing/pkg/apis/eventing/v1beta1"
testlib "knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/recordevents"
"knative.dev/eventing/test/lib/resources"
"knative.dev/eventing/test/lib/sender"

ce "github.com/cloudevents/sdk-go/v2"
cetest "github.com/cloudevents/sdk-go/v2/test"
)

func BrokerDataPlaneSetupHelper(client *testlib.Client, brokerName, brokerNamespace, brokerClass string) *eventingv1beta1.Broker {
var broker *eventingv1beta1.Broker
var err error
if brokerName == "" || brokerNamespace == "" {
brokerName = "br"
config := client.CreateBrokerConfigMapOrFail(brokerName, &testlib.DefaultChannel)

broker = client.CreateBrokerV1Beta1OrFail(
brokerName,
resources.WithBrokerClassForBrokerV1Beta1(brokerClass),
resources.WithConfigForBrokerV1Beta1(config),
)
client.WaitForResourceReadyOrFail(broker.Name, testlib.BrokerTypeMeta)
} else {
if broker, err = client.Eventing.EventingV1beta1().Brokers(brokerNamespace).Get(brokerName, metav1.GetOptions{}); err != nil {
client.T.Fatalf("Could not Get broker %s/%s: %v", brokerNamespace, brokerName, err)
}
}
return broker
}

//At ingress
//Supports CE 0.3 or CE 1.0 via HTTP
//Supports structured or Binary mode
//Respond with 2xx on good CE
//Respond with 400 on bad CE
//Reject non-POST requests to publish URI
func BrokerV1Beta1IngressDataPlaneTestHelper(
t *testing.T,
client *testlib.Client,
broker *eventingv1beta1.Broker,
) {
triggerName := "trigger"
loggerName := "logger-pod"
eventTracker, _ := recordevents.StartEventRecordOrFail(client, loggerName)
defer eventTracker.Cleanup()
client.WaitForAllTestResourcesReadyOrFail()

trigger := client.CreateTriggerOrFailV1Beta1(
triggerName,
resources.WithBrokerV1Beta1(broker.Name),
resources.WithAttributesTriggerFilterV1Beta1(eventingv1beta1.TriggerAnyFilter, eventingv1beta1.TriggerAnyFilter, nil),
resources.WithSubscriberServiceRefForTriggerV1Beta1(loggerName),
)

client.WaitForResourceReadyOrFail(trigger.Name, testlib.TriggerTypeMeta)

t.Run("Ingress Supports CE0.3", func(t *testing.T) {
eventID := "CE0.3"
event := ce.NewEvent()
event.SetID(eventID)
event.SetType(testlib.DefaultEventType)
event.SetSource("0.3.event.sender.test.knative.dev")
body := fmt.Sprintf(`{"msg":%q}`, eventID)

if err := event.SetData(ce.ApplicationJSON, []byte(body)); err != nil {
t.Fatalf("Cannot set the payload of the event: %s", err.Error())
}
event.Context.AsV03()
event.SetSpecVersion("0.3")
client.SendEventToAddressable("v03-test-sender", broker.Name, testlib.BrokerTypeMeta, event)
originalEventMatcher := recordevents.MatchEvent(cetest.AllOf(
cetest.HasId(eventID),
cetest.HasSpecVersion("0.3"),
))
eventTracker.AssertAtLeast(1, originalEventMatcher)
})

t.Run("Ingress Supports CE1.0", func(t *testing.T) {
eventID := "CE1.0"
event := ce.NewEvent()
event.SetID(eventID)
event.SetType(testlib.DefaultEventType)
event.SetSource("1.0.event.sender.test.knative.dev")
body := fmt.Sprintf(`{"msg":%q}`, eventID)
if err := event.SetData(ce.ApplicationJSON, []byte(body)); err != nil {
t.Fatalf("Cannot set the payload of the event: %s", err.Error())
}

client.SendEventToAddressable("v10-test-sender", broker.Name, testlib.BrokerTypeMeta, event)
originalEventMatcher := recordevents.MatchEvent(cetest.AllOf(
cetest.HasId(eventID),
cetest.HasSpecVersion("1.0"),
))
eventTracker.AssertAtLeast(1, originalEventMatcher)

})
t.Run("Ingress Supports Structured Mode", func(t *testing.T) {
eventID := "Structured-Mode"
event := ce.NewEvent()
event.SetID(eventID)
event.SetType(testlib.DefaultEventType)
event.SetSource("structured.mode.event.sender.test.knative.dev")
body := fmt.Sprintf(`{"msg":%q}`, eventID)
if err := event.SetData(ce.ApplicationJSON, []byte(body)); err != nil {
t.Fatalf("Cannot set the payload of the event: %s", err.Error())
}

client.SendEventToAddressable("structured-test-sender", broker.Name, testlib.BrokerTypeMeta, event, sender.WithEncoding(ce.EncodingStructured))
originalEventMatcher := recordevents.MatchEvent(cetest.AllOf(
cetest.HasId(eventID),
))
eventTracker.AssertAtLeast(1, originalEventMatcher)
})

t.Run("Ingress Supports Binary Mode", func(t *testing.T) {
eventID := "Binary-Mode"
event := ce.NewEvent()
event.SetID(eventID)
event.SetType(testlib.DefaultEventType)
event.SetSource("binary.mode.event.sender.test.knative.dev")
body := fmt.Sprintf(`{"msg":%q}`, eventID)
if err := event.SetData(ce.ApplicationJSON, []byte(body)); err != nil {
t.Fatalf("Cannot set the payload of the event: %s", err.Error())
}

client.SendEventToAddressable("binary-test-sender", broker.Name, testlib.BrokerTypeMeta, event, sender.WithEncoding(ce.EncodingBinary))
originalEventMatcher := recordevents.MatchEvent(cetest.AllOf(
cetest.HasId(eventID),
))
eventTracker.AssertAtLeast(1, originalEventMatcher)
})

t.Run("Respond with 2XX on good CE", func(t *testing.T) {
eventID := "2hundred-on-good-ce"
body := fmt.Sprintf(`{"msg":%q}`, eventID)
responseSink := "http://" + client.GetServiceHost(loggerName)
client.SendRequestToAddressable("twohundred-test-sender", broker.Name, testlib.BrokerTypeMeta,
map[string]string{
"ce-specversion": "1.0",
"ce-type": testlib.DefaultEventType,
"ce-source": "2xx.request.sender.test.knative.dev",
"ce-id": eventID,
"content-type": ce.ApplicationJSON,
},
body,
sender.WithResponseSink(responseSink),
)

eventTracker.AssertExact(1, recordevents.MatchEvent(sender.MatchStatusCode(202))) // should probably be a range

})
//Respond with 400 on bad CE
t.Run("Respond with 400 on bad CE", func(t *testing.T) {
eventID := "four-hundred-on-bad-ce"
body := ";la}{kjsdf;oai2095{}{}8234092349807asdfashdf"
responseSink := "http://" + client.GetServiceHost(loggerName)
client.SendRequestToAddressable("fourhundres-test-sender", broker.Name, testlib.BrokerTypeMeta,
map[string]string{
"ce-specversion": "9000.1", //its over 9,000!
"ce-type": testlib.DefaultEventType,
"ce-source": "400.request.sender.test.knative.dev",
"ce-id": eventID,
"content-type": ce.ApplicationJSON,
},
body,
sender.WithResponseSink(responseSink))
eventTracker.AssertExact(1, recordevents.MatchEvent(sender.MatchStatusCode(400)))
})

}

//At consumer
//No upgrade of version
//Attributes received should be the same as produced (attributes may be added)
//Events are filtered
//Events are delivered to multiple subscribers
//Deliveries succeed at least once
//Replies are accepted and delivered
//Replies that are unsuccessfully forwarded cause initial message to be redelivered (Very difficult to test, can be ignored)
func BrokerV1Beta1ConsumerDataPlaneTestHelper(
t *testing.T,
client *testlib.Client,
broker *eventingv1beta1.Broker,
) {
triggerName := "trigger"
secondTriggerName := "second-trigger"
loggerName := "logger-pod"
secondLoggerName := "second-logger-pod"
eventTracker, _ := recordevents.StartEventRecordOrFail(client, loggerName)
defer eventTracker.Cleanup()
secondTracker, _ := recordevents.StartEventRecordOrFail(client, secondLoggerName)
defer secondTracker.Cleanup()
client.WaitForAllTestResourcesReadyOrFail()

trigger := client.CreateTriggerOrFailV1Beta1(
triggerName,
resources.WithBrokerV1Beta1(broker.Name),
resources.WithAttributesTriggerFilterV1Beta1(eventingv1beta1.TriggerAnyFilter, eventingv1beta1.TriggerAnyFilter, nil),
resources.WithSubscriberServiceRefForTriggerV1Beta1(loggerName),
)

client.WaitForResourceReadyOrFail(trigger.Name, testlib.TriggerTypeMeta)
secondTrigger := client.CreateTriggerOrFailV1Beta1(
secondTriggerName,
resources.WithBrokerV1Beta1(broker.Name),
resources.WithAttributesTriggerFilterV1Beta1("filtered-event", eventingv1beta1.TriggerAnyFilter, nil),
resources.WithSubscriberServiceRefForTriggerV1Beta1(secondLoggerName),
)
client.WaitForResourceReadyOrFail(secondTrigger.Name, testlib.TriggerTypeMeta)
eventID := "consumer-broker-tests"
baseSource := "consumer-test-sender"
baseEvent := ce.NewEvent()
baseEvent.SetID(eventID)
baseEvent.SetType(testlib.DefaultEventType)
baseEvent.SetSource(baseSource)
baseEvent.SetSpecVersion("1.0")
body := fmt.Sprintf(`{"msg":%q}`, eventID)
if err := baseEvent.SetData(ce.ApplicationJSON, []byte(body)); err != nil {
t.Fatalf("Cannot set the payload of the baseEvent: %s", err.Error())
}

t.Run("No upgrade of version", func(t *testing.T) {
event := baseEvent
source := "no-upgrade"
event.SetID(source)
event.Context = event.Context.AsV03()

client.SendEventToAddressable(source+"-sender", broker.Name, testlib.BrokerTypeMeta, event, sender.WithEncoding(ce.EncodingStructured))
originalEventMatcher := recordevents.MatchEvent(cetest.AllOf(
cetest.HasSpecVersion("0.3"),
cetest.HasId("no-upgrade"),
))
eventTracker.AssertExact(1, originalEventMatcher)

})

t.Run("Attributes received should be the same as produced (attributes may be added)", func(t *testing.T) {
event := baseEvent
id := "identical-attibutes"
event.SetID(id)
client.SendEventToAddressable(id+"-sender", broker.Name, testlib.BrokerTypeMeta, event)
originalEventMatcher := recordevents.MatchEvent(
cetest.HasId(id),
cetest.HasType(testlib.DefaultEventType),
cetest.HasSource(baseSource),
cetest.HasSpecVersion("1.0"),
)
eventTracker.AssertExact(1, originalEventMatcher)
})

t.Run("Events are filtered", func(t *testing.T) {
event := baseEvent
source := "filtered-event"
event.SetSource(source)
secondEvent := baseEvent

client.SendEventToAddressable("first-"+source+"-sender", broker.Name, testlib.BrokerTypeMeta, event)
client.SendEventToAddressable("second-"+source+"-sender", broker.Name, testlib.BrokerTypeMeta, secondEvent)
filteredEventMatcher := recordevents.MatchEvent(
cetest.HasSource(source),
)
nonEventMatcher := recordevents.MatchEvent(
cetest.HasSource(baseSource),
)
secondTracker.AssertAtLeast(1, filteredEventMatcher)
secondTracker.AssertNot(nonEventMatcher)
})

t.Run("Events are delivered to multiple subscribers", func(t *testing.T) {
event := baseEvent
source := "filtered-event"
event.SetSource(source)
client.SendEventToAddressable(source+"-sender", broker.Name, testlib.BrokerTypeMeta, event)
filteredEventMatcher := recordevents.MatchEvent(
cetest.HasSource(source),
)
eventTracker.AssertAtLeast(1, filteredEventMatcher)
secondTracker.AssertAtLeast(1, filteredEventMatcher)
})

t.Run("Deliveries succeed at least once", func(t *testing.T) {
event := baseEvent
source := "delivery-check"
event.SetSource(source)
client.SendEventToAddressable(source+"-sender", broker.Name, testlib.BrokerTypeMeta, event)
originalEventMatcher := recordevents.MatchEvent(
cetest.HasSource(source),
)
eventTracker.AssertAtLeast(1, originalEventMatcher)
})

t.Run("Replies are accepted and delivered", func(t *testing.T) {
event := baseEvent
source := "origin-for-reply"
event.SetSource(source)
msg := []byte(`{"msg":"Transformed!"}`)
transformPod := resources.EventTransformationPod(
"tranformer-pod",
"reply-check-type",
"reply-check-source",
msg,
)
client.CreatePodOrFail(transformPod, testlib.WithService("transformer-pod"))
transformTrigger := client.CreateTriggerOrFailV1Beta1(
"transform-trigger",
resources.WithBrokerV1Beta1(broker.Name),
resources.WithAttributesTriggerFilterV1Beta1(source, baseEvent.Type(), nil),
resources.WithSubscriberServiceRefForTriggerV1Beta1("transformer-pod"),
)
client.WaitForResourceReadyOrFail(transformTrigger.Name, testlib.TriggerTypeMeta)
transformEventTracker, _ := recordevents.StartEventRecordOrFail(client, "transform-events-logger")
defer transformEventTracker.Cleanup()

replyTrigger := client.CreateTriggerOrFailV1Beta1(
"reply-trigger",
resources.WithBrokerV1Beta1(broker.Name),
resources.WithAttributesTriggerFilterV1Beta1("reply-check-source", "reply-check-type", nil),
resources.WithSubscriberServiceRefForTriggerV1Beta1(loggerName),
)
client.WaitForResourceReadyOrFail(replyTrigger.Name, testlib.TriggerTypeMeta)
client.SendEventToAddressable(source+"-sender", broker.Name, testlib.BrokerTypeMeta, event)
transformedEventMatcher := recordevents.MatchEvent(
cetest.HasSource("reply-check-source"),
cetest.HasType("reply-check-type"),
cetest.HasData(msg),
)
eventTracker.AssertAtLeast(2, transformedEventMatcher)
})
}
Loading