diff --git a/CHANGELOG.md b/CHANGELOG.md index 88a918790cf..a71f79fc6ab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,7 @@ - **Azure Queue:** Don't call Azure queue GetProperties API unnecessarily ([#2613](https://github.com/kedacore/keda/pull/2613)) - **Datadog Scaler:** Validate query to contain `{` to prevent panic on invalid query ([#2625](https://github.com/kedacore/keda/issues/2625)) +- **Datadog Scaler:** Several improvements, including a new optional parameter `metricUnavailableValue` to fill data when no Datadog metric was returned ([#2657](https://github.com/kedacore/keda/issues/2657)) - **GCP Pubsub Scaler** Adding e2e test for GCP PubSub scaler ([#1528](https://github.com/kedacore/keda/issues/1528)) - **Kafka Scaler** Make "disable" a valid value for tls auth parameter ([#2608](https://github.com/kedacore/keda/issues/2608)) - **RabbitMQ Scaler:** Include `vhost` for RabbitMQ when retrieving queue info with `useRegex` ([#2498](https://github.com/kedacore/keda/issues/2498)) diff --git a/pkg/scalers/datadog_scaler.go b/pkg/scalers/datadog_scaler.go index a5db1edbfd5..cdee7309a61 100644 --- a/pkg/scalers/datadog_scaler.go +++ b/pkg/scalers/datadog_scaler.go @@ -3,6 +3,7 @@ package scalers import ( "context" "fmt" + "regexp" "strconv" "strings" "time" @@ -39,10 +40,20 @@ type datadogMetadata struct { vType valueType metricName string age int + useFiller bool + fillValue float64 } var datadogLog = logf.Log.WithName("datadog_scaler") +var aggregator, filter, rollup *regexp.Regexp + +func init() { + aggregator = regexp.MustCompile(`^(avg|sum|min|max):.*`) + filter = regexp.MustCompile(`.*\{.*\}.*`) + rollup = regexp.MustCompile(`.*\.rollup\(([a-z]+,)?\s*(.+)\)`) +} + // NewDatadogScaler creates a new Datadog scaler func NewDatadogScaler(ctx context.Context, config *ScalerConfig) (Scaler, error) { meta, err := parseDatadogMetadata(config) @@ -60,17 +71,65 @@ func NewDatadogScaler(ctx context.Context, config *ScalerConfig) (Scaler, error) }, nil } +// 
parseAndTransformDatadogQuery checks correctness of the user query and adds rollup if not available +func parseAndTransformDatadogQuery(q string, age int) (string, error) { + // Queries should start with a valid aggregator. If not found, prepend avg as default + if !aggregator.MatchString(q) { + q = "avg:" + q + } + + // Wellformed Datadog queries require a filter (between curly brackets) + if !filter.MatchString(q) { + return "", fmt.Errorf("malformed Datadog query") + } + + // Queries can contain rollup functions. + match := rollup.FindStringSubmatch(q) + if match != nil { + // If added, check that the number of seconds is an int + rollupAgg, err := strconv.Atoi(match[2]) + if err != nil { + return "", fmt.Errorf("malformed Datadog query") + } + + if rollupAgg > age { + return "", fmt.Errorf("rollup cannot be bigger than time window") + } + } else { // If not added, use a default rollup based on the time window size + s := fmt.Sprintf(".rollup(avg, %d)", age/5) + q += s + } + + return q, nil +} + func parseDatadogMetadata(config *ScalerConfig) (*datadogMetadata, error) { meta := datadogMetadata{} + if val, ok := config.TriggerMetadata["age"]; ok { + age, err := strconv.Atoi(val) + if err != nil { + return nil, fmt.Errorf("age parsing error %s", err.Error()) + } + meta.age = age + + if age < 60 { + datadogLog.Info("selecting a window smaller than 60 seconds can cause Datadog not finding a metric value for the query") + } + } else { + meta.age = 90 // Default window 90 seconds + } + if val, ok := config.TriggerMetadata["query"]; ok { - meta.query = val + query, err := parseAndTransformDatadogQuery(val, meta.age) + + if err != nil { + return nil, fmt.Errorf("error in query: %s", err.Error()) + } + meta.query = query } else { return nil, fmt.Errorf("no query given") } - if !strings.Contains(meta.query, "{") { - return nil, fmt.Errorf("expecting query to contain `{`, got %s", meta.query) - } if val, ok := config.TriggerMetadata["queryValue"]; ok { queryValue, err := 
strconv.Atoi(val) @@ -82,20 +141,15 @@ func parseDatadogMetadata(config *ScalerConfig) (*datadogMetadata, error) { return nil, fmt.Errorf("no queryValue given") } - if val, ok := config.TriggerMetadata["age"]; ok { - age, err := strconv.Atoi(val) + if val, ok := config.TriggerMetadata["metricUnavailableValue"]; ok { + fillValue, err := strconv.ParseFloat(val, 64) if err != nil { - return nil, fmt.Errorf("age parsing error %s", err.Error()) + return nil, fmt.Errorf("metricUnavailableValue parsing error %s", err.Error()) } - meta.age = age - } else { - meta.age = 90 // Default window 90 seconds + meta.fillValue = fillValue + meta.useFiller = true } - // For all the points in a given window, we take the rollup to the window size - rollup := fmt.Sprintf(".rollup(avg, %d)", meta.age) - meta.query += rollup - if val, ok := config.TriggerMetadata["type"]; ok { val = strings.ToLower(val) switch val { @@ -174,50 +228,19 @@ func (s *datadogScaler) Close(context.Context) error { return nil } -// IsActive returns true if we are able to get metrics from Datadog +// IsActive checks whether the value returned by Datadog is higher than the target value func (s *datadogScaler) IsActive(ctx context.Context) (bool, error) { - ctx = context.WithValue( - ctx, - datadog.ContextAPIKeys, - map[string]datadog.APIKey{ - "apiKeyAuth": { - Key: s.metadata.apiKey, - }, - "appKeyAuth": { - Key: s.metadata.appKey, - }, - }, - ) - - ctx = context.WithValue(ctx, - datadog.ContextServerVariables, - map[string]string{ - "site": s.metadata.datadogSite, - }) - - resp, _, err := s.apiClient.MetricsApi.QueryMetrics(ctx, time.Now().Unix()-int64(s.metadata.age), time.Now().Unix(), s.metadata.query) + num, err := s.getQueryResult(ctx) if err != nil { return false, err } - series := resp.GetSeries() - - if len(series) == 0 { - return false, nil - } - - points := series[0].GetPointlist() - - if len(points) == 0 { - return false, nil - } - - return true, nil + return num > float64(s.metadata.queryValue), nil 
} // getQueryResult returns result of the scaler query -func (s *datadogScaler) getQueryResult(ctx context.Context) (int, error) { +func (s *datadogScaler) getQueryResult(ctx context.Context) (float64, error) { ctx = context.WithValue( ctx, datadog.ContextAPIKeys, @@ -237,24 +260,48 @@ func (s *datadogScaler) getQueryResult(ctx context.Context) (int, error) { "site": s.metadata.datadogSite, }) - resp, _, err := s.apiClient.MetricsApi.QueryMetrics(ctx, time.Now().Unix()-int64(s.metadata.age), time.Now().Unix(), s.metadata.query) + resp, r, err := s.apiClient.MetricsApi.QueryMetrics(ctx, time.Now().Unix()-int64(s.metadata.age), time.Now().Unix(), s.metadata.query) + + if r.StatusCode == 429 { + rateLimit := r.Header.Get("X-Ratelimit-Limit") + rateLimitReset := r.Header.Get("X-Ratelimit-Reset") + + return -1, fmt.Errorf("your Datadog account reached the %s queries per hour rate limit, next limit reset will happen in %s seconds: %s", rateLimit, rateLimitReset, err) + } + + if r.StatusCode != 200 { + return -1, fmt.Errorf("error when retrieving Datadog metrics: %s", err) + } + if err != nil { return -1, fmt.Errorf("error when retrieving Datadog metrics: %s", err) } series := resp.GetSeries() + if len(series) > 1 { + return 0, fmt.Errorf("query returned more than 1 series; modify the query to return only 1 series") + } + if len(series) == 0 { - return 0, fmt.Errorf("no Datadog metrics returned") + if !s.metadata.useFiller { + return 0, fmt.Errorf("no Datadog metrics returned for the given time window") + } + return s.metadata.fillValue, nil } points := series[0].GetPointlist() if len(points) == 0 || len(points[0]) < 2 { - return 0, fmt.Errorf("no Datadog metrics returned") + if !s.metadata.useFiller { + return 0, fmt.Errorf("no Datadog metrics returned for the given time window") + } + return s.metadata.fillValue, nil } - return int(*points[0][1]), nil + // Return the last point from the series + index := len(points) - 1 + return *points[index][1], nil } // 
GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler @@ -301,7 +348,7 @@ func (s *datadogScaler) GetMetrics(ctx context.Context, metricName string, metri metric := external_metrics.ExternalMetricValue{ MetricName: s.metadata.metricName, - Value: *resource.NewQuantity(int64(num), resource.DecimalSI), + Value: *resource.NewMilliQuantity(int64(num*1000), resource.DecimalSI), Timestamp: metav1.Now(), } diff --git a/pkg/scalers/datadog_scaler_test.go b/pkg/scalers/datadog_scaler_test.go index ddc8fc062bd..9ecd84f7578 100644 --- a/pkg/scalers/datadog_scaler_test.go +++ b/pkg/scalers/datadog_scaler_test.go @@ -5,6 +5,13 @@ import ( "testing" ) +type datadogQueries struct { + input string + age int + output string + isError bool +} + type datadogMetricIdentifier struct { metadataTestData *datadogAuthMetadataTestData scalerIndex int @@ -17,11 +24,48 @@ type datadogAuthMetadataTestData struct { isError bool } +var testParseQueries = []datadogQueries{ + {"", 0, "", true}, + // All properly formed + {"avg:system.cpu.user{*}.rollup(sum, 30)", 120, "avg:system.cpu.user{*}.rollup(sum, 30)", false}, + {"sum:system.cpu.user{*}.rollup(30)", 30, "sum:system.cpu.user{*}.rollup(30)", false}, + {"avg:system.cpu.user{automatic-restart:false,bosh_address:192.168.101.12}.rollup(avg, 30)", 120, "avg:system.cpu.user{automatic-restart:false,bosh_address:192.168.101.12}.rollup(avg, 30)", false}, + + // Default aggregator + {"system.cpu.user{*}.rollup(sum, 30)", 120, "avg:system.cpu.user{*}.rollup(sum, 30)", false}, + + // Default rollup + {"min:system.cpu.user{*}", 120, "min:system.cpu.user{*}.rollup(avg, 24)", false}, + + // Missing filter + {"min:system.cpu.user", 120, "", true}, + + // Malformed rollup + {"min:system.cpu.user{*}.rollup(avg)", 120, "", true}, +} + +func TestDatadogScalerParseQueries(t *testing.T) { + for _, testData := range testParseQueries { + output, err := parseAndTransformDatadogQuery(testData.input, testData.age) + + if err != nil && 
!testData.isError { + t.Error("Expected success but got error", err) + } + if testData.isError && err == nil { + t.Error("Expected error but got success") + } + + if output != testData.output { + t.Errorf("Expected %s, got %s", testData.output, output) + } + } +} + var testDatadogMetadata = []datadogAuthMetadataTestData{ {map[string]string{}, map[string]string{}, true}, // all properly formed - {map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "queryValue": "7", "type": "average", "age": "60"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, false}, + {map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "queryValue": "7", "metricUnavailableValue": "1.5", "type": "average", "age": "60"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, false}, // default age {map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "queryValue": "7", "type": "average"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, false}, // default type @@ -32,6 +76,10 @@ var testDatadogMetadata = []datadogAuthMetadataTestData{ {map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "type": "average", "age": "60"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, true}, // wrong query value type {map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "queryValue": "notanint", "type": "average", "age": "60"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, true}, + // malformed query + {map[string]string{"query": "sum:trace.redis.command.hits", "queryValue": "7", "type": "average", "age": "60"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, true}, + // 
wrong metricUnavailableValue type + {map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "queryValue": "7", "metricUnavailableValue": "notafloat", "type": "average", "age": "60"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, true}, // success api/app keys {map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "queryValue": "7"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, false}, diff --git a/tests/scalers/datadog.test.ts b/tests/scalers/datadog.test.ts index fea614cb238..e3b1745a281 100644 --- a/tests/scalers/datadog.test.ts +++ b/tests/scalers/datadog.test.ts @@ -130,7 +130,7 @@ test.serial(`NGINX deployment should scale to 3 (the max) when getting too many // keda based deployment should start scaling up with http requests issued let replicaCount = '1' for (let i = 0; i < 60 && replicaCount !== '3'; i++) { - t.log(`Waited ${5 * i} seconds for nginx deployment to scale up`) + t.log(`Waited ${15 * i} seconds for nginx deployment to scale up`) replicaCount = sh.exec( `kubectl get deployment nginx --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` @@ -151,11 +151,12 @@ test.serial(`NGINX deployment should scale to 3 (the max) when getting too many ) for (let i = 0; i < 50 && replicaCount !== '1'; i++) { + t.log(`Waited ${15 * i} seconds for nginx deployment to scale down`) replicaCount = sh.exec( `kubectl get deployment nginx --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` ).stdout if (replicaCount !== '1') { - sh.exec('sleep 5') + sh.exec('sleep 15') } } @@ -164,6 +165,7 @@ test.serial(`NGINX deployment should scale to 3 (the max) when getting too many }) test.after.always.cb('clean up datadog resources', t => { + sh.exec(`kubectl delete scaledobject -n ${testNamespace} --all`) sh.exec(`helm repo rm datadog`) sh.exec(`kubectl delete namespace ${datadogNamespace} --force`) 
sh.exec(`kubectl delete namespace ${testNamespace} --force`) @@ -282,7 +284,6 @@ spec: name: nginx minReplicaCount: 1 maxReplicaCount: 3 - pollingInterval: 5 cooldownPeriod: 10 advanced: horizontalPodAutoscalerConfig: @@ -295,7 +296,7 @@ spec: query: "avg:nginx.net.request_per_s{cluster_name:keda-datadog-cluster}" queryValue: "2" type: "global" - age: "60" + age: "120" authenticationRef: name: keda-trigger-auth-datadog-secret `