Skip to content

Commit

Permalink
Make EventType Autocreate Async (#7709)
Browse files Browse the repository at this point in the history
* feat: eventtype autocreate is now async

Signed-off-by: Calum Murray <cmurray@redhat.com>

* cleanup: removed unnecessary return statement

Signed-off-by: Calum Murray <cmurray@redhat.com>

---------

Signed-off-by: Calum Murray <cmurray@redhat.com>
  • Loading branch information
Cali0707 authored Mar 5, 2024
1 parent 2645b50 commit 6e7728e
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 60 deletions.
4 changes: 1 addition & 3 deletions pkg/broker/ingress/ingress_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,7 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {

// EventType auto-create feature handling
if h.EvenTypeHandler != nil {
if err := h.EvenTypeHandler.AutoCreateEventType(ctx, event, toKReference(broker), broker.GetUID()); err != nil {
h.Logger.Error("Even type auto create failed", zap.Error(err))
}
h.EvenTypeHandler.AutoCreateEventType(ctx, event, toKReference(broker), broker.GetUID())
}
}

Expand Down
6 changes: 1 addition & 5 deletions pkg/channel/fanout/fanout_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,7 @@ func (f *FanoutEventHandler) autoCreateEventType(ctx context.Context, evnt event
f.logger.Warn("No channelUID provided, unable to autocreate event type")
return
}
err := f.eventTypeHandler.AutoCreateEventType(ctx, &evnt, f.channelRef, *f.channelUID)
if err != nil {
f.logger.Warn("EventTypeCreate failed")
return
}
f.eventTypeHandler.AutoCreateEventType(ctx, &evnt, f.channelRef, *f.channelUID)
}
}

Expand Down
96 changes: 52 additions & 44 deletions pkg/eventtype/eventtypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"crypto/md5" //nolint:gosec
"fmt"
"time"

"github.com/cloudevents/sdk-go/v2/event"
"go.uber.org/zap"
Expand Down Expand Up @@ -52,55 +53,62 @@ func generateEventTypeName(name, namespace, eventType, eventSource string) strin
}

// AutoCreateEventType creates EventType object based on processed event's types from addressable KReference objects
func (h *EventTypeAutoHandler) AutoCreateEventType(ctx context.Context, event *event.Event, addressable *duckv1.KReference, ownerUID types.UID) error {
func (h *EventTypeAutoHandler) AutoCreateEventType(ctx context.Context, event *event.Event, addressable *duckv1.KReference, ownerUID types.UID) {
// Feature flag gate
if !h.FeatureStore.IsEnabled(feature.EvenTypeAutoCreate) {
h.Logger.Debug("Event Type auto creation is disabled")
return nil
return
}
h.Logger.Debug("Event Types auto creation is enabled")

eventTypeName := generateEventTypeName(addressable.Name, addressable.Namespace, event.Type(), event.Source())

exists, err := h.EventTypeLister.EventTypes(addressable.Namespace).Get(eventTypeName)
if err != nil && !apierrs.IsNotFound(err) {
h.Logger.Error("Failed to retrieve Even Type", zap.Error(err))
return err
}
if exists != nil {
return nil
}

source, _ := apis.ParseURL(event.Source())
schema, _ := apis.ParseURL(event.DataSchema())

et := &v1beta2.EventType{
ObjectMeta: metav1.ObjectMeta{
Name: eventTypeName,
Namespace: addressable.Namespace,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: addressable.APIVersion,
Kind: addressable.Kind,
Name: addressable.Name,
UID: ownerUID,
ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), time.Second*30)
go func() {
h.Logger.Debug("Event Types auto creation is enabled")

eventTypeName := generateEventTypeName(addressable.Name, addressable.Namespace, event.Type(), event.Source())

exists, err := h.EventTypeLister.EventTypes(addressable.Namespace).Get(eventTypeName)
if err != nil && !apierrs.IsNotFound(err) {
h.Logger.Error("Failed to retrieve Even Type", zap.Error(err))
cancel()
return
}
if exists != nil {
cancel()
return
}

source, _ := apis.ParseURL(event.Source())
schema, _ := apis.ParseURL(event.DataSchema())

et := &v1beta2.EventType{
ObjectMeta: metav1.ObjectMeta{
Name: eventTypeName,
Namespace: addressable.Namespace,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: addressable.APIVersion,
Kind: addressable.Kind,
Name: addressable.Name,
UID: ownerUID,
},
},
},
},
Spec: v1beta2.EventTypeSpec{
Type: event.Type(),
Source: source,
Schema: schema,
SchemaData: event.DataSchema(),
Reference: addressable,
Description: "Event Type auto-created by controller",
},
}

_, err = h.EventingClient.EventTypes(et.Namespace).Create(ctx, et, metav1.CreateOptions{})
if err != nil && !apierrs.IsAlreadyExists(err) {
h.Logger.Error("Failed to create Event Type", zap.Error(err))
return err
}
return nil
Spec: v1beta2.EventTypeSpec{
Type: event.Type(),
Source: source,
Schema: schema,
SchemaData: event.DataSchema(),
Reference: addressable,
Description: "Event Type auto-created by controller",
},
}

_, err = h.EventingClient.EventTypes(et.Namespace).Create(ctx, et, metav1.CreateOptions{})
if err != nil && !apierrs.IsAlreadyExists(err) {
h.Logger.Error("Failed to create Event Type", zap.Error(err))
cancel()
return
}
cancel()
}()
}
11 changes: 3 additions & 8 deletions pkg/eventtype/eventtypes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"reflect"
"testing"
"time"

v2 "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/event"
Expand Down Expand Up @@ -94,14 +95,8 @@ func TestEventTypeAutoHandler_AutoCreateEventType(t *testing.T) {

for i, event := range tc.events {

err := handler.AutoCreateEventType(ctx, &event, tc.addressable, ownerUID)
if err != nil {
if tc.expectedError == err {
t.Errorf("test case '%s', expected '%s', got '%s'", tc.name, tc.expectedError, err)
} else {
t.Error(err)
}
}
handler.AutoCreateEventType(ctx, &event, tc.addressable, ownerUID)
time.Sleep(time.Millisecond * 500) // autocreate runs in a different goroutine, need to wait for it to finish

etName := generateEventTypeName(tc.addressable.Name, tc.addressable.Namespace, event.Type(), event.Source())
et, err := eventingClient.EventingV1beta2().EventTypes(tc.addressable.Namespace).Get(ctx, etName, metav1.GetOptions{})
Expand Down

0 comments on commit 6e7728e

Please sign in to comment.