diff --git a/test/experimental/delivery_timeout_test.go b/test/experimental/delivery_timeout_test.go index 8a2d1c57187..c9148f330a4 100644 --- a/test/experimental/delivery_timeout_test.go +++ b/test/experimental/delivery_timeout_test.go @@ -21,7 +21,9 @@ package experimental import ( "testing" + "knative.dev/eventing/pkg/apis/eventing" "knative.dev/eventing/test/experimental/features/delivery_timeout" + "knative.dev/eventing/test/rekt/resources/broker" "knative.dev/pkg/system" "knative.dev/reconciler-test/pkg/environment" "knative.dev/reconciler-test/pkg/k8s" @@ -40,4 +42,5 @@ func TestDeliveryTimeout(t *testing.T) { ) env.Test(ctx, t, delivery_timeout.ChannelToSink()) + env.Test(ctx, t, delivery_timeout.BrokerToSink(broker.WithBrokerClass(eventing.MTChannelBrokerClassValue))) } diff --git a/test/experimental/features/delivery_timeout/broker.go b/test/experimental/features/delivery_timeout/broker.go new file mode 100644 index 00000000000..e3585f3cfb7 --- /dev/null +++ b/test/experimental/features/delivery_timeout/broker.go @@ -0,0 +1,114 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package delivery_timeout + +import ( + "context" + "time" + + cetest "github.com/cloudevents/sdk-go/v2/test" + "github.com/rickb777/date/period" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + eventingclient "knative.dev/eventing/pkg/client/injection/client" + "knative.dev/eventing/test/rekt/resources/broker" + "knative.dev/eventing/test/rekt/resources/trigger" + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/reconciler-test/pkg/environment" + "knative.dev/reconciler-test/pkg/eventshub" + "knative.dev/reconciler-test/pkg/eventshub/assert" + "knative.dev/reconciler-test/pkg/feature" + "knative.dev/reconciler-test/pkg/manifest" +) + +// BrokerToSink tests a scenario where the flow is source -> broker -> sink (timeout) -- fallback to -> dead letter sink +func BrokerToSink(brokerOpts ...manifest.CfgFn) *feature.Feature { + f := feature.NewFeature() + + timeout := 6 * time.Second + // Clocks are funny, let's add 1 second to take in account eventual clock skews + timeoutPeriod, _ := period.NewOf(timeout - time.Second) + timeoutString := timeoutPeriod.String() + + brokerName := feature.MakeRandomK8sName("broker") + triggerName := feature.MakeRandomK8sName("trigger-sink") + sinkName := feature.MakeRandomK8sName("sink") + deadLetterSinkName := feature.MakeRandomK8sName("dead-letter") + sourceName := feature.MakeRandomK8sName("source") + + ev := cetest.FullEvent() + + f.Setup("install sink", eventshub.Install( + sinkName, + eventshub.StartReceiver, + eventshub.ResponseWaitTime(timeout), + )) + f.Setup("install dead letter sink", eventshub.Install( + deadLetterSinkName, + eventshub.StartReceiver, + )) + + f.Setup("Install broker", broker.Install(brokerName, brokerOpts...)) + f.Setup("Broker is ready", broker.IsReady(brokerName)) + + f.Setup("Install broker -> sink subscription", func(ctx context.Context, t feature.T) { + namespace := environment.FromContext(ctx).Namespace() + _, err := eventingclient.Get(ctx).EventingV1().Triggers(namespace).Create(ctx, + &eventingv1.Trigger{ + ObjectMeta: metav1.ObjectMeta{ + Name: triggerName, + Namespace: namespace, + }, + Spec: eventingv1.TriggerSpec{ + Broker: brokerName, + Subscriber: duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: "v1", + Kind: "Service", + Name: sinkName, + }, + }, + Delivery: &eventingduckv1.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: "v1", + Kind: "Service", + Name: deadLetterSinkName, + }, + }, + Timeout: &timeoutString, + }, + }, + }, metav1.CreateOptions{}) + require.NoError(t, err) + }) + + f.Setup("trigger is ready", trigger.IsReady(triggerName)) + + f.Setup("install source", eventshub.Install( + sourceName, + eventshub.StartSenderToResource(broker.GVR(), brokerName), + eventshub.InputEvent(ev), + )) + + f.Assert("receive event on sink", assert.OnStore(sinkName).MatchEvent(cetest.HasId(ev.ID())).Exact(1)) + f.Assert("receive event on dead letter sink", assert.OnStore(deadLetterSinkName).MatchEvent(cetest.HasId(ev.ID())).Exact(1)) + + return f +}