Skip to content

Commit

Permalink
Refactor deployer(now controller) and kubernetes router
Browse files Browse the repository at this point in the history
  • Loading branch information
mumoshu committed Nov 25, 2019
1 parent e5f4bfc commit 607fd71
Show file tree
Hide file tree
Showing 17 changed files with 712 additions and 439 deletions.
95 changes: 95 additions & 0 deletions pkg/canary/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package canary

import (
"fmt"

"github.com/mitchellh/hashstructure"
"github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha3"
clientset "github.com/weaveworks/flagger/pkg/client/clientset/versioned"
"go.uber.org/zap"
"k8s.io/client-go/kubernetes"
)

type Controller interface {
Initialize(cd *v1alpha3.Canary, skipLivenessChecks bool) (label string, ports map[string]int32, err error)
Promote(cd *v1alpha3.Canary) error
HasTargetChanged(cd *v1alpha3.Canary) (bool, error)
HasConfigChanged(cd *v1alpha3.Canary) (bool, error)
Scale(cd *v1alpha3.Canary, replicas int32) error
ScaleUp(cd *v1alpha3.Canary) error
MakeStatusConditions(canaryStatus v1alpha3.CanaryStatus,
phase v1alpha3.CanaryPhase) (bool, []v1alpha3.CanaryCondition)
SyncStatus(cd *v1alpha3.Canary, status v1alpha3.CanaryStatus) error
SetStatusFailedChecks(cd *v1alpha3.Canary, val int) error
SetStatusWeight(cd *v1alpha3.Canary, val int) error
SetStatusIterations(cd *v1alpha3.Canary, val int) error
SetStatusPhase(cd *v1alpha3.Canary, phase v1alpha3.CanaryPhase) error
IsPrimaryReady(cd *v1alpha3.Canary) (bool, error)
IsCanaryReady(cd *v1alpha3.Canary) (bool, error)
}

type VersionKind string

func NewVersionKind(ver, kind string) VersionKind {
return VersionKind(fmt.Sprintf("%s/%s", ver, kind))
}

var (
VersionKindDeployment = NewVersionKind("apps/v1", "Deployment")
VersionKindService = NewVersionKind("core/v1", "Service")
)

func NewController(kubeClient kubernetes.Interface, flaggerClient clientset.Interface, logger *zap.SugaredLogger,
labels []string) Controller {
return &multiController{
deployers: map[VersionKind]Controller{
VersionKindDeployment: newDeploymentController(kubeClient, flaggerClient, logger, labels),
VersionKindService: newServiceController(kubeClient, flaggerClient, logger),
},
}
}

func newDeploymentController(kubeClient kubernetes.Interface, flaggerClient clientset.Interface, logger *zap.SugaredLogger,
labels []string) Controller {
return &DeploymentDeployer{
Logger: logger,
KubeClient: kubeClient,
FlaggerClient: flaggerClient,
Labels: labels,
ConfigTracker: ConfigTracker{
Logger: logger,
KubeClient: kubeClient,
FlaggerClient: flaggerClient,
},
}
}

func newServiceController(kubeClient kubernetes.Interface, flaggerClient clientset.Interface, logger *zap.SugaredLogger) Controller {
return &ServiceDeployer{
Logger: logger,
KubeClient: kubeClient,
FlaggerClient: flaggerClient,
}
}

func hasSpecChanged(cd *v1alpha3.Canary, spec interface{}) (bool, error) {
if cd.Status.LastAppliedSpec == "" {
return true, nil
}

newHash, err := hashstructure.Hash(spec, nil)
if err != nil {
return false, fmt.Errorf("hash error %v", err)
}

// do not trigger a canary deployment on manual rollback
if cd.Status.LastPromotedSpec == fmt.Sprintf("%d", newHash) {
return false, nil
}

if cd.Status.LastAppliedSpec != fmt.Sprintf("%d", newHash) {
return true, nil
}

return false, nil
}
140 changes: 19 additions & 121 deletions pkg/canary/deployer.go → pkg/canary/deployment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"io"

"github.com/google/go-cmp/cmp"
"github.com/mitchellh/hashstructure"
"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
hpav1 "k8s.io/api/autoscaling/v2beta1"
Expand All @@ -21,24 +20,22 @@ import (
clientset "github.com/weaveworks/flagger/pkg/client/clientset/versioned"
)

// Deployer is managing the operations for Kubernetes deployment kind
type Deployer struct {
// Controller is managing the operations for Kubernetes deployment kind
type DeploymentDeployer struct {
KubeClient kubernetes.Interface
FlaggerClient clientset.Interface
Logger *zap.SugaredLogger
ConfigTracker ConfigTracker
Labels []string
}

var _ Controller = &DeploymentDeployer{}

// Initialize creates the primary deployment, hpa,
// scales to zero the canary deployment and returns the pod selector label and container ports
func (c *Deployer) Initialize(cd *flaggerv1.Canary, skipLivenessChecks bool) (label string, ports map[string]int32, err error) {
func (c *DeploymentDeployer) Initialize(cd *flaggerv1.Canary, skipLivenessChecks bool) (label string, ports map[string]int32, err error) {
primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name)

if isTargetedService(cd) {
return "", nil, nil
}

label, ports, err = c.createPrimaryDeployment(cd)
if err != nil {
return "", ports, fmt.Errorf("creating deployment %s.%s failed: %v", primaryName, cd.Namespace, err)
Expand Down Expand Up @@ -66,55 +63,8 @@ func (c *Deployer) Initialize(cd *flaggerv1.Canary, skipLivenessChecks bool) (la
return label, ports, nil
}

// Promote copies target's spec from canary to primary
func (c *Deployer) Promote(cd *flaggerv1.Canary) error {
if isTargetedService(cd) {
return c.promoteService(cd)
}
return c.promoteDeployment(cd)
}

// promoteService copies service's spec from canary to primary
func (c *Deployer) promoteService(cd *flaggerv1.Canary) error {
targetName := cd.Spec.TargetRef.Name
primaryName := fmt.Sprintf("%s-primary", targetName)

canary, err := c.KubeClient.CoreV1().Services(cd.Namespace).Get(targetName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return fmt.Errorf("service %s.%s not found", targetName, cd.Namespace)
}
return fmt.Errorf("service %s.%s query error %v", targetName, cd.Namespace, err)
}

primary, err := c.KubeClient.CoreV1().Services(cd.Namespace).Get(primaryName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return fmt.Errorf("service %s.%s not found", primaryName, cd.Namespace)
}
return fmt.Errorf("service %s.%s query error %v", primaryName, cd.Namespace, err)
}

primaryCopy := canary.DeepCopy()
primaryCopy.ObjectMeta.Name = primary.ObjectMeta.Name
if primaryCopy.Spec.Type == "ClusterIP" {
primaryCopy.Spec.ClusterIP = primary.Spec.ClusterIP
}
primaryCopy.ObjectMeta.ResourceVersion = primary.ObjectMeta.ResourceVersion
primaryCopy.ObjectMeta.UID = primary.ObjectMeta.UID

// apply update
_, err = c.KubeClient.CoreV1().Services(cd.Namespace).Update(primaryCopy)
if err != nil {
return fmt.Errorf("updating service %s.%s spec failed: %v",
primaryCopy.GetName(), primaryCopy.Namespace, err)
}

return nil
}

// promoteDeployment copies deployment's pod spec, secrets and config maps from canary to primary
func (c *Deployer) promoteDeployment(cd *flaggerv1.Canary) error {
// Promote copies deployment's pod spec, secrets and config maps from canary to primary
func (c *DeploymentDeployer) Promote(cd *flaggerv1.Canary) error {
targetName := cd.Spec.TargetRef.Name
primaryName := fmt.Sprintf("%s-primary", targetName)

Expand Down Expand Up @@ -184,30 +134,8 @@ func (c *Deployer) promoteDeployment(cd *flaggerv1.Canary) error {
return nil
}

// HasTargetChanged returns true if the canary deployment pod or service spec has changed
func (c *Deployer) HasTargetChanged(cd *flaggerv1.Canary) (bool, error) {
if isTargetedService(cd) {
return c.HasServiceChanged(cd)
}
return c.HasDeploymentChanged(cd)
}

// HasServiceChanged returns true if the canary service spec has changed
func (c *Deployer) HasServiceChanged(cd *flaggerv1.Canary) (bool, error) {
targetName := cd.Spec.TargetRef.Name
canary, err := c.KubeClient.CoreV1().Services(cd.Namespace).Get(targetName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return false, fmt.Errorf("service %s.%s not found", targetName, cd.Namespace)
}
return false, fmt.Errorf("service %s.%s query error %v", targetName, cd.Namespace, err)
}

return c.hasSpecChanged(cd, canary.Spec)
}

// HasDeploymentChanged returns true if the canary deployment pod spec has changed
func (c *Deployer) HasDeploymentChanged(cd *flaggerv1.Canary) (bool, error) {
// HasTargetChanged returns true if the canary deployment pod spec has changed
func (c *DeploymentDeployer) HasTargetChanged(cd *flaggerv1.Canary) (bool, error) {
targetName := cd.Spec.TargetRef.Name
canary, err := c.KubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{})
if err != nil {
Expand All @@ -217,37 +145,11 @@ func (c *Deployer) HasDeploymentChanged(cd *flaggerv1.Canary) (bool, error) {
return false, fmt.Errorf("deployment %s.%s query error %v", targetName, cd.Namespace, err)
}

return c.hasSpecChanged(cd, canary.Spec.Template)
}

func (c *Deployer) hasSpecChanged(cd *flaggerv1.Canary, spec interface{}) (bool, error) {
if cd.Status.LastAppliedSpec == "" {
return true, nil
}

newHash, err := hashstructure.Hash(spec, nil)
if err != nil {
return false, fmt.Errorf("hash error %v", err)
}

// do not trigger a canary deployment on manual rollback
if cd.Status.LastPromotedSpec == fmt.Sprintf("%d", newHash) {
return false, nil
}

if cd.Status.LastAppliedSpec != fmt.Sprintf("%d", newHash) {
return true, nil
}

return false, nil
return hasSpecChanged(cd, canary.Spec.Template)
}

// Scale sets the canary deployment replicas
func (c *Deployer) Scale(cd *flaggerv1.Canary, replicas int32) error {
if isTargetedService(cd) {
return nil
}

func (c *DeploymentDeployer) Scale(cd *flaggerv1.Canary, replicas int32) error {
targetName := cd.Spec.TargetRef.Name
dep, err := c.KubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{})
if err != nil {
Expand All @@ -267,11 +169,7 @@ func (c *Deployer) Scale(cd *flaggerv1.Canary, replicas int32) error {
return nil
}

func (c *Deployer) ScaleUp(cd *flaggerv1.Canary) error {
if isTargetedService(cd) {
return nil
}

func (c *DeploymentDeployer) ScaleUp(cd *flaggerv1.Canary) error {
targetName := cd.Spec.TargetRef.Name
dep, err := c.KubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{})
if err != nil {
Expand All @@ -295,7 +193,7 @@ func (c *Deployer) ScaleUp(cd *flaggerv1.Canary) error {
return nil
}

func (c *Deployer) createPrimaryDeployment(cd *flaggerv1.Canary) (string, map[string]int32, error) {
func (c *DeploymentDeployer) createPrimaryDeployment(cd *flaggerv1.Canary) (string, map[string]int32, error) {
targetName := cd.Spec.TargetRef.Name
primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name)

Expand Down Expand Up @@ -391,7 +289,7 @@ func (c *Deployer) createPrimaryDeployment(cd *flaggerv1.Canary) (string, map[st
return label, ports, nil
}

func (c *Deployer) reconcilePrimaryHpa(cd *flaggerv1.Canary, init bool) error {
func (c *DeploymentDeployer) reconcilePrimaryHpa(cd *flaggerv1.Canary, init bool) error {
primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name)
hpa, err := c.KubeClient.AutoscalingV2beta1().HorizontalPodAutoscalers(cd.Namespace).Get(cd.Spec.AutoscalerRef.Name, metav1.GetOptions{})
if err != nil {
Expand Down Expand Up @@ -468,7 +366,7 @@ func (c *Deployer) reconcilePrimaryHpa(cd *flaggerv1.Canary, init bool) error {
}

// makeAnnotations appends an unique ID to annotations map
func (c *Deployer) makeAnnotations(annotations map[string]string) (map[string]string, error) {
func (c *DeploymentDeployer) makeAnnotations(annotations map[string]string) (map[string]string, error) {
idKey := "flagger-id"
res := make(map[string]string)
uuid := make([]byte, 16)
Expand All @@ -491,7 +389,7 @@ func (c *Deployer) makeAnnotations(annotations map[string]string) (map[string]st
}

// getSelectorLabel returns the selector match label
func (c *Deployer) getSelectorLabel(deployment *appsv1.Deployment) (string, error) {
func (c *DeploymentDeployer) getSelectorLabel(deployment *appsv1.Deployment) (string, error) {
for _, l := range c.Labels {
if _, ok := deployment.Spec.Selector.MatchLabels[l]; ok {
return l, nil
Expand All @@ -507,7 +405,7 @@ var sidecars = map[string]bool{
}

// getPorts returns a list of all container ports
func (c *Deployer) getPorts(cd *flaggerv1.Canary, deployment *appsv1.Deployment) (map[string]int32, error) {
func (c *DeploymentDeployer) getPorts(cd *flaggerv1.Canary, deployment *appsv1.Deployment) (map[string]int32, error) {
ports := make(map[string]int32)

for _, container := range deployment.Spec.Template.Spec.Containers {
Expand Down Expand Up @@ -545,8 +443,8 @@ func (c *Deployer) getPorts(cd *flaggerv1.Canary, deployment *appsv1.Deployment)
return ports, nil
}

func isTargetedService(cd *flaggerv1.Canary) bool {
return cd.Spec.TargetRef.Kind == "Service"
func (c *DeploymentDeployer) HasConfigChanged(cd *flaggerv1.Canary) (bool, error) {
return c.ConfigTracker.HasConfigChanged(cd)
}

func makePrimaryLabels(labels map[string]string, primaryName string, label string) map[string]string {
Expand Down
File renamed without changes.
4 changes: 2 additions & 2 deletions pkg/canary/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type Mocks struct {
canary *flaggerv1.Canary
kubeClient kubernetes.Interface
flaggerClient clientset.Interface
deployer Deployer
deployer DeploymentDeployer
logger *zap.SugaredLogger
}

Expand All @@ -43,7 +43,7 @@ func SetupMocks() Mocks {

logger, _ := logger.NewLogger("debug")

deployer := Deployer{
deployer := DeploymentDeployer{
FlaggerClient: flaggerClient,
KubeClient: kubeClient,
Logger: logger,
Expand Down
Loading

0 comments on commit 607fd71

Please sign in to comment.