Skip to content

Commit

Permalink
Fix slow leader reelection (#2156)
Browse files Browse the repository at this point in the history
  • Loading branch information
pjuarezd authored Jun 11, 2024
1 parent f39676b commit 3da4006
Showing 1 changed file with 73 additions and 88 deletions.
161 changes: 73 additions & 88 deletions pkg/controller/main-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ type EventType int
// Possible values of EventType
const (
STSServerNotification EventType = iota
LeaderElection
)

// EventNotification - structure to send messages through a channel regarding a error event to be handled
Expand Down Expand Up @@ -437,7 +436,7 @@ func (c *Controller) startSTSAPIServer(ctx context.Context, notificationChannel

// leaderRun start the Controller and the API's
// When a new leader is elected this function is ran in the pod
func leaderRun(ctx context.Context, c *Controller, threadiness int, stopCh <-chan struct{}) {
func leaderRun(ctx context.Context, c *Controller, threadiness int, stopCh <-chan struct{}, notificationChannel chan *EventNotification) {
// we declate the channel to communicate on servers errors
var upgradeServerChannel <-chan error

Expand Down Expand Up @@ -514,6 +513,11 @@ func leaderRun(ctx context.Context, c *Controller, threadiness int, stopCh <-cha

for {
select {
case oerr := <-notificationChannel:
if !errors.Is(oerr.Err, http.ErrServerClosed) {
klog.Errorf("STS API Server stopped: %v, going to restart", oerr.Err)
go c.startSTSAPIServer(ctx, notificationChannel)
}
case err := <-upgradeServerChannel:
if err != http.ErrServerClosed {
klog.Errorf("Upgrade Server stopped: %v, going to restart", err)
Expand Down Expand Up @@ -561,15 +565,18 @@ func (c *Controller) Start(threadiness int, stopCh <-chan struct{}) error {

// we use the Lease lock type since edits to Leases are less common
// and fewer objects in the cluster watch "all Leases".
lock := &resourcelock.LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Name: leaseLockName,
Namespace: leaseLockNamespace,
},
Client: c.kubeClientSet.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
lock, err := resourcelock.New(
resourcelock.LeasesResourceLock,
leaseLockNamespace,
leaseLockName,
c.kubeClientSet.CoreV1(),
c.kubeClientSet.CoordinationV1(),
resourcelock.ResourceLockConfig{
Identity: c.podName,
},
)
if err != nil {
panic(err.Error())
}

if IsSTSEnabled() {
Expand All @@ -580,92 +587,70 @@ func (c *Controller) Start(threadiness int, stopCh <-chan struct{}) error {
klog.Info("STS Api server is not enabled, not starting")
}

go func() {
// start the leader election code loop
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: lock,
// IMPORTANT: you MUST ensure that any code you have that
// is protected by the lease must terminate **before**
// you call cancel. Otherwise, you could have a background
// loop still running and another process could
// get elected before your background loop finished, violating
// the stated goal of the lease.
ReleaseOnCancel: true,
LeaseDuration: 60 * time.Second,
RenewDeadline: 15 * time.Second,
RetryPeriod: 5 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
// start the controller + API code
leaderRun(ctx, c, threadiness, stopCh)
},
OnStoppedLeading: func() {
// we can do cleanup here
klog.Infof("leader lost: %s", c.podName)
},
OnNewLeader: func(identity string) {
// we're notified when new leader elected
if identity == c.podName {
klog.Infof("%s: I am the leader, applying leader labels on myself", c.podName)
// Patch this pod so the main service uses it
p := []patchAnnotation{{
Op: "add",
Path: "/metadata/labels/operator",
Value: "leader",
}}

payloadBytes, err := json.Marshal(p)
if err != nil {
klog.Errorf("failed to marshal patch: %#v", err)
} else {
_, err = c.kubeClientSet.CoreV1().Pods(leaseLockNamespace).Patch(ctx, c.podName, types.JSONPatchType, payloadBytes, metav1.PatchOptions{})
if err != nil {
klog.Errorf("failed to patch operator leader pod: %+v", err)
}
}
// start the leader election code loop
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: lock,
// IMPORTANT: you MUST ensure that any code you have that
// is protected by the lease must terminate **before**
// you call cancel. Otherwise, you could have a background
// loop still running and another process could
// get elected before your background loop finished, violating
// the stated goal of the lease.
ReleaseOnCancel: true,
LeaseDuration: 60 * time.Second,
RenewDeadline: 15 * time.Second,
RetryPeriod: 5 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
// start the controller + API code
leaderRun(ctx, c, threadiness, stopCh, notificationChannel)
},
OnStoppedLeading: func() {
// we can do cleanup here
klog.Infof("leader lost: %s", c.podName)
},
OnNewLeader: func(identity string) {
// we're notified when new leader elected
if identity == c.podName {
klog.Infof("%s: I am the leader, applying leader labels on myself", c.podName)
// Patch this pod so the main service uses it
p := []patchAnnotation{{
Op: "add",
Path: "/metadata/labels/operator",
Value: "leader",
}}

payloadBytes, err := json.Marshal(p)
if err != nil {
klog.Errorf("failed to marshal patch: %#v", err)
} else {
klog.Infof("%s: is the leader, removing any leader labels that I '%s' might have", identity, c.podName)
// Patch this pod so the main service uses it
p := []patchAnnotation{{
Op: "remove",
Path: "/metadata/labels/operator",
}}

payloadBytes, err := json.Marshal(p)
_, err = c.kubeClientSet.CoreV1().Pods(leaseLockNamespace).Patch(ctx, c.podName, types.JSONPatchType, payloadBytes, metav1.PatchOptions{})
if err != nil {
klog.Errorf("failed to marshal patch: %#v", err)
} else {
c.kubeClientSet.CoreV1().Pods(leaseLockNamespace).Patch(ctx, c.podName, types.JSONPatchType, payloadBytes, metav1.PatchOptions{})
klog.Errorf("failed to patch operator leader pod: %+v", err)
}
}
},
},
})

notificationChannel <- &EventNotification{
Type: LeaderElection,
Err: nil,
}
}()
for {
select {
case oerr := <-notificationChannel:
switch oerr.Type {
case STSServerNotification:
if !errors.Is(oerr.Err, http.ErrServerClosed) {
klog.Errorf("STS API Server stopped: %v, going to restart", oerr.Err)
go c.startSTSAPIServer(ctx, notificationChannel)
} else {
klog.Infof("%s: is the leader, removing any leader labels that I '%s' might have", identity, c.podName)
// Patch this pod so the main service uses it
p := []patchAnnotation{{
Op: "remove",
Path: "/metadata/labels/operator",
}}

payloadBytes, err := json.Marshal(p)
if err != nil {
klog.Errorf("failed to marshal patch: %#v", err)
} else {
c.kubeClientSet.CoreV1().Pods(leaseLockNamespace).Patch(ctx, c.podName, types.JSONPatchType, payloadBytes, metav1.PatchOptions{})
}
}
case LeaderElection:
return nil
}
case <-stopCh:
return nil
}
}
},
},
})
return nil
}

// Stop is called to shutdown the controller
// Stop is called to shut down the controller
func (c *Controller) Stop() {
klog.Info("Stopping the minio controller webservers")
// Wait upto 5 secs and terminate all connections.
Expand Down

0 comments on commit 3da4006

Please sign in to comment.