diff --git a/helm/minio-operator/crds/minio.min.io_tenants.yaml b/helm/minio-operator/crds/minio.min.io_tenants.yaml index 9ad02f456b3..83cc65fd136 100644 --- a/helm/minio-operator/crds/minio.min.io_tenants.yaml +++ b/helm/minio-operator/crds/minio.min.io_tenants.yaml @@ -3798,6 +3798,9 @@ spec: type: integer syncVersion: type: string + waitingOnReady: + format: date-time + type: string writeQuorum: format: int32 type: integer @@ -7625,6 +7628,9 @@ spec: type: integer syncVersion: type: string + waitingOnReady: + format: date-time + type: string writeQuorum: format: int32 type: integer diff --git a/logsearchapi/main.go b/logsearchapi/main.go index ce40f471f4c..f25af8ecb88 100644 --- a/logsearchapi/main.go +++ b/logsearchapi/main.go @@ -37,5 +37,6 @@ func main() { Addr: ":8080", Handler: ls, } + log.Print("Log Search API Listening on Port :8080") log.Fatal(s.ListenAndServe()) } diff --git a/main.go b/main.go index 479318fc8df..fc6979f7dd9 100644 --- a/main.go +++ b/main.go @@ -44,7 +44,6 @@ import ( apiextension "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" - certapi "k8s.io/client-go/kubernetes/typed/certificates/v1" "k8s.io/client-go/rest" ) @@ -103,11 +102,6 @@ func main() { klog.Fatalf("Error building MinIO clientset: %s", err.Error()) } - certClient, err := certapi.NewForConfig(cfg) - if err != nil { - klog.Errorf("Error building certificate clientset: %v", err.Error()) - } - extClient, err := apiextension.NewForConfig(cfg) if err != nil { klog.Errorf("Error building certificate clientset: %v", err.Error()) @@ -162,7 +156,7 @@ func main() { promInformerFactory = prominformers.NewSharedInformerFactory(promClient, time.Second*30) } - mainController := cluster.NewController(kubeClient, controllerClient, *certClient, promClient, + mainController := cluster.NewController(kubeClient, controllerClient, promClient, kubeInformerFactory.Apps().V1().StatefulSets(), kubeInformerFactory.Apps().V1().Deployments(), kubeInformerFactory.Batch().V1().Jobs(), diff --git a/pkg/apis/minio.min.io/v2/types.go b/pkg/apis/minio.min.io/v2/types.go index 5262b8fd26a..e802a9173ca 100644 --- a/pkg/apis/minio.min.io/v2/types.go +++ b/pkg/apis/minio.min.io/v2/types.go @@ -417,6 +417,10 @@ type TenantStatus struct { // // Health State of the tenant HealthStatus HealthStatus `json:"healthStatus,omitempty"` + // *Optional* + + // + // If set, we will wait until cleared for up a given time + WaitingOnReady *metav1.Time `json:"waitingOnReady,omitempty"` } // CertificateConfig (`certConfig`) defines controlling attributes associated to any TLS certificate automatically generated by the Operator as part of tenant creation. These fields have no effect if `spec.autoCert: false`. diff --git a/pkg/apis/minio.min.io/v2/zz_generated.deepcopy.go b/pkg/apis/minio.min.io/v2/zz_generated.deepcopy.go index 80d9f199c81..37d5d5f3110 100644 --- a/pkg/apis/minio.min.io/v2/zz_generated.deepcopy.go +++ b/pkg/apis/minio.min.io/v2/zz_generated.deepcopy.go @@ -803,6 +803,10 @@ func (in *TenantStatus) DeepCopyInto(out *TenantStatus) { *out = make([]PoolStatus, len(*in)) copy(*out, *in) } + if in.WaitingOnReady != nil { + in, out := &in.WaitingOnReady, &out.WaitingOnReady + *out = (*in).DeepCopy() + } return } diff --git a/pkg/controller/cluster/csr.go b/pkg/controller/cluster/csr.go index 2b7e32c36fa..cb2d3439011 100644 --- a/pkg/controller/cluster/csr.go +++ b/pkg/controller/cluster/csr.go @@ -109,7 +109,7 @@ func (c *Controller) createCertificateSigningRequest(ctx context.Context, labels }, } - ks, err := c.certClient.CertificateSigningRequests().Create(ctx, kubeCSR, metav1.CreateOptions{}) + ks, err := c.kubeClientSet.CertificatesV1().CertificateSigningRequests().Create(ctx, kubeCSR, metav1.CreateOptions{}) if err != nil && !k8serrors.IsAlreadyExists(err) { return err } @@ -132,7 +132,7 @@ func (c *Controller) createCertificateSigningRequest(ctx context.Context, labels }, } - _, err = c.certClient.CertificateSigningRequests().UpdateApproval(ctx, name, ks, metav1.UpdateOptions{}) + _, err = c.kubeClientSet.CertificatesV1().CertificateSigningRequests().UpdateApproval(ctx, name, ks, metav1.UpdateOptions{}) if err != nil { return err } @@ -161,7 +161,7 @@ func (c *Controller) fetchCertificate(ctx context.Context, csrName string) ([]by return nil, fmt.Errorf("%s", s.String()) case <-tick.C: - r, err := c.certClient.CertificateSigningRequests().Get(ctx, csrName, v1.GetOptions{}) + r, err := c.kubeClientSet.CertificatesV1().CertificateSigningRequests().Get(ctx, csrName, v1.GetOptions{}) if err != nil { klog.Errorf("Unexpected error during certificate fetching of csr/%s: %s", csrName, err) return nil, err diff --git a/pkg/controller/cluster/http_handlers.go b/pkg/controller/cluster/http_handlers.go index 17e7aeb5fe5..267a1db3472 100644 --- a/pkg/controller/cluster/http_handlers.go +++ b/pkg/controller/cluster/http_handlers.go @@ -17,6 +17,7 @@ package cluster import ( + "context" "encoding/json" "fmt" "log" @@ -86,7 +87,7 @@ func (c *Controller) BucketSrvHandler(w http.ResponseWriter, r *http.Request) { } // Find the tenant - tenant, err := c.tenantsLister.Tenants(namespace).Get(name) + tenant, err := c.minioClientSet.MinioV2().Tenants(namespace).Get(r.Context(), name, metav1.GetOptions{}) if err != nil { klog.Errorf("Unable to lookup tenant:%s/%s for the bucket:%s request. err:%s", namespace, name, bucket, err) http.Error(w, err.Error(), http.StatusBadRequest) @@ -149,7 +150,7 @@ func (c *Controller) GetenvHandler(w http.ResponseWriter, r *http.Request) { } // Get the Tenant resource with this namespace/name - tenant, err := c.tenantsLister.Tenants(namespace).Get(name) + tenant, err := c.minioClientSet.MinioV2().Tenants(namespace).Get(context.Background(), name, metav1.GetOptions{}) if err != nil { if k8serrors.IsNotFound(err) { // The Tenant resource may no longer exist, in which case we stop processing. @@ -167,7 +168,7 @@ func (c *Controller) GetenvHandler(w http.ResponseWriter, r *http.Request) { http.Error(w, err.Error(), http.StatusForbidden) return } - // correct all statefulset names by loading them, this will fix their name on the tenant pool namess + // correct all statefulset names by loading them, this will fix their name on the tenant pool names _, err = c.getAllSSForTenant(tenant) if err != nil { http.Error(w, err.Error(), http.StatusForbidden) diff --git a/pkg/controller/cluster/kes.go b/pkg/controller/cluster/kes.go index 838c69b11e5..98dd96f6907 100644 --- a/pkg/controller/cluster/kes.go +++ b/pkg/controller/cluster/kes.go @@ -154,7 +154,7 @@ func (c *Controller) checkKESCertificatesStatus(ctx context.Context, tenant *min return err } // TLS secret not found, delete CSR if exists and start certificate generation process again - if err = c.certClient.CertificateSigningRequests().Delete(ctx, tenant.MinIOClientCSRName(), metav1.DeleteOptions{}); err != nil { + if err = c.kubeClientSet.CertificatesV1().CertificateSigningRequests().Delete(ctx, tenant.MinIOClientCSRName(), metav1.DeleteOptions{}); err != nil { return err } } @@ -169,7 +169,7 @@ func (c *Controller) checkKESCertificatesStatus(ctx context.Context, tenant *min return err } // TLS secret not found, delete CSR if exists and start certificate generation process again - if err = c.certClient.CertificateSigningRequests().Delete(ctx, tenant.KESCSRName(), metav1.DeleteOptions{}); err != nil { + if err = c.kubeClientSet.CertificatesV1().CertificateSigningRequests().Delete(ctx, tenant.KESCSRName(), metav1.DeleteOptions{}); err != nil { return err } } @@ -265,7 +265,7 @@ func (c *Controller) checkKESStatus(ctx context.Context, tenant *miniov2.Tenant, } func (c *Controller) checkAndCreateMinIOClientCSR(ctx context.Context, nsName types.NamespacedName, tenant *miniov2.Tenant) error { - if _, err := c.certClient.CertificateSigningRequests().Get(ctx, tenant.MinIOClientCSRName(), metav1.GetOptions{}); err != nil { + if _, err := c.kubeClientSet.CertificatesV1().CertificateSigningRequests().Get(ctx, tenant.MinIOClientCSRName(), metav1.GetOptions{}); err != nil { if k8serrors.IsNotFound(err) { if tenant, err = c.updateTenantStatus(ctx, tenant, StatusWaitingMinIOClientCert, 0); err != nil { return err @@ -284,7 +284,7 @@ func (c *Controller) checkAndCreateMinIOClientCSR(ctx context.Context, nsName ty } func (c *Controller) checkAndCreateKESCSR(ctx context.Context, nsName types.NamespacedName, tenant *miniov2.Tenant) error { - if _, err := c.certClient.CertificateSigningRequests().Get(ctx, tenant.KESCSRName(), metav1.GetOptions{}); err != nil { + if _, err := c.kubeClientSet.CertificatesV1().CertificateSigningRequests().Get(ctx, tenant.KESCSRName(), metav1.GetOptions{}); err != nil { if k8serrors.IsNotFound(err) { if tenant, err = c.updateTenantStatus(ctx, tenant, StatusWaitingKESCert, 0); err != nil { return err diff --git a/pkg/controller/cluster/main-controller.go b/pkg/controller/cluster/main-controller.go index 701137938cf..67ceb77a7b0 100644 --- a/pkg/controller/cluster/main-controller.go +++ b/pkg/controller/cluster/main-controller.go @@ -21,7 +21,6 @@ import ( "context" "errors" "fmt" - "math/rand" "net/http" "strings" "time" @@ -44,7 +43,6 @@ import ( v1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" @@ -54,7 +52,6 @@ import ( coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" - certapi "k8s.io/client-go/kubernetes/typed/certificates/v1" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" appslisters "k8s.io/client-go/listers/apps/v1" batchlisters "k8s.io/client-go/listers/batch/v1" @@ -66,13 +63,10 @@ import ( "k8s.io/client-go/tools/record" queue "k8s.io/client-go/util/workqueue" - "github.com/golang-jwt/jwt" - jwtreq "github.com/golang-jwt/jwt/request" miniov2 "github.com/minio/operator/pkg/apis/minio.min.io/v2" clientset "github.com/minio/operator/pkg/client/clientset/versioned" minioscheme "github.com/minio/operator/pkg/client/clientset/versioned/scheme" informers "github.com/minio/operator/pkg/client/informers/externalversions/minio.min.io/v2" - listers "github.com/minio/operator/pkg/client/listers/minio.min.io/v2" "github.com/minio/operator/pkg/resources/configmaps" "github.com/minio/operator/pkg/resources/deployments" "github.com/minio/operator/pkg/resources/secrets" @@ -117,11 +111,15 @@ const ( StatusNotOwned = "Statefulset not controlled by operator" StatusFailedAlreadyExists = "Another MinIO Tenant already exists in the namespace" StatusInconsistentMinIOVersions = "Different versions across MinIO Pools" + StatusRestartingMinIO = "Different versions across MinIO Pools" ) // ErrMinIONotReady is the error returned when MinIO is not Ready var ErrMinIONotReady = fmt.Errorf("MinIO is not ready") +// ErrMinIORestarting is the error returned when MinIO is restarting +var ErrMinIORestarting = fmt.Errorf("MinIO is restarting") + // ErrLogSearchNotReady is the error returned when Log Search is not Ready var ErrLogSearchNotReady = fmt.Errorf("Log Search is not ready") @@ -131,8 +129,6 @@ type Controller struct { kubeClientSet kubernetes.Interface // minioClientSet is a clientset for our own API group minioClientSet clientset.Interface - // certClient is a clientset for our certficate management - certClient certapi.CertificatesV1Client // promClient is a clientset for Prometheus service monitor promClient promclientset.Interface // statefulSetLister is able to list/get StatefulSets from a shared @@ -156,9 +152,6 @@ type Controller struct { // has synced at least once. jobListerSynced cache.InformerSynced - // tenantsLister lists Tenant from a shared informer's - // store. - tenantsLister listers.TenantLister // tenantsSynced returns true if the StatefulSet shared informer // has synced at least once. tenantsSynced cache.InformerSynced @@ -201,7 +194,6 @@ type Controller struct { func NewController( kubeClientSet kubernetes.Interface, minioClientSet clientset.Interface, - certClient certapi.CertificatesV1Client, promClient promclientset.Interface, statefulSetInformer appsinformers.StatefulSetInformer, deploymentInformer appsinformers.DeploymentInformer, @@ -224,7 +216,6 @@ func NewController( controller := &Controller{ kubeClientSet: kubeClientSet, minioClientSet: minioClientSet, - certClient: certClient, promClient: promClient, statefulSetLister: statefulSetInformer.Lister(), statefulSetListerSynced: statefulSetInformer.Informer().HasSynced, @@ -232,7 +223,6 @@ func NewController( deploymentListerSynced: deploymentInformer.Informer().HasSynced, jobLister: jobInformer.Lister(), jobListerSynced: jobInformer.Informer().HasSynced, - tenantsLister: tenantInformer.Lister(), tenantsSynced: tenantInformer.Informer().HasSynced, serviceLister: serviceInformer.Lister(), serviceListerSynced: serviceInformer.Informer().HasSynced, @@ -300,91 +290,6 @@ func NewController( return controller } -func (c *Controller) validateRequest(r *http.Request, secret *v1.Secret) error { - tokenStr, err := jwtreq.AuthorizationHeaderExtractor.ExtractToken(r) - if err != nil { - return err - } - - stdClaims := &jwt.StandardClaims{} - token, err := jwt.ParseWithClaims(tokenStr, stdClaims, func(token *jwt.Token) (interface{}, error) { - return secret.Data[miniov2.WebhookOperatorPassword], nil - }) - if err != nil { - return err - } - - if !token.Valid { - return fmt.Errorf(http.StatusText(http.StatusForbidden)) - } - if stdClaims.Issuer != string(secret.Data[miniov2.WebhookOperatorUsername]) { - return fmt.Errorf(http.StatusText(http.StatusForbidden)) - } - - return nil -} - -func generateRandomKey(length int) string { - rand.Seed(time.Now().UnixNano()) - chars := []rune("ABCDEFGHIJKLMNOPQRSTUVWXYZÅÄÖ" + - "abcdefghijklmnopqrstuvwxyzåäö" + - "0123456789") - var b strings.Builder - for i := 0; i < length; i++ { - b.WriteRune(chars[rand.Intn(len(chars))]) - } - return b.String() -} - -func (c *Controller) applyOperatorWebhookSecret(ctx context.Context, tenant *miniov2.Tenant) (*v1.Secret, error) { - secret, err := c.kubeClientSet.CoreV1().Secrets(tenant.Namespace).Get(ctx, - miniov2.WebhookSecret, metav1.GetOptions{}) - if err != nil { - if k8serrors.IsNotFound(err) { - secret = getSecretForTenant(tenant, generateRandomKey(20), generateRandomKey(40)) - return c.kubeClientSet.CoreV1().Secrets(tenant.Namespace).Create(ctx, secret, metav1.CreateOptions{}) - } - return nil, err - } - // check the secret has the desired values - minioArgs := string(secret.Data[miniov2.WebhookMinIOArgs]) - if strings.Contains(minioArgs, "env://") && isOperatorTLS() { - // update the secret - minioArgs = strings.ReplaceAll(minioArgs, "env://", "env+tls://") - secret.Data[miniov2.WebhookMinIOArgs] = []byte(minioArgs) - secret, err = c.kubeClientSet.CoreV1().Secrets(tenant.Namespace).Update(ctx, secret, metav1.UpdateOptions{}) - if err != nil { - return nil, err - } - // update the revision of the tenant to force a rolling restart across all statefulsets - t2, err := c.increaseTenantRevision(ctx, tenant) - if err != nil { - return nil, err - } - *tenant = *t2 - } - - return secret, nil -} - -func secretData(tenant *miniov2.Tenant, accessKey, secretKey string) []byte { - scheme := "env" - if isOperatorTLS() { - scheme = "env+tls" - } - return []byte(fmt.Sprintf("%s://%s:%s@%s:%s%s/%s/%s", - scheme, - accessKey, - secretKey, - fmt.Sprintf("operator.%s.svc.%s", - miniov2.GetNSFromFile(), - miniov2.GetClusterDomain()), - miniov2.WebhookDefaultPort, - miniov2.WebhookAPIGetenv, - tenant.Namespace, - tenant.Name)) -} - func getSecretForTenant(tenant *miniov2.Tenant, accessKey, secretKey string) *v1.Secret { secret := &corev1.Secret{ Type: "Opaque", @@ -517,7 +422,7 @@ func (c *Controller) processNextWorkItem() bool { // Finally, if no error occurs we Forget this item so it does not // get queued again until another change happens. c.workqueue.Forget(obj) - klog.Infof("Successfully synced '%s'", key) + klog.V(4).Infof("Successfully synced '%s'", key) return nil } @@ -546,7 +451,6 @@ func (c *Controller) syncHandler(key string) error { ctx := context.Background() cOpts := metav1.CreateOptions{} uOpts := metav1.UpdateOptions{} - gOpts := metav1.GetOptions{} // Convert the namespace/name string into a distinct namespace and name if key == "" { @@ -557,7 +461,7 @@ func (c *Controller) syncHandler(key string) error { namespace, tenantName := key2NamespaceName(key) // Get the Tenant resource with this namespace/name - tenant, err := c.tenantsLister.Tenants(namespace).Get(tenantName) + tenant, err := c.minioClientSet.MinioV2().Tenants(namespace).Get(context.Background(), tenantName, metav1.GetOptions{}) if err != nil { // The Tenant resource may no longer exist, in which case we stop processing. if k8serrors.IsNotFound(err) { @@ -585,7 +489,7 @@ func (c *Controller) syncHandler(key string) error { // Check the Sync Version to see if the tenant needs upgrade if tenant.Status.SyncVersion == "" { if tenant, err = c.upgrade420(ctx, tenant); err != nil { - klog.V(2).Infof("Error upgrading tenant: %v", err.Error()) + klog.V(2).Infof("'%s' Error upgrading tenant: %v", key, err.Error()) return err } } @@ -599,7 +503,7 @@ func (c *Controller) syncHandler(key string) error { if tenant.Spec.RequestAutoCert == nil && tenant.APIVersion != "" { // If we get certificate signing requests for MinIO is safe to assume the Tenant v1 was deployed using AutoCert // otherwise AutoCert will be false - tenantCSR, err := c.certClient.CertificateSigningRequests().Get(ctx, tenant.MinIOCSRName(), metav1.GetOptions{}) + tenantCSR, err := c.kubeClientSet.CertificatesV1().CertificateSigningRequests().Get(ctx, tenant.MinIOCSRName(), metav1.GetOptions{}) if err != nil || tenantCSR == nil { autoCertEnabled = false } @@ -657,16 +561,16 @@ func (c *Controller) syncHandler(key string) error { } // List all MinIO Tenants in this namespace. - li, err := c.tenantsLister.Tenants(tenant.Namespace).List(labels.NewSelector()) + li, err := c.minioClientSet.MinioV2().Tenants(tenant.Namespace).List(context.Background(), metav1.ListOptions{}) if err != nil { return err } // Only 1 minio tenant per namespace allowed. - if len(li) > 1 { - for _, t := range li { + if len(li.Items) > 1 { + for _, t := range li.Items { if t.Status.CurrentState != StatusInitialized { - if _, err = c.updateTenantStatus(ctx, t, StatusFailedAlreadyExists, 0); err != nil { + if _, err = c.updateTenantStatus(ctx, &t, StatusFailedAlreadyExists, 0); err != nil { return err } // return nil so we don't re-queue this work item @@ -675,40 +579,10 @@ func (c *Controller) syncHandler(key string) error { } } - // Configuration for tenant can be passed using 3 different sources, tenant.spec.env, k8s credsSecret and config.env secret - // If the user provides duplicated configuration the override order will be: - // tenant.Spec.Env < credsSecret (k8s secret) < config.env file (k8s secret) - tenantConfiguration := map[string][]byte{} - - for _, config := range tenant.GetEnvVars() { - tenantConfiguration[config.Name] = []byte(config.Value) - } - - if tenant.HasCredsSecret() { - minioSecretName := tenant.Spec.CredsSecret.Name - minioSecret, err := c.kubeClientSet.CoreV1().Secrets(tenant.Namespace).Get(ctx, minioSecretName, gOpts) - if err != nil { - return err - } - configFromCredsSecret := minioSecret.Data - for key, val := range configFromCredsSecret { - tenantConfiguration[key] = val - } - } - - // Load tenant configuration from file - if tenant.HasConfigurationSecret() { - minioConfigurationSecretName := tenant.Spec.Configuration.Name - minioConfigurationSecret, err := c.kubeClientSet.CoreV1().Secrets(tenant.Namespace).Get(ctx, minioConfigurationSecretName, gOpts) - if err != nil { - return err - } - configFromFile := miniov2.ParseRawConfiguration(minioConfigurationSecret.Data["config.env"]) - for key, val := range configFromFile { - tenantConfiguration[key] = val - } + tenantConfiguration, err := c.getTenantCredentials(ctx, tenant) + if err != nil { + return err } - adminClnt, err := tenant.NewMinIOAdmin(tenantConfiguration) if err != nil { return err @@ -755,6 +629,14 @@ func (c *Controller) syncHandler(key string) error { } } + // Create logSecret before deploying any statefulset + if tenant.HasLogEnabled() { + _, err = c.checkAndCreateLogSecret(ctx, tenant) + if err != nil { + return err + } + } + // consolidate the status of all pools. this is meant to cover for legacy tenants // this status value is zero only for new tenants or legacy tenants if len(tenant.Status.Pools) == 0 { @@ -777,7 +659,18 @@ func (c *Controller) syncHandler(key string) error { } // Check if this is fresh setup not an expansion. - freshSetup := len(tenant.Spec.Pools) == len(tenant.Status.Pools) + //addingNewPool := len(tenant.Spec.Pools) == len(tenant.Status.Pools) + addingNewPool := false + // count the number of initialized pools, if at least 1 is not Initialized, we are still adding a new pool + for _, poolStatus := range tenant.Status.Pools { + if poolStatus.State != miniov2.PoolInitialized { + addingNewPool = true + break + } + } + if addingNewPool { + klog.Infof("%s Detected we are adding a new pool", key) + } // Check if we need to create any of the pools. It's important not to update the statefulsets // in this loop because we need all the pools "as they are" for the hot-update below @@ -801,13 +694,11 @@ func (c *Controller) syncHandler(key string) error { } ss, err := c.statefulSetLister.StatefulSets(tenant.Namespace).Get(ssName) if k8serrors.IsNotFound(err) { - - klog.Infof("Deploying pool %s", pool.Name) - - // Check healthcheck for previous pool only if its not a fresh setup, - // if they are online before adding this pool. - if !freshSetup && !tenant.MinIOHealthCheck() { - klog.Infof("Deploying pool failed %s", pool.Name) + klog.Infof("'%s/%s': Deploying pool %s", tenant.Namespace, tenant.Name, pool.Name) + // Check healthcheck for previous pool only if it's not a new setup, + // and check if they are online before adding this pool. + if addingNewPool && !tenant.MinIOHealthCheck() { + klog.Infof("'%s/%s': Deploying new pool failed %s: MinIO is not Ready", tenant.Namespace, tenant.Name, pool.Name) return ErrMinIONotReady } @@ -827,12 +718,6 @@ func (c *Controller) syncHandler(key string) error { if tenant, err = c.updatePoolStatus(ctx, tenant); err != nil { return err } - // Restart the services to fetch the new args, ignore any error. - // only perform `restart()` of server deployment when we are truly - // expanding an existing deployment. - if !freshSetup { - adminClnt.ServiceRestart(ctx) //nolint:errcheck - } } // keep track of all replicas @@ -840,11 +725,11 @@ func (c *Controller) syncHandler(key string) error { images = append(images, ss.Spec.Template.Spec.Containers[0].Image) } - // validate each pool if it's initialized + // validate each pool if it's initialized, and mark it if it is. for pi, pool := range tenant.Spec.Pools { // get a pod for the established statefulset if tenant.Status.Pools[pi].State == miniov2.PoolCreated { - // get a pod for the ss + // get the first pod for the ss and try to reach the pool via that single pod. pods, err := c.kubeClientSet.CoreV1().Pods(tenant.Namespace).List(ctx, metav1.ListOptions{ LabelSelector: fmt.Sprintf("%s=%s", miniov2.PoolLabel, pool.Name), }) @@ -852,7 +737,16 @@ func (c *Controller) syncHandler(key string) error { klog.Warning("Could not validate state of statefulset for pool", err) } if len(pods.Items) > 0 { - ssPod := pods.Items[0] + var ssPod *corev1.Pod + for _, p := range pods.Items { + if strings.HasSuffix(p.Name, "-0") { + ssPod = &p + break + } + } + if ssPod == nil { + break + } podAddress := fmt.Sprintf("%s:9000", tenant.MinIOHLPodHostname(ssPod.Name)) podAdminClnt, err := tenant.NewMinIOAdminForAddress(podAddress, tenantConfiguration) if err != nil { @@ -860,17 +754,93 @@ func (c *Controller) syncHandler(key string) error { } _, err = podAdminClnt.ServerInfo(ctx) - // any error means we are not ready, if the call succeeds, the ss is ready - if err == nil { + // any error means we are not ready, if the call succeeds or we get `server not initialized`, the ss is ready + if err == nil || madmin.ToErrorResponse(err).Code == "XMinioServerNotInitialized" { + + // Restart the services to fetch the new args, ignore any error. + // only perform `restart()` of server deployment when we are truly + // expanding an existing deployment. (a pool became initialized) + minioRestarted := false + if len(tenant.Spec.Pools) > 1 && addingNewPool { + // get a new admin client that points to a pod of an already initialized pool (ie: pool-0) + livePods, err := c.kubeClientSet.CoreV1().Pods(tenant.Namespace).List(ctx, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s", miniov2.PoolLabel, tenant.Spec.Pools[0].Name), + }) + if err != nil { + klog.Warning("Could not validate state of statefulset for pool", err) + } + var livePod *corev1.Pod + for _, p := range livePods.Items { + if p.Status.Phase == v1.PodRunning { + livePod = &p + break + } + } + if livePod == nil { + break + } + livePodAddress := fmt.Sprintf("%s:9000", tenant.MinIOHLPodHostname(livePod.Name)) + livePodAdminClnt, err := tenant.NewMinIOAdminForAddress(livePodAddress, tenantConfiguration) + if err != nil { + return err + } + // Now tell MinIO to restart + if err = livePodAdminClnt.ServiceRestart(ctx); err != nil { + klog.Infof("We failed to restart MinIO to adopt the new pool: %v", err) + } + minioRestarted = true + metaNowTime := metav1.Now() + tenant.Status.WaitingOnReady = &metaNowTime + tenant.Status.CurrentState = StatusRestartingMinIO + if tenant, err = c.updatePoolStatus(ctx, tenant); err != nil { + klog.Infof("'%s' Can't update tenant status: %v", key, err) + } + klog.Infof("'%s' Was restarted", key) + } + // Report the pool is properly created tenant.Status.Pools[pi].State = miniov2.PoolInitialized // push updates to status if tenant, err = c.updatePoolStatus(ctx, tenant); err != nil { return err } + + if minioRestarted { + return ErrMinIORestarting + } + } else { - fmt.Println(err) + klog.Infof("'%s/%s' Error waiting for pool to be ready: %v", tenant.Namespace, tenant.Name, + err) + } + } + } + } + // wait here until all pools are initialized, so we can continue with updating versions and the ss resources. + for _, poolStatus := range tenant.Status.Pools { + if poolStatus.State != miniov2.PoolInitialized { + // at least 1 is not initialized, stop here until they all are. + return errors.New("Waiting for all pools to initialize") + } + } + + // wait here if `waitOnReady` is set to a given time + if tenant.Status.WaitingOnReady != nil { + // if it's been longer than the default time 5 minutes, unset the field and continue + someTimeAgo := time.Now().Add(-5 * time.Minute) + if tenant.Status.WaitingOnReady.Time.Before(someTimeAgo) { + tenant.Status.WaitingOnReady = nil + if tenant, err = c.updatePoolStatus(ctx, tenant); err != nil { + klog.Infof("'%s' Can't update tenant status: %v", key, err) + } + } else { + // check if MinIO is already online after the previous restart + if tenant.MinIOHealthCheck() { + tenant.Status.WaitingOnReady = nil + if _, err = c.updatePoolStatus(ctx, tenant); err != nil { + klog.Infof("'%s' Can't update tenant status: %v", key, err) } + return ErrMinIORestarting } } } @@ -892,6 +862,7 @@ func (c *Controller) syncHandler(key string) error { // So comparing tenant.Spec.Image (version to update to) against one value from images slice is fine. if tenant.Spec.Image != images[0] && tenant.Status.CurrentState != StatusUpdatingMinIOVersion { if !tenant.MinIOHealthCheck() { + klog.Infof("%s is not running can't update image online", key) return ErrMinIONotReady } @@ -1061,14 +1032,6 @@ func (c *Controller) syncHandler(key string) error { } } - // Create logSecret before deploying console - if tenant.HasLogEnabled() { - _, err = c.checkAndCreateLogSecret(ctx, tenant) - if err != nil { - return err - } - } - if tenant.HasLogEnabled() { var logSecret *corev1.Secret logSecret, err = c.checkAndCreateLogSecret(ctx, tenant) @@ -1100,6 +1063,7 @@ func (c *Controller) syncHandler(key string) error { if _, err = c.updateTenantStatus(ctx, tenant, StatusWaitingForReadyState, totalReplicas); err != nil { return err } + klog.Infof("Can't reach minio for log config") return ErrMinIONotReady } err = c.checkAndConfigureLogSearchAPI(ctx, tenant, logSecret, adminClnt) @@ -1188,7 +1152,7 @@ func (c *Controller) handleObject(obj interface{}) { return } - tenant, err := c.tenantsLister.Tenants(object.GetNamespace()).Get(ownerRef.Name) + tenant, err := c.minioClientSet.MinioV2().Tenants(object.GetNamespace()).Get(context.Background(), ownerRef.Name, metav1.GetOptions{}) if err != nil { klog.V(4).Infof("ignoring orphaned object '%s' of tenant '%s'", object.GetSelfLink(), ownerRef.Name) return diff --git a/pkg/controller/cluster/minio.go b/pkg/controller/cluster/minio.go index 45b923f8bea..115d9511dab 100644 --- a/pkg/controller/cluster/minio.go +++ b/pkg/controller/cluster/minio.go @@ -40,7 +40,7 @@ import ( ) func (c *Controller) checkAndCreateMinIOCSR(ctx context.Context, nsName types.NamespacedName, tenant *miniov2.Tenant) error { - if _, err := c.certClient.CertificateSigningRequests().Get(ctx, tenant.MinIOCSRName(), metav1.GetOptions{}); err != nil { + if _, err := c.kubeClientSet.CertificatesV1().CertificateSigningRequests().Get(ctx, tenant.MinIOCSRName(), metav1.GetOptions{}); err != nil { if k8serrors.IsNotFound(err) { if tenant, err = c.updateTenantStatus(ctx, tenant, StatusWaitingMinIOCert, 0); err != nil { return err @@ -68,7 +68,7 @@ func (c *Controller) checkMinIOCertificatesStatus(ctx context.Context, tenant *m return err } // TLS secret not found, delete CSR if exists and start certificate generation process again - if err = c.certClient.CertificateSigningRequests().Delete(ctx, tenant.MinIOCSRName(), metav1.DeleteOptions{}); err != nil { + if err = c.kubeClientSet.CertificatesV1().CertificateSigningRequests().Delete(ctx, tenant.MinIOCSRName(), metav1.DeleteOptions{}); err != nil { return err } } else { diff --git a/pkg/controller/cluster/monitoring.go b/pkg/controller/cluster/monitoring.go index aec1fbfd499..92321382006 100644 --- a/pkg/controller/cluster/monitoring.go +++ b/pkg/controller/cluster/monitoring.go @@ -27,11 +27,9 @@ import ( "strconv" "time" - "k8s.io/klog/v2" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" + "k8s.io/klog/v2" miniov2 "github.com/minio/operator/pkg/apis/minio.min.io/v2" ) @@ -52,7 +50,7 @@ func (c *Controller) recurrentTenantStatusMonitor(stopCh <-chan struct{}) { select { case <-ticker.C: if err := c.tenantsHealthMonitor(); err != nil { - log.Println(err) + klog.Infof("%v", err) } case <-stopCh: ticker.Stop() @@ -64,11 +62,11 @@ func (c *Controller) recurrentTenantStatusMonitor(stopCh <-chan struct{}) { func (c *Controller) tenantsHealthMonitor() error { // list all tenants and get their cluster health - tenants, err := c.tenantsLister.Tenants("").List(labels.NewSelector()) + tenants, err := c.minioClientSet.MinioV2().Tenants("").List(context.Background(), metav1.ListOptions{}) if err != nil { return err } - for _, tenant := range tenants { + for _, tenant := range tenants.Items { // don't get the tenant cluster health if it doesn't have at least 1 pool initialized oneInitialized := false for _, pool := range tenant.Status.Pools { @@ -77,39 +75,33 @@ func (c *Controller) tenantsHealthMonitor() error { } } if !oneInitialized { + klog.Infof("'%s/%s' no pool is initialized", tenant.Namespace, tenant.Name) continue } // get cluster health for tenant - healthResult, err := getMinIOHealthStatus(tenant, RegularMode) + healthResult, err := getMinIOHealthStatus(&tenant, RegularMode) if err != nil { // show the error and continue - klog.V(2).Infof(err.Error()) + klog.Infof("'%s/%s' Failed to get cluster health: %v", tenant.Namespace, tenant.Name, err) continue } - - // get mc admin info - minioSecretName := tenant.Spec.CredsSecret.Name - minioSecret, err := c.kubeClientSet.CoreV1().Secrets(tenant.Namespace).Get(context.Background(), minioSecretName, metav1.GetOptions{}) + tenantConfiguration, err := c.getTenantCredentials(context.Background(), &tenant) if err != nil { - // show the error and continue - klog.V(2).Infof(err.Error()) - continue + return err } - - adminClnt, err := tenant.NewMinIOAdmin(minioSecret.Data) + adminClnt, err := tenant.NewMinIOAdmin(tenantConfiguration) if err != nil { // show the error and continue - klog.V(2).Infof(err.Error()) + klog.Infof("'%s/%s': %v", tenant.Namespace, tenant.Name, err) continue } - - srvInfoCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + srvInfoCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() storageInfo, err := adminClnt.StorageInfo(srvInfoCtx) if err != nil { // show the error and continue - klog.V(2).Infof(err.Error()) + klog.Infof("'%s/%s' Failed to get storage info: %v", tenant.Namespace, tenant.Name, err) continue } @@ -138,10 +130,9 @@ func (c *Controller) tenantsHealthMonitor() error { tenant.Status.HealthStatus = miniov2.HealthStatusRed } - if _, err = c.updatePoolStatus(context.Background(), tenant); err != nil { - klog.V(2).Infof(err.Error()) + if _, err = c.updatePoolStatus(context.Background(), &tenant); err != nil { + klog.Infof("'%s/%s' Can't update tenant status: %v", tenant.Namespace, tenant.Name, err) } - } return nil } @@ -210,7 +201,7 @@ func getMinIOHealthStatusWithRetry(tenant *miniov2.Tenant, mode HealthMode, tryC req, err := http.NewRequest(http.MethodGet, endpoint, nil) if err != nil { - log.Println("error request pinging", err) + klog.Infof("error request pinging: %v", err) return nil, err } @@ -223,18 +214,18 @@ func getMinIOHealthStatusWithRetry(tenant *miniov2.Tenant, mode HealthMode, tryC if err != nil { // if we fail due to timeout, retry if err, ok := err.(net.Error); ok && err.Timeout() && tryCount > 0 { - log.Printf("health check failed, retrying %d, err: %s", tryCount, err) + klog.Infof("health check failed, retrying %d, err: %s", tryCount, err) time.Sleep(10 * time.Second) return getMinIOHealthStatusWithRetry(tenant, mode, tryCount-1) } - log.Println("error pinging", err) + klog.Infof("error pinging: %v", err) return nil, err } driveskHealing := 0 if resp.Header.Get("X-Minio-Healing-Drives") != "" { val, err := strconv.Atoi(resp.Header.Get("X-Minio-Healing-Drives")) if err != nil { - log.Println("Cannot parse healing drives from health check") + klog.Infof("Cannot parse healing drives from health check") } else { driveskHealing = val } @@ -243,7 +234,7 @@ func getMinIOHealthStatusWithRetry(tenant *miniov2.Tenant, mode HealthMode, tryC if resp.Header.Get("X-Minio-Write-Quorum") != "" { val, err := strconv.Atoi(resp.Header.Get("X-Minio-Write-Quorum")) if err != nil { - log.Println("Cannot parse min write drives from health check") + klog.Infof("Cannot parse min write drives from health check") } else { minDriveWrites = val } diff --git a/pkg/controller/cluster/operator.go b/pkg/controller/cluster/operator.go index 56f629a7ec8..9492b925b16 100644 --- a/pkg/controller/cluster/operator.go +++ b/pkg/controller/cluster/operator.go @@ -87,7 +87,7 @@ func (c *Controller) generateTLSCert() (string, string) { klog.Infof("Waiting for the operator certificates to be issued %v", err.Error()) time.Sleep(time.Second * 10) } else { - if err = c.certClient.CertificateSigningRequests().Delete(ctx, "operator-auto-tls", metav1.DeleteOptions{}); err != nil { + if err = c.kubeClientSet.CertificatesV1().CertificateSigningRequests().Delete(ctx, "operator-auto-tls", metav1.DeleteOptions{}); err != nil { klog.Infof(err.Error()) } } @@ -228,7 +228,7 @@ func (c *Controller) createOperatorCSR(ctx context.Context, operator metav1.Obje } func (c *Controller) checkAndCreateOperatorCSR(ctx context.Context, operator metav1.Object) error { - if _, err := c.certClient.CertificateSigningRequests().Get(ctx, "operator-auto-tls", metav1.GetOptions{}); err != nil { + if _, err := c.kubeClientSet.CertificatesV1().CertificateSigningRequests().Get(ctx, "operator-auto-tls", metav1.GetOptions{}); err != nil { if k8serrors.IsNotFound(err) { klog.V(2).Infof("Creating a new Certificate Signing Request for Operator Server Certs, cluster %q") if err = c.createOperatorCSR(ctx, operator); err != nil { @@ -254,7 +254,7 @@ func (c *Controller) createUsers(ctx context.Context, tenant *miniov2.Tenant, te adminClnt, err := tenant.NewMinIOAdmin(tenantConfiguration) if err != nil { // show the error and continue - klog.V(2).Infof(err.Error()) + klog.Errorf("Error instantiating madmin: %v", err.Error()) } skipCreateUsers := false diff --git a/pkg/controller/cluster/pools.go b/pkg/controller/cluster/pools.go index 871da1f3ba2..8b872a3e028 100644 --- a/pkg/controller/cluster/pools.go +++ b/pkg/controller/cluster/pools.go @@ -17,9 +17,12 @@ package cluster import ( + "context" "reflect" "strings" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + miniov2 "github.com/minio/operator/pkg/apis/minio.min.io/v2" "github.com/minio/operator/pkg/resources/statefulsets" appsv1 "k8s.io/api/apps/v1" @@ -29,13 +32,13 @@ import ( ) func (c *Controller) getSSForPool(tenant *miniov2.Tenant, pool *miniov2.Pool) (*appsv1.StatefulSet, error) { - ss, err := c.statefulSetLister.StatefulSets(tenant.Namespace).Get(tenant.PoolStatefulsetName(pool)) + ss, err := c.kubeClientSet.AppsV1().StatefulSets(tenant.Namespace).Get(context.Background(), tenant.PoolStatefulsetName(pool), metav1.GetOptions{}) if err != nil { if !k8serrors.IsNotFound(err) { return nil, err } // check if there are legacy statefulsets - ss, err = c.statefulSetLister.StatefulSets(tenant.Namespace).Get(tenant.LegacyStatefulsetName(pool)) + ss, err = c.kubeClientSet.AppsV1().StatefulSets(tenant.Namespace).Get(context.Background(), tenant.LegacyStatefulsetName(pool), metav1.GetOptions{}) if err != nil { return nil, err } diff --git a/pkg/controller/cluster/tenants.go b/pkg/controller/cluster/tenants.go new file mode 100644 index 00000000000..a3033e5ebf7 --- /dev/null +++ b/pkg/controller/cluster/tenants.go @@ -0,0 +1,62 @@ +// This file is part of MinIO Console Server +// Copyright (c) 2021 MinIO, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cluster + +import ( + "context" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + miniov2 "github.com/minio/operator/pkg/apis/minio.min.io/v2" +) + +func (c *Controller) getTenantCredentials(ctx context.Context, tenant *miniov2.Tenant) (map[string][]byte, error) { + // Configuration for tenant can be passed using 3 different sources, tenant.spec.env, k8s credsSecret and config.env secret + // If the user provides duplicated configuration the override order will be: + // tenant.Spec.Env < credsSecret (k8s secret) < config.env file (k8s secret) + tenantConfiguration := map[string][]byte{} + + for _, config := range tenant.GetEnvVars() { + tenantConfiguration[config.Name] = []byte(config.Value) + } + + if tenant.HasCredsSecret() { + minioSecretName := tenant.Spec.CredsSecret.Name + minioSecret, err := c.kubeClientSet.CoreV1().Secrets(tenant.Namespace).Get(ctx, minioSecretName, metav1.GetOptions{}) + if err != nil { + return nil, err + } + configFromCredsSecret := minioSecret.Data + for key, val := range configFromCredsSecret { + tenantConfiguration[key] = val + } + } + + // Load tenant configuration from file + if tenant.HasConfigurationSecret() { + minioConfigurationSecretName := tenant.Spec.Configuration.Name + minioConfigurationSecret, err := c.kubeClientSet.CoreV1().Secrets(tenant.Namespace).Get(ctx, minioConfigurationSecretName, metav1.GetOptions{}) + if err != nil { + return nil, err + } + configFromFile := miniov2.ParseRawConfiguration(minioConfigurationSecret.Data["config.env"]) + for key, val := range configFromFile { + tenantConfiguration[key] = val + } + } + return tenantConfiguration, nil +} diff --git a/pkg/controller/cluster/upgrades.go b/pkg/controller/cluster/upgrades.go index fc1d3214ff6..35963d4e2d4 100644 --- a/pkg/controller/cluster/upgrades.go +++ b/pkg/controller/cluster/upgrades.go @@ -60,6 +60,12 @@ func (c *Controller) upgrade420(ctx context.Context, tenant *miniov2.Tenant) (*m } } + // delete the previous operator secrets, they may be in a bad state + err = c.kubeClientSet.CoreV1().Secrets(tenant.Namespace).Delete(ctx, + miniov2.WebhookSecret, metav1.DeleteOptions{}) + if err != nil { + klog.Errorf("Error deleting operator webhook secret, manual deletion is needed: %v", err) + } if tenant, err = c.updateTenantSyncVersion(ctx, tenant, "v4.2.0"); err != nil { return nil, err diff --git a/pkg/controller/cluster/webhook-server.go b/pkg/controller/cluster/webhook-server.go deleted file mode 100644 index e4e4dd1ce93..00000000000 --- a/pkg/controller/cluster/webhook-server.go +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright (C) 2020, MinIO, Inc. - * - * This code is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License, version 3, - * along with this program. If not, see - * - */ - -package cluster - -import ( - "fmt" - "net/http" - "time" - - "github.com/gorilla/mux" - miniov2 "github.com/minio/operator/pkg/apis/minio.min.io/v2" -) - -// Used for registering with rest handlers (have a look at registerStorageRESTHandlers for usage example) -// If it is passed ["aaaa", "bbbb"], it returns ["aaaa", "{aaaa:.*}", "bbbb", "{bbbb:.*}"] -func restQueries(keys ...string) []string { - var accumulator []string - for _, key := range keys { - accumulator = append(accumulator, key, "{"+key+":.*}") - } - return accumulator -} - -func configureWebhookServer(c *Controller) *http.Server { - router := mux.NewRouter().SkipClean(true).UseEncodedPath() - - router.Methods(http.MethodGet). - Path(miniov2.WebhookAPIGetenv + "/{namespace}/{name:.+}"). - HandlerFunc(c.GetenvHandler). - Queries(restQueries("key")...) - router.Methods(http.MethodPost). - Path(miniov2.WebhookAPIBucketService + "/{namespace}/{name:.+}"). - HandlerFunc(c.BucketSrvHandler). - Queries(restQueries("bucket")...) - router.Methods(http.MethodGet). - PathPrefix(miniov2.WebhookAPIUpdate). - Handler(http.StripPrefix(miniov2.WebhookAPIUpdate, http.FileServer(http.Dir(updatePath)))) - // CRD Conversion - router.Methods(http.MethodPost). - Path(miniov2.WebhookCRDConversaion). - HandlerFunc(c.CRDConversionHandler) - //. - // Queries(restQueries("bucket")...) - - router.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - fmt.Println(r) - }) - - s := &http.Server{ - Addr: ":" + miniov2.WebhookDefaultPort, - Handler: router, - ReadTimeout: 10 * time.Second, - WriteTimeout: 10 * time.Second, - MaxHeaderBytes: 1 << 20, - } - - return s -} diff --git a/pkg/controller/cluster/webhook.go b/pkg/controller/cluster/webhook.go new file mode 100644 index 00000000000..df764ae409c --- /dev/null +++ b/pkg/controller/cluster/webhook.go @@ -0,0 +1,168 @@ +/* + * Copyright (C) 2020, MinIO, Inc. + * + * This code is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License, version 3, + * along with this program. If not, see + * + */ + +package cluster + +import ( + "context" + "fmt" + "math/rand" + "net/http" + "strings" + "time" + + jwtreq "github.com/golang-jwt/jwt/request" + + v1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/golang-jwt/jwt" + "github.com/gorilla/mux" + miniov2 "github.com/minio/operator/pkg/apis/minio.min.io/v2" +) + +// Used for registering with rest handlers (have a look at registerStorageRESTHandlers for usage example) +// If it is passed ["aaaa", "bbbb"], it returns ["aaaa", "{aaaa:.*}", "bbbb", "{bbbb:.*}"] +func restQueries(keys ...string) []string { + var accumulator []string + for _, key := range keys { + accumulator = append(accumulator, key, "{"+key+":.*}") + } + return accumulator +} + +func configureWebhookServer(c *Controller) *http.Server { + router := mux.NewRouter().SkipClean(true).UseEncodedPath() + + router.Methods(http.MethodGet). + Path(miniov2.WebhookAPIGetenv + "/{namespace}/{name:.+}"). + HandlerFunc(c.GetenvHandler). + Queries(restQueries("key")...) + router.Methods(http.MethodPost). + Path(miniov2.WebhookAPIBucketService + "/{namespace}/{name:.+}"). + HandlerFunc(c.BucketSrvHandler). + Queries(restQueries("bucket")...) + router.Methods(http.MethodGet). + PathPrefix(miniov2.WebhookAPIUpdate). + Handler(http.StripPrefix(miniov2.WebhookAPIUpdate, http.FileServer(http.Dir(updatePath)))) + // CRD Conversion + router.Methods(http.MethodPost). + Path(miniov2.WebhookCRDConversaion). + HandlerFunc(c.CRDConversionHandler) + //. + // Queries(restQueries("bucket")...) + + router.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Println(r) + }) + + s := &http.Server{ + Addr: ":" + miniov2.WebhookDefaultPort, + Handler: router, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + MaxHeaderBytes: 1 << 20, + } + + return s +} + +func (c *Controller) validateRequest(r *http.Request, secret *v1.Secret) error { + tokenStr, err := jwtreq.AuthorizationHeaderExtractor.ExtractToken(r) + if err != nil { + return err + } + + stdClaims := &jwt.StandardClaims{} + token, err := jwt.ParseWithClaims(tokenStr, stdClaims, func(token *jwt.Token) (interface{}, error) { + return secret.Data[miniov2.WebhookOperatorPassword], nil + }) + if err != nil { + return err + } + + if !token.Valid { + return fmt.Errorf(http.StatusText(http.StatusForbidden)) + } + if stdClaims.Issuer != string(secret.Data[miniov2.WebhookOperatorUsername]) { + return fmt.Errorf(http.StatusText(http.StatusForbidden)) + } + + return nil +} + +func generateRandomKey(length int) string { + rand.Seed(time.Now().UnixNano()) + chars := []rune("ABCDEFGHIJKLMNOPQRSTUVWXYZ" + + "abcdefghijklmnopqrstuvwxyz" + + "0123456789") + var b strings.Builder + for i := 0; i < length; i++ { + b.WriteRune(chars[rand.Intn(len(chars))]) + } + return b.String() +} + +func (c *Controller) applyOperatorWebhookSecret(ctx context.Context, tenant *miniov2.Tenant) (*v1.Secret, error) { + secret, err := c.kubeClientSet.CoreV1().Secrets(tenant.Namespace).Get(ctx, + miniov2.WebhookSecret, metav1.GetOptions{}) + if err != nil { + if k8serrors.IsNotFound(err) { + secret = getSecretForTenant(tenant, generateRandomKey(20), generateRandomKey(40)) + return c.kubeClientSet.CoreV1().Secrets(tenant.Namespace).Create(ctx, secret, metav1.CreateOptions{}) + } + return nil, err + } + // check the secret has the desired values + minioArgs := string(secret.Data[miniov2.WebhookMinIOArgs]) + if strings.Contains(minioArgs, "env://") && isOperatorTLS() { + // update the secret + minioArgs = strings.ReplaceAll(minioArgs, "env://", "env+tls://") + secret.Data[miniov2.WebhookMinIOArgs] = []byte(minioArgs) + secret, err = c.kubeClientSet.CoreV1().Secrets(tenant.Namespace).Update(ctx, secret, metav1.UpdateOptions{}) + if err != nil { + return nil, err + } + // update the revision of the tenant to force a rolling restart across all statefulsets + t2, err := c.increaseTenantRevision(ctx, tenant) + if err != nil { + return nil, err + } + *tenant = *t2 + } + + return secret, nil +} + +func secretData(tenant *miniov2.Tenant, accessKey, secretKey string) []byte { + scheme := "env" + if isOperatorTLS() { + scheme = "env+tls" + } + return []byte(fmt.Sprintf("%s://%s:%s@%s:%s%s/%s/%s", + scheme, + accessKey, + secretKey, + fmt.Sprintf("operator.%s.svc.%s", + miniov2.GetNSFromFile(), + miniov2.GetClusterDomain()), + miniov2.WebhookDefaultPort, + miniov2.WebhookAPIGetenv, + tenant.Namespace, + tenant.Name)) +} diff --git a/resources/base/crds/minio.min.io_tenants.yaml b/resources/base/crds/minio.min.io_tenants.yaml index 9ad02f456b3..83cc65fd136 100644 --- a/resources/base/crds/minio.min.io_tenants.yaml +++ b/resources/base/crds/minio.min.io_tenants.yaml @@ -3798,6 +3798,9 @@ spec: type: integer syncVersion: type: string + waitingOnReady: + format: date-time + type: string writeQuorum: format: int32 type: integer @@ -7625,6 +7628,9 @@ spec: type: integer syncVersion: type: string + waitingOnReady: + format: date-time + type: string writeQuorum: format: int32 type: integer