diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index e8b617ce47a..c61402a063a 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -174,3 +174,19 @@ func setupSignalHandler() (stopCh <-chan struct{}) { return stop } + +// Result contains the result of a sync invocation. +type Result struct { + // Requeue tells the Controller to requeue the reconcile key. Defaults to false. + Requeue bool + + // RequeueAfter if greater than 0, tells the Controller to requeue the reconcile key after the Duration. + // Implies that Requeue is true, there is no need to set Requeue to true at the same time as RequeueAfter. + RequeueAfter time.Duration +} + +// WrapResult is wrap for result. +// We can find where return result. +func WrapResult(result Result, err error) (Result, error) { + return result, err +} diff --git a/pkg/controller/main-controller.go b/pkg/controller/main-controller.go index a23acbb22e1..b79a9fccf13 100644 --- a/pkg/controller/main-controller.go +++ b/pkg/controller/main-controller.go @@ -648,17 +648,27 @@ func (c *Controller) processNextWorkItem() bool { return nil } klog.V(2).Infof("Key from workqueue: %s", key) - // Run the syncHandler, passing it the namespace/name string of the - // Tenant resource to be synced. - if err := c.syncHandler(key); err != nil { - // Put the item back on the workqueue to handle any transient errors. + + result, err := c.syncHandler(key) + switch { + case err != nil: c.workqueue.AddRateLimited(key) return fmt.Errorf("error syncing '%s': %s", key, err.Error()) + case result.RequeueAfter > 0: + // The result.RequeueAfter request will be lost, if it is returned + // along with a non-nil error. But this is intended as + // We need to drive to stable reconcile loops before queuing due + // to result.RequestAfter + c.workqueue.Forget(obj) + c.workqueue.AddAfter(key, result.RequeueAfter) + case result.Requeue: + c.workqueue.AddRateLimited(key) + default: + // Finally, if no error occurs we Forget this item so it does not + // get queued again until another change happens. + c.workqueue.Forget(obj) + klog.V(4).Infof("Successfully synced '%s'", key) } - // Finally, if no error occurs we Forget this item so it does not - // get queued again until another change happens. - c.workqueue.Forget(obj) - klog.V(4).Infof("Successfully synced '%s'", key) return nil } @@ -683,7 +693,7 @@ func key2NamespaceName(key string) (namespace, name string) { // syncHandler compares the actual state with the desired, and attempts to // converge the two. It then updates the Status block of the Tenant resource // with the current status of the resource. -func (c *Controller) syncHandler(key string) error { +func (c *Controller) syncHandler(key string) (Result, error) { ctx := context.Background() cOpts := metav1.CreateOptions{} uOpts := metav1.UpdateOptions{} @@ -691,7 +701,7 @@ func (c *Controller) syncHandler(key string) error { // Convert the namespace/name string into a distinct namespace and name if key == "" { runtime.HandleError(fmt.Errorf("Invalid resource key: %s", key)) - return nil + return WrapResult(Result{}, nil) } namespace, tenantName := key2NamespaceName(key) @@ -702,14 +712,15 @@ func (c *Controller) syncHandler(key string) error { // The Tenant resource may no longer exist, in which case we stop processing. if k8serrors.IsNotFound(err) { runtime.HandleError(fmt.Errorf("Tenant '%s' in work queue no longer exists", key)) - return nil + return WrapResult(Result{}, nil) } - return nil + // will retry after 5sec + return WrapResult(Result{RequeueAfter: time.Second * 5}, nil) } // Check the Sync Version to see if the tenant needs upgrade if tenant, err = c.checkForUpgrades(ctx, tenant); err != nil { - return err + return WrapResult(Result{}, err) } // Set any required default values and init Global variables @@ -723,20 +734,20 @@ func (c *Controller) syncHandler(key string) error { klog.V(2).Infof(err2.Error()) } c.RegisterEvent(ctx, tenant, corev1.EventTypeWarning, "MissingCreds", "Tenant is missing root credentials") - return nil + return WrapResult(Result{}, nil) } - return err + return WrapResult(Result{}, err) } // get existing configuration from config.env skipEnvVars, err := c.getTenantConfiguration(ctx, tenant) if err != nil { - return err + return WrapResult(Result{}, err) } // Check if we are decommissioning a pool before we ensure defaults, as that would populate a defaulted pool name tenant, err = c.checkForPoolDecommission(ctx, key, tenant, tenantConfiguration) if err != nil { - return err + return WrapResult(Result{}, err) } tenant.EnsureDefaults() @@ -749,7 +760,7 @@ func (c *Controller) syncHandler(key string) error { klog.V(2).Infof(err2.Error()) } // return nil so we don't re-queue this work item - return nil + return WrapResult(Result{}, nil) } // AutoCertEnabled verification is used to manage the tenant migration between v1 and v2 @@ -796,7 +807,8 @@ func (c *Controller) syncHandler(key string) error { err = c.checkMinIOCertificatesStatus(ctx, tenant, nsName) if err != nil { klog.V(2).Infof("Error when consolidating tenant service: %v", err) - return err + // will retry after 5sec + return WrapResult(Result{RequeueAfter: time.Second * 5}, nil) } // validate services @@ -804,13 +816,13 @@ func (c *Controller) syncHandler(key string) error { err = c.checkMinIOSvc(ctx, tenant, nsName) if err != nil { klog.V(2).Infof("error consolidating minio service: %s", err.Error()) - return err + return WrapResult(Result{}, err) } // Check Console Endpoint Service err = c.checkConsoleSvc(ctx, tenant, nsName) if err != nil { klog.V(2).Infof("error consolidating console service: %s", err.Error()) - return err + return WrapResult(Result{}, err) } // Handle the Internal Headless Service for Tenant StatefulSet @@ -818,25 +830,25 @@ func (c *Controller) syncHandler(key string) error { if err != nil { if k8serrors.IsNotFound(err) { if tenant, err = c.updateTenantStatus(ctx, tenant, StatusProvisioningHLService, 0); err != nil { - return err + return WrapResult(Result{}, err) } klog.V(2).Infof("Creating a new Headless Service for cluster %q", nsName) // Create the headless service for the tenant hlSvc = services.NewHeadlessForMinIO(tenant) _, err = c.kubeClientSet.CoreV1().Services(tenant.Namespace).Create(ctx, hlSvc, cOpts) if err != nil { - return err + return WrapResult(Result{}, err) } c.RegisterEvent(ctx, tenant, corev1.EventTypeNormal, "SvcCreated", "Headless Service created") } else { - return err + return WrapResult(Result{}, err) } } // List all MinIO Tenants in this namespace. li, err := c.minioClientSet.MinioV2().Tenants(tenant.Namespace).List(context.Background(), metav1.ListOptions{}) if err != nil { - return err + return WrapResult(Result{}, err) } // Only 1 minio tenant per namespace allowed. @@ -844,26 +856,26 @@ func (c *Controller) syncHandler(key string) error { for _, t := range li.Items { if t.Status.CurrentState != StatusInitialized { if _, err = c.updateTenantStatus(ctx, &t, StatusFailedAlreadyExists, 0); err != nil { - return err + return WrapResult(Result{}, err) } // return nil so we don't re-queue this work item - return nil + return WrapResult(Result{}, err) } } } // Create Tenant Services Accoutns for Tenant err = c.checkAndCreateServiceAccount(ctx, tenant) if err != nil { - return err + return WrapResult(Result{}, err) } adminClnt, err := tenant.NewMinIOAdmin(tenantConfiguration, c.getTransport()) if err != nil { if _, uerr := c.updateTenantStatus(ctx, tenant, StatusTenantCredentialsNotSet, 0); uerr != nil { - return uerr + return WrapResult(Result{}, uerr) } klog.Errorf("Error initializing minio admin client: %v", err) - return err + return WrapResult(Result{}, err) } // For each pool check if there is a stateful set @@ -873,13 +885,13 @@ func (c *Controller) syncHandler(key string) error { err = c.checkKESStatus(ctx, tenant, totalAvailableReplicas, cOpts, uOpts, nsName) if err != nil { klog.V(2).Infof("Error checking KES state %v", err) - return err + return WrapResult(Result{}, err) } // check if operator-ca-tls has to be updated or re-created in the tenant namespace operatorCATLSExists, err := c.checkOperatorCaForTenant(ctx, tenant) if err != nil { - return err + return WrapResult(Result{}, err) } // consolidate the status of all pools. this is meant to cover for legacy tenants @@ -887,7 +899,7 @@ func (c *Controller) syncHandler(key string) error { if len(tenant.Status.Pools) == 0 { pools, err := c.getAllSSForTenant(tenant) if err != nil { - return err + return WrapResult(Result{}, err) } for _, pool := range pools { if pool != nil { @@ -899,7 +911,7 @@ func (c *Controller) syncHandler(key string) error { } // push updates to status if tenant, err = c.updatePoolStatus(ctx, tenant); err != nil { - return err + return WrapResult(Result{}, err) } klog.Info("Detected we are updating a legacy tenant deployment") @@ -936,14 +948,14 @@ func (c *Controller) syncHandler(key string) error { }) // push updates to status if tenant, err = c.updatePoolStatus(ctx, tenant); err != nil { - return err + return WrapResult(Result{}, err) } } ss, err := c.statefulSetLister.StatefulSets(tenant.Namespace).Get(ssName) if k8serrors.IsNotFound(err) { klog.Infof("'%s/%s': Deploying pool %s", tenant.Namespace, tenant.Name, pool.Name) if tenant, err = c.updateTenantStatus(ctx, tenant, StatusProvisioningStatefulSet, 0); err != nil { - return err + return WrapResult(Result{}, err) } ss = statefulsets.NewPool(&statefulsets.NewPoolArgs{ Tenant: tenant, @@ -958,7 +970,7 @@ func (c *Controller) syncHandler(key string) error { }) ss, err = c.kubeClientSet.AppsV1().StatefulSets(tenant.Namespace).Create(ctx, ss, cOpts) if err != nil { - return err + return WrapResult(Result{}, err) } c.RegisterEvent(ctx, tenant, corev1.EventTypeNormal, "PoolCreated", fmt.Sprintf("Tenant pool %s created", pool.Name)) // Report the pool is properly created @@ -967,7 +979,7 @@ func (c *Controller) syncHandler(key string) error { addingNewPool = true // push updates to status if tenant, err = c.updatePoolStatus(ctx, tenant); err != nil { - return err + return WrapResult(Result{}, err) } } @@ -992,14 +1004,14 @@ func (c *Controller) syncHandler(key string) error { // Restart services to get new args since we are expanding the deployment here. if err := c.restartInitializedPool(ctx, tenant, initializedPool, tenantConfiguration); err != nil { klog.Infof("'%s' restart call failed", key) - return err + return WrapResult(Result{}, err) } metaNowTime := metav1.Now() tenant.Status.WaitingOnReady = &metaNowTime tenant.Status.CurrentState = StatusRestartingMinIO if tenant, err = c.updatePoolStatus(ctx, tenant); err != nil { klog.Infof("'%s' Can't update tenant status: %v", key, err) - return err + return WrapResult(Result{}, err) } klog.Infof("'%s' was restarted", key) restarted = true @@ -1009,11 +1021,11 @@ func (c *Controller) syncHandler(key string) error { tenant.Status.Pools[pi].State = miniov2.PoolInitialized // push updates to status if tenant, err = c.updatePoolStatus(ctx, tenant); err != nil { - return err + return WrapResult(Result{}, err) } if restarted { - return ErrMinIORestarting + return WrapResult(Result{}, ErrMinIORestarting) } } @@ -1021,7 +1033,7 @@ func (c *Controller) syncHandler(key string) error { for _, poolStatus := range tenant.Status.Pools { if poolStatus.State != miniov2.PoolInitialized { // at least 1 is not initialized, stop here until they all are. - return errors.New("Waiting for all pools to initialize") + return WrapResult(Result{}, errors.New("Waiting for all pools to initialize")) } } @@ -1041,7 +1053,7 @@ func (c *Controller) syncHandler(key string) error { if _, err = c.updatePoolStatus(ctx, tenant); err != nil { klog.Infof("'%s' Can't update tenant status: %v", key, err) } - return ErrMinIORestarting + return WrapResult(Result{}, ErrMinIORestarting) } } } @@ -1054,10 +1066,9 @@ func (c *Controller) syncHandler(key string) error { } if compareImage != image { if _, err = c.updateTenantStatus(ctx, tenant, StatusInconsistentMinIOVersions, totalAvailableReplicas); err != nil { - return err + return WrapResult(Result{}, err) } - return fmt.Errorf("Pool %d is running incorrect image version, all pools are required to be on the same MinIO version. Attempting update of the inconsistent pool", - i+1) + return WrapResult(Result{}, fmt.Errorf("Pool %d is running incorrect image version, all pools are required to be on the same MinIO version. Attempting update of the inconsistent pool", i+1)) } } @@ -1076,14 +1087,14 @@ func (c *Controller) syncHandler(key string) error { if specImage != ssImage && tenant.Status.CurrentState != StatusUpdatingMinIOVersion { if !tenant.MinIOHealthCheck(c.getTransport()) { klog.Infof("%s is not running can't update image online", key) - return ErrMinIONotReady + return WrapResult(Result{}, ErrMinIONotReady) } // Images different with the newer state change, continue to verify // if upgrade is possible tenant, err = c.updateTenantStatus(ctx, tenant, StatusUpdatingMinIOVersion, totalAvailableReplicas) if err != nil { - return err + return WrapResult(Result{}, err) } klog.V(4).Infof("Collecting artifacts for Tenant '%s' to update MinIO from: %s, to: %s", @@ -1092,7 +1103,7 @@ func (c *Controller) syncHandler(key string) error { latest, err := c.fetchArtifacts(tenant) defer c.removeArtifacts() if err != nil { - return err + return WrapResult(Result{}, err) } updateURL, err := tenant.UpdateURL(latest, fmt.Sprintf("http://operator.%s.svc.%s:%s%s", miniov2.GetNSFromFile(), @@ -1103,11 +1114,11 @@ func (c *Controller) syncHandler(key string) error { if err != nil { err = fmt.Errorf("Unable to get canonical update URL for Tenant '%s', failed with %v", tenantName, err) if _, terr := c.updateTenantStatus(ctx, tenant, err.Error(), totalAvailableReplicas); terr != nil { - return terr + return WrapResult(Result{}, terr) } // Correct URL could not be obtained, not proceeding to update. - return err + return WrapResult(Result{}, err) } klog.V(4).Infof("Updating Tenant %s MinIO version from: %s, to: %s -> URL: %s", @@ -1117,10 +1128,10 @@ func (c *Controller) syncHandler(key string) error { if err != nil { if madmin.ToErrorResponse(err).Code != "MethodNotAllowed" { if _, terr := c.updateTenantStatus(ctx, tenant, err.Error(), totalAvailableReplicas); terr != nil { - return terr + return WrapResult(Result{}, terr) } // Update failed, nothing needs to be changed in the container - return err + return WrapResult(Result{}, err) } c.RegisterEvent(ctx, tenant, corev1.EventTypeWarning, "Inplace update is disabled, falling back to performing only statefulset update.", fmt.Sprintf("Tenant %s", tenant.Name)) } @@ -1133,12 +1144,12 @@ func (c *Controller) syncHandler(key string) error { newVer, err := miniov2.ReleaseTagToReleaseTime(us.UpdatedVersion) if err != nil { klog.Errorf("Unsupported release tag on new image, server updated but might leave dangling console deployment %v", err) - return err + return WrapResult(Result{}, err) } consoleDeployment, err := c.deploymentLister.Deployments(tenant.Namespace).Get(tenant.ConsoleDeploymentName()) if unifiedConsoleReleaseTime.Before(newVer) && consoleDeployment != nil && err == nil { if err := c.deleteOldConsoleDeployment(ctx, tenant, consoleDeployment.Name); err != nil { - return err + return WrapResult(Result{}, err) } } klog.Infof("Tenant '%s' MinIO updated successfully from: %s, to: %s successfully", @@ -1149,9 +1160,9 @@ func (c *Controller) syncHandler(key string) error { us.CurrentVersion) klog.Info(msg) if _, terr := c.updateTenantStatus(ctx, tenant, msg, totalAvailableReplicas); terr != nil { - return err + return WrapResult(Result{}, err) } - return nil + return WrapResult(Result{}, nil) } } for i, pool := range tenant.Spec.Pools { @@ -1168,7 +1179,7 @@ func (c *Controller) syncHandler(key string) error { OperatorImage: c.operatorImage, }) if _, err = c.kubeClientSet.AppsV1().StatefulSets(tenant.Namespace).Update(ctx, ss, uOpts); err != nil { - return err + return WrapResult(Result{}, err) } c.RegisterEvent(ctx, tenant, corev1.EventTypeNormal, "PoolUpdated", fmt.Sprintf("Tenant pool %s updated", pool.Name)) } @@ -1190,19 +1201,19 @@ func (c *Controller) syncHandler(key string) error { }) // push updates to status if tenant, err = c.updatePoolStatus(ctx, tenant); err != nil { - return err + return WrapResult(Result{}, err) } } existingStatefulSet, err := c.statefulSetLister.StatefulSets(tenant.Namespace).Get(ssName) // at this point the existingStatefulSet should already exist, error out if k8serrors.IsNotFound(err) { klog.Errorf("%s's pool %s doesn't exist: %v", tenant.Name, ssName, err) - return err + return WrapResult(Result{}, err) } if pool.Servers != *existingStatefulSet.Spec.Replicas { // warn the user that replica count of an existing pool can't be changed if tenant, err = c.updateTenantStatus(ctx, tenant, fmt.Sprintf("Can't modify server count for pool %s", pool.Name), 0); err != nil { - return err + return WrapResult(Result{}, err) } } // generated the expected StatefulSet based on the new tenant configuration @@ -1220,7 +1231,7 @@ func (c *Controller) syncHandler(key string) error { // Verify if this pool matches the spec on the tenant (resources, affinity, sidecars, etc) poolMatchesSS, err := poolSSMatchesSpec(expectedStatefulSet, existingStatefulSet) if err != nil { - return err + return WrapResult(Result{}, err) } // if the pool doesn't match the spec if !poolMatchesSS { @@ -1243,7 +1254,8 @@ func (c *Controller) syncHandler(key string) error { } if existingStatefulSet, err = c.kubeClientSet.AppsV1().StatefulSets(tenant.Namespace).Update(ctx, newStatefulSet, uOpts); err != nil { - return err + klog.Errorf("[Will try again in 5sec] Update tenant %s statefulset %s error %s", tenant.Name, ssName, err) + return WrapResult(Result{RequeueAfter: time.Second * 5}, nil) } } @@ -1251,24 +1263,24 @@ func (c *Controller) syncHandler(key string) error { // a warning to the event recorder and ret if !metav1.IsControlledBy(existingStatefulSet, tenant) { if tenant, err = c.updateTenantStatus(ctx, tenant, StatusNotOwned, existingStatefulSet.Status.Replicas); err != nil { - return err + return WrapResult(Result{}, err) } msg := fmt.Sprintf(MessageResourceExists, existingStatefulSet.Name) c.recorder.Event(tenant, corev1.EventTypeWarning, ErrResourceExists, msg) // return nil so we don't re-queue this work item, this error won't get fixed by reprocessing - return nil + return WrapResult(Result{}, nil) } } if tenant.HasPrometheusOperatorEnabled() { err := c.checkAndCreatePrometheusAddlConfig(ctx, tenant, string(tenantConfiguration["accesskey"]), string(tenantConfiguration["secretkey"])) if err != nil { - return err + return WrapResult(Result{}, err) } } else { err := c.deletePrometheusAddlConfig(ctx, tenant) if err != nil { - return err + return WrapResult(Result{}, err) } } @@ -1277,7 +1289,8 @@ func (c *Controller) syncHandler(key string) error { if err := c.createUsers(ctx, tenant, tenantConfiguration); err != nil { klog.V(2).Infof("Unable to create MinIO users: %v", err) c.RegisterEvent(ctx, tenant, corev1.EventTypeWarning, "UsersCreatedFailed", fmt.Sprintf("Users creation failed: %s", err)) - return err + // retry after 5sec + return WrapResult(Result{RequeueAfter: time.Second * 5}, nil) } c.RegisterEvent(ctx, tenant, corev1.EventTypeNormal, "UsersCreated", "Users created") } @@ -1287,7 +1300,8 @@ func (c *Controller) syncHandler(key string) error { if err := c.createBuckets(ctx, tenant, tenantConfiguration); err != nil { klog.V(2).Infof("Unable to create MinIO buckets: %v", err) c.RegisterEvent(ctx, tenant, corev1.EventTypeWarning, "BucketsCreatedFailed", fmt.Sprintf("Buckets creation failed: %s", err)) - return err + // retry after 5sec + return WrapResult(Result{RequeueAfter: time.Second * 5}, err) } c.RegisterEvent(ctx, tenant, corev1.EventTypeNormal, "BucketsCreated", "Buckets created") } @@ -1295,7 +1309,7 @@ func (c *Controller) syncHandler(key string) error { // Finally, we update the status block of the Tenant resource to reflect the // current state of the world _, err = c.updateTenantStatus(ctx, tenant, StatusInitialized, totalAvailableReplicas) - return err + return WrapResult(Result{}, err) } // enqueueTenant takes a Tenant resource and converts it into a namespace/name