diff --git a/pkg/adapter/apiserver/events/events.go b/pkg/adapter/apiserver/events/events.go index e2e61ed81a2..8d5cc04ffd4 100644 --- a/pkg/adapter/apiserver/events/events.go +++ b/pkg/adapter/apiserver/events/events.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "strings" + "time" kncloudevents "knative.dev/eventing/pkg/adapter/v2" @@ -133,6 +134,7 @@ func makeEvent(source, eventType string, obj *unstructured.Unstructured, data in ResourceGroup: resourceGroup, } ctx = kncloudevents.ContextWithMetricTag(ctx, metricTag) + ctx = cloudevents.ContextWithRetriesExponentialBackoff(ctx, 50*time.Millisecond, 5) return ctx, event, nil } diff --git a/test/rekt/apiserversource_test.go b/test/rekt/apiserversource_test.go index 4ba65d6aca6..b441cc87795 100644 --- a/test/rekt/apiserversource_test.go +++ b/test/rekt/apiserversource_test.go @@ -103,3 +103,17 @@ func TestApiServerSourceDataPlane_ResourceMatching(t *testing.T) { env.TestSet(ctx, t, apiserversourcefeatures.DataPlane_ResourceMatching()) } + +func TestApiServerSourceDataPlane_EventsRetries(t *testing.T) { + t.Parallel() + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.Managed(t), + ) + + env.Test(ctx, t, apiserversourcefeatures.SendsEventsWithRetries()) +} diff --git a/test/rekt/features/apiserversource/data_plane.go b/test/rekt/features/apiserversource/data_plane.go index 7d1ad91752c..964e02d50d1 100644 --- a/test/rekt/features/apiserversource/data_plane.go +++ b/test/rekt/features/apiserversource/data_plane.go @@ -447,3 +447,66 @@ func setupAccountAndRoleForPods(sacmName string) feature.StepFn { // return nil // } //} + +func SendsEventsWithRetries() *feature.Feature { + source := feature.MakeRandomK8sName("apiserversource") + sink := feature.MakeRandomK8sName("sink") + + f := feature.NewFeatureNamed("Send events with retries") + + // drop first event to see the retry feature works or not + f.Setup("install sink", + eventshub.Install(sink, + eventshub.StartReceiver, + eventshub.DropFirstN(1), + eventshub.DropEventsResponseCode(429), + ), + ) + + sacmName := feature.MakeRandomK8sName("apiserversource") + f.Setup("Create Service Account for ApiServerSource with RBAC for v1.Pod resources", + setupAccountAndRoleForPods(sacmName)) + + f.Setup("install ApiServerSource", func(ctx context.Context, t feature.T) { + sinkuri, err := svc.Address(ctx, sink) + if err != nil || sinkuri == nil { + t.Fatal("failed to get the address of the sink service", sink, err) + } + + cfg := []manifest.CfgFn{ + apiserversource.WithServiceAccountName(sacmName), + apiserversource.WithEventMode(v1.ReferenceMode), + apiserversource.WithSink(nil, sinkuri.String()), + apiserversource.WithResources(v1.APIVersionKindSelector{ + APIVersion: "v1", + Kind: "Pod", + LabelSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"e2e": "testing"}}, + }), + } + apiserversource.Install(source, cfg...)(ctx, t) + }) + f.Setup("ApiServerSource goes ready", apiserversource.IsReady(source)) + + examplePodName := feature.MakeRandomK8sName("example") + + // create a pod so that ApiServerSource delivers an event to its sink + // event body is similar to this: + // {"kind":"Pod","namespace":"test-wmbcixlv","name":"example-axvlzbvc","apiVersion":"v1"} + f.Requirement("install example pod", + pod.Install(examplePodName, + pod.WithImage(exampleImage), + pod.WithLabels(map[string]string{"e2e": "testing"})), + ) + + f.Stable("ApiServerSource as event source"). + Must("delivers events", + eventasssert.OnStore(sink).Match( + eventasssert.MatchKind(eventasssert.EventReceived), + eventasssert.MatchEvent( + test.HasType("dev.knative.apiserver.ref.add"), + test.DataContains(`"kind":"Pod"`), + test.DataContains(fmt.Sprintf(`"name":"%s"`, examplePodName)), + ), + ).AtLeast(1)) + return f +}