forked from kedacore/keda
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: Add scaler for Google Cloud Tasks (kedacore#4834)
* feature: add cloud tasks scaler Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es> * Add cloud tasks e2e test Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es> * fix: Return files to the original state Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es> * fix: static checks Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es> * fix: more static checks fixed Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es> * fix: add location for queue Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es> * fix: add correct command to create test tasks in queue Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es> * fix: add fixes to test and add pod identity test Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es> * fix: specify location also in tasks Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es> * fix: correct indentation and location for purge Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es> * fix: correct naming, add package correctly to identity tests Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es> * fix: change test name Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es> * fix: add gcp as prefix for naming for clarity Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es> * fix: correct problem in test when changing name in struct Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es> * fix: order in changelog Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es> * fix: another order change in changelog Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es> * fix: more renaming Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es> * fix: put it in a new section Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es> * fix: try a new order for the changelog Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es> * fix: delete unneeded line Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es> * fix: into the new Changelog section Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es> * fix: another order fix Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es> --------- Signed-off-by: Jose Maria Alvarez <jose-maria.alvarez@leroymerlin.es> Signed-off-by: Jose Maria Alvarez Fernandez <52029309+jmalvarezf-lmes@users.noreply.github.com>
- Loading branch information
1 parent
c94c20f
commit 7daa09a
Showing
6 changed files
with
854 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,184 @@ | ||
package scalers | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"strconv" | ||
|
||
"github.com/go-logr/logr" | ||
v2 "k8s.io/api/autoscaling/v2" | ||
"k8s.io/metrics/pkg/apis/external_metrics" | ||
|
||
kedautil "github.com/kedacore/keda/v2/pkg/util" | ||
) | ||
|
||
const ( | ||
cloudTasksStackDriverQueueSize = "cloudtasks.googleapis.com/queue/depth" | ||
|
||
cloudTaskDefaultValue = 100 | ||
) | ||
|
||
type gcpCloudTasksScaler struct { | ||
client *StackDriverClient | ||
metricType v2.MetricTargetType | ||
metadata *gcpCloudTaskMetadata | ||
logger logr.Logger | ||
} | ||
|
||
type gcpCloudTaskMetadata struct { | ||
value float64 | ||
activationValue float64 | ||
|
||
queueName string | ||
projectID string | ||
gcpAuthorization *gcpAuthorizationMetadata | ||
scalerIndex int | ||
} | ||
|
||
// NewCloudTaskScaler creates a new cloudTaskScaler | ||
func NewGcpCloudTasksScaler(config *ScalerConfig) (Scaler, error) { | ||
metricType, err := GetMetricTargetType(config) | ||
if err != nil { | ||
return nil, fmt.Errorf("error getting scaler metric type: %w", err) | ||
} | ||
|
||
logger := InitializeLogger(config, "gcp_cloud_tasks_scaler") | ||
|
||
meta, err := parseGcpCloudTasksMetadata(config) | ||
if err != nil { | ||
return nil, fmt.Errorf("error parsing Cloud Tasks metadata: %w", err) | ||
} | ||
|
||
return &gcpCloudTasksScaler{ | ||
metricType: metricType, | ||
metadata: meta, | ||
logger: logger, | ||
}, nil | ||
} | ||
|
||
func parseGcpCloudTasksMetadata(config *ScalerConfig) (*gcpCloudTaskMetadata, error) { | ||
meta := gcpCloudTaskMetadata{value: cloudTaskDefaultValue} | ||
|
||
value, valuePresent := config.TriggerMetadata["value"] | ||
|
||
if valuePresent { | ||
triggerValue, err := strconv.ParseFloat(value, 64) | ||
if err != nil { | ||
return nil, fmt.Errorf("value parsing error %w", err) | ||
} | ||
meta.value = triggerValue | ||
} | ||
|
||
if val, ok := config.TriggerMetadata["queueName"]; ok { | ||
if val == "" { | ||
return nil, fmt.Errorf("no queue name given") | ||
} | ||
|
||
meta.queueName = val | ||
} else { | ||
return nil, fmt.Errorf("no queue name given") | ||
} | ||
|
||
meta.activationValue = 0 | ||
if val, ok := config.TriggerMetadata["activationValue"]; ok { | ||
activationValue, err := strconv.ParseFloat(val, 64) | ||
if err != nil { | ||
return nil, fmt.Errorf("activationValue parsing error %w", err) | ||
} | ||
meta.activationValue = activationValue | ||
} | ||
|
||
if val, ok := config.TriggerMetadata["projectID"]; ok { | ||
if val == "" { | ||
return nil, fmt.Errorf("no project id given") | ||
} | ||
|
||
meta.projectID = val | ||
} else { | ||
return nil, fmt.Errorf("no project id given") | ||
} | ||
|
||
auth, err := getGCPAuthorization(config) | ||
if err != nil { | ||
return nil, err | ||
} | ||
meta.gcpAuthorization = auth | ||
meta.scalerIndex = config.ScalerIndex | ||
return &meta, nil | ||
} | ||
|
||
func (s *gcpCloudTasksScaler) Close(context.Context) error { | ||
if s.client != nil { | ||
err := s.client.metricsClient.Close() | ||
s.client = nil | ||
if err != nil { | ||
s.logger.Error(err, "error closing StackDriver client") | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// GetMetricSpecForScaling returns the metric spec for the HPA | ||
func (s *gcpCloudTasksScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { | ||
externalMetric := &v2.ExternalMetricSource{ | ||
Metric: v2.MetricIdentifier{ | ||
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("gcp-ct-%s", s.metadata.queueName))), | ||
}, | ||
Target: GetMetricTargetMili(s.metricType, s.metadata.value), | ||
} | ||
|
||
// Create the metric spec for the HPA | ||
metricSpec := v2.MetricSpec{ | ||
External: externalMetric, | ||
Type: externalMetricType, | ||
} | ||
|
||
return []v2.MetricSpec{metricSpec} | ||
} | ||
|
||
// GetMetricsAndActivity connects to Stack Driver and finds the size of the cloud task | ||
func (s *gcpCloudTasksScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { | ||
metricType := cloudTasksStackDriverQueueSize | ||
|
||
value, err := s.getMetrics(ctx, metricType) | ||
if err != nil { | ||
s.logger.Error(err, "error getting metric", "metricType", metricType) | ||
return []external_metrics.ExternalMetricValue{}, false, err | ||
} | ||
|
||
metric := GenerateMetricInMili(metricName, value) | ||
|
||
return []external_metrics.ExternalMetricValue{metric}, value > s.metadata.activationValue, nil | ||
} | ||
|
||
func (s *gcpCloudTasksScaler) setStackdriverClient(ctx context.Context) error { | ||
var client *StackDriverClient | ||
var err error | ||
if s.metadata.gcpAuthorization.podIdentityProviderEnabled { | ||
client, err = NewStackDriverClientPodIdentity(ctx) | ||
} else { | ||
client, err = NewStackDriverClient(ctx, s.metadata.gcpAuthorization.GoogleApplicationCredentials) | ||
} | ||
|
||
if err != nil { | ||
return err | ||
} | ||
s.client = client | ||
return nil | ||
} | ||
|
||
// getMetrics gets metric type value from stackdriver api | ||
func (s *gcpCloudTasksScaler) getMetrics(ctx context.Context, metricType string) (float64, error) { | ||
if s.client == nil { | ||
err := s.setStackdriverClient(ctx) | ||
if err != nil { | ||
return -1, err | ||
} | ||
} | ||
filter := `metric.type="` + metricType + `" AND resource.labels.queue_id="` + s.metadata.queueName + `"` | ||
|
||
// Cloud Tasks metrics are collected every 60 seconds so no need to aggregate them. | ||
// See: https://cloud.google.com/monitoring/api/metrics_gcp#gcp-cloudtasks | ||
return s.client.GetMetrics(ctx, filter, s.metadata.projectID, nil) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
package scalers | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
"github.com/go-logr/logr" | ||
) | ||
|
||
var testGcpCloudTasksResolvedEnv = map[string]string{ | ||
"SAMPLE_CREDS": "{}", | ||
} | ||
|
||
type parseGcpCloudTasksMetadataTestData struct { | ||
authParams map[string]string | ||
metadata map[string]string | ||
isError bool | ||
} | ||
|
||
type gcpCloudTasksMetricIdentifier struct { | ||
metadataTestData *parseGcpCloudTasksMetadataTestData | ||
scalerIndex int | ||
name string | ||
} | ||
|
||
var testGcpCloudTasksMetadata = []parseGcpCloudTasksMetadataTestData{ | ||
{map[string]string{}, map[string]string{}, true}, | ||
// all properly formed | ||
{nil, map[string]string{"queueName": "myQueue", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS", "projectID": "myproject", "activationValue": "5"}, false}, | ||
// missing subscriptionName | ||
{nil, map[string]string{"queueName": "", "value": "7", "projectID": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true}, | ||
// missing credentials | ||
{nil, map[string]string{"queueName": "myQueue", "value": "7", "projectID": "myproject", "credentialsFromEnv": ""}, true}, | ||
// malformed subscriptionSize | ||
{nil, map[string]string{"queueName": "myQueue", "value": "AA", "projectID": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true}, | ||
// malformed mode | ||
{nil, map[string]string{"queueName": "", "mode": "AA", "value": "7", "projectID": "myproject", "credentialsFromEnv": "SAMPLE_CREDS"}, true}, | ||
// malformed activationTargetValue | ||
{nil, map[string]string{"queueName": "myQueue", "value": "7", "credentialsFromEnv": "SAMPLE_CREDS", "projectID": "myproject", "activationValue": "AA"}, true}, | ||
// Credentials from AuthParams | ||
{map[string]string{"GoogleApplicationCredentials": "Creds"}, map[string]string{"queueName": "myQueue", "value": "7", "projectID": "myproject"}, false}, | ||
// Credentials from AuthParams with empty creds | ||
{map[string]string{"GoogleApplicationCredentials": ""}, map[string]string{"queueName": "myQueue", "subscriptionSize": "7", "projectID": "myproject"}, true}, | ||
// properly formed float value and activationTargetValue | ||
{nil, map[string]string{"queueName": "mysubscription", "value": "7.1", "credentialsFromEnv": "SAMPLE_CREDS", "activationValue": "2.1", "projectID": "myproject"}, false}, | ||
} | ||
|
||
var gcpCloudTasksMetricIdentifiers = []gcpCloudTasksMetricIdentifier{ | ||
{&testGcpCloudTasksMetadata[1], 0, "s0-gcp-ct-myQueue"}, | ||
{&testGcpCloudTasksMetadata[1], 1, "s1-gcp-ct-myQueue"}, | ||
} | ||
|
||
func TestGcpCloudTasksParseMetadata(t *testing.T) { | ||
for _, testData := range testGcpCloudTasksMetadata { | ||
_, err := parseGcpCloudTasksMetadata(&ScalerConfig{AuthParams: testData.authParams, TriggerMetadata: testData.metadata, ResolvedEnv: testGcpCloudTasksResolvedEnv}) | ||
if err != nil && !testData.isError { | ||
t.Error("Expected success but got error", err) | ||
} | ||
if testData.isError && err == nil { | ||
t.Error("Expected error but got success") | ||
} | ||
} | ||
} | ||
|
||
func TestGcpCloudTasksGetMetricSpecForScaling(t *testing.T) { | ||
for _, testData := range gcpCloudTasksMetricIdentifiers { | ||
meta, err := parseGcpCloudTasksMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testGcpCloudTasksResolvedEnv, ScalerIndex: testData.scalerIndex}) | ||
if err != nil { | ||
t.Fatal("Could not parse metadata:", err) | ||
} | ||
mockGcpCloudTasksScaler := gcpCloudTasksScaler{nil, "", meta, logr.Discard()} | ||
|
||
metricSpec := mockGcpCloudTasksScaler.GetMetricSpecForScaling(context.Background()) | ||
metricName := metricSpec[0].External.Metric.Name | ||
if metricName != testData.name { | ||
t.Error("Wrong External metric source name:", metricName) | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.