Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Proposal for PR changes on ClusterCloudEventSource #5924

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions apis/eventing/v1alpha1/cloudeventsource_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,32 @@ package v1alpha1

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

v1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
)

type CloudEventSourceInterface interface {
client.Object
GenerateIdentifier() string
GetSpec() *CloudEventSourceSpec
GetStatus() *CloudEventSourceStatus
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// ClusterCloudEventSource defines how a KEDA event will be sent to event sink on cluster level
// +kubebuilder:resource:path=clustercloudeventsources,scope=Cluster
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:name="Active",type="string",JSONPath=".status.conditions[?(@.type==\"Active\")].status"
type ClusterCloudEventSource struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`

Spec CloudEventSourceSpec `json:"spec"`
Status CloudEventSourceStatus `json:"status,omitempty"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// CloudEventSource defines how a KEDA event will be sent to event sink
Expand All @@ -38,6 +60,15 @@ type CloudEventSource struct {

// +kubebuilder:object:root=true

// ClusterCloudEventSourceList is a list of ClusterCloudEventSource resources
type ClusterCloudEventSourceList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata"`
Items []ClusterCloudEventSource `json:"items"`
}

// +kubebuilder:object:root=true

// CloudEventSourceList is a list of CloudEventSource resources
type CloudEventSourceList struct {
metav1.TypeMeta `json:",inline"`
Expand Down Expand Up @@ -94,13 +125,25 @@ type EventSubscription struct {

func init() {
SchemeBuilder.Register(&CloudEventSource{}, &CloudEventSourceList{})
SchemeBuilder.Register(&ClusterCloudEventSource{}, &ClusterCloudEventSourceList{})
}

// GenerateIdentifier returns identifier for the object in for "kind.namespace.name"
func (ces *CloudEventSource) GenerateIdentifier() string {
return v1alpha1.GenerateIdentifier("CloudEventSource", ces.Namespace, ces.Name)
}

func (ces *CloudEventSource) GetSpec() *CloudEventSourceSpec { return &ces.Spec }
func (ces *CloudEventSource) GetStatus() *CloudEventSourceStatus { return &ces.Status }

// GenerateIdentifier returns identifier for the object in for "kind.namespace.name"
func (ces *ClusterCloudEventSource) GenerateIdentifier() string {
return v1alpha1.GenerateIdentifier("ClusterCloudEventSource", "cluster-scoped", ces.Name)
}

func (ces *ClusterCloudEventSource) GetSpec() *CloudEventSourceSpec { return &ces.Spec }
func (ces *ClusterCloudEventSource) GetStatus() *CloudEventSourceStatus { return &ces.Status }

// GetCloudEventSourceInitializedConditions returns CloudEventSource Conditions initialized to the default -> Status: Unknown
func GetCloudEventSourceInitializedConditions() *v1alpha1.Conditions {
return &v1alpha1.Conditions{{Type: v1alpha1.ConditionActive, Status: metav1.ConditionUnknown}}
Expand Down
59 changes: 59 additions & 0 deletions apis/eventing/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

140 changes: 140 additions & 0 deletions config/crd/bases/eventing.keda.sh_clustercloudeventsources.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.14.0
name: clustercloudeventsources.eventing.keda.sh
spec:
group: eventing.keda.sh
names:
kind: ClusterCloudEventSource
listKind: ClusterCloudEventSourceList
plural: clustercloudeventsources
singular: clustercloudeventsource
scope: Cluster
versions:
- additionalPrinterColumns:
- jsonPath: .status.conditions[?(@.type=="Active")].status
name: Active
type: string
name: v1alpha1
schema:
openAPIV3Schema:
description: ClusterCloudEventSource defines how a KEDA event will be sent
to event sink on cluster level
properties:
apiVersion:
description: |-
APIVersion defines the versioned schema of this representation of an object.
Servers should convert recognized schemas to the latest internal value, and
may reject unrecognized values.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
type: string
kind:
description: |-
Kind is a string value representing the REST resource this object represents.
Servers may infer this from the endpoint the client submits requests to.
Cannot be updated.
In CamelCase.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
type: string
metadata:
type: object
spec:
description: CloudEventSourceSpec defines the spec of CloudEventSource
properties:
authenticationRef:
description: |-
AuthenticationRef points to the TriggerAuthentication or ClusterTriggerAuthentication object that
is used to authenticate the scaler with the environment
properties:
kind:
description: Kind of the resource being referred to. Defaults
to TriggerAuthentication.
type: string
name:
type: string
required:
- name
type: object
clusterName:
type: string
destination:
description: Destination defines the various ways to emit events
properties:
azureEventGridTopic:
properties:
endpoint:
type: string
required:
- endpoint
type: object
http:
properties:
uri:
type: string
required:
- uri
type: object
type: object
eventSubscription:
description: EventSubscription defines filters for events
properties:
excludedEventTypes:
items:
description: CloudEventType contains the list of cloudevent
types
enum:
- keda.scaledobject.ready.v1
- keda.scaledobject.failed.v1
type: string
type: array
includedEventTypes:
items:
description: CloudEventType contains the list of cloudevent
types
enum:
- keda.scaledobject.ready.v1
- keda.scaledobject.failed.v1
type: string
type: array
type: object
required:
- destination
type: object
status:
description: CloudEventSourceStatus defines the observed state of CloudEventSource
properties:
conditions:
description: Conditions an array representation to store multiple
Conditions
items:
description: Condition to store the condition state
properties:
message:
description: A human readable message indicating details about
the transition.
type: string
reason:
description: The reason for the condition's last transition.
type: string
status:
description: Status of the condition, one of True, False, Unknown.
type: string
type:
description: Type of condition
type: string
required:
- status
- type
type: object
type: array
type: object
required:
- spec
type: object
served: true
storage: true
subresources:
status: {}
39 changes: 28 additions & 11 deletions controllers/eventing/cloudeventsource_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"

Expand Down Expand Up @@ -64,7 +65,12 @@ func (r *CloudEventSourceReconciler) Reconcile(ctx context.Context, req ctrl.Req
reqLogger := log.FromContext(ctx)

// Fetch the EventSource instance
cloudEventSource := &eventingv1alpha1.CloudEventSource{}
var cloudEventSource eventingv1alpha1.CloudEventSourceInterface
if req.Namespace != "" {
cloudEventSource = &eventingv1alpha1.CloudEventSource{}
} else {
cloudEventSource = &eventingv1alpha1.ClusterCloudEventSource{}
}
err := r.Client.Get(ctx, req.NamespacedName, cloudEventSource)
if err != nil {
if errors.IsNotFound(err) {
Expand All @@ -91,7 +97,7 @@ func (r *CloudEventSourceReconciler) Reconcile(ctx context.Context, req ctrl.Req
}

// ensure Status Conditions are initialized
if !cloudEventSource.Status.Conditions.AreInitialized() {
if !cloudEventSource.GetStatus().Conditions.AreInitialized() {
conditions := eventingv1alpha1.GetCloudEventSourceInitializedConditions()
if err := kedastatus.SetStatusConditions(ctx, r.Client, reqLogger, cloudEventSource, conditions); err != nil {
return ctrl.Result{}, err
Expand All @@ -114,14 +120,25 @@ func (r *CloudEventSourceReconciler) Reconcile(ctx context.Context, req ctrl.Req

// SetupWithManager sets up the controller with the Manager.
func (r *CloudEventSourceReconciler) SetupWithManager(mgr ctrl.Manager) error {
nsFilter := util.IgnoreOtherNamespaces()
filter := predicate.Funcs{
GenericFunc: func(e event.GenericEvent) bool {
if e.Object.GetNamespace() == "" {
return true
}
return nsFilter.Generic(e)
},
}

return ctrl.NewControllerManagedBy(mgr).
For(&eventingv1alpha1.ClusterCloudEventSource{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reconciliation cannot accept multiple objects. I think we should still set up multiple controllers and eliminate the duplicates through abstraction.

Copy link
Member Author

@wozniakjan wozniakjan Jul 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, you are right, it's been a long time since I used that pattern. Looks like this was supported in controller-runtime older than 0.7.0 but is no longer an option since kubernetes-sigs/controller-runtime#1176

For(&eventingv1alpha1.CloudEventSource{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
WithEventFilter(util.IgnoreOtherNamespaces()).
WithEventFilter(filter).
Complete(r)
}

// requestEventLoop tries to start EventLoop handler for the respective EventSource
func (r *CloudEventSourceReconciler) requestEventLoop(ctx context.Context, logger logr.Logger, eventSource *eventingv1alpha1.CloudEventSource) error {
func (r *CloudEventSourceReconciler) requestEventLoop(ctx context.Context, logger logr.Logger, eventSource eventingv1alpha1.CloudEventSourceInterface) error {
logger.V(1).Info("Notify eventHandler of an update in eventSource")

key, err := cache.MetaNamespaceKeyFunc(eventSource)
Expand All @@ -135,13 +152,13 @@ func (r *CloudEventSourceReconciler) requestEventLoop(ctx context.Context, logge
}

// store CloudEventSource's current Generation
r.cloudEventSourceGenerations.Store(key, eventSource.Generation)
r.cloudEventSourceGenerations.Store(key, eventSource.GetGeneration())

return nil
}

// stopEventLoop stops EventLoop handler for the respective EventSource
func (r *CloudEventSourceReconciler) stopEventLoop(logger logr.Logger, eventSource *eventingv1alpha1.CloudEventSource) error {
func (r *CloudEventSourceReconciler) stopEventLoop(logger logr.Logger, eventSource eventingv1alpha1.CloudEventSourceInterface) error {
key, err := cache.MetaNamespaceKeyFunc(eventSource)
if err != nil {
logger.Error(err, "error getting key for eventSource")
Expand All @@ -157,7 +174,7 @@ func (r *CloudEventSourceReconciler) stopEventLoop(logger logr.Logger, eventSour
}

// eventSourceGenerationChanged returns true if CloudEventSource's Generation was changed, ie. EventSource.Spec was changed
func (r *CloudEventSourceReconciler) cloudEventSourceGenerationChanged(logger logr.Logger, eventSource *eventingv1alpha1.CloudEventSource) (bool, error) {
func (r *CloudEventSourceReconciler) cloudEventSourceGenerationChanged(logger logr.Logger, eventSource eventingv1alpha1.CloudEventSourceInterface) (bool, error) {
key, err := cache.MetaNamespaceKeyFunc(eventSource)
if err != nil {
logger.Error(err, "error getting key for eventSource")
Expand All @@ -167,23 +184,23 @@ func (r *CloudEventSourceReconciler) cloudEventSourceGenerationChanged(logger lo
value, loaded := r.cloudEventSourceGenerations.Load(key)
if loaded {
generation := value.(int64)
if generation == eventSource.Generation {
if generation == eventSource.GetGeneration() {
return false, nil
}
}
return true, nil
}

func (r *CloudEventSourceReconciler) updatePromMetrics(eventSource *eventingv1alpha1.CloudEventSource, namespacedName string) {
func (r *CloudEventSourceReconciler) updatePromMetrics(eventSource eventingv1alpha1.CloudEventSourceInterface, namespacedName string) {
r.eventSourcePromMetricsLock.Lock()
defer r.eventSourcePromMetricsLock.Unlock()

if ns, ok := r.eventSourcePromMetricsMap[namespacedName]; ok {
metricscollector.DecrementCRDTotal(metricscollector.CloudEventSourceResource, ns)
}

metricscollector.IncrementCRDTotal(metricscollector.CloudEventSourceResource, eventSource.Namespace)
r.eventSourcePromMetricsMap[namespacedName] = eventSource.Namespace
metricscollector.IncrementCRDTotal(metricscollector.CloudEventSourceResource, eventSource.GetNamespace())
r.eventSourcePromMetricsMap[namespacedName] = eventSource.GetNamespace()
}

// UpdatePromMetricsOnDelete is idempotent, so it can be called multiple times without side-effects
Expand Down
Loading
Loading