Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix slow leader reelection #2156

Merged
merged 3 commits into from
Jun 11, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 73 additions & 88 deletions pkg/controller/main-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ type EventType int
// Possible values of EventType
const (
STSServerNotification EventType = iota
LeaderElection
)

// EventNotification - structure to send messages through a channel regarding an error event to be handled
Expand Down Expand Up @@ -437,7 +436,7 @@ func (c *Controller) startSTSAPIServer(ctx context.Context, notificationChannel

// leaderRun starts the Controller and the APIs.
// When a new leader is elected, this function is run in the pod.
func leaderRun(ctx context.Context, c *Controller, threadiness int, stopCh <-chan struct{}) {
func leaderRun(ctx context.Context, c *Controller, threadiness int, stopCh <-chan struct{}, notificationChannel chan *EventNotification) {
// we declare the channel used to communicate server errors
var upgradeServerChannel <-chan error

Expand Down Expand Up @@ -512,6 +511,11 @@ func leaderRun(ctx context.Context, c *Controller, threadiness int, stopCh <-cha

for {
select {
case oerr := <-notificationChannel:
if !errors.Is(oerr.Err, http.ErrServerClosed) {
klog.Errorf("STS API Server stopped: %v, going to restart", oerr.Err)
go c.startSTSAPIServer(ctx, notificationChannel)
}
case err := <-upgradeServerChannel:
if err != http.ErrServerClosed {
klog.Errorf("Upgrade Server stopped: %v, going to restart", err)
Expand Down Expand Up @@ -559,15 +563,18 @@ func (c *Controller) Start(threadiness int, stopCh <-chan struct{}) error {

// we use the Lease lock type since edits to Leases are less common
// and fewer objects in the cluster watch "all Leases".
lock := &resourcelock.LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Name: leaseLockName,
Namespace: leaseLockNamespace,
},
Client: c.kubeClientSet.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
lock, err := resourcelock.New(
resourcelock.LeasesResourceLock,
leaseLockNamespace,
leaseLockName,
c.kubeClientSet.CoreV1(),
c.kubeClientSet.CoordinationV1(),
resourcelock.ResourceLockConfig{
Identity: c.podName,
},
)
if err != nil {
panic(err.Error())
}

if IsSTSEnabled() {
Expand All @@ -578,92 +585,70 @@ func (c *Controller) Start(threadiness int, stopCh <-chan struct{}) error {
klog.Info("STS Api server is not enabled, not starting")
}

go func() {
// start the leader election code loop
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: lock,
// IMPORTANT: you MUST ensure that any code you have that
// is protected by the lease must terminate **before**
// you call cancel. Otherwise, you could have a background
// loop still running and another process could
// get elected before your background loop finished, violating
// the stated goal of the lease.
ReleaseOnCancel: true,
LeaseDuration: 60 * time.Second,
RenewDeadline: 15 * time.Second,
RetryPeriod: 5 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
// start the controller + API code
leaderRun(ctx, c, threadiness, stopCh)
},
OnStoppedLeading: func() {
// we can do cleanup here
klog.Infof("leader lost: %s", c.podName)
},
OnNewLeader: func(identity string) {
// we're notified when a new leader is elected
if identity == c.podName {
klog.Infof("%s: I am the leader, applying leader labels on myself", c.podName)
// Patch this pod so the main service uses it
p := []patchAnnotation{{
Op: "add",
Path: "/metadata/labels/operator",
Value: "leader",
}}

payloadBytes, err := json.Marshal(p)
if err != nil {
klog.Errorf("failed to marshal patch: %#v", err)
} else {
_, err = c.kubeClientSet.CoreV1().Pods(leaseLockNamespace).Patch(ctx, c.podName, types.JSONPatchType, payloadBytes, metav1.PatchOptions{})
if err != nil {
klog.Errorf("failed to patch operator leader pod: %+v", err)
}
}
// start the leader election code loop
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: lock,
// IMPORTANT: you MUST ensure that any code you have that
// is protected by the lease must terminate **before**
// you call cancel. Otherwise, you could have a background
// loop still running and another process could
// get elected before your background loop finished, violating
// the stated goal of the lease.
ReleaseOnCancel: true,
LeaseDuration: 60 * time.Second,
RenewDeadline: 15 * time.Second,
RetryPeriod: 5 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
// start the controller + API code
leaderRun(ctx, c, threadiness, stopCh, notificationChannel)
},
OnStoppedLeading: func() {
// we can do cleanup here
klog.Infof("leader lost: %s", c.podName)
},
OnNewLeader: func(identity string) {
// we're notified when a new leader is elected
if identity == c.podName {
klog.Infof("%s: I am the leader, applying leader labels on myself", c.podName)
// Patch this pod so the main service uses it
p := []patchAnnotation{{
Op: "add",
Path: "/metadata/labels/operator",
Value: "leader",
}}

payloadBytes, err := json.Marshal(p)
if err != nil {
klog.Errorf("failed to marshal patch: %#v", err)
} else {
klog.Infof("%s: is the leader, removing any leader labels that I '%s' might have", identity, c.podName)
// Patch this pod so the main service uses it
p := []patchAnnotation{{
Op: "remove",
Path: "/metadata/labels/operator",
}}

payloadBytes, err := json.Marshal(p)
_, err = c.kubeClientSet.CoreV1().Pods(leaseLockNamespace).Patch(ctx, c.podName, types.JSONPatchType, payloadBytes, metav1.PatchOptions{})
if err != nil {
klog.Errorf("failed to marshal patch: %#v", err)
} else {
c.kubeClientSet.CoreV1().Pods(leaseLockNamespace).Patch(ctx, c.podName, types.JSONPatchType, payloadBytes, metav1.PatchOptions{})
klog.Errorf("failed to patch operator leader pod: %+v", err)
}
}
},
},
})

notificationChannel <- &EventNotification{
Type: LeaderElection,
Err: nil,
}
}()
for {
select {
case oerr := <-notificationChannel:
switch oerr.Type {
case STSServerNotification:
if !errors.Is(oerr.Err, http.ErrServerClosed) {
klog.Errorf("STS API Server stopped: %v, going to restart", oerr.Err)
go c.startSTSAPIServer(ctx, notificationChannel)
} else {
klog.Infof("%s: is the leader, removing any leader labels that I '%s' might have", identity, c.podName)
// Patch this pod so the main service uses it
p := []patchAnnotation{{
Op: "remove",
Path: "/metadata/labels/operator",
}}

payloadBytes, err := json.Marshal(p)
if err != nil {
klog.Errorf("failed to marshal patch: %#v", err)
} else {
c.kubeClientSet.CoreV1().Pods(leaseLockNamespace).Patch(ctx, c.podName, types.JSONPatchType, payloadBytes, metav1.PatchOptions{})
}
}
case LeaderElection:
return nil
}
case <-stopCh:
return nil
}
}
},
},
})
return nil
}

// Stop is called to shutdown the controller
// Stop is called to shut down the controller
func (c *Controller) Stop() {
klog.Info("Stopping the minio controller webservers")
// Wait up to 5 secs and terminate all connections.
Expand Down
Loading