Skip to content

Commit

Permalink
Improve the zone addition process (minio#150)
Browse files Browse the repository at this point in the history
Currently, the operator deletes the existing StatefulSet and creates a
brand-new StatefulSet during zone addition. This is not ideal
because it requires downtime of the MinIO cluster.

This PR attempts to improve this process by using the StatefulSet
update API to add the new pods, and then deleting only the pods
that were part of the existing StatefulSet. As those pods are deleted,
Kubernetes spawns new pods. This means that, at any point, more than
a quorum of MinIO pods is available. This allows a better,
seamless zone addition flow.

Also improve the MinIOInstance status update flow and clean up
the KES flow in the controller.
  • Loading branch information
nitisht authored Jun 22, 2020
1 parent a764f1d commit 4810731
Showing 1 changed file with 81 additions and 73 deletions.
154 changes: 81 additions & 73 deletions pkg/controller/cluster/main-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package cluster
import (
"context"
"fmt"
"strconv"
"time"

"k8s.io/klog"
Expand Down Expand Up @@ -71,6 +72,22 @@ const (
// MessageResourceSynced is the message used for an Event fired when a MinIOInstance
// is synced successfully
MessageResourceSynced = "MinIOInstance synced successfully"
// Standard Status messages for MinIOInstance
ready = "Ready"
addingZone = "Adding New MinIO Zone"
provisioningCIService = "Provisioning MinIO Cluster IP Service"
provisioningHLService = "Provisioning MinIO Headless Service"
provisioningStatefulSet = "Provisioning MinIO Statefulset"
provisioningMCSDeployment = "Provisioning MCS Deployment"
provisioningKESStatefulSet = "Provisioning KES StatefulSet"
waitingForReadyState = "Waiting for Pods to be ready"
waitingMinIOCert = "Waiting for MinIO TLS Certificate"
waitingMinIOClientCert = "Waiting for MinIO TLS Client Certificate"
waitingKESCert = "Waiting for KES TLS Certificate"
updatingMinIOVersion = "Updating MinIO Version"
updatingMCSVersion = "Updating MCS Version"
updatingMinIOStatefulSet = "Adding New Pods to MinIO Statefulset"
notOwned = "Statefulset not controlled by operator"
)

// Controller struct watches the Kubernetes API for changes to MinIOInstance resources
Expand Down Expand Up @@ -372,8 +389,7 @@ func (c *Controller) syncHandler(key string) error {
svc, err := c.serviceLister.Services(mi.Namespace).Get(mi.MinIOCIServiceName())
if err != nil {
if apierrors.IsNotFound(err) {
currentState := "Provisioning Service"
mi, err = c.updateMinIOInstanceStatus(ctx, mi, currentState, 0)
mi, err = c.updateMinIOInstanceStatus(ctx, mi, provisioningCIService, 0)
if err != nil {
return err
}
Expand All @@ -392,8 +408,7 @@ func (c *Controller) syncHandler(key string) error {
hlSvc, err := c.serviceLister.Services(mi.Namespace).Get(mi.MinIOHLServiceName())
if err != nil {
if apierrors.IsNotFound(err) {
currentState := "Provisining Headless Service"
mi, err = c.updateMinIOInstanceStatus(ctx, mi, currentState, 0)
mi, err = c.updateMinIOInstanceStatus(ctx, mi, provisioningHLService, 0)
if err != nil {
return err
}
Expand Down Expand Up @@ -422,8 +437,7 @@ func (c *Controller) syncHandler(key string) error {
return err
}
}
currentState := "Provisioning Statefulset"
mi, err = c.updateMinIOInstanceStatus(ctx, mi, currentState, 0)
mi, err = c.updateMinIOInstanceStatus(ctx, mi, provisioningStatefulSet, 0)
if err != nil {
return err
}
Expand All @@ -436,58 +450,54 @@ func (c *Controller) syncHandler(key string) error {
return err
}
} else {
// If this number of the replicas on the MinIOInstance resource is specified, and the
// If the number of the replicas on the MinIOInstance resource is specified, and the
// number does not equal the current desired replicas on the StatefulSet, we
// should update the StatefulSet resource.
if mi.MinIOReplicas() != *ss.Spec.Replicas {
klog.V(2).Infof("MinIOInstance %s replicas: %d, StatefulSet replicas: %d", name, mi.MinIOReplicas(), *ss.Spec.Replicas)
currentState := "Adding new Zone"
mi, err = c.updateMinIOInstanceStatus(ctx, mi, currentState, 0)
// If the status already indicates "addingZone", no need for another
// thread to enter this block - we don't want to get in a race for deletion and creation of CSRs
if mi.MinIOReplicas() != *ss.Spec.Replicas && mi.Status.CurrentState != addingZone {
// save current replicas before creating new statefulset
// this is used later to delete only the older pods in statefulset
currentReplicas := mi.MinIOReplicas()
mi, err = c.updateMinIOInstanceStatus(ctx, mi, addingZone, 0)
if err != nil {
return err
}
// If this was a TLS enabled Setup, we create new CSR because change in number of replicas means the new endpoints
// If this is a TLS enabled Setup, we create new CSR because change in number of replicas means the new endpoints
// need to be added in the CSR
if mi.RequiresAutoCertSetup() {
klog.V(2).Infof("Removing the existing MinIO CSRs and related secrets")
if err := c.removeMinIOCSRAndSecrets(ctx, mi); err != nil {
return err
}

klog.V(2).Infof("Creating required MinIO CSRs and related secrets")
if err := c.checkAndCreateMinIOCSR(ctx, nsName, mi); err != nil {
return err
}
} else {
currentState := "Waiting for Pods to be ready"
mi, err = c.updateMinIOInstanceStatus(ctx, mi, currentState, ss.Status.Replicas)
if err != nil {
return err
}
}

ss = statefulsets.NewForMinIO(mi, hlSvc.Name, c.hostsTemplate)
klog.V(2).Infof("Removing the existing StatefulSet %s with replicas: %d", name, *ss.Spec.Replicas)
if err := c.kubeClientSet.AppsV1().StatefulSets(mi.Namespace).Delete(ctx, ss.Name, metav1.DeleteOptions{}); err != nil {
return err
}

currentState = "Provisioning Statefulset"
mi, err = c.updateMinIOInstanceStatus(ctx, mi, currentState, 0)
klog.V(2).Infof("Creating a new StatefulSet %s with replicas: %d", name, mi.MinIOReplicas())
mi, err = c.updateMinIOInstanceStatus(ctx, mi, updatingMinIOStatefulSet, 0)
if err != nil {
return err
}
klog.V(2).Infof("Creating a new StatefulSet %s with replicas: %d", name, mi.MinIOReplicas())
if _, err := c.kubeClientSet.AppsV1().StatefulSets(mi.Namespace).Create(ctx, ss, cOpts); err != nil {
// Create a new statefulset object and send an update request
ss = statefulsets.NewForMinIO(mi, hlSvc.Name, c.hostsTemplate)
if _, err := c.kubeClientSet.AppsV1().StatefulSets(mi.Namespace).Update(ctx, ss, uOpts); err != nil {
return err
}
// remove all the existing Pods so StatefulSet creates new pods with proper secrets/cli args etc
for i := 0; i < int(currentReplicas); i++ {
if err := c.kubeClientSet.CoreV1().Pods(mi.Namespace).Delete(ctx, "minio-"+strconv.Itoa(i), metav1.DeleteOptions{}); err != nil {
return err
}
}
}

// If this container version on the MinIOInstance resource is specified, and the
// version does not equal the current desired version in the StatefulSet, we
// should update the StatefulSet resource.
if mi.Spec.Image != ss.Spec.Template.Spec.Containers[0].Image {
mi, err = c.updateMinIOInstanceStatus(ctx, mi, "Updating MinIO Version", ss.Status.Replicas)
mi, err = c.updateMinIOInstanceStatus(ctx, mi, updatingMinIOVersion, ss.Status.Replicas)
if err != nil {
return err
}
Expand Down Expand Up @@ -522,8 +532,7 @@ func (c *Controller) syncHandler(key string) error {
}
// Check if any one replica is READY
if ss.Status.ReadyReplicas > 0 {
currentState := "Provisioning MCS"
mi, err = c.updateMinIOInstanceStatus(ctx, mi, currentState, ss.Status.Replicas)
mi, err = c.updateMinIOInstanceStatus(ctx, mi, provisioningMCSDeployment, ss.Status.Replicas)
if err != nil {
return err
}
Expand All @@ -546,8 +555,7 @@ func (c *Controller) syncHandler(key string) error {
return err
}
} else {
currentState := "Waiting for Pods to be ready"
mi, err = c.updateMinIOInstanceStatus(ctx, mi, currentState, ss.Status.Replicas)
mi, err = c.updateMinIOInstanceStatus(ctx, mi, waitingForReadyState, ss.Status.Replicas)
if err != nil {
return err
}
Expand All @@ -559,52 +567,55 @@ func (c *Controller) syncHandler(key string) error {
}

if mi.HasKESEnabled() && (mi.RequiresAutoCertSetup() || mi.RequiresExternalCertSetup()) {
svc, err := c.serviceLister.Services(mi.Namespace).Get(mi.KESHLServiceName())
// If the headless service doesn't exist, we'll create it
if apierrors.IsNotFound(err) {
klog.V(2).Infof("Creating a new Headless Service for cluster %q", nsName)
svc = services.NewHeadlessForKES(mi)
_, err = c.kubeClientSet.CoreV1().Services(svc.Namespace).Create(ctx, svc, cOpts)
} else {
return err
}
// Get the StatefulSet with the name specified in spec
_, err = c.statefulSetLister.StatefulSets(mi.Namespace).Get(mi.KESStatefulSetName())
if apierrors.IsNotFound(err) {
currentState := "Provisioning KES StatefulSet"
mi, err = c.updateMinIOInstanceStatus(ctx, mi, currentState, 0)
if err != nil {
svc, err := c.serviceLister.Services(mi.Namespace).Get(mi.KESHLServiceName())
if err != nil {
if apierrors.IsNotFound(err) {
klog.V(2).Infof("Creating a new Headless Service for cluster %q", nsName)
svc = services.NewHeadlessForKES(mi)
_, err = c.kubeClientSet.CoreV1().Services(svc.Namespace).Create(ctx, svc, cOpts)
} else {
return err
}
ks := statefulsets.NewForKES(mi, svc.Name)
klog.V(2).Infof("Creating a new StatefulSet for cluster %q", nsName)
if _, err = c.kubeClientSet.AppsV1().StatefulSets(mi.Namespace).Create(ctx, ks, cOpts); err != nil {
klog.V(2).Infof(err.Error())
}

// Get the StatefulSet with the name specified in spec
if _, err = c.statefulSetLister.StatefulSets(mi.Namespace).Get(mi.KESStatefulSetName()); err != nil {
if apierrors.IsNotFound(err) {
mi, err = c.updateMinIOInstanceStatus(ctx, mi, provisioningKESStatefulSet, 0)
if err != nil {
return err
}
ks := statefulsets.NewForKES(mi, svc.Name)
klog.V(2).Infof("Creating a new StatefulSet for cluster %q", nsName)
if _, err = c.kubeClientSet.AppsV1().StatefulSets(mi.Namespace).Create(ctx, ks, cOpts); err != nil {
klog.V(2).Infof(err.Error())
return err
}
} else {
return err
}
} else {
return err
}

// After KES and MinIO are deployed successfully, create the MinIO Key on KES KMS Backend
_, err = c.jobLister.Jobs(mi.Namespace).Get(mi.Name)
if apierrors.IsNotFound(err) {
j := jobs.NewForKES(mi)
klog.V(2).Infof("Creating a new Job for cluster %q", nsName)
if _, err = c.kubeClientSet.BatchV1().Jobs(mi.Namespace).Create(ctx, j, cOpts); err != nil {
klog.V(2).Infof(err.Error())
if _, err = c.jobLister.Jobs(mi.Namespace).Get(mi.KESJobName()); err != nil {
if apierrors.IsNotFound(err) {
j := jobs.NewForKES(mi)
klog.V(2).Infof("Creating a new Job for cluster %q", nsName)
if _, err = c.kubeClientSet.BatchV1().Jobs(mi.Namespace).Create(ctx, j, cOpts); err != nil {
klog.V(2).Infof(err.Error())
return err
}
} else {
return err
}
} else {
return err
}
}

// If the StatefulSet is not controlled by this MinIOInstance resource, we should log
// a warning to the event recorder and ret
if !metav1.IsControlledBy(ss, mi) {
currentState := "Statefulset not controlled by operator"
mi, err = c.updateMinIOInstanceStatus(ctx, mi, currentState, ss.Status.Replicas)
mi, err = c.updateMinIOInstanceStatus(ctx, mi, notOwned, ss.Status.Replicas)
if err != nil {
return err
}
Expand All @@ -614,7 +625,7 @@ func (c *Controller) syncHandler(key string) error {
}

if mi.HasMCSEnabled() && d != nil && mi.Spec.MCS.Image != d.Spec.Template.Spec.Containers[0].Image {
mi, err = c.updateMinIOInstanceStatus(ctx, mi, "Updating MCS Version", ss.Status.Replicas)
mi, err = c.updateMinIOInstanceStatus(ctx, mi, updatingMCSVersion, ss.Status.Replicas)
if err != nil {
return err
}
Expand All @@ -631,7 +642,7 @@ func (c *Controller) syncHandler(key string) error {

// Finally, we update the status block of the MinIOInstance resource to reflect the
// current state of the world
_, err = c.updateMinIOInstanceStatus(ctx, mi, "Ready", ss.Status.Replicas)
_, err = c.updateMinIOInstanceStatus(ctx, mi, ready, ss.Status.Replicas)
if err != nil {
return err
}
Expand Down Expand Up @@ -668,8 +679,7 @@ func (c *Controller) removeMinIOCSRAndSecrets(ctx context.Context, mi *miniov1.M
func (c *Controller) checkAndCreateMinIOCSR(ctx context.Context, nsName types.NamespacedName, mi *miniov1.MinIOInstance) error {
if _, err := c.certClient.CertificateSigningRequests().Get(ctx, mi.MinIOCSRName(), metav1.GetOptions{}); err != nil {
if apierrors.IsNotFound(err) {
currentState := "Requesting Certificate"
mi, err = c.updateMinIOInstanceStatus(ctx, mi, currentState, 0)
mi, err = c.updateMinIOInstanceStatus(ctx, mi, waitingMinIOCert, 0)
if err != nil {
return err
}
Expand All @@ -684,8 +694,7 @@ func (c *Controller) checkAndCreateMinIOCSR(ctx context.Context, nsName types.Na
if mi.HasKESEnabled() {
if _, err := c.certClient.CertificateSigningRequests().Get(ctx, mi.MinIOClientCSRName(), metav1.GetOptions{}); err != nil {
if apierrors.IsNotFound(err) {
currentState := "Creating Client TLS CSR"
mi, err = c.updateMinIOInstanceStatus(ctx, mi, currentState, 0)
mi, err = c.updateMinIOInstanceStatus(ctx, mi, waitingMinIOClientCert, 0)
if err != nil {
return err
}
Expand All @@ -704,8 +713,7 @@ func (c *Controller) checkAndCreateMinIOCSR(ctx context.Context, nsName types.Na
func (c *Controller) checkAndCreateKESCSR(ctx context.Context, nsName types.NamespacedName, mi *miniov1.MinIOInstance) error {
if _, err := c.certClient.CertificateSigningRequests().Get(ctx, mi.KESCSRName(), metav1.GetOptions{}); err != nil {
if apierrors.IsNotFound(err) {
currentState := "Creating TLS CSR for KES"
mi, err = c.updateMinIOInstanceStatus(ctx, mi, currentState, 0)
mi, err = c.updateMinIOInstanceStatus(ctx, mi, waitingKESCert, 0)
if err != nil {
return err
}
Expand Down

0 comments on commit 4810731

Please sign in to comment.