Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GCP Pub/Sub Scaler: add logic to accept subscription IDs with projectID #2269

Merged
merged 13 commits into from
Nov 23, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
- GCP PubSub scaler may be used in SubscriptionSize and OldestUnackedMessageAge modes
- Cleanup metric names inside scalers ([#2260](https://github.com/kedacore/keda/pull/2260))
- Validating values length in prometheus query response ([#2264](https://github.com/kedacore/keda/pull/2264))
- Add possibility to reference a GCP PubSub subscription by full link, including project ID ([#2269](https://github.com/kedacore/keda/pull/2269))
- Add `unsafeSsl` parameter in SeleniumGrid scaler ([#2157](https://github.com/kedacore/keda/pull/2157))
- Improve logs of Azure Pipelines Scaler. ([#2297](https://github.com/kedacore/keda/pull/2297))

Expand Down
20 changes: 18 additions & 2 deletions pkg/scalers/gcp_pub_sub_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"errors"
"fmt"
"regexp"
"strconv"
"strings"

"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
Expand All @@ -18,6 +20,7 @@ import (
)

const (
compositeSubscriptionIDPrefix = "projects/[a-z][a-zA-Z0-9-]*[a-zA-Z0-9]/subscriptions/[a-zA-Z][a-zA-Z0-9-_~%\\+\\.]*"
defaultTargetSubscriptionSize = 5
defaultTargetOldestUnackedMessageAge = 10
pubSubStackDriverSubscriptionSizeMetricName = "pubsub.googleapis.com/subscription/num_undelivered_messages"
Expand Down Expand Up @@ -232,10 +235,23 @@ func (s *pubsubScaler) getMetrics(ctx context.Context, metricType string) (int64
return -1, err
}
}
subscriptionID, projectID := getSubscriptionData(s)
filter := `metric.type="` + metricType + `" AND resource.labels.subscription_id="` + subscriptionID + `"`

filter := `metric.type="` + metricType + `" AND resource.labels.subscription_id="` + s.metadata.subscriptionName + `"`
return s.client.GetMetrics(ctx, filter, projectID)
}

return s.client.GetMetrics(ctx, filter)
func getSubscriptionData(s *pubsubScaler) (string, string) {
var subscriptionID string
jmalvarezf-lmes marked this conversation as resolved.
Show resolved Hide resolved
var projectID string
regexpExpression, _ := regexp.Compile(compositeSubscriptionIDPrefix)
if regexpExpression.MatchString(s.metadata.subscriptionName) {
subscriptionID = strings.Split(s.metadata.subscriptionName, "/")[3]
projectID = strings.Split(s.metadata.subscriptionName, "/")[1]
} else {
subscriptionID = s.metadata.subscriptionName
}
return subscriptionID, projectID
}

func getGcpAuthorization(config *ScalerConfig, resolvedEnv map[string]string) (*gcpAuthorizationMetadata, error) {
Expand Down
33 changes: 32 additions & 1 deletion pkg/scalers/gcp_pubsub_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ type gcpPubSubMetricIdentifier struct {
name string
}

type gcpPubSubSubscription struct {
metadataTestData *parsePubSubMetadataTestData
scalerIndex int
name string
projectID string
}

var testPubSubMetadata = []parsePubSubMetadataTestData{
{map[string]string{}, map[string]string{}, true},
// all properly formed with deprecated field
Expand All @@ -40,14 +47,23 @@ var testPubSubMetadata = []parsePubSubMetadataTestData{
// Credentials from AuthParams
{map[string]string{"GoogleApplicationCredentials": "Creds", "podIdentityOwner": ""}, map[string]string{"subscriptionName": "mysubscription", "value": "7"}, false},
// Credentials from AuthParams with empty creds
{map[string]string{"GoogleApplicationCredentials": "", "podIdentityOwner": ""}, map[string]string{"subscriptionName": "mysubscription", "value": "7"}, true},
{map[string]string{"GoogleApplicationCredentials": "", "podIdentityOwner": ""}, map[string]string{"subscriptionName": "mysubscription", "subscriptionSize": "7"}, true},
// with full link to subscription
{nil, map[string]string{"subscriptionName": "projects/myproject/subscriptions/mysubscription", "subscriptionSize": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
// with full (bad) link to subscription
{nil, map[string]string{"subscriptionName": "projects/myproject/mysubscription", "subscriptionSize": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
}

var gcpPubSubMetricIdentifiers = []gcpPubSubMetricIdentifier{
{&testPubSubMetadata[1], 0, "s0-gcp-ps-mysubscription"},
{&testPubSubMetadata[1], 1, "s1-gcp-ps-mysubscription"},
}

var gcpSubscriptionNameTests = []gcpPubSubSubscription{
{&testPubSubMetadata[10], 1, "mysubscription", "myproject"},
{&testPubSubMetadata[11], 1, "projects/myproject/mysubscription", ""},
}

func TestPubSubParseMetadata(t *testing.T) {
for _, testData := range testPubSubMetadata {
_, err := parsePubSubMetadata(&ScalerConfig{AuthParams: testData.authParams, TriggerMetadata: testData.metadata, ResolvedEnv: testPubSubResolvedEnv})
Expand Down Expand Up @@ -75,3 +91,18 @@ func TestGcpPubSubGetMetricSpecForScaling(t *testing.T) {
}
}
}

func TestGcpPubSubSubscriptionName(t *testing.T) {
for _, testData := range gcpSubscriptionNameTests {
meta, err := parsePubSubMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, ResolvedEnv: testPubSubResolvedEnv, ScalerIndex: testData.scalerIndex})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
mockGcpPubSubScaler := pubsubScaler{nil, meta}
subscriptionID, projectID := getSubscriptionData(&mockGcpPubSubScaler)

if subscriptionID != testData.name || projectID != testData.projectID {
t.Error("Wrong Subscription parsing:", subscriptionID, projectID)
}
}
}
45 changes: 18 additions & 27 deletions pkg/scalers/stackdriver_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,42 +62,33 @@ func NewStackDriverClientPodIdentity(ctx context.Context) (*StackDriverClient, e
}

// GetMetrics fetches metrics from stackdriver for a specific filter for the last minute
func (s StackDriverClient) GetMetrics(ctx context.Context, filter string) (int64, error) {
func (s StackDriverClient) GetMetrics(ctx context.Context, filter string, projectID string) (int64, error) {
// Set the start time to 1 minute ago
startTime := time.Now().UTC().Add(time.Minute * -2)

// Set the end time to now
endTime := time.Now().UTC()

// Create a request with the filter and the GCP project ID
var req *monitoringpb.ListTimeSeriesRequest
if len(s.projectID) > 0 {
req = &monitoringpb.ListTimeSeriesRequest{
Name: "projects/" + s.projectID,
Filter: filter,
Interval: &monitoringpb.TimeInterval{
StartTime: &timestamp.Timestamp{
Seconds: startTime.Unix(),
},
EndTime: &timestamp.Timestamp{
Seconds: endTime.Unix(),
},
},
}
} else {
req = &monitoringpb.ListTimeSeriesRequest{
Name: "projects/" + s.credentials.ProjectID,
Filter: filter,
Interval: &monitoringpb.TimeInterval{
StartTime: &timestamp.Timestamp{
Seconds: startTime.Unix(),
},
EndTime: &timestamp.Timestamp{
Seconds: endTime.Unix(),
},
},
var req = &monitoringpb.ListTimeSeriesRequest{
Filter: filter,
Interval: &monitoringpb.TimeInterval{
StartTime: &timestamp.Timestamp{Seconds: startTime.Unix()},
EndTime: &timestamp.Timestamp{Seconds: endTime.Unix()},
},
}

switch projectID {
case "":
if len(s.projectID) > 0 {
req.Name = "projects/" + s.projectID
} else {
req.Name = "projects/" + s.credentials.ProjectID
}
default:
req.Name = "projects/" + projectID
}

// Get an iterator with the list of time series
it := s.metricsClient.ListTimeSeries(ctx, req)

Expand Down