From c21db9490c361b8538bad9b91caf4e4928548dbc Mon Sep 17 00:00:00 2001 From: Jose Maria Alvarez Date: Wed, 10 Nov 2021 17:54:45 +0100 Subject: [PATCH 01/11] Add logic to accept subscription IDs identifying the whole chain, including projectID Signed-off-by: Jose Maria Alvarez --- pkg/scalers/gcp_pub_sub_scaler.go | 17 ++++++++++--- pkg/scalers/stackdriver_client.go | 42 +++++++++++++++++++++---------- 2 files changed, 43 insertions(+), 16 deletions(-) diff --git a/pkg/scalers/gcp_pub_sub_scaler.go b/pkg/scalers/gcp_pub_sub_scaler.go index b084a3a0d94..3efea434325 100644 --- a/pkg/scalers/gcp_pub_sub_scaler.go +++ b/pkg/scalers/gcp_pub_sub_scaler.go @@ -3,7 +3,9 @@ package scalers import ( "context" "fmt" + "regexp" "strconv" + "strings" v2beta2 "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" @@ -19,6 +21,7 @@ import ( const ( defaultTargetSubscriptionSize = 5 pubSubStackDriverMetricName = "pubsub.googleapis.com/subscription/num_undelivered_messages" + compositeSubscriptionIdPrefix = "projects/[a-zA-Z0-9-]+/subscriptions/[a-zA-Z0-9-]+" ) type gcpAuthorizationMetadata struct { @@ -167,10 +170,18 @@ func (s *pubsubScaler) GetSubscriptionSize(ctx context.Context) (int64, error) { } s.client = client } + var subscriptionID string + var projectID string = "" + regexpExpression, _ := regexp.Compile(compositeSubscriptionIdPrefix) + if regexpExpression.MatchString(s.metadata.subscriptionName) { + subscriptionID = strings.Split(s.metadata.subscriptionName, "/")[3] + projectID = strings.Split(s.metadata.subscriptionName, "/")[1] + } else { + subscriptionID = s.metadata.subscriptionName + } + filter := `metric.type="` + pubSubStackDriverMetricName + `" AND resource.labels.subscription_id="` + subscriptionID + `"` - filter := `metric.type="` + pubSubStackDriverMetricName + `" AND resource.labels.subscription_id="` + s.metadata.subscriptionName + `"` - - return s.client.GetMetrics(ctx, filter) + return s.client.GetMetrics(ctx, filter, projectID) } func getGcpAuthorization(config *ScalerConfig, resolvedEnv map[string]string) (*gcpAuthorizationMetadata, error) { diff --git a/pkg/scalers/stackdriver_client.go b/pkg/scalers/stackdriver_client.go index cb72037629f..6ff0c29ac19 100644 --- a/pkg/scalers/stackdriver_client.go +++ b/pkg/scalers/stackdriver_client.go @@ -62,7 +62,7 @@ func NewStackDriverClientPodIdentity(ctx context.Context) (*StackDriverClient, e } // GetMetrics fetches metrics from stackdriver for a specific filter for the last minute -func (s StackDriverClient) GetMetrics(ctx context.Context, filter string) (int64, error) { +func (s StackDriverClient) GetMetrics(ctx context.Context, filter string, projectID string) (int64, error) { // Set the start time to 1 minute ago startTime := time.Now().UTC().Add(time.Minute * -2) @@ -71,22 +71,38 @@ func (s StackDriverClient) GetMetrics(ctx context.Context, filter string) (int64 // Create a request with the filter and the GCP project ID var req *monitoringpb.ListTimeSeriesRequest - if len(s.projectID) > 0 { - req = &monitoringpb.ListTimeSeriesRequest{ - Name: "projects/" + s.projectID, - Filter: filter, - Interval: &monitoringpb.TimeInterval{ - StartTime: ×tamp.Timestamp{ - Seconds: startTime.Unix(), + switch projectID { + case "": + if len(s.projectID) > 0 { + req = &monitoringpb.ListTimeSeriesRequest{ + Name: "projects/" + s.projectID, + Filter: filter, + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamp.Timestamp{ + Seconds: startTime.Unix(), + }, + EndTime: ×tamp.Timestamp{ + Seconds: endTime.Unix(), + }, }, - EndTime: ×tamp.Timestamp{ - Seconds: endTime.Unix(), + } + } else { + req = &monitoringpb.ListTimeSeriesRequest{ + Name: "projects/" + s.credentials.ProjectID, + Filter: filter, + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamp.Timestamp{ + Seconds: startTime.Unix(), + }, + EndTime: ×tamp.Timestamp{ + Seconds: endTime.Unix(), + }, }, - }, + } } - } else { + default: req = &monitoringpb.ListTimeSeriesRequest{ - Name: "projects/" + s.credentials.ProjectID, + Name: "projects/" + projectID, Filter: filter, Interval: &monitoringpb.TimeInterval{ StartTime: ×tamp.Timestamp{ From 34ba675a8e2ce1c9c9b7e39187341e0048f9b8e0 Mon Sep 17 00:00:00 2001 From: Jose Maria Alvarez Date: Wed, 10 Nov 2021 20:26:25 +0100 Subject: [PATCH 02/11] Correct static checks Signed-off-by: Jose Maria Alvarez --- pkg/scalers/gcp_pub_sub_scaler.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/scalers/gcp_pub_sub_scaler.go b/pkg/scalers/gcp_pub_sub_scaler.go index 3efea434325..d14133ef3e6 100644 --- a/pkg/scalers/gcp_pub_sub_scaler.go +++ b/pkg/scalers/gcp_pub_sub_scaler.go @@ -21,7 +21,7 @@ import ( const ( defaultTargetSubscriptionSize = 5 pubSubStackDriverMetricName = "pubsub.googleapis.com/subscription/num_undelivered_messages" - compositeSubscriptionIdPrefix = "projects/[a-zA-Z0-9-]+/subscriptions/[a-zA-Z0-9-]+" + compositeSubscriptionIDPrefix = "projects/[a-zA-Z0-9-]+/subscriptions/[a-zA-Z0-9-]+" ) type gcpAuthorizationMetadata struct { @@ -171,8 +171,8 @@ func (s *pubsubScaler) GetSubscriptionSize(ctx context.Context) (int64, error) { s.client = client } var subscriptionID string - var projectID string = "" - regexpExpression, _ := regexp.Compile(compositeSubscriptionIdPrefix) + var projectID string + regexpExpression, _ := regexp.Compile(compositeSubscriptionIDPrefix) if regexpExpression.MatchString(s.metadata.subscriptionName) { subscriptionID = strings.Split(s.metadata.subscriptionName, "/")[3] projectID = strings.Split(s.metadata.subscriptionName, "/")[1] From c2e1e252c3035b58ba113ddfe7ac51f5e60b24c3 Mon Sep 17 00:00:00 2001 From: Jose Maria Alvarez Fernandez <52029309+jmalvarezf-lmes@users.noreply.github.com> Date: Fri, 12 Nov 2021 14:16:01 +0100 Subject: [PATCH 03/11] Update pkg/scalers/gcp_pub_sub_scaler.go Co-authored-by: Herman Signed-off-by: Jose Maria Alvarez --- pkg/scalers/gcp_pub_sub_scaler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/scalers/gcp_pub_sub_scaler.go b/pkg/scalers/gcp_pub_sub_scaler.go index d14133ef3e6..dcd0be31e15 100644 --- a/pkg/scalers/gcp_pub_sub_scaler.go +++ b/pkg/scalers/gcp_pub_sub_scaler.go @@ -21,7 +21,7 @@ import ( const ( defaultTargetSubscriptionSize = 5 pubSubStackDriverMetricName = "pubsub.googleapis.com/subscription/num_undelivered_messages" - compositeSubscriptionIDPrefix = "projects/[a-zA-Z0-9-]+/subscriptions/[a-zA-Z0-9-]+" + compositeSubscriptionIDPrefix = "projects/[a-z][a-zA-Z0-9-]*[a-zA-Z0-9]/subscriptions/[a-zA-Z][a-zA-Z0-9-_~%+]*" ) type gcpAuthorizationMetadata struct { From 51f83f92360dcfe9667fcd8757bc1866a2bb3103 Mon Sep 17 00:00:00 2001 From: Jose Maria Alvarez Date: Fri, 12 Nov 2021 14:19:25 +0100 Subject: [PATCH 04/11] Avoid unneccessary logic Signed-off-by: Jose Maria Alvarez --- pkg/scalers/stackdriver_client.go | 49 ++++++++----------------------- 1 file changed, 12 insertions(+), 37 deletions(-) diff --git a/pkg/scalers/stackdriver_client.go b/pkg/scalers/stackdriver_client.go index 6ff0c29ac19..40f1b48b60b 100644 --- a/pkg/scalers/stackdriver_client.go +++ b/pkg/scalers/stackdriver_client.go @@ -70,50 +70,25 @@ func (s StackDriverClient) GetMetrics(ctx context.Context, filter string, projec endTime := time.Now().UTC() // Create a request with the filter and the GCP project ID - var req *monitoringpb.ListTimeSeriesRequest + var req = &monitoringpb.ListTimeSeriesRequest{ + Filter: filter, + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamp.Timestamp{Seconds: startTime.Unix()}, + EndTime: ×tamp.Timestamp{Seconds: endTime.Unix()}, + }, + } + switch projectID { case "": if len(s.projectID) > 0 { - req = &monitoringpb.ListTimeSeriesRequest{ - Name: "projects/" + s.projectID, - Filter: filter, - Interval: &monitoringpb.TimeInterval{ - StartTime: ×tamp.Timestamp{ - Seconds: startTime.Unix(), - }, - EndTime: ×tamp.Timestamp{ - Seconds: endTime.Unix(), - }, - }, - } + req.Name = "projects/" + s.projectID } else { - req = &monitoringpb.ListTimeSeriesRequest{ - Name: "projects/" + s.credentials.ProjectID, - Filter: filter, - Interval: &monitoringpb.TimeInterval{ - StartTime: ×tamp.Timestamp{ - Seconds: startTime.Unix(), - }, - EndTime: ×tamp.Timestamp{ - Seconds: endTime.Unix(), - }, - }, - } + req.Name = "projects/" + s.credentials.ProjectID } default: - req = &monitoringpb.ListTimeSeriesRequest{ - Name: "projects/" + projectID, - Filter: filter, - Interval: &monitoringpb.TimeInterval{ - StartTime: ×tamp.Timestamp{ - Seconds: startTime.Unix(), - }, - EndTime: ×tamp.Timestamp{ - Seconds: endTime.Unix(), - }, - }, - } + req.Name = "projects/" + projectID } + // Get an iterator with the list of time series it := s.metricsClient.ListTimeSeries(ctx, req) From bee53c7e6218547b37e0f13437f646c8bf9eb209 Mon Sep 17 00:00:00 2001 From: Jose Maria Alvarez Date: Fri, 12 Nov 2021 15:52:03 +0100 Subject: [PATCH 05/11] Correct regular expression Signed-off-by: Jose Maria Alvarez --- pkg/scalers/gcp_pub_sub_scaler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/scalers/gcp_pub_sub_scaler.go b/pkg/scalers/gcp_pub_sub_scaler.go index dcd0be31e15..7264d59231f 100644 --- a/pkg/scalers/gcp_pub_sub_scaler.go +++ b/pkg/scalers/gcp_pub_sub_scaler.go @@ -21,7 +21,7 @@ import ( const ( defaultTargetSubscriptionSize = 5 pubSubStackDriverMetricName = "pubsub.googleapis.com/subscription/num_undelivered_messages" - compositeSubscriptionIDPrefix = "projects/[a-z][a-zA-Z0-9-]*[a-zA-Z0-9]/subscriptions/[a-zA-Z][a-zA-Z0-9-_~%+]*" + compositeSubscriptionIDPrefix = "projects/[a-z][a-zA-Z0-9-]*[a-zA-Z0-9]/subscriptions/[a-zA-Z][a-zA-Z0-9-_~%\\+\\.]*" ) type gcpAuthorizationMetadata struct { From 376dfbc2ee97b6f41f2b0f5b60592fc7457884ef Mon Sep 17 00:00:00 2001 From: Jose Maria Alvarez Date: Fri, 12 Nov 2021 16:08:42 +0100 Subject: [PATCH 06/11] Add private method to get subscription data from subscription string Signed-off-by: Jose Maria Alvarez --- pkg/scalers/gcp_pub_sub_scaler.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/pkg/scalers/gcp_pub_sub_scaler.go b/pkg/scalers/gcp_pub_sub_scaler.go index 7264d59231f..db02bfb2470 100644 --- a/pkg/scalers/gcp_pub_sub_scaler.go +++ b/pkg/scalers/gcp_pub_sub_scaler.go @@ -170,6 +170,13 @@ func (s *pubsubScaler) GetSubscriptionSize(ctx context.Context) (int64, error) { } s.client = client } + subscriptionID, projectID := getSubscriptionData(s) + filter := `metric.type="` + pubSubStackDriverMetricName + `" AND resource.labels.subscription_id="` + subscriptionID + `"` + + return s.client.GetMetrics(ctx, filter, projectID) +} + +func getSubscriptionData(s *pubsubScaler) (string, string) { var subscriptionID string var projectID string regexpExpression, _ := regexp.Compile(compositeSubscriptionIDPrefix) @@ -179,9 +186,7 @@ func (s *pubsubScaler) GetSubscriptionSize(ctx context.Context) (int64, error) { } else { subscriptionID = s.metadata.subscriptionName } - filter := `metric.type="` + pubSubStackDriverMetricName + `" AND resource.labels.subscription_id="` + subscriptionID + `"` - - return s.client.GetMetrics(ctx, filter, projectID) + return subscriptionID, projectID } func getGcpAuthorization(config *ScalerConfig, resolvedEnv map[string]string) (*gcpAuthorizationMetadata, error) { From 443c22e588d55e62c8855dbc6c5331c2e3baa571 Mon Sep 17 00:00:00 2001 From: Jose Maria Alvarez Date: Wed, 17 Nov 2021 11:36:30 +0100 Subject: [PATCH 07/11] Add tests to check regexp expression Signed-off-by: Jose Maria Alvarez --- pkg/scalers/gcp_pubsub_scaler_test.go | 31 +++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/pkg/scalers/gcp_pubsub_scaler_test.go b/pkg/scalers/gcp_pubsub_scaler_test.go index a17060e843a..4fe19483f44 100644 --- a/pkg/scalers/gcp_pubsub_scaler_test.go +++ b/pkg/scalers/gcp_pubsub_scaler_test.go @@ -21,6 +21,13 @@ type gcpPubSubMetricIdentifier struct { name string } +type gcpPubSubSubscription struct { + metadataTestData *parsePubSubMetadataTestData + scalerIndex int + name string + projectID string +} + var testPubSubMetadata = []parsePubSubMetadataTestData{ {map[string]string{}, map[string]string{}, true}, // all properly formed @@ -35,6 +42,10 @@ var testPubSubMetadata = []parsePubSubMetadataTestData{ {map[string]string{"GoogleApplicationCredentials": "Creds", "podIdentityOwner": ""}, map[string]string{"subscriptionName": "mysubscription", "subscriptionSize": "7"}, false}, // Credentials from AuthParams with empty creds {map[string]string{"GoogleApplicationCredentials": "", "podIdentityOwner": ""}, map[string]string{"subscriptionName": "mysubscription", "subscriptionSize": "7"}, true}, + // with full link to subscription + {nil, map[string]string{"subscriptionName": "projects/myproject/subscriptions/mysubscription", "subscriptionSize": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false}, + // with full (bad) link to subscription + {nil, map[string]string{"subscriptionName": "projects/myproject/mysubscription", "subscriptionSize": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false}, } var gcpPubSubMetricIdentifiers = []gcpPubSubMetricIdentifier{ @@ -42,6 +53,11 @@ var gcpPubSubMetricIdentifiers = []gcpPubSubMetricIdentifier{ {&testPubSubMetadata[1], 1, "s1-gcp-ps-mysubscription"}, } +var gcpSubscriptionNameTests = []gcpPubSubSubscription{ + {&testPubSubMetadata[7], 1, "mysubscription", "myproject"}, + {&testPubSubMetadata[8], 1, "projects/myproject/mysubscription", ""}, +} + func TestPubSubParseMetadata(t *testing.T) { for _, testData := range testPubSubMetadata { _, err := parsePubSubMetadata(&ScalerConfig{AuthParams: testData.authParams, TriggerMetadata: testData.metadata, ResolvedEnv: testPubSubResolvedEnv}) @@ -69,3 +85,18 @@ func TestGcpPubSubGetMetricSpecForScaling(t *testing.T) { } } } + +func TestGcpPubSubSubscriptionName(t *testing.T) { + for _, testData := range gcpSubscriptionNameTests { + meta, err := parsePubSubMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testPubSubResolvedEnv, ScalerIndex: testData.scalerIndex}) + if err != nil { + t.Fatal("Could not parse metadata:", err) + } + mockGcpPubSubScaler := pubsubScaler{nil, meta} + subscriptionID, projectID := getSubscriptionData(&mockGcpPubSubScaler) + + if subscriptionID != testData.name || projectID != testData.projectID { + t.Error("Wrong Subscription parsing:", subscriptionID, projectID) + } + } +} From a04f3c0b81c52f43aa984e45cf3c4a2146573a30 Mon Sep 17 00:00:00 2001 From: Jose Maria Alvarez Date: Wed, 17 Nov 2021 11:37:59 +0100 Subject: [PATCH 08/11] Add functionality in improvements Signed-off-by: Jose Maria Alvarez --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index de54b670f73..6cd58521a00 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -56,6 +56,7 @@ - Refactor aws related scalers to reuse the aws clients instead of creating a new one for every GetMetrics call([#2255](https://github.com/kedacore/keda/pull/2255)) - Cleanup metric names inside scalers ([#2260](https://github.com/kedacore/keda/pull/2260)) - Validating values length in prometheus query response ([#2264](https://github.com/kedacore/keda/pull/2264)) +- Add possibility to reference a GCP PubSub subscription by full link, including project ID ([#2266](https://github.com/kedacore/keda/pull/2266)) ### Breaking Changes From a336618aa92e968beaf8df6db5c7ce4752e72f5a Mon Sep 17 00:00:00 2001 From: Jose Maria Alvarez Date: Wed, 17 Nov 2021 11:38:54 +0100 Subject: [PATCH 09/11] Correct PR number Signed-off-by: Jose Maria Alvarez --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6cd58521a00..429c276741a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -56,7 +56,7 @@ - Refactor aws related scalers to reuse the aws clients instead of creating a new one for every GetMetrics call([#2255](https://github.com/kedacore/keda/pull/2255)) - Cleanup metric names inside scalers ([#2260](https://github.com/kedacore/keda/pull/2260)) - Validating values length in prometheus query response ([#2264](https://github.com/kedacore/keda/pull/2264)) -- Add possibility to reference a GCP PubSub subscription by full link, including project ID ([#2266](https://github.com/kedacore/keda/pull/2266)) +- Add possibility to reference a GCP PubSub subscription by full link, including project ID ([#2269](https://github.com/kedacore/keda/pull/2269)) ### Breaking Changes From ab61f1d31c4baa3f54ce3c1b6ae248a0d0ba5a6c Mon Sep 17 00:00:00 2001 From: Jose Maria Alvarez Date: Tue, 23 Nov 2021 13:47:04 +0100 Subject: [PATCH 10/11] Correct indentation and unused var Signed-off-by: Jose Maria Alvarez --- pkg/scalers/gcp_pub_sub_scaler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/scalers/gcp_pub_sub_scaler.go b/pkg/scalers/gcp_pub_sub_scaler.go index 720ba1b0442..64a5e558cb0 100644 --- a/pkg/scalers/gcp_pub_sub_scaler.go +++ b/pkg/scalers/gcp_pub_sub_scaler.go @@ -20,7 +20,7 @@ import ( ) const ( - compositeSubscriptionIDPrefix = "projects/[a-z][a-zA-Z0-9-]*[a-zA-Z0-9]/subscriptions/[a-zA-Z][a-zA-Z0-9-_~%\\+\\.]*" + compositeSubscriptionIDPrefix = "projects/[a-z][a-zA-Z0-9-]*[a-zA-Z0-9]/subscriptions/[a-zA-Z][a-zA-Z0-9-_~%\\+\\.]*" defaultTargetSubscriptionSize = 5 defaultTargetOldestUnackedMessageAge = 10 pubSubStackDriverSubscriptionSizeMetricName = "pubsub.googleapis.com/subscription/num_undelivered_messages" @@ -236,7 +236,7 @@ func (s *pubsubScaler) getMetrics(ctx context.Context, metricType string) (int64 } } subscriptionID, projectID := getSubscriptionData(s) - filter := `metric.type="` + metricType + `" AND resource.labels.subscription_id="` + s.metadata.subscriptionName + `"` + filter := `metric.type="` + metricType + `" AND resource.labels.subscription_id="` + subscriptionID + `"` return s.client.GetMetrics(ctx, filter, projectID) } From 071284762a15ab021d7ebd6e0bc446adbb5d593d Mon Sep 17 00:00:00 2001 From: Jose Maria Alvarez Date: Tue, 23 Nov 2021 14:12:11 +0100 Subject: [PATCH 11/11] Correct indexes for testing Signed-off-by: Jose Maria Alvarez --- pkg/scalers/gcp_pubsub_scaler_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/scalers/gcp_pubsub_scaler_test.go b/pkg/scalers/gcp_pubsub_scaler_test.go index 69bc6ff9db6..aba69eb0c2e 100644 --- a/pkg/scalers/gcp_pubsub_scaler_test.go +++ b/pkg/scalers/gcp_pubsub_scaler_test.go @@ -60,8 +60,8 @@ var gcpPubSubMetricIdentifiers = []gcpPubSubMetricIdentifier{ } var gcpSubscriptionNameTests = []gcpPubSubSubscription{ - {&testPubSubMetadata[7], 1, "mysubscription", "myproject"}, - {&testPubSubMetadata[8], 1, "projects/myproject/mysubscription", ""}, + {&testPubSubMetadata[10], 1, "mysubscription", "myproject"}, + {&testPubSubMetadata[11], 1, "projects/myproject/mysubscription", ""}, } func TestPubSubParseMetadata(t *testing.T) {