Skip to content

Commit

Permalink
feat: add activation feature for CPU/Memory scaler
Browse files Browse the repository at this point in the history
Signed-off-by: kunwooy <vbtkdpf148@gmail.com>
  • Loading branch information
kunwooy committed Sep 19, 2024
1 parent 4444aca commit 8bca6cf
Show file tree
Hide file tree
Showing 7 changed files with 345 additions and 66 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ Here is an overview of all new **experimental** features:
### Improvements

- **AWS CloudWatch Scaler**: Add support for ignoreNullValues ([#5352](https://github.com/kedacore/keda/issues/5352))
- **CPU/Memory Scaler**: Add activation feature ([#6057](https://github.com/kedacore/keda/issues/6057))
- **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778))
- **GitHub Scaler**: Fixed pagination, fetching repository list ([#5738](https://github.com/kedacore/keda/issues/5738))
- **Kafka**: Fix logic to scale to zero on invalid offset even with earliest offsetResetPolicy ([#5689](https://github.com/kedacore/keda/issues/5689))
- **RabbitMQ Scaler**: Add connection name for AMQP ([#5958](https://github.com/kedacore/keda/issues/5958))
- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))

### Fixes

Expand Down
5 changes: 4 additions & 1 deletion controllers/keda/scaledobject_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,13 +790,16 @@ var _ = Describe("ScaledObjectController", func() {
Eventually(func() error {
return k8sClient.Get(context.Background(), types.NamespacedName{Name: getHPAName(so), Namespace: "default"}, hpa)
}).ShouldNot(HaveOccurred())

averageUtilization := int32(100)
hpa.Status.CurrentMetrics = []autoscalingv2.MetricStatus{
{
Type: autoscalingv2.ResourceMetricSourceType,
Resource: &autoscalingv2.ResourceMetricStatus{
Name: corev1.ResourceCPU,
Current: autoscalingv2.MetricValueStatus{
Value: resource.NewQuantity(int64(100), resource.DecimalSI),
Value: resource.NewQuantity(int64(100), resource.DecimalSI),
AverageUtilization: &averageUtilization,
},
},
},
Expand Down
117 changes: 108 additions & 9 deletions pkg/scalers/cpu_memory_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,35 @@ import (
v2 "k8s.io/api/autoscaling/v2"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
"k8s.io/metrics/pkg/apis/external_metrics"
"sigs.k8s.io/controller-runtime/pkg/client"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
)

type cpuMemoryScaler struct {
metadata *cpuMemoryMetadata
resourceName v1.ResourceName
logger logr.Logger
kubeClient client.Client
}

type cpuMemoryMetadata struct {
Type v2.MetricTargetType
AverageValue *resource.Quantity
AverageUtilization *int32
ContainerName string
Type v2.MetricTargetType
AverageValue *resource.Quantity
AverageUtilization *int32
ContainerName string
ActivationAverageValue *resource.Quantity
ActivationAverageUtilization *int32
ScalableObjectName string
ScalableObjectType string
ScalableObjectNamespace string
}

// NewCPUMemoryScaler creates a new cpuMemoryScaler
func NewCPUMemoryScaler(resourceName v1.ResourceName, config *scalersconfig.ScalerConfig) (Scaler, error) {
func NewCPUMemoryScaler(resourceName v1.ResourceName, config *scalersconfig.ScalerConfig, kubeClient client.Client) (Scaler, error) {
logger := InitializeLogger(config, "cpu_memory_scaler")

meta, parseErr := parseResourceMetadata(config, logger)
Expand All @@ -40,12 +49,13 @@ func NewCPUMemoryScaler(resourceName v1.ResourceName, config *scalersconfig.Scal
metadata: meta,
resourceName: resourceName,
logger: logger,
kubeClient: kubeClient,
}, nil
}

func parseResourceMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger) (*cpuMemoryMetadata, error) {
meta := &cpuMemoryMetadata{}
var value string
var value, activationValue string
var ok bool
value, ok = config.TriggerMetadata["type"]
switch {
Expand All @@ -63,17 +73,31 @@ func parseResourceMetadata(config *scalersconfig.ScalerConfig, logger logr.Logge
if value, ok = config.TriggerMetadata["value"]; !ok || value == "" {
return nil, fmt.Errorf("no value given")
}
if activationValue, ok = config.TriggerMetadata["activationValue"]; !ok || activationValue == "" {
activationValue = "0"
}

switch meta.Type {
case v2.AverageValueMetricType:
averageValueQuantity := resource.MustParse(value)
meta.AverageValue = &averageValueQuantity

activationValueQuantity := resource.MustParse(activationValue)
meta.ActivationAverageValue = &activationValueQuantity
case v2.UtilizationMetricType:
valueNum, err := strconv.ParseInt(value, 10, 32)
if err != nil {
return nil, err
}
utilizationNum := int32(valueNum)
meta.AverageUtilization = &utilizationNum

valueNum, err = strconv.ParseInt(activationValue, 10, 32)
if err != nil {
return nil, err
}
activationAverageUtilization := int32(valueNum)
meta.ActivationAverageUtilization = &activationAverageUtilization
default:
return nil, fmt.Errorf("unsupported metric type, allowed values are 'Utilization' or 'AverageValue'")
}
Expand All @@ -82,6 +106,10 @@ func parseResourceMetadata(config *scalersconfig.ScalerConfig, logger logr.Logge
meta.ContainerName = value
}

meta.ScalableObjectName = config.ScalableObjectName
meta.ScalableObjectNamespace = config.ScalableObjectNamespace
meta.ScalableObjectType = config.ScalableObjectType

return meta, nil
}

Expand All @@ -90,6 +118,42 @@ func (s *cpuMemoryScaler) Close(context.Context) error {
return nil
}

func (s *cpuMemoryScaler) getHPA(ctx context.Context) (*v2.HorizontalPodAutoscaler, error) {
if s.metadata.ScalableObjectType == "ScaledObject" {
scaledObject := &kedav1alpha1.ScaledObject{}
err := s.kubeClient.Get(ctx, types.NamespacedName{
Name: s.metadata.ScalableObjectName,
Namespace: s.metadata.ScalableObjectNamespace,
}, scaledObject)

if err != nil {
return nil, err
}

hpa := &v2.HorizontalPodAutoscaler{}
err = s.kubeClient.Get(ctx, types.NamespacedName{
Name: scaledObject.Status.HpaName,
Namespace: s.metadata.ScalableObjectNamespace,
}, hpa)

if err != nil {
return nil, err
}

return hpa, nil
} else if s.metadata.ScalableObjectType == "ScaledJob" {
scaledJob := &kedav1alpha1.ScaledJob{}
err := s.kubeClient.Get(ctx, types.NamespacedName{
Name: s.metadata.ScalableObjectName,
Namespace: s.metadata.ScalableObjectNamespace,
}, scaledJob)

return nil, err
}

return nil, fmt.Errorf("invalid scalable object type: %s", s.metadata.ScalableObjectType)
}

// GetMetricSpecForScaling returns the metric spec for the HPA
func (s *cpuMemoryScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
var metricSpec v2.MetricSpec
Expand Down Expand Up @@ -120,7 +184,42 @@ func (s *cpuMemoryScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSp
return []v2.MetricSpec{metricSpec}
}

// GetMetricsAndActivity no need for cpu/memory scaler and always active for cpu/memory scaler
func (s *cpuMemoryScaler) GetMetricsAndActivity(_ context.Context, _ string) ([]external_metrics.ExternalMetricValue, bool, error) {
return nil, true, nil
// GetMetricsAndActivity only returns the activity of the cpu/memory scaler
func (s *cpuMemoryScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
hpa, err := s.getHPA(ctx)
if err != nil {
return nil, false, err
}

if hpa == nil {
return nil, false, fmt.Errorf("HPA not found")
}

for _, metric := range hpa.Status.CurrentMetrics {
if metric.Resource == nil {
continue
}

if string(metric.Resource.Name) != metricName {
continue
}

if s.metadata.Type == v2.AverageValueMetricType {
averageValue := metric.Resource.Current.AverageValue
if averageValue == nil {
return nil, false, fmt.Errorf("HPA has no average value")
}

return nil, averageValue.Cmp(*s.metadata.ActivationAverageValue) == 1, nil
} else if s.metadata.Type == v2.UtilizationMetricType {
averageUtilization := metric.Resource.Current.AverageUtilization
if averageUtilization == nil {
return nil, false, fmt.Errorf("HPA has no average utilization")
}

return nil, *averageUtilization > *s.metadata.ActivationAverageUtilization, nil
}
}

return nil, false, fmt.Errorf("no matching resource metric found for %s", s.resourceName)
}
Loading

0 comments on commit 8bca6cf

Please sign in to comment.