Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Provide Prometheus metrics for total number of custom resources. #3719

Merged
merged 14 commits into from
Nov 7, 2022
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

### New

- **General**: Expand Prometheus metric with label "ScalerName" to distinguish different triggers. The scaleName is defined per Trigger.Name ([#3588](https://github.com/kedacore/keda/issues/3588)
- **General**: Expand Prometheus metric with label "ScalerName" to distinguish different triggers. The scaleName is defined per Trigger.Name ([#3588](https://github.com/kedacore/keda/issues/3588))
- **General:** Introduce new Loki Scaler ([#3699](https://github.com/kedacore/keda/issues/3699))
- **General**: Add ratelimitting parameters to keda manager to allow override of client defaults ([#3730](https://github.com/kedacore/keda/issues/2920))
- **General**: Provide Prometheus metric with indication of total number of custom resources per namespace for each custom resource type (CRD). ([#2637](https://github.com/kedacore/keda/issues/2637)|[#2638](https://github.com/kedacore/keda/issues/2638)|[#2639](https://github.com/kedacore/keda/issues/2639))
- **General**: Provide Prometheus metric with indication of total number of triggers per trigger type in `ScaledJob`/`ScaledObject`. ([#3663](https://github.com/kedacore/keda/issues/3663))
- **AWS Scalers**: Add setting AWS endpoint url. ([#3337](https://github.com/kedacore/keda/issues/3337))
- **Azure Service Bus Scaler**: Add support for Shared Access Signature (SAS) tokens for authentication. ([#2920](https://github.com/kedacore/keda/issues/2920))
Expand Down
56 changes: 49 additions & 7 deletions controllers/keda/clustertriggerauthentication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ package keda

import (
"context"
"sync"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
Expand All @@ -31,13 +31,27 @@ import (

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/eventreason"
"github.com/kedacore/keda/v2/pkg/metrics"
)

// ClusterTriggerAuthenticationReconciler reconciles a ClusterTriggerAuthentication object
type ClusterTriggerAuthenticationReconciler struct {
client.Client
Scheme *runtime.Scheme
Recorder record.EventRecorder
record.EventRecorder
}

type clusterTriggerAuthMetricsData struct {
namespace string
}

var (
clusterTriggerAuthMetricsMap map[string]clusterTriggerAuthMetricsData
clusterTriggerAuthMetricsLock *sync.Mutex
)

func init() {
clusterTriggerAuthMetricsMap = make(map[string]clusterTriggerAuthMetricsData)
clusterTriggerAuthMetricsLock = &sync.Mutex{}
}

// +kubebuilder:rbac:groups=keda.sh,resources=clustertriggerauthentications;clustertriggerauthentications/status,verbs="*"
Expand All @@ -52,17 +66,21 @@ func (r *ClusterTriggerAuthenticationReconciler) Reconcile(ctx context.Context,
if errors.IsNotFound(err) {
return ctrl.Result{}, nil
}
reqLogger.Error(err, "Failed ot get ClusterTriggerAuthentication")
reqLogger.Error(err, "Failed to get ClusterTriggerAuthentication")
return ctrl.Result{}, err
}

if clusterTriggerAuthentication.GetDeletionTimestamp() != nil {
r.Recorder.Event(clusterTriggerAuthentication, corev1.EventTypeNormal, eventreason.ClusterTriggerAuthenticationDeleted, "ClusterTriggerAuthentication was deleted")
return ctrl.Result{}, nil
return ctrl.Result{}, r.finalizeClusterTriggerAuthentication(ctx, reqLogger, clusterTriggerAuthentication, req.NamespacedName.String())
}

if err := r.ensureFinalizer(ctx, reqLogger, clusterTriggerAuthentication); err != nil {
return ctrl.Result{}, err
}
r.updateMetrics(clusterTriggerAuthentication, req.NamespacedName.String())

if clusterTriggerAuthentication.ObjectMeta.Generation == 1 {
r.Recorder.Event(clusterTriggerAuthentication, corev1.EventTypeNormal, eventreason.ClusterTriggerAuthenticationAdded, "New ClusterTriggerAuthentication configured")
r.EventRecorder.Event(clusterTriggerAuthentication, corev1.EventTypeNormal, eventreason.ClusterTriggerAuthenticationAdded, "New ClusterTriggerAuthentication configured")
}
return ctrl.Result{}, nil
}
Expand All @@ -73,3 +91,27 @@ func (r *ClusterTriggerAuthenticationReconciler) SetupWithManager(mgr ctrl.Manag
For(&kedav1alpha1.ClusterTriggerAuthentication{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Complete(r)
}

func (r *ClusterTriggerAuthenticationReconciler) updateMetrics(clusterTriggerAuth *kedav1alpha1.ClusterTriggerAuthentication, namespacedName string) {
clusterTriggerAuthMetricsLock.Lock()
defer clusterTriggerAuthMetricsLock.Unlock()

if metricsData, ok := clusterTriggerAuthMetricsMap[namespacedName]; ok {
metrics.DecrementCRDTotal(metrics.ClusterTriggerAuthenticationResource, metricsData.namespace)
}

metrics.IncrementCRDTotal(metrics.ClusterTriggerAuthenticationResource, clusterTriggerAuth.Namespace)
clusterTriggerAuthMetricsMap[namespacedName] = clusterTriggerAuthMetricsData{namespace: clusterTriggerAuth.Namespace}
}

// this method is idempotent, so it can be called multiple times without side-effects
func (r *ClusterTriggerAuthenticationReconciler) UpdateMetricsOnDelete(namespacedName string) {
clusterTriggerAuthMetricsLock.Lock()
defer clusterTriggerAuthMetricsLock.Unlock()

if metricsData, ok := clusterTriggerAuthMetricsMap[namespacedName]; ok {
metrics.DecrementCRDTotal(metrics.ClusterTriggerAuthenticationResource, metricsData.namespace)
}

delete(clusterTriggerAuthMetricsMap, namespacedName)
}
19 changes: 19 additions & 0 deletions controllers/keda/clustertriggerauthentication_finalizer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package keda

import (
"context"

"github.com/go-logr/logr"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/controllers/keda/util"
)

func (r *ClusterTriggerAuthenticationReconciler) ensureFinalizer(ctx context.Context, logger logr.Logger, clusterTriggerAuth *kedav1alpha1.ClusterTriggerAuthentication) error {
return util.EnsureAuthenticationResourceFinalizer(ctx, logger, r, clusterTriggerAuth)
}

func (r *ClusterTriggerAuthenticationReconciler) finalizeClusterTriggerAuthentication(ctx context.Context, logger logr.Logger,
clusterTriggerAuth *kedav1alpha1.ClusterTriggerAuthentication, namespacedName string) error {
return util.FinalizeAuthenticationResource(ctx, logger, r, clusterTriggerAuth, namespacedName)
}
73 changes: 29 additions & 44 deletions controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,19 @@ type ScaledJobReconciler struct {
scaleHandler scaling.ScaleHandler
}

type scaledJobMetricsData struct {
namespace string
triggerTypes []string
}

var (
scaledJobTriggers map[string][]string
scaledJobTriggersLock *sync.Mutex
scaledJobMetricsMap map[string]scaledJobMetricsData
scaledJobMetricsLock *sync.Mutex
)

func init() {
scaledJobTriggers = make(map[string][]string)
scaledJobTriggersLock = &sync.Mutex{}
scaledJobMetricsMap = make(map[string]scaledJobMetricsData)
scaledJobMetricsLock = &sync.Mutex{}
}

// SetupWithManager initializes the ScaledJobReconciler instance and starts a new controller managed by the passed Manager instance.
Expand Down Expand Up @@ -107,7 +112,7 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
if scaledJob.GetDeletionTimestamp() != nil {
return ctrl.Result{}, r.finalizeScaledJob(ctx, reqLogger, scaledJob, req.NamespacedName.String())
}
r.updateTriggerTotals(reqLogger, scaledJob, req.NamespacedName.String())
r.updateMetrics(scaledJob, req.NamespacedName.String())

// ensure finalizer is set on this CR
if err := r.ensureFinalizer(ctx, reqLogger, scaledJob); err != nil {
Expand Down Expand Up @@ -263,62 +268,42 @@ func (r *ScaledJobReconciler) stopScaleLoop(ctx context.Context, logger logr.Log
return nil
}

// scaledJobGenerationChanged returns true if ScaledJob's Generation was changed, ie. ScaledJob.Spec was changed
func (r *ScaledJobReconciler) scaledJobGenerationChanged(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) (bool, error) {
key, err := cache.MetaNamespaceKeyFunc(scaledJob)
if err != nil {
logger.Error(err, "Error getting key for scaledJob")
return true, err
}

value, loaded := r.scaledJobGenerations.Load(key)
if loaded {
generation := value.(int64)
if generation == scaledJob.Generation {
return false, nil
}
}
return true, nil
}

func (r *ScaledJobReconciler) updateTriggerTotals(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob, namespacedName string) {
specChanged, err := r.scaledJobGenerationChanged(logger, scaledJob)
if err != nil {
logger.Error(err, "failed to update trigger totals")
return
}

if !specChanged {
return
}
func (r *ScaledJobReconciler) updateMetrics(scaledJob *kedav1alpha1.ScaledJob, namespacedName string) {
scaledJobMetricsLock.Lock()
defer scaledJobMetricsLock.Unlock()

scaledJobTriggersLock.Lock()
defer scaledJobTriggersLock.Unlock()
metricsData, ok := scaledJobMetricsMap[namespacedName]

if triggerTypes, ok := scaledJobTriggers[namespacedName]; ok {
for _, triggerType := range triggerTypes {
if ok {
metrics.DecrementCRDTotal(metrics.ScaledJobResource, metricsData.namespace)
for _, triggerType := range metricsData.triggerTypes {
metrics.DecrementTriggerTotal(triggerType)
}
}

metrics.IncrementCRDTotal(metrics.ScaledJobResource, scaledJob.Namespace)
metricsData.namespace = scaledJob.Namespace

triggerTypes := make([]string, len(scaledJob.Spec.Triggers))
for _, trigger := range scaledJob.Spec.Triggers {
metrics.IncrementTriggerTotal(trigger.Type)
triggerTypes = append(triggerTypes, trigger.Type)
}
metricsData.triggerTypes = triggerTypes

scaledJobTriggers[namespacedName] = triggerTypes
scaledJobMetricsMap[namespacedName] = metricsData
}

func (r *ScaledJobReconciler) updateTriggerTotalsOnDelete(namespacedName string) {
scaledJobTriggersLock.Lock()
defer scaledJobTriggersLock.Unlock()
func (r *ScaledJobReconciler) updateMetricsOnDelete(namespacedName string) {
scaledJobMetricsLock.Lock()
defer scaledJobMetricsLock.Unlock()

if triggerTypes, ok := scaledJobTriggers[namespacedName]; ok {
for _, triggerType := range triggerTypes {
if metricsData, ok := scaledJobMetricsMap[namespacedName]; ok {
metrics.DecrementCRDTotal(metrics.ScaledJobResource, metricsData.namespace)
for _, triggerType := range metricsData.triggerTypes {
metrics.DecrementTriggerTotal(triggerType)
}
}

delete(scaledJobTriggers, namespacedName)
delete(scaledJobMetricsMap, namespacedName)
}
2 changes: 1 addition & 1 deletion controllers/keda/scaledjob_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (r *ScaledJobReconciler) finalizeScaledJob(ctx context.Context, logger logr
return err
}

r.updateTriggerTotalsOnDelete(namespacedName)
r.updateMetricsOnDelete(namespacedName)
}

logger.Info("Successfully finalized ScaledJob")
Expand Down
55 changes: 29 additions & 26 deletions controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,17 @@ type ScaledObjectReconciler struct {
kubeVersion kedautil.K8sVersion
}

type scaledObjectMetricsData struct {
namespace string
triggerTypes []string
}

var (
// A cache mapping "resource.group" to true or false if we know if this resource is scalable.
isScalableCache *sync.Map

scaledObjectTriggers map[string][]string
scaledObjectTriggersLock *sync.Mutex
scaledObjectMetricsMap map[string]scaledObjectMetricsData
scaledObjectMetricsLock *sync.Mutex
)

func init() {
Expand All @@ -91,8 +96,8 @@ func init() {
isScalableCache.Store("deployments.apps", true)
isScalableCache.Store("statefulsets.apps", true)

scaledObjectTriggers = make(map[string][]string)
scaledObjectTriggersLock = &sync.Mutex{}
scaledObjectMetricsMap = make(map[string]scaledObjectMetricsData)
scaledObjectMetricsLock = &sync.Mutex{}
}

// SetupWithManager initializes the ScaledObjectReconciler instance and starts a new controller managed by the passed Manager instance.
Expand Down Expand Up @@ -177,7 +182,7 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
if scaledObject.GetDeletionTimestamp() != nil {
return ctrl.Result{}, r.finalizeScaledObject(ctx, reqLogger, scaledObject, req.NamespacedName.String())
}
r.updateTriggerTotals(reqLogger, scaledObject, req.NamespacedName.String())
r.updateMetrics(scaledObject, req.NamespacedName.String())

// ensure finalizer is set on this CR
if err := r.ensureFinalizer(ctx, reqLogger, scaledObject); err != nil {
Expand Down Expand Up @@ -480,44 +485,42 @@ func (r *ScaledObjectReconciler) scaledObjectGenerationChanged(logger logr.Logge
return true, nil
}

func (r *ScaledObjectReconciler) updateTriggerTotals(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, namespacedName string) {
specChanged, err := r.scaledObjectGenerationChanged(logger, scaledObject)
if err != nil {
logger.Error(err, "failed to update trigger totals")
return
}
func (r *ScaledObjectReconciler) updateMetrics(scaledObject *kedav1alpha1.ScaledObject, namespacedName string) {
scaledObjectMetricsLock.Lock()
defer scaledObjectMetricsLock.Unlock()

if !specChanged {
return
}

scaledObjectTriggersLock.Lock()
defer scaledObjectTriggersLock.Unlock()
metricsData, ok := scaledObjectMetricsMap[namespacedName]

if triggerTypes, ok := scaledObjectTriggers[namespacedName]; ok {
for _, triggerType := range triggerTypes {
if ok {
metrics.DecrementCRDTotal(metrics.ScaledObjectResource, metricsData.namespace)
for _, triggerType := range metricsData.triggerTypes {
metrics.DecrementTriggerTotal(triggerType)
}
}

metrics.IncrementCRDTotal(metrics.ScaledObjectResource, scaledObject.Namespace)
metricsData.namespace = scaledObject.Namespace

triggerTypes := make([]string, len(scaledObject.Spec.Triggers))
for _, trigger := range scaledObject.Spec.Triggers {
metrics.IncrementTriggerTotal(trigger.Type)
triggerTypes = append(triggerTypes, trigger.Type)
}
metricsData.triggerTypes = triggerTypes

scaledObjectTriggers[namespacedName] = triggerTypes
scaledObjectMetricsMap[namespacedName] = metricsData
}

func (r *ScaledObjectReconciler) updateTriggerTotalsOnDelete(namespacedName string) {
scaledObjectTriggersLock.Lock()
defer scaledObjectTriggersLock.Unlock()
func (r *ScaledObjectReconciler) updateMetricsOnDelete(namespacedName string) {
scaledObjectMetricsLock.Lock()
defer scaledObjectMetricsLock.Unlock()

if triggerTypes, ok := scaledObjectTriggers[namespacedName]; ok {
for _, triggerType := range triggerTypes {
if metricsData, ok := scaledObjectMetricsMap[namespacedName]; ok {
metrics.DecrementCRDTotal(metrics.ScaledObjectResource, metricsData.namespace)
for _, triggerType := range metricsData.triggerTypes {
metrics.DecrementTriggerTotal(triggerType)
}
}

delete(scaledObjectTriggers, namespacedName)
delete(scaledObjectMetricsMap, namespacedName)
}
2 changes: 1 addition & 1 deletion controllers/keda/scaledobject_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(ctx context.Context, logge
return err
}

r.updateTriggerTotalsOnDelete(namespacedName)
r.updateMetricsOnDelete(namespacedName)
}

logger.Info("Successfully finalized ScaledObject")
Expand Down
Loading