Skip to content

Commit

Permalink
Fix stackdriver client returning 0 for metric types of double (kedaco…
Browse files Browse the repository at this point in the history
…re#3788)

* Update stackdriver client to handle metrics of value type double

Signed-off-by: Eric Takemoto <24865872+octothorped@users.noreply.github.com>

* move change log note to below general

Signed-off-by: Eric Takemoto <24865872+octothorped@users.noreply.github.com>

* parse activation value as float64

Signed-off-by: Eric Takemoto <24865872+octothorped@users.noreply.github.com>

* change target value to float64 for GCP pub/sub and stackdriver

Signed-off-by: Eric Takemoto <24865872+octothorped@users.noreply.github.com>

Signed-off-by: Eric Takemoto <24865872+octothorped@users.noreply.github.com>
  • Loading branch information
octothorped authored and josephangbc committed Dec 6, 2022
1 parent 40a2be6 commit a0e74a0
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **General:** Provide patch for CVE-2022-3172 vulnerability ([#3690](https://github.com/kedacore/keda/issues/3690))
- **General:** Respect optional parameter inside envs for ScaledJobs ([#3568](https://github.com/kedacore/keda/issues/3568))
- **Azure Blob Scaler** Store forgotten logger ([#3811](https://github.com/kedacore/keda/issues/3811))
- **GCP Stackdriver Scalar:** Update Stackdriver client to handle detecting double and int64 value types ([#3777](https://github.com/kedacore/keda/issues/3777))
- **New Relic Scaler** Store forgotten logger ([#3945](https://github.com/kedacore/keda/issues/3945))
- **Prometheus Scaler:** Treat Inf the same as Null result ([#3644](https://github.com/kedacore/keda/issues/3644))
- **NATS Jetstream:** Correctly count messages that should be redelivered (waiting for ack) towards keda value ([#3787](https://github.com/kedacore/keda/issues/3787))
Expand Down
18 changes: 9 additions & 9 deletions pkg/scalers/gcp_pubsub_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ type pubsubScaler struct {

type pubsubMetadata struct {
mode string
value int64
activationValue int64
value float64
activationValue float64

subscriptionName string
gcpAuthorization *gcpAuthorizationMetadata
Expand Down Expand Up @@ -79,7 +79,7 @@ func parsePubSubMetadata(config *ScalerConfig, logger logr.Logger) (*pubsubMetad
}
logger.Info("subscriptionSize field is deprecated. Use mode and value fields instead")
meta.mode = pubsubModeSubscriptionSize
subSizeValue, err := strconv.ParseInt(subSize, 10, 64)
subSizeValue, err := strconv.ParseFloat(subSize, 64)
if err != nil {
return nil, fmt.Errorf("value parsing error %s", err.Error())
}
Expand All @@ -99,7 +99,7 @@ func parsePubSubMetadata(config *ScalerConfig, logger logr.Logger) (*pubsubMetad
}

if valuePresent {
triggerValue, err := strconv.ParseInt(value, 10, 64)
triggerValue, err := strconv.ParseFloat(value, 64)
if err != nil {
return nil, fmt.Errorf("value parsing error %s", err.Error())
}
Expand All @@ -119,7 +119,7 @@ func parsePubSubMetadata(config *ScalerConfig, logger logr.Logger) (*pubsubMetad

meta.activationValue = 0
if val, ok := config.TriggerMetadata["activationValue"]; ok {
activationValue, err := strconv.ParseInt(val, 10, 64)
activationValue, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("activationValue parsing error %s", err.Error())
}
Expand Down Expand Up @@ -175,7 +175,7 @@ func (s *pubsubScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, kedautil.NormalizeString(fmt.Sprintf("gcp-ps-%s", s.metadata.subscriptionName))),
},
Target: GetMetricTarget(s.metricType, s.metadata.value),
Target: GetMetricTargetMili(s.metricType, s.metadata.value),
}

// Create the metric spec for the HPA
Expand All @@ -189,7 +189,7 @@ func (s *pubsubScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec

// GetMetrics connects to Stack Driver and finds the size of the pub sub subscription
func (s *pubsubScaler) GetMetrics(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, error) {
var value int64
var value float64
var err error

switch s.metadata.mode {
Expand All @@ -207,7 +207,7 @@ func (s *pubsubScaler) GetMetrics(ctx context.Context, metricName string) ([]ext
}
}

metric := GenerateMetricInMili(metricName, float64(value))
metric := GenerateMetricInMili(metricName, value)

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}
Expand All @@ -229,7 +229,7 @@ func (s *pubsubScaler) setStackdriverClient(ctx context.Context) error {
}

// getMetrics gets metric type value from stackdriver api
func (s *pubsubScaler) getMetrics(ctx context.Context, metricType string) (int64, error) {
func (s *pubsubScaler) getMetrics(ctx context.Context, metricType string) (float64, error) {
if s.client == nil {
err := s.setStackdriverClient(ctx)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/scalers/gcp_pubsub_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ var testPubSubMetadata = []parsePubSubMetadataTestData{
{nil, map[string]string{"subscriptionName": "projects/myproject/subscriptions/mysubscription", "subscriptionSize": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
// with full (bad) link to subscription
{nil, map[string]string{"subscriptionName": "projects/myproject/mysubscription", "subscriptionSize": "7", "credentialsFromEnv": "SAMPLE_CREDS"}, false},
// properly formed float value and activationTargetValue
{nil, map[string]string{"subscriptionName": "mysubscription", "value": "7.1", "credentialsFromEnv": "SAMPLE_CREDS", "activationValue": "2.1"}, false},
}

var gcpPubSubMetricIdentifiers = []gcpPubSubMetricIdentifier{
Expand Down
22 changes: 19 additions & 3 deletions pkg/scalers/gcp_stackdriver_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (s StackDriverClient) GetMetrics(
ctx context.Context,
filter string,
projectID string,
aggregation *monitoringpb.Aggregation) (int64, error) {
aggregation *monitoringpb.Aggregation) (float64, error) {
// Set the start time to 1 minute ago
startTime := time.Now().UTC().Add(time.Minute * -2)

Expand Down Expand Up @@ -197,7 +197,7 @@ func (s StackDriverClient) GetMetrics(
// Get an iterator with the list of time series
it := s.metricsClient.ListTimeSeries(ctx, req)

var value int64 = -1
var value float64 = -1

// Get the value from the first metric returned
resp, err := it.Next()
Expand All @@ -212,12 +212,28 @@ func (s StackDriverClient) GetMetrics(

if len(resp.GetPoints()) > 0 {
point := resp.GetPoints()[0]
value = point.GetValue().GetInt64Value()
value, err = extractValueFromPoint(point)

if err != nil {
return value, err
}
}

return value, nil
}

// extractValueFromPoint attempts to extract a float64 by asserting the point's value type
func extractValueFromPoint(point *monitoringpb.Point) (float64, error) {
typedValue := point.GetValue()
switch typedValue.Value.(type) {
case *monitoringpb.TypedValue_DoubleValue:
return typedValue.GetDoubleValue(), nil
case *monitoringpb.TypedValue_Int64Value:
return float64(typedValue.GetInt64Value()), nil
}
return -1, fmt.Errorf("could not extract value from metric of type %T", typedValue)
}

// GoogleApplicationCredentials is a struct representing the format of a service account
// credentials file
type GoogleApplicationCredentials struct {
Expand Down
16 changes: 8 additions & 8 deletions pkg/scalers/gcp_stackdriver_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ type stackdriverScaler struct {
type stackdriverMetadata struct {
projectID string
filter string
targetValue int64
activationTargetValue int64
targetValue float64
activationTargetValue float64
metricName string

gcpAuthorization *gcpAuthorizationMetadata
Expand Down Expand Up @@ -91,7 +91,7 @@ func parseStackdriverMetadata(config *ScalerConfig, logger logr.Logger) (*stackd
meta.metricName = GenerateMetricNameWithIndex(config.ScalerIndex, name)

if val, ok := config.TriggerMetadata["targetValue"]; ok {
targetValue, err := strconv.ParseInt(val, 10, 64)
targetValue, err := strconv.ParseFloat(val, 64)
if err != nil {
logger.Error(err, "Error parsing targetValue")
return nil, fmt.Errorf("error parsing targetValue: %s", err.Error())
Expand All @@ -102,7 +102,7 @@ func parseStackdriverMetadata(config *ScalerConfig, logger logr.Logger) (*stackd

meta.activationTargetValue = 0
if val, ok := config.TriggerMetadata["activationTargetValue"]; ok {
activationTargetValue, err := strconv.ParseInt(val, 10, 64)
activationTargetValue, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("activationTargetValue parsing error %s", err.Error())
}
Expand Down Expand Up @@ -188,7 +188,7 @@ func (s *stackdriverScaler) GetMetricSpecForScaling(context.Context) []v2.Metric
Metric: v2.MetricIdentifier{
Name: s.metadata.metricName,
},
Target: GetMetricTarget(s.metricType, s.metadata.targetValue),
Target: GetMetricTargetMili(s.metricType, s.metadata.targetValue),
}

// Create the metric spec for the HPA
Expand All @@ -208,17 +208,17 @@ func (s *stackdriverScaler) GetMetrics(ctx context.Context, metricName string) (
return []external_metrics.ExternalMetricValue{}, err
}

metric := GenerateMetricInMili(metricName, float64(value))
metric := GenerateMetricInMili(metricName, value)

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

// getMetrics gets metric type value from stackdriver api
func (s *stackdriverScaler) getMetrics(ctx context.Context) (int64, error) {
func (s *stackdriverScaler) getMetrics(ctx context.Context) (float64, error) {
val, err := s.client.GetMetrics(ctx, s.metadata.filter, s.metadata.projectID, s.metadata.aggregation)
if err == nil {
s.logger.V(1).Info(
fmt.Sprintf("Getting metrics for project %s, filter %s and aggregation %v. Result: %d",
fmt.Sprintf("Getting metrics for project %s, filter %s and aggregation %v. Result: %f",
s.metadata.projectID,
s.metadata.filter,
s.metadata.aggregation,
Expand Down
2 changes: 2 additions & 0 deletions pkg/scalers/gcp_stackdriver_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ var testStackdriverMetadata = []parseStackdriverMetadataTestData{
{nil, map[string]string{"projectId": "myProject", "filter": sdFilter, "credentialsFromEnv": "SAMPLE_CREDS", "alignmentPeriodSeconds": "30"}, true},
// With bad alignment period
{nil, map[string]string{"projectId": "myProject", "filter": sdFilter, "credentialsFromEnv": "SAMPLE_CREDS", "alignmentPeriodSeconds": "a"}, true},
// properly formed float targetValue and activationTargetValue
{nil, map[string]string{"projectId": "myProject", "filter": sdFilter, "credentialsFromEnv": "SAMPLE_CREDS", "targetValue": "1.1", "activationTargetValue": "2.1"}, false},
}

var gcpStackdriverMetricIdentifiers = []gcpStackdriverMetricIdentifier{
Expand Down

0 comments on commit a0e74a0

Please sign in to comment.