Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2] autoscaling v2beta1 -> v2beta2 and minor refactoring #734

Merged
merged 3 commits into from
Apr 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deploy/24-metrics-api_service.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
apiVersion: apiregistration.k8s.io/v1beta1
apiVersion: apiregistration.k8s.io/v1
kind: APIService
metadata:
labels:
Expand Down
53 changes: 13 additions & 40 deletions pkg/controller/scaledobject/hpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

"github.com/go-logr/logr"
version "github.com/kedacore/keda/version"
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
Expand Down Expand Up @@ -45,9 +45,8 @@ func (r *ReconcileScaledObject) createAndDeployNewHPA(logger logr.Logger, scaled
return nil
}

//FIXME unify location from where we load gvkt - param vs. scaledObject
// newHPAForScaledObject returns HPA as it is specified in ScaledObject
func (r *ReconcileScaledObject) newHPAForScaledObject(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, gvkr *kedautil.GroupVersionKindResource) (*autoscalingv2beta1.HorizontalPodAutoscaler, error) {
func (r *ReconcileScaledObject) newHPAForScaledObject(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, gvkr *kedautil.GroupVersionKindResource) (*autoscalingv2beta2.HorizontalPodAutoscaler, error) {
scaledObjectMetricSpecs, err := r.getScaledObjectMetricSpecs(logger, scaledObject)
if err != nil {
return nil, err
Expand All @@ -65,12 +64,12 @@ func (r *ReconcileScaledObject) newHPAForScaledObject(logger logr.Logger, scaled
"app.kubernetes.io/managed-by": "keda-operator",
}

return &autoscalingv2beta1.HorizontalPodAutoscaler{
Spec: autoscalingv2beta1.HorizontalPodAutoscalerSpec{
return &autoscalingv2beta2.HorizontalPodAutoscaler{
Spec: autoscalingv2beta2.HorizontalPodAutoscalerSpec{
MinReplicas: getHPAMinReplicas(scaledObject),
MaxReplicas: getHPAMaxReplicas(scaledObject),
Metrics: scaledObjectMetricSpecs,
ScaleTargetRef: autoscalingv2beta1.CrossVersionObjectReference{
ScaleTargetRef: autoscalingv2beta2.CrossVersionObjectReference{
Name: scaledObject.Spec.ScaleTargetRef.Name,
Kind: gvkr.Kind,
APIVersion: gvkr.GroupVersion().String(),
Expand All @@ -81,38 +80,13 @@ func (r *ReconcileScaledObject) newHPAForScaledObject(logger logr.Logger, scaled
Labels: labels,
},
TypeMeta: metav1.TypeMeta{
APIVersion: "v2beta1",
APIVersion: "v2beta2",
},
}, nil
}

// checkHPAForUpdate checks whether update of HPA is needed
func (r *ReconcileScaledObject) updateHPAIfNeeded(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, foundHpa *autoscalingv2beta1.HorizontalPodAutoscaler, gvkr *kedautil.GroupVersionKindResource) error {

// updateHPA := false
// scaledObjectMinReplicaCount := getHPAMinReplicas(scaledObject)
// if *foundHpa.Spec.MinReplicas != *scaledObjectMinReplicaCount {
// updateHPA = true
// foundHpa.Spec.MinReplicas = scaledObjectMinReplicaCount
// }

// scaledObjectMaxReplicaCount := getHPAMaxReplicas(scaledObject)
// if foundHpa.Spec.MaxReplicas != scaledObjectMaxReplicaCount {
// updateHPA = true
// foundHpa.Spec.MaxReplicas = scaledObjectMaxReplicaCount
// }

// newMetricSpec, err := r.getScaledObjectMetricSpecs(logger, scaledObject)
// if err != nil {
// logger.Error(err, "Failed to create MetricSpec")
// return err
// }
// if fmt.Sprintf("%v", foundHpa.Spec.Metrics) != fmt.Sprintf("%v", newMetricSpec) {
// updateHPA = true
// foundHpa.Spec.Metrics = newMetricSpec
// }

//if updateHPA {
// updateHPAIfNeeded checks whether update of HPA is needed
func (r *ReconcileScaledObject) updateHPAIfNeeded(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, foundHpa *autoscalingv2beta2.HorizontalPodAutoscaler, gvkr *kedautil.GroupVersionKindResource) error {

hpa, err := r.newHPAForScaledObject(logger, scaledObject, gvkr)
if err != nil {
Expand All @@ -122,7 +96,6 @@ func (r *ReconcileScaledObject) updateHPAIfNeeded(logger logr.Logger, scaledObje

if !equality.Semantic.DeepDerivative(hpa.Spec, foundHpa.Spec) {
if r.client.Update(context.TODO(), foundHpa) != nil {
//foundHpa.Spec = *hpa.Spec.DeepCopy()
foundHpa.Spec = hpa.Spec
logger.Error(err, "Failed to update HPA", "HPA.Namespace", foundHpa.Namespace, "HPA.Name", foundHpa.Name)
return err
Expand All @@ -134,8 +107,8 @@ func (r *ReconcileScaledObject) updateHPAIfNeeded(logger logr.Logger, scaledObje
}

// getScaledObjectMetricSpecs returns MetricSpec for HPA, generater from Triggers defitinion in ScaledObject
func (r *ReconcileScaledObject) getScaledObjectMetricSpecs(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) ([]autoscalingv2beta1.MetricSpec, error) {
var scaledObjectMetricSpecs []autoscalingv2beta1.MetricSpec
func (r *ReconcileScaledObject) getScaledObjectMetricSpecs(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) ([]autoscalingv2beta2.MetricSpec, error) {
var scaledObjectMetricSpecs []autoscalingv2beta2.MetricSpec
var externalMetricNames []string

scalers, err := scalehandler.NewScaleHandler(r.client, r.scaleClient, r.scheme).GetScaledObjectScalers(scaledObject)
Expand All @@ -149,9 +122,9 @@ func (r *ReconcileScaledObject) getScaledObjectMetricSpecs(logger logr.Logger, s

// add the scaledObjectName label. This is how the MetricsAdapter will know which scaledobject a metric is for when the HPA queries it.
for _, metricSpec := range metricSpecs {
metricSpec.External.MetricSelector = &metav1.LabelSelector{MatchLabels: make(map[string]string)}
metricSpec.External.MetricSelector.MatchLabels["scaledObjectName"] = scaledObject.Name
externalMetricNames = append(externalMetricNames, metricSpec.External.MetricName)
metricSpec.External.Metric.Selector = &metav1.LabelSelector{MatchLabels: make(map[string]string)}
metricSpec.External.Metric.Selector.MatchLabels["scaledObjectName"] = scaledObject.Name
externalMetricNames = append(externalMetricNames, metricSpec.External.Metric.Name)
}
scaledObjectMetricSpecs = append(scaledObjectMetricSpecs, metricSpecs...)
scaler.Close()
Expand Down
68 changes: 37 additions & 31 deletions pkg/controller/scaledobject/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
kedautil "github.com/kedacore/keda/pkg/util"

"github.com/go-logr/logr"
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -41,11 +41,11 @@ func Add(mgr manager.Manager) error {
if err != nil {
return err
}
return add(mgr, newReconciler(mgr, scaleClient))
return add(mgr, newReconciler(mgr, &scaleClient))
}

// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager, scaleClient scale.ScalesGetter) reconcile.Reconciler {
func newReconciler(mgr manager.Manager, scaleClient *scale.ScalesGetter) reconcile.Reconciler {
return &ReconcileScaledObject{
client: mgr.GetClient(),
scaleClient: scaleClient,
Expand Down Expand Up @@ -78,7 +78,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
}

// Watch for changes to secondary resource HPA and requeue the owner ScaledObject
err = c.Watch(&source.Kind{Type: &autoscalingv2beta1.HorizontalPodAutoscaler{}}, &handler.EnqueueRequestForOwner{
err = c.Watch(&source.Kind{Type: &autoscalingv2beta2.HorizontalPodAutoscaler{}}, &handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &kedav1alpha1.ScaledObject{},
})
Expand Down Expand Up @@ -110,7 +110,7 @@ type ReconcileScaledObject struct {
// This client, initialized using mgr.Client() above, is a split client
// that reads objects from the cache and writes to the apiserver
client client.Client
scaleClient scale.ScalesGetter
scaleClient *scale.ScalesGetter
restMapper meta.RESTMapper
scheme *runtime.Scheme
scaleLoopContexts *sync.Map
Expand Down Expand Up @@ -143,32 +143,13 @@ func (r *ReconcileScaledObject) Reconcile(request reconcile.Request) (reconcile.

// Check if the ScaledObject instance is marked to be deleted, which is
// indicated by the deletion timestamp being set.
isScaledObjectMarkedToBeDeleted := scaledObject.GetDeletionTimestamp() != nil
if isScaledObjectMarkedToBeDeleted {
if contains(scaledObject.GetFinalizers(), scaledObjectFinalizer) {
// Run finalization logic for scaledObjectFinalizer. If the
// finalization logic fails, don't remove the finalizer so
// that we can retry during the next reconciliation.
if err := r.finalizeScaledObject(reqLogger, scaledObject); err != nil {
return reconcile.Result{}, err
}

// Remove scaledObjectFinalizer. Once all finalizers have been
// removed, the object will be deleted.
scaledObject.SetFinalizers(remove(scaledObject.GetFinalizers(), scaledObjectFinalizer))
err := r.client.Update(context.TODO(), scaledObject)
if err != nil {
return reconcile.Result{}, err
}
}
return reconcile.Result{}, nil
if scaledObject.GetDeletionTimestamp() != nil {
return reconcile.Result{}, r.finalizeScaledObject(reqLogger, scaledObject)
}

// Add finalizer for this CR
if !contains(scaledObject.GetFinalizers(), scaledObjectFinalizer) {
if err := r.addFinalizer(reqLogger, scaledObject); err != nil {
return reconcile.Result{}, err
}
// ensure finalizer is set on this CR
if err := r.ensureFinalizer(reqLogger, scaledObject); err != nil {
return reconcile.Result{}, err
}

return reconcile.Result{}, r.reconcileScaledObject(reqLogger, scaledObject)
Expand Down Expand Up @@ -256,7 +237,7 @@ func (r *ReconcileScaledObject) checkTargetResourceIsScalable(logger logr.Logger
logger.V(1).Info("Parsed Group, Version, Kind, Resource", "GVK", gvkr.GVKString(), "Resource", gvkr.Resource)

// let's try to detect /scale subresource
_, errScale := r.scaleClient.Scales(scaledObject.Namespace).Get(gvkr.GroupResource(), scaledObject.Spec.ScaleTargetRef.Name)
_, errScale := (*r.scaleClient).Scales(scaledObject.Namespace).Get(gvkr.GroupResource(), scaledObject.Spec.ScaleTargetRef.Name)
if errScale != nil {
// not able to get /scale subresource -> let's check if the resource even exist in the cluster
unstruct := &unstructured.Unstructured{}
Expand Down Expand Up @@ -286,7 +267,7 @@ func (r *ReconcileScaledObject) checkTargetResourceIsScalable(logger logr.Logger
// ensureHPAForScaledObjectExists ensures that in cluster exist up-to-date HPA for specified ScaledObject, returns true if a new HPA was created
func (r *ReconcileScaledObject) ensureHPAForScaledObjectExists(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, gvkr *kedautil.GroupVersionKindResource) (bool, error) {
hpaName := getHPAName(scaledObject)
foundHpa := &autoscalingv2beta1.HorizontalPodAutoscaler{}
foundHpa := &autoscalingv2beta2.HorizontalPodAutoscaler{}
// Check if HPA for this ScaledObject already exists
err := r.client.Get(context.TODO(), types.NamespacedName{Name: hpaName, Namespace: scaledObject.Namespace}, foundHpa)
if err != nil && errors.IsNotFound(err) {
Expand Down Expand Up @@ -344,6 +325,31 @@ func (r *ReconcileScaledObject) startScaleLoop(logger logr.Logger, scaledObject
return nil
}

// stopScaleLoop stops ScaleLoop handler for the respective ScaleObject
func (r *ReconcileScaledObject) stopScaleLoop(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error {
key, err := cache.MetaNamespaceKeyFunc(scaledObject)
if err != nil {
logger.Error(err, "Error getting key for scaledObject")
return err
}

// delete ScaledObject's current Generation
r.scaledObjectsGenerations.Delete(key)

result, ok := r.scaleLoopContexts.Load(key)
if ok {
cancel, ok := result.(context.CancelFunc)
if ok {
cancel()
}
r.scaleLoopContexts.Delete(key)
} else {
logger.V(1).Info("ScaleObject was not found in controller cache", "key", key)
}

return nil
}

// scaledObjectGenerationChanged returns true if ScaledObject's Generation was changed, ie. ScaledObject.Spec was changed
func (r *ReconcileScaledObject) scaledObjectGenerationChanged(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (bool, error) {
key, err := cache.MetaNamespaceKeyFunc(scaledObject)
Expand Down
56 changes: 28 additions & 28 deletions pkg/controller/scaledobject/scaledobject_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,52 +3,52 @@ package scaledobject
import (
"context"

"github.com/go-logr/logr"
"k8s.io/client-go/tools/cache"

kedav1alpha1 "github.com/kedacore/keda/pkg/apis/keda/v1alpha1"

"github.com/go-logr/logr"
)

const (
scaledObjectFinalizer = "finalizer.keda.sh"
)

// finalizeScaledObject is stopping ScaleLoop for the respective ScaleObject
// finalizeScaledObject runs finalization logic on ScaledObject if there's finalizer
func (r *ReconcileScaledObject) finalizeScaledObject(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error {
key, err := cache.MetaNamespaceKeyFunc(scaledObject)
if err != nil {
logger.Error(err, "Error getting key for scaledObject (%s/%s)", scaledObject.GetNamespace(), scaledObject.GetName())
return err
}

// store ScaledObject's current Generation
r.scaledObjectsGenerations.Delete(key)
if contains(scaledObject.GetFinalizers(), scaledObjectFinalizer) {
// Run finalization logic for scaledObjectFinalizer. If the
// finalization logic fails, don't remove the finalizer so
// that we can retry during the next reconciliation.
if err := r.stopScaleLoop(logger, scaledObject); err != nil {
return err
}

result, ok := r.scaleLoopContexts.Load(key)
if ok {
cancel, ok := result.(context.CancelFunc)
if ok {
cancel()
// Remove scaledObjectFinalizer. Once all finalizers have been
// removed, the object will be deleted.
scaledObject.SetFinalizers(remove(scaledObject.GetFinalizers(), scaledObjectFinalizer))
if err := r.client.Update(context.TODO(), scaledObject); err != nil {
logger.Error(err, "Failed to update ScaledObject after removing a finalizer", "finalizer", scaledObjectFinalizer)
return err
}
r.scaleLoopContexts.Delete(key)
} else {
logger.V(1).Info("ScaleObject was not found in controller cache", "key", key)
}

logger.Info("Successfully finalized ScaledObject")
return nil
}

// addFinalizer adds finalizer to the ScaledObject
func (r *ReconcileScaledObject) addFinalizer(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error {
logger.Info("Adding Finalizer for the ScaledObject")
scaledObject.SetFinalizers(append(scaledObject.GetFinalizers(), scaledObjectFinalizer))
// ensureFinalizer check there is finalizer present on the ScaledObject, if not it adds one
func (r *ReconcileScaledObject) ensureFinalizer(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error {

// Update CR
err := r.client.Update(context.TODO(), scaledObject)
if err != nil {
logger.Error(err, "Failed to update ScaledObject with finalizer")
return err
if !contains(scaledObject.GetFinalizers(), scaledObjectFinalizer) {
logger.Info("Adding Finalizer for the ScaledObject")
scaledObject.SetFinalizers(append(scaledObject.GetFinalizers(), scaledObjectFinalizer))

// Update CR
err := r.client.Update(context.TODO(), scaledObject)
if err != nil {
logger.Error(err, "Failed to update ScaledObject with a finalizer", "finalizer", scaledObjectFinalizer)
return err
}
}
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/handler/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
// each ScaledObject and making the final scale decision and operation
type ScaleHandler struct {
client client.Client
scaleClient scale.ScalesGetter // TODO pointer
scaleClient *scale.ScalesGetter
logger logr.Logger
reconcilerScheme *runtime.Scheme
}
Expand All @@ -34,7 +34,7 @@ const (
)

// NewScaleHandler creates a ScaleHandler object
func NewScaleHandler(client client.Client, scaleClient scale.ScalesGetter, reconcilerScheme *runtime.Scheme) *ScaleHandler {
func NewScaleHandler(client client.Client, scaleClient *scale.ScalesGetter, reconcilerScheme *runtime.Scheme) *ScaleHandler {
handler := &ScaleHandler{
client: client,
scaleClient: scaleClient,
Expand Down
2 changes: 1 addition & 1 deletion pkg/handler/scale_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (h *ScaleHandler) handleScaleJob(ctx context.Context, scaledObject *kedav1a

var metricValue int64
for _, metric := range metricSpecs {
metricValue, _ = metric.External.TargetAverageValue.AsInt64()
metricValue, _ = metric.External.Target.AverageValue.AsInt64()
maxValue += metricValue
}
scalerLogger.Info("Scaler max value", "MaxValue", maxValue)
Expand Down
4 changes: 2 additions & 2 deletions pkg/handler/scale_scaledobjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,11 @@ func (h *ScaleHandler) scaleFromZero(scaledObject *kedav1alpha1.ScaledObject, sc
}

func (h *ScaleHandler) getScaleTargetScale(scaledObject *kedav1alpha1.ScaledObject) (*autoscalingv1.Scale, error) {
return h.scaleClient.Scales(scaledObject.Namespace).Get(scaledObject.Status.ScaleTargetGVKR.GroupResource(), scaledObject.Spec.ScaleTargetRef.Name)
return (*h.scaleClient).Scales(scaledObject.Namespace).Get(scaledObject.Status.ScaleTargetGVKR.GroupResource(), scaledObject.Spec.ScaleTargetRef.Name)
}

func (h *ScaleHandler) updateScaleOnScaleTarget(scaledObject *kedav1alpha1.ScaledObject, scale *autoscalingv1.Scale) error {
_, err := h.scaleClient.Scales(scaledObject.Namespace).Update(scaledObject.Status.ScaleTargetGVKR.GroupResource(), scale)
_, err := (*h.scaleClient).Scales(scaledObject.Namespace).Update(scaledObject.Status.ScaleTargetGVKR.GroupResource(), scale)
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (p *KedaProvider) GetExternalMetric(namespace string, metricSelector labels

for _, metricSpec := range metricSpecs {
// Filter only the desired metric
if strings.EqualFold(metricSpec.External.MetricName, info.Metric) {
if strings.EqualFold(metricSpec.External.Metric.Name, info.Metric) {
metrics, err := scaler.GetMetrics(context.TODO(), info.Metric, metricSelector)
if err != nil {
logger.Error(err, "error getting metric for scaler", "ScaledObject.Namespace", scaledObject.Namespace, "ScaledObject.Name", scaledObject.Name, "Scaler", scaler)
Expand Down
Loading