Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement leader election #236

Merged
merged 4 commits into from
Jul 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,14 @@ TS=$(shell date +%Y-%m-%d_%H-%M-%S)

run:
GO111MODULE=on go run cmd/flagger/* -kubeconfig=$$HOME/.kube/config -log-level=info -mesh-provider=istio -namespace=test \
-metrics-server=https://prometheus.istio.weavedx.com
-metrics-server=https://prometheus.istio.weavedx.com \
-enable-leader-election=true

run2:
GO111MODULE=on go run cmd/flagger/* -kubeconfig=$$HOME/.kube/config -log-level=info -mesh-provider=istio -namespace=test \
-metrics-server=https://prometheus.istio.weavedx.com \
-enable-leader-election=true \
-port=9092

run-appmesh:
GO111MODULE=on go run cmd/flagger/* -kubeconfig=$$HOME/.kube/config -log-level=info -mesh-provider=appmesh \
Expand Down
2 changes: 2 additions & 0 deletions charts/flagger/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ Parameter | Description | Default
`slack.channel` | Slack channel | None
`slack.user` | Slack username | `flagger`
`msteams.url` | Microsoft Teams incoming webhook | None
`leaderElection.enabled` | leader election must be enabled when running more than one replica | `false`
`leaderElection.replicaCount` | number of replicas | `1`
`rbac.create` | if `true`, create and use RBAC resources | `true`
`rbac.pspEnabled` | If `true`, create and use a restricted pod security policy | `false`
`crd.create` | if `true`, create Flagger's CRDs | `true`
Expand Down
20 changes: 15 additions & 5 deletions charts/flagger/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ metadata:
app.kubernetes.io/managed-by: {{ .Release.Service }}
app.kubernetes.io/instance: {{ .Release.Name }}
spec:
replicas: 1
replicas: {{ .Values.leaderElection.replicaCount }}
strategy:
type: Recreate
selector:
Expand All @@ -22,6 +22,16 @@ spec:
app.kubernetes.io/instance: {{ .Release.Name }}
spec:
serviceAccountName: {{ template "flagger.serviceAccountName" . }}
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchLabels:
app.kubernetes.io/name: {{ template "flagger.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
topologyKey: kubernetes.io/hostname
containers:
- name: flagger
securityContext:
Expand Down Expand Up @@ -54,6 +64,10 @@ spec:
{{- if .Values.msteams.url }}
- -msteams-url={{ .Values.msteams.url }}
{{- end }}
{{- if .Values.leaderElection.enabled }}
- -enable-leader-election=true
- -leader-election-namespace={{ .Release.Namespace }}
{{- end }}
livenessProbe:
exec:
command:
Expand All @@ -78,10 +92,6 @@ spec:
{{ toYaml .Values.resources | indent 12 }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{ toYaml . | indent 8 }}
{{- end }}
{{- with .Values.affinity }}
affinity:
{{ toYaml . | indent 8 }}
{{- end }}
{{- with .Values.tolerations }}
Expand Down
6 changes: 4 additions & 2 deletions charts/flagger/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ msteams:
# MS Teams incoming webhook URL
url:

leaderElection:
enabled: false
replicaCount: 1

serviceAccount:
# serviceAccount.create: Whether to create a service account or not
create: true
Expand Down Expand Up @@ -54,8 +58,6 @@ nodeSelector: {}

tolerations: []

affinity: {}

prometheus:
# to be used with AppMesh or nginx ingress
install: false
126 changes: 102 additions & 24 deletions cmd/flagger/main.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package main

import (
"context"
"flag"
"fmt"
"log"
"os"
"strings"
"time"

semver "github.com/Masterminds/semver"
"github.com/Masterminds/semver"
clientset "github.com/weaveworks/flagger/pkg/client/clientset/versioned"
informers "github.com/weaveworks/flagger/pkg/client/informers/externalversions"
"github.com/weaveworks/flagger/pkg/controller"
Expand All @@ -20,31 +21,37 @@ import (
"github.com/weaveworks/flagger/pkg/signals"
"github.com/weaveworks/flagger/pkg/version"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/transport"
_ "k8s.io/code-generator/cmd/client-gen/generators"
)

var (
masterURL string
kubeconfig string
metricsServer string
controlLoopInterval time.Duration
logLevel string
port string
msteamsURL string
slackURL string
slackUser string
slackChannel string
threadiness int
zapReplaceGlobals bool
zapEncoding string
namespace string
meshProvider string
selectorLabels string
ver bool
masterURL string
kubeconfig string
metricsServer string
controlLoopInterval time.Duration
logLevel string
port string
msteamsURL string
slackURL string
slackUser string
slackChannel string
threadiness int
zapReplaceGlobals bool
zapEncoding string
namespace string
meshProvider string
selectorLabels string
enableLeaderElection bool
leaderElectionNamespace string
ver bool
)

func init() {
Expand All @@ -62,8 +69,10 @@ func init() {
flag.BoolVar(&zapReplaceGlobals, "zap-replace-globals", false, "Whether to change the logging level of the global zap logger.")
flag.StringVar(&zapEncoding, "zap-encoding", "json", "Zap logger encoding.")
flag.StringVar(&namespace, "namespace", "", "Namespace that flagger would watch canary object.")
flag.StringVar(&meshProvider, "mesh-provider", "istio", "Service mesh provider, can be istio, appmesh, supergloo, nginx or smi.")
flag.StringVar(&meshProvider, "mesh-provider", "istio", "Service mesh provider, can be istio, linkerd, appmesh, supergloo, nginx or smi.")
flag.StringVar(&selectorLabels, "selector-labels", "app,name,app.kubernetes.io/name", "List of pod labels that Flagger uses to create pod selectors.")
flag.BoolVar(&enableLeaderElection, "enable-leader-election", false, "Enable leader election.")
flag.StringVar(&leaderElectionNamespace, "leader-election-namespace", "kube-system", "Namespace used to create the leader election config map.")
flag.BoolVar(&ver, "version", false, "Print version")
}

Expand Down Expand Up @@ -194,14 +203,83 @@ func main() {
}
}

// start controller
go func(ctrl *controller.Controller) {
if err := ctrl.Run(threadiness, stopCh); err != nil {
// leader election context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// prevents new requests when leadership is lost
cfg.Wrap(transport.ContextCanceller(ctx, fmt.Errorf("the leader is shutting down")))

// cancel leader election context on shutdown signals
go func() {
<-stopCh
cancel()
}()

// wrap controller run
runController := func() {
if err := c.Run(threadiness, stopCh); err != nil {
logger.Fatalf("Error running controller: %v", err)
}
}(c)
}

// run controller when this instance wins the leader election
if enableLeaderElection {
ns := leaderElectionNamespace
if namespace != "" {
ns = namespace
}
startLeaderElection(ctx, runController, ns, kubeClient, logger)
} else {
runController()
}
}

// startLeaderElection runs a client-go leader election loop backed by a
// config map lock in namespace ns, and calls run exactly once when this
// instance acquires leadership. The process exits (os.Exit(1)) when
// leadership is lost so the replica restarts and rejoins the election.
// The caller's ctx cancels the election (the lock is released on cancel).
func startLeaderElection(ctx context.Context, run func(), ns string, kubeClient kubernetes.Interface, logger *zap.SugaredLogger) {
	configMapName := "flagger-leader-election"
	id, err := os.Hostname()
	if err != nil {
		logger.Fatalf("Error determining hostname: %v", err)
	}
	// Append a UUID so that multiple replicas scheduled on the same host
	// still get unique election identities.
	id = id + "_" + string(uuid.NewUUID())

	lock, err := resourcelock.New(
		resourcelock.ConfigMapsResourceLock,
		ns,
		configMapName,
		kubeClient.CoreV1(),
		kubeClient.CoordinationV1(),
		resourcelock.ResourceLockConfig{
			Identity: id,
		},
	)
	if err != nil {
		logger.Fatalf("Error creating leader election lock: %v", err)
	}

	logger.Infof("Starting leader election id: %s configmap: %s namespace: %s", id, configMapName, ns)
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock: lock,
		// Release the lock on context cancellation so a replacement
		// leader can take over without waiting for the lease to expire.
		ReleaseOnCancel: true,
		LeaseDuration:   60 * time.Second,
		RenewDeadline:   15 * time.Second,
		RetryPeriod:     5 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				logger.Info("Acting as elected leader")
				run()
			},
			OnStoppedLeading: func() {
				// Exit so the pod restarts and rejoins the election as a follower.
				logger.Infof("Leadership lost")
				os.Exit(1)
			},
			OnNewLeader: func(identity string) {
				if identity != id {
					logger.Infof("Another instance has been elected as leader: %v", identity)
				}
			},
		},
	})
}

func initNotifier(logger *zap.SugaredLogger) (client notifier.Interface) {
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA=
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gnostic v0.0.0-20170426233943-68f4ded48ba9/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY=
Expand Down
9 changes: 0 additions & 9 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,15 +216,6 @@ func (c *Controller) syncHandler(key string) error {
}

c.canaries.Store(fmt.Sprintf("%s.%s", cd.Name, cd.Namespace), cd)

//if cd.Spec.TargetRef.Kind == "Deployment" {
// err = c.bootstrapDeployment(cd)
// if err != nil {
// c.logger.Warnf("%s.%s bootstrap error %v", cd.Name, cd.Namespace, err)
// return err
// }
//}

c.logger.Infof("Synced %s", key)

return nil
Expand Down
2 changes: 2 additions & 0 deletions test/e2e-linkerd.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ kind load docker-image test/flagger:latest
echo '>>> Installing Flagger'
helm upgrade -i flagger ${REPO_ROOT}/charts/flagger \
--namespace linkerd \
--set leaderElection.enabled=true \
--set leaderElection.replicaCount=2 \
--set metricsServer=http://linkerd-prometheus:9090 \
--set meshProvider=smi:linkerd

Expand Down