Skip to content

Commit

Permalink
[v2] autoscaling v2beta1 -> v2beta2 and minor refactoring (kedacore#734)
Browse files Browse the repository at this point in the history
* scaledObjectFinalizer minor refactor

Signed-off-by: Zbynek Roubalik <zroubali@redhat.com>

* autoscaling v2beta1 -> v2beta2

Signed-off-by: Zbynek Roubalik <zroubali@redhat.com>

* apiregistration.k8s.io v1beta1 -> v1

Signed-off-by: Zbynek Roubalik <zroubali@redhat.com>
  • Loading branch information
zroubalik authored and Zbynek Roubalik committed Jun 17, 2020
1 parent 53a6aa5 commit 0df23b8
Show file tree
Hide file tree
Showing 28 changed files with 335 additions and 242 deletions.
2 changes: 1 addition & 1 deletion deploy/24-metrics-api_service.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
apiVersion: apiregistration.k8s.io/v1beta1
apiVersion: apiregistration.k8s.io/v1
kind: APIService
metadata:
labels:
Expand Down
53 changes: 13 additions & 40 deletions pkg/controller/scaledobject/hpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

"github.com/go-logr/logr"
version "github.com/kedacore/keda/version"
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
Expand Down Expand Up @@ -45,9 +45,8 @@ func (r *ReconcileScaledObject) createAndDeployNewHPA(logger logr.Logger, scaled
return nil
}

//FIXME unify location from where we load gvkt - param vs. scaledObject
// newHPAForScaledObject returns HPA as it is specified in ScaledObject
func (r *ReconcileScaledObject) newHPAForScaledObject(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, gvkr *kedautil.GroupVersionKindResource) (*autoscalingv2beta1.HorizontalPodAutoscaler, error) {
func (r *ReconcileScaledObject) newHPAForScaledObject(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, gvkr *kedautil.GroupVersionKindResource) (*autoscalingv2beta2.HorizontalPodAutoscaler, error) {
scaledObjectMetricSpecs, err := r.getScaledObjectMetricSpecs(logger, scaledObject)
if err != nil {
return nil, err
Expand All @@ -65,12 +64,12 @@ func (r *ReconcileScaledObject) newHPAForScaledObject(logger logr.Logger, scaled
"app.kubernetes.io/managed-by": "keda-operator",
}

return &autoscalingv2beta1.HorizontalPodAutoscaler{
Spec: autoscalingv2beta1.HorizontalPodAutoscalerSpec{
return &autoscalingv2beta2.HorizontalPodAutoscaler{
Spec: autoscalingv2beta2.HorizontalPodAutoscalerSpec{
MinReplicas: getHPAMinReplicas(scaledObject),
MaxReplicas: getHPAMaxReplicas(scaledObject),
Metrics: scaledObjectMetricSpecs,
ScaleTargetRef: autoscalingv2beta1.CrossVersionObjectReference{
ScaleTargetRef: autoscalingv2beta2.CrossVersionObjectReference{
Name: scaledObject.Spec.ScaleTargetRef.Name,
Kind: gvkr.Kind,
APIVersion: gvkr.GroupVersion().String(),
Expand All @@ -81,38 +80,13 @@ func (r *ReconcileScaledObject) newHPAForScaledObject(logger logr.Logger, scaled
Labels: labels,
},
TypeMeta: metav1.TypeMeta{
APIVersion: "v2beta1",
APIVersion: "v2beta2",
},
}, nil
}

// checkHPAForUpdate checks whether update of HPA is needed
func (r *ReconcileScaledObject) updateHPAIfNeeded(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, foundHpa *autoscalingv2beta1.HorizontalPodAutoscaler, gvkr *kedautil.GroupVersionKindResource) error {

// updateHPA := false
// scaledObjectMinReplicaCount := getHPAMinReplicas(scaledObject)
// if *foundHpa.Spec.MinReplicas != *scaledObjectMinReplicaCount {
// updateHPA = true
// foundHpa.Spec.MinReplicas = scaledObjectMinReplicaCount
// }

// scaledObjectMaxReplicaCount := getHPAMaxReplicas(scaledObject)
// if foundHpa.Spec.MaxReplicas != scaledObjectMaxReplicaCount {
// updateHPA = true
// foundHpa.Spec.MaxReplicas = scaledObjectMaxReplicaCount
// }

// newMetricSpec, err := r.getScaledObjectMetricSpecs(logger, scaledObject)
// if err != nil {
// logger.Error(err, "Failed to create MetricSpec")
// return err
// }
// if fmt.Sprintf("%v", foundHpa.Spec.Metrics) != fmt.Sprintf("%v", newMetricSpec) {
// updateHPA = true
// foundHpa.Spec.Metrics = newMetricSpec
// }

//if updateHPA {
// updateHPAIfNeeded checks whether update of HPA is needed
func (r *ReconcileScaledObject) updateHPAIfNeeded(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, foundHpa *autoscalingv2beta2.HorizontalPodAutoscaler, gvkr *kedautil.GroupVersionKindResource) error {

hpa, err := r.newHPAForScaledObject(logger, scaledObject, gvkr)
if err != nil {
Expand All @@ -122,7 +96,6 @@ func (r *ReconcileScaledObject) updateHPAIfNeeded(logger logr.Logger, scaledObje

if !equality.Semantic.DeepDerivative(hpa.Spec, foundHpa.Spec) {
if r.client.Update(context.TODO(), foundHpa) != nil {
//foundHpa.Spec = *hpa.Spec.DeepCopy()
foundHpa.Spec = hpa.Spec
logger.Error(err, "Failed to update HPA", "HPA.Namespace", foundHpa.Namespace, "HPA.Name", foundHpa.Name)
return err
Expand All @@ -134,8 +107,8 @@ func (r *ReconcileScaledObject) updateHPAIfNeeded(logger logr.Logger, scaledObje
}

// getScaledObjectMetricSpecs returns MetricSpec for HPA, generater from Triggers defitinion in ScaledObject
func (r *ReconcileScaledObject) getScaledObjectMetricSpecs(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) ([]autoscalingv2beta1.MetricSpec, error) {
var scaledObjectMetricSpecs []autoscalingv2beta1.MetricSpec
func (r *ReconcileScaledObject) getScaledObjectMetricSpecs(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) ([]autoscalingv2beta2.MetricSpec, error) {
var scaledObjectMetricSpecs []autoscalingv2beta2.MetricSpec
var externalMetricNames []string

scalers, err := scalehandler.NewScaleHandler(r.client, r.scaleClient, r.scheme).GetScaledObjectScalers(scaledObject)
Expand All @@ -149,9 +122,9 @@ func (r *ReconcileScaledObject) getScaledObjectMetricSpecs(logger logr.Logger, s

// add the scaledObjectName label. This is how the MetricsAdapter will know which scaledobject a metric is for when the HPA queries it.
for _, metricSpec := range metricSpecs {
metricSpec.External.MetricSelector = &metav1.LabelSelector{MatchLabels: make(map[string]string)}
metricSpec.External.MetricSelector.MatchLabels["scaledObjectName"] = scaledObject.Name
externalMetricNames = append(externalMetricNames, metricSpec.External.MetricName)
metricSpec.External.Metric.Selector = &metav1.LabelSelector{MatchLabels: make(map[string]string)}
metricSpec.External.Metric.Selector.MatchLabels["scaledObjectName"] = scaledObject.Name
externalMetricNames = append(externalMetricNames, metricSpec.External.Metric.Name)
}
scaledObjectMetricSpecs = append(scaledObjectMetricSpecs, metricSpecs...)
scaler.Close()
Expand Down
68 changes: 37 additions & 31 deletions pkg/controller/scaledobject/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
kedautil "github.com/kedacore/keda/pkg/util"

"github.com/go-logr/logr"
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -41,11 +41,11 @@ func Add(mgr manager.Manager) error {
if err != nil {
return err
}
return add(mgr, newReconciler(mgr, scaleClient))
return add(mgr, newReconciler(mgr, &scaleClient))
}

// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager, scaleClient scale.ScalesGetter) reconcile.Reconciler {
func newReconciler(mgr manager.Manager, scaleClient *scale.ScalesGetter) reconcile.Reconciler {
return &ReconcileScaledObject{
client: mgr.GetClient(),
scaleClient: scaleClient,
Expand Down Expand Up @@ -78,7 +78,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
}

// Watch for changes to secondary resource HPA and requeue the owner ScaledObject
err = c.Watch(&source.Kind{Type: &autoscalingv2beta1.HorizontalPodAutoscaler{}}, &handler.EnqueueRequestForOwner{
err = c.Watch(&source.Kind{Type: &autoscalingv2beta2.HorizontalPodAutoscaler{}}, &handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &kedav1alpha1.ScaledObject{},
})
Expand Down Expand Up @@ -110,7 +110,7 @@ type ReconcileScaledObject struct {
// This client, initialized using mgr.Client() above, is a split client
// that reads objects from the cache and writes to the apiserver
client client.Client
scaleClient scale.ScalesGetter
scaleClient *scale.ScalesGetter
restMapper meta.RESTMapper
scheme *runtime.Scheme
scaleLoopContexts *sync.Map
Expand Down Expand Up @@ -143,32 +143,13 @@ func (r *ReconcileScaledObject) Reconcile(request reconcile.Request) (reconcile.

// Check if the ScaledObject instance is marked to be deleted, which is
// indicated by the deletion timestamp being set.
isScaledObjectMarkedToBeDeleted := scaledObject.GetDeletionTimestamp() != nil
if isScaledObjectMarkedToBeDeleted {
if contains(scaledObject.GetFinalizers(), scaledObjectFinalizer) {
// Run finalization logic for scaledObjectFinalizer. If the
// finalization logic fails, don't remove the finalizer so
// that we can retry during the next reconciliation.
if err := r.finalizeScaledObject(reqLogger, scaledObject); err != nil {
return reconcile.Result{}, err
}

// Remove scaledObjectFinalizer. Once all finalizers have been
// removed, the object will be deleted.
scaledObject.SetFinalizers(remove(scaledObject.GetFinalizers(), scaledObjectFinalizer))
err := r.client.Update(context.TODO(), scaledObject)
if err != nil {
return reconcile.Result{}, err
}
}
return reconcile.Result{}, nil
if scaledObject.GetDeletionTimestamp() != nil {
return reconcile.Result{}, r.finalizeScaledObject(reqLogger, scaledObject)
}

// Add finalizer for this CR
if !contains(scaledObject.GetFinalizers(), scaledObjectFinalizer) {
if err := r.addFinalizer(reqLogger, scaledObject); err != nil {
return reconcile.Result{}, err
}
// ensure finalizer is set on this CR
if err := r.ensureFinalizer(reqLogger, scaledObject); err != nil {
return reconcile.Result{}, err
}

return reconcile.Result{}, r.reconcileScaledObject(reqLogger, scaledObject)
Expand Down Expand Up @@ -256,7 +237,7 @@ func (r *ReconcileScaledObject) checkTargetResourceIsScalable(logger logr.Logger
logger.V(1).Info("Parsed Group, Version, Kind, Resource", "GVK", gvkr.GVKString(), "Resource", gvkr.Resource)

// let's try to detect /scale subresource
_, errScale := r.scaleClient.Scales(scaledObject.Namespace).Get(gvkr.GroupResource(), scaledObject.Spec.ScaleTargetRef.Name)
_, errScale := (*r.scaleClient).Scales(scaledObject.Namespace).Get(gvkr.GroupResource(), scaledObject.Spec.ScaleTargetRef.Name)
if errScale != nil {
// not able to get /scale subresource -> let's check if the resource even exist in the cluster
unstruct := &unstructured.Unstructured{}
Expand Down Expand Up @@ -286,7 +267,7 @@ func (r *ReconcileScaledObject) checkTargetResourceIsScalable(logger logr.Logger
// ensureHPAForScaledObjectExists ensures that in cluster exist up-to-date HPA for specified ScaledObject, returns true if a new HPA was created
func (r *ReconcileScaledObject) ensureHPAForScaledObjectExists(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, gvkr *kedautil.GroupVersionKindResource) (bool, error) {
hpaName := getHPAName(scaledObject)
foundHpa := &autoscalingv2beta1.HorizontalPodAutoscaler{}
foundHpa := &autoscalingv2beta2.HorizontalPodAutoscaler{}
// Check if HPA for this ScaledObject already exists
err := r.client.Get(context.TODO(), types.NamespacedName{Name: hpaName, Namespace: scaledObject.Namespace}, foundHpa)
if err != nil && errors.IsNotFound(err) {
Expand Down Expand Up @@ -344,6 +325,31 @@ func (r *ReconcileScaledObject) startScaleLoop(logger logr.Logger, scaledObject
return nil
}

// stopScaleLoop stops ScaleLoop handler for the respective ScaleObject
func (r *ReconcileScaledObject) stopScaleLoop(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error {
key, err := cache.MetaNamespaceKeyFunc(scaledObject)
if err != nil {
logger.Error(err, "Error getting key for scaledObject")
return err
}

// delete ScaledObject's current Generation
r.scaledObjectsGenerations.Delete(key)

result, ok := r.scaleLoopContexts.Load(key)
if ok {
cancel, ok := result.(context.CancelFunc)
if ok {
cancel()
}
r.scaleLoopContexts.Delete(key)
} else {
logger.V(1).Info("ScaleObject was not found in controller cache", "key", key)
}

return nil
}

// scaledObjectGenerationChanged returns true if ScaledObject's Generation was changed, ie. ScaledObject.Spec was changed
func (r *ReconcileScaledObject) scaledObjectGenerationChanged(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (bool, error) {
key, err := cache.MetaNamespaceKeyFunc(scaledObject)
Expand Down
56 changes: 28 additions & 28 deletions pkg/controller/scaledobject/scaledobject_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,52 +3,52 @@ package scaledobject
import (
"context"

"github.com/go-logr/logr"
"k8s.io/client-go/tools/cache"

kedav1alpha1 "github.com/kedacore/keda/pkg/apis/keda/v1alpha1"

"github.com/go-logr/logr"
)

const (
scaledObjectFinalizer = "finalizer.keda.sh"
)

// finalizeScaledObject is stopping ScaleLoop for the respective ScaleObject
// finalizeScaledObject runs finalization logic on ScaledObject if there's finalizer
func (r *ReconcileScaledObject) finalizeScaledObject(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error {
key, err := cache.MetaNamespaceKeyFunc(scaledObject)
if err != nil {
logger.Error(err, "Error getting key for scaledObject (%s/%s)", scaledObject.GetNamespace(), scaledObject.GetName())
return err
}

// store ScaledObject's current Generation
r.scaledObjectsGenerations.Delete(key)
if contains(scaledObject.GetFinalizers(), scaledObjectFinalizer) {
// Run finalization logic for scaledObjectFinalizer. If the
// finalization logic fails, don't remove the finalizer so
// that we can retry during the next reconciliation.
if err := r.stopScaleLoop(logger, scaledObject); err != nil {
return err
}

result, ok := r.scaleLoopContexts.Load(key)
if ok {
cancel, ok := result.(context.CancelFunc)
if ok {
cancel()
// Remove scaledObjectFinalizer. Once all finalizers have been
// removed, the object will be deleted.
scaledObject.SetFinalizers(remove(scaledObject.GetFinalizers(), scaledObjectFinalizer))
if err := r.client.Update(context.TODO(), scaledObject); err != nil {
logger.Error(err, "Failed to update ScaledObject after removing a finalizer", "finalizer", scaledObjectFinalizer)
return err
}
r.scaleLoopContexts.Delete(key)
} else {
logger.V(1).Info("ScaleObject was not found in controller cache", "key", key)
}

logger.Info("Successfully finalized ScaledObject")
return nil
}

// addFinalizer adds finalizer to the ScaledObject
func (r *ReconcileScaledObject) addFinalizer(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error {
logger.Info("Adding Finalizer for the ScaledObject")
scaledObject.SetFinalizers(append(scaledObject.GetFinalizers(), scaledObjectFinalizer))
// ensureFinalizer check there is finalizer present on the ScaledObject, if not it adds one
func (r *ReconcileScaledObject) ensureFinalizer(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error {

// Update CR
err := r.client.Update(context.TODO(), scaledObject)
if err != nil {
logger.Error(err, "Failed to update ScaledObject with finalizer")
return err
if !contains(scaledObject.GetFinalizers(), scaledObjectFinalizer) {
logger.Info("Adding Finalizer for the ScaledObject")
scaledObject.SetFinalizers(append(scaledObject.GetFinalizers(), scaledObjectFinalizer))

// Update CR
err := r.client.Update(context.TODO(), scaledObject)
if err != nil {
logger.Error(err, "Failed to update ScaledObject with a finalizer", "finalizer", scaledObjectFinalizer)
return err
}
}
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/handler/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
// each ScaledObject and making the final scale decision and operation
type ScaleHandler struct {
client client.Client
scaleClient scale.ScalesGetter // TODO pointer
scaleClient *scale.ScalesGetter
logger logr.Logger
reconcilerScheme *runtime.Scheme
}
Expand All @@ -34,7 +34,7 @@ const (
)

// NewScaleHandler creates a ScaleHandler object
func NewScaleHandler(client client.Client, scaleClient scale.ScalesGetter, reconcilerScheme *runtime.Scheme) *ScaleHandler {
func NewScaleHandler(client client.Client, scaleClient *scale.ScalesGetter, reconcilerScheme *runtime.Scheme) *ScaleHandler {
handler := &ScaleHandler{
client: client,
scaleClient: scaleClient,
Expand Down
2 changes: 1 addition & 1 deletion pkg/handler/scale_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (h *ScaleHandler) handleScaleJob(ctx context.Context, scaledObject *kedav1a

var metricValue int64
for _, metric := range metricSpecs {
metricValue, _ = metric.External.TargetAverageValue.AsInt64()
metricValue, _ = metric.External.Target.AverageValue.AsInt64()
maxValue += metricValue
}
scalerLogger.Info("Scaler max value", "MaxValue", maxValue)
Expand Down
4 changes: 2 additions & 2 deletions pkg/handler/scale_scaledobjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,11 @@ func (h *ScaleHandler) scaleFromZero(scaledObject *kedav1alpha1.ScaledObject, sc
}

func (h *ScaleHandler) getScaleTargetScale(scaledObject *kedav1alpha1.ScaledObject) (*autoscalingv1.Scale, error) {
return h.scaleClient.Scales(scaledObject.Namespace).Get(scaledObject.Status.ScaleTargetGVKR.GroupResource(), scaledObject.Spec.ScaleTargetRef.Name)
return (*h.scaleClient).Scales(scaledObject.Namespace).Get(scaledObject.Status.ScaleTargetGVKR.GroupResource(), scaledObject.Spec.ScaleTargetRef.Name)
}

func (h *ScaleHandler) updateScaleOnScaleTarget(scaledObject *kedav1alpha1.ScaledObject, scale *autoscalingv1.Scale) error {
_, err := h.scaleClient.Scales(scaledObject.Namespace).Update(scaledObject.Status.ScaleTargetGVKR.GroupResource(), scale)
_, err := (*h.scaleClient).Scales(scaledObject.Namespace).Update(scaledObject.Status.ScaleTargetGVKR.GroupResource(), scale)
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (p *KedaProvider) GetExternalMetric(namespace string, metricSelector labels

for _, metricSpec := range metricSpecs {
// Filter only the desired metric
if strings.EqualFold(metricSpec.External.MetricName, info.Metric) {
if strings.EqualFold(metricSpec.External.Metric.Name, info.Metric) {
metrics, err := scaler.GetMetrics(context.TODO(), info.Metric, metricSelector)
if err != nil {
logger.Error(err, "error getting metric for scaler", "ScaledObject.Namespace", scaledObject.Namespace, "ScaledObject.Name", scaledObject.Name, "Scaler", scaler)
Expand Down
Loading

0 comments on commit 0df23b8

Please sign in to comment.