From 1ab68cdeae603a4825f22edbbf2050f822320f53 Mon Sep 17 00:00:00 2001 From: SpiritZhou Date: Tue, 21 May 2024 14:02:28 +0800 Subject: [PATCH 1/6] Update Signed-off-by: SpiritZhou --- .../v1alpha1/cloudeventsource_types.go | 88 ++++- .../v1alpha1/cloudeventsource_webhook.go | 37 +- .../v1alpha1/zz_generated.deepcopy.go | 59 ++++ cmd/operator/main.go | 7 + cmd/webhooks/main.go | 4 + ...ting.keda.sh_clustercloudeventsources.yaml | 138 ++++++++ config/crd/kustomization.yaml | 1 + config/rbac/role.yaml | 7 + config/webhooks/validation_webhooks.yaml | 24 ++ .../eventing/cloudeventsource_controller.go | 15 +- .../eventing/cloudeventsource_finalizer.go | 65 ---- .../clustercloudeventsource_controller.go | 201 +++++++++++ controllers/eventing/finalizer.go | 70 ++++ pkg/eventemitter/eventemitter.go | 153 +++++--- pkg/mock/mock_eventemitter/mock_interface.go | 4 +- pkg/status/status.go | 8 + .../cloudevent_source_test.go | 329 ++++++++++++------ 17 files changed, 973 insertions(+), 237 deletions(-) create mode 100644 config/crd/bases/eventing.keda.sh_clustercloudeventsources.yaml delete mode 100644 controllers/eventing/cloudeventsource_finalizer.go create mode 100644 controllers/eventing/clustercloudeventsource_controller.go create mode 100644 controllers/eventing/finalizer.go diff --git a/apis/eventing/v1alpha1/cloudeventsource_types.go b/apis/eventing/v1alpha1/cloudeventsource_types.go index 2bfb97b928a..f1436e23f94 100644 --- a/apis/eventing/v1alpha1/cloudeventsource_types.go +++ b/apis/eventing/v1alpha1/cloudeventsource_types.go @@ -22,6 +22,17 @@ import ( v1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" ) +// +kubebuilder:object:generate=false +type CloudEventSourceInterface interface { + GetKind() string + GetName() string + GetNamespace() string + GetSpec() CloudEventSourceSpec + GetStatus() CloudEventSourceStatus + GetGeneration() int64 + GenerateIdentifier() string +} + // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // CloudEventSource defines how a KEDA event will be sent to event sink @@ -45,6 +56,28 @@ type CloudEventSourceList struct { Items []CloudEventSource `json:"items"` } +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// +kubebuilder:resource:path=clustercloudeventsources,scope=Cluster +// +kubebuilder:subresource:status +// +kubebuilder:printcolumn:name="Active",type="string",JSONPath=".status.conditions[?(@.type==\"Active\")].status" +type ClusterCloudEventSource struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec CloudEventSourceSpec `json:"spec"` + Status CloudEventSourceStatus `json:"status,omitempty"` +} + +// +kubebuilder:object:root=true + +// ClusterCloudEventSourceList is a list of ClusterCloudEventSource resources +type ClusterCloudEventSourceList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata"` + Items []ClusterCloudEventSource `json:"items"` +} + // CloudEventSourceSpec defines the spec of CloudEventSource type CloudEventSourceSpec struct { // +optional @@ -93,7 +126,31 @@ type EventSubscription struct { } func init() { - SchemeBuilder.Register(&CloudEventSource{}, &CloudEventSourceList{}) + SchemeBuilder.Register(&CloudEventSource{}, &CloudEventSourceList{}, &ClusterCloudEventSource{}, &ClusterCloudEventSourceList{}) +} + +func (ces *CloudEventSource) GetKind() string { + return ces.Kind +} + +func (ces *CloudEventSource) GetName() string { + return ces.Name +} + +func (ces *CloudEventSource) GetNamespace() string { + return ces.Namespace +} + +func (ces *CloudEventSource) GetSpec() CloudEventSourceSpec { + return ces.Spec +} + +func (ces *CloudEventSource) GetStatus() CloudEventSourceStatus { + return *ces.Status.DeepCopy() +} + +func (ces *CloudEventSource) GetGeneration() int64 { + return ces.Generation } // GenerateIdentifier returns identifier for the object in for "kind.namespace.name" @@ -101,6 +158,35 @@ func (ces *CloudEventSource) GenerateIdentifier() string { return v1alpha1.GenerateIdentifier("CloudEventSource", ces.Namespace, ces.Name) } +func (cces *ClusterCloudEventSource) GetKind() string { + return cces.Kind +} + +func (cces *ClusterCloudEventSource) GetName() string { + return cces.Name +} + +func (cces *ClusterCloudEventSource) GetNamespace() string { + return cces.Namespace +} + +func (cces *ClusterCloudEventSource) GetSpec() CloudEventSourceSpec { + return cces.Spec +} + +func (cces *ClusterCloudEventSource) GetStatus() CloudEventSourceStatus { + return *cces.Status.DeepCopy() +} + +func (cces *ClusterCloudEventSource) GetGeneration() int64 { + return cces.Generation +} + +// GenerateIdentifier returns identifier for the object in for "kind.cluster-scoped.name" +func (cces *ClusterCloudEventSource) GenerateIdentifier() string { + return v1alpha1.GenerateIdentifier("ClusterCloudEventSource", "cluster-scoped", cces.Name) +} + // GetCloudEventSourceInitializedConditions returns CloudEventSource Conditions initialized to the default -> Status: Unknown func GetCloudEventSourceInitializedConditions() *v1alpha1.Conditions { return &v1alpha1.Conditions{{Type: v1alpha1.ConditionActive, Status: metav1.ConditionUnknown}} diff --git a/apis/eventing/v1alpha1/cloudeventsource_webhook.go b/apis/eventing/v1alpha1/cloudeventsource_webhook.go index b520fc4f27f..af7e69d7fb4 100644 --- a/apis/eventing/v1alpha1/cloudeventsource_webhook.go +++ b/apis/eventing/v1alpha1/cloudeventsource_webhook.go @@ -37,6 +37,12 @@ func (ces *CloudEventSource) SetupWebhookWithManager(mgr ctrl.Manager) error { Complete() } +func (cces *ClusterCloudEventSource) SetupWebhookWithManager(mgr ctrl.Manager) error { + return ctrl.NewWebhookManagedBy(mgr). + For(cces). + Complete() +} + // +kubebuilder:webhook:path=/validate-eventing-keda-sh-v1alpha1-cloudeventsource,mutating=false,failurePolicy=ignore,sideEffects=None,groups=eventing.keda.sh,resources=cloudeventsources,verbs=create;update,versions=v1alpha1,name=vcloudeventsource.kb.io,admissionReviewVersions=v1 var _ webhook.Validator = &CloudEventSource{} @@ -64,6 +70,33 @@ func (ces *CloudEventSource) ValidateDelete() (admission.Warnings, error) { return nil, nil } +// +kubebuilder:webhook:path=/validate-eventing-keda-sh-v1alpha1-clustercloudeventsource,mutating=false,failurePolicy=ignore,sideEffects=None,groups=eventing.keda.sh,resources=clustercloudeventsources,verbs=create;update,versions=v1alpha1,name=vclustercloudeventsource.kb.io,admissionReviewVersions=v1 + +var _ webhook.Validator = &ClusterCloudEventSource{} + +// ValidateCreate implements webhook.Validator so a webhook will be registered for the type +func (cces *ClusterCloudEventSource) ValidateCreate() (admission.Warnings, error) { + val, _ := json.MarshalIndent(cces, "", " ") + cloudeventsourcelog.Info(fmt.Sprintf("validating clustercloudeventsource creation for %s", string(val))) + return validateSpec(&cces.Spec) +} + +func (cces *ClusterCloudEventSource) ValidateUpdate(old runtime.Object) (admission.Warnings, error) { + val, _ := json.MarshalIndent(cces, "", " ") + cloudeventsourcelog.V(1).Info(fmt.Sprintf("validating clustercloudeventsource update for %s", string(val))) + + oldCes := old.(*ClusterCloudEventSource) + if isCloudEventSourceRemovingFinalizer(cces.ObjectMeta, oldCes.ObjectMeta, cces.Spec, oldCes.Spec) { + cloudeventsourcelog.V(1).Info("finalizer removal, skipping validation") + return nil, nil + } + return validateSpec(&cces.Spec) +} + +func (cces *ClusterCloudEventSource) ValidateDelete() (admission.Warnings, error) { + return nil, nil +} + func isCloudEventSourceRemovingFinalizer(om metav1.ObjectMeta, oldOm metav1.ObjectMeta, spec CloudEventSourceSpec, oldSpec CloudEventSourceSpec) bool { cesSpec, _ := json.MarshalIndent(spec, "", " ") oldCesSpec, _ := json.MarshalIndent(oldSpec, "", " ") @@ -81,7 +114,7 @@ func validateSpec(spec *CloudEventSourceSpec) (admission.Warnings, error) { if spec.EventSubscription.ExcludedEventTypes != nil { for _, excludedEventType := range spec.EventSubscription.ExcludedEventTypes { if !slices.Contains(AllEventTypes, excludedEventType) { - return nil, fmt.Errorf("excludedEventType: %s in cloudeventsource spec is not supported", excludedEventType) + return nil, fmt.Errorf("excludedEventType: %s in cloudeventsource/clustercloudeventsource spec is not supported", excludedEventType) } } } @@ -89,7 +122,7 @@ func validateSpec(spec *CloudEventSourceSpec) (admission.Warnings, error) { if spec.EventSubscription.IncludedEventTypes != nil { for _, includedEventType := range spec.EventSubscription.IncludedEventTypes { if !slices.Contains(AllEventTypes, includedEventType) { - return nil, fmt.Errorf("includedEventType: %s in cloudeventsource spec is not supported", includedEventType) + return nil, fmt.Errorf("includedEventType: %s in cloudeventsource/clustercloudeventsource spec is not supported", includedEventType) } } } diff --git a/apis/eventing/v1alpha1/zz_generated.deepcopy.go b/apis/eventing/v1alpha1/zz_generated.deepcopy.go index f38b11df437..114764d49ba 100644 --- a/apis/eventing/v1alpha1/zz_generated.deepcopy.go +++ b/apis/eventing/v1alpha1/zz_generated.deepcopy.go @@ -156,6 +156,65 @@ func (in *CloudEventSourceStatus) DeepCopy() *CloudEventSourceStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ClusterCloudEventSource) DeepCopyInto(out *ClusterCloudEventSource) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterCloudEventSource. +func (in *ClusterCloudEventSource) DeepCopy() *ClusterCloudEventSource { + if in == nil { + return nil + } + out := new(ClusterCloudEventSource) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *ClusterCloudEventSource) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ClusterCloudEventSourceList) DeepCopyInto(out *ClusterCloudEventSourceList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]ClusterCloudEventSource, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterCloudEventSourceList. +func (in *ClusterCloudEventSourceList) DeepCopy() *ClusterCloudEventSourceList { + if in == nil { + return nil + } + out := new(ClusterCloudEventSourceList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *ClusterCloudEventSourceList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Destination) DeepCopyInto(out *Destination) { *out = *in diff --git a/cmd/operator/main.go b/cmd/operator/main.go index c9172cb5971..2ea6ed90f90 100644 --- a/cmd/operator/main.go +++ b/cmd/operator/main.go @@ -266,6 +266,13 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "CloudEventSource") os.Exit(1) } + if err = (eventingcontrollers.NewClusterCloudEventSourceReconciler( + mgr.GetClient(), + eventEmitter, + )).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "ClusterCloudEventSource") + os.Exit(1) + } //+kubebuilder:scaffold:builder if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { diff --git a/cmd/webhooks/main.go b/cmd/webhooks/main.go index d9832bdd1b7..46a80a3955e 100644 --- a/cmd/webhooks/main.go +++ b/cmd/webhooks/main.go @@ -156,4 +156,8 @@ func setupWebhook(mgr manager.Manager) { setupLog.Error(err, "unable to create webhook", "webhook", "CloudEventSource") os.Exit(1) } + if err := (&eventingv1alpha1.ClusterCloudEventSource{}).SetupWebhookWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create webhook", "webhook", "ClusterCloudEventSource") + os.Exit(1) + } } diff --git a/config/crd/bases/eventing.keda.sh_clustercloudeventsources.yaml b/config/crd/bases/eventing.keda.sh_clustercloudeventsources.yaml new file mode 100644 index 00000000000..079a8976cbe --- /dev/null +++ b/config/crd/bases/eventing.keda.sh_clustercloudeventsources.yaml @@ -0,0 +1,138 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.14.0 + name: clustercloudeventsources.eventing.keda.sh +spec: + group: eventing.keda.sh + names: + kind: ClusterCloudEventSource + listKind: ClusterCloudEventSourceList + plural: clustercloudeventsources + singular: clustercloudeventsource + scope: Cluster + versions: + - additionalPrinterColumns: + - jsonPath: .status.conditions[?(@.type=="Active")].status + name: Active + type: string + name: v1alpha1 + schema: + openAPIV3Schema: + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + description: CloudEventSourceSpec defines the spec of CloudEventSource + properties: + authenticationRef: + description: |- + AuthenticationRef points to the TriggerAuthentication or ClusterTriggerAuthentication object that + is used to authenticate the scaler with the environment + properties: + kind: + description: Kind of the resource being referred to. Defaults + to TriggerAuthentication. + type: string + name: + type: string + required: + - name + type: object + clusterName: + type: string + destination: + description: Destination defines the various ways to emit events + properties: + azureEventGridTopic: + properties: + endpoint: + type: string + required: + - endpoint + type: object + http: + properties: + uri: + type: string + required: + - uri + type: object + type: object + eventSubscription: + description: EventSubscription defines filters for events + properties: + excludedEventTypes: + items: + description: CloudEventType contains the list of cloudevent + types + enum: + - keda.scaledobject.ready.v1 + - keda.scaledobject.failed.v1 + type: string + type: array + includedEventTypes: + items: + description: CloudEventType contains the list of cloudevent + types + enum: + - keda.scaledobject.ready.v1 + - keda.scaledobject.failed.v1 + type: string + type: array + type: object + required: + - destination + type: object + status: + description: CloudEventSourceStatus defines the observed state of CloudEventSource + properties: + conditions: + description: Conditions an array representation to store multiple + Conditions + items: + description: Condition to store the condition state + properties: + message: + description: A human readable message indicating details about + the transition. + type: string + reason: + description: The reason for the condition's last transition. + type: string + status: + description: Status of the condition, one of True, False, Unknown. + type: string + type: + description: Type of condition + type: string + required: + - status + - type + type: object + type: array + type: object + required: + - spec + type: object + served: true + storage: true + subresources: + status: {} diff --git a/config/crd/kustomization.yaml b/config/crd/kustomization.yaml index 71cd2574fda..af8c7dae447 100644 --- a/config/crd/kustomization.yaml +++ b/config/crd/kustomization.yaml @@ -7,6 +7,7 @@ resources: - bases/keda.sh_triggerauthentications.yaml - bases/keda.sh_clustertriggerauthentications.yaml - bases/eventing.keda.sh_cloudeventsources.yaml +- bases/eventing.keda.sh_clustercloudeventsources.yaml # +kubebuilder:scaffold:crdkustomizeresource ## ScaledJob CRD needs to be patched because for some usecases (details in the patch file) diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 6ca6d10cfe0..fd9cf99b941 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -107,6 +107,13 @@ rules: - cloudeventsources/status verbs: - '*' +- apiGroups: + - eventing.keda.sh + resources: + - clustercloudeventsources + - clustercloudeventsources/status + verbs: + - '*' - apiGroups: - keda.sh resources: diff --git a/config/webhooks/validation_webhooks.yaml b/config/webhooks/validation_webhooks.yaml index 14ff71baef8..e996568ac59 100644 --- a/config/webhooks/validation_webhooks.yaml +++ b/config/webhooks/validation_webhooks.yaml @@ -129,3 +129,27 @@ webhooks: - cloudeventsources sideEffects: None timeoutSeconds: 10 +- admissionReviewVersions: + - v1 + clientConfig: + service: + name: keda-admission-webhooks + namespace: keda + path: /validate-eventing-keda-sh-v1alpha1-clustercloudeventsource + failurePolicy: Ignore + matchPolicy: Equivalent + name: vclustercloudeventsource.kb.io + namespaceSelector: {} + objectSelector: {} + rules: + - apiGroups: + - eventing.keda.sh + apiVersions: + - v1alpha1 + operations: + - CREATE + - UPDATE + resources: + - clustercloudeventsources + sideEffects: None + timeoutSeconds: 10 diff --git a/controllers/eventing/cloudeventsource_controller.go b/controllers/eventing/cloudeventsource_controller.go index 62c972076fd..965d1554a47 100644 --- a/controllers/eventing/cloudeventsource_controller.go +++ b/controllers/eventing/cloudeventsource_controller.go @@ -60,9 +60,10 @@ func NewCloudEventSourceReconciler(c client.Client, e eventemitter.EventHandler) // +kubebuilder:rbac:groups=eventing.keda.sh,resources=cloudeventsources;cloudeventsources/status,verbs="*" // Reconcile performs reconciliation on the identified EventSource resource based on the request information passed, returns the result and an error (if any). +// +//nolint:dupl func (r *CloudEventSourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { reqLogger := log.FromContext(ctx) - // Fetch the EventSource instance cloudEventSource := &eventingv1alpha1.CloudEventSource{} err := r.Client.Get(ctx, req.NamespacedName, cloudEventSource) @@ -78,15 +79,15 @@ func (r *CloudEventSourceReconciler) Reconcile(ctx context.Context, req ctrl.Req return ctrl.Result{}, err } - reqLogger.Info("Reconciling EventSource") + reqLogger.Info("Reconciling CloudEventSource") if !cloudEventSource.GetDeletionTimestamp().IsZero() { - return ctrl.Result{}, r.FinalizeEventSourceResource(ctx, reqLogger, cloudEventSource, req.NamespacedName.String()) + return ctrl.Result{}, FinalizeCloudEventSourceResource(ctx, reqLogger, r, cloudEventSource, req.NamespacedName.String()) } r.updatePromMetrics(cloudEventSource, req.NamespacedName.String()) // ensure finalizer is set on this CR - if err := r.EnsureEventSourceResourceFinalizer(ctx, reqLogger, cloudEventSource); err != nil { + if err := EnsureCloudEventSourceResourceFinalizer(ctx, reqLogger, r, cloudEventSource); err != nil { return ctrl.Result{}, err } @@ -141,14 +142,14 @@ func (r *CloudEventSourceReconciler) requestEventLoop(ctx context.Context, logge } // stopEventLoop stops EventLoop handler for the respective EventSource -func (r *CloudEventSourceReconciler) stopEventLoop(logger logr.Logger, eventSource *eventingv1alpha1.CloudEventSource) error { - key, err := cache.MetaNamespaceKeyFunc(eventSource) +func (r *CloudEventSourceReconciler) StopEventLoop(logger logr.Logger, obj client.Object) error { + key, err := cache.MetaNamespaceKeyFunc(obj) if err != nil { logger.Error(err, "error getting key for eventSource") return err } - if err := r.eventEmitter.DeleteCloudEventSource(eventSource); err != nil { + if err := r.eventEmitter.DeleteCloudEventSource(obj.(*eventingv1alpha1.CloudEventSource)); err != nil { return err } // delete CloudEventSource's current Generation diff --git a/controllers/eventing/cloudeventsource_finalizer.go b/controllers/eventing/cloudeventsource_finalizer.go deleted file mode 100644 index 03da520188a..00000000000 --- a/controllers/eventing/cloudeventsource_finalizer.go +++ /dev/null @@ -1,65 +0,0 @@ -/* -Copyright 2023 The KEDA Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package eventing - -import ( - "context" - "fmt" - - "github.com/go-logr/logr" - - eventingv1alpha1 "github.com/kedacore/keda/v2/apis/eventing/v1alpha1" - "github.com/kedacore/keda/v2/controllers/keda/util" -) - -const ( - cloudEventSourceFinalizer = "finalizer.keda.sh" - cloudEventSourceResourceType = "cloudEventSource" -) - -func (r *CloudEventSourceReconciler) EnsureEventSourceResourceFinalizer(ctx context.Context, logger logr.Logger, cloudEventSource *eventingv1alpha1.CloudEventSource) error { - if !util.Contains(cloudEventSource.GetFinalizers(), cloudEventSourceFinalizer) { - logger.Info(fmt.Sprintf("Adding Finalizer to %s %s/%s", cloudEventSourceResourceType, cloudEventSource.Namespace, cloudEventSource.Name)) - cloudEventSource.SetFinalizers(append(cloudEventSource.GetFinalizers(), cloudEventSourceFinalizer)) - - // Update CR - err := r.Update(ctx, cloudEventSource) - if err != nil { - logger.Error(err, fmt.Sprintf("Failed to update %s with a finalizer", cloudEventSourceResourceType), "finalizer", cloudEventSourceFinalizer) - return err - } - } - return nil -} - -func (r *CloudEventSourceReconciler) FinalizeEventSourceResource(ctx context.Context, logger logr.Logger, cloudEventSource *eventingv1alpha1.CloudEventSource, namespacedName string) error { - if util.Contains(cloudEventSource.GetFinalizers(), cloudEventSourceFinalizer) { - if err := r.stopEventLoop(logger, cloudEventSource); err != nil { - return err - } - cloudEventSource.SetFinalizers(util.Remove(cloudEventSource.GetFinalizers(), cloudEventSourceFinalizer)) - if err := r.Update(ctx, cloudEventSource); err != nil { - logger.Error(err, fmt.Sprintf("Failed to update %s after removing a finalizer", cloudEventSourceResourceType), "finalizer", cloudEventSourceFinalizer) - return err - } - - r.UpdatePromMetricsOnDelete(namespacedName) - } - - logger.Info(fmt.Sprintf("Successfully finalized %s", cloudEventSourceResourceType)) - return nil -} diff --git a/controllers/eventing/clustercloudeventsource_controller.go b/controllers/eventing/clustercloudeventsource_controller.go new file mode 100644 index 00000000000..dae9beb0a44 --- /dev/null +++ b/controllers/eventing/clustercloudeventsource_controller.go @@ -0,0 +1,201 @@ +/* +Copyright 2024 The KEDA Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventing + +import ( + "context" + "sync" + + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/client-go/tools/cache" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + eventingv1alpha1 "github.com/kedacore/keda/v2/apis/eventing/v1alpha1" + "github.com/kedacore/keda/v2/pkg/eventemitter" + "github.com/kedacore/keda/v2/pkg/metricscollector" + kedastatus "github.com/kedacore/keda/v2/pkg/status" + "github.com/kedacore/keda/v2/pkg/util" +) + +// ClusterCloudEventSourceReconciler reconciles a EventSource object +type ClusterCloudEventSourceReconciler struct { + client.Client + eventEmitter eventemitter.EventHandler + + clusterCloudEventSourceGenerations *sync.Map + eventSourcePromMetricsMap map[string]string + eventSourcePromMetricsLock *sync.Mutex +} + +// NewClusterCloudEventSourceReconciler creates a new ClusterCloudEventSourceReconciler +func NewClusterCloudEventSourceReconciler(c client.Client, e eventemitter.EventHandler) *ClusterCloudEventSourceReconciler { + return &ClusterCloudEventSourceReconciler{ + Client: c, + eventEmitter: e, + clusterCloudEventSourceGenerations: &sync.Map{}, + eventSourcePromMetricsMap: make(map[string]string), + eventSourcePromMetricsLock: &sync.Mutex{}, + } +} + +// +kubebuilder:rbac:groups=eventing.keda.sh,resources=clustercloudeventsources;clustercloudeventsources/status,verbs="*" + +// Reconcile performs reconciliation on the identified EventSource resource based on the request information passed, returns the result and an error (if any). +// +//nolint:dupl +func (r *ClusterCloudEventSourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + reqLogger := log.FromContext(ctx) + + // Fetch the EventSource instance + clustercloudEventSource := &eventingv1alpha1.ClusterCloudEventSource{} + err := r.Client.Get(ctx, req.NamespacedName, clustercloudEventSource) + if err != nil { + if errors.IsNotFound(err) { + // Request eventSource not found, could have been deleted after reconcile request. + // Owned eventSource are automatically garbage collected. For additional cleanup logic use finalizers. + // Return and don't requeue + return ctrl.Result{}, nil + } + // Error reading the object - requeue the request. + reqLogger.Error(err, "failed to get EventSource") + return ctrl.Result{}, err + } + + reqLogger.Info("Reconciling ClusterCloudEventSource") + + if !clustercloudEventSource.GetDeletionTimestamp().IsZero() { + return ctrl.Result{}, FinalizeCloudEventSourceResource(ctx, reqLogger, r, clustercloudEventSource, req.NamespacedName.String()) + } + r.updatePromMetrics(clustercloudEventSource, req.NamespacedName.String()) + + // ensure finalizer is set on this CR + if err := EnsureCloudEventSourceResourceFinalizer(ctx, reqLogger, r, clustercloudEventSource); err != nil { + return ctrl.Result{}, err + } + + // ensure Status Conditions are initialized + if !clustercloudEventSource.Status.Conditions.AreInitialized() { + conditions := eventingv1alpha1.GetCloudEventSourceInitializedConditions() + if err := kedastatus.SetStatusConditions(ctx, r.Client, reqLogger, clustercloudEventSource, conditions); err != nil { + return ctrl.Result{}, err + } + } + + eventSourceChanged, err := r.cloudEventSourceGenerationChanged(reqLogger, clustercloudEventSource) + if err != nil { + return ctrl.Result{}, err + } + + if eventSourceChanged { + if r.requestEventLoop(ctx, reqLogger, clustercloudEventSource) != nil { + return ctrl.Result{}, err + } + } + + return ctrl.Result{}, nil +} + +// SetupWithManager sets up the controller with the Manager. +func (r *ClusterCloudEventSourceReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&eventingv1alpha1.ClusterCloudEventSource{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). + WithEventFilter(util.IgnoreOtherNamespaces()). + Complete(r) +} + +// requestEventLoop tries to start EventLoop handler for the respective EventSource +func (r *ClusterCloudEventSourceReconciler) requestEventLoop(ctx context.Context, logger logr.Logger, eventSource eventingv1alpha1.CloudEventSourceInterface) error { + logger.V(1).Info("Notify eventHandler of an update in eventSource", "name", eventSource.GetName()) + + key, err := cache.MetaNamespaceKeyFunc(eventSource) + if err != nil { + logger.Error(err, "error getting key for eventSource") + return err + } + + if err = r.eventEmitter.HandleCloudEventSource(ctx, eventSource); err != nil { + return err + } + + // store ClusterCloudEventSource's current Generation + r.clusterCloudEventSourceGenerations.Store(key, eventSource.GetGeneration()) + + return nil +} + +// stopEventLoop stops EventLoop handler for the respective EventSource +func (r *ClusterCloudEventSourceReconciler) StopEventLoop(logger logr.Logger, obj client.Object) error { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + logger.Error(err, "error getting key for eventSource") + return err + } + + if err := r.eventEmitter.DeleteCloudEventSource(obj.(*eventingv1alpha1.ClusterCloudEventSource)); err != nil { + return err + } + // delete CloudEventSource's current Generation + r.clusterCloudEventSourceGenerations.Delete(key) + return nil +} + +// eventSourceGenerationChanged returns true if ClusterCloudEventSource's Generation was changed, ie. EventSource.Spec was changed +func (r *ClusterCloudEventSourceReconciler) cloudEventSourceGenerationChanged(logger logr.Logger, eventSource *eventingv1alpha1.ClusterCloudEventSource) (bool, error) { + key, err := cache.MetaNamespaceKeyFunc(eventSource) + if err != nil { + logger.Error(err, "error getting key for eventSource") + return true, err + } + + value, loaded := r.clusterCloudEventSourceGenerations.Load(key) + if loaded { + generation := value.(int64) + if generation == eventSource.Generation { + return false, nil + } + } + return true, nil +} + +func (r *ClusterCloudEventSourceReconciler) updatePromMetrics(eventSource *eventingv1alpha1.ClusterCloudEventSource, namespacedName string) { + r.eventSourcePromMetricsLock.Lock() + defer r.eventSourcePromMetricsLock.Unlock() + + if ns, ok := r.eventSourcePromMetricsMap[namespacedName]; ok { + metricscollector.DecrementCRDTotal(metricscollector.CloudEventSourceResource, ns) + } + + metricscollector.IncrementCRDTotal(metricscollector.CloudEventSourceResource, eventSource.Namespace) + r.eventSourcePromMetricsMap[namespacedName] = eventSource.Namespace +} + +// UpdatePromMetricsOnDelete is idempotent, so it can be called multiple times without side-effects +func (r *ClusterCloudEventSourceReconciler) UpdatePromMetricsOnDelete(namespacedName string) { + r.eventSourcePromMetricsLock.Lock() + defer r.eventSourcePromMetricsLock.Unlock() + + if ns, ok := r.eventSourcePromMetricsMap[namespacedName]; ok { + metricscollector.DecrementCRDTotal(metricscollector.CloudEventSourceResource, ns) + } + + delete(r.eventSourcePromMetricsMap, namespacedName) +} diff --git a/controllers/eventing/finalizer.go b/controllers/eventing/finalizer.go new file mode 100644 index 00000000000..60526789129 --- /dev/null +++ b/controllers/eventing/finalizer.go @@ -0,0 +1,70 @@ +/* +Copyright 2024 The KEDA Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventing + +import ( + "context" + "fmt" + + "github.com/go-logr/logr" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/kedacore/keda/v2/controllers/keda/util" +) + +const ( + cloudEventSourceFinalizer = "finalizer.keda.sh" +) + +type cloudEventSourceResourceReconciler interface { + client.Client + UpdatePromMetricsOnDelete(string) + StopEventLoop(logger logr.Logger, obj client.Object) error +} + +func EnsureCloudEventSourceResourceFinalizer(ctx context.Context, logger logr.Logger, reconciler cloudEventSourceResourceReconciler, cloudEventSourceResource client.Object) error { + if !util.Contains(cloudEventSourceResource.GetFinalizers(), cloudEventSourceFinalizer) { + logger.Info(fmt.Sprintf("Adding Finalizer for the %s", cloudEventSourceResource.GetName())) + cloudEventSourceResource.SetFinalizers(append(cloudEventSourceResource.GetFinalizers(), cloudEventSourceFinalizer)) + + // Update CR + err := reconciler.Update(ctx, cloudEventSourceResource) + if err != nil { + logger.Error(err, fmt.Sprintf("Failed to update %s with a finalizer", cloudEventSourceResource.GetName()), "finalizer", cloudEventSourceFinalizer) + return err + } + } + return nil +} + +func FinalizeCloudEventSourceResource(ctx context.Context, logger logr.Logger, reconciler cloudEventSourceResourceReconciler, cloudEventSourceResource client.Object, namespacedName string) error { + if util.Contains(cloudEventSourceResource.GetFinalizers(), cloudEventSourceFinalizer) { + if err := reconciler.StopEventLoop(logger, cloudEventSourceResource); err != nil { + return err + } + cloudEventSourceResource.SetFinalizers(util.Remove(cloudEventSourceResource.GetFinalizers(), cloudEventSourceFinalizer)) + if err := reconciler.Update(ctx, cloudEventSourceResource); err != nil { + logger.Error(err, fmt.Sprintf("Failed to update %s after removing a finalizer", cloudEventSourceResource.GetName()), "finalizer", cloudEventSourceFinalizer) + return err + } + + reconciler.UpdatePromMetricsOnDelete(namespacedName) + } + + logger.Info(fmt.Sprintf("Successfully finalized %s", cloudEventSourceResource.GetName())) + return nil +} diff --git a/pkg/eventemitter/eventemitter.go b/pkg/eventemitter/eventemitter.go index a46480ff537..0d5d6fdae95 100644 --- a/pkg/eventemitter/eventemitter.go +++ b/pkg/eventemitter/eventemitter.go @@ -71,8 +71,8 @@ type EventEmitter struct { // EventHandler defines the behavior for EventEmitter clients type EventHandler interface { - DeleteCloudEventSource(cloudEventSource *eventingv1alpha1.CloudEventSource) error - HandleCloudEventSource(ctx context.Context, cloudEventSource *eventingv1alpha1.CloudEventSource) error + DeleteCloudEventSource(cloudEventSource eventingv1alpha1.CloudEventSourceInterface) error + HandleCloudEventSource(ctx context.Context, cloudEventSource eventingv1alpha1.CloudEventSourceInterface) error Emit(object runtime.Object, namesapce string, eventType string, cloudeventType eventingv1alpha1.CloudEventType, reason string, message string) } @@ -112,20 +112,20 @@ func NewEventEmitter(client client.Client, recorder record.EventRecorder, cluste } } -func initializeLogger(cloudEventSource *eventingv1alpha1.CloudEventSource, cloudEventSourceEmitterName string) logr.Logger { - return logf.Log.WithName(cloudEventSourceEmitterName).WithValues("type", cloudEventSource.Kind, "namespace", cloudEventSource.Namespace, "name", cloudEventSource.Name) +func initializeLogger(cloudEventSourceI eventingv1alpha1.CloudEventSourceInterface, cloudEventSourceEmitterName string) logr.Logger { + return logf.Log.WithName(cloudEventSourceEmitterName).WithValues("type", cloudEventSourceI.GetKind(), "namespace", cloudEventSourceI.GetNamespace(), "name", cloudEventSourceI.GetName()) } // HandleCloudEventSource will create CloudEventSource handlers that defined in spec and start an event loop once handlers // are created successfully. -func (e *EventEmitter) HandleCloudEventSource(ctx context.Context, cloudEventSource *eventingv1alpha1.CloudEventSource) error { - e.createEventHandlers(ctx, cloudEventSource) +func (e *EventEmitter) HandleCloudEventSource(ctx context.Context, cloudEventSourceI eventingv1alpha1.CloudEventSourceInterface) error { + e.createEventHandlers(ctx, cloudEventSourceI) - if !e.checkIfEventHandlersExist(cloudEventSource) { - return fmt.Errorf("no CloudEventSource handler is created for %s/%s", cloudEventSource.Namespace, cloudEventSource.Name) + if !e.checkIfEventHandlersExist(cloudEventSourceI) { + return fmt.Errorf("no CloudEventSource handler is created for %s/%s", cloudEventSourceI.GetNamespace(), cloudEventSourceI.GetName()) } - key := cloudEventSource.GenerateIdentifier() + key := cloudEventSourceI.GenerateIdentifier() cancelCtx, cancel := context.WithCancel(ctx) // cancel the outdated EventLoop for the same CloudEventSource (if exists) @@ -137,7 +137,7 @@ func (e *EventEmitter) HandleCloudEventSource(ctx context.Context, cloudEventSou } e.eventLoopContexts.Store(key, cancel) } else { - if updateErr := e.setCloudEventSourceStatusActive(ctx, cloudEventSource); updateErr != nil { + if updateErr := e.setCloudEventSourceStatusActive(ctx, cloudEventSourceI); updateErr != nil { e.log.Error(updateErr, "Failed to update CloudEventSource status") return updateErr } @@ -147,15 +147,23 @@ func (e *EventEmitter) HandleCloudEventSource(ctx context.Context, cloudEventSou eventingMutex := &sync.Mutex{} // passing deep copy of CloudEventSource to the eventLoop go routines, it's a precaution to not have global objects shared between threads - e.log.V(1).Info("Start CloudEventSource loop.") - go e.startEventLoop(cancelCtx, cloudEventSource.DeepCopy(), eventingMutex) + switch obj := cloudEventSourceI.(type) { + case *eventingv1alpha1.CloudEventSource: + go e.startEventLoop(cancelCtx, obj.DeepCopy(), eventingMutex) + case *eventingv1alpha1.ClusterCloudEventSource: + go e.startClusterEventLoop(cancelCtx, obj.DeepCopy(), eventingMutex) + default: + return nil + } + return nil } // DeleteCloudEventSource will stop the event loop and clean event handlers in cache. -func (e *EventEmitter) DeleteCloudEventSource(cloudEventSource *eventingv1alpha1.CloudEventSource) error { +func (e *EventEmitter) DeleteCloudEventSource(cloudEventSource eventingv1alpha1.CloudEventSourceInterface) error { key := cloudEventSource.GenerateIdentifier() result, ok := e.eventLoopContexts.Load(key) + e.log.V(1).Info("successfully DeleteCloudEventSourceDeleteCloudEventSourceDeleteCloudEventSource", "key", key) if ok { cancel, ok := result.(context.CancelFunc) if ok { @@ -164,7 +172,7 @@ func (e *EventEmitter) DeleteCloudEventSource(cloudEventSource *eventingv1alpha1 e.eventLoopContexts.Delete(key) e.clearEventHandlersCache(cloudEventSource) } else { - e.log.V(1).Info("CloudEventSource was not found in controller cache", "key", key) + e.log.V(1).Info("successfully CloudEventSource was not found in controller cache", "key", key) } return nil @@ -172,32 +180,33 @@ func (e *EventEmitter) DeleteCloudEventSource(cloudEventSource *eventingv1alpha1 // createEventHandlers will create different handler as defined in CloudEventSource, and store them in cache for repeated // use in the loop. -func (e *EventEmitter) createEventHandlers(ctx context.Context, cloudEventSource *eventingv1alpha1.CloudEventSource) { +func (e *EventEmitter) createEventHandlers(ctx context.Context, cloudEventSourceI eventingv1alpha1.CloudEventSourceInterface) { e.eventHandlersCacheLock.Lock() e.eventFilterCacheLock.Lock() defer e.eventHandlersCacheLock.Unlock() defer e.eventFilterCacheLock.Unlock() - key := cloudEventSource.GenerateIdentifier() + key := cloudEventSourceI.GenerateIdentifier() + spec := cloudEventSourceI.GetSpec() - clusterName := cloudEventSource.Spec.ClusterName + clusterName := spec.ClusterName if clusterName == "" { clusterName = e.clusterName } // Resolve auth related - authParams, podIdentity, err := resolver.ResolveAuthRefAndPodIdentity(ctx, e.client, e.log, cloudEventSource.Spec.AuthenticationRef, nil, cloudEventSource.Namespace, e.secretsLister) + authParams, podIdentity, err := resolver.ResolveAuthRefAndPodIdentity(ctx, e.client, e.log, spec.AuthenticationRef, nil, cloudEventSourceI.GetNamespace(), e.secretsLister) if err != nil { - e.log.Error(err, "error resolving auth params", "cloudEventSource", cloudEventSource) + e.log.Error(err, "error resolving auth params", "cloudEventSource", cloudEventSourceI) return } // Create EventFilter from CloudEventSource - e.eventFilterCache[key] = NewEventFilter(cloudEventSource.Spec.EventSubscription.IncludedEventTypes, cloudEventSource.Spec.EventSubscription.ExcludedEventTypes) + e.eventFilterCache[key] = NewEventFilter(spec.EventSubscription.IncludedEventTypes, spec.EventSubscription.ExcludedEventTypes) // Create different event destinations here - if cloudEventSource.Spec.Destination.HTTP != nil { - eventHandler, err := NewCloudEventHTTPHandler(ctx, clusterName, cloudEventSource.Spec.Destination.HTTP.URI, initializeLogger(cloudEventSource, "cloudevent_http")) + if spec.Destination.HTTP != nil { + eventHandler, err := NewCloudEventHTTPHandler(ctx, clusterName, spec.Destination.HTTP.URI, initializeLogger(cloudEventSourceI, "cloudevent_http")) if err != nil { e.log.Error(err, "create CloudEvent HTTP handler failed") return @@ -211,8 +220,8 @@ func (e *EventEmitter) createEventHandlers(ctx context.Context, cloudEventSource return } - if cloudEventSource.Spec.Destination.AzureEventGridTopic != nil { - eventHandler, err := NewAzureEventGridTopicHandler(ctx, clusterName, cloudEventSource.Spec.Destination.AzureEventGridTopic, authParams, podIdentity, initializeLogger(cloudEventSource, "azure_event_grid_topic")) + if spec.Destination.AzureEventGridTopic != nil { + eventHandler, err := NewAzureEventGridTopicHandler(ctx, clusterName, spec.Destination.AzureEventGridTopic, authParams, podIdentity, initializeLogger(cloudEventSourceI, "azure_event_grid_topic")) if err != nil { e.log.Error(err, "create Azure Event Grid handler failed") return @@ -226,40 +235,41 @@ func (e *EventEmitter) createEventHandlers(ctx context.Context, cloudEventSource return } - e.log.Info("No destionation is defined in CloudEventSource", "CloudEventSource", cloudEventSource.Name) + e.log.Info("No destionation is defined in CloudEventSource", "CloudEventSource", cloudEventSourceI.GetName()) } // clearEventHandlersCache will clear all event handlers that created by the passing CloudEventSource -func (e *EventEmitter) clearEventHandlersCache(cloudEventSource *eventingv1alpha1.CloudEventSource) { +func (e *EventEmitter) clearEventHandlersCache(cloudEventSource eventingv1alpha1.CloudEventSourceInterface) { e.eventHandlersCacheLock.Lock() defer e.eventHandlersCacheLock.Unlock() e.eventFilterCacheLock.Lock() defer e.eventFilterCacheLock.Unlock() + spec := cloudEventSource.GetSpec() key := cloudEventSource.GenerateIdentifier() delete(e.eventFilterCache, key) // Clear different event destination here. - if cloudEventSource.Spec.Destination.HTTP != nil { + if spec.Destination.HTTP != nil { eventHandlerKey := newEventHandlerKey(key, cloudEventHandlerTypeHTTP) if eventHandler, found := e.eventHandlersCache[eventHandlerKey]; found { eventHandler.CloseHandler() - delete(e.eventHandlersCache, key) + delete(e.eventHandlersCache, eventHandlerKey) } } - if cloudEventSource.Spec.Destination.AzureEventGridTopic != nil { + if spec.Destination.AzureEventGridTopic != nil { eventHandlerKey := newEventHandlerKey(key, cloudEventHandlerTypeAzureEventGridTopic) if eventHandler, found := e.eventHandlersCache[eventHandlerKey]; found { eventHandler.CloseHandler() - delete(e.eventHandlersCache, key) + delete(e.eventHandlersCache, eventHandlerKey) } } } // checkIfEventHandlersExist will check if the event handlers that were created by passing CloudEventSource exist -func (e *EventEmitter) checkIfEventHandlersExist(cloudEventSource *eventingv1alpha1.CloudEventSource) bool { +func (e *EventEmitter) checkIfEventHandlersExist(cloudEventSource eventingv1alpha1.CloudEventSourceInterface) bool { e.eventHandlersCacheLock.RLock() defer e.eventHandlersCacheLock.RUnlock() @@ -274,39 +284,70 @@ func (e *EventEmitter) checkIfEventHandlersExist(cloudEventSource *eventingv1alp } func (e *EventEmitter) startEventLoop(ctx context.Context, cloudEventSource *eventingv1alpha1.CloudEventSource, cloudEventSourceMutex sync.Locker) { + e.log.V(1).Info("Start CloudEventSource loop.", "name", cloudEventSource.GetName()) for { select { case eventData := <-e.cloudEventProcessingChan: - e.log.V(1).Info("Consuming events from CloudEventSource.") + e.log.V(1).Info("Consuming events from CloudEventSource.", "name", cloudEventSource.GetName()) e.emitEventByHandler(eventData) e.checkEventHandlers(ctx, cloudEventSource, cloudEventSourceMutex) - metricscollector.RecordCloudEventQueueStatus(cloudEventSource.Namespace, len(e.cloudEventProcessingChan)) + metricscollector.RecordCloudEventQueueStatus(cloudEventSource.GetNamespace(), len(e.cloudEventProcessingChan)) case <-ctx.Done(): e.log.V(1).Info("CloudEventSource loop has stopped.") - metricscollector.RecordCloudEventQueueStatus(cloudEventSource.Namespace, len(e.cloudEventProcessingChan)) + metricscollector.RecordCloudEventQueueStatus(cloudEventSource.GetNamespace(), len(e.cloudEventProcessingChan)) + return + } + } +} + +func (e *EventEmitter) startClusterEventLoop(ctx context.Context, clusterCloudEventSource *eventingv1alpha1.ClusterCloudEventSource, cloudEventSourceMutex sync.Locker) { + e.log.V(1).Info("Start CloudEventSource loop.", "name", clusterCloudEventSource.GetName()) + for { + select { + case eventData := <-e.cloudEventProcessingChan: + e.log.V(1).Info("Consuming events from ClusterCloudEventSource.", "name", clusterCloudEventSource.GetName()) + e.emitEventByHandler(eventData) + e.checkEventHandlers(ctx, clusterCloudEventSource, cloudEventSourceMutex) + metricscollector.RecordCloudEventQueueStatus(clusterCloudEventSource.GetNamespace(), len(e.cloudEventProcessingChan)) + case <-ctx.Done(): + e.log.V(1).Info("ClusterCloudEventSource loop has stopped.") + metricscollector.RecordCloudEventQueueStatus(clusterCloudEventSource.GetNamespace(), len(e.cloudEventProcessingChan)) return } } } // checkEventHandlers will check each eventhandler active status -func (e *EventEmitter) checkEventHandlers(ctx context.Context, cloudEventSource *eventingv1alpha1.CloudEventSource, cloudEventSourceMutex sync.Locker) { - e.log.V(1).Info("Checking event handlers status.") +func (e *EventEmitter) checkEventHandlers(ctx context.Context, cloudEventSourceI eventingv1alpha1.CloudEventSourceInterface, cloudEventSourceMutex sync.Locker) { + e.log.V(1).Info("Checking event handlers status.", "name", cloudEventSourceI.GetName()) cloudEventSourceMutex.Lock() defer cloudEventSourceMutex.Unlock() // Get the latest object - err := e.client.Get(ctx, types.NamespacedName{Name: cloudEventSource.Name, Namespace: cloudEventSource.Namespace}, cloudEventSource) - if err != nil { - e.log.Error(err, "error getting cloudEventSource", "cloudEventSource", cloudEventSource) - return + switch cloudEventSourceI.(type) { + case *eventingv1alpha1.CloudEventSource: + cloudEventSource := &eventingv1alpha1.CloudEventSource{} + err := e.client.Get(ctx, types.NamespacedName{Name: cloudEventSourceI.GetName(), Namespace: cloudEventSourceI.GetNamespace()}, cloudEventSource) + if err != nil { + e.log.Error(err, "error getting cloudEventSource", "cloudEventSource", cloudEventSource) + } + cloudEventSourceI = cloudEventSource + case *eventingv1alpha1.ClusterCloudEventSource: + clustercloudEventSource := &eventingv1alpha1.ClusterCloudEventSource{} + err := e.client.Get(ctx, types.NamespacedName{Name: cloudEventSourceI.GetName(), Namespace: cloudEventSourceI.GetNamespace()}, clustercloudEventSource) + if err != nil { + e.log.Error(err, "error getting clustercloudEventSource", "clustercloudEventSource", clustercloudEventSource) + } + cloudEventSourceI = clustercloudEventSource + default: } - keyPrefix := cloudEventSource.GenerateIdentifier() + + keyPrefix := cloudEventSourceI.GenerateIdentifier() needUpdate := false - cloudEventSourceStatus := cloudEventSource.Status.DeepCopy() + cloudEventSourceStatus := cloudEventSourceI.GetStatus() for k, v := range e.eventHandlersCache { - e.log.V(1).Info("Checking event handler status.", "handler", k, "status", cloudEventSource.Status.Conditions.GetActiveCondition().Status) + e.log.V(1).Info("Checking event handler status.", "handler", k, "status", cloudEventSourceStatus.Conditions.GetActiveCondition().Status) if strings.Contains(k, keyPrefix) { - if v.GetActiveStatus() != cloudEventSource.Status.Conditions.GetActiveCondition().Status { + if v.GetActiveStatus() != cloudEventSourceStatus.Conditions.GetActiveCondition().Status { needUpdate = true cloudEventSourceStatus.Conditions.SetActiveCondition( metav1.ConditionFalse, @@ -318,7 +359,7 @@ func (e *EventEmitter) checkEventHandlers(ctx context.Context, cloudEventSource } if needUpdate { - if updateErr := e.updateCloudEventSourceStatus(ctx, cloudEventSource, cloudEventSourceStatus); updateErr != nil { + if updateErr := e.updateCloudEventSourceStatus(ctx, cloudEventSourceI, cloudEventSourceStatus); updateErr != nil { e.log.Error(updateErr, "Failed to update CloudEventSource status") } } @@ -387,7 +428,6 @@ func (e *EventEmitter) emitEventByHandler(eventData eventdata.EventData) { return } } - eventData.HandlerKey = key if handler.GetActiveStatus() == metav1.ConditionTrue { go handler.EmitEvent(eventData, e.emitErrorHandle) @@ -425,33 +465,36 @@ func (e *EventEmitter) emitErrorHandle(eventData eventdata.EventData, err error) e.enqueueEventData(requeueData) } -func (e *EventEmitter) setCloudEventSourceStatusActive(ctx context.Context, cloudEventSource *eventingv1alpha1.CloudEventSource) error { - cloudEventSourceStatus := cloudEventSource.Status.DeepCopy() +func (e *EventEmitter) setCloudEventSourceStatusActive(ctx context.Context, cloudEventSourceI eventingv1alpha1.CloudEventSourceInterface) error { + cloudEventSourceStatus := cloudEventSourceI.GetStatus() cloudEventSourceStatus.Conditions.SetActiveCondition( metav1.ConditionTrue, eventingv1alpha1.CloudEventSourceConditionActiveReason, eventingv1alpha1.CloudEventSourceConditionActiveMessage, ) - return e.updateCloudEventSourceStatus(ctx, cloudEventSource, cloudEventSourceStatus) + return e.updateCloudEventSourceStatus(ctx, cloudEventSourceI, cloudEventSourceStatus) } -func (e *EventEmitter) updateCloudEventSourceStatus(ctx context.Context, cloudEventSource *eventingv1alpha1.CloudEventSource, cloudEventSourceStatus *eventingv1alpha1.CloudEventSourceStatus) error { - e.log.V(1).Info("Updating CloudEventSource status", "CloudEventSource", cloudEventSource.Name) +func (e *EventEmitter) updateCloudEventSourceStatus(ctx context.Context, cloudEventSourceI eventingv1alpha1.CloudEventSourceInterface, cloudEventSourceStatus eventingv1alpha1.CloudEventSourceStatus) error { + e.log.V(1).Info("Updating CloudEventSource status", "CloudEventSource", cloudEventSourceI.GetName()) transform := func(runtimeObj client.Object, target interface{}) error { - status, ok := target.(*eventingv1alpha1.CloudEventSourceStatus) + status, ok := target.(eventingv1alpha1.CloudEventSourceStatus) if !ok { return fmt.Errorf("transform target is not eventingv1alpha1.CloudEventSourceStatus type %v", target) } switch obj := runtimeObj.(type) { case *eventingv1alpha1.CloudEventSource: - e.log.V(1).Info("New CloudEventSource status", "status", *status) - obj.Status = *status + e.log.V(1).Info("New CloudEventSource status", "status", status) + obj.Status = status + case *eventingv1alpha1.ClusterCloudEventSource: + e.log.V(1).Info("New ClusterCloudEventSource status", "status", status) + obj.Status = status default: } return nil } - if err := kedastatus.TransformObject(ctx, e.client, e.log, cloudEventSource, cloudEventSourceStatus, transform); err != nil { + if err := kedastatus.TransformObject(ctx, e.client, e.log, cloudEventSourceI, cloudEventSourceStatus, transform); err != nil { e.log.Error(err, "Failed to update CloudEventSourceStatus") return err } diff --git a/pkg/mock/mock_eventemitter/mock_interface.go b/pkg/mock/mock_eventemitter/mock_interface.go index d3346ea50bf..20ff808ec19 100644 --- a/pkg/mock/mock_eventemitter/mock_interface.go +++ b/pkg/mock/mock_eventemitter/mock_interface.go @@ -45,7 +45,7 @@ func (m *MockEventHandler) EXPECT() *MockEventHandlerMockRecorder { } // DeleteCloudEventSource mocks base method. -func (m *MockEventHandler) DeleteCloudEventSource(cloudEventSource *v1alpha1.CloudEventSource) error { +func (m *MockEventHandler) DeleteCloudEventSource(cloudEventSource v1alpha1.CloudEventSourceInterface) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DeleteCloudEventSource", cloudEventSource) ret0, _ := ret[0].(error) @@ -71,7 +71,7 @@ func (mr *MockEventHandlerMockRecorder) Emit(object, namesapce, eventType, cloud } // HandleCloudEventSource mocks base method. -func (m *MockEventHandler) HandleCloudEventSource(ctx context.Context, cloudEventSource *v1alpha1.CloudEventSource) error { +func (m *MockEventHandler) HandleCloudEventSource(ctx context.Context, cloudEventSource v1alpha1.CloudEventSourceInterface) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "HandleCloudEventSource", ctx, cloudEventSource) ret0, _ := ret[0].(error) diff --git a/pkg/status/status.go b/pkg/status/status.go index 852843afac9..8c42190d5f7 100755 --- a/pkg/status/status.go +++ b/pkg/status/status.go @@ -44,6 +44,8 @@ func SetStatusConditions(ctx context.Context, client runtimeclient.StatusClient, obj.Status.Conditions = *conditions case *eventingv1alpha1.CloudEventSource: obj.Status.Conditions = *conditions + case *eventingv1alpha1.ClusterCloudEventSource: + obj.Status.Conditions = *conditions default: } return nil @@ -184,6 +186,12 @@ func TransformObject(ctx context.Context, client runtimeclient.StatusClient, log logger.Error(err, "failed to patch CloudEventSource") return err } + case *eventingv1alpha1.ClusterCloudEventSource: + patch = runtimeclient.MergeFrom(obj.DeepCopy()) + if err := transform(obj, target); err != nil { + logger.Error(err, "failed to patch ClusterCloudEventSource") + return err + } default: err := fmt.Errorf("unknown scalable object type %v", obj) logger.Error(err, "failed to patch Objects") diff --git a/tests/internals/cloudevent_source/cloudevent_source_test.go b/tests/internals/cloudevent_source/cloudevent_source_test.go index a5fe794c209..b1001c7c1e3 100644 --- a/tests/internals/cloudevent_source/cloudevent_source_test.go +++ b/tests/internals/cloudevent_source/cloudevent_source_test.go @@ -25,82 +25,43 @@ const ( var _ = godotenv.Load("../../.env") var ( - namespace = fmt.Sprintf("%s-ns", testName) - scaledObjectName = fmt.Sprintf("%s-so", testName) - deploymentName = fmt.Sprintf("%s-d", testName) - clientName = fmt.Sprintf("%s-client", testName) - cloudeventSourceName = fmt.Sprintf("%s-ce", testName) - cloudeventSourceErrName = fmt.Sprintf("%s-ce-err", testName) - cloudeventSourceErrName2 = fmt.Sprintf("%s-ce-err2", testName) - cloudEventHTTPReceiverName = fmt.Sprintf("%s-cloudevent-http-receiver", testName) - cloudEventHTTPServiceName = fmt.Sprintf("%s-cloudevent-http-service", testName) - cloudEventHTTPServiceURL = fmt.Sprintf("http://%s.%s.svc.cluster.local:8899", cloudEventHTTPServiceName, namespace) - clusterName = "test-cluster" - expectedSubject = fmt.Sprintf("/%s/%s/scaledobject/%s", clusterName, namespace, scaledObjectName) - expectedSource = fmt.Sprintf("/%s/keda/keda", clusterName) - lastCloudEventTime = time.Now() + namespace = fmt.Sprintf("%s-ns", testName) + scaledObjectName = fmt.Sprintf("%s-so", testName) + deploymentName = fmt.Sprintf("%s-d", testName) + clientName = fmt.Sprintf("%s-client", testName) + cloudeventSourceName = fmt.Sprintf("%s-ce", testName) + cloudeventSourceErrName = fmt.Sprintf("%s-ce-err", testName) + cloudeventSourceErrName2 = fmt.Sprintf("%s-ce-err2", testName) + clusterCloudeventSourceName = fmt.Sprintf("%s-cce", testName) + clusterCloudeventSourceErrName = fmt.Sprintf("%s-cce-err", testName) + clusterCloudeventSourceErrName2 = fmt.Sprintf("%s-cce-err2", testName) + cloudEventHTTPReceiverName = fmt.Sprintf("%s-cloudevent-http-receiver", testName) + cloudEventHTTPServiceName = fmt.Sprintf("%s-cloudevent-http-service", testName) + cloudEventHTTPServiceURL = fmt.Sprintf("http://%s.%s.svc.cluster.local:8899", cloudEventHTTPServiceName, namespace) + clusterName = "test-cluster" + expectedSubject = fmt.Sprintf("/%s/%s/scaledobject/%s", clusterName, namespace, scaledObjectName) + expectedSource = fmt.Sprintf("/%s/keda/keda", clusterName) + lastCloudEventTime = time.Now() ) type templateData struct { - TestNamespace string - ScaledObject string - DeploymentName string - ClientName string - CloudEventSourceName string - CloudeventSourceErrName string - CloudeventSourceErrName2 string - CloudEventHTTPReceiverName string - CloudEventHTTPServiceName string - CloudEventHTTPServiceURL string - ClusterName string + TestNamespace string + ScaledObject string + DeploymentName string + ClientName string + CloudEventSourceName string + CloudeventSourceErrName string + CloudeventSourceErrName2 string + ClusterCloudEventSourceName string + ClusterCloudeventSourceErrName string + ClusterCloudeventSourceErrName2 string + CloudEventHTTPReceiverName string + CloudEventHTTPServiceName string + CloudEventHTTPServiceURL string + ClusterName string } const ( - cloudEventSourceTemplate = ` - apiVersion: eventing.keda.sh/v1alpha1 - kind: CloudEventSource - metadata: - name: {{.CloudEventSourceName}} - namespace: {{.TestNamespace}} - spec: - clusterName: {{.ClusterName}} - destination: - http: - uri: {{.CloudEventHTTPServiceURL}} - ` - - cloudEventSourceWithExcludeTemplate = ` - apiVersion: eventing.keda.sh/v1alpha1 - kind: CloudEventSource - metadata: - name: {{.CloudEventSourceName}} - namespace: {{.TestNamespace}} - spec: - clusterName: {{.ClusterName}} - destination: - http: - uri: {{.CloudEventHTTPServiceURL}} - eventSubscription: - excludedEventTypes: - - keda.scaledobject.failed.v1 - ` - - cloudEventSourceWithIncludeTemplate = ` - apiVersion: eventing.keda.sh/v1alpha1 - kind: CloudEventSource - metadata: - name: {{.CloudEventSourceName}} - namespace: {{.TestNamespace}} - spec: - clusterName: {{.ClusterName}} - destination: - http: - uri: {{.CloudEventHTTPServiceURL}} - eventSubscription: - includedEventTypes: - - keda.scaledobject.failed.v1 - ` - cloudEventHTTPServiceTemplate = ` apiVersion: v1 kind: Service @@ -179,6 +140,51 @@ spec: - -c - "exec tail -f /dev/null"` + cloudEventSourceTemplate = ` + apiVersion: eventing.keda.sh/v1alpha1 + kind: CloudEventSource + metadata: + name: {{.CloudEventSourceName}} + namespace: {{.TestNamespace}} + spec: + clusterName: {{.ClusterName}} + destination: + http: + uri: {{.CloudEventHTTPServiceURL}} + ` + + cloudEventSourceWithExcludeTemplate = ` + apiVersion: eventing.keda.sh/v1alpha1 + kind: CloudEventSource + metadata: + name: {{.CloudEventSourceName}} + namespace: {{.TestNamespace}} + spec: + clusterName: {{.ClusterName}} + destination: + http: + uri: {{.CloudEventHTTPServiceURL}} + eventSubscription: + excludedEventTypes: + - keda.scaledobject.failed.v1 + ` + + cloudEventSourceWithIncludeTemplate = ` + apiVersion: eventing.keda.sh/v1alpha1 + kind: CloudEventSource + metadata: + name: {{.CloudEventSourceName}} + namespace: {{.TestNamespace}} + spec: + clusterName: {{.ClusterName}} + destination: + http: + uri: {{.CloudEventHTTPServiceURL}} + eventSubscription: + includedEventTypes: + - keda.scaledobject.failed.v1 + ` + cloudEventSourceWithErrTypeTemplate = ` apiVersion: eventing.keda.sh/v1alpha1 kind: CloudEventSource @@ -262,6 +268,80 @@ spec: end: 5 * * * * desiredReplicas: '4' ` + + clusterCloudEventSourceTemplate = ` + apiVersion: eventing.keda.sh/v1alpha1 + kind: ClusterCloudEventSource + metadata: + name: {{.ClusterCloudEventSourceName}} + spec: + clusterName: {{.ClusterName}} + destination: + http: + uri: {{.CloudEventHTTPServiceURL}} + ` + + clusterCloudEventSourceWithExcludeTemplate = ` + apiVersion: eventing.keda.sh/v1alpha1 + kind: ClusterCloudEventSource + metadata: + name: {{.ClusterCloudEventSourceName}} + spec: + clusterName: {{.ClusterName}} + destination: + http: + uri: {{.CloudEventHTTPServiceURL}} + eventSubscription: + excludedEventTypes: + - keda.scaledobject.failed.v1 + ` + + clusterCloudEventSourceWithIncludeTemplate = ` + apiVersion: eventing.keda.sh/v1alpha1 + kind: ClusterCloudEventSource + metadata: + name: {{.ClusterCloudEventSourceName}} + spec: + clusterName: {{.ClusterName}} + destination: + http: + uri: {{.CloudEventHTTPServiceURL}} + eventSubscription: + includedEventTypes: + - keda.scaledobject.failed.v1 + ` + + clusterCloudEventSourceWithErrTypeTemplate = ` + apiVersion: eventing.keda.sh/v1alpha1 + kind: ClusterCloudEventSource + metadata: + name: {{.ClusterCloudeventSourceErrName}} + spec: + clusterName: {{.ClusterName}} + destination: + http: + uri: {{.CloudEventHTTPServiceURL}} + eventSubscription: + includedEventTypes: + - keda.scaledobject.failed.v2 + ` + + clusterCloudEventSourceWithErrTypeTemplate2 = ` + apiVersion: eventing.keda.sh/v1alpha1 + kind: ClusterCloudEventSource + metadata: + name: {{.ClusterCloudeventSourceErrName2}} + spec: + clusterName: {{.ClusterName}} + destination: + http: + uri: {{.CloudEventHTTPServiceURL}} + eventSubscription: + includedEventTypes: + - keda.scaledobject.failed.v1 + excludedEventTypes: + - keda.scaledobject.failed.v1 + ` ) func TestScaledObjectGeneral(t *testing.T) { @@ -274,18 +354,31 @@ func TestScaledObjectGeneral(t *testing.T) { assert.True(t, WaitForAllPodRunningInNamespace(t, kc, namespace, 5, 20), "all pods should be running") - testErrEventSourceEmitValue(t, kc, data) + testErrEventSourceEmitValue(t, kc, data, true) testEventSourceEmitValue(t, kc, data) - testErrEventSourceExcludeValue(t, kc, data) - testErrEventSourceIncludeValue(t, kc, data) - testErrEventSourceCreation(t, kc, data) + testErrEventSourceExcludeValue(t, kc, data, true) + testErrEventSourceIncludeValue(t, kc, data, true) + testErrEventSourceCreation(t, kc, data, true) + + testErrEventSourceEmitValue(t, kc, data, false) + testErrEventSourceExcludeValue(t, kc, data, false) + testErrEventSourceIncludeValue(t, kc, data, false) + testErrEventSourceCreation(t, kc, data, false) DeleteKubernetesResources(t, namespace, data, templates) } // tests error events emitted -func testErrEventSourceEmitValue(t *testing.T, _ *kubernetes.Clientset, data templateData) { +func testErrEventSourceEmitValue(t *testing.T, _ *kubernetes.Clientset, data templateData, isClusterScope bool) { + ceTemplate := "" + if isClusterScope { + ceTemplate = clusterCloudEventSourceTemplate + } else { + ceTemplate = cloudEventSourceTemplate + } + t.Log("--- test emitting eventsource about scaledobject err---") + KubectlApplyWithTemplate(t, data, "cloudEventSourceTemplate", ceTemplate) KubectlApplyWithTemplate(t, data, "scaledObjectErrTemplate", scaledObjectErrTemplate) // wait 15 seconds to ensure event propagation @@ -331,6 +424,8 @@ func testErrEventSourceEmitValue(t *testing.T, _ *kubernetes.Clientset, data tem } } assert.NotEmpty(t, foundEvents) + KubectlDeleteWithTemplate(t, data, "cloudEventSourceTemplate", ceTemplate) + t.Log("--- testErrEventSourceEmitValuetestErrEventSourceEmitValuer---", "cloud event time", lastCloudEventTime) } func testEventSourceEmitValue(t *testing.T, _ *kubernetes.Clientset, data templateData) { @@ -377,11 +472,17 @@ func testEventSourceEmitValue(t *testing.T, _ *kubernetes.Clientset, data templa } // tests error events not emitted by -func testErrEventSourceExcludeValue(t *testing.T, _ *kubernetes.Clientset, data templateData) { - t.Log("--- test emitting eventsource about scaledobject err with exclude filter---") +func testErrEventSourceExcludeValue(t *testing.T, _ *kubernetes.Clientset, data templateData, isClusterScope bool) { + t.Log("--- test emitting eventsource about scaledobject err with exclude filter---", "cloud event time", lastCloudEventTime) + + ceTemplate := "" + if isClusterScope { + ceTemplate = clusterCloudEventSourceWithExcludeTemplate + } else { + ceTemplate = cloudEventSourceWithExcludeTemplate + } - KubectlDeleteWithTemplate(t, data, "cloudEventSourceTemplate", cloudEventSourceTemplate) - KubectlApplyWithTemplate(t, data, "cloudEventSourceWithExcludeTemplate", cloudEventSourceWithExcludeTemplate) + KubectlApplyWithTemplate(t, data, "cloudEventSourceWithExcludeTemplate", ceTemplate) KubectlApplyWithTemplate(t, data, "scaledObjectErrTemplate", scaledObjectErrTemplate) // wait 15 seconds to ensure event propagation @@ -408,16 +509,21 @@ func testErrEventSourceExcludeValue(t *testing.T, _ *kubernetes.Clientset, data }, "get filtered event") } - KubectlDeleteWithTemplate(t, data, "cloudEventSourceWithExcludeTemplate", cloudEventSourceWithExcludeTemplate) - KubectlApplyWithTemplate(t, data, "cloudEventSourceTemplate", cloudEventSourceTemplate) + KubectlDeleteWithTemplate(t, data, "cloudEventSourceWithExcludeTemplate", ceTemplate) } // tests error events in include filter -func testErrEventSourceIncludeValue(t *testing.T, _ *kubernetes.Clientset, data templateData) { +func testErrEventSourceIncludeValue(t *testing.T, _ *kubernetes.Clientset, data templateData, isClusterScope bool) { t.Log("--- test emitting eventsource about scaledobject err with include filter---") - KubectlDeleteWithTemplate(t, data, "cloudEventSourceTemplate", cloudEventSourceTemplate) - KubectlApplyWithTemplate(t, data, "cloudEventSourceWithIncludeTemplate", cloudEventSourceWithIncludeTemplate) + ceTemplate := "" + if isClusterScope { + ceTemplate = clusterCloudEventSourceWithIncludeTemplate + } else { + ceTemplate = cloudEventSourceWithIncludeTemplate + } + + KubectlApplyWithTemplate(t, data, "cloudEventSourceWithIncludeTemplate", ceTemplate) KubectlApplyWithTemplate(t, data, "scaledObjectErrTemplate", scaledObjectErrTemplate) // wait 15 seconds to ensure event propagation @@ -442,43 +548,56 @@ func testErrEventSourceIncludeValue(t *testing.T, _ *kubernetes.Clientset, data } } assert.NotEmpty(t, foundEvents) - KubectlDeleteWithTemplate(t, data, "cloudEventSourceWithIncludeTemplate", cloudEventSourceWithIncludeTemplate) - KubectlApplyWithTemplate(t, data, "cloudEventSourceTemplate", cloudEventSourceTemplate) + KubectlDeleteWithTemplate(t, data, "cloudEventSourceWithIncludeTemplate", ceTemplate) } // tests error event type when creation -func testErrEventSourceCreation(t *testing.T, _ *kubernetes.Clientset, data templateData) { +func testErrEventSourceCreation(t *testing.T, _ *kubernetes.Clientset, data templateData, isClusterScope bool) { t.Log("--- test emitting eventsource about scaledobject err with include filter---") - KubectlDeleteWithTemplate(t, data, "cloudEventSourceTemplate", cloudEventSourceTemplate) + ceErrTemplate := "" + ceErrTemplate2 := "" + if isClusterScope { + ceErrTemplate = clusterCloudEventSourceWithErrTypeTemplate + ceErrTemplate2 = clusterCloudEventSourceWithErrTypeTemplate2 + } else { + ceErrTemplate = cloudEventSourceWithErrTypeTemplate + ceErrTemplate2 = cloudEventSourceWithErrTypeTemplate2 + } + + // KubectlDeleteWithTemplate(t, data, "cloudEventSourceTemplate", cloudEventSourceTemplate) - err := KubectlApplyWithErrors(t, data, "cloudEventSourceWithErrTypeTemplate", cloudEventSourceWithErrTypeTemplate) - assert.ErrorContains(t, err, `The CloudEventSource "eventsource-test-ce-err" is invalid:`) + err := KubectlApplyWithErrors(t, data, "cloudEventSourceWithErrTypeTemplate", ceErrTemplate) + if isClusterScope { + assert.ErrorContains(t, err, `The ClusterCloudEventSource "eventsource-test-cce-err" is invalid:`) + } else { + assert.ErrorContains(t, err, `The CloudEventSource "eventsource-test-ce-err" is invalid:`) + } - err = KubectlApplyWithErrors(t, data, "cloudEventSourceWithErrTypeTemplate2", cloudEventSourceWithErrTypeTemplate2) + err = KubectlApplyWithErrors(t, data, "cloudEventSourceWithErrTypeTemplate2", ceErrTemplate2) assert.ErrorContains(t, err, `setting included types and excluded types at the same time is not supported`) - - KubectlApplyWithTemplate(t, data, "cloudEventSourceTemplate", cloudEventSourceTemplate) } // help function to load template data func getTemplateData() (templateData, []Template) { return templateData{ - TestNamespace: namespace, - ScaledObject: scaledObjectName, - DeploymentName: deploymentName, - ClientName: clientName, - CloudEventSourceName: cloudeventSourceName, - CloudeventSourceErrName: cloudeventSourceErrName, - CloudeventSourceErrName2: cloudeventSourceErrName2, - CloudEventHTTPReceiverName: cloudEventHTTPReceiverName, - CloudEventHTTPServiceName: cloudEventHTTPServiceName, - CloudEventHTTPServiceURL: cloudEventHTTPServiceURL, - ClusterName: clusterName, + TestNamespace: namespace, + ScaledObject: scaledObjectName, + DeploymentName: deploymentName, + ClientName: clientName, + CloudEventSourceName: cloudeventSourceName, + CloudeventSourceErrName: cloudeventSourceErrName, + CloudeventSourceErrName2: cloudeventSourceErrName2, + ClusterCloudEventSourceName: clusterCloudeventSourceName, + ClusterCloudeventSourceErrName: clusterCloudeventSourceErrName, + ClusterCloudeventSourceErrName2: clusterCloudeventSourceErrName2, + CloudEventHTTPReceiverName: cloudEventHTTPReceiverName, + CloudEventHTTPServiceName: cloudEventHTTPServiceName, + CloudEventHTTPServiceURL: cloudEventHTTPServiceURL, + ClusterName: clusterName, }, []Template{ {Name: "cloudEventHTTPReceiverTemplate", Config: cloudEventHTTPReceiverTemplate}, {Name: "cloudEventHTTPServiceTemplate", Config: cloudEventHTTPServiceTemplate}, {Name: "clientTemplate", Config: clientTemplate}, - {Name: "cloudEventSourceTemplate", Config: cloudEventSourceTemplate}, } } From 54ec930e7a30d2923514ffa2df8d7d87150758c6 Mon Sep 17 00:00:00 2001 From: SpiritZhou Date: Tue, 21 May 2024 14:06:45 +0800 Subject: [PATCH 2/6] Update ChangeLog Signed-off-by: SpiritZhou --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 311e05cd298..0584e8752a0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,6 +57,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio ### New - TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX)) +- **CloudEventSource**: Introduce ClusterCloudEvent ([#3533](https://github.com/kedacore/keda/issues/3533)) #### Experimental From 8457b620812153c4d6b2ad9ce5be3e07fab0a26a Mon Sep 17 00:00:00 2001 From: SpiritZhou Date: Tue, 21 May 2024 14:08:35 +0800 Subject: [PATCH 3/6] Update Signed-off-by: SpiritZhou --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0584e8752a0..951b22945c0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,7 +57,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio ### New - TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX)) -- **CloudEventSource**: Introduce ClusterCloudEvent ([#3533](https://github.com/kedacore/keda/issues/3533)) +- **CloudEventSource**: Introduce ClusterCloudEventSource ([#3533](https://github.com/kedacore/keda/issues/3533)) #### Experimental From 1f7444a5b2e017d0b6729d0e9957896b561c3952 Mon Sep 17 00:00:00 2001 From: SpiritZhou Date: Tue, 28 May 2024 10:14:30 +0800 Subject: [PATCH 4/6] Update CHANGELOG.md Co-authored-by: Tom Kerkhove Signed-off-by: SpiritZhou --- CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 951b22945c0..7941d2f57e7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -56,7 +56,6 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio ### New -- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX)) - **CloudEventSource**: Introduce ClusterCloudEventSource ([#3533](https://github.com/kedacore/keda/issues/3533)) #### Experimental From a3f71220ee8265bf931c7da5471d12c281d35b81 Mon Sep 17 00:00:00 2001 From: SpiritZhou Date: Thu, 4 Jul 2024 17:18:11 +0800 Subject: [PATCH 5/6] Refactor Signed-off-by: SpiritZhou --- .../v1alpha1/cloudeventsource_types.go | 58 ++----- .../eventing/cloudeventsource_controller.go | 117 ++------------- .../clustercloudeventsource_controller.go | 119 ++------------- controllers/eventing/finalizer.go | 18 +-- controllers/eventing/reconciler.go | 141 ++++++++++++++++++ pkg/eventemitter/eventemitter.go | 76 +++------- 6 files changed, 204 insertions(+), 325 deletions(-) create mode 100644 controllers/eventing/reconciler.go diff --git a/apis/eventing/v1alpha1/cloudeventsource_types.go b/apis/eventing/v1alpha1/cloudeventsource_types.go index f1436e23f94..c872fc6acd4 100644 --- a/apis/eventing/v1alpha1/cloudeventsource_types.go +++ b/apis/eventing/v1alpha1/cloudeventsource_types.go @@ -18,19 +18,17 @@ package v1alpha1 import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" v1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" ) // +kubebuilder:object:generate=false type CloudEventSourceInterface interface { - GetKind() string - GetName() string - GetNamespace() string - GetSpec() CloudEventSourceSpec - GetStatus() CloudEventSourceStatus - GetGeneration() int64 + client.Object GenerateIdentifier() string + GetSpec() *CloudEventSourceSpec + GetStatus() *CloudEventSourceStatus } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object @@ -129,28 +127,12 @@ func init() { SchemeBuilder.Register(&CloudEventSource{}, &CloudEventSourceList{}, &ClusterCloudEventSource{}, &ClusterCloudEventSourceList{}) } -func (ces *CloudEventSource) GetKind() string { - return ces.Kind +func (ces *CloudEventSource) GetSpec() *CloudEventSourceSpec { + return &ces.Spec } -func (ces *CloudEventSource) GetName() string { - return ces.Name -} - -func (ces *CloudEventSource) GetNamespace() string { - return ces.Namespace -} - -func (ces *CloudEventSource) GetSpec() CloudEventSourceSpec { - return ces.Spec -} - -func (ces *CloudEventSource) GetStatus() CloudEventSourceStatus { - return *ces.Status.DeepCopy() -} - -func (ces *CloudEventSource) GetGeneration() int64 { - return ces.Generation +func (ces *CloudEventSource) GetStatus() *CloudEventSourceStatus { + return &ces.Status } // GenerateIdentifier returns identifier for the object in for "kind.namespace.name" @@ -158,28 +140,12 @@ func (ces *CloudEventSource) GenerateIdentifier() string { return v1alpha1.GenerateIdentifier("CloudEventSource", ces.Namespace, ces.Name) } -func (cces *ClusterCloudEventSource) GetKind() string { - return cces.Kind -} - -func (cces *ClusterCloudEventSource) GetName() string { - return cces.Name -} - -func (cces *ClusterCloudEventSource) GetNamespace() string { - return cces.Namespace -} - -func (cces *ClusterCloudEventSource) GetSpec() CloudEventSourceSpec { - return cces.Spec -} - -func (cces *ClusterCloudEventSource) GetStatus() CloudEventSourceStatus { - return *cces.Status.DeepCopy() +func (cces *ClusterCloudEventSource) GetSpec() *CloudEventSourceSpec { + return &cces.Spec } -func (cces *ClusterCloudEventSource) GetGeneration() int64 { - return cces.Generation +func (cces *ClusterCloudEventSource) GetStatus() *CloudEventSourceStatus { + return &cces.Status } // GenerateIdentifier returns identifier for the object in for "kind.cluster-scoped.name" diff --git a/controllers/eventing/cloudeventsource_controller.go b/controllers/eventing/cloudeventsource_controller.go index 965d1554a47..5bb78f5e9ca 100644 --- a/controllers/eventing/cloudeventsource_controller.go +++ b/controllers/eventing/cloudeventsource_controller.go @@ -13,16 +13,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ - +// +//nolint:dupl package eventing import ( "context" "sync" - "github.com/go-logr/logr" - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/client-go/tools/cache" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" @@ -32,7 +30,6 @@ import ( eventingv1alpha1 "github.com/kedacore/keda/v2/apis/eventing/v1alpha1" "github.com/kedacore/keda/v2/pkg/eventemitter" "github.com/kedacore/keda/v2/pkg/metricscollector" - kedastatus "github.com/kedacore/keda/v2/pkg/status" "github.com/kedacore/keda/v2/pkg/util" ) @@ -60,57 +57,11 @@ func NewCloudEventSourceReconciler(c client.Client, e eventemitter.EventHandler) // +kubebuilder:rbac:groups=eventing.keda.sh,resources=cloudeventsources;cloudeventsources/status,verbs="*" // Reconcile performs reconciliation on the identified EventSource resource based on the request information passed, returns the result and an error (if any). -// -//nolint:dupl + func (r *CloudEventSourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { reqLogger := log.FromContext(ctx) - // Fetch the EventSource instance cloudEventSource := &eventingv1alpha1.CloudEventSource{} - err := r.Client.Get(ctx, req.NamespacedName, cloudEventSource) - if err != nil { - if errors.IsNotFound(err) { - // Request eventSource not found, could have been deleted after reconcile request. - // Owned eventSource are automatically garbage collected. For additional cleanup logic use finalizers. - // Return and don't requeue - return ctrl.Result{}, nil - } - // Error reading the object - requeue the request. - reqLogger.Error(err, "failed to get EventSource") - return ctrl.Result{}, err - } - - reqLogger.Info("Reconciling CloudEventSource") - - if !cloudEventSource.GetDeletionTimestamp().IsZero() { - return ctrl.Result{}, FinalizeCloudEventSourceResource(ctx, reqLogger, r, cloudEventSource, req.NamespacedName.String()) - } - r.updatePromMetrics(cloudEventSource, req.NamespacedName.String()) - - // ensure finalizer is set on this CR - if err := EnsureCloudEventSourceResourceFinalizer(ctx, reqLogger, r, cloudEventSource); err != nil { - return ctrl.Result{}, err - } - - // ensure Status Conditions are initialized - if !cloudEventSource.Status.Conditions.AreInitialized() { - conditions := eventingv1alpha1.GetCloudEventSourceInitializedConditions() - if err := kedastatus.SetStatusConditions(ctx, r.Client, reqLogger, cloudEventSource, conditions); err != nil { - return ctrl.Result{}, err - } - } - - eventSourceChanged, err := r.cloudEventSourceGenerationChanged(reqLogger, cloudEventSource) - if err != nil { - return ctrl.Result{}, err - } - - if eventSourceChanged { - if r.requestEventLoop(ctx, reqLogger, cloudEventSource) != nil { - return ctrl.Result{}, err - } - } - - return ctrl.Result{}, nil + return Reconcile(ctx, reqLogger, r, req, cloudEventSource) } // SetupWithManager sets up the controller with the Manager. @@ -121,61 +72,19 @@ func (r *CloudEventSourceReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -// requestEventLoop tries to start EventLoop handler for the respective EventSource -func (r *CloudEventSourceReconciler) requestEventLoop(ctx context.Context, logger logr.Logger, eventSource *eventingv1alpha1.CloudEventSource) error { - logger.V(1).Info("Notify eventHandler of an update in eventSource") - - key, err := cache.MetaNamespaceKeyFunc(eventSource) - if err != nil { - logger.Error(err, "error getting key for eventSource") - return err - } - - if err = r.eventEmitter.HandleCloudEventSource(ctx, eventSource); err != nil { - return err - } - - // store CloudEventSource's current Generation - r.cloudEventSourceGenerations.Store(key, eventSource.Generation) - - return nil +func (r *CloudEventSourceReconciler) GetClient() client.Client { + return r.Client } -// stopEventLoop stops EventLoop handler for the respective EventSource -func (r *CloudEventSourceReconciler) StopEventLoop(logger logr.Logger, obj client.Object) error { - key, err := cache.MetaNamespaceKeyFunc(obj) - if err != nil { - logger.Error(err, "error getting key for eventSource") - return err - } - - if err := r.eventEmitter.DeleteCloudEventSource(obj.(*eventingv1alpha1.CloudEventSource)); err != nil { - return err - } - // delete CloudEventSource's current Generation - r.cloudEventSourceGenerations.Delete(key) - return nil +func (r *CloudEventSourceReconciler) GetEventEmitter() eventemitter.EventHandler { + return r.eventEmitter } -// eventSourceGenerationChanged returns true if CloudEventSource's Generation was changed, ie. EventSource.Spec was changed -func (r *CloudEventSourceReconciler) cloudEventSourceGenerationChanged(logger logr.Logger, eventSource *eventingv1alpha1.CloudEventSource) (bool, error) { - key, err := cache.MetaNamespaceKeyFunc(eventSource) - if err != nil { - logger.Error(err, "error getting key for eventSource") - return true, err - } - - value, loaded := r.cloudEventSourceGenerations.Load(key) - if loaded { - generation := value.(int64) - if generation == eventSource.Generation { - return false, nil - } - } - return true, nil +func (r *CloudEventSourceReconciler) GetCloudEventSourceGeneration() *sync.Map { + return r.cloudEventSourceGenerations } -func (r *CloudEventSourceReconciler) updatePromMetrics(eventSource *eventingv1alpha1.CloudEventSource, namespacedName string) { +func (r *CloudEventSourceReconciler) UpdatePromMetrics(eventSource eventingv1alpha1.CloudEventSourceInterface, namespacedName string) { r.eventSourcePromMetricsLock.Lock() defer r.eventSourcePromMetricsLock.Unlock() @@ -183,8 +92,8 @@ func (r *CloudEventSourceReconciler) updatePromMetrics(eventSource *eventingv1al metricscollector.DecrementCRDTotal(metricscollector.CloudEventSourceResource, ns) } - metricscollector.IncrementCRDTotal(metricscollector.CloudEventSourceResource, eventSource.Namespace) - r.eventSourcePromMetricsMap[namespacedName] = eventSource.Namespace + metricscollector.IncrementCRDTotal(metricscollector.CloudEventSourceResource, eventSource.GetNamespace()) + r.eventSourcePromMetricsMap[namespacedName] = eventSource.GetNamespace() } // UpdatePromMetricsOnDelete is idempotent, so it can be called multiple times without side-effects diff --git a/controllers/eventing/clustercloudeventsource_controller.go b/controllers/eventing/clustercloudeventsource_controller.go index dae9beb0a44..0ccb26f811a 100644 --- a/controllers/eventing/clustercloudeventsource_controller.go +++ b/controllers/eventing/clustercloudeventsource_controller.go @@ -13,16 +13,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ - +// +//nolint:dupl package eventing import ( "context" "sync" - "github.com/go-logr/logr" - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/client-go/tools/cache" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" @@ -32,7 +30,6 @@ import ( eventingv1alpha1 "github.com/kedacore/keda/v2/apis/eventing/v1alpha1" "github.com/kedacore/keda/v2/pkg/eventemitter" "github.com/kedacore/keda/v2/pkg/metricscollector" - kedastatus "github.com/kedacore/keda/v2/pkg/status" "github.com/kedacore/keda/v2/pkg/util" ) @@ -60,58 +57,10 @@ func NewClusterCloudEventSourceReconciler(c client.Client, e eventemitter.EventH // +kubebuilder:rbac:groups=eventing.keda.sh,resources=clustercloudeventsources;clustercloudeventsources/status,verbs="*" // Reconcile performs reconciliation on the identified EventSource resource based on the request information passed, returns the result and an error (if any). -// -//nolint:dupl func (r *ClusterCloudEventSourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { reqLogger := log.FromContext(ctx) - - // Fetch the EventSource instance - clustercloudEventSource := &eventingv1alpha1.ClusterCloudEventSource{} - err := r.Client.Get(ctx, req.NamespacedName, clustercloudEventSource) - if err != nil { - if errors.IsNotFound(err) { - // Request eventSource not found, could have been deleted after reconcile request. - // Owned eventSource are automatically garbage collected. For additional cleanup logic use finalizers. - // Return and don't requeue - return ctrl.Result{}, nil - } - // Error reading the object - requeue the request. - reqLogger.Error(err, "failed to get EventSource") - return ctrl.Result{}, err - } - - reqLogger.Info("Reconciling ClusterCloudEventSource") - - if !clustercloudEventSource.GetDeletionTimestamp().IsZero() { - return ctrl.Result{}, FinalizeCloudEventSourceResource(ctx, reqLogger, r, clustercloudEventSource, req.NamespacedName.String()) - } - r.updatePromMetrics(clustercloudEventSource, req.NamespacedName.String()) - - // ensure finalizer is set on this CR - if err := EnsureCloudEventSourceResourceFinalizer(ctx, reqLogger, r, clustercloudEventSource); err != nil { - return ctrl.Result{}, err - } - - // ensure Status Conditions are initialized - if !clustercloudEventSource.Status.Conditions.AreInitialized() { - conditions := eventingv1alpha1.GetCloudEventSourceInitializedConditions() - if err := kedastatus.SetStatusConditions(ctx, r.Client, reqLogger, clustercloudEventSource, conditions); err != nil { - return ctrl.Result{}, err - } - } - - eventSourceChanged, err := r.cloudEventSourceGenerationChanged(reqLogger, clustercloudEventSource) - if err != nil { - return ctrl.Result{}, err - } - - if eventSourceChanged { - if r.requestEventLoop(ctx, reqLogger, clustercloudEventSource) != nil { - return ctrl.Result{}, err - } - } - - return ctrl.Result{}, nil + cloudEventSource := &eventingv1alpha1.ClusterCloudEventSource{} + return Reconcile(ctx, reqLogger, r, req, cloudEventSource) } // SetupWithManager sets up the controller with the Manager. @@ -122,61 +71,19 @@ func (r *ClusterCloudEventSourceReconciler) SetupWithManager(mgr ctrl.Manager) e Complete(r) } -// requestEventLoop tries to start EventLoop handler for the respective EventSource -func (r *ClusterCloudEventSourceReconciler) requestEventLoop(ctx context.Context, logger logr.Logger, eventSource eventingv1alpha1.CloudEventSourceInterface) error { - logger.V(1).Info("Notify eventHandler of an update in eventSource", "name", eventSource.GetName()) - - key, err := cache.MetaNamespaceKeyFunc(eventSource) - if err != nil { - logger.Error(err, "error getting key for eventSource") - return err - } - - if err = r.eventEmitter.HandleCloudEventSource(ctx, eventSource); err != nil { - return err - } - - // store ClusterCloudEventSource's current Generation - r.clusterCloudEventSourceGenerations.Store(key, eventSource.GetGeneration()) - - return nil +func (r *ClusterCloudEventSourceReconciler) GetClient() client.Client { + return r.Client } -// stopEventLoop stops EventLoop handler for the respective EventSource -func (r *ClusterCloudEventSourceReconciler) StopEventLoop(logger logr.Logger, obj client.Object) error { - key, err := cache.MetaNamespaceKeyFunc(obj) - if err != nil { - logger.Error(err, "error getting key for eventSource") - return err - } - - if err := r.eventEmitter.DeleteCloudEventSource(obj.(*eventingv1alpha1.ClusterCloudEventSource)); err != nil { - return err - } - // delete CloudEventSource's current Generation - r.clusterCloudEventSourceGenerations.Delete(key) - return nil +func (r *ClusterCloudEventSourceReconciler) GetEventEmitter() eventemitter.EventHandler { + return r.eventEmitter } -// eventSourceGenerationChanged returns true if ClusterCloudEventSource's Generation was changed, ie. EventSource.Spec was changed -func (r *ClusterCloudEventSourceReconciler) cloudEventSourceGenerationChanged(logger logr.Logger, eventSource *eventingv1alpha1.ClusterCloudEventSource) (bool, error) { - key, err := cache.MetaNamespaceKeyFunc(eventSource) - if err != nil { - logger.Error(err, "error getting key for eventSource") - return true, err - } - - value, loaded := r.clusterCloudEventSourceGenerations.Load(key) - if loaded { - generation := value.(int64) - if generation == eventSource.Generation { - return false, nil - } - } - return true, nil +func (r *ClusterCloudEventSourceReconciler) GetCloudEventSourceGeneration() *sync.Map { + return r.clusterCloudEventSourceGenerations } -func (r *ClusterCloudEventSourceReconciler) updatePromMetrics(eventSource *eventingv1alpha1.ClusterCloudEventSource, namespacedName string) { +func (r *ClusterCloudEventSourceReconciler) UpdatePromMetrics(eventSource eventingv1alpha1.CloudEventSourceInterface, namespacedName string) { r.eventSourcePromMetricsLock.Lock() defer r.eventSourcePromMetricsLock.Unlock() @@ -184,8 +91,8 @@ func (r *ClusterCloudEventSourceReconciler) updatePromMetrics(eventSource *event metricscollector.DecrementCRDTotal(metricscollector.CloudEventSourceResource, ns) } - metricscollector.IncrementCRDTotal(metricscollector.CloudEventSourceResource, eventSource.Namespace) - r.eventSourcePromMetricsMap[namespacedName] = eventSource.Namespace + metricscollector.IncrementCRDTotal(metricscollector.CloudEventSourceResource, eventSource.GetNamespace()) + r.eventSourcePromMetricsMap[namespacedName] = eventSource.GetNamespace() } // UpdatePromMetricsOnDelete is idempotent, so it can be called multiple times without side-effects diff --git a/controllers/eventing/finalizer.go b/controllers/eventing/finalizer.go index 60526789129..5c8a8f75297 100644 --- a/controllers/eventing/finalizer.go +++ b/controllers/eventing/finalizer.go @@ -30,19 +30,13 @@ const ( cloudEventSourceFinalizer = "finalizer.keda.sh" ) -type cloudEventSourceResourceReconciler interface { - client.Client - UpdatePromMetricsOnDelete(string) - StopEventLoop(logger logr.Logger, obj client.Object) error -} - -func EnsureCloudEventSourceResourceFinalizer(ctx context.Context, logger logr.Logger, reconciler cloudEventSourceResourceReconciler, cloudEventSourceResource client.Object) error { +func EnsureCloudEventSourceResourceFinalizer(ctx context.Context, logger logr.Logger, r cloudEventSourceReconcilerInterface, cloudEventSourceResource client.Object) error { if !util.Contains(cloudEventSourceResource.GetFinalizers(), cloudEventSourceFinalizer) { logger.Info(fmt.Sprintf("Adding Finalizer for the %s", cloudEventSourceResource.GetName())) cloudEventSourceResource.SetFinalizers(append(cloudEventSourceResource.GetFinalizers(), cloudEventSourceFinalizer)) // Update CR - err := reconciler.Update(ctx, cloudEventSourceResource) + err := r.GetClient().Update(ctx, cloudEventSourceResource) if err != nil { logger.Error(err, fmt.Sprintf("Failed to update %s with a finalizer", cloudEventSourceResource.GetName()), "finalizer", cloudEventSourceFinalizer) return err @@ -51,18 +45,18 @@ func EnsureCloudEventSourceResourceFinalizer(ctx context.Context, logger logr.Lo return nil } -func FinalizeCloudEventSourceResource(ctx context.Context, logger logr.Logger, reconciler cloudEventSourceResourceReconciler, cloudEventSourceResource client.Object, namespacedName string) error { +func FinalizeCloudEventSourceResource(ctx context.Context, logger logr.Logger, r cloudEventSourceReconcilerInterface, cloudEventSourceResource client.Object, namespacedName string) error { if util.Contains(cloudEventSourceResource.GetFinalizers(), cloudEventSourceFinalizer) { - if err := reconciler.StopEventLoop(logger, cloudEventSourceResource); err != nil { + if err := StopEventLoop(logger, r, cloudEventSourceResource); err != nil { return err } cloudEventSourceResource.SetFinalizers(util.Remove(cloudEventSourceResource.GetFinalizers(), cloudEventSourceFinalizer)) - if err := reconciler.Update(ctx, cloudEventSourceResource); err != nil { + if err := r.GetClient().Update(ctx, cloudEventSourceResource); err != nil { logger.Error(err, fmt.Sprintf("Failed to update %s after removing a finalizer", cloudEventSourceResource.GetName()), "finalizer", cloudEventSourceFinalizer) return err } - reconciler.UpdatePromMetricsOnDelete(namespacedName) + r.UpdatePromMetricsOnDelete(namespacedName) } logger.Info(fmt.Sprintf("Successfully finalized %s", cloudEventSourceResource.GetName())) diff --git a/controllers/eventing/reconciler.go b/controllers/eventing/reconciler.go new file mode 100644 index 00000000000..6461ee16a11 --- /dev/null +++ b/controllers/eventing/reconciler.go @@ -0,0 +1,141 @@ +/* +Copyright 2024 The KEDA Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventing + +import ( + "context" + "sync" + + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/client-go/tools/cache" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + eventingv1alpha1 "github.com/kedacore/keda/v2/apis/eventing/v1alpha1" + "github.com/kedacore/keda/v2/pkg/eventemitter" + kedastatus "github.com/kedacore/keda/v2/pkg/status" +) + +type cloudEventSourceReconcilerInterface interface { + GetClient() client.Client + GetEventEmitter() eventemitter.EventHandler + GetCloudEventSourceGeneration() *sync.Map + UpdatePromMetrics(eventSource eventingv1alpha1.CloudEventSourceInterface, namespacedName string) + UpdatePromMetricsOnDelete(namespacedName string) +} + +func Reconcile(ctx context.Context, reqLogger logr.Logger, r cloudEventSourceReconcilerInterface, req ctrl.Request, cloudEventSource eventingv1alpha1.CloudEventSourceInterface) (ctrl.Result, error) { + err := r.GetClient().Get(ctx, req.NamespacedName, cloudEventSource) + if err != nil { + if errors.IsNotFound(err) { + // Request eventSource not found, could have been deleted after reconcile request. + // Owned eventSource are automatically garbage collected. For additional cleanup logic use finalizers. + // Return and don't requeue + return ctrl.Result{}, nil + } + // Error reading the object - requeue the request. + reqLogger.Error(err, "failed to get EventSource") + return ctrl.Result{}, err + } + + reqLogger.Info("Reconciling CloudEventSource") + + if !cloudEventSource.GetDeletionTimestamp().IsZero() { + return ctrl.Result{}, FinalizeCloudEventSourceResource(ctx, reqLogger, r, cloudEventSource, req.NamespacedName.String()) + } + r.UpdatePromMetrics(cloudEventSource, req.NamespacedName.String()) + + // ensure finalizer is set on this CR + if err := EnsureCloudEventSourceResourceFinalizer(ctx, reqLogger, r, cloudEventSource); err != nil { + return ctrl.Result{}, err + } + + // ensure Status Conditions are initialized + if !cloudEventSource.GetStatus().Conditions.AreInitialized() { + conditions := eventingv1alpha1.GetCloudEventSourceInitializedConditions() + if err := kedastatus.SetStatusConditions(ctx, r.GetClient(), reqLogger, cloudEventSource, conditions); err != nil { + return ctrl.Result{}, err + } + } + + eventSourceChanged, err := CloudEventSourceGenerationChanged(reqLogger, r, cloudEventSource) + if err != nil { + return ctrl.Result{}, err + } + + if eventSourceChanged { + if RequestEventLoop(ctx, reqLogger, r, cloudEventSource) != nil { + return ctrl.Result{}, err + } + } + + return ctrl.Result{}, nil +} + +// requestEventLoop tries to start EventLoop handler for the respective EventSource +func RequestEventLoop(ctx context.Context, logger logr.Logger, r cloudEventSourceReconcilerInterface, eventSourceI eventingv1alpha1.CloudEventSourceInterface) error { + logger.V(1).Info("Notify eventHandler of an update in eventSource") + + key, err := cache.MetaNamespaceKeyFunc(eventSourceI) + if err != nil { + logger.Error(err, "error getting key for eventSource") + return err + } + + if err = r.GetEventEmitter().HandleCloudEventSource(ctx, eventSourceI); err != nil { + return err + } + + // store CloudEventSource's current Generation + r.GetCloudEventSourceGeneration().Store(key, eventSourceI.GetGeneration()) + return nil +} + +// stopEventLoop stops EventLoop handler for the respective EventSource +func StopEventLoop(logger logr.Logger, r cloudEventSourceReconcilerInterface, obj client.Object) error { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err != nil { + logger.Error(err, "error getting key for eventSource") + return err + } + + if err := r.GetEventEmitter().DeleteCloudEventSource(obj.(eventingv1alpha1.CloudEventSourceInterface)); err != nil { + return err + } + // delete CloudEventSource's current Generation + r.GetCloudEventSourceGeneration().Delete(key) + return nil +} + +// eventSourceGenerationChanged returns true if CloudEventSource's Generation was changed, ie. EventSource.Spec was changed +func CloudEventSourceGenerationChanged(logger logr.Logger, r cloudEventSourceReconcilerInterface, eventSourceI eventingv1alpha1.CloudEventSourceInterface) (bool, error) { + key, err := cache.MetaNamespaceKeyFunc(eventSourceI) + if err != nil { + logger.Error(err, "error getting key for eventSource") + return true, err + } + + value, loaded := r.GetCloudEventSourceGeneration().Load(key) + if loaded { + generation := value.(int64) + if generation == eventSourceI.GetGeneration() { + return false, nil + } + } + return true, nil +} diff --git a/pkg/eventemitter/eventemitter.go b/pkg/eventemitter/eventemitter.go index 0d5d6fdae95..91d57f3ca4d 100644 --- a/pkg/eventemitter/eventemitter.go +++ b/pkg/eventemitter/eventemitter.go @@ -113,7 +113,7 @@ func NewEventEmitter(client client.Client, recorder record.EventRecorder, cluste } func initializeLogger(cloudEventSourceI eventingv1alpha1.CloudEventSourceInterface, cloudEventSourceEmitterName string) logr.Logger { - return logf.Log.WithName(cloudEventSourceEmitterName).WithValues("type", cloudEventSourceI.GetKind(), "namespace", cloudEventSourceI.GetNamespace(), "name", cloudEventSourceI.GetName()) + return logf.Log.WithName(cloudEventSourceEmitterName).WithValues("type", cloudEventSourceI.GetObjectKind(), "namespace", cloudEventSourceI.GetNamespace(), "name", cloudEventSourceI.GetName()) } // HandleCloudEventSource will create CloudEventSource handlers that defined in spec and start an event loop once handlers @@ -147,15 +147,8 @@ func (e *EventEmitter) HandleCloudEventSource(ctx context.Context, cloudEventSou eventingMutex := &sync.Mutex{} // passing deep copy of CloudEventSource to the eventLoop go routines, it's a precaution to not have global objects shared between threads - switch obj := cloudEventSourceI.(type) { - case *eventingv1alpha1.CloudEventSource: - go e.startEventLoop(cancelCtx, obj.DeepCopy(), eventingMutex) - case *eventingv1alpha1.ClusterCloudEventSource: - go e.startClusterEventLoop(cancelCtx, obj.DeepCopy(), eventingMutex) - default: - return nil - } - + e.log.V(1).Info("Start CloudEventSource loop.") + go e.startEventLoop(cancelCtx, cloudEventSourceI.DeepCopyObject().(eventingv1alpha1.CloudEventSourceInterface), eventingMutex) return nil } @@ -283,35 +276,18 @@ func (e *EventEmitter) checkIfEventHandlersExist(cloudEventSource eventingv1alph return false } -func (e *EventEmitter) startEventLoop(ctx context.Context, cloudEventSource *eventingv1alpha1.CloudEventSource, cloudEventSourceMutex sync.Locker) { - e.log.V(1).Info("Start CloudEventSource loop.", "name", cloudEventSource.GetName()) +func (e *EventEmitter) startEventLoop(ctx context.Context, cloudEventSourceI eventingv1alpha1.CloudEventSourceInterface, cloudEventSourceMutex sync.Locker) { + e.log.V(1).Info("Start CloudEventSource loop.", "name", cloudEventSourceI.GetName()) for { select { case eventData := <-e.cloudEventProcessingChan: - e.log.V(1).Info("Consuming events from CloudEventSource.", "name", cloudEventSource.GetName()) + e.log.V(1).Info("Consuming events from CloudEventSource.", "name", cloudEventSourceI.GetName()) e.emitEventByHandler(eventData) - e.checkEventHandlers(ctx, cloudEventSource, cloudEventSourceMutex) - metricscollector.RecordCloudEventQueueStatus(cloudEventSource.GetNamespace(), len(e.cloudEventProcessingChan)) + e.checkEventHandlers(ctx, cloudEventSourceI, cloudEventSourceMutex) + metricscollector.RecordCloudEventQueueStatus(cloudEventSourceI.GetNamespace(), len(e.cloudEventProcessingChan)) case <-ctx.Done(): e.log.V(1).Info("CloudEventSource loop has stopped.") - metricscollector.RecordCloudEventQueueStatus(cloudEventSource.GetNamespace(), len(e.cloudEventProcessingChan)) - return - } - } -} - -func (e *EventEmitter) startClusterEventLoop(ctx context.Context, clusterCloudEventSource *eventingv1alpha1.ClusterCloudEventSource, cloudEventSourceMutex sync.Locker) { - e.log.V(1).Info("Start CloudEventSource loop.", "name", clusterCloudEventSource.GetName()) - for { - select { - case eventData := <-e.cloudEventProcessingChan: - e.log.V(1).Info("Consuming events from ClusterCloudEventSource.", "name", clusterCloudEventSource.GetName()) - e.emitEventByHandler(eventData) - e.checkEventHandlers(ctx, clusterCloudEventSource, cloudEventSourceMutex) - metricscollector.RecordCloudEventQueueStatus(clusterCloudEventSource.GetNamespace(), len(e.cloudEventProcessingChan)) - case <-ctx.Done(): - e.log.V(1).Info("ClusterCloudEventSource loop has stopped.") - metricscollector.RecordCloudEventQueueStatus(clusterCloudEventSource.GetNamespace(), len(e.cloudEventProcessingChan)) + metricscollector.RecordCloudEventQueueStatus(cloudEventSourceI.GetNamespace(), len(e.cloudEventProcessingChan)) return } } @@ -319,35 +295,22 @@ func (e *EventEmitter) startClusterEventLoop(ctx context.Context, clusterCloudEv // checkEventHandlers will check each eventhandler active status func (e *EventEmitter) checkEventHandlers(ctx context.Context, cloudEventSourceI eventingv1alpha1.CloudEventSourceInterface, cloudEventSourceMutex sync.Locker) { - e.log.V(1).Info("Checking event handlers status.", "name", cloudEventSourceI.GetName()) + e.log.V(1).Info("Checking event handlers status.") cloudEventSourceMutex.Lock() defer cloudEventSourceMutex.Unlock() // Get the latest object - switch cloudEventSourceI.(type) { - case *eventingv1alpha1.CloudEventSource: - cloudEventSource := &eventingv1alpha1.CloudEventSource{} - err := e.client.Get(ctx, types.NamespacedName{Name: cloudEventSourceI.GetName(), Namespace: cloudEventSourceI.GetNamespace()}, cloudEventSource) - if err != nil { - e.log.Error(err, "error getting cloudEventSource", "cloudEventSource", cloudEventSource) - } - cloudEventSourceI = cloudEventSource - case *eventingv1alpha1.ClusterCloudEventSource: - clustercloudEventSource := &eventingv1alpha1.ClusterCloudEventSource{} - err := e.client.Get(ctx, types.NamespacedName{Name: cloudEventSourceI.GetName(), Namespace: cloudEventSourceI.GetNamespace()}, clustercloudEventSource) - if err != nil { - e.log.Error(err, "error getting clustercloudEventSource", "clustercloudEventSource", clustercloudEventSource) - } - cloudEventSourceI = clustercloudEventSource - default: + err := e.client.Get(ctx, types.NamespacedName{Name: cloudEventSourceI.GetName(), Namespace: cloudEventSourceI.GetNamespace()}, cloudEventSourceI) + if err != nil { + e.log.Error(err, "error getting cloudEventSource", "cloudEventSource", cloudEventSourceI) + return } - keyPrefix := cloudEventSourceI.GenerateIdentifier() needUpdate := false - cloudEventSourceStatus := cloudEventSourceI.GetStatus() + cloudEventSourceStatus := cloudEventSourceI.GetStatus().DeepCopy() for k, v := range e.eventHandlersCache { - e.log.V(1).Info("Checking event handler status.", "handler", k, "status", cloudEventSourceStatus.Conditions.GetActiveCondition().Status) + e.log.V(1).Info("Checking event handler status.", "handler", k, "status", cloudEventSourceI.GetStatus().Conditions.GetActiveCondition().Status) if strings.Contains(k, keyPrefix) { - if v.GetActiveStatus() != cloudEventSourceStatus.Conditions.GetActiveCondition().Status { + if v.GetActiveStatus() != cloudEventSourceI.GetStatus().Conditions.GetActiveCondition().Status { needUpdate = true cloudEventSourceStatus.Conditions.SetActiveCondition( metav1.ConditionFalse, @@ -357,7 +320,6 @@ func (e *EventEmitter) checkEventHandlers(ctx context.Context, cloudEventSourceI } } } - if needUpdate { if updateErr := e.updateCloudEventSourceStatus(ctx, cloudEventSourceI, cloudEventSourceStatus); updateErr != nil { e.log.Error(updateErr, "Failed to update CloudEventSource status") @@ -475,7 +437,7 @@ func (e *EventEmitter) setCloudEventSourceStatusActive(ctx context.Context, clou return e.updateCloudEventSourceStatus(ctx, cloudEventSourceI, cloudEventSourceStatus) } -func (e *EventEmitter) updateCloudEventSourceStatus(ctx context.Context, cloudEventSourceI eventingv1alpha1.CloudEventSourceInterface, cloudEventSourceStatus eventingv1alpha1.CloudEventSourceStatus) error { +func (e *EventEmitter) updateCloudEventSourceStatus(ctx context.Context, cloudEventSourceI eventingv1alpha1.CloudEventSourceInterface, cloudEventSourceStatus *eventingv1alpha1.CloudEventSourceStatus) error { e.log.V(1).Info("Updating CloudEventSource status", "CloudEventSource", cloudEventSourceI.GetName()) transform := func(runtimeObj client.Object, target interface{}) error { status, ok := target.(eventingv1alpha1.CloudEventSourceStatus) @@ -494,7 +456,7 @@ func (e *EventEmitter) updateCloudEventSourceStatus(ctx context.Context, cloudEv return nil } - if err := kedastatus.TransformObject(ctx, e.client, e.log, cloudEventSourceI, cloudEventSourceStatus, transform); err != nil { + if err := kedastatus.TransformObject(ctx, e.client, e.log, cloudEventSourceI, *cloudEventSourceStatus, transform); err != nil { e.log.Error(err, "Failed to update CloudEventSourceStatus") return err } From 50a19f08bf4bde849721c3f9d51e38a02dc45782 Mon Sep 17 00:00:00 2001 From: SpiritZhou Date: Tue, 16 Jul 2024 15:15:44 +0800 Subject: [PATCH 6/6] Update Signed-off-by: SpiritZhou --- ...eventing_v1alpha1_clustercloudeventsource.yaml | 15 +++++++++++++++ config/samples/kustomization.yaml | 1 + 2 files changed, 16 insertions(+) create mode 100644 config/samples/eventing_v1alpha1_clustercloudeventsource.yaml diff --git a/config/samples/eventing_v1alpha1_clustercloudeventsource.yaml b/config/samples/eventing_v1alpha1_clustercloudeventsource.yaml new file mode 100644 index 00000000000..afa6ff0aaa7 --- /dev/null +++ b/config/samples/eventing_v1alpha1_clustercloudeventsource.yaml @@ -0,0 +1,15 @@ +apiVersion: eventing.keda.sh/v1alpha1 +kind: ClusterCloudEventSource +metadata: + labels: + app.kubernetes.io/name: clustercloudeventsource + app.kubernetes.io/instance: clustercloudeventsource-sample + app.kubernetes.io/part-of: keda + app.kubernetes.io/managed-by: kustomize + app.kubernetes.io/created-by: keda + name: clustercloudeventsource-sample +spec: + clusterName: clustercluster-sample + destination: + http: + uri: http://foo.bar diff --git a/config/samples/kustomization.yaml b/config/samples/kustomization.yaml index 94ec29e2c3a..fc2b1f3e427 100644 --- a/config/samples/kustomization.yaml +++ b/config/samples/kustomization.yaml @@ -1,6 +1,7 @@ ## Append samples you want in your CSV to this file as resources ## resources: - eventing_v1alpha1_cloudeventsource.yaml +- eventing_v1alpha1_clustercloudeventsource.yaml - keda_v1alpha1_clustertriggerauthentication.yaml - keda_v1alpha1_scaledobject.yaml - keda_v1alpha1_scaledjob.yaml