diff --git a/deploy/crds/keda.k8s.io_scaledobjects_crd.yaml b/deploy/crds/keda.k8s.io_scaledobjects_crd.yaml
index 22c77e10ade..bb59a9e7f88 100644
--- a/deploy/crds/keda.k8s.io_scaledobjects_crd.yaml
+++ b/deploy/crds/keda.k8s.io_scaledobjects_crd.yaml
@@ -4472,8 +4472,8 @@ spec:
                 type: string
               deploymentName:
                 type: string
-            required:
-            - deploymentName
+              statefulSetName:
+                type: string
             type: object
           scaleType:
             description: ScaledObjectScaleType distinguish between Deployment based
diff --git a/pkg/apis/keda/v1alpha1/scaledobject_types.go b/pkg/apis/keda/v1alpha1/scaledobject_types.go
index f7cef539251..d4db376d1b0 100644
--- a/pkg/apis/keda/v1alpha1/scaledobject_types.go
+++ b/pkg/apis/keda/v1alpha1/scaledobject_types.go
@@ -13,6 +13,8 @@ const (
     ScaleTypeDeployment ScaledObjectScaleType = "deployment"
     // ScaleTypeJob specifies K8s Jobs based ScaleObject
     ScaleTypeJob ScaledObjectScaleType = "job"
+    // ScaleTypeStatefulSet specifies K8s StatefulSet based ScaleObject
+    ScaleTypeStatefulSet ScaledObjectScaleType = "statefulSet"
 )

 // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
@@ -58,7 +60,10 @@ type ScaledObjectSpec struct {
 // ScaledObject applies
 // +k8s:openapi-gen=true
 type ObjectReference struct {
-    DeploymentName string `json:"deploymentName"`
+    // +optional
+    DeploymentName string `json:"deploymentName,omitempty"`
+    // +optional
+    StatefulSetName string `json:"statefulSetName,omitempty"`
     // +optional
     ContainerName string `json:"containerName,omitempty"`
 }
@@ -79,7 +84,7 @@ type ScaleTriggers struct {
 // +optional
 type ScaledObjectStatus struct {
     // +optional
-    LastActiveTime *metav1.Time `json:"lastActiveTime,omitempty"`
+    LastActiveTime *metav1.Time `json:"lastActiveTime,omitempty"`
     // +optional
     // +listType
     ExternalMetricNames []string `json:"externalMetricNames,omitempty"`
diff --git a/pkg/controller/scaledobject/scaledobject_controller.go b/pkg/controller/scaledobject/scaledobject_controller.go
index 19a414278e4..5f4b82987cb 100644
--- a/pkg/controller/scaledobject/scaledobject_controller.go
+++ b/pkg/controller/scaledobject/scaledobject_controller.go
@@ -9,6 +9,7 @@ import (
     "github.com/go-logr/logr"
     kedav1alpha1 "github.com/kedacore/keda/pkg/apis/keda/v1alpha1"
     scalehandler "github.com/kedacore/keda/pkg/handler"
+    "github.com/kedacore/keda/pkg/scalers"

     autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
     batchv1 "k8s.io/api/batch/v1"
@@ -34,6 +35,11 @@ const (
     defaultHPAMaxReplicas int32 = 100
 )

+const (
+    deploymentKind  string = "Deployment"
+    statefulSetKind string = "StatefulSet"
+)
+
 var log = logf.Log.WithName("controller_scaledobject")

 // Add creates a new ScaledObject Controller and adds it to the Manager. The Manager will set fields on the Controller
@@ -138,18 +144,21 @@ func (r *ReconcileScaledObject) Reconcile(request reconcile.Request) (reconcile.
     }

     reqLogger.Info("Detecting ScaleType from ScaledObject")
-    if scaledObject.Spec.ScaleTargetRef == nil || scaledObject.Spec.ScaleTargetRef.DeploymentName == "" {
+    if scaledObject.Spec.ScaleTargetRef == nil || (scaledObject.Spec.ScaleTargetRef.DeploymentName == "" && scaledObject.Spec.ScaleTargetRef.StatefulSetName == "") {
         reqLogger.Info("Detected ScaleType = Job")
-        return r.reconcileJobType(reqLogger, scaledObject)
+        return r.reconcileJobType(reqLogger, kedav1alpha1.ScaleTypeJob, scaledObject)
+    } else if scaledObject.Spec.ScaleTargetRef.StatefulSetName != "" {
+        reqLogger.Info("Detected ScaleType = StatefulSet")
+        return r.reconcileDeploymentOrStatefulSetType(reqLogger, kedav1alpha1.ScaleTypeStatefulSet, scaledObject)
     } else {
         reqLogger.Info("Detected ScaleType = Deployment")
-        return r.reconcileDeploymentType(reqLogger, scaledObject)
+        return r.reconcileDeploymentOrStatefulSetType(reqLogger, kedav1alpha1.ScaleTypeDeployment, scaledObject)
     }
 }

 // reconcileJobType implemets reconciler logic for K8s Jobs based ScaleObject
-func (r *ReconcileScaledObject) reconcileJobType(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (reconcile.Result, error) {
-    scaledObject.Spec.ScaleType = kedav1alpha1.ScaleTypeJob
+func (r *ReconcileScaledObject) reconcileJobType(logger logr.Logger, scaleType kedav1alpha1.ScaledObjectScaleType, scaledObject *kedav1alpha1.ScaledObject) (reconcile.Result, error) {
+    scaledObject.Spec.ScaleType = scaleType

     // Delete Jobs owned by the previous version of the ScaledObject
     opts := []client.ListOption{
@@ -180,18 +189,22 @@ func (r *ReconcileScaledObject) reconcileJobType(logger logr.Logger, scaledObjec
     return reconcile.Result{}, nil
 }

-// reconcileDeploymentType implements reconciler logic for Deployment based ScaleObject
-func (r *ReconcileScaledObject) reconcileDeploymentType(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (reconcile.Result, error) {
-    scaledObject.Spec.ScaleType = kedav1alpha1.ScaleTypeDeployment
+// reconcileDeploymentOrStatefulSetType implements reconciler logic for Deployment or StatefulSet based ScaleObject
+func (r *ReconcileScaledObject) reconcileDeploymentOrStatefulSetType(logger logr.Logger, scaleType kedav1alpha1.ScaledObjectScaleType, scaledObject *kedav1alpha1.ScaledObject) (reconcile.Result, error) {
+    scaledObject.Spec.ScaleType = scaleType

-    deploymentName := scaledObject.Spec.ScaleTargetRef.DeploymentName
-    if deploymentName == "" {
-        err := fmt.Errorf("Notified about ScaledObject with missing deployment name")
-        logger.Error(err, "Notified about ScaledObject with missing deployment")
+    appName := scaledObject.Spec.ScaleTargetRef.DeploymentName
+    if scaleType == kedav1alpha1.ScaleTypeStatefulSet {
+        appName = scaledObject.Spec.ScaleTargetRef.StatefulSetName
+    }
+
+    if appName == "" {
+        err := fmt.Errorf("Notified about ScaledObject with missing deployment or statefulSet name")
+        logger.Error(err, "Notified about ScaledObject with missing deployment or statefulSet")
         return reconcile.Result{}, err
     }

-    hpaName := getHpaName(deploymentName)
+    hpaName := getHpaName(scaleType, appName)
     hpaNamespace := scaledObject.Namespace

     // Check if this HPA already exists
@@ -199,7 +212,7 @@ func (r *ReconcileScaledObject) reconcileDeploymentType(logger logr.Logger, scal
     err := r.client.Get(context.TODO(), types.NamespacedName{Name: hpaName, Namespace: hpaNamespace}, foundHpa)
     if err != nil && errors.IsNotFound(err) {
         logger.Info("Creating a new HPA", "HPA.Namespace", hpaNamespace, "HPA.Name", hpaName)
-        hpa, err := r.newHPAForScaledObject(logger, scaledObject)
+        hpa, err := r.newHPAForScaledObject(logger, scaleType, appName, scaledObject)
         if err != nil {
             logger.Error(err, "Failed to create new HPA resource", "HPA.Namespace", hpaNamespace, "HPA.Name", hpaName)
             return reconcile.Result{}, err
@@ -240,7 +253,7 @@ func (r *ReconcileScaledObject) reconcileDeploymentType(logger logr.Logger, scal
         foundHpa.Spec.MaxReplicas = scaledObjectMaxReplicaCount
     }

-    newMetricSpec, err := r.getScaledObjectMetricSpecs(logger, scaledObject, deploymentName)
+    newMetricSpec, err := r.getScaledObjectMetricSpecs(logger, scaleType, appName, scaledObject)
     if err != nil {
         logger.Error(err, "Failed to create MetricSpec")
         return reconcile.Result{}, err
@@ -291,26 +304,30 @@ func (r *ReconcileScaledObject) startScaleLoop(logger logr.Logger, scaledObject
 }

 // newHPAForScaledObject returns HPA as it is specified in ScaledObject
-func (r *ReconcileScaledObject) newHPAForScaledObject(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (*autoscalingv2beta1.HorizontalPodAutoscaler, error) {
-    deploymentName := scaledObject.Spec.ScaleTargetRef.DeploymentName
-    scaledObjectMetricSpecs, err := r.getScaledObjectMetricSpecs(logger, scaledObject, deploymentName)
+func (r *ReconcileScaledObject) newHPAForScaledObject(logger logr.Logger, scaleType kedav1alpha1.ScaledObjectScaleType, appName string, scaledObject *kedav1alpha1.ScaledObject) (*autoscalingv2beta1.HorizontalPodAutoscaler, error) {
+    scaledObjectMetricSpecs, err := r.getScaledObjectMetricSpecs(logger, scaleType, appName, scaledObject)
     if err != nil {
         return nil, err
     }

+    appKind := deploymentKind
+    if scaleType == kedav1alpha1.ScaleTypeStatefulSet {
+        appKind = statefulSetKind
+    }
+
     return &autoscalingv2beta1.HorizontalPodAutoscaler{
         Spec: autoscalingv2beta1.HorizontalPodAutoscalerSpec{
             MinReplicas: getHpaMinReplicas(scaledObject),
             MaxReplicas: getHpaMaxReplicas(scaledObject),
             Metrics: scaledObjectMetricSpecs,
             ScaleTargetRef: autoscalingv2beta1.CrossVersionObjectReference{
-                Name: deploymentName,
-                Kind: "Deployment",
+                Name: appName,
+                Kind: appKind,
                 APIVersion: "apps/v1",
             }},
         ObjectMeta: metav1.ObjectMeta{
-            Name: getHpaName(deploymentName),
+            Name: getHpaName(scaleType, appName),
             Namespace: scaledObject.Namespace,
         },
         TypeMeta: metav1.TypeMeta{
@@ -320,11 +337,19 @@ func (r *ReconcileScaledObject) newHPAForScaledObject(logger logr.Logger, scaled
 }

 // getScaledObjectMetricSpecs returns MetricSpec for HPA, generater from Triggers defitinion in ScaledObject
-func (r *ReconcileScaledObject) getScaledObjectMetricSpecs(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, deploymentName string) ([]autoscalingv2beta1.MetricSpec, error) {
+func (r *ReconcileScaledObject) getScaledObjectMetricSpecs(logger logr.Logger, scaleType kedav1alpha1.ScaledObjectScaleType, appName string, scaledObject *kedav1alpha1.ScaledObject) ([]autoscalingv2beta1.MetricSpec, error) {
     var scaledObjectMetricSpecs []autoscalingv2beta1.MetricSpec
     var externalMetricNames []string

-    scalers, _, err := scalehandler.NewScaleHandler(r.client, r.scheme).GetDeploymentScalers(scaledObject)
+    var scalers []scalers.Scaler
+    var err error
+
+    if scaleType == kedav1alpha1.ScaleTypeStatefulSet {
+        scalers, _, err = scalehandler.NewScaleHandler(r.client, r.scheme).GetStatefulSetScalers(scaledObject)
+    } else {
+        scalers, _, err = scalehandler.NewScaleHandler(r.client, r.scheme).GetDeploymentScalers(scaledObject)
+    }
+
     if err != nil {
         logger.Error(err, "Error getting scalers")
         return nil, err
@@ -336,7 +361,12 @@ func (r *ReconcileScaledObject) getScaledObjectMetricSpecs(logger logr.Logger, s
         // add the deploymentName label. This is how the MetricsAdapter will know which scaledobject a metric is for when the HPA queries it.
         for _, metricSpec := range metricSpecs {
             metricSpec.External.MetricSelector = &metav1.LabelSelector{MatchLabels: make(map[string]string)}
-            metricSpec.External.MetricSelector.MatchLabels["deploymentName"] = deploymentName
+            if scaleType == kedav1alpha1.ScaleTypeStatefulSet {
+                metricSpec.External.MetricSelector.MatchLabels["statefulSetName"] = appName
+            } else {
+                metricSpec.External.MetricSelector.MatchLabels["deploymentName"] = appName
+            }
+
             externalMetricNames = append(externalMetricNames, metricSpec.External.MetricName)
         }
         scaledObjectMetricSpecs = append(scaledObjectMetricSpecs, metricSpecs...)
@@ -354,9 +384,12 @@ func (r *ReconcileScaledObject) getScaledObjectMetricSpecs(logger logr.Logger, s
     return scaledObjectMetricSpecs, nil
 }

-// getHpaName returns generated HPA name for DeploymentName specified in the parameter
-func getHpaName(deploymentName string) string {
-    return fmt.Sprintf("keda-hpa-%s", deploymentName)
+// getHpaName returns generated HPA name for DeploymentName or StatefulSetName specified in the parameter
+func getHpaName(scaleType kedav1alpha1.ScaledObjectScaleType, appName string) string {
+    if scaleType == kedav1alpha1.ScaleTypeStatefulSet {
+        return fmt.Sprintf("keda-hpa-%s-%s", scaleType, appName)
+    }
+    return fmt.Sprintf("keda-hpa-%s", appName)
 }

 // getHpaMinReplicas returns MinReplicas based on definition in ScaledObject or default value if not defined
diff --git a/pkg/handler/scale_deployments.go b/pkg/handler/scale_deployments.go
index 4524c3af186..4dcf8826e6e 100644
--- a/pkg/handler/scale_deployments.go
+++ b/pkg/handler/scale_deployments.go
@@ -17,7 +17,7 @@ func (h *ScaleHandler) scaleDeployment(deployment *appsv1.Deployment, scaledObje
     if *deployment.Spec.Replicas == 0 && isActive {
         // current replica count is 0, but there is an active trigger.
         // scale the deployment up
-        h.scaleFromZero(deployment, scaledObject)
+        h.scaleDeploymentFromZero(deployment, scaledObject)
     } else if !isActive &&
         *deployment.Spec.Replicas > 0 &&
         (scaledObject.Spec.MinReplicaCount == nil || *scaledObject.Spec.MinReplicaCount == 0) {
@@ -78,7 +78,7 @@ func (h *ScaleHandler) scaleDeploymentToZero(deployment *appsv1.Deployment, scal
     }
 }

-func (h *ScaleHandler) scaleFromZero(deployment *appsv1.Deployment, scaledObject *kedav1alpha1.ScaledObject) {
+func (h *ScaleHandler) scaleDeploymentFromZero(deployment *appsv1.Deployment, scaledObject *kedav1alpha1.ScaledObject) {
     currentReplicas := *deployment.Spec.Replicas
     if scaledObject.Spec.MinReplicaCount != nil && *scaledObject.Spec.MinReplicaCount > 0 {
         deployment.Spec.Replicas = scaledObject.Spec.MinReplicaCount
diff --git a/pkg/handler/scale_handler.go b/pkg/handler/scale_handler.go
index 66715dc37ed..c825372c377 100644
--- a/pkg/handler/scale_handler.go
+++ b/pkg/handler/scale_handler.go
@@ -193,6 +193,39 @@ func (h *ScaleHandler) GetDeploymentScalers(scaledObject *kedav1alpha1.ScaledObj
     return scalers, deployment, nil
 }

+// GetStatefulSetScalers returns list of Scalers and StatefulSet for the specified ScaledObject
+func (h *ScaleHandler) GetStatefulSetScalers(scaledObject *kedav1alpha1.ScaledObject) ([]scalers.Scaler, *appsv1.StatefulSet, error) {
+    scalers := []scalers.Scaler{}
+
+    statefulSetName := scaledObject.Spec.ScaleTargetRef.StatefulSetName
+    if statefulSetName == "" {
+        return scalers, nil, fmt.Errorf("notified about ScaledObject with missing StatefulSet name: %s", scaledObject.GetName())
+    }
+
+    statefulSet := &appsv1.StatefulSet{}
+    err := h.client.Get(context.TODO(), types.NamespacedName{Name: statefulSetName, Namespace: scaledObject.GetNamespace()}, statefulSet)
+    if err != nil {
+        return scalers, nil, fmt.Errorf("error getting statefulSet: %s", err)
+    }
+
+    resolvedEnv, err := h.resolveStatefulSetEnv(statefulSet, scaledObject.Spec.ScaleTargetRef.ContainerName)
+    if err != nil {
+        return scalers, nil, fmt.Errorf("error resolving secrets for statefulSet: %s", err)
+    }
+
+    for i, trigger := range scaledObject.Spec.Triggers {
+        authParams, podIdentity := h.parseStatefulSetAuthRef(trigger.AuthenticationRef, scaledObject, statefulSet)
+        scaler, err := h.getScaler(scaledObject.Name, scaledObject.Namespace, trigger.Type, resolvedEnv, trigger.Metadata, authParams, podIdentity)
+        if err != nil {
+            return scalers, nil, fmt.Errorf("error getting scaler for trigger #%d: %s", i, err)
+        }
+
+        scalers = append(scalers, scaler)
+    }
+
+    return scalers, statefulSet, nil
+}
+
 func (h *ScaleHandler) getJobScalers(scaledObject *kedav1alpha1.ScaledObject) ([]scalers.Scaler, error) {
     scalers := []scalers.Scaler{}
diff --git a/pkg/handler/scale_loop.go b/pkg/handler/scale_loop.go
index 4eb0069d42f..e01e45189bc 100644
--- a/pkg/handler/scale_loop.go
+++ b/pkg/handler/scale_loop.go
@@ -41,6 +41,9 @@ func (h *ScaleHandler) handleScale(ctx context.Context, scaledObject *kedav1alph
     case kedav1alpha1.ScaleTypeJob:
         h.handleScaleJob(ctx, scaledObject)
         break
+    case kedav1alpha1.ScaleTypeStatefulSet:
+        h.handleScaleStatefulSet(ctx, scaledObject)
+        break
     default:
         h.handleScaleDeployment(ctx, scaledObject)
     }
@@ -129,3 +132,34 @@ func (h *ScaleHandler) handleScaleDeployment(ctx context.Context, scaledObject *

     h.scaleDeployment(deployment, scaledObject, isScaledObjectActive)
 }
+
+// handleScaleStatefulSet contains the main scaling logic for a StatefulSet based ScaledObject.
+// It'll check each trigger's active status, then call scaleStatefulSet
+func (h *ScaleHandler) handleScaleStatefulSet(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject) {
+    scalers, statefulSet, err := h.GetStatefulSetScalers(scaledObject)
+
+    if err != nil {
+        h.logger.Error(err, "Error getting scalers")
+        return
+    }
+    if statefulSet == nil {
+        return
+    }
+
+    isScaledObjectActive := false
+
+    for _, scaler := range scalers {
+        defer scaler.Close()
+        isTriggerActive, err := scaler.IsActive(ctx)
+
+        if err != nil {
+            h.logger.V(1).Info("Error getting scale decision", "Error", err)
+            continue
+        } else if isTriggerActive {
+            isScaledObjectActive = true
+            h.logger.V(1).Info("Scaler for scaledObject is active", "Scaler", scaler)
+        }
+    }
+
+    h.scaleStatefulSet(statefulSet, scaledObject, isScaledObjectActive)
+}
diff --git a/pkg/handler/scale_statefulSets.go b/pkg/handler/scale_statefulSets.go
new file mode 100644
index 00000000000..a26f892e7fe
--- /dev/null
+++ b/pkg/handler/scale_statefulSets.go
@@ -0,0 +1,143 @@
+package handler
+
+import (
+    "context"
+    "fmt"
+    "time"
+
+    kedav1alpha1 "github.com/kedacore/keda/pkg/apis/keda/v1alpha1"
+    appsv1 "k8s.io/api/apps/v1"
+    corev1 "k8s.io/api/core/v1"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/client-go/tools/cache"
+)
+
+func (h *ScaleHandler) scaleStatefulSet(statefulSet *appsv1.StatefulSet, scaledObject *kedav1alpha1.ScaledObject, isActive bool) {
+
+    if *statefulSet.Spec.Replicas == 0 && isActive {
+        // current replica count is 0, but there is an active trigger.
+        // scale the statefulSet up
+        h.scaleStatefulSetFromZero(statefulSet, scaledObject)
+    } else if !isActive &&
+        *statefulSet.Spec.Replicas > 0 &&
+        (scaledObject.Spec.MinReplicaCount == nil || *scaledObject.Spec.MinReplicaCount == 0) {
+        // there are no active triggers, but the statefulSet has replicas.
+        // AND
+        // There is no minimum configured or minimum is set to ZERO. HPA will handle other scale down operations
+
+        // Try to scale it down.
+        h.scaleStatefulSetToZero(statefulSet, scaledObject)
+    } else if isActive {
+        // triggers are active, but we didn't need to scale (replica count > 0)
+        // Update LastActiveTime to now.
+        now := metav1.Now()
+        scaledObject.Status.LastActiveTime = &now
+        h.updateScaledObjectStatus(scaledObject)
+    } else {
+        h.logger.V(1).Info("StatefulSet no change", "StatefulSet.Namespace", statefulSet.GetNamespace(), "StatefulSet.Name", statefulSet.GetName())
+    }
+}
+
+func (h *ScaleHandler) updateStatefulSet(statefulSet *appsv1.StatefulSet) error {
+
+    err := h.client.Update(context.TODO(), statefulSet)
+    if err != nil {
+        h.logger.Error(err, "Error updating statefulSet", "StatefulSet.Namespace", statefulSet.GetNamespace(), "StatefulSet.Name", statefulSet.GetName())
+        return err
+    }
+    return nil
+}
+
+// A statefulSet will be scaled down to 0 only if it's passed its cooldown period
+// or if LastActiveTime is nil
+func (h *ScaleHandler) scaleStatefulSetToZero(statefulSet *appsv1.StatefulSet, scaledObject *kedav1alpha1.ScaledObject) {
+    var cooldownPeriod time.Duration
+
+    if scaledObject.Spec.CooldownPeriod != nil {
+        cooldownPeriod = time.Second * time.Duration(*scaledObject.Spec.CooldownPeriod)
+    } else {
+        cooldownPeriod = time.Second * time.Duration(defaultCooldownPeriod)
+    }
+
+    // LastActiveTime can be nil if the statefulSet was scaled outside of Keda.
+    // In this case we will ignore the cooldown period and scale it down
+    if scaledObject.Status.LastActiveTime == nil ||
+        scaledObject.Status.LastActiveTime.Add(cooldownPeriod).Before(time.Now()) {
+        // or last time a trigger was active was > cooldown period, so scale down.
+        *statefulSet.Spec.Replicas = 0
+        err := h.updateStatefulSet(statefulSet)
+        if err == nil {
+            h.logger.Info("Successfully scaled statefulSet to 0 replicas", "StatefulSet.Namespace", statefulSet.GetNamespace(), "StatefulSet.Name", statefulSet.GetName())
+        }
+    } else {
+        h.logger.V(1).Info("scaledObject cooling down",
+            "LastActiveTime",
+            scaledObject.Status.LastActiveTime,
+            "CoolDownPeriod",
+            cooldownPeriod)
+    }
+}
+
+func (h *ScaleHandler) scaleStatefulSetFromZero(statefulSet *appsv1.StatefulSet, scaledObject *kedav1alpha1.ScaledObject) {
+    currentReplicas := *statefulSet.Spec.Replicas
+    if scaledObject.Spec.MinReplicaCount != nil && *scaledObject.Spec.MinReplicaCount > 0 {
+        statefulSet.Spec.Replicas = scaledObject.Spec.MinReplicaCount
+    } else {
+        *statefulSet.Spec.Replicas = 1
+    }
+
+    err := h.updateStatefulSet(statefulSet)
+
+    if err == nil {
+        h.logger.Info("Successfully updated statefulSet", "StatefulSet.Namespace", statefulSet.GetNamespace(), "StatefulSet.Name", statefulSet.GetName(),
+            "Original Replicas Count",
+            currentReplicas,
+            "New Replicas Count",
+            *statefulSet.Spec.Replicas)
+
+        // Scale was successful. Update lastActiveTime on the scaledObject
+        now := metav1.Now()
+        scaledObject.Status.LastActiveTime = &now
+        h.updateScaledObjectStatus(scaledObject)
+    }
+}
+
+func (h *ScaleHandler) resolveStatefulSetEnv(statefulSet *appsv1.StatefulSet, containerName string) (map[string]string, error) {
+    statefulSetKey, err := cache.MetaNamespaceKeyFunc(statefulSet)
+    if err != nil {
+        return nil, err
+    }
+
+    if len(statefulSet.Spec.Template.Spec.Containers) < 1 {
+        return nil, fmt.Errorf("StatefulSet (%s) doesn't have containers", statefulSetKey)
+    }
+
+    var container corev1.Container
+
+    if containerName != "" {
+        for _, c := range statefulSet.Spec.Template.Spec.Containers {
+            if c.Name == containerName {
+                container = c
+                break
+            }
+        }
+
+        if container.Name != containerName {
+            return nil, fmt.Errorf("Couldn't find container with name %s on statefulSet %s", containerName, statefulSet.GetName())
+        }
+    } else {
+        container = statefulSet.Spec.Template.Spec.Containers[0]
+    }
+
+    return h.resolveEnv(&container, statefulSet.GetNamespace())
+}
+
+func (h *ScaleHandler) parseStatefulSetAuthRef(triggerAuthRef *kedav1alpha1.ScaledObjectAuthRef, scaledObject *kedav1alpha1.ScaledObject, statefulSet *appsv1.StatefulSet) (map[string]string, string) {
+    return h.parseAuthRef(triggerAuthRef, scaledObject, func(name, containerName string) string {
+        env, err := h.resolveStatefulSetEnv(statefulSet, containerName)
+        if err != nil {
+            return ""
+        }
+        return env[name]
+    })
+}
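
Example usage (illustrative only, not part of the diff): a minimal ScaledObject that targets a StatefulSet through the new statefulSetName field. The resource names, trigger type, and trigger metadata below are hypothetical placeholders; any existing scaler is expected to work for a StatefulSet the same way it does for a Deployment.

apiVersion: keda.k8s.io/v1alpha1
kind: ScaledObject
metadata:
  name: example-consumer-scaler
  namespace: default
spec:
  scaleTargetRef:
    # set statefulSetName instead of deploymentName to scale a StatefulSet
    statefulSetName: example-consumer
  minReplicaCount: 0
  maxReplicaCount: 10
  triggers:
  - type: rabbitmq            # hypothetical trigger; metadata values are placeholders
    metadata:
      host: RabbitMqHost
      queueName: example-queue
      queueLength: "5"

With a spec like this, the reconciler creates an HPA named keda-hpa-statefulSet-example-consumer (per getHpaName), points its scaleTargetRef at the apps/v1 StatefulSet, and registers external metrics whose selector carries a statefulSetName label instead of deploymentName.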