From a15782413064bd3b728c0d4ec2d7bac5d2668174 Mon Sep 17 00:00:00 2001 From: mathetake Date: Thu, 27 Feb 2020 17:34:55 +0900 Subject: [PATCH] metrics/provider: add datadog metrics provider implement metrics provider interface for datadog, tested against the actual datadog account resolves #284 --- artifacts/flagger/crd.yaml | 1 + charts/flagger/crds/crd.yaml | 1 + kustomize/base/flagger/crd.yaml | 1 + pkg/controller/scheduler.go | 2 +- pkg/metrics/providers/datadog.go | 168 ++++++++++++++++++++++++++ pkg/metrics/providers/datadog_test.go | 156 ++++++++++++++++++++++++ pkg/metrics/providers/factory.go | 16 ++- 7 files changed, 340 insertions(+), 5 deletions(-) create mode 100644 pkg/metrics/providers/datadog.go create mode 100644 pkg/metrics/providers/datadog_test.go diff --git a/artifacts/flagger/crd.yaml b/artifacts/flagger/crd.yaml index 786dfb7e2..17fcc3f5f 100644 --- a/artifacts/flagger/crd.yaml +++ b/artifacts/flagger/crd.yaml @@ -738,6 +738,7 @@ spec: enum: - prometheus - influxdb + - datadog address: description: API address of this provider type: string diff --git a/charts/flagger/crds/crd.yaml b/charts/flagger/crds/crd.yaml index 786dfb7e2..17fcc3f5f 100644 --- a/charts/flagger/crds/crd.yaml +++ b/charts/flagger/crds/crd.yaml @@ -738,6 +738,7 @@ spec: enum: - prometheus - influxdb + - datadog address: description: API address of this provider type: string diff --git a/kustomize/base/flagger/crd.yaml b/kustomize/base/flagger/crd.yaml index 786dfb7e2..17fcc3f5f 100644 --- a/kustomize/base/flagger/crd.yaml +++ b/kustomize/base/flagger/crd.yaml @@ -738,6 +738,7 @@ spec: enum: - prometheus - influxdb + - datadog address: description: API address of this provider type: string diff --git a/pkg/controller/scheduler.go b/pkg/controller/scheduler.go index bfbef5853..f930f0e1f 100644 --- a/pkg/controller/scheduler.go +++ b/pkg/controller/scheduler.go @@ -939,7 +939,7 @@ func (c *Controller) runMetricChecks(canary *flaggerv1.Canary) bool { } factory := 
providers.Factory{} - provider, err := factory.Provider(template.Spec.Provider, credentials) + provider, err := factory.Provider(metric.Interval, template.Spec.Provider, credentials) if err != nil { c.recordEventErrorf(canary, "Metric template %s.%s provider %s error: %v", metric.TemplateRef.Name, namespace, template.Spec.Provider.Type, err) diff --git a/pkg/metrics/providers/datadog.go b/pkg/metrics/providers/datadog.go new file mode 100644 index 000000000..a27763fdb --- /dev/null +++ b/pkg/metrics/providers/datadog.go @@ -0,0 +1,168 @@ +package providers + +import ( + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "strconv" + "time" + + flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1beta1" +) + +// https://docs.datadoghq.com/api/ +const ( + datadogDefaultHost = "https://api.datadoghq.com" + + datadogMetricsQueryPath = "/api/v1/query" + datadogAPIKeyValidationPath = "/api/v1/validate" + + datadogAPIKeySecretKey = "datadog_api_key" + datadogAPIKeyHeaderKey = "DD-API-KEY" + + datadogApplicationKeySecretKey = "datadog_application_key" + datadogApplicationKeyHeaderKey = "DD-APPLICATION-KEY" + + datadogFromDeltaMultiplierOnMetricInterval = 10 +) + +// DatadogProvider executes datadog queries +type DatadogProvider struct { + metricsQueryEndpoint string + apiKeyValidationEndpoint string + + timeout time.Duration + apiKey string + applicationKey string + fromDelta int64 +} + +type datadogResponse struct { + Series []struct { + Pointlist [][]float64 `json:"pointlist"` + } +} + +// NewDatadogProvider takes a metric interval, a provider spec and the credentials map, and +// returns a Datadog client ready to execute queries against the API +func NewDatadogProvider(metricInterval string, + provider flaggerv1.MetricTemplateProvider, + credentials map[string][]byte) (*DatadogProvider, error) { + + address := provider.Address + if address == "" { + address = datadogDefaultHost + } + + dd := DatadogProvider{ + timeout: 5 * time.Second, 
metricsQueryEndpoint: address + datadogMetricsQueryPath, + apiKeyValidationEndpoint: address + datadogAPIKeyValidationPath, + } + + if b, ok := credentials[datadogAPIKeySecretKey]; ok { + dd.apiKey = string(b) + } else { + return nil, fmt.Errorf("datadog credentials does not contain datadog_api_key") + } + + if b, ok := credentials[datadogApplicationKeySecretKey]; ok { + dd.applicationKey = string(b) + } else { + return nil, fmt.Errorf("datadog credentials does not contain datadog_application_key") + } + + md, err := time.ParseDuration(metricInterval) + if err != nil { + return nil, fmt.Errorf("error parsing metric interval: %s", err.Error()) + } + + dd.fromDelta = int64(datadogFromDeltaMultiplierOnMetricInterval * md.Seconds()) + return &dd, nil +} + +// RunQuery executes the datadog query against DatadogProvider.metricsQueryEndpoint +// and returns the value of the last point of the first series as float64 +func (p *DatadogProvider) RunQuery(query string) (float64, error) { + + req, err := http.NewRequest("GET", p.metricsQueryEndpoint, nil) + if err != nil { + return 0, fmt.Errorf("error http.NewRequest: %s", err.Error()) + } + + req.Header.Set(datadogAPIKeyHeaderKey, p.apiKey) + req.Header.Set(datadogApplicationKeyHeaderKey, p.applicationKey) + now := time.Now().Unix() + q := req.URL.Query() + q.Add("query", query) + q.Add("from", strconv.FormatInt(now-p.fromDelta, 10)) + q.Add("to", strconv.FormatInt(now, 10)) + req.URL.RawQuery = q.Encode() + + ctx, cancel := context.WithTimeout(req.Context(), p.timeout) + defer cancel() + r, err := http.DefaultClient.Do(req.WithContext(ctx)) + if err != nil { + return 0, err + } + + defer r.Body.Close() + b, err := ioutil.ReadAll(r.Body) + if err != nil { + return 0, fmt.Errorf("error reading body: %s", err.Error()) + } + + if r.StatusCode != http.StatusOK { + return 0, fmt.Errorf("error response: %s", string(b)) + } + + var res datadogResponse + if err := json.Unmarshal(b, &res); err != nil { + return 0, fmt.Errorf("error unmarshaling result: %s, 
'%s'", err.Error(), string(b)) + } + + if len(res.Series) < 1 { + return 0, fmt.Errorf("no values found in response: %s", string(b)) + } + + s := res.Series[0] + // an empty pointlist or a point with fewer than two entries would panic on the index below + if len(s.Pointlist) < 1 || len(s.Pointlist[len(s.Pointlist)-1]) < 2 { + return 0, fmt.Errorf("no values found in response: %s", string(b)) + } + + return s.Pointlist[len(s.Pointlist)-1][1], nil +} + +// IsOnline calls Datadog's validation endpoint with the API keys +// and returns an error if the validation fails +func (p *DatadogProvider) IsOnline() (bool, error) { + req, err := http.NewRequest("GET", p.apiKeyValidationEndpoint, nil) + if err != nil { + return false, fmt.Errorf("error http.NewRequest: %s", err.Error()) + } + + req.Header.Add(datadogAPIKeyHeaderKey, p.apiKey) + req.Header.Add(datadogApplicationKeyHeaderKey, p.applicationKey) + + ctx, cancel := context.WithTimeout(req.Context(), p.timeout) + defer cancel() + r, err := http.DefaultClient.Do(req.WithContext(ctx)) + if err != nil { + return false, err + } + defer r.Body.Close() + + b, err := ioutil.ReadAll(r.Body) + if err != nil { + return false, fmt.Errorf("error reading body: %s", err.Error()) + } + + if r.StatusCode != http.StatusOK { + return false, fmt.Errorf("error response: %s", string(b)) + } + + return true, nil +} diff --git a/pkg/metrics/providers/datadog_test.go b/pkg/metrics/providers/datadog_test.go new file mode 100644 index 000000000..c87a225a4 --- /dev/null +++ b/pkg/metrics/providers/datadog_test.go @@ -0,0 +1,156 @@ +package providers + +import ( + "fmt" + "net/http" + "net/http/httptest" + "strconv" + "testing" + "time" + + flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1beta1" +) + +func TestNewDatadogProvider(t *testing.T) { + appKey := "app-key" + apiKey := "api-key" + cs := map[string][]byte{ + datadogApplicationKeySecretKey: []byte(appKey), + datadogAPIKeySecretKey: []byte(apiKey), + } + + mi := "100s" + md, err := time.ParseDuration(mi) + if err != nil { + t.Fatal(err) + } + + dp, err := NewDatadogProvider("100s", 
flaggerv1.MetricTemplateProvider{}, cs) + + if err != nil { + t.Fatal(err) + } + + if exp := "https://api.datadoghq.com/api/v1/validate"; dp.apiKeyValidationEndpoint != exp { + t.Fatalf("apiKeyValidationEndpoint expected %s but got %s", exp, dp.apiKeyValidationEndpoint) + } + + if exp := "https://api.datadoghq.com/api/v1/query"; dp.metricsQueryEndpoint != exp { + t.Fatalf("metricsQueryEndpoint expected %s but got %s", exp, dp.metricsQueryEndpoint) + } + + if exp := int64(md.Seconds() * datadogFromDeltaMultiplierOnMetricInterval); dp.fromDelta != exp { + t.Fatalf("fromDelta expected %d but got %d", exp, dp.fromDelta) + } + + if dp.applicationKey != appKey { + t.Fatalf("application key expected %s but got %s", appKey, dp.applicationKey) + } + + if dp.apiKey != apiKey { + t.Fatalf("api key expected %s but got %s", apiKey, dp.apiKey) + } +} + +func TestDatadogProvider_RunQuery(t *testing.T) { + eq := `avg:system.cpu.user\{*}by{host}` + appKey := "app-key" + apiKey := "api-key" + expected := 1.11111 + + now := time.Now().Unix() + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + aq := r.URL.Query().Get("query") + if aq != eq { + t.Errorf("\nquery expected %s bug got %s", eq, aq) + } + + if vs := r.Header.Get(datadogApplicationKeyHeaderKey); vs != appKey { + t.Errorf("\n%s header expected %s but got %s", datadogApplicationKeyHeaderKey, appKey, vs) + } + if vs := r.Header.Get(datadogAPIKeyHeaderKey); vs != apiKey { + t.Errorf("\n%s header expected %s but got %s", datadogAPIKeyHeaderKey, apiKey, vs) + } + + rf := r.URL.Query().Get("from") + if from, err := strconv.ParseInt(rf, 10, 64); err == nil && from >= now { + t.Errorf("\nfrom %d should be less than %d", from, now) + } else if err != nil { + t.Errorf("\nfailed to parse from: %v", err) + } + + rt := r.URL.Query().Get("to") + if to, err := strconv.ParseInt(rt, 10, 64); err == nil && to < now { + t.Errorf("\nto %d should be greater than or equals %d", to, now) + } else if err != 
nil { + t.Errorf("\nfailed to parse to: %v", err) + } + + json := fmt.Sprintf(`{"series": [{"pointlist": [[1577232000000,29325.102158814265],[1577318400000,56294.46758591842],[1577404800000,%f]]}]}`, expected) + w.Write([]byte(json)) + })) + defer ts.Close() + + dp, err := NewDatadogProvider("1m", + flaggerv1.MetricTemplateProvider{Address: ts.URL}, + map[string][]byte{ + datadogApplicationKeySecretKey: []byte(appKey), + datadogAPIKeySecretKey: []byte(apiKey), + }, + ) + if err != nil { + t.Fatal(err) + } + + f, err := dp.RunQuery(eq) + if err != nil { + t.Fatal(err) + } + + if f != expected { + t.Fatalf("metric value expected %f but got %f", expected, f) + } +} + +func TestDatadogProvider_IsOnline(t *testing.T) { + for _, c := range []struct { + code int + errExpected bool + }{ + {code: http.StatusOK, errExpected: false}, + {code: http.StatusUnauthorized, errExpected: true}, + } { + t.Run(fmt.Sprintf("%d", c.code), func(t *testing.T) { + appKey := "app-key" + apiKey := "api-key" + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if vs := r.Header.Get(datadogApplicationKeyHeaderKey); vs != appKey { + t.Errorf("\n%s header expected %s but got %s", datadogApplicationKeyHeaderKey, appKey, vs) + } + if vs := r.Header.Get(datadogAPIKeyHeaderKey); vs != apiKey { + t.Errorf("\n%s header expected %s but got %s", datadogAPIKeyHeaderKey, apiKey, vs) + } + w.WriteHeader(c.code) + })) + defer ts.Close() + + dp, err := NewDatadogProvider("1m", + flaggerv1.MetricTemplateProvider{Address: ts.URL}, + map[string][]byte{ + datadogApplicationKeySecretKey: []byte(appKey), + datadogAPIKeySecretKey: []byte(apiKey), + }, + ) + if err != nil { + t.Fatal(err) + } + + _, err = dp.IsOnline() + if c.errExpected && err == nil { + t.Fatal("error expected but got no error") + } else if !c.errExpected && err != nil { + t.Fatalf("no error expected but got %v", err) + } + }) + } +} diff --git a/pkg/metrics/providers/factory.go 
b/pkg/metrics/providers/factory.go index a9d5057d6..926dd0321 100644 --- a/pkg/metrics/providers/factory.go +++ b/pkg/metrics/providers/factory.go @@ -1,14 +1,22 @@ package providers -import flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1beta1" +import ( + flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1beta1" +) -type Factory struct { -} +type Factory struct{} + +func (factory Factory) Provider( + metricInterval string, + provider flaggerv1.MetricTemplateProvider, + credentials map[string][]byte, +) (Interface, error) { -func (factory Factory) Provider(provider flaggerv1.MetricTemplateProvider, credentials map[string][]byte) (Interface, error) { switch { case provider.Type == "prometheus": return NewPrometheusProvider(provider, credentials) + case provider.Type == "datadog": + return NewDatadogProvider(metricInterval, provider, credentials) default: return NewPrometheusProvider(provider, credentials) }