diff --git a/pkg/controller/deployer.go b/pkg/controller/deployer.go index a5d65a3ec..3eed041a0 100644 --- a/pkg/controller/deployer.go +++ b/pkg/controller/deployer.go @@ -82,15 +82,11 @@ func (c *CanaryDeployer) IsPrimaryReady(cd *flaggerv1.Canary) (bool, error) { retriable, err := c.isDeploymentReady(primary, cd.GetProgressDeadlineSeconds()) if err != nil { - if retriable { - return retriable, fmt.Errorf("Halt %s.%s advancement %s", cd.Name, cd.Namespace, err.Error()) - } else { - return retriable, err - } + return retriable, fmt.Errorf("Halt advancement %s.%s %s", primaryName, cd.Namespace, err.Error()) } if primary.Spec.Replicas == int32p(0) { - return true, fmt.Errorf("halt %s.%s advancement primary deployment is scaled to zero", + return true, fmt.Errorf("Halt %s.%s advancement primary deployment is scaled to zero", cd.Name, cd.Namespace) } return true, nil @@ -112,7 +108,7 @@ func (c *CanaryDeployer) IsCanaryReady(cd *flaggerv1.Canary) (bool, error) { retriable, err := c.isDeploymentReady(canary, cd.GetProgressDeadlineSeconds()) if err != nil { if retriable { - return retriable, fmt.Errorf("Halt %s.%s advancement %s", cd.Name, cd.Namespace, err.Error()) + return retriable, fmt.Errorf("Halt advancement %s.%s %s", targetName, cd.Namespace, err.Error()) } else { return retriable, fmt.Errorf("deployment does not have minimum availability for more than %vs", cd.GetProgressDeadlineSeconds()) diff --git a/pkg/controller/job.go b/pkg/controller/job.go index f44a9abc8..ead2415e4 100644 --- a/pkg/controller/job.go +++ b/pkg/controller/job.go @@ -6,7 +6,8 @@ import "time" type CanaryJob struct { Name string Namespace string - function func(name string, namespace string) + SkipTests bool + function func(name string, namespace string, skipTests bool) done chan bool ticker *time.Ticker } @@ -15,11 +16,11 @@ type CanaryJob struct { func (j CanaryJob) Start() { go func() { // run the infra bootstrap on job creation - j.function(j.Name, j.Namespace) + j.function(j.Name, j.Namespace, j.SkipTests) for { select { case <-j.ticker.C: - j.function(j.Name, j.Namespace) + j.function(j.Name, j.Namespace, j.SkipTests) case <-j.done: return } diff --git a/pkg/controller/scheduler.go b/pkg/controller/scheduler.go index f6975df3b..0fd9f1c42 100644 --- a/pkg/controller/scheduler.go +++ b/pkg/controller/scheduler.go @@ -58,8 +58,7 @@ func (c *Controller) scheduleCanaries() { for canaryName, targetName := range current { for name, target := range current { if name != canaryName && target == targetName { - c.logger.Errorf("Bad things will happen! Found more than one canary with the same target %s", - targetName) + c.logger.With("canary", canaryName).Errorf("Bad things will happen! Found more than one canary with the same target %s", targetName) } } } @@ -70,7 +69,7 @@ func (c *Controller) scheduleCanaries() { } } -func (c *Controller) advanceCanary(name string, namespace string) { +func (c *Controller) advanceCanary(name string, namespace string, skipLivenessChecks bool) { begin := time.Now() // check if the canary exists cd, err := c.flaggerClient.FlaggerV1alpha3().Canaries(namespace).Get(name, v1.GetOptions{}) @@ -105,9 +104,11 @@ func (c *Controller) advanceCanary(name string, namespace string) { } // check primary deployment status - if _, err := c.deployer.IsPrimaryReady(cd); err != nil { - c.recordEventWarningf(cd, "%v", err) - return + if !skipLivenessChecks { + if _, err := c.deployer.IsPrimaryReady(cd); err != nil { + c.recordEventWarningf(cd, "%v", err) + return + } } // check if virtual service exists @@ -121,7 +122,33 @@ func (c *Controller) advanceCanary(name string, namespace string) { c.recorder.SetWeight(cd, primaryRoute.Weight, canaryRoute.Weight) // check if canary analysis should start (canary revision has changes) or continue - if ok := c.checkCanaryStatus(cd, c.deployer); !ok { + if ok := c.checkCanaryStatus(cd); !ok { + return + } + + // check if canary revision changed during analysis + if restart := c.hasCanaryRevisionChanged(cd); restart { + c.recordEventInfof(cd, "New revision detected! Restarting analysis for %s.%s", + cd.Spec.TargetRef.Name, cd.Namespace) + + // route all traffic back to primary + primaryRoute.Weight = 100 + canaryRoute.Weight = 0 + if err := c.router.SetRoutes(cd, primaryRoute, canaryRoute); err != nil { + c.recordEventWarningf(cd, "%v", err) + return + } + + // reset status + status := flaggerv1.CanaryStatus{ + Phase: flaggerv1.CanaryProgressing, + CanaryWeight: 0, + FailedChecks: 0, + } + if err := c.deployer.SyncStatus(cd, status); err != nil { + c.recordEventWarningf(cd, "%v", err) + return + } return } @@ -130,10 +157,13 @@ func (c *Controller) advanceCanary(name string, namespace string) { }() // check canary deployment status - retriable, err := c.deployer.IsCanaryReady(cd) - if err != nil && retriable { - c.recordEventWarningf(cd, "%v", err) - return + var retriable = true + if !skipLivenessChecks { + retriable, err = c.deployer.IsCanaryReady(cd) + if err != nil && retriable { + c.recordEventWarningf(cd, "%v", err) + return + } } // check if the number of failed checks reached the threshold @@ -185,7 +215,7 @@ func (c *Controller) advanceCanary(name string, namespace string) { // check if the canary success rate is above the threshold // skip check if no traffic is routed to canary if canaryRoute.Weight == 0 { - c.recordEventInfof(cd, "Starting canary deployment for %s.%s", cd.Name, cd.Namespace) + c.recordEventInfof(cd, "Starting canary analysis for %s.%s", cd.Spec.TargetRef.Name, cd.Namespace) } else { if ok := c.analyseCanary(cd); !ok { if err := c.deployer.SetStatusFailedChecks(cd, cd.Status.FailedChecks+1); err != nil { @@ -260,14 +290,14 @@ func (c *Controller) advanceCanary(name string, namespace string) { } } -func (c *Controller) checkCanaryStatus(cd *flaggerv1.Canary, deployer CanaryDeployer) bool { +func (c *Controller) checkCanaryStatus(cd *flaggerv1.Canary) bool { c.recorder.SetStatus(cd) if cd.Status.Phase == flaggerv1.CanaryProgressing { return true } if cd.Status.Phase == "" { - if err := deployer.SyncStatus(cd, flaggerv1.CanaryStatus{Phase: flaggerv1.CanaryInitialized}); err != nil { + if err := c.deployer.SyncStatus(cd, flaggerv1.CanaryStatus{Phase: flaggerv1.CanaryInitialized}); err != nil { c.logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Errorf("%v", err) return false } @@ -278,15 +308,15 @@ func (c *Controller) checkCanaryStatus(cd *flaggerv1.Canary, deployer CanaryDepl return false } - if diff, err := deployer.IsNewSpec(cd); diff { + if diff, err := c.deployer.IsNewSpec(cd); diff { c.recordEventInfof(cd, "New revision detected! Scaling up %s.%s", cd.Spec.TargetRef.Name, cd.Namespace) c.sendNotification(cd, "New revision detected, starting canary analysis.", true, false) - if err = deployer.Scale(cd, 1); err != nil { + if err = c.deployer.Scale(cd, 1); err != nil { c.recordEventErrorf(cd, "%v", err) return false } - if err := deployer.SyncStatus(cd, flaggerv1.CanaryStatus{Phase: flaggerv1.CanaryProgressing}); err != nil { + if err := c.deployer.SyncStatus(cd, flaggerv1.CanaryStatus{Phase: flaggerv1.CanaryProgressing}); err != nil { c.logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Errorf("%v", err) return false } @@ -296,6 +326,15 @@ func (c *Controller) checkCanaryStatus(cd *flaggerv1.Canary, deployer CanaryDepl return false } +func (c *Controller) hasCanaryRevisionChanged(cd *flaggerv1.Canary) bool { + if cd.Status.Phase == flaggerv1.CanaryProgressing { + if diff, _ := c.deployer.IsNewSpec(cd); diff { + return true + } + } + return false +} + func (c *Controller) analyseCanary(r *flaggerv1.Canary) bool { // run metrics checks for _, metric := range r.Spec.CanaryAnalysis.Metrics { diff --git a/pkg/controller/scheduler_test.go b/pkg/controller/scheduler_test.go index b796cf5fb..3c03fd749 100644 --- a/pkg/controller/scheduler_test.go +++ b/pkg/controller/scheduler_test.go @@ -1,12 +1,16 @@ package controller import ( + "go.uber.org/zap" + "k8s.io/client-go/kubernetes" "sync" "testing" "time" + istioclientset "github.com/knative/pkg/client/clientset/versioned" fakeIstio "github.com/knative/pkg/client/clientset/versioned/fake" "github.com/stefanprodan/flagger/pkg/apis/flagger/v1alpha3" + clientset "github.com/stefanprodan/flagger/pkg/client/clientset/versioned" fakeFlagger "github.com/stefanprodan/flagger/pkg/client/clientset/versioned/fake" informers "github.com/stefanprodan/flagger/pkg/client/informers/externalversions" "github.com/stefanprodan/flagger/pkg/logging" @@ -21,6 +25,39 @@ var ( noResyncPeriodFunc = func() time.Duration { return 0 } ) +func newTestController( + kubeClient kubernetes.Interface, + istioClient istioclientset.Interface, + flaggerClient clientset.Interface, + logger *zap.SugaredLogger, + deployer CanaryDeployer, + router CanaryRouter, + observer CanaryObserver, +) *Controller { + flaggerInformerFactory := informers.NewSharedInformerFactory(flaggerClient, noResyncPeriodFunc()) + flaggerInformer := flaggerInformerFactory.Flagger().V1alpha3().Canaries() + + ctrl := &Controller{ + kubeClient: kubeClient, + istioClient: istioClient, + flaggerClient: flaggerClient, + flaggerLister: flaggerInformer.Lister(), + flaggerSynced: flaggerInformer.Informer().HasSynced, + workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerAgentName), + eventRecorder: &record.FakeRecorder{}, + logger: logger, + canaries: new(sync.Map), + flaggerWindow: time.Second, + deployer: deployer, + router: router, + observer: observer, + recorder: NewCanaryRecorder(false), + } + ctrl.flaggerSynced = alwaysReady + + return ctrl +} + func TestScheduler_Init(t *testing.T) { canary := newTestCanary() dep := newTestDeployment() @@ -45,29 +82,9 @@ func TestScheduler_Init(t *testing.T) { observer := CanaryObserver{ metricsServer: "fake", } + ctrl := newTestController(kubeClient, istioClient, flaggerClient, logger, deployer, router, observer) - flaggerInformerFactory := informers.NewSharedInformerFactory(flaggerClient, noResyncPeriodFunc()) - flaggerInformer := flaggerInformerFactory.Flagger().V1alpha3().Canaries() - - ctrl := &Controller{ - kubeClient: kubeClient, - istioClient: istioClient, - flaggerClient: flaggerClient, - flaggerLister: flaggerInformer.Lister(), - flaggerSynced: flaggerInformer.Informer().HasSynced, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerAgentName), - eventRecorder: &record.FakeRecorder{}, - logger: logger, - canaries: new(sync.Map), - flaggerWindow: time.Second, - deployer: deployer, - router: router, - observer: observer, - recorder: NewCanaryRecorder(false), - } - ctrl.flaggerSynced = alwaysReady - - ctrl.advanceCanary("podinfo", "default") + ctrl.advanceCanary("podinfo", "default", false) _, err := kubeClient.AppsV1().Deployments("default").Get("podinfo-primary", metav1.GetOptions{}) if err != nil { @@ -99,30 +116,10 @@ func TestScheduler_NewRevision(t *testing.T) { observer := CanaryObserver{ metricsServer: "fake", } - - flaggerInformerFactory := informers.NewSharedInformerFactory(flaggerClient, noResyncPeriodFunc()) - flaggerInformer := flaggerInformerFactory.Flagger().V1alpha3().Canaries() - - ctrl := &Controller{ - kubeClient: kubeClient, - istioClient: istioClient, - flaggerClient: flaggerClient, - flaggerLister: flaggerInformer.Lister(), - flaggerSynced: flaggerInformer.Informer().HasSynced, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerAgentName), - eventRecorder: &record.FakeRecorder{}, - logger: logger, - canaries: new(sync.Map), - flaggerWindow: time.Second, - deployer: deployer, - router: router, - observer: observer, - recorder: NewCanaryRecorder(false), - } - ctrl.flaggerSynced = alwaysReady + ctrl := newTestController(kubeClient, istioClient, flaggerClient, logger, deployer, router, observer) // init - ctrl.advanceCanary("podinfo", "default") + ctrl.advanceCanary("podinfo", "default", false) // update dep2 := newTestDeploymentUpdated() @@ -132,7 +129,7 @@ func TestScheduler_NewRevision(t *testing.T) { } // detect changes - ctrl.advanceCanary("podinfo", "default") + ctrl.advanceCanary("podinfo", "default", false) c, err := kubeClient.AppsV1().Deployments("default").Get("podinfo", metav1.GetOptions{}) if err != nil { @@ -168,46 +165,195 @@ func TestScheduler_Rollback(t *testing.T) { observer := CanaryObserver{ metricsServer: "fake", } + ctrl := newTestController(kubeClient, istioClient, flaggerClient, logger, deployer, router, observer) - flaggerInformerFactory := informers.NewSharedInformerFactory(flaggerClient, noResyncPeriodFunc()) - flaggerInformer := flaggerInformerFactory.Flagger().V1alpha3().Canaries() + // init + ctrl.advanceCanary("podinfo", "default", true) - ctrl := &Controller{ + // update failed checks to max + err := deployer.SyncStatus(canary, v1alpha3.CanaryStatus{Phase: v1alpha3.CanaryProgressing, FailedChecks: 11}) + if err != nil { + t.Fatal(err.Error()) + } + + // detect changes + ctrl.advanceCanary("podinfo", "default", true) + + c, err := flaggerClient.FlaggerV1alpha3().Canaries("default").Get("podinfo", metav1.GetOptions{}) + if err != nil { + t.Fatal(err.Error()) + } + + if c.Status.Phase != v1alpha3.CanaryFailed { + t.Errorf("Got canary state %v wanted %v", c.Status.Phase, v1alpha3.CanaryFailed) + } +} + +func TestScheduler_NewRevisionReset(t *testing.T) { + canary := newTestCanary() + dep := newTestDeployment() + hpa := newTestHPA() + + flaggerClient := fakeFlagger.NewSimpleClientset(canary) + kubeClient := fake.NewSimpleClientset(dep, hpa) + istioClient := fakeIstio.NewSimpleClientset() + + logger, _ := logging.NewLogger("debug") + deployer := CanaryDeployer{ + flaggerClient: flaggerClient, + kubeClient: kubeClient, + logger: logger, + } + router := CanaryRouter{ + flaggerClient: flaggerClient, kubeClient: kubeClient, istioClient: istioClient, + logger: logger, + } + observer := CanaryObserver{ + metricsServer: "fake", + } + ctrl := newTestController(kubeClient, istioClient, flaggerClient, logger, deployer, router, observer) + + // init + ctrl.advanceCanary("podinfo", "default", false) + + // first update + dep2 := newTestDeploymentUpdated() + _, err := kubeClient.AppsV1().Deployments("default").Update(dep2) + if err != nil { + t.Fatal(err.Error()) + } + + // detect changes + ctrl.advanceCanary("podinfo", "default", true) + // advance + ctrl.advanceCanary("podinfo", "default", true) + + primaryRoute, canaryRoute, err := router.GetRoutes(canary) + if err != nil { + t.Fatal(err.Error()) + } + + if primaryRoute.Weight != 90 { + t.Errorf("Got primary route %v wanted %v", primaryRoute.Weight, 90) + } + + if canaryRoute.Weight != 10 { + t.Errorf("Got canary route %v wanted %v", canaryRoute.Weight, 10) + } + + // second update + dep2.Spec.Template.Spec.ServiceAccountName = "test" + _, err = kubeClient.AppsV1().Deployments("default").Update(dep2) + if err != nil { + t.Fatal(err.Error()) + } + + // detect changes + ctrl.advanceCanary("podinfo", "default", true) + + primaryRoute, canaryRoute, err = router.GetRoutes(canary) + if err != nil { + t.Fatal(err.Error()) + } + + if primaryRoute.Weight != 100 { + t.Errorf("Got primary route %v wanted %v", primaryRoute.Weight, 100) + } + + if canaryRoute.Weight != 0 { + t.Errorf("Got canary route %v wanted %v", canaryRoute.Weight, 0) + } +} + +func TestScheduler_Promotion(t *testing.T) { + canary := newTestCanary() + dep := newTestDeployment() + hpa := newTestHPA() + + flaggerClient := fakeFlagger.NewSimpleClientset(canary) + kubeClient := fake.NewSimpleClientset(dep, hpa) + istioClient := fakeIstio.NewSimpleClientset() + + logger, _ := logging.NewLogger("debug") + deployer := CanaryDeployer{ flaggerClient: flaggerClient, - flaggerLister: flaggerInformer.Lister(), - flaggerSynced: flaggerInformer.Informer().HasSynced, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerAgentName), - eventRecorder: &record.FakeRecorder{}, + kubeClient: kubeClient, logger: logger, - canaries: new(sync.Map), - flaggerWindow: time.Second, - deployer: deployer, - router: router, - observer: observer, - recorder: NewCanaryRecorder(false), } - ctrl.flaggerSynced = alwaysReady + router := CanaryRouter{ + flaggerClient: flaggerClient, + kubeClient: kubeClient, + istioClient: istioClient, + logger: logger, + } + observer := CanaryObserver{ + metricsServer: "fake", + } + ctrl := newTestController(kubeClient, istioClient, flaggerClient, logger, deployer, router, observer) // init - ctrl.advanceCanary("podinfo", "default") + ctrl.advanceCanary("podinfo", "default", false) - // update failed checks to max - err := deployer.SyncStatus(canary, v1alpha3.CanaryStatus{Phase: v1alpha3.CanaryProgressing, FailedChecks: 11}) + // update + dep2 := newTestDeploymentUpdated() + _, err := kubeClient.AppsV1().Deployments("default").Update(dep2) if err != nil { t.Fatal(err.Error()) } // detect changes - ctrl.advanceCanary("podinfo", "default") + ctrl.advanceCanary("podinfo", "default", true) + + primaryRoute, canaryRoute, err := router.GetRoutes(canary) + if err != nil { + t.Fatal(err.Error()) + } + + primaryRoute.Weight = 60 + canaryRoute.Weight = 40 + err = ctrl.router.SetRoutes(canary, primaryRoute, canaryRoute) + if err != nil { + t.Fatal(err.Error()) + } + + // advance + ctrl.advanceCanary("podinfo", "default", true) + + // promote + ctrl.advanceCanary("podinfo", "default", true) + + primaryRoute, canaryRoute, err = router.GetRoutes(canary) + if err != nil { + t.Fatal(err.Error()) + } + + if primaryRoute.Weight != 100 { + t.Errorf("Got primary route %v wanted %v", primaryRoute.Weight, 100) + } + + if canaryRoute.Weight != 0 { + t.Errorf("Got canary route %v wanted %v", canaryRoute.Weight, 0) + } + + primaryDep, err := kubeClient.AppsV1().Deployments("default").Get("podinfo-primary", metav1.GetOptions{}) + if err != nil { + t.Fatal(err.Error()) + } + + primaryImage := primaryDep.Spec.Template.Spec.Containers[0].Image + canaryImage := dep2.Spec.Template.Spec.Containers[0].Image + if primaryImage != canaryImage { + t.Errorf("Got primary image %v wanted %v", primaryImage, canaryImage) + } c, err := flaggerClient.FlaggerV1alpha3().Canaries("default").Get("podinfo", metav1.GetOptions{}) if err != nil { t.Fatal(err.Error()) } - if c.Status.Phase != v1alpha3.CanaryFailed { - t.Errorf("Got canary state %v wanted %v", c.Status.Phase, v1alpha3.CanaryFailed) + if c.Status.Phase != v1alpha3.CanarySucceeded { + t.Errorf("Got canary state %v wanted %v", c.Status.Phase, v1alpha3.CanarySucceeded) } }