Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce ClusterCloudEventSource #5816

Merged
merged 6 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

### New

- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- **CloudEventSource**: Introduce ClusterCloudEventSource ([#3533](https://github.com/kedacore/keda/issues/3533))
SpiritZhou marked this conversation as resolved.
Show resolved Hide resolved

#### Experimental

Expand Down
54 changes: 53 additions & 1 deletion apis/eventing/v1alpha1/cloudeventsource_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,19 @@ package v1alpha1

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

v1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
)

// +kubebuilder:object:generate=false
type CloudEventSourceInterface interface {
client.Object
GenerateIdentifier() string
GetSpec() *CloudEventSourceSpec
GetStatus() *CloudEventSourceStatus
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// CloudEventSource defines how a KEDA event will be sent to event sink
Expand All @@ -45,6 +54,28 @@ type CloudEventSourceList struct {
Items []CloudEventSource `json:"items"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// +kubebuilder:resource:path=clustercloudeventsources,scope=Cluster
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:name="Active",type="string",JSONPath=".status.conditions[?(@.type==\"Active\")].status"
type ClusterCloudEventSource struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`

Spec CloudEventSourceSpec `json:"spec"`
Status CloudEventSourceStatus `json:"status,omitempty"`
}

// +kubebuilder:object:root=true

// ClusterCloudEventSourceList is a list of ClusterCloudEventSource resources
type ClusterCloudEventSourceList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata"`
Items []ClusterCloudEventSource `json:"items"`
}

// CloudEventSourceSpec defines the spec of CloudEventSource
type CloudEventSourceSpec struct {
// +optional
Expand Down Expand Up @@ -93,14 +124,35 @@ type EventSubscription struct {
}

func init() {
SchemeBuilder.Register(&CloudEventSource{}, &CloudEventSourceList{})
SchemeBuilder.Register(&CloudEventSource{}, &CloudEventSourceList{}, &ClusterCloudEventSource{}, &ClusterCloudEventSourceList{})
}

func (ces *CloudEventSource) GetSpec() *CloudEventSourceSpec {
return &ces.Spec
}

func (ces *CloudEventSource) GetStatus() *CloudEventSourceStatus {
return &ces.Status
}

// GenerateIdentifier returns identifier for the object in for "kind.namespace.name"
func (ces *CloudEventSource) GenerateIdentifier() string {
return v1alpha1.GenerateIdentifier("CloudEventSource", ces.Namespace, ces.Name)
}

func (cces *ClusterCloudEventSource) GetSpec() *CloudEventSourceSpec {
return &cces.Spec
}

func (cces *ClusterCloudEventSource) GetStatus() *CloudEventSourceStatus {
return &cces.Status
}

// GenerateIdentifier returns identifier for the object in for "kind.cluster-scoped.name"
func (cces *ClusterCloudEventSource) GenerateIdentifier() string {
return v1alpha1.GenerateIdentifier("ClusterCloudEventSource", "cluster-scoped", cces.Name)
}

// GetCloudEventSourceInitializedConditions returns CloudEventSource Conditions initialized to the default -> Status: Unknown
func GetCloudEventSourceInitializedConditions() *v1alpha1.Conditions {
return &v1alpha1.Conditions{{Type: v1alpha1.ConditionActive, Status: metav1.ConditionUnknown}}
Expand Down
37 changes: 35 additions & 2 deletions apis/eventing/v1alpha1/cloudeventsource_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ func (ces *CloudEventSource) SetupWebhookWithManager(mgr ctrl.Manager) error {
Complete()
}

func (cces *ClusterCloudEventSource) SetupWebhookWithManager(mgr ctrl.Manager) error {
return ctrl.NewWebhookManagedBy(mgr).
For(cces).
Complete()
}

// +kubebuilder:webhook:path=/validate-eventing-keda-sh-v1alpha1-cloudeventsource,mutating=false,failurePolicy=ignore,sideEffects=None,groups=eventing.keda.sh,resources=cloudeventsources,verbs=create;update,versions=v1alpha1,name=vcloudeventsource.kb.io,admissionReviewVersions=v1

var _ webhook.Validator = &CloudEventSource{}
Expand Down Expand Up @@ -64,6 +70,33 @@ func (ces *CloudEventSource) ValidateDelete() (admission.Warnings, error) {
return nil, nil
}

// +kubebuilder:webhook:path=/validate-eventing-keda-sh-v1alpha1-clustercloudeventsource,mutating=false,failurePolicy=ignore,sideEffects=None,groups=eventing.keda.sh,resources=clustercloudeventsources,verbs=create;update,versions=v1alpha1,name=vclustercloudeventsource.kb.io,admissionReviewVersions=v1

var _ webhook.Validator = &ClusterCloudEventSource{}

// ValidateCreate implements webhook.Validator so a webhook will be registered for the type
func (cces *ClusterCloudEventSource) ValidateCreate() (admission.Warnings, error) {
val, _ := json.MarshalIndent(cces, "", " ")
cloudeventsourcelog.Info(fmt.Sprintf("validating clustercloudeventsource creation for %s", string(val)))
return validateSpec(&cces.Spec)
}

func (cces *ClusterCloudEventSource) ValidateUpdate(old runtime.Object) (admission.Warnings, error) {
val, _ := json.MarshalIndent(cces, "", " ")
cloudeventsourcelog.V(1).Info(fmt.Sprintf("validating clustercloudeventsource update for %s", string(val)))

oldCes := old.(*ClusterCloudEventSource)
if isCloudEventSourceRemovingFinalizer(cces.ObjectMeta, oldCes.ObjectMeta, cces.Spec, oldCes.Spec) {
cloudeventsourcelog.V(1).Info("finalizer removal, skipping validation")
return nil, nil
}
return validateSpec(&cces.Spec)
}

func (cces *ClusterCloudEventSource) ValidateDelete() (admission.Warnings, error) {
return nil, nil
}

func isCloudEventSourceRemovingFinalizer(om metav1.ObjectMeta, oldOm metav1.ObjectMeta, spec CloudEventSourceSpec, oldSpec CloudEventSourceSpec) bool {
cesSpec, _ := json.MarshalIndent(spec, "", " ")
oldCesSpec, _ := json.MarshalIndent(oldSpec, "", " ")
Expand All @@ -81,15 +114,15 @@ func validateSpec(spec *CloudEventSourceSpec) (admission.Warnings, error) {
if spec.EventSubscription.ExcludedEventTypes != nil {
for _, excludedEventType := range spec.EventSubscription.ExcludedEventTypes {
if !slices.Contains(AllEventTypes, excludedEventType) {
return nil, fmt.Errorf("excludedEventType: %s in cloudeventsource spec is not supported", excludedEventType)
return nil, fmt.Errorf("excludedEventType: %s in cloudeventsource/clustercloudeventsource spec is not supported", excludedEventType)
}
}
}

if spec.EventSubscription.IncludedEventTypes != nil {
for _, includedEventType := range spec.EventSubscription.IncludedEventTypes {
if !slices.Contains(AllEventTypes, includedEventType) {
return nil, fmt.Errorf("includedEventType: %s in cloudeventsource spec is not supported", includedEventType)
return nil, fmt.Errorf("includedEventType: %s in cloudeventsource/clustercloudeventsource spec is not supported", includedEventType)
}
}
}
Expand Down
59 changes: 59 additions & 0 deletions apis/eventing/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,13 @@ func main() {
setupLog.Error(err, "unable to create controller", "controller", "CloudEventSource")
os.Exit(1)
}
if err = (eventingcontrollers.NewClusterCloudEventSourceReconciler(
mgr.GetClient(),
eventEmitter,
)).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ClusterCloudEventSource")
os.Exit(1)
}
SpiritZhou marked this conversation as resolved.
Show resolved Hide resolved
//+kubebuilder:scaffold:builder

if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions cmd/webhooks/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,4 +156,8 @@ func setupWebhook(mgr manager.Manager) {
setupLog.Error(err, "unable to create webhook", "webhook", "CloudEventSource")
os.Exit(1)
}
if err := (&eventingv1alpha1.ClusterCloudEventSource{}).SetupWebhookWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create webhook", "webhook", "ClusterCloudEventSource")
os.Exit(1)
}
}
138 changes: 138 additions & 0 deletions config/crd/bases/eventing.keda.sh_clustercloudeventsources.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.14.0
name: clustercloudeventsources.eventing.keda.sh
spec:
group: eventing.keda.sh
names:
kind: ClusterCloudEventSource
listKind: ClusterCloudEventSourceList
plural: clustercloudeventsources
singular: clustercloudeventsource
scope: Cluster
versions:
- additionalPrinterColumns:
- jsonPath: .status.conditions[?(@.type=="Active")].status
name: Active
type: string
name: v1alpha1
schema:
openAPIV3Schema:
properties:
apiVersion:
description: |-
APIVersion defines the versioned schema of this representation of an object.
Servers should convert recognized schemas to the latest internal value, and
may reject unrecognized values.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
type: string
kind:
description: |-
Kind is a string value representing the REST resource this object represents.
Servers may infer this from the endpoint the client submits requests to.
Cannot be updated.
In CamelCase.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
type: string
metadata:
type: object
spec:
description: CloudEventSourceSpec defines the spec of CloudEventSource
properties:
authenticationRef:
description: |-
AuthenticationRef points to the TriggerAuthentication or ClusterTriggerAuthentication object that
is used to authenticate the scaler with the environment
properties:
kind:
description: Kind of the resource being referred to. Defaults
to TriggerAuthentication.
type: string
name:
type: string
required:
- name
type: object
clusterName:
type: string
destination:
description: Destination defines the various ways to emit events
properties:
azureEventGridTopic:
properties:
endpoint:
type: string
required:
- endpoint
type: object
http:
properties:
uri:
type: string
required:
- uri
type: object
type: object
eventSubscription:
description: EventSubscription defines filters for events
properties:
excludedEventTypes:
items:
description: CloudEventType contains the list of cloudevent
types
enum:
- keda.scaledobject.ready.v1
- keda.scaledobject.failed.v1
type: string
type: array
includedEventTypes:
items:
description: CloudEventType contains the list of cloudevent
types
enum:
- keda.scaledobject.ready.v1
- keda.scaledobject.failed.v1
type: string
type: array
type: object
required:
- destination
type: object
status:
description: CloudEventSourceStatus defines the observed state of CloudEventSource
properties:
conditions:
description: Conditions an array representation to store multiple
Conditions
items:
description: Condition to store the condition state
properties:
message:
description: A human readable message indicating details about
the transition.
type: string
reason:
description: The reason for the condition's last transition.
type: string
status:
description: Status of the condition, one of True, False, Unknown.
type: string
type:
description: Type of condition
type: string
required:
- status
- type
type: object
type: array
type: object
required:
- spec
type: object
served: true
storage: true
subresources:
status: {}
1 change: 1 addition & 0 deletions config/crd/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ resources:
- bases/keda.sh_triggerauthentications.yaml
- bases/keda.sh_clustertriggerauthentications.yaml
- bases/eventing.keda.sh_cloudeventsources.yaml
- bases/eventing.keda.sh_clustercloudeventsources.yaml
# +kubebuilder:scaffold:crdkustomizeresource

## ScaledJob CRD needs to be patched because for some usecases (details in the patch file)
Expand Down
Loading
Loading