Skip to content

Commit

Permalink
Merge pull request #592 from weaveworks/check-metrics-server-availabi…
Browse files Browse the repository at this point in the history
…lity

Check metrics server availability during canary initialization
  • Loading branch information
mathetake authored May 16, 2020
2 parents 0b0c49b + e0de9d0 commit 0056b99
Show file tree
Hide file tree
Showing 9 changed files with 179 additions and 47 deletions.
11 changes: 7 additions & 4 deletions pkg/controller/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ import (
"github.com/weaveworks/flagger/pkg/router"
)

const (
MetricsProviderServiceSuffix = ":service"
)

// scheduleCanaries synchronises the canary map with the jobs map,
// for new canaries new jobs are created and started
// for the removed canaries the jobs are stopped and deleted
Expand Down Expand Up @@ -119,6 +115,13 @@ func (c *Controller) advanceCanary(name string, namespace string) {
return
}

// check metric servers' availability
if !cd.SkipAnalysis() && (cd.Status.Phase == "" || cd.Status.Phase == flaggerv1.CanaryPhaseInitializing) {
if err := c.checkMetricProviderAvailability(cd); err != nil {
c.recordEventErrorf(cd, "Error checking metric providers: %v", err)
}
}

// init mesh router
meshRouter := c.routerFactory.MeshRouter(provider, labelSelector)

Expand Down
33 changes: 0 additions & 33 deletions pkg/controller/scheduler_common_test.go

This file was deleted.

4 changes: 2 additions & 2 deletions pkg/controller/scheduler_daemonset_fixture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func newDaemonSetFixture(c *flaggerv1.Canary) daemonSetFixture {
rf := router.NewFactory(nil, kubeClient, flaggerClient, "annotationsPrefix", "", logger, flaggerClient)

// init observer
observerFactory, _ := observers.NewFactory("fake")
observerFactory, _ := observers.NewFactory(testMetricsServerURL)

// init canary factory
configTracker := &canary.ConfigTracker{
Expand Down Expand Up @@ -616,7 +616,7 @@ func newDaemonSetTestService() *corev1.Service {
func newDaemonSetTestMetricTemplate() *flaggerv1.MetricTemplate {
provider := flaggerv1.MetricTemplateProvider{
Type: "prometheus",
Address: "fake",
Address: testMetricsServerURL,
SecretRef: &corev1.LocalObjectReference{
Name: "podinfo-secret-env",
},
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/scheduler_deployment_fixture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func newDeploymentFixture(c *flaggerv1.Canary) fixture {
rf := router.NewFactory(nil, kubeClient, flaggerClient, "annotationsPrefix", "", logger, flaggerClient)

// init observer
observerFactory, _ := observers.NewFactory("fake")
observerFactory, _ := observers.NewFactory(testMetricsServerURL)

// init canary factory
configTracker := &canary.ConfigTracker{
Expand Down Expand Up @@ -708,7 +708,7 @@ func newDeploymentTestHPA() *hpav2.HorizontalPodAutoscaler {
func newDeploymentTestMetricTemplate() *flaggerv1.MetricTemplate {
provider := flaggerv1.MetricTemplateProvider{
Type: "prometheus",
Address: "fake",
Address: testMetricsServerURL,
SecretRef: &corev1.LocalObjectReference{
Name: "podinfo-secret-env",
},
Expand Down
2 changes: 0 additions & 2 deletions pkg/controller/scheduler_deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,9 @@ func TestScheduler_DeploymentRollback(t *testing.T) {

// run metric checks
mocks.ctrl.advanceCanary("podinfo", "default")
require.NoError(t, err)

// finalise analysis
mocks.ctrl.advanceCanary("podinfo", "default")
require.NoError(t, err)

// check status
c, err = mocks.flaggerClient.FlaggerV1beta1().Canaries("default").Get(context.TODO(), "podinfo", metav1.GetOptions{})
Expand Down
61 changes: 61 additions & 0 deletions pkg/controller/scheduler_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package controller
import (
"context"
"errors"
"fmt"
"strings"
"time"

Expand All @@ -13,6 +14,66 @@ import (
"github.com/weaveworks/flagger/pkg/metrics/providers"
)

const (
MetricsProviderServiceSuffix = ":service"
)

// to be called during canary initialization
func (c *Controller) checkMetricProviderAvailability(canary *flaggerv1.Canary) error {
for _, metric := range canary.GetAnalysis().Metrics {
if metric.Name == "request-success-rate" || metric.Name == "request-duration" {
observerFactory := c.observerFactory
if canary.Spec.MetricsServer != "" {
var err error
observerFactory, err = observers.NewFactory(canary.Spec.MetricsServer)
if err != nil {
return fmt.Errorf("error building Prometheus client for %s %v", canary.Spec.MetricsServer, err)
}
}
if ok, err := observerFactory.Client.IsOnline(); !ok || err != nil {
return fmt.Errorf("prometheus not avaiable: %v", err)
}
continue
}

if metric.TemplateRef != nil {
namespace := canary.Namespace
if metric.TemplateRef.Namespace != "" {
namespace = metric.TemplateRef.Namespace
}

template, err := c.flaggerInformers.MetricInformer.Lister().MetricTemplates(namespace).Get(metric.TemplateRef.Name)
if err != nil {
return fmt.Errorf("metric template %s.%s error: %v", metric.TemplateRef.Name, namespace, err)
}

var credentials map[string][]byte
if template.Spec.Provider.SecretRef != nil {
secret, err := c.kubeClient.CoreV1().Secrets(namespace).Get(context.TODO(), template.Spec.Provider.SecretRef.Name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("metric template %s.%s secret %s error: %v",
metric.TemplateRef.Name, namespace, template.Spec.Provider.SecretRef.Name, err)
}
credentials = secret.Data
}

factory := providers.Factory{}
provider, err := factory.Provider(metric.Interval, template.Spec.Provider, credentials)
if err != nil {
return fmt.Errorf("metric template %s.%s provider %s error: %v",
metric.TemplateRef.Name, namespace, template.Spec.Provider.Type, err)
}

if ok, err := provider.IsOnline(); !ok || err != nil {
return fmt.Errorf("%v in metric tempalte %s.%s not avaiable: %v", template.Spec.Provider.Type,
template.Name, template.Namespace, err)
}
}
}
c.recordEventInfof(canary, "all the metrics providers are available!")
return nil
}

func (c *Controller) runBuiltinMetricChecks(canary *flaggerv1.Canary) bool {
// override the global provider if one is specified in the canary spec
var metricsProvider string
Expand Down
53 changes: 53 additions & 0 deletions pkg/controller/scheduler_metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package controller

import (
"testing"

"github.com/stretchr/testify/require"
"go.uber.org/zap"
"k8s.io/client-go/tools/record"

flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1beta1"
"github.com/weaveworks/flagger/pkg/metrics/observers"
)

func TestController_checkMetricProviderAvailability(t *testing.T) {
t.Run("builtin", func(t *testing.T) {
// ok
analysis := &flaggerv1.CanaryAnalysis{Metrics: []flaggerv1.CanaryMetric{{Name: "request-success-rate"}}}
canary := &flaggerv1.Canary{Spec: flaggerv1.CanarySpec{Analysis: analysis}}
obs, err := observers.NewFactory(testMetricsServerURL)
require.NoError(t, err)
ctrl := Controller{observerFactory: obs, logger: zap.S(), eventRecorder: &record.FakeRecorder{}}
require.NoError(t, ctrl.checkMetricProviderAvailability(canary))

// error
ctrl.observerFactory, err = observers.NewFactory("http://non-exist")
require.NoError(t, err)
require.Error(t, ctrl.checkMetricProviderAvailability(canary))

// ok
canary.Spec.MetricsServer = testMetricsServerURL
require.NoError(t, ctrl.checkMetricProviderAvailability(canary))
})

t.Run("templateRef", func(t *testing.T) {
ctrl := newDeploymentFixture(nil).ctrl

// error (not found)
analysis := &flaggerv1.CanaryAnalysis{Metrics: []flaggerv1.CanaryMetric{{
Name: "", TemplateRef: &flaggerv1.CrossNamespaceObjectReference{
Name: "non-exist", Namespace: "default",
},
}}}
canary := &flaggerv1.Canary{Spec: flaggerv1.CanarySpec{Analysis: analysis}}
require.Error(t, ctrl.checkMetricProviderAvailability(canary))

// ok
canary.Spec.Analysis.Metrics[0].TemplateRef = &flaggerv1.CrossNamespaceObjectReference{
Name: "envoy",
Namespace: "default",
}
require.NoError(t, ctrl.checkMetricProviderAvailability(canary))
})
}
54 changes: 54 additions & 0 deletions pkg/controller/scheduler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package controller

import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"os"
"testing"

flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1beta1"
clientset "github.com/weaveworks/flagger/pkg/client/clientset/versioned"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var testMetricsServerURL string

func TestMain(m *testing.M) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Query()["query"][0] == "vector(1)" {
// for IsOnline invoked during canary initialization
w.Write([]byte(`{"status":"success","data":{"resultType":"vector","result":[{"metric":{},"value":[1545905245.458,"1"]}]}}`))
return
}
w.Write([]byte(`{"status":"success","data":{"resultType":"vector","result":[{"metric":{},"value":[1545905245.458,"100"]}]}}`))
}))

testMetricsServerURL = ts.URL
defer ts.Close()
os.Exit(m.Run())
}

func assertPhase(flaggerClient clientset.Interface, canary string, phase flaggerv1.CanaryPhase) error {
c, err := flaggerClient.FlaggerV1beta1().Canaries("default").Get(context.TODO(), canary, metav1.GetOptions{})
if err != nil {
return err
}

if c.Status.Phase != phase {
return fmt.Errorf("got canary state %s wanted %s", c.Status.Phase, phase)
}

return nil
}

func alwaysReady() bool {
return true
}

func toFloatPtr(val int) *float64 {
v := float64(val)
return &v
}
4 changes: 0 additions & 4 deletions pkg/metrics/providers/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,6 @@ func NewPrometheusProvider(provider flaggerv1.MetricTemplateProvider, credential

// RunQuery executes the promQL query and returns the the first result as float64
func (p *PrometheusProvider) RunQuery(query string) (float64, error) {
if p.url.String() == "fake" {
return 100, nil
}

query = url.QueryEscape(p.trimQuery(query))
u, err := url.Parse(fmt.Sprintf("./api/v1/query?query=%s", query))
if err != nil {
Expand Down

0 comments on commit 0056b99

Please sign in to comment.