Skip to content

Commit

Permalink
Improve Datadog scaler, including a new optional parameter `metricUna…
Browse files Browse the repository at this point in the history
…vailableValue` to fill data when no Datadog metric was returned (kedacore#2694)

Signed-off-by: Ara Pulido <ara.pulido@datadoghq.com>
Signed-off-by: Ram Cohen <ram.cohen@gmail.com>
  • Loading branch information
arapulido authored and RamCohen committed Mar 23, 2022
1 parent 669a9be commit c5250d4
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 59 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

- **Azure Queue:** Don't call Azure queue GetProperties API unnecessarily ([#2613](https://github.com/kedacore/keda/pull/2613))
- **Datadog Scaler:** Validate query to contain `{` to prevent panic on invalid query ([#2625](https://github.com/kedacore/keda/issues/2625))
- **Datadog Scaler:** Several improvements, including a new optional parameter `metricUnavailableValue` to fill data when no Datadog metric was returned ([#2657](https://github.com/kedacore/keda/issues/2657))
- **GCP Pubsub Scaler** Adding e2e test for GCP PubSub scaler ([#1528](https://github.com/kedacore/keda/issues/1528))
- **Kafka Scaler** Make "disable" a valid value for tls auth parameter ([#2608](https://github.com/kedacore/keda/issues/2608))
- **RabbitMQ Scaler:** Include `vhost` for RabbitMQ when retrieving queue info with `useRegex` ([#2498](https://github.com/kedacore/keda/issues/2498))
Expand Down
155 changes: 101 additions & 54 deletions pkg/scalers/datadog_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package scalers
import (
"context"
"fmt"
"regexp"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -39,10 +40,20 @@ type datadogMetadata struct {
vType valueType
metricName string
age int
useFiller bool
fillValue float64
}

var datadogLog = logf.Log.WithName("datadog_scaler")

// Precompiled patterns used to validate and normalize Datadog queries.
// Compiling directly in the var block (instead of an init function) keeps
// each pattern next to its declaration and avoids init-order side effects;
// MustCompile panics at package load if a pattern is invalid, which is the
// conventional place for such a programmer error to surface.
var (
	// aggregator matches queries that already start with a space aggregator.
	aggregator = regexp.MustCompile(`^(avg|sum|min|max):.*`)
	// filter matches queries that contain a {…} scope, which well-formed
	// Datadog queries require.
	filter = regexp.MustCompile(`.*\{.*\}.*`)
	// rollup captures the optional method and the interval argument of a
	// .rollup(method, interval) suffix.
	rollup = regexp.MustCompile(`.*\.rollup\(([a-z]+,)?\s*(.+)\)`)
)

// NewDatadogScaler creates a new Datadog scaler
func NewDatadogScaler(ctx context.Context, config *ScalerConfig) (Scaler, error) {
meta, err := parseDatadogMetadata(config)
Expand All @@ -60,17 +71,65 @@ func NewDatadogScaler(ctx context.Context, config *ScalerConfig) (Scaler, error)
}, nil
}

// parseAndTransformDatadogQuery validates the user-supplied Datadog query and
// normalizes it so it is always well formed:
//   - prepends the default "avg:" space aggregator when none is present,
//   - rejects queries without a {…} filter (required by Datadog),
//   - validates an existing .rollup(…) interval against the time window, or
//     appends a default rollup sized to the window when none is present.
//
// age is the query time window in seconds. It returns the transformed query,
// or an error when the query is malformed.
func parseAndTransformDatadogQuery(q string, age int) (string, error) {
	// Queries should start with a valid aggregator. If not found, prepend avg as default
	if !aggregator.MatchString(q) {
		q = "avg:" + q
	}

	// Well-formed Datadog queries require a filter (between curly brackets)
	if !filter.MatchString(q) {
		return "", fmt.Errorf("malformed Datadog query")
	}

	// Queries can contain rollup functions.
	match := rollup.FindStringSubmatch(q)
	if match != nil {
		// If present, the rollup interval must be an integer number of seconds…
		rollupAgg, err := strconv.Atoi(match[2])
		if err != nil {
			return "", fmt.Errorf("malformed Datadog query")
		}

		// …and cannot exceed the queried time window, or the query would
		// never return a point.
		if rollupAgg > age {
			return "", fmt.Errorf("rollup cannot be bigger than time window")
		}
		return q, nil
	}

	// No rollup given: append a default one based on the time window size.
	// Floor the interval at 1 second — for age < 5 the integer division would
	// otherwise yield ".rollup(avg, 0)", an invalid interval Datadog rejects.
	interval := age / 5
	if interval < 1 {
		interval = 1
	}
	return q + fmt.Sprintf(".rollup(avg, %d)", interval), nil
}

func parseDatadogMetadata(config *ScalerConfig) (*datadogMetadata, error) {
meta := datadogMetadata{}

if val, ok := config.TriggerMetadata["age"]; ok {
age, err := strconv.Atoi(val)
if err != nil {
return nil, fmt.Errorf("age parsing error %s", err.Error())
}
meta.age = age

if age < 60 {
datadogLog.Info("selecting a window smaller than 60 seconds can cause Datadog not finding a metric value for the query")
}
} else {
meta.age = 90 // Default window 90 seconds
}

if val, ok := config.TriggerMetadata["query"]; ok {
meta.query = val
query, err := parseAndTransformDatadogQuery(val, meta.age)

if err != nil {
return nil, fmt.Errorf("error in query: %s", err.Error())
}
meta.query = query
} else {
return nil, fmt.Errorf("no query given")
}
if !strings.Contains(meta.query, "{") {
return nil, fmt.Errorf("expecting query to contain `{`, got %s", meta.query)
}

if val, ok := config.TriggerMetadata["queryValue"]; ok {
queryValue, err := strconv.Atoi(val)
Expand All @@ -82,20 +141,15 @@ func parseDatadogMetadata(config *ScalerConfig) (*datadogMetadata, error) {
return nil, fmt.Errorf("no queryValue given")
}

if val, ok := config.TriggerMetadata["age"]; ok {
age, err := strconv.Atoi(val)
if val, ok := config.TriggerMetadata["metricUnavailableValue"]; ok {
fillValue, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("age parsing error %s", err.Error())
return nil, fmt.Errorf("metricUnavailableValue parsing error %s", err.Error())
}
meta.age = age
} else {
meta.age = 90 // Default window 90 seconds
meta.fillValue = fillValue
meta.useFiller = true
}

// For all the points in a given window, we take the rollup to the window size
rollup := fmt.Sprintf(".rollup(avg, %d)", meta.age)
meta.query += rollup

if val, ok := config.TriggerMetadata["type"]; ok {
val = strings.ToLower(val)
switch val {
Expand Down Expand Up @@ -174,50 +228,19 @@ func (s *datadogScaler) Close(context.Context) error {
return nil
}

// IsActive returns true if we are able to get metrics from Datadog
// IsActive checks whether the value returned by Datadog is higher than the target value
func (s *datadogScaler) IsActive(ctx context.Context) (bool, error) {
ctx = context.WithValue(
ctx,
datadog.ContextAPIKeys,
map[string]datadog.APIKey{
"apiKeyAuth": {
Key: s.metadata.apiKey,
},
"appKeyAuth": {
Key: s.metadata.appKey,
},
},
)

ctx = context.WithValue(ctx,
datadog.ContextServerVariables,
map[string]string{
"site": s.metadata.datadogSite,
})

resp, _, err := s.apiClient.MetricsApi.QueryMetrics(ctx, time.Now().Unix()-int64(s.metadata.age), time.Now().Unix(), s.metadata.query)
num, err := s.getQueryResult(ctx)

if err != nil {
return false, err
}

series := resp.GetSeries()

if len(series) == 0 {
return false, nil
}

points := series[0].GetPointlist()

if len(points) == 0 {
return false, nil
}

return true, nil
return num > float64(s.metadata.queryValue), nil
}

// getQueryResult returns result of the scaler query
func (s *datadogScaler) getQueryResult(ctx context.Context) (int, error) {
func (s *datadogScaler) getQueryResult(ctx context.Context) (float64, error) {
ctx = context.WithValue(
ctx,
datadog.ContextAPIKeys,
Expand All @@ -237,24 +260,48 @@ func (s *datadogScaler) getQueryResult(ctx context.Context) (int, error) {
"site": s.metadata.datadogSite,
})

resp, _, err := s.apiClient.MetricsApi.QueryMetrics(ctx, time.Now().Unix()-int64(s.metadata.age), time.Now().Unix(), s.metadata.query)
resp, r, err := s.apiClient.MetricsApi.QueryMetrics(ctx, time.Now().Unix()-int64(s.metadata.age), time.Now().Unix(), s.metadata.query)

if r.StatusCode == 429 {
rateLimit := r.Header.Get("X-Ratelimit-Limit")
rateLimitReset := r.Header.Get("X-Ratelimit-Reset")

return -1, fmt.Errorf("your Datadog account reached the %s queries per hour rate limit, next limit reset will happen in %s seconds: %s", rateLimit, rateLimitReset, err)
}

if r.StatusCode != 200 {
return -1, fmt.Errorf("error when retrieving Datadog metrics: %s", err)
}

if err != nil {
return -1, fmt.Errorf("error when retrieving Datadog metrics: %s", err)
}

series := resp.GetSeries()

if len(series) > 1 {
return 0, fmt.Errorf("query returned more than 1 series; modify the query to return only 1 series")
}

if len(series) == 0 {
return 0, fmt.Errorf("no Datadog metrics returned")
if !s.metadata.useFiller {
return 0, fmt.Errorf("no Datadog metrics returned for the given time window")
}
return s.metadata.fillValue, nil
}

points := series[0].GetPointlist()

if len(points) == 0 || len(points[0]) < 2 {
return 0, fmt.Errorf("no Datadog metrics returned")
if !s.metadata.useFiller {
return 0, fmt.Errorf("no Datadog metrics returned for the given time window")
}
return s.metadata.fillValue, nil
}

return int(*points[0][1]), nil
// Return the last point from the series
index := len(points) - 1
return *points[index][1], nil
}

// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler
Expand Down Expand Up @@ -301,7 +348,7 @@ func (s *datadogScaler) GetMetrics(ctx context.Context, metricName string, metri

metric := external_metrics.ExternalMetricValue{
MetricName: s.metadata.metricName,
Value: *resource.NewQuantity(int64(num), resource.DecimalSI),
Value: *resource.NewMilliQuantity(int64(num*1000), resource.DecimalSI),
Timestamp: metav1.Now(),
}

Expand Down
50 changes: 49 additions & 1 deletion pkg/scalers/datadog_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ import (
"testing"
)

// datadogQueries is one table entry for TestDatadogScalerParseQueries.
// NOTE: field order matters — testParseQueries initializes entries with
// positional composite literals.
type datadogQueries struct {
	input   string // raw query as the user would supply it
	age     int    // time window in seconds passed alongside the query
	output  string // expected transformed query ("" when an error is expected)
	isError bool   // whether parsing/transforming is expected to fail
}

type datadogMetricIdentifier struct {
metadataTestData *datadogAuthMetadataTestData
scalerIndex int
Expand All @@ -17,11 +24,48 @@ type datadogAuthMetadataTestData struct {
isError bool
}

// Table of query-normalization cases for parseAndTransformDatadogQuery: each
// entry pairs a raw query and time window with the expected transformed query,
// or flags that validation should fail.
var testParseQueries = []datadogQueries{
	{input: "", age: 0, output: "", isError: true},

	// Already well formed: returned unchanged.
	{input: "avg:system.cpu.user{*}.rollup(sum, 30)", age: 120, output: "avg:system.cpu.user{*}.rollup(sum, 30)", isError: false},
	{input: "sum:system.cpu.user{*}.rollup(30)", age: 30, output: "sum:system.cpu.user{*}.rollup(30)", isError: false},
	{input: "avg:system.cpu.user{automatic-restart:false,bosh_address:192.168.101.12}.rollup(avg, 30)", age: 120, output: "avg:system.cpu.user{automatic-restart:false,bosh_address:192.168.101.12}.rollup(avg, 30)", isError: false},

	// Missing aggregator: "avg:" is prepended by default.
	{input: "system.cpu.user{*}.rollup(sum, 30)", age: 120, output: "avg:system.cpu.user{*}.rollup(sum, 30)", isError: false},

	// Missing rollup: a default one derived from the window is appended.
	{input: "min:system.cpu.user{*}", age: 120, output: "min:system.cpu.user{*}.rollup(avg, 24)", isError: false},

	// No {…} filter: rejected.
	{input: "min:system.cpu.user", age: 120, output: "", isError: true},

	// Rollup with a non-integer interval: rejected.
	{input: "min:system.cpu.user{*}.rollup(avg)", age: 120, output: "", isError: true},
}

// TestDatadogScalerParseQueries runs parseAndTransformDatadogQuery over the
// table above, checking both the error expectation and the transformed query.
func TestDatadogScalerParseQueries(t *testing.T) {
	for _, tc := range testParseQueries {
		got, err := parseAndTransformDatadogQuery(tc.input, tc.age)

		switch {
		case err != nil && !tc.isError:
			t.Error("Expected success but got error", err)
		case err == nil && tc.isError:
			t.Error("Expected error but got success")
		}

		if got != tc.output {
			t.Errorf("Expected %s, got %s", tc.output, got)
		}
	}
}

var testDatadogMetadata = []datadogAuthMetadataTestData{
{map[string]string{}, map[string]string{}, true},

// all properly formed
{map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "queryValue": "7", "type": "average", "age": "60"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, false},
{map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "queryValue": "7", "metricUnavailableValue": "1.5", "type": "average", "age": "60"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, false},
// default age
{map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "queryValue": "7", "type": "average"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, false},
// default type
Expand All @@ -32,6 +76,10 @@ var testDatadogMetadata = []datadogAuthMetadataTestData{
{map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "type": "average", "age": "60"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, true},
// wrong query value type
{map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "queryValue": "notanint", "type": "average", "age": "60"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, true},
// malformed query
{map[string]string{"query": "sum:trace.redis.command.hits", "queryValue": "7", "type": "average", "age": "60"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, true},
// wrong unavailableMetricValue type
{map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "queryValue": "7", "metricUnavailableValue": "notafloat", "type": "average", "age": "60"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, true},

// success api/app keys
{map[string]string{"query": "sum:trace.redis.command.hits{env:none,service:redis}.as_count()", "queryValue": "7"}, map[string]string{"apiKey": "apiKey", "appKey": "appKey", "datadogSite": "datadogSite"}, false},
Expand Down
9 changes: 5 additions & 4 deletions tests/scalers/datadog.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ test.serial(`NGINX deployment should scale to 3 (the max) when getting too many
// keda based deployment should start scaling up with http requests issued
let replicaCount = '1'
for (let i = 0; i < 60 && replicaCount !== '3'; i++) {
t.log(`Waited ${5 * i} seconds for nginx deployment to scale up`)
t.log(`Waited ${15 * i} seconds for nginx deployment to scale up`)

replicaCount = sh.exec(
`kubectl get deployment nginx --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"`
Expand All @@ -151,11 +151,12 @@ test.serial(`NGINX deployment should scale to 3 (the max) when getting too many
)

for (let i = 0; i < 50 && replicaCount !== '1'; i++) {
t.log(`Waited ${15 * i} seconds for nginx deployment to scale down`)
replicaCount = sh.exec(
`kubectl get deployment nginx --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"`
).stdout
if (replicaCount !== '1') {
sh.exec('sleep 5')
sh.exec('sleep 15')
}
}

Expand All @@ -164,6 +165,7 @@ test.serial(`NGINX deployment should scale to 3 (the max) when getting too many
})

test.after.always.cb('clean up datadog resources', t => {
sh.exec(`kubectl delete scaledobject -n ${testNamespace} --all`)
sh.exec(`helm repo rm datadog`)
sh.exec(`kubectl delete namespace ${datadogNamespace} --force`)
sh.exec(`kubectl delete namespace ${testNamespace} --force`)
Expand Down Expand Up @@ -282,7 +284,6 @@ spec:
name: nginx
minReplicaCount: 1
maxReplicaCount: 3
pollingInterval: 5
cooldownPeriod: 10
advanced:
horizontalPodAutoscalerConfig:
Expand All @@ -295,7 +296,7 @@ spec:
query: "avg:nginx.net.request_per_s{cluster_name:keda-datadog-cluster}"
queryValue: "2"
type: "global"
age: "60"
age: "120"
authenticationRef:
name: keda-trigger-auth-datadog-secret
`

0 comments on commit c5250d4

Please sign in to comment.