feat(controller): Allow to disable leader election (#5638) (#5648)
Co-authored-by: tobi <mail@singhania.at>
tobisinghania and tobi authored Apr 16, 2021
1 parent 3c8f561 · commit e3d1d1e
Showing 2 changed files with 63 additions and 52 deletions.
1 change: 1 addition & 0 deletions docs/environment-variables.md
@@ -16,6 +16,7 @@ Note that these environment variables may be removed at any time.
 | `EXPRESSION_TEMPLATES` | `bool` | Escape hatch to disable expression templates. Default `true`. |
 | `GZIP_IMPLEMENTATION` | `string` | The implementation of compression/decompression. Currently only "PGZip" and "GZip" are supported. Defaults to "PGZip". |
 | `LEADER_ELECTION_IDENTITY` | `string` | The ID used for workflow controllers to elect a leader. |
+| `LEADER_ELECTION_DISABLE` | `bool` | Whether leader election should be disabled. |
 | `LEADER_ELECTION_LEASE_DURATION` | `time.Duration` | The duration that non-leader candidates will wait to force acquire leadership. |
 | `LEADER_ELECTION_RENEW_DEADLINE` | `time.Duration` | The duration that the acting master will retry refreshing leadership before giving up. |
 | `LEADER_ELECTION_RETRY_PERIOD` | `time.Duration` | The duration that the leader election clients should wait between tries of actions. |
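For reference, the duration-typed variables in this table take Go `time.Duration` strings such as `15s` or `1m30s`, and `LEADER_ELECTION_DISABLE` is compared against the string `"true"` (see the controller change below). The helper in this sketch is written for illustration only; it is a stand-in for the controller's own `env` lookup, not its actual implementation:

```go
// Illustrative stand-in for the duration lookup assumed by the table above.
package main

import (
	"fmt"
	"os"
	"time"
)

// lookupEnvDurationOr returns the parsed value of key, or def when the
// variable is unset or not a valid Go duration string (e.g. "15s", "1m30s").
func lookupEnvDurationOr(key string, def time.Duration) time.Duration {
	v, ok := os.LookupEnv(key)
	if !ok {
		return def
	}
	d, err := time.ParseDuration(v)
	if err != nil {
		return def
	}
	return d
}

func main() {
	// LEADER_ELECTION_DISABLE is a plain string compared against "true".
	disabled := os.Getenv("LEADER_ELECTION_DISABLE") == "true"
	lease := lookupEnvDurationOr("LEADER_ELECTION_LEASE_DURATION", 15*time.Second)
	fmt.Printf("leader election disabled=%v, lease duration=%s\n", disabled, lease)
}
```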
114 changes: 62 additions & 52 deletions workflow/controller/controller.go
@@ -222,64 +222,74 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWorkers
 	// Start the metrics server
 	go wfc.metrics.RunServer(ctx)

-	nodeID, ok := os.LookupEnv("LEADER_ELECTION_IDENTITY")
-	if !ok {
-		log.Fatal("LEADER_ELECTION_IDENTITY must be set so that the workflow controllers can elect a leader")
-	}
-	logCtx := log.WithField("id", nodeID)
-
-	leaderName := "workflow-controller"
-	if wfc.Config.InstanceID != "" {
-		leaderName = fmt.Sprintf("%s-%s", leaderName, wfc.Config.InstanceID)
-	}
-
-	var cancel context.CancelFunc
-	go leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
-		Lock: &resourcelock.LeaseLock{
-			LeaseMeta: metav1.ObjectMeta{Name: leaderName, Namespace: wfc.namespace}, Client: wfc.kubeclientset.CoordinationV1(),
-			LockConfig: resourcelock.ResourceLockConfig{Identity: nodeID, EventRecorder: wfc.eventRecorderManager.Get(wfc.namespace)},
-		},
-		ReleaseOnCancel: true,
-		LeaseDuration:   env.LookupEnvDurationOr("LEADER_ELECTION_LEASE_DURATION", 15*time.Second),
-		RenewDeadline:   env.LookupEnvDurationOr("LEADER_ELECTION_RENEW_DEADLINE", 10*time.Second),
-		RetryPeriod:     env.LookupEnvDurationOr("LEADER_ELECTION_RETRY_PERIOD", 5*time.Second),
-		Callbacks: leaderelection.LeaderCallbacks{
-			OnStartedLeading: func(ctx context.Context) {
-				defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)
-
-				logCtx.Info("started leading")
-				ctx, cancel = context.WithCancel(ctx)
-
-				for i := 0; i < podCleanupWorkers; i++ {
-					go wait.UntilWithContext(ctx, wfc.runPodCleanup, time.Second)
-				}
-				go wfc.workflowGarbageCollector(ctx.Done())
-				go wfc.archivedWorkflowGarbageCollector(ctx.Done())
-
-				go wfc.runTTLController(ctx, workflowTTLWorkers)
-				go wfc.runCronController(ctx)
-				go wait.Until(wfc.syncWorkflowPhaseMetrics, 15*time.Second, ctx.Done())
-				go wait.Until(wfc.syncPodPhaseMetrics, 15*time.Second, ctx.Done())
-
-				go wait.Until(wfc.syncManager.CheckWorkflowExistence, workflowExistenceCheckPeriod, ctx.Done())
-
-				for i := 0; i < wfWorkers; i++ {
-					go wait.Until(wfc.runWorker, time.Second, ctx.Done())
-				}
-				for i := 0; i < podWorkers; i++ {
-					go wait.Until(wfc.podWorker, time.Second, ctx.Done())
-				}
-			},
-			OnStoppedLeading: func() {
-				logCtx.Info("stopped leading")
-				cancel()
-			},
-			OnNewLeader: func(identity string) {
-				logCtx.WithField("leader", identity).Info("new leader")
-			},
-		},
-	})
+	leaderElectionOff := os.Getenv("LEADER_ELECTION_DISABLE")
+	if leaderElectionOff == "true" {
+		log.Info("Leader election is turned off. Running in single-instance mode")
+		logCtx := log.WithField("id", "single-instance")
+		go wfc.startLeading(ctx, logCtx, podCleanupWorkers, workflowTTLWorkers, wfWorkers, podWorkers)
+	} else {
+		nodeID, ok := os.LookupEnv("LEADER_ELECTION_IDENTITY")
+		if !ok {
+			log.Fatal("LEADER_ELECTION_IDENTITY must be set so that the workflow controllers can elect a leader")
+		}
+		logCtx := log.WithField("id", nodeID)
+
+		var cancel context.CancelFunc
+		leaderName := "workflow-controller"
+		if wfc.Config.InstanceID != "" {
+			leaderName = fmt.Sprintf("%s-%s", leaderName, wfc.Config.InstanceID)
+		}
+
+		go leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
+			Lock: &resourcelock.LeaseLock{
+				LeaseMeta: metav1.ObjectMeta{Name: leaderName, Namespace: wfc.namespace}, Client: wfc.kubeclientset.CoordinationV1(),
+				LockConfig: resourcelock.ResourceLockConfig{Identity: nodeID, EventRecorder: wfc.eventRecorderManager.Get(wfc.namespace)},
+			},
+			ReleaseOnCancel: true,
+			LeaseDuration:   env.LookupEnvDurationOr("LEADER_ELECTION_LEASE_DURATION", 15*time.Second),
+			RenewDeadline:   env.LookupEnvDurationOr("LEADER_ELECTION_RENEW_DEADLINE", 10*time.Second),
+			RetryPeriod:     env.LookupEnvDurationOr("LEADER_ELECTION_RETRY_PERIOD", 5*time.Second),
+			Callbacks: leaderelection.LeaderCallbacks{
+				OnStartedLeading: func(ctx context.Context) {
+					wfc.startLeading(ctx, logCtx, podCleanupWorkers, workflowTTLWorkers, wfWorkers, podWorkers)
+				},
+				OnStoppedLeading: func() {
+					logCtx.Info("stopped leading")
+					cancel()
+				},
+				OnNewLeader: func(identity string) {
+					logCtx.WithField("leader", identity).Info("new leader")
+				},
+			},
+		})
+	}
 	<-ctx.Done()
 }

+func (wfc *WorkflowController) startLeading(ctx context.Context, logCtx *log.Entry, podCleanupWorkers int, workflowTTLWorkers int, wfWorkers int, podWorkers int) {
+	defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)
+
+	logCtx.Info("started leading")
+
+	for i := 0; i < podCleanupWorkers; i++ {
+		go wait.UntilWithContext(ctx, wfc.runPodCleanup, time.Second)
+	}
+	go wfc.workflowGarbageCollector(ctx.Done())
+	go wfc.archivedWorkflowGarbageCollector(ctx.Done())
+
+	go wfc.runTTLController(ctx, workflowTTLWorkers)
+	go wfc.runCronController(ctx)
+	go wait.Until(wfc.syncWorkflowPhaseMetrics, 15*time.Second, ctx.Done())
+	go wait.Until(wfc.syncPodPhaseMetrics, 15*time.Second, ctx.Done())
+
+	go wait.Until(wfc.syncManager.CheckWorkflowExistence, workflowExistenceCheckPeriod, ctx.Done())
+
+	for i := 0; i < wfWorkers; i++ {
+		go wait.Until(wfc.runWorker, time.Second, ctx.Done())
+	}
+	for i := 0; i < podWorkers; i++ {
+		go wait.Until(wfc.podWorker, time.Second, ctx.Done())
+	}
+}
+
 func (wfc *WorkflowController) waitForCacheSync(ctx context.Context) {
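For readers unfamiliar with the client-go leader election API used above, here is a minimal, self-contained sketch of the same pattern: an env-var escape hatch that skips Lease-based election and starts the work directly. The lease name "sketch-controller", the "default" namespace, and the in-cluster config are illustrative assumptions for this sketch, not Argo's actual wiring, which is shown in the diff above.

```go
// Sketch: gate client-go leader election behind LEADER_ELECTION_DISABLE so a
// single-replica deployment can skip taking a Lease entirely.
package main

import (
	"context"
	"log"
	"os"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

// startLeading stands in for the controller's worker startup (queues, GC, metrics).
func startLeading(ctx context.Context) {
	log.Println("started leading")
	<-ctx.Done()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	if os.Getenv("LEADER_ELECTION_DISABLE") == "true" {
		// Single-instance mode: no Lease is taken, so never run more than one replica.
		startLeading(ctx)
		return
	}

	id := os.Getenv("LEADER_ELECTION_IDENTITY")
	if id == "" {
		id, _ = os.Hostname() // fall back to the pod/host name
	}

	cfg, err := rest.InClusterConfig()
	if err != nil {
		log.Fatal(err)
	}
	kube := kubernetes.NewForConfigOrDie(cfg)

	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock: &resourcelock.LeaseLock{
			LeaseMeta:  metav1.ObjectMeta{Name: "sketch-controller", Namespace: "default"},
			Client:     kube.CoordinationV1(),
			LockConfig: resourcelock.ResourceLockConfig{Identity: id},
		},
		ReleaseOnCancel: true,
		LeaseDuration:   15 * time.Second,
		RenewDeadline:   10 * time.Second,
		RetryPeriod:     5 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: startLeading,
			OnStoppedLeading: func() { log.Println("stopped leading"); cancel() },
			OnNewLeader:      func(identity string) { log.Printf("new leader: %s", identity) },
		},
	})
}
```

Running two copies of this sketch with leader election enabled shows only one of them acquiring the Lease; setting `LEADER_ELECTION_DISABLE=true` skips coordination entirely, which is only safe when a single replica is deployed.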
