Skip to content

Commit

Permalink
Add Kubernetes Workload scaler (#2010)
Browse files Browse the repository at this point in the history
Signed-off-by: jorturfer <jorge_turrado@hotmail.es>

Co-authored-by: Zbynek Roubalik <726523+zroubalik@users.noreply.github.com>
Signed-off-by: Tsuyoshi Ushio <ushio@simplearchitect.com>
  • Loading branch information
JorTurFer and zroubalik authored Aug 6, 2021
1 parent 51f73bb commit 0eda18f
Show file tree
Hide file tree
Showing 6 changed files with 420 additions and 2 deletions.
5 changes: 5 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ issues:
- path: scale_resolvers_test.go
linters:
- staticcheck
# Got "sigs.k8s.io/controller-runtime/pkg/client/fake is deprecated: please use pkg/envtest for testing"
# This might not be ideal, see: https://github.com/kubernetes-sigs/controller-runtime/issues/768
- path: kubernetes_workload_scaler_test.go
linters:
- staticcheck
# https://github.com/go-critic/go-critic/issues/926
- linters:
- gocritic
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
- Add new scaler for Selenium Grid ([#1971](https://github.com/kedacore/keda/pull/1971))
- Support using regex to select the queues in RabbitMQ Scaler ([#1957](https://github.com/kedacore/keda/pull/1957))
- Support custom metric name in RabbitMQ Scaler ([#1976](https://github.com/kedacore/keda/pull/1976))
- Add new scaler for Kubernetes Workload ([#2010](https://github.com/kedacore/keda/pull/2010))

### Improvements

Expand Down
138 changes: 138 additions & 0 deletions pkg/scalers/kubernetes_workload_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package scalers

import (
"context"
"fmt"
"strconv"
"strings"

"k8s.io/api/autoscaling/v2beta2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
"sigs.k8s.io/controller-runtime/pkg/client"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)

type kubernetesWorkloadScaler struct {
metadata *kubernetesWorkloadMetadata
kubeClient client.Client
}

const (
kubernetesWorkloadMetricType = "External"
podSelectorKey = "podSelector"
valueKey = "value"
)

type kubernetesWorkloadMetadata struct {
podSelector labels.Selector
namespace string
value int64
}

// NewKubernetesWorkloadScaler creates a new kubernetesWorkloadScaler
func NewKubernetesWorkloadScaler(kubeClient client.Client, config *ScalerConfig) (Scaler, error) {
meta, parseErr := parseWorkloadMetadata(config)
if parseErr != nil {
return nil, fmt.Errorf("error parsing kubernetes workload metadata: %s", parseErr)
}

return &kubernetesWorkloadScaler{
metadata: meta,
kubeClient: kubeClient,
}, nil
}

func parseWorkloadMetadata(config *ScalerConfig) (*kubernetesWorkloadMetadata, error) {
meta := &kubernetesWorkloadMetadata{}
var err error
meta.namespace = config.Namespace
meta.podSelector, err = labels.Parse(config.TriggerMetadata[podSelectorKey])
if err != nil || meta.podSelector.String() == "" {
return nil, fmt.Errorf("invalid pod selector")
}
meta.value, err = strconv.ParseInt(config.TriggerMetadata[valueKey], 10, 64)
if err != nil || meta.value == 0 {
return nil, fmt.Errorf("value must be an integer greater than 0")
}
return meta, nil
}

// IsActive determines if we need to scale from zero
func (s *kubernetesWorkloadScaler) IsActive(ctx context.Context) (bool, error) {
pods, err := s.getMetricValue(ctx)

if err != nil {
return false, err
}

return pods > 0, nil
}

// Close no need for kubernetes workload scaler
func (s *kubernetesWorkloadScaler) Close() error {
return nil
}

// GetMetricSpecForScaling returns the metric spec for the HPA
func (s *kubernetesWorkloadScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
targetMetricValue := resource.NewQuantity(s.metadata.value, resource.DecimalSI)
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s-%s", "workload", s.metadata.namespace, normalizeSelectorString(s.metadata.podSelector))),
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetMetricValue,
},
}
metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: kubernetesWorkloadMetricType}
return []v2beta2.MetricSpec{metricSpec}
}

// GetMetrics returns value for a supported metric
func (s *kubernetesWorkloadScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
pods, err := s.getMetricValue(ctx)
if err != nil {
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error inspecting kubernetes workload: %s", err)
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(pods), resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

func (s *kubernetesWorkloadScaler) getMetricValue(ctx context.Context) (int, error) {
podList := &corev1.PodList{}
listOptions := client.ListOptions{}
listOptions.LabelSelector = s.metadata.podSelector
listOptions.Namespace = s.metadata.namespace
opts := []client.ListOption{
&listOptions,
}

err := s.kubeClient.List(ctx, podList, opts...)
if err != nil {
return 0, err
}

return len(podList.Items), nil
}

func normalizeSelectorString(selector labels.Selector) string {
s := selector.String()
s = strings.ReplaceAll(s, " ", "")
s = strings.ReplaceAll(s, "(", "-")
s = strings.ReplaceAll(s, ")", "-")
s = strings.ReplaceAll(s, ",", "-")
s = strings.ReplaceAll(s, "!", "-")
return s
}
138 changes: 138 additions & 0 deletions pkg/scalers/kubernetes_workload_scaler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package scalers

import (
"context"
"fmt"
"testing"
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

type workloadMetadataTestData struct {
metadata map[string]string
namespace string
isError bool
}

var parseWorkloadMetadataTestDataset = []workloadMetadataTestData{
{map[string]string{"value": "1", "podSelector": "app=demo"}, "test", false},
{map[string]string{"value": "1", "podSelector": "app=demo"}, "default", false},
{map[string]string{"value": "1", "podSelector": "app in (demo1, demo2)"}, "test", false},
{map[string]string{"value": "1", "podSelector": "app in (demo1, demo2),deploy in (deploy1, deploy2)"}, "test", false},
{map[string]string{"podSelector": "app=demo"}, "test", true},
{map[string]string{"podSelector": "app=demo"}, "default", true},
{map[string]string{"value": "1"}, "test", true},
{map[string]string{"value": "1"}, "default", true},
{map[string]string{"value": "a", "podSelector": "app=demo"}, "test", true},
{map[string]string{"value": "a", "podSelector": "app=demo"}, "default", true},
{map[string]string{"value": "0", "podSelector": "app=demo"}, "test", true},
{map[string]string{"value": "0", "podSelector": "app=demo"}, "default", true},
}

func TestParseWorkloadMetadata(t *testing.T) {
for _, testData := range parseWorkloadMetadataTestDataset {
_, err := parseWorkloadMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, Namespace: testData.namespace})
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
}
}

type workloadIsActiveTestData struct {
metadata map[string]string
namespace string
podCount int
active bool
}

var isActiveWorkloadTestDataset = []workloadIsActiveTestData{
// "podSelector": "app=demo", "namespace": "test"
{parseWorkloadMetadataTestDataset[0].metadata, parseWorkloadMetadataTestDataset[0].namespace, 0, false},
{parseWorkloadMetadataTestDataset[0].metadata, parseWorkloadMetadataTestDataset[0].namespace, 1, false},
{parseWorkloadMetadataTestDataset[0].metadata, parseWorkloadMetadataTestDataset[0].namespace, 15, false},
// "podSelector": "app=demo", "namespace": "default"
{parseWorkloadMetadataTestDataset[1].metadata, parseWorkloadMetadataTestDataset[1].namespace, 0, false},
{parseWorkloadMetadataTestDataset[1].metadata, parseWorkloadMetadataTestDataset[1].namespace, 1, true},
{parseWorkloadMetadataTestDataset[1].metadata, parseWorkloadMetadataTestDataset[1].namespace, 15, true},
}

func TestWorkloadIsActive(t *testing.T) {
for _, testData := range isActiveWorkloadTestDataset {
s, _ := NewKubernetesWorkloadScaler(
fake.NewFakeClient(createPodlist(testData.podCount)),
&ScalerConfig{
TriggerMetadata: testData.metadata,
AuthParams: map[string]string{},
GlobalHTTPTimeout: 1000 * time.Millisecond,
Namespace: testData.namespace,
},
)
isActive, _ := s.IsActive(context.TODO())
if testData.active && !isActive {
t.Error("Expected active but got inactive")
}
if !testData.active && isActive {
t.Error("Expected inactive but got active")
}
}
}

type workloadGetMetricSpecForScalingTestData struct {
metadata map[string]string
namespace string
name string
}

var getMetricSpecForScalingTestDataset = []workloadGetMetricSpecForScalingTestData{
// "podSelector": "app=demo", "namespace": "test"
{parseWorkloadMetadataTestDataset[0].metadata, parseWorkloadMetadataTestDataset[0].namespace, "workload-test-app=demo"},
// "podSelector": "app=demo", "namespace": "default"
{parseWorkloadMetadataTestDataset[1].metadata, parseWorkloadMetadataTestDataset[1].namespace, "workload-default-app=demo"},
// "podSelector": "app in (demo1, demo2)", "namespace": "test"
{parseWorkloadMetadataTestDataset[2].metadata, parseWorkloadMetadataTestDataset[2].namespace, "workload-test-appin-demo1-demo2-"},
// "podSelector": "app in (demo1, demo2),deploy in (deploy1, deploy2)", "namespace": "test"
{parseWorkloadMetadataTestDataset[3].metadata, parseWorkloadMetadataTestDataset[3].namespace, "workload-test-appin-demo1-demo2--deployin-deploy1-deploy2-"},
}

func TestWorkloadGetMetricSpecForScaling(t *testing.T) {
for _, testData := range getMetricSpecForScalingTestDataset {
s, _ := NewKubernetesWorkloadScaler(
fake.NewFakeClient(),
&ScalerConfig{
TriggerMetadata: testData.metadata,
AuthParams: map[string]string{},
GlobalHTTPTimeout: 1000 * time.Millisecond,
Namespace: testData.namespace,
},
)
metric := s.GetMetricSpecForScaling()

if metric[0].External.Metric.Name != testData.name {
t.Errorf("Expected '%s' as metric name and got '%s'", testData.name, metric[0].External.Metric.Name)
}
}
}

func createPodlist(count int) *v1.PodList {
list := &v1.PodList{}
for i := 0; i < count; i++ {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("demo-pod-v%d", i),
Namespace: "default",
Annotations: map[string]string{},
Labels: map[string]string{
"app": "demo",
},
},
}
list.Items = append(list.Items, *pod)
}
return list
}
6 changes: 4 additions & 2 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ func (h *scaleHandler) buildScalers(withTriggers *kedav1alpha1.WithTriggers, pod
return []scalers.Scaler{}, err
}

scaler, err := buildScaler(trigger.Type, config)
scaler, err := buildScaler(h.client, trigger.Type, config)
if err != nil {
closeScalers(scalersRes)
h.recorder.Event(withTriggers, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
Expand All @@ -386,7 +386,7 @@ func (h *scaleHandler) buildScalers(withTriggers *kedav1alpha1.WithTriggers, pod
return scalersRes, nil
}

func buildScaler(triggerType string, config *scalers.ScalerConfig) (scalers.Scaler, error) {
func buildScaler(client client.Client, triggerType string, config *scalers.ScalerConfig) (scalers.Scaler, error) {
// TRIGGERS-START
switch triggerType {
case "artemis-queue":
Expand Down Expand Up @@ -429,6 +429,8 @@ func buildScaler(triggerType string, config *scalers.ScalerConfig) (scalers.Scal
return scalers.NewInfluxDBScaler(config)
case "kafka":
return scalers.NewKafkaScaler(config)
case "kubernetes-workload":
return scalers.NewKubernetesWorkloadScaler(client, config)
case "liiklus":
return scalers.NewLiiklusScaler(config)
case "memory":
Expand Down
Loading

0 comments on commit 0eda18f

Please sign in to comment.