Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor canary package #378

Merged
merged 1 commit into from
Nov 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 19 additions & 11 deletions cmd/flagger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,6 @@ import (
"time"

"github.com/Masterminds/semver"
clientset "github.com/weaveworks/flagger/pkg/client/clientset/versioned"
informers "github.com/weaveworks/flagger/pkg/client/informers/externalversions"
"github.com/weaveworks/flagger/pkg/controller"
"github.com/weaveworks/flagger/pkg/logger"
"github.com/weaveworks/flagger/pkg/metrics"
"github.com/weaveworks/flagger/pkg/notifier"
"github.com/weaveworks/flagger/pkg/router"
"github.com/weaveworks/flagger/pkg/server"
"github.com/weaveworks/flagger/pkg/signals"
"github.com/weaveworks/flagger/pkg/version"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/kubernetes"
Expand All @@ -30,6 +20,18 @@ import (
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/transport"
_ "k8s.io/code-generator/cmd/client-gen/generators"

"github.com/weaveworks/flagger/pkg/canary"
clientset "github.com/weaveworks/flagger/pkg/client/clientset/versioned"
informers "github.com/weaveworks/flagger/pkg/client/informers/externalversions"
"github.com/weaveworks/flagger/pkg/controller"
"github.com/weaveworks/flagger/pkg/logger"
"github.com/weaveworks/flagger/pkg/metrics"
"github.com/weaveworks/flagger/pkg/notifier"
"github.com/weaveworks/flagger/pkg/router"
"github.com/weaveworks/flagger/pkg/server"
"github.com/weaveworks/flagger/pkg/signals"
"github.com/weaveworks/flagger/pkg/version"
)

var (
Expand Down Expand Up @@ -178,6 +180,12 @@ func main() {
go server.ListenAndServe(port, 3*time.Second, logger, stopCh)

routerFactory := router.NewFactory(cfg, kubeClient, flaggerClient, ingressAnnotationsPrefix, logger, meshClient)
configTracker := canary.ConfigTracker{
Logger: logger,
KubeClient: kubeClient,
FlaggerClient: flaggerClient,
}
canaryFactory := canary.NewFactory(kubeClient, flaggerClient, configTracker, labels, logger)

c := controller.NewController(
kubeClient,
Expand All @@ -187,11 +195,11 @@ func main() {
controlLoopInterval,
logger,
notifierClient,
canaryFactory,
routerFactory,
observerFactory,
meshProvider,
version.VERSION,
labels,
)

flaggerInformerFactory.Start(stopCh)
Expand Down
19 changes: 19 additions & 0 deletions pkg/canary/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package canary

import "github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha3"

type Controller interface {
IsPrimaryReady(canary *v1alpha3.Canary) (bool, error)
IsCanaryReady(canary *v1alpha3.Canary) (bool, error)
SyncStatus(canary *v1alpha3.Canary, status v1alpha3.CanaryStatus) error
SetStatusFailedChecks(canary *v1alpha3.Canary, val int) error
SetStatusWeight(canary *v1alpha3.Canary, val int) error
SetStatusIterations(canary *v1alpha3.Canary, val int) error
SetStatusPhase(canary *v1alpha3.Canary, phase v1alpha3.CanaryPhase) error
Initialize(canary *v1alpha3.Canary, skipLivenessChecks bool) (label string, ports map[string]int32, err error)
Promote(canary *v1alpha3.Canary) error
HasTargetChanged(canary *v1alpha3.Canary) (bool, error)
HaveDependenciesChanged(canary *v1alpha3.Canary) (bool, error)
Scale(canary *v1alpha3.Canary, replicas int32) error
ScaleFromZero(canary *v1alpha3.Canary) error
}
93 changes: 49 additions & 44 deletions pkg/canary/deployer.go → pkg/canary/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,18 @@ import (
clientset "github.com/weaveworks/flagger/pkg/client/clientset/versioned"
)

// Deployer is managing the operations for Kubernetes deployment kind
type Deployer struct {
KubeClient kubernetes.Interface
FlaggerClient clientset.Interface
Logger *zap.SugaredLogger
ConfigTracker ConfigTracker
Labels []string
// DeploymentController is managing the operations for Kubernetes Deployment kind
type DeploymentController struct {
kubeClient kubernetes.Interface
flaggerClient clientset.Interface
logger *zap.SugaredLogger
configTracker ConfigTracker
labels []string
}

// Initialize creates the primary deployment, hpa,
// scales to zero the canary deployment and returns the pod selector label and container ports
func (c *Deployer) Initialize(cd *flaggerv1.Canary, skipLivenessChecks bool) (label string, ports map[string]int32, err error) {
func (c *DeploymentController) Initialize(cd *flaggerv1.Canary, skipLivenessChecks bool) (label string, ports map[string]int32, err error) {
primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name)
label, ports, err = c.createPrimaryDeployment(cd)
if err != nil {
Expand All @@ -47,7 +47,7 @@ func (c *Deployer) Initialize(cd *flaggerv1.Canary, skipLivenessChecks bool) (la
}
}

c.Logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Infof("Scaling down %s.%s", cd.Spec.TargetRef.Name, cd.Namespace)
c.logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Infof("Scaling down %s.%s", cd.Spec.TargetRef.Name, cd.Namespace)
if err := c.Scale(cd, 0); err != nil {
return "", ports, err
}
Expand All @@ -62,11 +62,11 @@ func (c *Deployer) Initialize(cd *flaggerv1.Canary, skipLivenessChecks bool) (la
}

// Promote copies the pod spec, secrets and config maps from canary to primary
func (c *Deployer) Promote(cd *flaggerv1.Canary) error {
func (c *DeploymentController) Promote(cd *flaggerv1.Canary) error {
targetName := cd.Spec.TargetRef.Name
primaryName := fmt.Sprintf("%s-primary", targetName)

canary, err := c.KubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{})
canary, err := c.kubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return fmt.Errorf("deployment %s.%s not found", targetName, cd.Namespace)
Expand All @@ -80,7 +80,7 @@ func (c *Deployer) Promote(cd *flaggerv1.Canary) error {
targetName, cd.Namespace, targetName)
}

primary, err := c.KubeClient.AppsV1().Deployments(cd.Namespace).Get(primaryName, metav1.GetOptions{})
primary, err := c.kubeClient.AppsV1().Deployments(cd.Namespace).Get(primaryName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return fmt.Errorf("deployment %s.%s not found", primaryName, cd.Namespace)
Expand All @@ -89,11 +89,11 @@ func (c *Deployer) Promote(cd *flaggerv1.Canary) error {
}

// promote secrets and config maps
configRefs, err := c.ConfigTracker.GetTargetConfigs(cd)
configRefs, err := c.configTracker.GetTargetConfigs(cd)
if err != nil {
return err
}
if err := c.ConfigTracker.CreatePrimaryConfigs(cd, configRefs); err != nil {
if err := c.configTracker.CreatePrimaryConfigs(cd, configRefs); err != nil {
return err
}

Expand All @@ -104,7 +104,7 @@ func (c *Deployer) Promote(cd *flaggerv1.Canary) error {
primaryCopy.Spec.Strategy = canary.Spec.Strategy

// update spec with primary secrets and config maps
primaryCopy.Spec.Template.Spec = c.ConfigTracker.ApplyPrimaryConfigs(canary.Spec.Template.Spec, configRefs)
primaryCopy.Spec.Template.Spec = c.configTracker.ApplyPrimaryConfigs(canary.Spec.Template.Spec, configRefs)

// update pod annotations to ensure a rolling update
annotations, err := c.makeAnnotations(canary.Spec.Template.Annotations)
Expand All @@ -116,7 +116,7 @@ func (c *Deployer) Promote(cd *flaggerv1.Canary) error {
primaryCopy.Spec.Template.Labels = makePrimaryLabels(canary.Spec.Template.Labels, primaryName, label)

// apply update
_, err = c.KubeClient.AppsV1().Deployments(cd.Namespace).Update(primaryCopy)
_, err = c.kubeClient.AppsV1().Deployments(cd.Namespace).Update(primaryCopy)
if err != nil {
return fmt.Errorf("updating deployment %s.%s template spec failed: %v",
primaryCopy.GetName(), primaryCopy.Namespace, err)
Expand All @@ -132,10 +132,10 @@ func (c *Deployer) Promote(cd *flaggerv1.Canary) error {
return nil
}

// HasDeploymentChanged returns true if the canary deployment pod spec has changed
func (c *Deployer) HasDeploymentChanged(cd *flaggerv1.Canary) (bool, error) {
// HasTargetChanged returns true if the canary deployment pod spec has changed
func (c *DeploymentController) HasTargetChanged(cd *flaggerv1.Canary) (bool, error) {
targetName := cd.Spec.TargetRef.Name
canary, err := c.KubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{})
canary, err := c.kubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return false, fmt.Errorf("deployment %s.%s not found", targetName, cd.Namespace)
Expand Down Expand Up @@ -164,10 +164,15 @@ func (c *Deployer) HasDeploymentChanged(cd *flaggerv1.Canary) (bool, error) {
return false, nil
}

// HaveDependenciesChanged returns true if the canary configmaps or secrets have changed
func (c *DeploymentController) HaveDependenciesChanged(cd *flaggerv1.Canary) (bool, error) {
return c.configTracker.HasConfigChanged(cd)
}

// Scale sets the canary deployment replicas
func (c *Deployer) Scale(cd *flaggerv1.Canary, replicas int32) error {
func (c *DeploymentController) Scale(cd *flaggerv1.Canary, replicas int32) error {
targetName := cd.Spec.TargetRef.Name
dep, err := c.KubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{})
dep, err := c.kubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return fmt.Errorf("deployment %s.%s not found", targetName, cd.Namespace)
Expand All @@ -178,16 +183,16 @@ func (c *Deployer) Scale(cd *flaggerv1.Canary, replicas int32) error {
depCopy := dep.DeepCopy()
depCopy.Spec.Replicas = int32p(replicas)

_, err = c.KubeClient.AppsV1().Deployments(dep.Namespace).Update(depCopy)
_, err = c.kubeClient.AppsV1().Deployments(dep.Namespace).Update(depCopy)
if err != nil {
return fmt.Errorf("scaling %s.%s to %v failed: %v", depCopy.GetName(), depCopy.Namespace, replicas, err)
}
return nil
}

func (c *Deployer) ScaleUp(cd *flaggerv1.Canary) error {
func (c *DeploymentController) ScaleFromZero(cd *flaggerv1.Canary) error {
targetName := cd.Spec.TargetRef.Name
dep, err := c.KubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{})
dep, err := c.kubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return fmt.Errorf("deployment %s.%s not found", targetName, cd.Namespace)
Expand All @@ -202,18 +207,18 @@ func (c *Deployer) ScaleUp(cd *flaggerv1.Canary) error {
depCopy := dep.DeepCopy()
depCopy.Spec.Replicas = replicas

_, err = c.KubeClient.AppsV1().Deployments(dep.Namespace).Update(depCopy)
_, err = c.kubeClient.AppsV1().Deployments(dep.Namespace).Update(depCopy)
if err != nil {
return fmt.Errorf("scaling %s.%s to %v failed: %v", depCopy.GetName(), depCopy.Namespace, replicas, err)
}
return nil
}

func (c *Deployer) createPrimaryDeployment(cd *flaggerv1.Canary) (string, map[string]int32, error) {
func (c *DeploymentController) createPrimaryDeployment(cd *flaggerv1.Canary) (string, map[string]int32, error) {
targetName := cd.Spec.TargetRef.Name
primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name)

canaryDep, err := c.KubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{})
canaryDep, err := c.kubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return "", nil, fmt.Errorf("deployment %s.%s not found, retrying", targetName, cd.Namespace)
Expand All @@ -236,14 +241,14 @@ func (c *Deployer) createPrimaryDeployment(cd *flaggerv1.Canary) (string, map[st
ports = p
}

primaryDep, err := c.KubeClient.AppsV1().Deployments(cd.Namespace).Get(primaryName, metav1.GetOptions{})
primaryDep, err := c.kubeClient.AppsV1().Deployments(cd.Namespace).Get(primaryName, metav1.GetOptions{})
if errors.IsNotFound(err) {
// create primary secrets and config maps
configRefs, err := c.ConfigTracker.GetTargetConfigs(cd)
configRefs, err := c.configTracker.GetTargetConfigs(cd)
if err != nil {
return "", nil, err
}
if err := c.ConfigTracker.CreatePrimaryConfigs(cd, configRefs); err != nil {
if err := c.configTracker.CreatePrimaryConfigs(cd, configRefs); err != nil {
return "", nil, err
}
annotations, err := c.makeAnnotations(canaryDep.Spec.Template.Annotations)
Expand Down Expand Up @@ -289,25 +294,25 @@ func (c *Deployer) createPrimaryDeployment(cd *flaggerv1.Canary) (string, map[st
Annotations: annotations,
},
// update spec with the primary secrets and config maps
Spec: c.ConfigTracker.ApplyPrimaryConfigs(canaryDep.Spec.Template.Spec, configRefs),
Spec: c.configTracker.ApplyPrimaryConfigs(canaryDep.Spec.Template.Spec, configRefs),
},
},
}

_, err = c.KubeClient.AppsV1().Deployments(cd.Namespace).Create(primaryDep)
_, err = c.kubeClient.AppsV1().Deployments(cd.Namespace).Create(primaryDep)
if err != nil {
return "", nil, err
}

c.Logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Infof("Deployment %s.%s created", primaryDep.GetName(), cd.Namespace)
c.logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Infof("Deployment %s.%s created", primaryDep.GetName(), cd.Namespace)
}

return label, ports, nil
}

func (c *Deployer) reconcilePrimaryHpa(cd *flaggerv1.Canary, init bool) error {
func (c *DeploymentController) reconcilePrimaryHpa(cd *flaggerv1.Canary, init bool) error {
primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name)
hpa, err := c.KubeClient.AutoscalingV2beta1().HorizontalPodAutoscalers(cd.Namespace).Get(cd.Spec.AutoscalerRef.Name, metav1.GetOptions{})
hpa, err := c.kubeClient.AutoscalingV2beta1().HorizontalPodAutoscalers(cd.Namespace).Get(cd.Spec.AutoscalerRef.Name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return fmt.Errorf("HorizontalPodAutoscaler %s.%s not found, retrying",
Expand All @@ -328,7 +333,7 @@ func (c *Deployer) reconcilePrimaryHpa(cd *flaggerv1.Canary, init bool) error {
}

primaryHpaName := fmt.Sprintf("%s-primary", cd.Spec.AutoscalerRef.Name)
primaryHpa, err := c.KubeClient.AutoscalingV2beta1().HorizontalPodAutoscalers(cd.Namespace).Get(primaryHpaName, metav1.GetOptions{})
primaryHpa, err := c.kubeClient.AutoscalingV2beta1().HorizontalPodAutoscalers(cd.Namespace).Get(primaryHpaName, metav1.GetOptions{})

// create HPA
if errors.IsNotFound(err) {
Expand All @@ -348,11 +353,11 @@ func (c *Deployer) reconcilePrimaryHpa(cd *flaggerv1.Canary, init bool) error {
Spec: hpaSpec,
}

_, err = c.KubeClient.AutoscalingV2beta1().HorizontalPodAutoscalers(cd.Namespace).Create(primaryHpa)
_, err = c.kubeClient.AutoscalingV2beta1().HorizontalPodAutoscalers(cd.Namespace).Create(primaryHpa)
if err != nil {
return err
}
c.Logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Infof("HorizontalPodAutoscaler %s.%s created", primaryHpa.GetName(), cd.Namespace)
c.logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Infof("HorizontalPodAutoscaler %s.%s created", primaryHpa.GetName(), cd.Namespace)
return nil
}

Expand All @@ -370,19 +375,19 @@ func (c *Deployer) reconcilePrimaryHpa(cd *flaggerv1.Canary, init bool) error {
hpaClone.Spec.MinReplicas = hpaSpec.MinReplicas
hpaClone.Spec.Metrics = hpaSpec.Metrics

_, upErr := c.KubeClient.AutoscalingV2beta1().HorizontalPodAutoscalers(cd.Namespace).Update(hpaClone)
_, upErr := c.kubeClient.AutoscalingV2beta1().HorizontalPodAutoscalers(cd.Namespace).Update(hpaClone)
if upErr != nil {
return upErr
}
c.Logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Infof("HorizontalPodAutoscaler %s.%s updated", primaryHpa.GetName(), cd.Namespace)
c.logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Infof("HorizontalPodAutoscaler %s.%s updated", primaryHpa.GetName(), cd.Namespace)
}
}

return nil
}

// makeAnnotations appends an unique ID to annotations map
func (c *Deployer) makeAnnotations(annotations map[string]string) (map[string]string, error) {
func (c *DeploymentController) makeAnnotations(annotations map[string]string) (map[string]string, error) {
idKey := "flagger-id"
res := make(map[string]string)
uuid := make([]byte, 16)
Expand All @@ -405,8 +410,8 @@ func (c *Deployer) makeAnnotations(annotations map[string]string) (map[string]st
}

// getSelectorLabel returns the selector match label
func (c *Deployer) getSelectorLabel(deployment *appsv1.Deployment) (string, error) {
for _, l := range c.Labels {
func (c *DeploymentController) getSelectorLabel(deployment *appsv1.Deployment) (string, error) {
for _, l := range c.labels {
if _, ok := deployment.Spec.Selector.MatchLabels[l]; ok {
return l, nil
}
Expand All @@ -421,7 +426,7 @@ var sidecars = map[string]bool{
}

// getPorts returns a list of all container ports
func (c *Deployer) getPorts(cd *flaggerv1.Canary, deployment *appsv1.Deployment) (map[string]int32, error) {
func (c *DeploymentController) getPorts(cd *flaggerv1.Canary, deployment *appsv1.Deployment) (map[string]int32, error) {
ports := make(map[string]int32)

for _, container := range deployment.Spec.Template.Spec.Containers {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestCanaryDeployer_IsNewSpec(t *testing.T) {
t.Fatal(err.Error())
}

isNew, err := mocks.deployer.HasDeploymentChanged(mocks.canary)
isNew, err := mocks.deployer.HasTargetChanged(mocks.canary)
if err != nil {
t.Fatal(err.Error())
}
Expand Down
Loading