Rely on Datadog API to validate the query (kedacore#2771)
Signed-off-by: Ram Cohen <ram.cohen@gmail.com>
arapulido authored and RamCohen committed Mar 23, 2022
1 parent 54d1e58 commit 579b091
Showing 3 changed files with 30 additions and 53 deletions.
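In short, the scaler no longer rewrites the user's query locally (prepending a default `avg:` aggregator, checking or appending a `.rollup(...)` clause). The only syntactic check that remains is that the query contains a scope filter between curly brackets; everything else is passed through unchanged, and the Datadog API is relied on to accept or reject it. Below is a minimal standalone sketch (not part of the diff) of that remaining check, using queries taken from the updated test table; the comment about the API rejecting a bad rollup is an assumption about Datadog's server-side validation:

```go
package main

import (
	"fmt"
	"regexp"
)

// The only query validation kept in the scaler after this change:
// a Datadog query must contain a scope filter between curly brackets.
var filter = regexp.MustCompile(`.*\{.*\}.*`)

func main() {
	queries := []string{
		"avg:system.cpu.user{*}.rollup(sum, 30)", // passes the local check
		"min:system.cpu.user{*}.rollup(avg)",     // also passes locally; assumed to be rejected by the Datadog API at query time
		"min:system.cpu.user",                    // fails: missing {...} scope
	}
	for _, q := range queries {
		fmt.Printf("%-45s scope present: %v\n", q, filter.MatchString(q))
	}
}
```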
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -42,6 +42,7 @@
 - **Azure Queue:** Don't call Azure queue GetProperties API unnecessarily ([#2613](https://github.com/kedacore/keda/pull/2613))
 - **Datadog Scaler:** Validate query to contain `{` to prevent panic on invalid query ([#2625](https://github.com/kedacore/keda/issues/2625))
 - **Datadog Scaler:** Several improvements, including a new optional parameter `metricUnavailableValue` to fill data when no Datadog metric was returned ([#2657](https://github.com/kedacore/keda/issues/2657))
+- **Datadog Scaler:** Rely on Datadog API to validate the query ([#2761](https://github.com/kedacore/keda/issues/2761))
 - **GCP Pubsub Scaler** Adding e2e test for GCP PubSub scaler ([#1528](https://github.com/kedacore/keda/issues/1528))
 - **Kafka Scaler** Make "disable" a valid value for tls auth parameter ([#2608](https://github.com/kedacore/keda/issues/2608))
 - **Metric API Scaler:** Improve error handling on not-ok response ([#2317](https://github.com/kedacore/keda/issues/2317))
53 changes: 18 additions & 35 deletions pkg/scalers/datadog_scaler.go
@@ -46,12 +46,10 @@ type datadogMetadata struct {
 
 var datadogLog = logf.Log.WithName("datadog_scaler")
 
-var aggregator, filter, rollup *regexp.Regexp
+var filter *regexp.Regexp
 
 func init() {
-	aggregator = regexp.MustCompile(`^(avg|sum|min|max):.*`)
 	filter = regexp.MustCompile(`.*\{.*\}.*`)
-	rollup = regexp.MustCompile(`.*\.rollup\(([a-z]+,)?\s*(.+)\)`)
 }
 
 // NewDatadogScaler creates a new Datadog scaler
@@ -71,36 +69,14 @@ func NewDatadogScaler(ctx context.Context, config *ScalerConfig) (Scaler, error)
 	}, nil
 }
 
-// parseAndTransformDatadogQuery checks correctness of the user query and adds rollup if not available
-func parseAndTransformDatadogQuery(q string, age int) (string, error) {
-	// Queries should start with a valid aggregator. If not found, prepend avg as default
-	if !aggregator.MatchString(q) {
-		q = "avg:" + q
-	}
-
+// parseDatadogQuery checks correctness of the user query
+func parseDatadogQuery(q string) (bool, error) {
 	// Wellformed Datadog queries require a filter (between curly brackets)
 	if !filter.MatchString(q) {
-		return "", fmt.Errorf("malformed Datadog query")
-	}
-
-	// Queries can contain rollup functions.
-	match := rollup.FindStringSubmatch(q)
-	if match != nil {
-		// If added, check that the number of seconds is an int
-		rollupAgg, err := strconv.Atoi(match[2])
-		if err != nil {
-			return "", fmt.Errorf("malformed Datadog query")
-		}
-
-		if rollupAgg > age {
-			return "", fmt.Errorf("rollup cannot be bigger than time window")
-		}
-	} else { // If not added, use a default rollup based on the time window size
-		s := fmt.Sprintf(".rollup(avg, %d)", age/5)
-		q += s
+		return false, fmt.Errorf("malformed Datadog query: missing query scope")
 	}
 
-	return q, nil
+	return true, nil
 }
 
 func parseDatadogMetadata(config *ScalerConfig) (*datadogMetadata, error) {
@@ -121,12 +97,12 @@ func parseDatadogMetadata(config *ScalerConfig) (*datadogMetadata, error) {
 	}
 
 	if val, ok := config.TriggerMetadata["query"]; ok {
-		query, err := parseAndTransformDatadogQuery(val, meta.age)
+		_, err := parseDatadogQuery(val)
 
 		if err != nil {
 			return nil, fmt.Errorf("error in query: %s", err.Error())
 		}
-		meta.query = query
+		meta.query = val
 	} else {
 		return nil, fmt.Errorf("no query given")
 	}
@@ -262,19 +238,26 @@ func (s *datadogScaler) getQueryResult(ctx context.Context) (float64, error) {
 
 	resp, r, err := s.apiClient.MetricsApi.QueryMetrics(ctx, time.Now().Unix()-int64(s.metadata.age), time.Now().Unix(), s.metadata.query)
 
+	if err != nil {
+		return -1, fmt.Errorf("error when retrieving Datadog metrics: %s", err)
+	}
+
 	if r.StatusCode == 429 {
 		rateLimit := r.Header.Get("X-Ratelimit-Limit")
 		rateLimitReset := r.Header.Get("X-Ratelimit-Reset")
 
-		return -1, fmt.Errorf("your Datadog account reached the %s queries per hour rate limit, next limit reset will happen in %s seconds: %s", rateLimit, rateLimitReset, err)
+		return -1, fmt.Errorf("your Datadog account reached the %s queries per hour rate limit, next limit reset will happen in %s seconds", rateLimit, rateLimitReset)
	}
 
 	if r.StatusCode != 200 {
-		return -1, fmt.Errorf("error when retrieving Datadog metrics: %s", err)
+		return -1, fmt.Errorf("error when retrieving Datadog metrics")
 	}
 
-	if err != nil {
-		return -1, fmt.Errorf("error when retrieving Datadog metrics: %s", err)
+	if resp.GetStatus() == "error" {
+		if msg, ok := resp.GetErrorOk(); ok {
+			return -1, fmt.Errorf("error when retrieving Datadog metrics: %s", *msg)
+		}
+		return -1, fmt.Errorf("error when retrieving Datadog metrics")
 	}
 
 	series := resp.GetSeries()
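The reworked error handling in getQueryResult checks the transport error before touching the HTTP response, then handles the 429 rate limit, then any other non-200 status, and finally an "error" status reported inside the response body, which is where a query rejected by the Datadog API surfaces. The sketch below mirrors that ordering; `metricsResponse` and `checkQueryResult` are hypothetical stand-ins for the real datadog-api-client-go response type and the scaler method, used only to illustrate the flow:

```go
package main

import (
	"fmt"
	"net/http"
)

// metricsResponse is a simplified stand-in for the Datadog client's metrics
// query response; the real code uses resp.GetStatus() and resp.GetErrorOk().
type metricsResponse struct {
	Status string
	Error  *string
}

// checkQueryResult mirrors the order of checks in the updated getQueryResult:
// transport error, rate limiting, unexpected HTTP status, and finally an
// error reported by the Datadog API inside an otherwise successful response.
func checkQueryResult(resp metricsResponse, r *http.Response, err error) error {
	if err != nil {
		return fmt.Errorf("error when retrieving Datadog metrics: %s", err)
	}
	if r.StatusCode == http.StatusTooManyRequests {
		return fmt.Errorf("rate limited, next reset in %s seconds", r.Header.Get("X-Ratelimit-Reset"))
	}
	if r.StatusCode != http.StatusOK {
		return fmt.Errorf("error when retrieving Datadog metrics")
	}
	if resp.Status == "error" {
		if resp.Error != nil {
			return fmt.Errorf("error when retrieving Datadog metrics: %s", *resp.Error)
		}
		return fmt.Errorf("error when retrieving Datadog metrics")
	}
	return nil
}

func main() {
	// Hypothetical API-side rejection of a query that passed the local scope check.
	msg := "invalid query"
	err := checkQueryResult(
		metricsResponse{Status: "error", Error: &msg},
		&http.Response{StatusCode: http.StatusOK, Header: http.Header{}},
		nil,
	)
	fmt.Println(err) // error when retrieving Datadog metrics: invalid query
}
```

Checking `err` first also fixes the previous ordering, where the HTTP status code was inspected before the error returned by the client call.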
29 changes: 11 additions & 18 deletions pkg/scalers/datadog_scaler_test.go
@@ -7,8 +7,7 @@ import (
 
 type datadogQueries struct {
 	input   string
-	age     int
-	output  string
+	output  bool
 	isError bool
 }
 
@@ -25,28 +24,22 @@ type datadogAuthMetadataTestData struct {
 }
 
 var testParseQueries = []datadogQueries{
-	{"", 0, "", true},
+	{"", false, true},
 	// All properly formed
-	{"avg:system.cpu.user{*}.rollup(sum, 30)", 120, "avg:system.cpu.user{*}.rollup(sum, 30)", false},
-	{"sum:system.cpu.user{*}.rollup(30)", 30, "sum:system.cpu.user{*}.rollup(30)", false},
-	{"avg:system.cpu.user{automatic-restart:false,bosh_address:192.168.101.12}.rollup(avg, 30)", 120, "avg:system.cpu.user{automatic-restart:false,bosh_address:192.168.101.12}.rollup(avg, 30)", false},
-
-	// Default aggregator
-	{"system.cpu.user{*}.rollup(sum, 30)", 120, "avg:system.cpu.user{*}.rollup(sum, 30)", false},
-
-	// Default rollup
-	{"min:system.cpu.user{*}", 120, "min:system.cpu.user{*}.rollup(avg, 24)", false},
+	{"avg:system.cpu.user{*}.rollup(sum, 30)", true, false},
+	{"sum:system.cpu.user{*}.rollup(30)", true, false},
+	{"avg:system.cpu.user{automatic-restart:false,bosh_address:192.168.101.12}.rollup(avg, 30)", true, false},
+	{"top(per_second(abs(sum:http.requests{service:myapp,dc:us-west-2}.rollup(max, 2))), 5, 'mean', 'desc')", true, false},
+	{"system.cpu.user{*}.rollup(sum, 30)", true, false},
+	{"min:system.cpu.user{*}", true, false},
 
 	// Missing filter
-	{"min:system.cpu.user", 120, "", true},
-
-	// Malformed rollup
-	{"min:system.cpu.user{*}.rollup(avg)", 120, "", true},
+	{"min:system.cpu.user", false, true},
 }
 
 func TestDatadogScalerParseQueries(t *testing.T) {
 	for _, testData := range testParseQueries {
-		output, err := parseAndTransformDatadogQuery(testData.input, testData.age)
+		output, err := parseDatadogQuery(testData.input)
 
 		if err != nil && !testData.isError {
 			t.Error("Expected success but got error", err)
@@ -56,7 +49,7 @@ func TestDatadogScalerParseQueries(t *testing.T) {
 		}
 
 		if output != testData.output {
-			t.Errorf("Expected %s, got %s", testData.output, output)
+			t.Errorf("Expected %v, got %v", testData.output, output)
 		}
 	}
 }
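Since parseDatadogQuery now only reports whether the query has a scope, the test cases drop the `age` field and the expected rewritten query, keeping just a pass/fail flag. A hypothetical extra test (not part of this commit) makes the behavioral shift explicit: a malformed rollup that the old parser rejected locally now passes local parsing and is left to the Datadog API to reject when the query runs:

```go
package scalers

import "testing"

// TestMalformedRollupParsesLocally is a hypothetical addition, not part of
// this commit: the malformed rollup that parseAndTransformDatadogQuery used
// to reject is accepted by parseDatadogQuery, since only the {...} scope is
// validated locally.
func TestMalformedRollupParsesLocally(t *testing.T) {
	ok, err := parseDatadogQuery("min:system.cpu.user{*}.rollup(avg)")
	if !ok || err != nil {
		t.Errorf("expected query to pass local parsing, got ok=%v err=%v", ok, err)
	}
}
```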
