Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make EventType Autocreate Async #7709

Merged
merged 2 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions pkg/broker/ingress/ingress_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,7 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {

// EventType auto-create feature handling
if h.EvenTypeHandler != nil {
if err := h.EvenTypeHandler.AutoCreateEventType(ctx, event, toKReference(broker), broker.GetUID()); err != nil {
h.Logger.Error("Even type auto create failed", zap.Error(err))
}
h.EvenTypeHandler.AutoCreateEventType(ctx, event, toKReference(broker), broker.GetUID())
}
}

Expand Down
6 changes: 1 addition & 5 deletions pkg/channel/fanout/fanout_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,7 @@ func (f *FanoutEventHandler) autoCreateEventType(ctx context.Context, evnt event
f.logger.Warn("No channelUID provided, unable to autocreate event type")
return
}
err := f.eventTypeHandler.AutoCreateEventType(ctx, &evnt, f.channelRef, *f.channelUID)
if err != nil {
f.logger.Warn("EventTypeCreate failed")
return
}
f.eventTypeHandler.AutoCreateEventType(ctx, &evnt, f.channelRef, *f.channelUID)
}
}

Expand Down
96 changes: 52 additions & 44 deletions pkg/eventtype/eventtypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"crypto/md5" //nolint:gosec
"fmt"
"time"

"github.com/cloudevents/sdk-go/v2/event"
"go.uber.org/zap"
Expand Down Expand Up @@ -52,55 +53,62 @@ func generateEventTypeName(name, namespace, eventType, eventSource string) strin
}

// AutoCreateEventType creates EventType object based on processed event's types from addressable KReference objects
func (h *EventTypeAutoHandler) AutoCreateEventType(ctx context.Context, event *event.Event, addressable *duckv1.KReference, ownerUID types.UID) error {
func (h *EventTypeAutoHandler) AutoCreateEventType(ctx context.Context, event *event.Event, addressable *duckv1.KReference, ownerUID types.UID) {
// Feature flag gate
if !h.FeatureStore.IsEnabled(feature.EvenTypeAutoCreate) {
h.Logger.Debug("Event Type auto creation is disabled")
return nil
return
}
h.Logger.Debug("Event Types auto creation is enabled")

eventTypeName := generateEventTypeName(addressable.Name, addressable.Namespace, event.Type(), event.Source())

exists, err := h.EventTypeLister.EventTypes(addressable.Namespace).Get(eventTypeName)
if err != nil && !apierrs.IsNotFound(err) {
h.Logger.Error("Failed to retrieve Even Type", zap.Error(err))
return err
}
if exists != nil {
return nil
}

source, _ := apis.ParseURL(event.Source())
schema, _ := apis.ParseURL(event.DataSchema())

et := &v1beta2.EventType{
ObjectMeta: metav1.ObjectMeta{
Name: eventTypeName,
Namespace: addressable.Namespace,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: addressable.APIVersion,
Kind: addressable.Kind,
Name: addressable.Name,
UID: ownerUID,
ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), time.Second*30)
go func() {
h.Logger.Debug("Event Types auto creation is enabled")

eventTypeName := generateEventTypeName(addressable.Name, addressable.Namespace, event.Type(), event.Source())

exists, err := h.EventTypeLister.EventTypes(addressable.Namespace).Get(eventTypeName)
if err != nil && !apierrs.IsNotFound(err) {
h.Logger.Error("Failed to retrieve Even Type", zap.Error(err))
cancel()
return
}
if exists != nil {
cancel()
return
}

source, _ := apis.ParseURL(event.Source())
schema, _ := apis.ParseURL(event.DataSchema())

et := &v1beta2.EventType{
ObjectMeta: metav1.ObjectMeta{
Name: eventTypeName,
Namespace: addressable.Namespace,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: addressable.APIVersion,
Kind: addressable.Kind,
Name: addressable.Name,
UID: ownerUID,
},
},
},
},
Spec: v1beta2.EventTypeSpec{
Type: event.Type(),
Source: source,
Schema: schema,
SchemaData: event.DataSchema(),
Reference: addressable,
Description: "Event Type auto-created by controller",
},
}

_, err = h.EventingClient.EventTypes(et.Namespace).Create(ctx, et, metav1.CreateOptions{})
if err != nil && !apierrs.IsAlreadyExists(err) {
h.Logger.Error("Failed to create Event Type", zap.Error(err))
return err
}
return nil
Spec: v1beta2.EventTypeSpec{
Type: event.Type(),
Source: source,
Schema: schema,
SchemaData: event.DataSchema(),
Reference: addressable,
Description: "Event Type auto-created by controller",
},
}

_, err = h.EventingClient.EventTypes(et.Namespace).Create(ctx, et, metav1.CreateOptions{})
if err != nil && !apierrs.IsAlreadyExists(err) {
h.Logger.Error("Failed to create Event Type", zap.Error(err))
cancel()
return
}
cancel()
}()
}
11 changes: 3 additions & 8 deletions pkg/eventtype/eventtypes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"reflect"
"testing"
"time"

v2 "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/event"
Expand Down Expand Up @@ -94,14 +95,8 @@ func TestEventTypeAutoHandler_AutoCreateEventType(t *testing.T) {

for i, event := range tc.events {

err := handler.AutoCreateEventType(ctx, &event, tc.addressable, ownerUID)
if err != nil {
if tc.expectedError == err {
t.Errorf("test case '%s', expected '%s', got '%s'", tc.name, tc.expectedError, err)
} else {
t.Error(err)
}
}
handler.AutoCreateEventType(ctx, &event, tc.addressable, ownerUID)
time.Sleep(time.Millisecond * 500) // autocreate runs in a different goroutine, need to wait for it to finish

etName := generateEventTypeName(tc.addressable.Name, tc.addressable.Namespace, event.Type(), event.Source())
et, err := eventingClient.EventingV1beta2().EventTypes(tc.addressable.Namespace).Get(ctx, etName, metav1.GetOptions{})
Expand Down
Loading