-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add Kubernetes Workload scaler (#2010)
Signed-off-by: jorturfer <jorge_turrado@hotmail.es> Co-authored-by: Zbynek Roubalik <726523+zroubalik@users.noreply.github.com>
- Loading branch information
Showing
6 changed files
with
420 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
package scalers | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"strconv" | ||
"strings" | ||
|
||
"k8s.io/api/autoscaling/v2beta2" | ||
corev1 "k8s.io/api/core/v1" | ||
"k8s.io/apimachinery/pkg/api/resource" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/apimachinery/pkg/labels" | ||
"k8s.io/metrics/pkg/apis/external_metrics" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
|
||
kedautil "github.com/kedacore/keda/v2/pkg/util" | ||
) | ||
|
||
type kubernetesWorkloadScaler struct { | ||
metadata *kubernetesWorkloadMetadata | ||
kubeClient client.Client | ||
} | ||
|
||
const ( | ||
kubernetesWorkloadMetricType = "External" | ||
podSelectorKey = "podSelector" | ||
valueKey = "value" | ||
) | ||
|
||
type kubernetesWorkloadMetadata struct { | ||
podSelector labels.Selector | ||
namespace string | ||
value int64 | ||
} | ||
|
||
// NewKubernetesWorkloadScaler creates a new kubernetesWorkloadScaler | ||
func NewKubernetesWorkloadScaler(kubeClient client.Client, config *ScalerConfig) (Scaler, error) { | ||
meta, parseErr := parseWorkloadMetadata(config) | ||
if parseErr != nil { | ||
return nil, fmt.Errorf("error parsing kubernetes workload metadata: %s", parseErr) | ||
} | ||
|
||
return &kubernetesWorkloadScaler{ | ||
metadata: meta, | ||
kubeClient: kubeClient, | ||
}, nil | ||
} | ||
|
||
func parseWorkloadMetadata(config *ScalerConfig) (*kubernetesWorkloadMetadata, error) { | ||
meta := &kubernetesWorkloadMetadata{} | ||
var err error | ||
meta.namespace = config.Namespace | ||
meta.podSelector, err = labels.Parse(config.TriggerMetadata[podSelectorKey]) | ||
if err != nil || meta.podSelector.String() == "" { | ||
return nil, fmt.Errorf("invalid pod selector") | ||
} | ||
meta.value, err = strconv.ParseInt(config.TriggerMetadata[valueKey], 10, 64) | ||
if err != nil || meta.value == 0 { | ||
return nil, fmt.Errorf("value must be an integer greater than 0") | ||
} | ||
return meta, nil | ||
} | ||
|
||
// IsActive determines if we need to scale from zero | ||
func (s *kubernetesWorkloadScaler) IsActive(ctx context.Context) (bool, error) { | ||
pods, err := s.getMetricValue(ctx) | ||
|
||
if err != nil { | ||
return false, err | ||
} | ||
|
||
return pods > 0, nil | ||
} | ||
|
||
// Close no need for kubernetes workload scaler | ||
func (s *kubernetesWorkloadScaler) Close() error { | ||
return nil | ||
} | ||
|
||
// GetMetricSpecForScaling returns the metric spec for the HPA | ||
func (s *kubernetesWorkloadScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { | ||
targetMetricValue := resource.NewQuantity(s.metadata.value, resource.DecimalSI) | ||
externalMetric := &v2beta2.ExternalMetricSource{ | ||
Metric: v2beta2.MetricIdentifier{ | ||
Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s-%s", "workload", s.metadata.namespace, normalizeSelectorString(s.metadata.podSelector))), | ||
}, | ||
Target: v2beta2.MetricTarget{ | ||
Type: v2beta2.AverageValueMetricType, | ||
AverageValue: targetMetricValue, | ||
}, | ||
} | ||
metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: kubernetesWorkloadMetricType} | ||
return []v2beta2.MetricSpec{metricSpec} | ||
} | ||
|
||
// GetMetrics returns value for a supported metric | ||
func (s *kubernetesWorkloadScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { | ||
pods, err := s.getMetricValue(ctx) | ||
if err != nil { | ||
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error inspecting kubernetes workload: %s", err) | ||
} | ||
|
||
metric := external_metrics.ExternalMetricValue{ | ||
MetricName: metricName, | ||
Value: *resource.NewQuantity(int64(pods), resource.DecimalSI), | ||
Timestamp: metav1.Now(), | ||
} | ||
|
||
return append([]external_metrics.ExternalMetricValue{}, metric), nil | ||
} | ||
|
||
func (s *kubernetesWorkloadScaler) getMetricValue(ctx context.Context) (int, error) { | ||
podList := &corev1.PodList{} | ||
listOptions := client.ListOptions{} | ||
listOptions.LabelSelector = s.metadata.podSelector | ||
listOptions.Namespace = s.metadata.namespace | ||
opts := []client.ListOption{ | ||
&listOptions, | ||
} | ||
|
||
err := s.kubeClient.List(ctx, podList, opts...) | ||
if err != nil { | ||
return 0, err | ||
} | ||
|
||
return len(podList.Items), nil | ||
} | ||
|
||
func normalizeSelectorString(selector labels.Selector) string { | ||
s := selector.String() | ||
s = strings.ReplaceAll(s, " ", "") | ||
s = strings.ReplaceAll(s, "(", "-") | ||
s = strings.ReplaceAll(s, ")", "-") | ||
s = strings.ReplaceAll(s, ",", "-") | ||
s = strings.ReplaceAll(s, "!", "-") | ||
return s | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
package scalers | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"testing" | ||
"time" | ||
|
||
v1 "k8s.io/api/core/v1" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"sigs.k8s.io/controller-runtime/pkg/client/fake" | ||
) | ||
|
||
type workloadMetadataTestData struct { | ||
metadata map[string]string | ||
namespace string | ||
isError bool | ||
} | ||
|
||
var parseWorkloadMetadataTestDataset = []workloadMetadataTestData{ | ||
{map[string]string{"value": "1", "podSelector": "app=demo"}, "test", false}, | ||
{map[string]string{"value": "1", "podSelector": "app=demo"}, "default", false}, | ||
{map[string]string{"value": "1", "podSelector": "app in (demo1, demo2)"}, "test", false}, | ||
{map[string]string{"value": "1", "podSelector": "app in (demo1, demo2),deploy in (deploy1, deploy2)"}, "test", false}, | ||
{map[string]string{"podSelector": "app=demo"}, "test", true}, | ||
{map[string]string{"podSelector": "app=demo"}, "default", true}, | ||
{map[string]string{"value": "1"}, "test", true}, | ||
{map[string]string{"value": "1"}, "default", true}, | ||
{map[string]string{"value": "a", "podSelector": "app=demo"}, "test", true}, | ||
{map[string]string{"value": "a", "podSelector": "app=demo"}, "default", true}, | ||
{map[string]string{"value": "0", "podSelector": "app=demo"}, "test", true}, | ||
{map[string]string{"value": "0", "podSelector": "app=demo"}, "default", true}, | ||
} | ||
|
||
func TestParseWorkloadMetadata(t *testing.T) { | ||
for _, testData := range parseWorkloadMetadataTestDataset { | ||
_, err := parseWorkloadMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, Namespace: testData.namespace}) | ||
if err != nil && !testData.isError { | ||
t.Error("Expected success but got error", err) | ||
} | ||
if testData.isError && err == nil { | ||
t.Error("Expected error but got success") | ||
} | ||
} | ||
} | ||
|
||
type workloadIsActiveTestData struct { | ||
metadata map[string]string | ||
namespace string | ||
podCount int | ||
active bool | ||
} | ||
|
||
var isActiveWorkloadTestDataset = []workloadIsActiveTestData{ | ||
// "podSelector": "app=demo", "namespace": "test" | ||
{parseWorkloadMetadataTestDataset[0].metadata, parseWorkloadMetadataTestDataset[0].namespace, 0, false}, | ||
{parseWorkloadMetadataTestDataset[0].metadata, parseWorkloadMetadataTestDataset[0].namespace, 1, false}, | ||
{parseWorkloadMetadataTestDataset[0].metadata, parseWorkloadMetadataTestDataset[0].namespace, 15, false}, | ||
// "podSelector": "app=demo", "namespace": "default" | ||
{parseWorkloadMetadataTestDataset[1].metadata, parseWorkloadMetadataTestDataset[1].namespace, 0, false}, | ||
{parseWorkloadMetadataTestDataset[1].metadata, parseWorkloadMetadataTestDataset[1].namespace, 1, true}, | ||
{parseWorkloadMetadataTestDataset[1].metadata, parseWorkloadMetadataTestDataset[1].namespace, 15, true}, | ||
} | ||
|
||
func TestWorkloadIsActive(t *testing.T) { | ||
for _, testData := range isActiveWorkloadTestDataset { | ||
s, _ := NewKubernetesWorkloadScaler( | ||
fake.NewFakeClient(createPodlist(testData.podCount)), | ||
&ScalerConfig{ | ||
TriggerMetadata: testData.metadata, | ||
AuthParams: map[string]string{}, | ||
GlobalHTTPTimeout: 1000 * time.Millisecond, | ||
Namespace: testData.namespace, | ||
}, | ||
) | ||
isActive, _ := s.IsActive(context.TODO()) | ||
if testData.active && !isActive { | ||
t.Error("Expected active but got inactive") | ||
} | ||
if !testData.active && isActive { | ||
t.Error("Expected inactive but got active") | ||
} | ||
} | ||
} | ||
|
||
type workloadGetMetricSpecForScalingTestData struct { | ||
metadata map[string]string | ||
namespace string | ||
name string | ||
} | ||
|
||
var getMetricSpecForScalingTestDataset = []workloadGetMetricSpecForScalingTestData{ | ||
// "podSelector": "app=demo", "namespace": "test" | ||
{parseWorkloadMetadataTestDataset[0].metadata, parseWorkloadMetadataTestDataset[0].namespace, "workload-test-app=demo"}, | ||
// "podSelector": "app=demo", "namespace": "default" | ||
{parseWorkloadMetadataTestDataset[1].metadata, parseWorkloadMetadataTestDataset[1].namespace, "workload-default-app=demo"}, | ||
// "podSelector": "app in (demo1, demo2)", "namespace": "test" | ||
{parseWorkloadMetadataTestDataset[2].metadata, parseWorkloadMetadataTestDataset[2].namespace, "workload-test-appin-demo1-demo2-"}, | ||
// "podSelector": "app in (demo1, demo2),deploy in (deploy1, deploy2)", "namespace": "test" | ||
{parseWorkloadMetadataTestDataset[3].metadata, parseWorkloadMetadataTestDataset[3].namespace, "workload-test-appin-demo1-demo2--deployin-deploy1-deploy2-"}, | ||
} | ||
|
||
func TestWorkloadGetMetricSpecForScaling(t *testing.T) { | ||
for _, testData := range getMetricSpecForScalingTestDataset { | ||
s, _ := NewKubernetesWorkloadScaler( | ||
fake.NewFakeClient(), | ||
&ScalerConfig{ | ||
TriggerMetadata: testData.metadata, | ||
AuthParams: map[string]string{}, | ||
GlobalHTTPTimeout: 1000 * time.Millisecond, | ||
Namespace: testData.namespace, | ||
}, | ||
) | ||
metric := s.GetMetricSpecForScaling() | ||
|
||
if metric[0].External.Metric.Name != testData.name { | ||
t.Errorf("Expected '%s' as metric name and got '%s'", testData.name, metric[0].External.Metric.Name) | ||
} | ||
} | ||
} | ||
|
||
func createPodlist(count int) *v1.PodList { | ||
list := &v1.PodList{} | ||
for i := 0; i < count; i++ { | ||
pod := &v1.Pod{ | ||
ObjectMeta: metav1.ObjectMeta{ | ||
Name: fmt.Sprintf("demo-pod-v%d", i), | ||
Namespace: "default", | ||
Annotations: map[string]string{}, | ||
Labels: map[string]string{ | ||
"app": "demo", | ||
}, | ||
}, | ||
} | ||
list.Items = append(list.Items, *pod) | ||
} | ||
return list | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.