Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check metrics server availability during canary initialization #592

Merged
merged 4 commits into from
May 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions pkg/controller/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ import (
"github.com/weaveworks/flagger/pkg/router"
)

const (
MetricsProviderServiceSuffix = ":service"
)

// scheduleCanaries synchronises the canary map with the jobs map,
// for new canaries new jobs are created and started
// for the removed canaries the jobs are stopped and deleted
Expand Down Expand Up @@ -119,6 +115,13 @@ func (c *Controller) advanceCanary(name string, namespace string) {
return
}

// check metric servers' availability
if !cd.SkipAnalysis() && (cd.Status.Phase == "" || cd.Status.Phase == flaggerv1.CanaryPhaseInitializing) {
if err := c.checkMetricProviderAvailability(cd); err != nil {
c.recordEventErrorf(cd, "Error checking metric providers: %v", err)
}
}

// init mesh router
meshRouter := c.routerFactory.MeshRouter(provider, labelSelector)

Expand Down
33 changes: 0 additions & 33 deletions pkg/controller/scheduler_common_test.go

This file was deleted.

4 changes: 2 additions & 2 deletions pkg/controller/scheduler_daemonset_fixture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func newDaemonSetFixture(c *flaggerv1.Canary) daemonSetFixture {
rf := router.NewFactory(nil, kubeClient, flaggerClient, "annotationsPrefix", "", logger, flaggerClient)

// init observer
observerFactory, _ := observers.NewFactory("fake")
observerFactory, _ := observers.NewFactory(testMetricsServerURL)

// init canary factory
configTracker := &canary.ConfigTracker{
Expand Down Expand Up @@ -616,7 +616,7 @@ func newDaemonSetTestService() *corev1.Service {
func newDaemonSetTestMetricTemplate() *flaggerv1.MetricTemplate {
provider := flaggerv1.MetricTemplateProvider{
Type: "prometheus",
Address: "fake",
Address: testMetricsServerURL,
SecretRef: &corev1.LocalObjectReference{
Name: "podinfo-secret-env",
},
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/scheduler_deployment_fixture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func newDeploymentFixture(c *flaggerv1.Canary) fixture {
rf := router.NewFactory(nil, kubeClient, flaggerClient, "annotationsPrefix", "", logger, flaggerClient)

// init observer
observerFactory, _ := observers.NewFactory("fake")
observerFactory, _ := observers.NewFactory(testMetricsServerURL)

// init canary factory
configTracker := &canary.ConfigTracker{
Expand Down Expand Up @@ -708,7 +708,7 @@ func newDeploymentTestHPA() *hpav2.HorizontalPodAutoscaler {
func newDeploymentTestMetricTemplate() *flaggerv1.MetricTemplate {
provider := flaggerv1.MetricTemplateProvider{
Type: "prometheus",
Address: "fake",
Address: testMetricsServerURL,
SecretRef: &corev1.LocalObjectReference{
Name: "podinfo-secret-env",
},
Expand Down
2 changes: 0 additions & 2 deletions pkg/controller/scheduler_deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,9 @@ func TestScheduler_DeploymentRollback(t *testing.T) {

// run metric checks
mocks.ctrl.advanceCanary("podinfo", "default")
require.NoError(t, err)

// finalise analysis
mocks.ctrl.advanceCanary("podinfo", "default")
require.NoError(t, err)

// check status
c, err = mocks.flaggerClient.FlaggerV1beta1().Canaries("default").Get(context.TODO(), "podinfo", metav1.GetOptions{})
Expand Down
61 changes: 61 additions & 0 deletions pkg/controller/scheduler_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package controller
import (
"context"
"errors"
"fmt"
"strings"
"time"

Expand All @@ -13,6 +14,66 @@ import (
"github.com/weaveworks/flagger/pkg/metrics/providers"
)

const (
MetricsProviderServiceSuffix = ":service"
)

// to be called during canary initialization
func (c *Controller) checkMetricProviderAvailability(canary *flaggerv1.Canary) error {
for _, metric := range canary.GetAnalysis().Metrics {
if metric.Name == "request-success-rate" || metric.Name == "request-duration" {
observerFactory := c.observerFactory
if canary.Spec.MetricsServer != "" {
var err error
observerFactory, err = observers.NewFactory(canary.Spec.MetricsServer)
if err != nil {
return fmt.Errorf("error building Prometheus client for %s %v", canary.Spec.MetricsServer, err)
}
}
if ok, err := observerFactory.Client.IsOnline(); !ok || err != nil {
return fmt.Errorf("prometheus not avaiable: %v", err)
}
continue
}

if metric.TemplateRef != nil {
namespace := canary.Namespace
if metric.TemplateRef.Namespace != "" {
namespace = metric.TemplateRef.Namespace
}

template, err := c.flaggerInformers.MetricInformer.Lister().MetricTemplates(namespace).Get(metric.TemplateRef.Name)
if err != nil {
return fmt.Errorf("metric template %s.%s error: %v", metric.TemplateRef.Name, namespace, err)
}

var credentials map[string][]byte
if template.Spec.Provider.SecretRef != nil {
secret, err := c.kubeClient.CoreV1().Secrets(namespace).Get(context.TODO(), template.Spec.Provider.SecretRef.Name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("metric template %s.%s secret %s error: %v",
metric.TemplateRef.Name, namespace, template.Spec.Provider.SecretRef.Name, err)
}
credentials = secret.Data
}

factory := providers.Factory{}
provider, err := factory.Provider(metric.Interval, template.Spec.Provider, credentials)
if err != nil {
return fmt.Errorf("metric template %s.%s provider %s error: %v",
metric.TemplateRef.Name, namespace, template.Spec.Provider.Type, err)
}

if ok, err := provider.IsOnline(); !ok || err != nil {
return fmt.Errorf("%v in metric tempalte %s.%s not avaiable: %v", template.Spec.Provider.Type,
template.Name, template.Namespace, err)
}
}
}
c.recordEventInfof(canary, "all the metrics providers are available!")
return nil
}

func (c *Controller) runBuiltinMetricChecks(canary *flaggerv1.Canary) bool {
// override the global provider if one is specified in the canary spec
var metricsProvider string
Expand Down
53 changes: 53 additions & 0 deletions pkg/controller/scheduler_metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package controller

import (
"testing"

"github.com/stretchr/testify/require"
"go.uber.org/zap"
"k8s.io/client-go/tools/record"

flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1beta1"
"github.com/weaveworks/flagger/pkg/metrics/observers"
)

func TestController_checkMetricProviderAvailability(t *testing.T) {
t.Run("builtin", func(t *testing.T) {
// ok
analysis := &flaggerv1.CanaryAnalysis{Metrics: []flaggerv1.CanaryMetric{{Name: "request-success-rate"}}}
canary := &flaggerv1.Canary{Spec: flaggerv1.CanarySpec{Analysis: analysis}}
obs, err := observers.NewFactory(testMetricsServerURL)
require.NoError(t, err)
ctrl := Controller{observerFactory: obs, logger: zap.S(), eventRecorder: &record.FakeRecorder{}}
require.NoError(t, ctrl.checkMetricProviderAvailability(canary))

// error
ctrl.observerFactory, err = observers.NewFactory("http://non-exist")
require.NoError(t, err)
require.Error(t, ctrl.checkMetricProviderAvailability(canary))

// ok
canary.Spec.MetricsServer = testMetricsServerURL
require.NoError(t, ctrl.checkMetricProviderAvailability(canary))
})

t.Run("templateRef", func(t *testing.T) {
ctrl := newDeploymentFixture(nil).ctrl

// error (not found)
analysis := &flaggerv1.CanaryAnalysis{Metrics: []flaggerv1.CanaryMetric{{
Name: "", TemplateRef: &flaggerv1.CrossNamespaceObjectReference{
Name: "non-exist", Namespace: "default",
},
}}}
canary := &flaggerv1.Canary{Spec: flaggerv1.CanarySpec{Analysis: analysis}}
require.Error(t, ctrl.checkMetricProviderAvailability(canary))

// ok
canary.Spec.Analysis.Metrics[0].TemplateRef = &flaggerv1.CrossNamespaceObjectReference{
Name: "envoy",
Namespace: "default",
}
require.NoError(t, ctrl.checkMetricProviderAvailability(canary))
})
}
54 changes: 54 additions & 0 deletions pkg/controller/scheduler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package controller

import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"os"
"testing"

flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1beta1"
clientset "github.com/weaveworks/flagger/pkg/client/clientset/versioned"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var testMetricsServerURL string

func TestMain(m *testing.M) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Query()["query"][0] == "vector(1)" {
// for IsOnline invoked during canary initialization
w.Write([]byte(`{"status":"success","data":{"resultType":"vector","result":[{"metric":{},"value":[1545905245.458,"1"]}]}}`))
return
}
w.Write([]byte(`{"status":"success","data":{"resultType":"vector","result":[{"metric":{},"value":[1545905245.458,"100"]}]}}`))
}))

testMetricsServerURL = ts.URL
defer ts.Close()
os.Exit(m.Run())
}

func assertPhase(flaggerClient clientset.Interface, canary string, phase flaggerv1.CanaryPhase) error {
c, err := flaggerClient.FlaggerV1beta1().Canaries("default").Get(context.TODO(), canary, metav1.GetOptions{})
if err != nil {
return err
}

if c.Status.Phase != phase {
return fmt.Errorf("got canary state %s wanted %s", c.Status.Phase, phase)
}

return nil
}

func alwaysReady() bool {
return true
}

func toFloatPtr(val int) *float64 {
v := float64(val)
return &v
}
4 changes: 0 additions & 4 deletions pkg/metrics/providers/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,6 @@ func NewPrometheusProvider(provider flaggerv1.MetricTemplateProvider, credential

// RunQuery executes the promQL query and returns the the first result as float64
func (p *PrometheusProvider) RunQuery(query string) (float64, error) {
if p.url.String() == "fake" {
return 100, nil
}

query = url.QueryEscape(p.trimQuery(query))
u, err := url.Parse(fmt.Sprintf("./api/v1/query?query=%s", query))
if err != nil {
Expand Down