Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for StatefulSet #498

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions deploy/crds/keda.k8s.io_scaledobjects_crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4472,8 +4472,8 @@ spec:
type: string
deploymentName:
type: string
required:
- deploymentName
statefulSetName:
type: string
type: object
scaleType:
description: ScaledObjectScaleType distinguish between Deployment based
Expand Down
9 changes: 7 additions & 2 deletions pkg/apis/keda/v1alpha1/scaledobject_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ const (
ScaleTypeDeployment ScaledObjectScaleType = "deployment"
// ScaleTypeJob specifies K8s Jobs based ScaleObject
ScaleTypeJob ScaledObjectScaleType = "job"
// ScaleTypeStatefulSet specifies K8s StatefulSet based ScaleObject
ScaleTypeStatefulSet ScaledObjectScaleType = "statefulSet"
)

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down Expand Up @@ -58,7 +60,10 @@ type ScaledObjectSpec struct {
// ScaledObject applies
// +k8s:openapi-gen=true
type ObjectReference struct {
DeploymentName string `json:"deploymentName"`
// +optional
DeploymentName string `json:"deploymentName,omitempty"`
// +optional
StatefulSetName string `json:"statefulSetName,omitempty"`
// +optional
ContainerName string `json:"containerName,omitempty"`
}
Expand All @@ -79,7 +84,7 @@ type ScaleTriggers struct {
// +optional
type ScaledObjectStatus struct {
// +optional
LastActiveTime *metav1.Time `json:"lastActiveTime,omitempty"`
LastActiveTime *metav1.Time `json:"lastActiveTime,omitempty"`
// +optional
// +listType
ExternalMetricNames []string `json:"externalMetricNames,omitempty"`
Expand Down
87 changes: 60 additions & 27 deletions pkg/controller/scaledobject/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/go-logr/logr"
kedav1alpha1 "github.com/kedacore/keda/pkg/apis/keda/v1alpha1"
scalehandler "github.com/kedacore/keda/pkg/handler"
"github.com/kedacore/keda/pkg/scalers"

autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
batchv1 "k8s.io/api/batch/v1"
Expand All @@ -34,6 +35,11 @@ const (
defaultHPAMaxReplicas int32 = 100
)

const (
deploymentKind string = "Deployment"
statefulSetKind string = "StatefulSet"
)

var log = logf.Log.WithName("controller_scaledobject")

// Add creates a new ScaledObject Controller and adds it to the Manager. The Manager will set fields on the Controller
Expand Down Expand Up @@ -138,18 +144,21 @@ func (r *ReconcileScaledObject) Reconcile(request reconcile.Request) (reconcile.
}

reqLogger.Info("Detecting ScaleType from ScaledObject")
if scaledObject.Spec.ScaleTargetRef == nil || scaledObject.Spec.ScaleTargetRef.DeploymentName == "" {
if scaledObject.Spec.ScaleTargetRef == nil || (scaledObject.Spec.ScaleTargetRef.DeploymentName == "" && scaledObject.Spec.ScaleTargetRef.StatefulSetName == "") {
reqLogger.Info("Detected ScaleType = Job")
return r.reconcileJobType(reqLogger, scaledObject)
return r.reconcileJobType(reqLogger, kedav1alpha1.ScaleTypeJob, scaledObject)
} else if scaledObject.Spec.ScaleTargetRef.StatefulSetName != "" {
reqLogger.Info("Detected ScaleType = StatefulSet")
return r.reconcileDeploymentOrStatefulSetType(reqLogger, kedav1alpha1.ScaleTypeStatefulSet, scaledObject)
} else {
reqLogger.Info("Detected ScaleType = Deployment")
return r.reconcileDeploymentType(reqLogger, scaledObject)
return r.reconcileDeploymentOrStatefulSetType(reqLogger, kedav1alpha1.ScaleTypeDeployment, scaledObject)
}
}

// reconcileJobType implemets reconciler logic for K8s Jobs based ScaleObject
func (r *ReconcileScaledObject) reconcileJobType(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (reconcile.Result, error) {
scaledObject.Spec.ScaleType = kedav1alpha1.ScaleTypeJob
func (r *ReconcileScaledObject) reconcileJobType(logger logr.Logger, scaleType kedav1alpha1.ScaledObjectScaleType, scaledObject *kedav1alpha1.ScaledObject) (reconcile.Result, error) {
scaledObject.Spec.ScaleType = scaleType

// Delete Jobs owned by the previous version of the ScaledObject
opts := []client.ListOption{
Expand Down Expand Up @@ -180,26 +189,30 @@ func (r *ReconcileScaledObject) reconcileJobType(logger logr.Logger, scaledObjec
return reconcile.Result{}, nil
}

// reconcileDeploymentType implements reconciler logic for Deployment based ScaleObject
func (r *ReconcileScaledObject) reconcileDeploymentType(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (reconcile.Result, error) {
scaledObject.Spec.ScaleType = kedav1alpha1.ScaleTypeDeployment
// reconcileDeploymentOrStatefulSetType implements reconciler logic for Deployment or StatefulSet based ScaleObject
func (r *ReconcileScaledObject) reconcileDeploymentOrStatefulSetType(logger logr.Logger, scaleType kedav1alpha1.ScaledObjectScaleType, scaledObject *kedav1alpha1.ScaledObject) (reconcile.Result, error) {
scaledObject.Spec.ScaleType = scaleType

deploymentName := scaledObject.Spec.ScaleTargetRef.DeploymentName
if deploymentName == "" {
err := fmt.Errorf("Notified about ScaledObject with missing deployment name")
logger.Error(err, "Notified about ScaledObject with missing deployment")
appName := scaledObject.Spec.ScaleTargetRef.DeploymentName
if scaleType == kedav1alpha1.ScaleTypeStatefulSet {
appName = scaledObject.Spec.ScaleTargetRef.StatefulSetName
}

if appName == "" {
err := fmt.Errorf("Notified about ScaledObject with missing deployment or statefulSet name")
logger.Error(err, "Notified about ScaledObject with missing deployment or statefulSet")
return reconcile.Result{}, err
}

hpaName := getHpaName(deploymentName)
hpaName := getHpaName(scaleType, appName)
hpaNamespace := scaledObject.Namespace

// Check if this HPA already exists
foundHpa := &autoscalingv2beta1.HorizontalPodAutoscaler{}
err := r.client.Get(context.TODO(), types.NamespacedName{Name: hpaName, Namespace: hpaNamespace}, foundHpa)
if err != nil && errors.IsNotFound(err) {
logger.Info("Creating a new HPA", "HPA.Namespace", hpaNamespace, "HPA.Name", hpaName)
hpa, err := r.newHPAForScaledObject(logger, scaledObject)
hpa, err := r.newHPAForScaledObject(logger, scaleType, appName, scaledObject)
if err != nil {
logger.Error(err, "Failed to create new HPA resource", "HPA.Namespace", hpaNamespace, "HPA.Name", hpaName)
return reconcile.Result{}, err
Expand Down Expand Up @@ -240,7 +253,7 @@ func (r *ReconcileScaledObject) reconcileDeploymentType(logger logr.Logger, scal
foundHpa.Spec.MaxReplicas = scaledObjectMaxReplicaCount
}

newMetricSpec, err := r.getScaledObjectMetricSpecs(logger, scaledObject, deploymentName)
newMetricSpec, err := r.getScaledObjectMetricSpecs(logger, scaleType, appName, scaledObject)
if err != nil {
logger.Error(err, "Failed to create MetricSpec")
return reconcile.Result{}, err
Expand Down Expand Up @@ -291,26 +304,30 @@ func (r *ReconcileScaledObject) startScaleLoop(logger logr.Logger, scaledObject
}

// newHPAForScaledObject returns HPA as it is specified in ScaledObject
func (r *ReconcileScaledObject) newHPAForScaledObject(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (*autoscalingv2beta1.HorizontalPodAutoscaler, error) {
deploymentName := scaledObject.Spec.ScaleTargetRef.DeploymentName
scaledObjectMetricSpecs, err := r.getScaledObjectMetricSpecs(logger, scaledObject, deploymentName)
func (r *ReconcileScaledObject) newHPAForScaledObject(logger logr.Logger, scaleType kedav1alpha1.ScaledObjectScaleType, appName string, scaledObject *kedav1alpha1.ScaledObject) (*autoscalingv2beta1.HorizontalPodAutoscaler, error) {
scaledObjectMetricSpecs, err := r.getScaledObjectMetricSpecs(logger, scaleType, appName, scaledObject)

if err != nil {
return nil, err
}

appKind := deploymentKind
if scaleType == kedav1alpha1.ScaleTypeStatefulSet {
appKind = statefulSetKind
}

return &autoscalingv2beta1.HorizontalPodAutoscaler{
Spec: autoscalingv2beta1.HorizontalPodAutoscalerSpec{
MinReplicas: getHpaMinReplicas(scaledObject),
MaxReplicas: getHpaMaxReplicas(scaledObject),
Metrics: scaledObjectMetricSpecs,
ScaleTargetRef: autoscalingv2beta1.CrossVersionObjectReference{
Name: deploymentName,
Kind: "Deployment",
Name: appName,
Kind: appKind,
APIVersion: "apps/v1",
}},
ObjectMeta: metav1.ObjectMeta{
Name: getHpaName(deploymentName),
Name: getHpaName(scaleType, appName),
Namespace: scaledObject.Namespace,
},
TypeMeta: metav1.TypeMeta{
Expand All @@ -320,11 +337,19 @@ func (r *ReconcileScaledObject) newHPAForScaledObject(logger logr.Logger, scaled
}

// getScaledObjectMetricSpecs returns MetricSpec for HPA, generater from Triggers defitinion in ScaledObject
func (r *ReconcileScaledObject) getScaledObjectMetricSpecs(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, deploymentName string) ([]autoscalingv2beta1.MetricSpec, error) {
func (r *ReconcileScaledObject) getScaledObjectMetricSpecs(logger logr.Logger, scaleType kedav1alpha1.ScaledObjectScaleType, appName string, scaledObject *kedav1alpha1.ScaledObject) ([]autoscalingv2beta1.MetricSpec, error) {
var scaledObjectMetricSpecs []autoscalingv2beta1.MetricSpec
var externalMetricNames []string

scalers, _, err := scalehandler.NewScaleHandler(r.client, r.scheme).GetDeploymentScalers(scaledObject)
var scalers []scalers.Scaler
var err error

if scaleType == kedav1alpha1.ScaleTypeStatefulSet {
scalers, _, err = scalehandler.NewScaleHandler(r.client, r.scheme).GetStatefulSetScalers(scaledObject)
} else {
scalers, _, err = scalehandler.NewScaleHandler(r.client, r.scheme).GetDeploymentScalers(scaledObject)
}

if err != nil {
logger.Error(err, "Error getting scalers")
return nil, err
Expand All @@ -336,7 +361,12 @@ func (r *ReconcileScaledObject) getScaledObjectMetricSpecs(logger logr.Logger, s
// add the deploymentName label. This is how the MetricsAdapter will know which scaledobject a metric is for when the HPA queries it.
for _, metricSpec := range metricSpecs {
metricSpec.External.MetricSelector = &metav1.LabelSelector{MatchLabels: make(map[string]string)}
metricSpec.External.MetricSelector.MatchLabels["deploymentName"] = deploymentName
if scaleType == kedav1alpha1.ScaleTypeStatefulSet {
metricSpec.External.MetricSelector.MatchLabels["statefulSetName"] = appName
} else {
metricSpec.External.MetricSelector.MatchLabels["deploymentName"] = appName
}

externalMetricNames = append(externalMetricNames, metricSpec.External.MetricName)
}
scaledObjectMetricSpecs = append(scaledObjectMetricSpecs, metricSpecs...)
Expand All @@ -354,9 +384,12 @@ func (r *ReconcileScaledObject) getScaledObjectMetricSpecs(logger logr.Logger, s
return scaledObjectMetricSpecs, nil
}

// getHpaName returns generated HPA name for DeploymentName specified in the parameter
func getHpaName(deploymentName string) string {
return fmt.Sprintf("keda-hpa-%s", deploymentName)
// getHpaName returns generated HPA name for DeploymentName or StatefulSetName specified in the parameter
func getHpaName(scaleType kedav1alpha1.ScaledObjectScaleType, appName string) string {
if scaleType == kedav1alpha1.ScaleTypeStatefulSet {
return fmt.Sprintf("keda-hpa-%s-%s", scaleType, appName)
}
return fmt.Sprintf("keda-hpa-%s", appName)
}

// getHpaMinReplicas returns MinReplicas based on definition in ScaledObject or default value if not defined
Expand Down
4 changes: 2 additions & 2 deletions pkg/handler/scale_deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func (h *ScaleHandler) scaleDeployment(deployment *appsv1.Deployment, scaledObje
if *deployment.Spec.Replicas == 0 && isActive {
// current replica count is 0, but there is an active trigger.
// scale the deployment up
h.scaleFromZero(deployment, scaledObject)
h.scaleDeploymentFromZero(deployment, scaledObject)
} else if !isActive &&
*deployment.Spec.Replicas > 0 &&
(scaledObject.Spec.MinReplicaCount == nil || *scaledObject.Spec.MinReplicaCount == 0) {
Expand Down Expand Up @@ -78,7 +78,7 @@ func (h *ScaleHandler) scaleDeploymentToZero(deployment *appsv1.Deployment, scal
}
}

func (h *ScaleHandler) scaleFromZero(deployment *appsv1.Deployment, scaledObject *kedav1alpha1.ScaledObject) {
func (h *ScaleHandler) scaleDeploymentFromZero(deployment *appsv1.Deployment, scaledObject *kedav1alpha1.ScaledObject) {
currentReplicas := *deployment.Spec.Replicas
if scaledObject.Spec.MinReplicaCount != nil && *scaledObject.Spec.MinReplicaCount > 0 {
deployment.Spec.Replicas = scaledObject.Spec.MinReplicaCount
Expand Down
33 changes: 33 additions & 0 deletions pkg/handler/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,39 @@ func (h *ScaleHandler) GetDeploymentScalers(scaledObject *kedav1alpha1.ScaledObj
return scalers, deployment, nil
}

// GetStatefulSetScalers returns list of Scalers and StatefulSet for the specified ScaledObject
func (h *ScaleHandler) GetStatefulSetScalers(scaledObject *kedav1alpha1.ScaledObject) ([]scalers.Scaler, *appsv1.StatefulSet, error) {
scalers := []scalers.Scaler{}

statefulSetName := scaledObject.Spec.ScaleTargetRef.StatefulSetName
if statefulSetName == "" {
return scalers, nil, fmt.Errorf("notified about ScaledObject with missing StatefulSet name: %s", scaledObject.GetName())
}

statefulSet := &appsv1.StatefulSet{}
err := h.client.Get(context.TODO(), types.NamespacedName{Name: statefulSetName, Namespace: scaledObject.GetNamespace()}, statefulSet)
if err != nil {
return scalers, nil, fmt.Errorf("error getting statefulSet: %s", err)
}

resolvedEnv, err := h.resolveStatefulSetEnv(statefulSet, scaledObject.Spec.ScaleTargetRef.ContainerName)
if err != nil {
return scalers, nil, fmt.Errorf("error resolving secrets for statefulSet: %s", err)
}

for i, trigger := range scaledObject.Spec.Triggers {
authParams, podIdentity := h.parseStatefulSetAuthRef(trigger.AuthenticationRef, scaledObject, statefulSet)
scaler, err := h.getScaler(scaledObject.Name, scaledObject.Namespace, trigger.Type, resolvedEnv, trigger.Metadata, authParams, podIdentity)
if err != nil {
return scalers, nil, fmt.Errorf("error getting scaler for trigger #%d: %s", i, err)
}

scalers = append(scalers, scaler)
}

return scalers, statefulSet, nil
}

func (h *ScaleHandler) getJobScalers(scaledObject *kedav1alpha1.ScaledObject) ([]scalers.Scaler, error) {
scalers := []scalers.Scaler{}

Expand Down
34 changes: 34 additions & 0 deletions pkg/handler/scale_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ func (h *ScaleHandler) handleScale(ctx context.Context, scaledObject *kedav1alph
case kedav1alpha1.ScaleTypeJob:
h.handleScaleJob(ctx, scaledObject)
break
case kedav1alpha1.ScaleTypeStatefulSet:
h.handleScaleStatefulSet(ctx, scaledObject)
break
default:
h.handleScaleDeployment(ctx, scaledObject)
}
Expand Down Expand Up @@ -129,3 +132,34 @@ func (h *ScaleHandler) handleScaleDeployment(ctx context.Context, scaledObject *

h.scaleDeployment(deployment, scaledObject, isScaledObjectActive)
}

// handleScaleStatefulSet contains the main logic for the ScaleHandler scaling logic.
// It'll check each trigger active status then call scaleStatefulSet
func (h *ScaleHandler) handleScaleStatefulSet(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject) {
scalers, statefulSet, err := h.GetStatefulSetScalers(scaledObject)

if statefulSet == nil {
return
}
if err != nil {
h.logger.Error(err, "Error getting scalers")
return
}

isScaledObjectActive := false

for _, scaler := range scalers {
defer scaler.Close()
isTriggerActive, err := scaler.IsActive(ctx)

if err != nil {
h.logger.V(1).Info("Error getting scale decision", "Error", err)
continue
} else if isTriggerActive {
isScaledObjectActive = true
h.logger.V(1).Info("Scaler for scaledObject is active", "Scaler", scaler)
}
}

h.scaleStatefulSet(statefulSet, scaledObject, isScaledObjectActive)
}
Loading