From 7daa09af544ea41a5c41c4ee0f5f51f46299bfb5 Mon Sep 17 00:00:00 2001 From: Jose Maria Alvarez Fernandez <52029309+jmalvarezf-lmes@users.noreply.github.com> Date: Mon, 11 Sep 2023 23:12:20 +0200 Subject: [PATCH] feat: Add scaler for Google Cloud Tasks (#4834) * feature: add cloud tasks scaler Signed-off-by: Jose Maria Alvarez * Add cloud tasks e2e test Signed-off-by: Jose Maria Alvarez * fix: Return files to the original state Signed-off-by: Jose Maria Alvarez * fix: static checks Signed-off-by: Jose Maria Alvarez * fix: more static checks fixed Signed-off-by: Jose Maria Alvarez * fix: add location for queue Signed-off-by: Jose Maria Alvarez * fix: add correct command to create test tasks in queue Signed-off-by: Jose Maria Alvarez * fix: add fixes to test and add pod identity test Signed-off-by: Jose Maria Alvarez * fix: specify location also in tasks Signed-off-by: Jose Maria Alvarez * fix: correct indentation and location for purge Signed-off-by: Jose Maria Alvarez * fix: correct naming, add package correctly to identity tests Signed-off-by: Jose Maria Alvarez * fix: change test name Signed-off-by: Jose Maria Alvarez * fix: add gcp as prefix for naming for clarity Signed-off-by: Jose Maria Alvarez * fix: correct problem in test when changing name in struct Signed-off-by: Jose Maria Alvarez * fix: order in changelog Signed-off-by: Jose Maria Alvarez * fix: another order change in changelog Signed-off-by: Jose Maria Alvarez * fix: more renaming Signed-off-by: Jose Maria Alvarez * fix: put it in a new section Signed-off-by: Jose Maria Alvarez * fix: try a new order for the changelog Signed-off-by: Jose Maria Alvarez * fix: delete unneeded line Signed-off-by: Jose Maria Alvarez * fix: into the new Changelog section Signed-off-by: Jose Maria Alvarez * fix: another order fix Signed-off-by: Jose Maria Alvarez --------- Signed-off-by: Jose Maria Alvarez Signed-off-by: Jose Maria Alvarez Fernandez <52029309+jmalvarezf-lmes@users.noreply.github.com> --- CHANGELOG.md | 2 + pkg/scalers/gcp_cloud_tasks_scaler.go | 184 +++++++++++ pkg/scalers/gcp_cloud_tasks_scaler_test.go | 79 +++++ pkg/scaling/scalers_builder.go | 2 + .../gcp_cloud_tasks/gcp_cloud_tasks_test.go | 288 +++++++++++++++++ .../gcp_cloud_tasks_workload_identity_test.go | 299 ++++++++++++++++++ 6 files changed, 854 insertions(+) create mode 100644 pkg/scalers/gcp_cloud_tasks_scaler.go create mode 100644 pkg/scalers/gcp_cloud_tasks_scaler_test.go create mode 100644 tests/scalers/gcp/gcp_cloud_tasks/gcp_cloud_tasks_test.go create mode 100644 tests/scalers/gcp/gcp_cloud_tasks_workload_identity/gcp_cloud_tasks_workload_identity_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 95839326868..fc4315167db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,10 +49,12 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio ### New +- **General**: Introduce new Google Cloud Tasks scaler functionality to scale based on the queue length ([#3613](https://github.com/kedacore/keda/issues/3613)) - **AWS SQS Scaler**: Support for scaling to include delayed messages. ([#4377](https://github.com/kedacore/keda/issues/4377)) - **Governance**: KEDA transitioned to CNCF Graduated project ([#63](https://github.com/kedacore/governance/issues/63)) ### Improvements + - **General**: Add more events for user checking ([#796](https://github.com/kedacore/keda/issues/3764)) - **General**: Add ScaledObject/ScaledJob names to output of `kubectl get triggerauthentication/clustertriggerauthentication` ([#796](https://github.com/kedacore/keda/issues/796)) - **General**: Add standalone CRD generation to release workflow ([#2726](https://github.com/kedacore/keda/issues/2726)) diff --git a/pkg/scalers/gcp_cloud_tasks_scaler.go b/pkg/scalers/gcp_cloud_tasks_scaler.go new file mode 100644 index 00000000000..88a3382702c --- /dev/null +++ b/pkg/scalers/gcp_cloud_tasks_scaler.go @@ -0,0 +1,184 @@ +package scalers + +import ( + "context" + "fmt" + "strconv" + + "github.com/go-logr/logr" + v2 "k8s.io/api/autoscaling/v2" + "k8s.io/metrics/pkg/apis/external_metrics" + + kedautil "github.com/kedacore/keda/v2/pkg/util" +) + +const ( + cloudTasksStackDriverQueueSize = "cloudtasks.googleapis.com/queue/depth" + + cloudTaskDefaultValue = 100 +) + +type gcpCloudTasksScaler struct { + client *StackDriverClient + metricType v2.MetricTargetType + metadata *gcpCloudTaskMetadata + logger logr.Logger +} + +type gcpCloudTaskMetadata struct { + value float64 + activationValue float64 + + queueName string + projectID string + gcpAuthorization *gcpAuthorizationMetadata + scalerIndex int +} + +// NewCloudTaskScaler creates a new cloudTaskScaler +func NewGcpCloudTasksScaler(config *ScalerConfig) (Scaler, error) { + metricType, err := GetMetricTargetType(config) + if err != nil { + return nil, fmt.Errorf("error getting scaler metric type: %w", err) + } + + logger := InitializeLogger(config, "gcp_cloud_tasks_scaler") + + meta, err := parseGcpCloudTasksMetadata(config) + if err != nil { + return nil, fmt.Errorf("error parsing Cloud Tasks metadata: %w", err) + } + + return &gcpCloudTasksScaler{ + metricType: metricType, + metadata: meta, + logger: logger, + }, nil +} + +func parseGcpCloudTasksMetadata(config *ScalerConfig) (*gcpCloudTaskMetadata, error) { + meta := gcpCloudTaskMetadata{value: cloudTaskDefaultValue} + + value, valuePresent := config.TriggerMetadata["value"] + + if valuePresent { + triggerValue, err := strconv.ParseFloat(value, 64) + if err != nil { + return nil, fmt.Errorf("value parsing error %w", err) + } + meta.value = triggerValue + } + + if val, ok := config.TriggerMetadata["queueName"]; ok { + if val == "" { + return nil, fmt.Errorf("no queue name given") + } + + meta.queueName = val + } else { + return nil, fmt.Errorf("no queue name given") + } + + meta.activationValue = 0 + if val, ok := config.TriggerMetadata["activationValue"]; ok { + activationValue, err := strconv.ParseFloat(val, 64) + if err != nil { + return nil, fmt.Errorf("activationValue parsing error %w", err) + } + meta.activationValue = activationValue + } + + if val, ok := config.TriggerMetadata["projectID"]; ok { + if val == "" { + return nil, fmt.Errorf("no project id given") + } + + meta.projectID = val + } else { + return nil, fmt.Errorf("no project id given") + } + + auth, err := getGCPAuthorization(config) + if err != nil { + return nil, err + } + meta.gcpAuthorization = auth + meta.scalerIndex = config.ScalerIndex + return &meta, nil +} + +func (s *gcpCloudTasksScaler) Close(context.Context) error { + if s.client != nil { + err := s.client.metricsClient.Close() + s.client = nil + if err != nil { + s.logger.Error(err, "error closing StackDriver client") + } + } + + return nil +} + +// GetMetricSpecForScaling returns the metric spec for the HPA +func (s *gcpCloudTasksScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { + externalMetric := &v2.ExternalMetricSource{ + Metric: v2.MetricIdentifier{ + Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("gcp-ct-%s", s.metadata.queueName))), + }, + Target: GetMetricTargetMili(s.metricType, s.metadata.value), + } + + // Create the metric spec for the HPA + metricSpec := v2.MetricSpec{ + External: externalMetric, + Type: externalMetricType, + } + + return []v2.MetricSpec{metricSpec} +} + +// GetMetricsAndActivity connects to Stack Driver and finds the size of the cloud task +func (s *gcpCloudTasksScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { + metricType := cloudTasksStackDriverQueueSize + + value, err := s.getMetrics(ctx, metricType) + if err != nil { + s.logger.Error(err, "error getting metric", "metricType", metricType) + return []external_metrics.ExternalMetricValue{}, false, err + } + + metric := GenerateMetricInMili(metricName, value) + + return []external_metrics.ExternalMetricValue{metric}, value > s.metadata.activationValue, nil +} + +func (s *gcpCloudTasksScaler) setStackdriverClient(ctx context.Context) error { + var client *StackDriverClient + var err error + if s.metadata.gcpAuthorization.podIdentityProviderEnabled { + client, err = NewStackDriverClientPodIdentity(ctx) + } else { + client, err = NewStackDriverClient(ctx, s.metadata.gcpAuthorization.GoogleApplicationCredentials) + } + + if err != nil { + return err + } + s.client = client + return nil +} + +// getMetrics gets metric type value from stackdriver api +func (s *gcpCloudTasksScaler) getMetrics(ctx context.Context, metricType string) (float64, error) { + if s.client == nil { + err := s.setStackdriverClient(ctx) + if err != nil { + return -1, err + } + } + filter := `metric.type="` + metricType + `" AND resource.labels.queue_id="` + s.metadata.queueName + `"` + + // Cloud Tasks metrics are collected every 60 seconds so no need to aggregate them. + // See: https://cloud.google.com/monitoring/api/metrics_gcp#gcp-cloudtasks + return s.client.GetMetrics(ctx, filter, s.metadata.projectID, nil) +} diff --git a/pkg/scalers/gcp_cloud_tasks_scaler_test.go b/pkg/scalers/gcp_cloud_tasks_scaler_test.go new file mode 100644 index 00000000000..a5801ef2456 --- /dev/null +++ b/pkg/scalers/gcp_cloud_tasks_scaler_test.go @@ -0,0 +1,79 @@ +package scalers + +import ( + "context" + "testing" + + "github.com/go-logr/logr" +) + +var testGcpCloudTasksResolvedEnv = map[string]string{ + "SAMPLE_CREDS": "{}", +} + +type parseGcpCloudTasksMetadataTestData struct { + authParams map[string]string + metadata map[string]string + isError bool +} + +type gcpCloudTasksMetricIdentifier struct { + metadataTestData *parseGcpCloudTasksMetadataTestData + scalerIndex int + name string +} + +var testGcpCloudTasksMetadata = []parseGcpCloudTasksMetadataTestData{ + {map[string]string{}, map[string]string{}, true}, + // all properly formed + {nil, map[string]string{"queueName": "myQueue", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS", "projectID": "myproject", "activationValue": "5"}, false}, + // missing subscriptionName + {nil, map[string]string{"queueName": "", "value": "7", "projectID": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true}, + // missing credentials + {nil, map[string]string{"queueName": "myQueue", "value": "7", "projectID": "myproject", "credentialsFromEnv": ""}, true}, + // malformed subscriptionSize + {nil, map[string]string{"queueName": "myQueue", "value": "AA", "projectID": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true}, + // malformed mode + {nil, map[string]string{"queueName": "", "mode": "AA", "value": "7", "projectID": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true}, + // malformed activationTargetValue + {nil, map[string]string{"queueName": "myQueue", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS", "projectID": "myproject", "activationValue": "AA"}, true}, + // Credentials from AuthParams + {map[string]string{"GoogleApplicationCredentials": "Creds"}, map[string]string{"queueName": "myQueue", "value": "7", "projectID": "myproject"}, false}, + // Credentials from AuthParams with empty creds + {map[string]string{"GoogleApplicationCredentials": ""}, map[string]string{"queueName": "myQueue", "subscriptionSize": "7", "projectID": "myproject"}, true}, + // properly formed float value and activationTargetValue + {nil, map[string]string{"queueName": "mysubscription", "value": "7.1", "credentialsFromEnv": "SAMPLE_CREDS", "activationValue": "2.1", "projectID": "myproject"}, false}, +} + +var gcpCloudTasksMetricIdentifiers = []gcpCloudTasksMetricIdentifier{ + {&testGcpCloudTasksMetadata[1], 0, "s0-gcp-ct-myQueue"}, + {&testGcpCloudTasksMetadata[1], 1, "s1-gcp-ct-myQueue"}, +} + +func TestGcpCloudTasksParseMetadata(t *testing.T) { + for _, testData := range testGcpCloudTasksMetadata { + _, err := parseGcpCloudTasksMetadata(&ScalerConfig{AuthParams: testData.authParams, TriggerMetadata: testData.metadata, ResolvedEnv: testGcpCloudTasksResolvedEnv}) + if err != nil && !testData.isError { + t.Error("Expected success but got error", err) + } + if testData.isError && err == nil { + t.Error("Expected error but got success") + } + } +} + +func TestGcpCloudTasksGetMetricSpecForScaling(t *testing.T) { + for _, testData := range gcpCloudTasksMetricIdentifiers { + meta, err := parseGcpCloudTasksMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testGcpCloudTasksResolvedEnv, ScalerIndex: testData.scalerIndex}) + if err != nil { + t.Fatal("Could not parse metadata:", err) + } + mockGcpCloudTasksScaler := gcpCloudTasksScaler{nil, "", meta, logr.Discard()} + + metricSpec := mockGcpCloudTasksScaler.GetMetricSpecForScaling(context.Background()) + metricName := metricSpec[0].External.Metric.Name + if metricName != testData.name { + t.Error("Wrong External metric source name:", metricName) + } + } +} diff --git a/pkg/scaling/scalers_builder.go b/pkg/scaling/scalers_builder.go index f03a57c6df1..13175790aad 100644 --- a/pkg/scaling/scalers_builder.go +++ b/pkg/scaling/scalers_builder.go @@ -160,6 +160,8 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string, return scalers.NewExternalMockScaler(config) case "external-push": return scalers.NewExternalPushScaler(config) + case "gcp-cloudtasks": + return scalers.NewGcpCloudTasksScaler(config) case "gcp-pubsub": return scalers.NewPubSubScaler(config) case "gcp-stackdriver": diff --git a/tests/scalers/gcp/gcp_cloud_tasks/gcp_cloud_tasks_test.go b/tests/scalers/gcp/gcp_cloud_tasks/gcp_cloud_tasks_test.go new file mode 100644 index 00000000000..61cc92e5ef2 --- /dev/null +++ b/tests/scalers/gcp/gcp_cloud_tasks/gcp_cloud_tasks_test.go @@ -0,0 +1,288 @@ +//go:build e2e +// +build e2e + +package gcp_cloud_tasks_test + +import ( + "encoding/base64" + "encoding/json" + "fmt" + "os" + "testing" + "time" + + "github.com/joho/godotenv" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/client-go/kubernetes" + + . "github.com/kedacore/keda/v2/tests/helper" +) + +// Load environment variables from .env file +var _ = godotenv.Load("../../.env") + +var now = time.Now().UnixNano() + +const ( + testName = "gcp-cloud-tasks-test" +) + +var ( + gcpKey = os.Getenv("TF_GCP_SA_CREDENTIALS") + creds = make(map[string]interface{}) + errGcpKey = json.Unmarshal([]byte(gcpKey), &creds) + testNamespace = fmt.Sprintf("%s-ns", testName) + secretName = fmt.Sprintf("%s-secret", testName) + deploymentName = fmt.Sprintf("%s-deployment", testName) + scaledObjectName = fmt.Sprintf("%s-so", testName) + projectID = fmt.Sprintf("%s", creds["project_id"]) + queueID = fmt.Sprintf("keda-test-queue-%d", now) + maxReplicaCount = 4 + activationThreshold = 5 + gsPrefix = fmt.Sprintf("kubectl exec --namespace %s deploy/gcp-sdk -- ", testNamespace) +) + +type templateData struct { + TestNamespace string + SecretName string + GcpCreds string + DeploymentName string + ScaledObjectName string + QueueID string + ProjectID string + MaxReplicaCount int + ActivationThreshold int +} + +const ( + secretTemplate = ` +apiVersion: v1 +kind: Secret +metadata: + name: {{.SecretName}} + namespace: {{.TestNamespace}} +data: + creds.json: {{.GcpCreds}} +` + + scaledObjectTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + pollingInterval: 5 + minReplicaCount: 0 + maxReplicaCount: {{.MaxReplicaCount}} + cooldownPeriod: 10 + triggers: + - type: gcp-cloudtasks + metadata: + projectID: {{.ProjectID}} + queueName: {{.QueueID}} + value: "5" + activationValue: "{{.ActivationThreshold}}" + credentialsFromEnv: GOOGLE_APPLICATION_CREDENTIALS_JSON +` + + deploymentTemplate = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.DeploymentName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + replicas: 0 + selector: + matchLabels: + app: {{.DeploymentName}} + template: + metadata: + labels: + app: {{.DeploymentName}} + spec: + containers: + - name: {{.DeploymentName}}-processor + image: google/cloud-sdk:slim + # Consume a message + command: [ "/bin/bash", "-c", "--" ] + args: [ "gcloud auth activate-service-account --key-file /etc/secret-volume/creds.json && \ + while true; do gcloud tasks list --queue={{.QueueID}} --location=europe-west1; sleep 20; done" ] + env: + - name: GOOGLE_APPLICATION_CREDENTIALS_JSON + valueFrom: + secretKeyRef: + name: {{.SecretName}} + key: creds.json + volumeMounts: + - name: secret-volume + mountPath: /etc/secret-volume + volumes: + - name: secret-volume + secret: + secretName: {{.SecretName}} +` + + gcpSdkTemplate = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: gcp-sdk + namespace: {{.TestNamespace}} + labels: + app: gcp-sdk +spec: + replicas: 1 + selector: + matchLabels: + app: gcp-sdk + template: + metadata: + labels: + app: gcp-sdk + spec: + containers: + - name: gcp-sdk-container + image: google/cloud-sdk:slim + # Just spin & wait forever + command: [ "/bin/bash", "-c", "--" ] + args: [ "ls /tmp && while true; do sleep 30; done;" ] + volumeMounts: + - name: secret-volume + mountPath: /etc/secret-volume + volumes: + - name: secret-volume + secret: + secretName: {{.SecretName}} +` +) + +func TestScaler(t *testing.T) { + // setup + t.Log("--- setting up ---") + require.NotEmpty(t, gcpKey, "TF_GCP_SA_CREDENTIALS env variable is required for GCP storage test") + assert.NoErrorf(t, errGcpKey, "Failed to load credentials from gcpKey - %s", errGcpKey) + + // Create kubernetes resources + kc := GetKubernetesClient(t) + data, templates := getTemplateData() + + CreateKubernetesResources(t, kc, testNamespace, data, templates) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 1), + "replica count should be 0 after a minute") + + sdkReady := WaitForDeploymentReplicaReadyCount(t, kc, "gcp-sdk", testNamespace, 1, 60, 1) + assert.True(t, sdkReady, "gcp-sdk deployment should be ready after a minute") + + if sdkReady { + if createGcpCloudTasks(t) == nil { + // test scaling + testActivation(t, kc) + testScaleOut(t, kc) + testScaleIn(t, kc) + + // cleanup + t.Log("--- cleanup ---") + cleanupGcpCloudTasks(t) + } + } + + DeleteKubernetesResources(t, testNamespace, data, templates) +} + +func createGcpCloudTasks(t *testing.T) error { + // Authenticate to GCP + t.Log("--- authenticate to GCP ---") + cmd := fmt.Sprintf("%sgcloud auth activate-service-account %s --key-file /etc/secret-volume/creds.json --project=%s", gsPrefix, creds["client_email"], projectID) + _, err := ExecuteCommand(cmd) + assert.NoErrorf(t, err, "Failed to set GCP authentication on gcp-sdk - %s", err) + if err != nil { + return err + } + + // Create queue + t.Log("--- create queue ---") + cmd = fmt.Sprintf("%sgcloud tasks queues create %s --location europe-west1 --max-concurrent-dispatches 1 --max-dispatches-per-second 1", gsPrefix, queueID) + _, err = ExecuteCommand(cmd) + assert.NoErrorf(t, err, "Failed to create Cloud Tasks queue %s: %s", queueID, err) + if err != nil { + return err + } + + return err +} + +func cleanupGcpCloudTasks(t *testing.T) { + // Delete the queue + t.Log("--- cleaning up the queue ---") + _, _ = ExecuteCommand(fmt.Sprintf("%sgcloud tasks queues delete %s --location europe-west1 --quiet", gsPrefix, queueID)) +} + +func getTemplateData() (templateData, []Template) { + base64GcpCreds := base64.StdEncoding.EncodeToString([]byte(gcpKey)) + + return templateData{ + TestNamespace: testNamespace, + SecretName: secretName, + GcpCreds: base64GcpCreds, + DeploymentName: deploymentName, + ScaledObjectName: scaledObjectName, + QueueID: queueID, + ProjectID: projectID, + MaxReplicaCount: maxReplicaCount, + ActivationThreshold: activationThreshold, + }, []Template{ + {Name: "secretTemplate", Config: secretTemplate}, + {Name: "deploymentTemplate", Config: deploymentTemplate}, + {Name: "scaledObjectTemplate", Config: scaledObjectTemplate}, + {Name: "gcpSdkTemplate", Config: gcpSdkTemplate}, + } +} + +func createGcpTasks(t *testing.T, count int) { + t.Logf("--- creating %d tasks ---", count) + publish := fmt.Sprintf( + "%s/bin/bash -c -- 'for i in {1..%d}; do gcloud tasks create-http-task --location europe-west1 --queue %s --url http://foo.bar;done'", + gsPrefix, + count, + queueID) + _, err := ExecuteCommand(publish) + assert.NoErrorf(t, err, "cannot create tasks to queue - %s", err) +} + +func testActivation(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- testing not scaling if below threshold ---") + + createGcpTasks(t, activationThreshold) + + t.Log("--- waiting to see replicas are not scaled up ---") + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 240) +} + +func testScaleOut(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- testing scale out ---") + + createGcpTasks(t, 20-activationThreshold) + + t.Log("--- waiting for replicas to scale out ---") + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 30, 10), + fmt.Sprintf("replica count should be %d after five minutes", maxReplicaCount)) +} + +func testScaleIn(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- testing scale in ---") + cmd := fmt.Sprintf("%sgcloud tasks queues purge %s --location europe-west1 --quiet", gsPrefix, queueID) + _, err := ExecuteCommand(cmd) + assert.NoErrorf(t, err, "cannot purge queue - %s", err) + + t.Log("--- waiting for replicas to scale in to zero ---") + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 30, 10), + "replica count should be 0 after five minute") +} diff --git a/tests/scalers/gcp/gcp_cloud_tasks_workload_identity/gcp_cloud_tasks_workload_identity_test.go b/tests/scalers/gcp/gcp_cloud_tasks_workload_identity/gcp_cloud_tasks_workload_identity_test.go new file mode 100644 index 00000000000..1de52b5068a --- /dev/null +++ b/tests/scalers/gcp/gcp_cloud_tasks_workload_identity/gcp_cloud_tasks_workload_identity_test.go @@ -0,0 +1,299 @@ +//go:build e2e +// +build e2e + +package gcp_cloud_tasks_workload_identity_test + +import ( + "encoding/base64" + "encoding/json" + "fmt" + "os" + "testing" + "time" + + "github.com/joho/godotenv" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/client-go/kubernetes" + + . "github.com/kedacore/keda/v2/tests/helper" +) + +// Load environment variables from .env file +var _ = godotenv.Load("../../.env") + +var now = time.Now().UnixNano() + +const ( + testName = "gcp-cloud-tasks-workload-identity-test" +) + +var ( + gcpKey = os.Getenv("TF_GCP_SA_CREDENTIALS") + creds = make(map[string]interface{}) + errGcpKey = json.Unmarshal([]byte(gcpKey), &creds) + testNamespace = fmt.Sprintf("%s-ns", testName) + secretName = fmt.Sprintf("%s-secret", testName) + deploymentName = fmt.Sprintf("%s-deployment", testName) + scaledObjectName = fmt.Sprintf("%s-so", testName) + projectID = fmt.Sprintf("%s", creds["project_id"]) + queueID = fmt.Sprintf("keda-test-queue-%d", now) + maxReplicaCount = 4 + activationThreshold = 5 + gsPrefix = fmt.Sprintf("kubectl exec --namespace %s deploy/gcp-sdk -- ", testNamespace) +) + +type templateData struct { + TestNamespace string + SecretName string + GcpCreds string + DeploymentName string + ScaledObjectName string + QueueID string + ProjectID string + MaxReplicaCount int + ActivationThreshold int +} + +const ( + secretTemplate = ` +apiVersion: v1 +kind: Secret +metadata: + name: {{.SecretName}} + namespace: {{.TestNamespace}} +data: + creds.json: {{.GcpCreds}} +` + + scaledObjectTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + pollingInterval: 5 + minReplicaCount: 0 + maxReplicaCount: {{.MaxReplicaCount}} + cooldownPeriod: 10 + triggers: + - type: gcp-cloudtasks + authenticationRef: + name: keda-trigger-auth-gcp-credentials + metadata: + projectID: {{.ProjectID}} + queueName: {{.QueueID}} + value: "5" + activationValue: "{{.ActivationThreshold}}" +` + + triggerAuthenticationTemplate = `apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: keda-trigger-auth-gcp-credentials + namespace: {{.TestNamespace}} +spec: + podIdentity: + provider: gcp` + + deploymentTemplate = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.DeploymentName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + replicas: 0 + selector: + matchLabels: + app: {{.DeploymentName}} + template: + metadata: + labels: + app: {{.DeploymentName}} + spec: + containers: + - name: {{.DeploymentName}}-processor + image: google/cloud-sdk:slim + # Consume a message + command: [ "/bin/bash", "-c", "--" ] + args: [ "gcloud auth activate-service-account --key-file /etc/secret-volume/creds.json && \ + while true; do gcloud tasks list --queue={{.QueueID}} --location=europe-west1; sleep 20; done" ] + env: + - name: GOOGLE_APPLICATION_CREDENTIALS_JSON + valueFrom: + secretKeyRef: + name: {{.SecretName}} + key: creds.json + volumeMounts: + - name: secret-volume + mountPath: /etc/secret-volume + volumes: + - name: secret-volume + secret: + secretName: {{.SecretName}} +` + + gcpSdkTemplate = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: gcp-sdk + namespace: {{.TestNamespace}} + labels: + app: gcp-sdk +spec: + replicas: 1 + selector: + matchLabels: + app: gcp-sdk + template: + metadata: + labels: + app: gcp-sdk + spec: + containers: + - name: gcp-sdk-container + image: google/cloud-sdk:slim + # Just spin & wait forever + command: [ "/bin/bash", "-c", "--" ] + args: [ "ls /tmp && while true; do sleep 30; done;" ] + volumeMounts: + - name: secret-volume + mountPath: /etc/secret-volume + volumes: + - name: secret-volume + secret: + secretName: {{.SecretName}} +` +) + +func TestScaler(t *testing.T) { + // setup + t.Log("--- setting up ---") + require.NotEmpty(t, gcpKey, "TF_GCP_SA_CREDENTIALS env variable is required for GCP storage test") + assert.NoErrorf(t, errGcpKey, "Failed to load credentials from gcpKey - %s", errGcpKey) + + // Create kubernetes resources + kc := GetKubernetesClient(t) + data, templates := getTemplateData() + + CreateKubernetesResources(t, kc, testNamespace, data, templates) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 1), + "replica count should be 0 after a minute") + + sdkReady := WaitForDeploymentReplicaReadyCount(t, kc, "gcp-sdk", testNamespace, 1, 60, 1) + assert.True(t, sdkReady, "gcp-sdk deployment should be ready after a minute") + + if sdkReady { + if createGcpCloudTasks(t) == nil { + // test scaling + testActivation(t, kc) + testScaleOut(t, kc) + testScaleIn(t, kc) + + // cleanup + t.Log("--- cleanup ---") + cleanupGcpCloudTasks(t) + } + } + + DeleteKubernetesResources(t, testNamespace, data, templates) +} + +func createGcpCloudTasks(t *testing.T) error { + // Authenticate to GCP + t.Log("--- authenticate to GCP ---") + cmd := fmt.Sprintf("%sgcloud auth activate-service-account %s --key-file /etc/secret-volume/creds.json --project=%s", gsPrefix, creds["client_email"], projectID) + _, err := ExecuteCommand(cmd) + assert.NoErrorf(t, err, "Failed to set GCP authentication on gcp-sdk - %s", err) + if err != nil { + return err + } + + // Create queue + t.Log("--- create queue ---") + cmd = fmt.Sprintf("%sgcloud tasks queues create %s --location europe-west1 --max-concurrent-dispatches 1 --max-dispatches-per-second 1", gsPrefix, queueID) + _, err = ExecuteCommand(cmd) + assert.NoErrorf(t, err, "Failed to create Cloud Tasks queue %s: %s", queueID, err) + if err != nil { + return err + } + + return err +} + +func cleanupGcpCloudTasks(t *testing.T) { + // Delete the queue + t.Log("--- cleaning up the queue ---") + _, _ = ExecuteCommand(fmt.Sprintf("%sgcloud tasks queues delete %s --location europe-west1 --quiet", gsPrefix, queueID)) +} + +func getTemplateData() (templateData, []Template) { + base64GcpCreds := base64.StdEncoding.EncodeToString([]byte(gcpKey)) + + return templateData{ + TestNamespace: testNamespace, + SecretName: secretName, + GcpCreds: base64GcpCreds, + DeploymentName: deploymentName, + ScaledObjectName: scaledObjectName, + QueueID: queueID, + ProjectID: projectID, + MaxReplicaCount: maxReplicaCount, + ActivationThreshold: activationThreshold, + }, []Template{ + {Name: "secretTemplate", Config: secretTemplate}, + {Name: "deploymentTemplate", Config: deploymentTemplate}, + {Name: "triggerAuthenticationTemplate", Config: triggerAuthenticationTemplate}, + {Name: "scaledObjectTemplate", Config: scaledObjectTemplate}, + {Name: "gcpSdkTemplate", Config: gcpSdkTemplate}, + } +} + +func createGcpTasks(t *testing.T, count int) { + t.Logf("--- creating %d tasks ---", count) + publish := fmt.Sprintf( + "%s/bin/bash -c -- 'for i in {1..%d}; do gcloud tasks create-http-task --location europe-west1 --queue %s --url http://foo.bar;done'", + gsPrefix, + count, + queueID) + _, err := ExecuteCommand(publish) + assert.NoErrorf(t, err, "cannot create tasks to queue - %s", err) +} + +func testActivation(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- testing not scaling if below threshold ---") + + createGcpTasks(t, activationThreshold) + + t.Log("--- waiting to see replicas are not scaled up ---") + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 240) +} + +func testScaleOut(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- testing scale out ---") + + createGcpTasks(t, 20-activationThreshold) + + t.Log("--- waiting for replicas to scale out ---") + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 30, 10), + fmt.Sprintf("replica count should be %d after five minutes", maxReplicaCount)) +} + +func testScaleIn(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- testing scale in ---") + cmd := fmt.Sprintf("%sgcloud tasks queues purge %s --location europe-west1 --quiet", gsPrefix, queueID) + _, err := ExecuteCommand(cmd) + assert.NoErrorf(t, err, "cannot purge queue - %s", err) + + t.Log("--- waiting for replicas to scale in to zero ---") + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 30, 10), + "replica count should be 0 after five minute") +}