Skip to content

Commit

Permalink
Add canary factory for Kubernetes targets
Browse files Browse the repository at this point in the history
- extract Kubernetes operations to controller interface
- implement controller interface for kind Deployment
  • Loading branch information
stefanprodan committed Nov 25, 2019
1 parent c2cf9bf commit 01f624f
Show file tree
Hide file tree
Showing 12 changed files with 240 additions and 170 deletions.
30 changes: 19 additions & 11 deletions cmd/flagger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,6 @@ import (
"time"

"github.com/Masterminds/semver"
clientset "github.com/weaveworks/flagger/pkg/client/clientset/versioned"
informers "github.com/weaveworks/flagger/pkg/client/informers/externalversions"
"github.com/weaveworks/flagger/pkg/controller"
"github.com/weaveworks/flagger/pkg/logger"
"github.com/weaveworks/flagger/pkg/metrics"
"github.com/weaveworks/flagger/pkg/notifier"
"github.com/weaveworks/flagger/pkg/router"
"github.com/weaveworks/flagger/pkg/server"
"github.com/weaveworks/flagger/pkg/signals"
"github.com/weaveworks/flagger/pkg/version"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/kubernetes"
Expand All @@ -30,6 +20,18 @@ import (
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/transport"
_ "k8s.io/code-generator/cmd/client-gen/generators"

"github.com/weaveworks/flagger/pkg/canary"
clientset "github.com/weaveworks/flagger/pkg/client/clientset/versioned"
informers "github.com/weaveworks/flagger/pkg/client/informers/externalversions"
"github.com/weaveworks/flagger/pkg/controller"
"github.com/weaveworks/flagger/pkg/logger"
"github.com/weaveworks/flagger/pkg/metrics"
"github.com/weaveworks/flagger/pkg/notifier"
"github.com/weaveworks/flagger/pkg/router"
"github.com/weaveworks/flagger/pkg/server"
"github.com/weaveworks/flagger/pkg/signals"
"github.com/weaveworks/flagger/pkg/version"
)

var (
Expand Down Expand Up @@ -178,6 +180,12 @@ func main() {
go server.ListenAndServe(port, 3*time.Second, logger, stopCh)

routerFactory := router.NewFactory(cfg, kubeClient, flaggerClient, ingressAnnotationsPrefix, logger, meshClient)
configTracker := canary.ConfigTracker{
Logger: logger,
KubeClient: kubeClient,
FlaggerClient: flaggerClient,
}
canaryFactory := canary.NewFactory(kubeClient, flaggerClient, configTracker, labels, logger)

c := controller.NewController(
kubeClient,
Expand All @@ -187,11 +195,11 @@ func main() {
controlLoopInterval,
logger,
notifierClient,
canaryFactory,
routerFactory,
observerFactory,
meshProvider,
version.VERSION,
labels,
)

flaggerInformerFactory.Start(stopCh)
Expand Down
19 changes: 19 additions & 0 deletions pkg/canary/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package canary

import "github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha3"

type Controller interface {
IsPrimaryReady(cd *v1alpha3.Canary) (bool, error)
IsCanaryReady(cd *v1alpha3.Canary) (bool, error)
SyncStatus(cd *v1alpha3.Canary, status v1alpha3.CanaryStatus) error
SetStatusFailedChecks(cd *v1alpha3.Canary, val int) error
SetStatusWeight(cd *v1alpha3.Canary, val int) error
SetStatusIterations(cd *v1alpha3.Canary, val int) error
SetStatusPhase(cd *v1alpha3.Canary, phase v1alpha3.CanaryPhase) error
Initialize(cd *v1alpha3.Canary, skipLivenessChecks bool) (label string, ports map[string]int32, err error)
Promote(cd *v1alpha3.Canary) error
HasTargetChanged(cd *v1alpha3.Canary) (bool, error)
HaveDependenciesChanged(cd *v1alpha3.Canary) (bool, error)
Scale(cd *v1alpha3.Canary, replicas int32) error
ScaleFromZero(cd *v1alpha3.Canary) error
}
93 changes: 49 additions & 44 deletions pkg/canary/deployer.go → pkg/canary/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,18 @@ import (
clientset "github.com/weaveworks/flagger/pkg/client/clientset/versioned"
)

// Deployer is managing the operations for Kubernetes deployment kind
type Deployer struct {
KubeClient kubernetes.Interface
FlaggerClient clientset.Interface
Logger *zap.SugaredLogger
ConfigTracker ConfigTracker
Labels []string
// DeploymentController is managing the operations for Kubernetes Deployment kind
type DeploymentController struct {
kubeClient kubernetes.Interface
flaggerClient clientset.Interface
logger *zap.SugaredLogger
configTracker ConfigTracker
labels []string
}

// Initialize creates the primary deployment, hpa,
// scales to zero the canary deployment and returns the pod selector label and container ports
func (c *Deployer) Initialize(cd *flaggerv1.Canary, skipLivenessChecks bool) (label string, ports map[string]int32, err error) {
func (c *DeploymentController) Initialize(cd *flaggerv1.Canary, skipLivenessChecks bool) (label string, ports map[string]int32, err error) {
primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name)
label, ports, err = c.createPrimaryDeployment(cd)
if err != nil {
Expand All @@ -47,7 +47,7 @@ func (c *Deployer) Initialize(cd *flaggerv1.Canary, skipLivenessChecks bool) (la
}
}

c.Logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Infof("Scaling down %s.%s", cd.Spec.TargetRef.Name, cd.Namespace)
c.logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Infof("Scaling down %s.%s", cd.Spec.TargetRef.Name, cd.Namespace)
if err := c.Scale(cd, 0); err != nil {
return "", ports, err
}
Expand All @@ -62,11 +62,11 @@ func (c *Deployer) Initialize(cd *flaggerv1.Canary, skipLivenessChecks bool) (la
}

// Promote copies the pod spec, secrets and config maps from canary to primary
func (c *Deployer) Promote(cd *flaggerv1.Canary) error {
func (c *DeploymentController) Promote(cd *flaggerv1.Canary) error {
targetName := cd.Spec.TargetRef.Name
primaryName := fmt.Sprintf("%s-primary", targetName)

canary, err := c.KubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{})
canary, err := c.kubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return fmt.Errorf("deployment %s.%s not found", targetName, cd.Namespace)
Expand All @@ -80,7 +80,7 @@ func (c *Deployer) Promote(cd *flaggerv1.Canary) error {
targetName, cd.Namespace, targetName)
}

primary, err := c.KubeClient.AppsV1().Deployments(cd.Namespace).Get(primaryName, metav1.GetOptions{})
primary, err := c.kubeClient.AppsV1().Deployments(cd.Namespace).Get(primaryName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return fmt.Errorf("deployment %s.%s not found", primaryName, cd.Namespace)
Expand All @@ -89,11 +89,11 @@ func (c *Deployer) Promote(cd *flaggerv1.Canary) error {
}

// promote secrets and config maps
configRefs, err := c.ConfigTracker.GetTargetConfigs(cd)
configRefs, err := c.configTracker.GetTargetConfigs(cd)
if err != nil {
return err
}
if err := c.ConfigTracker.CreatePrimaryConfigs(cd, configRefs); err != nil {
if err := c.configTracker.CreatePrimaryConfigs(cd, configRefs); err != nil {
return err
}

Expand All @@ -104,7 +104,7 @@ func (c *Deployer) Promote(cd *flaggerv1.Canary) error {
primaryCopy.Spec.Strategy = canary.Spec.Strategy

// update spec with primary secrets and config maps
primaryCopy.Spec.Template.Spec = c.ConfigTracker.ApplyPrimaryConfigs(canary.Spec.Template.Spec, configRefs)
primaryCopy.Spec.Template.Spec = c.configTracker.ApplyPrimaryConfigs(canary.Spec.Template.Spec, configRefs)

// update pod annotations to ensure a rolling update
annotations, err := c.makeAnnotations(canary.Spec.Template.Annotations)
Expand All @@ -116,7 +116,7 @@ func (c *Deployer) Promote(cd *flaggerv1.Canary) error {
primaryCopy.Spec.Template.Labels = makePrimaryLabels(canary.Spec.Template.Labels, primaryName, label)

// apply update
_, err = c.KubeClient.AppsV1().Deployments(cd.Namespace).Update(primaryCopy)
_, err = c.kubeClient.AppsV1().Deployments(cd.Namespace).Update(primaryCopy)
if err != nil {
return fmt.Errorf("updating deployment %s.%s template spec failed: %v",
primaryCopy.GetName(), primaryCopy.Namespace, err)
Expand All @@ -132,10 +132,10 @@ func (c *Deployer) Promote(cd *flaggerv1.Canary) error {
return nil
}

// HasDeploymentChanged returns true if the canary deployment pod spec has changed
func (c *Deployer) HasDeploymentChanged(cd *flaggerv1.Canary) (bool, error) {
// HasTargetChanged returns true if the canary deployment pod spec has changed
func (c *DeploymentController) HasTargetChanged(cd *flaggerv1.Canary) (bool, error) {
targetName := cd.Spec.TargetRef.Name
canary, err := c.KubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{})
canary, err := c.kubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return false, fmt.Errorf("deployment %s.%s not found", targetName, cd.Namespace)
Expand Down Expand Up @@ -164,10 +164,15 @@ func (c *Deployer) HasDeploymentChanged(cd *flaggerv1.Canary) (bool, error) {
return false, nil
}

// HaveDependenciesChanged returns true if the canary configmaps or secrets have changed
func (c *DeploymentController) HaveDependenciesChanged(cd *flaggerv1.Canary) (bool, error) {
return c.configTracker.HasConfigChanged(cd)
}

// Scale sets the canary deployment replicas
func (c *Deployer) Scale(cd *flaggerv1.Canary, replicas int32) error {
func (c *DeploymentController) Scale(cd *flaggerv1.Canary, replicas int32) error {
targetName := cd.Spec.TargetRef.Name
dep, err := c.KubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{})
dep, err := c.kubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return fmt.Errorf("deployment %s.%s not found", targetName, cd.Namespace)
Expand All @@ -178,16 +183,16 @@ func (c *Deployer) Scale(cd *flaggerv1.Canary, replicas int32) error {
depCopy := dep.DeepCopy()
depCopy.Spec.Replicas = int32p(replicas)

_, err = c.KubeClient.AppsV1().Deployments(dep.Namespace).Update(depCopy)
_, err = c.kubeClient.AppsV1().Deployments(dep.Namespace).Update(depCopy)
if err != nil {
return fmt.Errorf("scaling %s.%s to %v failed: %v", depCopy.GetName(), depCopy.Namespace, replicas, err)
}
return nil
}

func (c *Deployer) ScaleUp(cd *flaggerv1.Canary) error {
func (c *DeploymentController) ScaleFromZero(cd *flaggerv1.Canary) error {
targetName := cd.Spec.TargetRef.Name
dep, err := c.KubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{})
dep, err := c.kubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return fmt.Errorf("deployment %s.%s not found", targetName, cd.Namespace)
Expand All @@ -202,18 +207,18 @@ func (c *Deployer) ScaleUp(cd *flaggerv1.Canary) error {
depCopy := dep.DeepCopy()
depCopy.Spec.Replicas = replicas

_, err = c.KubeClient.AppsV1().Deployments(dep.Namespace).Update(depCopy)
_, err = c.kubeClient.AppsV1().Deployments(dep.Namespace).Update(depCopy)
if err != nil {
return fmt.Errorf("scaling %s.%s to %v failed: %v", depCopy.GetName(), depCopy.Namespace, replicas, err)
}
return nil
}

func (c *Deployer) createPrimaryDeployment(cd *flaggerv1.Canary) (string, map[string]int32, error) {
func (c *DeploymentController) createPrimaryDeployment(cd *flaggerv1.Canary) (string, map[string]int32, error) {
targetName := cd.Spec.TargetRef.Name
primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name)

canaryDep, err := c.KubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{})
canaryDep, err := c.kubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return "", nil, fmt.Errorf("deployment %s.%s not found, retrying", targetName, cd.Namespace)
Expand All @@ -236,14 +241,14 @@ func (c *Deployer) createPrimaryDeployment(cd *flaggerv1.Canary) (string, map[st
ports = p
}

primaryDep, err := c.KubeClient.AppsV1().Deployments(cd.Namespace).Get(primaryName, metav1.GetOptions{})
primaryDep, err := c.kubeClient.AppsV1().Deployments(cd.Namespace).Get(primaryName, metav1.GetOptions{})
if errors.IsNotFound(err) {
// create primary secrets and config maps
configRefs, err := c.ConfigTracker.GetTargetConfigs(cd)
configRefs, err := c.configTracker.GetTargetConfigs(cd)
if err != nil {
return "", nil, err
}
if err := c.ConfigTracker.CreatePrimaryConfigs(cd, configRefs); err != nil {
if err := c.configTracker.CreatePrimaryConfigs(cd, configRefs); err != nil {
return "", nil, err
}
annotations, err := c.makeAnnotations(canaryDep.Spec.Template.Annotations)
Expand Down Expand Up @@ -289,25 +294,25 @@ func (c *Deployer) createPrimaryDeployment(cd *flaggerv1.Canary) (string, map[st
Annotations: annotations,
},
// update spec with the primary secrets and config maps
Spec: c.ConfigTracker.ApplyPrimaryConfigs(canaryDep.Spec.Template.Spec, configRefs),
Spec: c.configTracker.ApplyPrimaryConfigs(canaryDep.Spec.Template.Spec, configRefs),
},
},
}

_, err = c.KubeClient.AppsV1().Deployments(cd.Namespace).Create(primaryDep)
_, err = c.kubeClient.AppsV1().Deployments(cd.Namespace).Create(primaryDep)
if err != nil {
return "", nil, err
}

c.Logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Infof("Deployment %s.%s created", primaryDep.GetName(), cd.Namespace)
c.logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Infof("Deployment %s.%s created", primaryDep.GetName(), cd.Namespace)
}

return label, ports, nil
}

func (c *Deployer) reconcilePrimaryHpa(cd *flaggerv1.Canary, init bool) error {
func (c *DeploymentController) reconcilePrimaryHpa(cd *flaggerv1.Canary, init bool) error {
primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name)
hpa, err := c.KubeClient.AutoscalingV2beta1().HorizontalPodAutoscalers(cd.Namespace).Get(cd.Spec.AutoscalerRef.Name, metav1.GetOptions{})
hpa, err := c.kubeClient.AutoscalingV2beta1().HorizontalPodAutoscalers(cd.Namespace).Get(cd.Spec.AutoscalerRef.Name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return fmt.Errorf("HorizontalPodAutoscaler %s.%s not found, retrying",
Expand All @@ -328,7 +333,7 @@ func (c *Deployer) reconcilePrimaryHpa(cd *flaggerv1.Canary, init bool) error {
}

primaryHpaName := fmt.Sprintf("%s-primary", cd.Spec.AutoscalerRef.Name)
primaryHpa, err := c.KubeClient.AutoscalingV2beta1().HorizontalPodAutoscalers(cd.Namespace).Get(primaryHpaName, metav1.GetOptions{})
primaryHpa, err := c.kubeClient.AutoscalingV2beta1().HorizontalPodAutoscalers(cd.Namespace).Get(primaryHpaName, metav1.GetOptions{})

// create HPA
if errors.IsNotFound(err) {
Expand All @@ -348,11 +353,11 @@ func (c *Deployer) reconcilePrimaryHpa(cd *flaggerv1.Canary, init bool) error {
Spec: hpaSpec,
}

_, err = c.KubeClient.AutoscalingV2beta1().HorizontalPodAutoscalers(cd.Namespace).Create(primaryHpa)
_, err = c.kubeClient.AutoscalingV2beta1().HorizontalPodAutoscalers(cd.Namespace).Create(primaryHpa)
if err != nil {
return err
}
c.Logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Infof("HorizontalPodAutoscaler %s.%s created", primaryHpa.GetName(), cd.Namespace)
c.logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Infof("HorizontalPodAutoscaler %s.%s created", primaryHpa.GetName(), cd.Namespace)
return nil
}

Expand All @@ -370,19 +375,19 @@ func (c *Deployer) reconcilePrimaryHpa(cd *flaggerv1.Canary, init bool) error {
hpaClone.Spec.MinReplicas = hpaSpec.MinReplicas
hpaClone.Spec.Metrics = hpaSpec.Metrics

_, upErr := c.KubeClient.AutoscalingV2beta1().HorizontalPodAutoscalers(cd.Namespace).Update(hpaClone)
_, upErr := c.kubeClient.AutoscalingV2beta1().HorizontalPodAutoscalers(cd.Namespace).Update(hpaClone)
if upErr != nil {
return upErr
}
c.Logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Infof("HorizontalPodAutoscaler %s.%s updated", primaryHpa.GetName(), cd.Namespace)
c.logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Infof("HorizontalPodAutoscaler %s.%s updated", primaryHpa.GetName(), cd.Namespace)
}
}

return nil
}

// makeAnnotations appends an unique ID to annotations map
func (c *Deployer) makeAnnotations(annotations map[string]string) (map[string]string, error) {
func (c *DeploymentController) makeAnnotations(annotations map[string]string) (map[string]string, error) {
idKey := "flagger-id"
res := make(map[string]string)
uuid := make([]byte, 16)
Expand All @@ -405,8 +410,8 @@ func (c *Deployer) makeAnnotations(annotations map[string]string) (map[string]st
}

// getSelectorLabel returns the selector match label
func (c *Deployer) getSelectorLabel(deployment *appsv1.Deployment) (string, error) {
for _, l := range c.Labels {
func (c *DeploymentController) getSelectorLabel(deployment *appsv1.Deployment) (string, error) {
for _, l := range c.labels {
if _, ok := deployment.Spec.Selector.MatchLabels[l]; ok {
return l, nil
}
Expand All @@ -421,7 +426,7 @@ var sidecars = map[string]bool{
}

// getPorts returns a list of all container ports
func (c *Deployer) getPorts(cd *flaggerv1.Canary, deployment *appsv1.Deployment) (map[string]int32, error) {
func (c *DeploymentController) getPorts(cd *flaggerv1.Canary, deployment *appsv1.Deployment) (map[string]int32, error) {
ports := make(map[string]int32)

for _, container := range deployment.Spec.Template.Spec.Containers {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestCanaryDeployer_IsNewSpec(t *testing.T) {
t.Fatal(err.Error())
}

isNew, err := mocks.deployer.HasDeploymentChanged(mocks.canary)
isNew, err := mocks.deployer.HasTargetChanged(mocks.canary)
if err != nil {
t.Fatal(err.Error())
}
Expand Down
Loading

0 comments on commit 01f624f

Please sign in to comment.