From 6e7728e9bab25e75178060ca9cb54ba0f59ea9fc Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Tue, 5 Mar 2024 10:14:49 -0500 Subject: [PATCH] Make EventType Autocreate Async (#7709) * feat: eventtype autocreate is now async Signed-off-by: Calum Murray * cleanup: removed unnecessary return statement Signed-off-by: Calum Murray --------- Signed-off-by: Calum Murray --- pkg/broker/ingress/ingress_handler.go | 4 +- pkg/channel/fanout/fanout_event_handler.go | 6 +- pkg/eventtype/eventtypes.go | 96 ++++++++++++---------- pkg/eventtype/eventtypes_test.go | 11 +-- 4 files changed, 57 insertions(+), 60 deletions(-) diff --git a/pkg/broker/ingress/ingress_handler.go b/pkg/broker/ingress/ingress_handler.go index 577c308f930..6219cb92537 100644 --- a/pkg/broker/ingress/ingress_handler.go +++ b/pkg/broker/ingress/ingress_handler.go @@ -278,9 +278,7 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { // EventType auto-create feature handling if h.EvenTypeHandler != nil { - if err := h.EvenTypeHandler.AutoCreateEventType(ctx, event, toKReference(broker), broker.GetUID()); err != nil { - h.Logger.Error("Even type auto create failed", zap.Error(err)) - } + h.EvenTypeHandler.AutoCreateEventType(ctx, event, toKReference(broker), broker.GetUID()) } } diff --git a/pkg/channel/fanout/fanout_event_handler.go b/pkg/channel/fanout/fanout_event_handler.go index b4700f19afd..6217f90b1e8 100644 --- a/pkg/channel/fanout/fanout_event_handler.go +++ b/pkg/channel/fanout/fanout_event_handler.go @@ -208,11 +208,7 @@ func (f *FanoutEventHandler) autoCreateEventType(ctx context.Context, evnt event f.logger.Warn("No channelUID provided, unable to autocreate event type") return } - err := f.eventTypeHandler.AutoCreateEventType(ctx, &evnt, f.channelRef, *f.channelUID) - if err != nil { - f.logger.Warn("EventTypeCreate failed") - return - } + f.eventTypeHandler.AutoCreateEventType(ctx, &evnt, f.channelRef, *f.channelUID) } } diff --git a/pkg/eventtype/eventtypes.go b/pkg/eventtype/eventtypes.go index d14dda5f1fe..178a1a5e1df 100644 --- a/pkg/eventtype/eventtypes.go +++ b/pkg/eventtype/eventtypes.go @@ -20,6 +20,7 @@ import ( "context" "crypto/md5" //nolint:gosec "fmt" + "time" "github.com/cloudevents/sdk-go/v2/event" "go.uber.org/zap" @@ -52,55 +53,62 @@ func generateEventTypeName(name, namespace, eventType, eventSource string) strin } // AutoCreateEventType creates EventType object based on processed event's types from addressable KReference objects -func (h *EventTypeAutoHandler) AutoCreateEventType(ctx context.Context, event *event.Event, addressable *duckv1.KReference, ownerUID types.UID) error { +func (h *EventTypeAutoHandler) AutoCreateEventType(ctx context.Context, event *event.Event, addressable *duckv1.KReference, ownerUID types.UID) { // Feature flag gate if !h.FeatureStore.IsEnabled(feature.EvenTypeAutoCreate) { h.Logger.Debug("Event Type auto creation is disabled") - return nil + return } - h.Logger.Debug("Event Types auto creation is enabled") - eventTypeName := generateEventTypeName(addressable.Name, addressable.Namespace, event.Type(), event.Source()) - - exists, err := h.EventTypeLister.EventTypes(addressable.Namespace).Get(eventTypeName) - if err != nil && !apierrs.IsNotFound(err) { - h.Logger.Error("Failed to retrieve Even Type", zap.Error(err)) - return err - } - if exists != nil { - return nil - } - - source, _ := apis.ParseURL(event.Source()) - schema, _ := apis.ParseURL(event.DataSchema()) - - et := &v1beta2.EventType{ - ObjectMeta: metav1.ObjectMeta{ - Name: eventTypeName, - Namespace: addressable.Namespace, - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: addressable.APIVersion, - Kind: addressable.Kind, - Name: addressable.Name, - UID: ownerUID, + ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), time.Second*30) + go func() { + h.Logger.Debug("Event Types auto creation is enabled") + + eventTypeName := generateEventTypeName(addressable.Name, addressable.Namespace, event.Type(), event.Source()) + + exists, err := h.EventTypeLister.EventTypes(addressable.Namespace).Get(eventTypeName) + if err != nil && !apierrs.IsNotFound(err) { + h.Logger.Error("Failed to retrieve Even Type", zap.Error(err)) + cancel() + return + } + if exists != nil { + cancel() + return + } + + source, _ := apis.ParseURL(event.Source()) + schema, _ := apis.ParseURL(event.DataSchema()) + + et := &v1beta2.EventType{ + ObjectMeta: metav1.ObjectMeta{ + Name: eventTypeName, + Namespace: addressable.Namespace, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: addressable.APIVersion, + Kind: addressable.Kind, + Name: addressable.Name, + UID: ownerUID, + }, }, }, - }, - Spec: v1beta2.EventTypeSpec{ - Type: event.Type(), - Source: source, - Schema: schema, - SchemaData: event.DataSchema(), - Reference: addressable, - Description: "Event Type auto-created by controller", - }, - } - - _, err = h.EventingClient.EventTypes(et.Namespace).Create(ctx, et, metav1.CreateOptions{}) - if err != nil && !apierrs.IsAlreadyExists(err) { - h.Logger.Error("Failed to create Event Type", zap.Error(err)) - return err - } - return nil + Spec: v1beta2.EventTypeSpec{ + Type: event.Type(), + Source: source, + Schema: schema, + SchemaData: event.DataSchema(), + Reference: addressable, + Description: "Event Type auto-created by controller", + }, + } + + _, err = h.EventingClient.EventTypes(et.Namespace).Create(ctx, et, metav1.CreateOptions{}) + if err != nil && !apierrs.IsAlreadyExists(err) { + h.Logger.Error("Failed to create Event Type", zap.Error(err)) + cancel() + return + } + cancel() + }() } diff --git a/pkg/eventtype/eventtypes_test.go b/pkg/eventtype/eventtypes_test.go index 744d485885e..1f72d9886ce 100644 --- a/pkg/eventtype/eventtypes_test.go +++ b/pkg/eventtype/eventtypes_test.go @@ -20,6 +20,7 @@ import ( "context" "reflect" "testing" + "time" v2 "github.com/cloudevents/sdk-go/v2" "github.com/cloudevents/sdk-go/v2/event" @@ -94,14 +95,8 @@ func TestEventTypeAutoHandler_AutoCreateEventType(t *testing.T) { for i, event := range tc.events { - err := handler.AutoCreateEventType(ctx, &event, tc.addressable, ownerUID) - if err != nil { - if tc.expectedError == err { - t.Errorf("test case '%s', expected '%s', got '%s'", tc.name, tc.expectedError, err) - } else { - t.Error(err) - } - } + handler.AutoCreateEventType(ctx, &event, tc.addressable, ownerUID) + time.Sleep(time.Millisecond * 500) // autocreate runs in a different goroutine, need to wait for it to finish etName := generateEventTypeName(tc.addressable.Name, tc.addressable.Namespace, event.Type(), event.Source()) et, err := eventingClient.EventingV1beta2().EventTypes(tc.addressable.Namespace).Get(ctx, etName, metav1.GetOptions{})