diff --git a/apis/eventing/v1alpha1/cloudeventsource_types.go b/apis/eventing/v1alpha1/cloudeventsource_types.go index 2bfb97b928a..de60ef7a270 100644 --- a/apis/eventing/v1alpha1/cloudeventsource_types.go +++ b/apis/eventing/v1alpha1/cloudeventsource_types.go @@ -18,10 +18,32 @@ package v1alpha1 import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" v1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" ) +type CloudEventSourceInterface interface { + client.Object + GenerateIdentifier() string + GetSpec() *CloudEventSourceSpec + GetStatus() *CloudEventSourceStatus +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// ClusterCloudEventSource defines how a KEDA event will be sent to event sink on cluster level +// +kubebuilder:resource:path=clustercloudeventsources,scope=Cluster +// +kubebuilder:subresource:status +// +kubebuilder:printcolumn:name="Active",type="string",JSONPath=".status.conditions[?(@.type==\"Active\")].status" +type ClusterCloudEventSource struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec CloudEventSourceSpec `json:"spec"` + Status CloudEventSourceStatus `json:"status,omitempty"` +} + // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // CloudEventSource defines how a KEDA event will be sent to event sink @@ -38,6 +60,15 @@ type CloudEventSource struct { // +kubebuilder:object:root=true +// ClusterCloudEventSourceList is a list of ClusterCloudEventSource resources +type ClusterCloudEventSourceList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata"` + Items []ClusterCloudEventSource `json:"items"` +} + +// +kubebuilder:object:root=true + // CloudEventSourceList is a list of CloudEventSource resources type CloudEventSourceList struct { metav1.TypeMeta `json:",inline"` @@ -94,6 +125,7 @@ type EventSubscription struct { func init() { SchemeBuilder.Register(&CloudEventSource{}, &CloudEventSourceList{}) + SchemeBuilder.Register(&ClusterCloudEventSource{}, &ClusterCloudEventSourceList{}) } // GenerateIdentifier returns identifier for the object in for "kind.namespace.name" @@ -101,6 +133,17 @@ func (ces *CloudEventSource) GenerateIdentifier() string { return v1alpha1.GenerateIdentifier("CloudEventSource", ces.Namespace, ces.Name) } +func (ces *CloudEventSource) GetSpec() *CloudEventSourceSpec { return &ces.Spec } +func (ces *CloudEventSource) GetStatus() *CloudEventSourceStatus { return &ces.Status } + +// GenerateIdentifier returns identifier for the object in for "kind.namespace.name" +func (ces *ClusterCloudEventSource) GenerateIdentifier() string { + return v1alpha1.GenerateIdentifier("ClusterCloudEventSource", "cluster-scoped", ces.Name) +} + +func (ces *ClusterCloudEventSource) GetSpec() *CloudEventSourceSpec { return &ces.Spec } +func (ces *ClusterCloudEventSource) GetStatus() *CloudEventSourceStatus { return &ces.Status } + // GetCloudEventSourceInitializedConditions returns CloudEventSource Conditions initialized to the default -> Status: Unknown func GetCloudEventSourceInitializedConditions() *v1alpha1.Conditions { return &v1alpha1.Conditions{{Type: v1alpha1.ConditionActive, Status: metav1.ConditionUnknown}} diff --git a/apis/eventing/v1alpha1/zz_generated.deepcopy.go b/apis/eventing/v1alpha1/zz_generated.deepcopy.go index f38b11df437..114764d49ba 100644 --- a/apis/eventing/v1alpha1/zz_generated.deepcopy.go +++ b/apis/eventing/v1alpha1/zz_generated.deepcopy.go @@ -156,6 +156,65 @@ func (in *CloudEventSourceStatus) DeepCopy() *CloudEventSourceStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ClusterCloudEventSource) DeepCopyInto(out *ClusterCloudEventSource) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterCloudEventSource. +func (in *ClusterCloudEventSource) DeepCopy() *ClusterCloudEventSource { + if in == nil { + return nil + } + out := new(ClusterCloudEventSource) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *ClusterCloudEventSource) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ClusterCloudEventSourceList) DeepCopyInto(out *ClusterCloudEventSourceList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]ClusterCloudEventSource, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterCloudEventSourceList. +func (in *ClusterCloudEventSourceList) DeepCopy() *ClusterCloudEventSourceList { + if in == nil { + return nil + } + out := new(ClusterCloudEventSourceList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *ClusterCloudEventSourceList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Destination) DeepCopyInto(out *Destination) { *out = *in diff --git a/config/crd/bases/eventing.keda.sh_clustercloudeventsources.yaml b/config/crd/bases/eventing.keda.sh_clustercloudeventsources.yaml new file mode 100644 index 00000000000..1ae963db15f --- /dev/null +++ b/config/crd/bases/eventing.keda.sh_clustercloudeventsources.yaml @@ -0,0 +1,140 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.14.0 + name: clustercloudeventsources.eventing.keda.sh +spec: + group: eventing.keda.sh + names: + kind: ClusterCloudEventSource + listKind: ClusterCloudEventSourceList + plural: clustercloudeventsources + singular: clustercloudeventsource + scope: Cluster + versions: + - additionalPrinterColumns: + - jsonPath: .status.conditions[?(@.type=="Active")].status + name: Active + type: string + name: v1alpha1 + schema: + openAPIV3Schema: + description: ClusterCloudEventSource defines how a KEDA event will be sent + to event sink on cluster level + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + description: CloudEventSourceSpec defines the spec of CloudEventSource + properties: + authenticationRef: + description: |- + AuthenticationRef points to the TriggerAuthentication or ClusterTriggerAuthentication object that + is used to authenticate the scaler with the environment + properties: + kind: + description: Kind of the resource being referred to. Defaults + to TriggerAuthentication. + type: string + name: + type: string + required: + - name + type: object + clusterName: + type: string + destination: + description: Destination defines the various ways to emit events + properties: + azureEventGridTopic: + properties: + endpoint: + type: string + required: + - endpoint + type: object + http: + properties: + uri: + type: string + required: + - uri + type: object + type: object + eventSubscription: + description: EventSubscription defines filters for events + properties: + excludedEventTypes: + items: + description: CloudEventType contains the list of cloudevent + types + enum: + - keda.scaledobject.ready.v1 + - keda.scaledobject.failed.v1 + type: string + type: array + includedEventTypes: + items: + description: CloudEventType contains the list of cloudevent + types + enum: + - keda.scaledobject.ready.v1 + - keda.scaledobject.failed.v1 + type: string + type: array + type: object + required: + - destination + type: object + status: + description: CloudEventSourceStatus defines the observed state of CloudEventSource + properties: + conditions: + description: Conditions an array representation to store multiple + Conditions + items: + description: Condition to store the condition state + properties: + message: + description: A human readable message indicating details about + the transition. + type: string + reason: + description: The reason for the condition's last transition. + type: string + status: + description: Status of the condition, one of True, False, Unknown. + type: string + type: + description: Type of condition + type: string + required: + - status + - type + type: object + type: array + type: object + required: + - spec + type: object + served: true + storage: true + subresources: + status: {} diff --git a/controllers/eventing/cloudeventsource_controller.go b/controllers/eventing/cloudeventsource_controller.go index 62c972076fd..3aea7db8b7f 100644 --- a/controllers/eventing/cloudeventsource_controller.go +++ b/controllers/eventing/cloudeventsource_controller.go @@ -26,6 +26,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -64,7 +65,12 @@ func (r *CloudEventSourceReconciler) Reconcile(ctx context.Context, req ctrl.Req reqLogger := log.FromContext(ctx) // Fetch the EventSource instance - cloudEventSource := &eventingv1alpha1.CloudEventSource{} + var cloudEventSource eventingv1alpha1.CloudEventSourceInterface + if req.Namespace != "" { + cloudEventSource = &eventingv1alpha1.CloudEventSource{} + } else { + cloudEventSource = &eventingv1alpha1.ClusterCloudEventSource{} + } err := r.Client.Get(ctx, req.NamespacedName, cloudEventSource) if err != nil { if errors.IsNotFound(err) { @@ -91,7 +97,7 @@ func (r *CloudEventSourceReconciler) Reconcile(ctx context.Context, req ctrl.Req } // ensure Status Conditions are initialized - if !cloudEventSource.Status.Conditions.AreInitialized() { + if !cloudEventSource.GetStatus().Conditions.AreInitialized() { conditions := eventingv1alpha1.GetCloudEventSourceInitializedConditions() if err := kedastatus.SetStatusConditions(ctx, r.Client, reqLogger, cloudEventSource, conditions); err != nil { return ctrl.Result{}, err @@ -114,14 +120,25 @@ func (r *CloudEventSourceReconciler) Reconcile(ctx context.Context, req ctrl.Req // SetupWithManager sets up the controller with the Manager. func (r *CloudEventSourceReconciler) SetupWithManager(mgr ctrl.Manager) error { + nsFilter := util.IgnoreOtherNamespaces() + filter := predicate.Funcs{ + GenericFunc: func(e event.GenericEvent) bool { + if e.Object.GetNamespace() == "" { + return true + } + return nsFilter.Generic(e) + }, + } + return ctrl.NewControllerManagedBy(mgr). + For(&eventingv1alpha1.ClusterCloudEventSource{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). For(&eventingv1alpha1.CloudEventSource{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). - WithEventFilter(util.IgnoreOtherNamespaces()). + WithEventFilter(filter). Complete(r) } // requestEventLoop tries to start EventLoop handler for the respective EventSource -func (r *CloudEventSourceReconciler) requestEventLoop(ctx context.Context, logger logr.Logger, eventSource *eventingv1alpha1.CloudEventSource) error { +func (r *CloudEventSourceReconciler) requestEventLoop(ctx context.Context, logger logr.Logger, eventSource eventingv1alpha1.CloudEventSourceInterface) error { logger.V(1).Info("Notify eventHandler of an update in eventSource") key, err := cache.MetaNamespaceKeyFunc(eventSource) @@ -135,13 +152,13 @@ func (r *CloudEventSourceReconciler) requestEventLoop(ctx context.Context, logge } // store CloudEventSource's current Generation - r.cloudEventSourceGenerations.Store(key, eventSource.Generation) + r.cloudEventSourceGenerations.Store(key, eventSource.GetGeneration()) return nil } // stopEventLoop stops EventLoop handler for the respective EventSource -func (r *CloudEventSourceReconciler) stopEventLoop(logger logr.Logger, eventSource *eventingv1alpha1.CloudEventSource) error { +func (r *CloudEventSourceReconciler) stopEventLoop(logger logr.Logger, eventSource eventingv1alpha1.CloudEventSourceInterface) error { key, err := cache.MetaNamespaceKeyFunc(eventSource) if err != nil { logger.Error(err, "error getting key for eventSource") @@ -157,7 +174,7 @@ func (r *CloudEventSourceReconciler) stopEventLoop(logger logr.Logger, eventSour } // eventSourceGenerationChanged returns true if CloudEventSource's Generation was changed, ie. EventSource.Spec was changed -func (r *CloudEventSourceReconciler) cloudEventSourceGenerationChanged(logger logr.Logger, eventSource *eventingv1alpha1.CloudEventSource) (bool, error) { +func (r *CloudEventSourceReconciler) cloudEventSourceGenerationChanged(logger logr.Logger, eventSource eventingv1alpha1.CloudEventSourceInterface) (bool, error) { key, err := cache.MetaNamespaceKeyFunc(eventSource) if err != nil { logger.Error(err, "error getting key for eventSource") @@ -167,14 +184,14 @@ func (r *CloudEventSourceReconciler) cloudEventSourceGenerationChanged(logger lo value, loaded := r.cloudEventSourceGenerations.Load(key) if loaded { generation := value.(int64) - if generation == eventSource.Generation { + if generation == eventSource.GetGeneration() { return false, nil } } return true, nil } -func (r *CloudEventSourceReconciler) updatePromMetrics(eventSource *eventingv1alpha1.CloudEventSource, namespacedName string) { +func (r *CloudEventSourceReconciler) updatePromMetrics(eventSource eventingv1alpha1.CloudEventSourceInterface, namespacedName string) { r.eventSourcePromMetricsLock.Lock() defer r.eventSourcePromMetricsLock.Unlock() @@ -182,8 +199,8 @@ func (r *CloudEventSourceReconciler) updatePromMetrics(eventSource *eventingv1al metricscollector.DecrementCRDTotal(metricscollector.CloudEventSourceResource, ns) } - metricscollector.IncrementCRDTotal(metricscollector.CloudEventSourceResource, eventSource.Namespace) - r.eventSourcePromMetricsMap[namespacedName] = eventSource.Namespace + metricscollector.IncrementCRDTotal(metricscollector.CloudEventSourceResource, eventSource.GetNamespace()) + r.eventSourcePromMetricsMap[namespacedName] = eventSource.GetNamespace() } // UpdatePromMetricsOnDelete is idempotent, so it can be called multiple times without side-effects diff --git a/controllers/eventing/cloudeventsource_finalizer.go b/controllers/eventing/cloudeventsource_finalizer.go index 03da520188a..6409d1b6db5 100644 --- a/controllers/eventing/cloudeventsource_finalizer.go +++ b/controllers/eventing/cloudeventsource_finalizer.go @@ -31,9 +31,9 @@ const ( cloudEventSourceResourceType = "cloudEventSource" ) -func (r *CloudEventSourceReconciler) EnsureEventSourceResourceFinalizer(ctx context.Context, logger logr.Logger, cloudEventSource *eventingv1alpha1.CloudEventSource) error { +func (r *CloudEventSourceReconciler) EnsureEventSourceResourceFinalizer(ctx context.Context, logger logr.Logger, cloudEventSource eventingv1alpha1.CloudEventSourceInterface) error { if !util.Contains(cloudEventSource.GetFinalizers(), cloudEventSourceFinalizer) { - logger.Info(fmt.Sprintf("Adding Finalizer to %s %s/%s", cloudEventSourceResourceType, cloudEventSource.Namespace, cloudEventSource.Name)) + logger.Info(fmt.Sprintf("Adding Finalizer to %s %s/%s", cloudEventSourceResourceType, cloudEventSource.GetNamespace(), cloudEventSource.GetName())) cloudEventSource.SetFinalizers(append(cloudEventSource.GetFinalizers(), cloudEventSourceFinalizer)) // Update CR @@ -46,7 +46,7 @@ func (r *CloudEventSourceReconciler) EnsureEventSourceResourceFinalizer(ctx cont return nil } -func (r *CloudEventSourceReconciler) FinalizeEventSourceResource(ctx context.Context, logger logr.Logger, cloudEventSource *eventingv1alpha1.CloudEventSource, namespacedName string) error { +func (r *CloudEventSourceReconciler) FinalizeEventSourceResource(ctx context.Context, logger logr.Logger, cloudEventSource eventingv1alpha1.CloudEventSourceInterface, namespacedName string) error { if util.Contains(cloudEventSource.GetFinalizers(), cloudEventSourceFinalizer) { if err := r.stopEventLoop(logger, cloudEventSource); err != nil { return err diff --git a/pkg/eventemitter/eventemitter.go b/pkg/eventemitter/eventemitter.go index 959156efc1a..ef02630b1d1 100644 --- a/pkg/eventemitter/eventemitter.go +++ b/pkg/eventemitter/eventemitter.go @@ -32,9 +32,7 @@ import ( "time" "github.com/go-logr/logr" - "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/record" @@ -71,9 +69,9 @@ type EventEmitter struct { // EventHandler defines the behavior for EventEmitter clients type EventHandler interface { - DeleteCloudEventSource(cloudEventSource *eventingv1alpha1.CloudEventSource) error - HandleCloudEventSource(ctx context.Context, cloudEventSource *eventingv1alpha1.CloudEventSource) error - Emit(object runtime.Object, namesapce types.NamespacedName, eventType string, cloudeventType eventingv1alpha1.CloudEventType, reason string, message string) + DeleteCloudEventSource(cloudEventSource eventingv1alpha1.CloudEventSourceInterface) error + HandleCloudEventSource(ctx context.Context, cloudEventSource eventingv1alpha1.CloudEventSourceInterface) error + Emit(object client.Object, namesapce types.NamespacedName, eventType string, cloudeventType eventingv1alpha1.CloudEventType, reason string, message string) } // EventDataHandler defines the behavior for different event handlers @@ -112,17 +110,17 @@ func NewEventEmitter(client client.Client, recorder record.EventRecorder, cluste } } -func initializeLogger(cloudEventSource *eventingv1alpha1.CloudEventSource, cloudEventSourceEmitterName string) logr.Logger { - return logf.Log.WithName(cloudEventSourceEmitterName).WithValues("type", cloudEventSource.Kind, "namespace", cloudEventSource.Namespace, "name", cloudEventSource.Name) +func initializeLogger(cloudEventSource eventingv1alpha1.CloudEventSourceInterface, cloudEventSourceEmitterName string) logr.Logger { + return logf.Log.WithName(cloudEventSourceEmitterName).WithValues("type", cloudEventSource.GetObjectKind().GroupVersionKind().Kind, "namespace", cloudEventSource.GetNamespace(), "name", cloudEventSource.GetName()) } // HandleCloudEventSource will create CloudEventSource handlers that defined in spec and start an event loop once handlers // are created successfully. -func (e *EventEmitter) HandleCloudEventSource(ctx context.Context, cloudEventSource *eventingv1alpha1.CloudEventSource) error { +func (e *EventEmitter) HandleCloudEventSource(ctx context.Context, cloudEventSource eventingv1alpha1.CloudEventSourceInterface) error { e.createEventHandlers(ctx, cloudEventSource) if !e.checkIfEventHandlersExist(cloudEventSource) { - return fmt.Errorf("no CloudEventSource handler is created for %s/%s", cloudEventSource.Namespace, cloudEventSource.Name) + return fmt.Errorf("no CloudEventSource handler is created for %s/%s", cloudEventSource.GetNamespace(), cloudEventSource.GetName()) } key := cloudEventSource.GenerateIdentifier() @@ -148,12 +146,12 @@ func (e *EventEmitter) HandleCloudEventSource(ctx context.Context, cloudEventSou // passing deep copy of CloudEventSource to the eventLoop go routines, it's a precaution to not have global objects shared between threads e.log.V(1).Info("Start CloudEventSource loop.") - go e.startEventLoop(cancelCtx, cloudEventSource.DeepCopy(), eventingMutex) + go e.startEventLoop(cancelCtx, cloudEventSource.DeepCopyObject().(eventingv1alpha1.CloudEventSourceInterface), eventingMutex) return nil } // DeleteCloudEventSource will stop the event loop and clean event handlers in cache. -func (e *EventEmitter) DeleteCloudEventSource(cloudEventSource *eventingv1alpha1.CloudEventSource) error { +func (e *EventEmitter) DeleteCloudEventSource(cloudEventSource eventingv1alpha1.CloudEventSourceInterface) error { key := cloudEventSource.GenerateIdentifier() result, ok := e.eventLoopContexts.Load(key) if ok { @@ -172,7 +170,7 @@ func (e *EventEmitter) DeleteCloudEventSource(cloudEventSource *eventingv1alpha1 // createEventHandlers will create different handler as defined in CloudEventSource, and store them in cache for repeated // use in the loop. -func (e *EventEmitter) createEventHandlers(ctx context.Context, cloudEventSource *eventingv1alpha1.CloudEventSource) { +func (e *EventEmitter) createEventHandlers(ctx context.Context, cloudEventSource eventingv1alpha1.CloudEventSourceInterface) { e.eventHandlersCacheLock.Lock() e.eventFilterCacheLock.Lock() defer e.eventHandlersCacheLock.Unlock() @@ -180,24 +178,24 @@ func (e *EventEmitter) createEventHandlers(ctx context.Context, cloudEventSource key := cloudEventSource.GenerateIdentifier() - clusterName := cloudEventSource.Spec.ClusterName + clusterName := cloudEventSource.GetSpec().ClusterName if clusterName == "" { clusterName = e.clusterName } // Resolve auth related - authParams, podIdentity, err := resolver.ResolveAuthRefAndPodIdentity(ctx, e.client, e.log, cloudEventSource.Spec.AuthenticationRef, nil, cloudEventSource.Namespace, e.secretsLister) + authParams, podIdentity, err := resolver.ResolveAuthRefAndPodIdentity(ctx, e.client, e.log, cloudEventSource.GetSpec().AuthenticationRef, nil, cloudEventSource.GetNamespace(), e.secretsLister) if err != nil { e.log.Error(err, "error resolving auth params", "cloudEventSource", cloudEventSource) return } // Create EventFilter from CloudEventSource - e.eventFilterCache[key] = NewEventFilter(cloudEventSource.Spec.EventSubscription.IncludedEventTypes, cloudEventSource.Spec.EventSubscription.ExcludedEventTypes) + e.eventFilterCache[key] = NewEventFilter(cloudEventSource.GetSpec().EventSubscription.IncludedEventTypes, cloudEventSource.GetSpec().EventSubscription.ExcludedEventTypes) // Create different event destinations here - if cloudEventSource.Spec.Destination.HTTP != nil { - eventHandler, err := NewCloudEventHTTPHandler(ctx, clusterName, cloudEventSource.Spec.Destination.HTTP.URI, initializeLogger(cloudEventSource, "cloudevent_http")) + if cloudEventSource.GetSpec().Destination.HTTP != nil { + eventHandler, err := NewCloudEventHTTPHandler(ctx, clusterName, cloudEventSource.GetSpec().Destination.HTTP.URI, initializeLogger(cloudEventSource, "cloudevent_http")) if err != nil { e.log.Error(err, "create CloudEvent HTTP handler failed") return @@ -211,8 +209,8 @@ func (e *EventEmitter) createEventHandlers(ctx context.Context, cloudEventSource return } - if cloudEventSource.Spec.Destination.AzureEventGridTopic != nil { - eventHandler, err := NewAzureEventGridTopicHandler(ctx, clusterName, cloudEventSource.Spec.Destination.AzureEventGridTopic, authParams, podIdentity, initializeLogger(cloudEventSource, "azure_event_grid_topic")) + if cloudEventSource.GetSpec().Destination.AzureEventGridTopic != nil { + eventHandler, err := NewAzureEventGridTopicHandler(ctx, clusterName, cloudEventSource.GetSpec().Destination.AzureEventGridTopic, authParams, podIdentity, initializeLogger(cloudEventSource, "azure_event_grid_topic")) if err != nil { e.log.Error(err, "create Azure Event Grid handler failed") return @@ -226,11 +224,11 @@ func (e *EventEmitter) createEventHandlers(ctx context.Context, cloudEventSource return } - e.log.Info("No destionation is defined in CloudEventSource", "CloudEventSource", cloudEventSource.Name) + e.log.Info("No destionation is defined in CloudEventSource", "CloudEventSource", cloudEventSource.GetName()) } // clearEventHandlersCache will clear all event handlers that created by the passing CloudEventSource -func (e *EventEmitter) clearEventHandlersCache(cloudEventSource *eventingv1alpha1.CloudEventSource) { +func (e *EventEmitter) clearEventHandlersCache(cloudEventSource eventingv1alpha1.CloudEventSourceInterface) { e.eventHandlersCacheLock.Lock() defer e.eventHandlersCacheLock.Unlock() e.eventFilterCacheLock.Lock() @@ -241,7 +239,7 @@ func (e *EventEmitter) clearEventHandlersCache(cloudEventSource *eventingv1alpha delete(e.eventFilterCache, key) // Clear different event destination here. - if cloudEventSource.Spec.Destination.HTTP != nil { + if cloudEventSource.GetSpec().Destination.HTTP != nil { eventHandlerKey := newEventHandlerKey(key, cloudEventHandlerTypeHTTP) if eventHandler, found := e.eventHandlersCache[eventHandlerKey]; found { eventHandler.CloseHandler() @@ -249,7 +247,7 @@ func (e *EventEmitter) clearEventHandlersCache(cloudEventSource *eventingv1alpha } } - if cloudEventSource.Spec.Destination.AzureEventGridTopic != nil { + if cloudEventSource.GetSpec().Destination.AzureEventGridTopic != nil { eventHandlerKey := newEventHandlerKey(key, cloudEventHandlerTypeAzureEventGridTopic) if eventHandler, found := e.eventHandlersCache[eventHandlerKey]; found { eventHandler.CloseHandler() @@ -259,7 +257,7 @@ func (e *EventEmitter) clearEventHandlersCache(cloudEventSource *eventingv1alpha } // checkIfEventHandlersExist will check if the event handlers that were created by passing CloudEventSource exist -func (e *EventEmitter) checkIfEventHandlersExist(cloudEventSource *eventingv1alpha1.CloudEventSource) bool { +func (e *EventEmitter) checkIfEventHandlersExist(cloudEventSource eventingv1alpha1.CloudEventSourceInterface) bool { e.eventHandlersCacheLock.RLock() defer e.eventHandlersCacheLock.RUnlock() @@ -273,40 +271,40 @@ func (e *EventEmitter) checkIfEventHandlersExist(cloudEventSource *eventingv1alp return false } -func (e *EventEmitter) startEventLoop(ctx context.Context, cloudEventSource *eventingv1alpha1.CloudEventSource, cloudEventSourceMutex sync.Locker) { +func (e *EventEmitter) startEventLoop(ctx context.Context, cloudEventSource eventingv1alpha1.CloudEventSourceInterface, cloudEventSourceMutex sync.Locker) { for { select { case eventData := <-e.cloudEventProcessingChan: e.log.V(1).Info("Consuming events from CloudEventSource.") e.emitEventByHandler(eventData) e.checkEventHandlers(ctx, cloudEventSource, cloudEventSourceMutex) - metricscollector.RecordCloudEventQueueStatus(cloudEventSource.Namespace, len(e.cloudEventProcessingChan)) + metricscollector.RecordCloudEventQueueStatus(cloudEventSource.GetNamespace(), len(e.cloudEventProcessingChan)) case <-ctx.Done(): e.log.V(1).Info("CloudEventSource loop has stopped.") - metricscollector.RecordCloudEventQueueStatus(cloudEventSource.Namespace, len(e.cloudEventProcessingChan)) + metricscollector.RecordCloudEventQueueStatus(cloudEventSource.GetNamespace(), len(e.cloudEventProcessingChan)) return } } } // checkEventHandlers will check each eventhandler active status -func (e *EventEmitter) checkEventHandlers(ctx context.Context, cloudEventSource *eventingv1alpha1.CloudEventSource, cloudEventSourceMutex sync.Locker) { +func (e *EventEmitter) checkEventHandlers(ctx context.Context, cloudEventSource eventingv1alpha1.CloudEventSourceInterface, cloudEventSourceMutex sync.Locker) { e.log.V(1).Info("Checking event handlers status.") cloudEventSourceMutex.Lock() defer cloudEventSourceMutex.Unlock() // Get the latest object - err := e.client.Get(ctx, types.NamespacedName{Name: cloudEventSource.Name, Namespace: cloudEventSource.Namespace}, cloudEventSource) + err := e.client.Get(ctx, types.NamespacedName{Name: cloudEventSource.GetName(), Namespace: cloudEventSource.GetNamespace()}, cloudEventSource) if err != nil { e.log.Error(err, "error getting cloudEventSource", "cloudEventSource", cloudEventSource) return } keyPrefix := cloudEventSource.GenerateIdentifier() needUpdate := false - cloudEventSourceStatus := cloudEventSource.Status.DeepCopy() + cloudEventSourceStatus := cloudEventSource.GetStatus().DeepCopy() for k, v := range e.eventHandlersCache { - e.log.V(1).Info("Checking event handler status.", "handler", k, "status", cloudEventSource.Status.Conditions.GetActiveCondition().Status) + e.log.V(1).Info("Checking event handler status.", "handler", k, "status", cloudEventSource.GetStatus().Conditions.GetActiveCondition().Status) if strings.Contains(k, keyPrefix) { - if v.GetActiveStatus() != cloudEventSource.Status.Conditions.GetActiveCondition().Status { + if v.GetActiveStatus() != cloudEventSource.GetStatus().Conditions.GetActiveCondition().Status { needUpdate = true cloudEventSourceStatus.Conditions.SetActiveCondition( metav1.ConditionFalse, @@ -325,7 +323,7 @@ func (e *EventEmitter) checkEventHandlers(ctx context.Context, cloudEventSource } // Emit is emitting event to both local kubernetes and custom CloudEventSource handler. After emit event to local kubernetes, event will inqueue and waitng for handler's consuming. -func (e *EventEmitter) Emit(object runtime.Object, namesapce types.NamespacedName, eventType string, cloudeventType eventingv1alpha1.CloudEventType, reason, message string) { +func (e *EventEmitter) Emit(object client.Object, namesapce types.NamespacedName, eventType string, cloudeventType eventingv1alpha1.CloudEventType, reason, message string) { e.recorder.Event(object, eventType, reason, message) e.eventHandlersCacheLock.RLock() @@ -334,13 +332,11 @@ func (e *EventEmitter) Emit(object runtime.Object, namesapce types.NamespacedNam return } - objectName, _ := meta.NewAccessor().Name(object) - objectType, _ := meta.NewAccessor().Kind(object) eventData := eventdata.EventData{ Namespace: namesapce.Namespace, CloudEventType: cloudeventType, - ObjectName: strings.ToLower(objectName), - ObjectType: strings.ToLower(objectType), + ObjectName: strings.ToLower(object.GetName()), + ObjectType: strings.ToLower(object.GetObjectKind().GroupVersionKind().Kind), Reason: reason, Message: message, Time: time.Now().UTC(), @@ -425,8 +421,8 @@ func (e *EventEmitter) emitErrorHandle(eventData eventdata.EventData, err error) e.enqueueEventData(requeueData) } -func (e *EventEmitter) setCloudEventSourceStatusActive(ctx context.Context, cloudEventSource *eventingv1alpha1.CloudEventSource) error { - cloudEventSourceStatus := cloudEventSource.Status.DeepCopy() +func (e *EventEmitter) setCloudEventSourceStatusActive(ctx context.Context, cloudEventSource eventingv1alpha1.CloudEventSourceInterface) error { + cloudEventSourceStatus := cloudEventSource.GetStatus().DeepCopy() cloudEventSourceStatus.Conditions.SetActiveCondition( metav1.ConditionTrue, eventingv1alpha1.CloudEventSourceConditionActiveReason, @@ -435,8 +431,8 @@ func (e *EventEmitter) setCloudEventSourceStatusActive(ctx context.Context, clou return e.updateCloudEventSourceStatus(ctx, cloudEventSource, cloudEventSourceStatus) } -func (e *EventEmitter) updateCloudEventSourceStatus(ctx context.Context, cloudEventSource *eventingv1alpha1.CloudEventSource, cloudEventSourceStatus *eventingv1alpha1.CloudEventSourceStatus) error { - e.log.V(1).Info("Updating CloudEventSource status", "CloudEventSource", cloudEventSource.Name) +func (e *EventEmitter) updateCloudEventSourceStatus(ctx context.Context, cloudEventSource eventingv1alpha1.CloudEventSourceInterface, cloudEventSourceStatus *eventingv1alpha1.CloudEventSourceStatus) error { + e.log.V(1).Info("Updating CloudEventSource status", "CloudEventSource", cloudEventSource.GetName()) transform := func(runtimeObj client.Object, target interface{}) error { status, ok := target.(*eventingv1alpha1.CloudEventSourceStatus) if !ok { @@ -446,6 +442,9 @@ func (e *EventEmitter) updateCloudEventSourceStatus(ctx context.Context, cloudEv case *eventingv1alpha1.CloudEventSource: e.log.V(1).Info("New CloudEventSource status", "status", *status) obj.Status = *status + case *eventingv1alpha1.ClusterCloudEventSource: + e.log.V(1).Info("New ClusterCloudEventSource status", "status", *status) + obj.Status = *status default: } return nil diff --git a/pkg/status/status.go b/pkg/status/status.go index 852843afac9..c259181b118 100755 --- a/pkg/status/status.go +++ b/pkg/status/status.go @@ -184,6 +184,12 @@ func TransformObject(ctx context.Context, client runtimeclient.StatusClient, log logger.Error(err, "failed to patch CloudEventSource") return err } + case *eventingv1alpha1.ClusterCloudEventSource: + patch = runtimeclient.MergeFrom(obj.DeepCopy()) + if err := transform(obj, target); err != nil { + logger.Error(err, "failed to patch CloudEventSource") + return err + } default: err := fmt.Errorf("unknown scalable object type %v", obj) logger.Error(err, "failed to patch Objects")