Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rely on Datadog API to validate the query #2771

Merged
merged 3 commits into from
Mar 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
- **Azure Queue:** Don't call Azure queue GetProperties API unnecessarily ([#2613](https://github.com/kedacore/keda/pull/2613))
- **Datadog Scaler:** Validate query to contain `{` to prevent panic on invalid query ([#2625](https://github.com/kedacore/keda/issues/2625))
- **Datadog Scaler:** Several improvements, including a new optional parameter `metricUnavailableValue` to fill data when no Datadog metric was returned ([#2657](https://github.com/kedacore/keda/issues/2657))
- **Datadog Scaler:** Rely on Datadog API to validate the query ([#2761](https://github.com/kedacore/keda/issues/2761))
- **GCP Pubsub Scaler** Adding e2e test for GCP PubSub scaler ([#1528](https://github.com/kedacore/keda/issues/1528))
- **Kafka Scaler** Make "disable" a valid value for tls auth parameter ([#2608](https://github.com/kedacore/keda/issues/2608))
- **Metric API Scaler:** Improve error handling on not-ok response ([#2317](https://github.com/kedacore/keda/issues/2317))
Expand Down
53 changes: 18 additions & 35 deletions pkg/scalers/datadog_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,10 @@ type datadogMetadata struct {

var datadogLog = logf.Log.WithName("datadog_scaler")

// filter matches well-formed Datadog queries, which must contain a scope
// enclosed in curly brackets (e.g. "avg:system.cpu.user{*}"). Compiled once
// at package scope; MustCompile is safe here because the pattern is constant.
var filter = regexp.MustCompile(`.*\{.*\}.*`)

// NewDatadogScaler creates a new Datadog scaler
Expand All @@ -71,36 +69,14 @@ func NewDatadogScaler(ctx context.Context, config *ScalerConfig) (Scaler, error)
}, nil
}

// parseAndTransformDatadogQuery checks correctness of the user query and adds rollup if not available
func parseAndTransformDatadogQuery(q string, age int) (string, error) {
// Queries should start with a valid aggregator. If not found, prepend avg as default
if !aggregator.MatchString(q) {
q = "avg:" + q
}

// parseDatadogQuery performs a lightweight sanity check of a user-supplied
// Datadog query before it is sent to the Datadog API (which is the
// authoritative validator).
//
// It only verifies that the query contains a scope between curly brackets
// (e.g. "{*}"), which every well-formed Datadog metric query requires.
// It returns (true, nil) when the query passes the check, and
// (false, error) when the scope is missing.
func parseDatadogQuery(q string) (bool, error) {
	// Well-formed Datadog queries require a filter (between curly brackets).
	if !filter.MatchString(q) {
		return false, fmt.Errorf("malformed Datadog query: missing query scope")
	}

	return true, nil
}

func parseDatadogMetadata(config *ScalerConfig) (*datadogMetadata, error) {
Expand All @@ -121,12 +97,12 @@ func parseDatadogMetadata(config *ScalerConfig) (*datadogMetadata, error) {
}

if val, ok := config.TriggerMetadata["query"]; ok {
query, err := parseAndTransformDatadogQuery(val, meta.age)
_, err := parseDatadogQuery(val)

if err != nil {
return nil, fmt.Errorf("error in query: %s", err.Error())
}
meta.query = query
meta.query = val
} else {
return nil, fmt.Errorf("no query given")
}
Expand Down Expand Up @@ -262,19 +238,26 @@ func (s *datadogScaler) getQueryResult(ctx context.Context) (float64, error) {

resp, r, err := s.apiClient.MetricsApi.QueryMetrics(ctx, time.Now().Unix()-int64(s.metadata.age), time.Now().Unix(), s.metadata.query)

if err != nil {
return -1, fmt.Errorf("error when retrieving Datadog metrics: %s", err)
}

if r.StatusCode == 429 {
rateLimit := r.Header.Get("X-Ratelimit-Limit")
rateLimitReset := r.Header.Get("X-Ratelimit-Reset")

return -1, fmt.Errorf("your Datadog account reached the %s queries per hour rate limit, next limit reset will happen in %s seconds: %s", rateLimit, rateLimitReset, err)
return -1, fmt.Errorf("your Datadog account reached the %s queries per hour rate limit, next limit reset will happen in %s seconds", rateLimit, rateLimitReset)
}

if r.StatusCode != 200 {
return -1, fmt.Errorf("error when retrieving Datadog metrics: %s", err)
return -1, fmt.Errorf("error when retrieving Datadog metrics")
}

if err != nil {
return -1, fmt.Errorf("error when retrieving Datadog metrics: %s", err)
if resp.GetStatus() == "error" {
if msg, ok := resp.GetErrorOk(); ok {
return -1, fmt.Errorf("error when retrieving Datadog metrics: %s", *msg)
}
return -1, fmt.Errorf("error when retrieving Datadog metrics")
}

series := resp.GetSeries()
Expand Down
29 changes: 11 additions & 18 deletions pkg/scalers/datadog_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ import (

// datadogQueries is a table-driven test case for parseDatadogQuery.
type datadogQueries struct {
	input   string // raw query string passed to parseDatadogQuery
	output  bool   // expected boolean result
	isError bool   // whether an error is expected
}

Expand All @@ -25,28 +24,22 @@ type datadogAuthMetadataTestData struct {
}

// testParseQueries is the test table for TestDatadogScalerParseQueries.
// Since query validation is now delegated to the Datadog API, the local
// check only rejects queries that lack a curly-bracket scope.
var testParseQueries = []datadogQueries{
	// Empty query
	{"", false, true},

	// All properly formed
	{"avg:system.cpu.user{*}.rollup(sum, 30)", true, false},
	{"sum:system.cpu.user{*}.rollup(30)", true, false},
	{"avg:system.cpu.user{automatic-restart:false,bosh_address:192.168.101.12}.rollup(avg, 30)", true, false},
	{"top(per_second(abs(sum:http.requests{service:myapp,dc:us-west-2}.rollup(max, 2))), 5, 'mean', 'desc')", true, false},
	{"system.cpu.user{*}.rollup(sum, 30)", true, false},
	{"min:system.cpu.user{*}", true, false},

	// Missing filter
	{"min:system.cpu.user", false, true},
}

func TestDatadogScalerParseQueries(t *testing.T) {
for _, testData := range testParseQueries {
output, err := parseAndTransformDatadogQuery(testData.input, testData.age)
output, err := parseDatadogQuery(testData.input)

if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
Expand All @@ -56,7 +49,7 @@ func TestDatadogScalerParseQueries(t *testing.T) {
}

if output != testData.output {
t.Errorf("Expected %s, got %s", testData.output, output)
t.Errorf("Expected %v, got %v", testData.output, output)
}
}
}
Expand Down