Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emit kubernetes events from KEDA #1523

Merged
merged 6 commits into from
Feb 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
### New

- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- Emit Kubernetes Events on KEDA events ([#1523](https://github.com/kedacore/keda/pull/1523))

### Improvements

Expand Down
7 changes: 6 additions & 1 deletion adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
"strconv"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"

appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/util/wait"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
Expand Down Expand Up @@ -70,7 +73,9 @@ func (a *Adapter) makeProvider(globalHTTPTimeout time.Duration) (provider.Metric
return nil, fmt.Errorf("unable to construct new client (%s)", err)
}

handler := scaling.NewScaleHandler(kubeclient, nil, scheme, globalHTTPTimeout)
broadcaster := record.NewBroadcaster()
recorder := broadcaster.NewRecorder(scheme, corev1.EventSource{Component: "keda-metrics-adapter"})
handler := scaling.NewScaleHandler(kubeclient, nil, scheme, globalHTTPTimeout, recorder)

namespace, err := getWatchNamespace()
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions api/v1alpha1/condition_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,14 @@ func (c *Conditions) GetActiveCondition() Condition {
return c.getCondition(ConditionActive)
}

// GetReadyCondition returns Condition of type Ready
func (c *Conditions) GetReadyCondition() Condition {
if *c == nil {
c = GetInitializedConditions()
}
return c.getCondition(ConditionReady)
}

func (c Conditions) getCondition(conditionType ConditionType) Condition {
for i := range c {
if c[i].Type == conditionType {
Expand Down
13 changes: 12 additions & 1 deletion controllers/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ import (
"fmt"
"time"

"github.com/kedacore/keda/v2/pkg/eventreason"
corev1 "k8s.io/api/core/v1"

"k8s.io/client-go/tools/record"

"github.com/go-logr/logr"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -29,12 +34,13 @@ type ScaledJobReconciler struct {
Log logr.Logger
Scheme *runtime.Scheme
GlobalHTTPTimeout time.Duration
Recorder record.EventRecorder
scaleHandler scaling.ScaleHandler
}

// SetupWithManager initializes the ScaledJobReconciler instance and starts a new controller managed by the passed Manager instance.
func (r *ScaledJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme(), r.GlobalHTTPTimeout)
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), nil, mgr.GetScheme(), r.GlobalHTTPTimeout, mgr.GetEventRecorderFor("scale-handler"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Event recorder for Metrics Adapter is named keda-metrics-adapter and this one is named scale-handler. For consistency, this one could be maybe named keda-operator/ keda-controller and sync them with those set in main.go WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think that makes sense. I'll change it to one recorder with the name keda-operator


return ctrl.NewControllerManagedBy(mgr).
// Ignore updates to ScaledJob Status (in this case metadata.Generation does not change)
Expand Down Expand Up @@ -84,7 +90,12 @@ func (r *ScaledJobReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
reqLogger.Error(err, msg)
conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledJobCheckFailed", msg)
conditions.SetActiveCondition(metav1.ConditionUnknown, "UnknownState", "ScaledJob check failed")
r.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.ScaledJobCheckFailed, msg)
} else {
wasReady := conditions.GetReadyCondition()
if wasReady.IsFalse() || wasReady.IsUnknown() {
r.Recorder.Event(scaledJob, corev1.EventTypeNormal, eventreason.ScaledJobReady, "ScaledJob is ready for scaling")
}
reqLogger.V(1).Info(msg)
conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledJobReady", msg)
}
Expand Down
4 changes: 4 additions & 0 deletions controllers/scaledjob_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package controllers
import (
"context"

"github.com/kedacore/keda/v2/pkg/eventreason"
corev1 "k8s.io/api/core/v1"

"github.com/go-logr/logr"

kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1"
Expand Down Expand Up @@ -33,6 +36,7 @@ func (r *ScaledJobReconciler) finalizeScaledJob(logger logr.Logger, scaledJob *k
}

logger.Info("Successfully finalized ScaledJob")
r.Recorder.Event(scaledJob, corev1.EventTypeNormal, eventreason.ScaledJobDeleted, "ScaledJob was deleted")
return nil
}

Expand Down
14 changes: 13 additions & 1 deletion controllers/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ import (
"sync"
"time"

"github.com/kedacore/keda/v2/pkg/eventreason"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"

"github.com/go-logr/logr"
autoscalingv1 "k8s.io/api/autoscaling/v1"
autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"
Expand Down Expand Up @@ -46,6 +50,7 @@ type ScaledObjectReconciler struct {
Client client.Client
Scheme *runtime.Scheme
GlobalHTTPTimeout time.Duration
Recorder record.EventRecorder

scaleClient *scale.ScalesGetter
restMapper meta.RESTMapper
Expand Down Expand Up @@ -91,7 +96,7 @@ func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager) error {
// Init the rest of ScaledObjectReconciler
r.restMapper = mgr.GetRESTMapper()
r.scaledObjectsGenerations = &sync.Map{}
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), r.scaleClient, mgr.GetScheme(), r.GlobalHTTPTimeout)
r.scaleHandler = scaling.NewScaleHandler(mgr.GetClient(), r.scaleClient, mgr.GetScheme(), r.GlobalHTTPTimeout, r.Recorder)

// Start controller
return ctrl.NewControllerManagedBy(mgr).
Expand Down Expand Up @@ -159,13 +164,20 @@ func (r *ScaledObjectReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error
reqLogger.Error(err, msg)
conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledObjectCheckFailed", msg)
conditions.SetActiveCondition(metav1.ConditionUnknown, "UnkownState", "ScaledObject check failed")
r.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.ScaledObjectCheckFailed, msg)
} else {
wasReady := conditions.GetReadyCondition()
if wasReady.IsFalse() || wasReady.IsUnknown() {
r.Recorder.Event(scaledObject, corev1.EventTypeNormal, eventreason.ScaledObjectReady, "ScaledObject is ready for scaling")
}
reqLogger.V(1).Info(msg)
conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledObjectReady", msg)
}

if err := kedacontrollerutil.SetStatusConditions(r.Client, reqLogger, scaledObject, &conditions); err != nil {
return ctrl.Result{}, err
}

return ctrl.Result{}, err
}

Expand Down
4 changes: 4 additions & 0 deletions controllers/scaledobject_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package controllers
import (
"context"

"github.com/kedacore/keda/v2/pkg/eventreason"
corev1 "k8s.io/api/core/v1"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -54,6 +57,7 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(logger logr.Logger, scaled
}

logger.Info("Successfully finalized ScaledObject")
r.Recorder.Event(scaledObject, corev1.EventTypeNormal, eventreason.ScaledObjectDeleted, "ScaledObject was deleted")
return nil
}

Expand Down
58 changes: 58 additions & 0 deletions controllers/triggerauthentication_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package controllers

import (
"context"

"github.com/go-logr/logr"
kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1"
"github.com/kedacore/keda/v2/pkg/eventreason"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// +kubebuilder:rbac:groups=keda.sh,resources=triggerauthentications;triggerauthentications/status,verbs="*"

// TriggerAuthenticationReconciler reconciles a TriggerAuthentication object
type TriggerAuthenticationReconciler struct {
Client client.Client
Log logr.Logger
Recorder record.EventRecorder
}

// Reconcile performs reconciliation on the identified TriggerAuthentication resource based on the request information passed, returns the result and an error (if any).
func (r *TriggerAuthenticationReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
reqLogger := r.Log.WithValues("TriggerAuthentication.Namespace", req.Namespace, "TriggerAuthentication.Name", req.Name)

triggerAuthentication := &kedav1alpha1.TriggerAuthentication{}
err := r.Client.Get(context.TODO(), req.NamespacedName, triggerAuthentication)
if err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{}, nil
}
reqLogger.Error(err, "Failed ot get TriggerAuthentication")
return ctrl.Result{}, err
}

if triggerAuthentication.GetDeletionTimestamp() != nil {
r.Recorder.Event(triggerAuthentication, corev1.EventTypeNormal, eventreason.TriggerAuthenticationDeleted, "TriggerAuthentication was deleted")
return ctrl.Result{}, nil
}

if triggerAuthentication.ObjectMeta.Generation == 1 {
r.Recorder.Event(triggerAuthentication, corev1.EventTypeNormal, eventreason.TriggerAuthenticationAdded, "New TriggerAuthentication configured")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a slightly weird one, but I don't think it will do any harm :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we can go with this for now :)

}

return ctrl.Result{}, nil
}

// SetupWithManager initializes the TriggerAuthenticationReconciler instance and starts a new controller managed by the passed Manager instance.
func (r *TriggerAuthenticationReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&kedav1alpha1.TriggerAuthentication{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Complete(r)
}
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -958,6 +958,7 @@ github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfV
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
github.com/kedacore/keda v1.5.0 h1:c8xA1Vo3H7rPwFiWUX3CBXnjBSrbYDmUs9iEfDlf4bQ=
github.com/kelseyhightower/envconfig v1.3.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM=
Expand Down
11 changes: 11 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,14 @@ func main() {
}

globalHTTPTimeout := time.Duration(globalHTTPTimeoutMS) * time.Millisecond
eventRecorder := mgr.GetEventRecorderFor("keda-operator")

if err = (&controllers.ScaledObjectReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("ScaledObject"),
Scheme: mgr.GetScheme(),
GlobalHTTPTimeout: globalHTTPTimeout,
Recorder: eventRecorder,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ScaledObject")
os.Exit(1)
Expand All @@ -138,10 +140,19 @@ func main() {
Log: ctrl.Log.WithName("controllers").WithName("ScaledJob"),
Scheme: mgr.GetScheme(),
GlobalHTTPTimeout: globalHTTPTimeout,
Recorder: eventRecorder,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ScaledJob")
os.Exit(1)
}
if err = (&controllers.TriggerAuthenticationReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("TriggerAuthentication"),
Recorder: eventRecorder,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "TriggerAuthentication")
os.Exit(1)
}
// +kubebuilder:scaffold:builder

setupLog.Info("Starting manager")
Expand Down
67 changes: 67 additions & 0 deletions pkg/eventreason/eventreason.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
Copyright 2020 The KEDA Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package eventreason

const (
tomkerkhove marked this conversation as resolved.
Show resolved Hide resolved
// ScaledObjectReady is for event when a new ScaledObject is ready
ScaledObjectReady = "ScaledObjectReady"

// ScaledJobReady is for event when a new ScaledJob is ready
ScaledJobReady = "ScaledJobReady"

// ScaledObjectCheckFailed is for event when ScaledObject validation check fails
ScaledObjectCheckFailed = "ScaledObjectCheckFailed"

// ScaledJobCheckFailed is for event when ScaledJob validation check fails
ScaledJobCheckFailed = "ScaledJobCheckFailed"

// ScaledObjectDeleted is for event when ScaledObject is deleted
ScaledObjectDeleted = "ScaledObjectDeleted"

// ScaledJobDeleted is for event when ScaledJob is deleted
ScaledJobDeleted = "ScaledJobDeleted"

// KEDAScalersStarted is for event when scalers watch started for ScaledObject or ScaledJob
KEDAScalersStarted = "KEDAScalersStarted"

// KEDAScalersStopped is for event when scalers watch was stopped for ScaledObject or ScaledJob
KEDAScalersStopped = "KEDAScalersStopped"

// KEDAScalerFailed is for event when a scaler fails for a ScaledJob or a ScaledObject
KEDAScalerFailed = "KEDAScalerFailed"

// KEDAScaleTargetActivated is for event when the scale target of ScaledObject was activated
KEDAScaleTargetActivated = "KEDAScaleTargetActivated"

// KEDAScaleTargetDeactivated is for event when the scale target for ScaledObject was deactivated
KEDAScaleTargetDeactivated = "KEDAScaleTargetDeactivated"

// KEDAScaleTargetActivationFailed is for event when the activation the scale target for ScaledObject fails
KEDAScaleTargetActivationFailed = "KEDAScaleTargetActivationFailed"

// KEDAScaleTargetDeactivationFailed is for event when the deactivation of the scale target for ScaledObject fails
KEDAScaleTargetDeactivationFailed = "KEDAScaleTargetDeactivationFailed"

// KEDAJobsCreated is for event when jobs for ScaledJob are created
KEDAJobsCreated = "KEDAJobsCreated"

// TriggerAuthenticationDeleted is for event when a TriggerAuthentication is deleted
TriggerAuthenticationDeleted = "TriggerAuthenticationDeleted"

// TriggerAuthenticationAdded is for event when a TriggerAuthentication is added
TriggerAuthenticationAdded = "TriggerAuthenticationAdded"
)
6 changes: 5 additions & 1 deletion pkg/scaling/executor/scale_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"

"k8s.io/client-go/tools/record"

"github.com/go-logr/logr"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -30,15 +32,17 @@ type scaleExecutor struct {
scaleClient *scale.ScalesGetter
reconcilerScheme *runtime.Scheme
logger logr.Logger
recorder record.EventRecorder
}

// NewScaleExecutor creates a ScaleExecutor object
func NewScaleExecutor(client client.Client, scaleClient *scale.ScalesGetter, reconcilerScheme *runtime.Scheme) ScaleExecutor {
func NewScaleExecutor(client client.Client, scaleClient *scale.ScalesGetter, reconcilerScheme *runtime.Scheme, recorder record.EventRecorder) ScaleExecutor {
return &scaleExecutor{
client: client,
scaleClient: scaleClient,
reconcilerScheme: reconcilerScheme,
logger: logf.Log.WithName("scaleexecutor"),
recorder: recorder,
}
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/scaling/executor/scale_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"sort"
"strconv"

"github.com/kedacore/keda/v2/pkg/eventreason"

"github.com/go-logr/logr"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -108,6 +110,7 @@ func (e *scaleExecutor) createJobs(logger logr.Logger, scaledJob *kedav1alpha1.S
}
}
logger.Info("Created jobs", "Number of jobs", scaleTo)
e.recorder.Eventf(scaledJob, corev1.EventTypeNormal, eventreason.KEDAJobsCreated, "Created %d jobs", scaleTo)
}

func (e *scaleExecutor) isJobFinished(j *batchv1.Job) bool {
Expand Down
Loading