diff --git a/pkg/controller/main-controller.go b/pkg/controller/main-controller.go
index 8e34682d458..e425602e2aa 100644
--- a/pkg/controller/main-controller.go
+++ b/pkg/controller/main-controller.go
@@ -208,7 +208,6 @@ type EventType int
 // Possible values of EventType
 const (
 	STSServerNotification EventType = iota
-	LeaderElection
 )
 
 // EventNotification - structure to send messages through a channel regarding an error event to be handled
@@ -437,7 +436,7 @@ func (c *Controller) startSTSAPIServer(ctx context.Context, notificationChannel
 
 // leaderRun starts the Controller and the APIs
 // When a new leader is elected this function is run in the pod
-func leaderRun(ctx context.Context, c *Controller, threadiness int, stopCh <-chan struct{}) {
+func leaderRun(ctx context.Context, c *Controller, threadiness int, stopCh <-chan struct{}, notificationChannel chan *EventNotification) {
 	// we declare the channel used to communicate server errors
 	var upgradeServerChannel <-chan error
 
@@ -512,6 +511,11 @@ func leaderRun(ctx context.Context, c *Controller, threadiness int, stopCh <-cha
 
 	for {
 		select {
+		case oerr := <-notificationChannel:
+			if !errors.Is(oerr.Err, http.ErrServerClosed) {
+				klog.Errorf("STS API Server stopped: %v, going to restart", oerr.Err)
+				go c.startSTSAPIServer(ctx, notificationChannel)
+			}
 		case err := <-upgradeServerChannel:
 			if err != http.ErrServerClosed {
 				klog.Errorf("Upgrade Server stopped: %v, going to restart", err)
@@ -559,15 +563,18 @@ func (c *Controller) Start(threadiness int, stopCh <-chan struct{}) error {
 
 	// we use the Lease lock type since edits to Leases are less common
 	// and fewer objects in the cluster watch "all Leases".
-	lock := &resourcelock.LeaseLock{
-		LeaseMeta: metav1.ObjectMeta{
-			Name:      leaseLockName,
-			Namespace: leaseLockNamespace,
-		},
-		Client: c.kubeClientSet.CoordinationV1(),
-		LockConfig: resourcelock.ResourceLockConfig{
+	lock, err := resourcelock.New(
+		resourcelock.LeasesResourceLock,
+		leaseLockNamespace,
+		leaseLockName,
+		c.kubeClientSet.CoreV1(),
+		c.kubeClientSet.CoordinationV1(),
+		resourcelock.ResourceLockConfig{
 			Identity: c.podName,
 		},
+	)
+	if err != nil {
+		panic(err.Error())
 	}
 
 	if IsSTSEnabled() {
@@ -578,92 +585,70 @@ func (c *Controller) Start(threadiness int, stopCh <-chan struct{}) error {
 		klog.Info("STS Api server is not enabled, not starting")
 	}
 
-	go func() {
-		// start the leader election code loop
-		leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
-			Lock: lock,
-			// IMPORTANT: you MUST ensure that any code you have that
-			// is protected by the lease must terminate **before**
-			// you call cancel. Otherwise, you could have a background
-			// loop still running and another process could
-			// get elected before your background loop finished, violating
-			// the stated goal of the lease.
-			ReleaseOnCancel: true,
-			LeaseDuration:   60 * time.Second,
-			RenewDeadline:   15 * time.Second,
-			RetryPeriod:     5 * time.Second,
-			Callbacks: leaderelection.LeaderCallbacks{
-				OnStartedLeading: func(ctx context.Context) {
-					// start the controller + API code
-					leaderRun(ctx, c, threadiness, stopCh)
-				},
-				OnStoppedLeading: func() {
-					// we can do cleanup here
-					klog.Infof("leader lost: %s", c.podName)
-				},
-				OnNewLeader: func(identity string) {
-					// we're notified when new leader elected
-					if identity == c.podName {
-						klog.Infof("%s: I am the leader, applying leader labels on myself", c.podName)
-						// Patch this pod so the main service uses it
-						p := []patchAnnotation{{
-							Op:    "add",
-							Path:  "/metadata/labels/operator",
-							Value: "leader",
-						}}
-
-						payloadBytes, err := json.Marshal(p)
-						if err != nil {
-							klog.Errorf("failed to marshal patch: %#v", err)
-						} else {
-							_, err = c.kubeClientSet.CoreV1().Pods(leaseLockNamespace).Patch(ctx, c.podName, types.JSONPatchType, payloadBytes, metav1.PatchOptions{})
-							if err != nil {
-								klog.Errorf("failed to patch operator leader pod: %+v", err)
-							}
-						}
+	// start the leader election code loop
+	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
+		Lock: lock,
+		// IMPORTANT: you MUST ensure that any code you have that
+		// is protected by the lease must terminate **before**
+		// you call cancel. Otherwise, you could have a background
+		// loop still running and another process could
+		// get elected before your background loop finished, violating
+		// the stated goal of the lease.
+		ReleaseOnCancel: true,
+		LeaseDuration:   60 * time.Second,
+		RenewDeadline:   15 * time.Second,
+		RetryPeriod:     5 * time.Second,
+		Callbacks: leaderelection.LeaderCallbacks{
+			OnStartedLeading: func(ctx context.Context) {
+				// start the controller + API code
+				leaderRun(ctx, c, threadiness, stopCh, notificationChannel)
+			},
+			OnStoppedLeading: func() {
+				// we can do cleanup here
+				klog.Infof("leader lost: %s", c.podName)
+			},
+			OnNewLeader: func(identity string) {
+				// we're notified when new leader elected
+				if identity == c.podName {
+					klog.Infof("%s: I am the leader, applying leader labels on myself", c.podName)
+					// Patch this pod so the main service uses it
+					p := []patchAnnotation{{
+						Op:    "add",
+						Path:  "/metadata/labels/operator",
+						Value: "leader",
+					}}
+
+					payloadBytes, err := json.Marshal(p)
+					if err != nil {
+						klog.Errorf("failed to marshal patch: %#v", err)
 					} else {
-						klog.Infof("%s: is the leader, removing any leader labels that I '%s' might have", identity, c.podName)
-						// Patch this pod so the main service uses it
-						p := []patchAnnotation{{
-							Op:   "remove",
-							Path: "/metadata/labels/operator",
-						}}
-
-						payloadBytes, err := json.Marshal(p)
+						_, err = c.kubeClientSet.CoreV1().Pods(leaseLockNamespace).Patch(ctx, c.podName, types.JSONPatchType, payloadBytes, metav1.PatchOptions{})
 						if err != nil {
-							klog.Errorf("failed to marshal patch: %#v", err)
-						} else {
-							c.kubeClientSet.CoreV1().Pods(leaseLockNamespace).Patch(ctx, c.podName, types.JSONPatchType, payloadBytes, metav1.PatchOptions{})
+							klog.Errorf("failed to patch operator leader pod: %+v", err)
 						}
 					}
-				},
-			},
-		})
-
-		notificationChannel <- &EventNotification{
-			Type: LeaderElection,
-			Err:  nil,
-		}
-	}()
-	for {
-		select {
-		case oerr := <-notificationChannel:
-			switch oerr.Type {
-			case STSServerNotification:
-				if !errors.Is(oerr.Err, http.ErrServerClosed) {
-					klog.Errorf("STS API Server stopped: %v, going to restart", oerr.Err)
-					go c.startSTSAPIServer(ctx, notificationChannel)
+				} else {
klog.Infof("%s: is the leader, removing any leader labels that I '%s' might have", identity, c.podName) + // Patch this pod so the main service uses it + p := []patchAnnotation{{ + Op: "remove", + Path: "/metadata/labels/operator", + }} + + payloadBytes, err := json.Marshal(p) + if err != nil { + klog.Errorf("failed to marshal patch: %#v", err) + } else { + c.kubeClientSet.CoreV1().Pods(leaseLockNamespace).Patch(ctx, c.podName, types.JSONPatchType, payloadBytes, metav1.PatchOptions{}) + } } - case LeaderElection: - return nil - } - case <-stopCh: - return nil - } - } + }, + }, + }) + return nil } -// Stop is called to shutdown the controller +// Stop is called to shut down the controller func (c *Controller) Stop() { klog.Info("Stopping the minio controller webservers") // Wait upto 5 secs and terminate all connections.