diff --git a/go.mod b/go.mod
index 7658840380a..5143d7acdbd 100644
--- a/go.mod
+++ b/go.mod
@@ -10,6 +10,7 @@ require (
 	github.com/minio/minio v0.0.0-20200723003940-b9be841fd222
 	github.com/minio/minio-go/v7 v7.0.2
 	github.com/stretchr/testify v1.5.1
+	golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1
 	k8s.io/api v0.18.6
 	k8s.io/apimachinery v0.18.6
 	k8s.io/client-go v0.18.6
diff --git a/pkg/controller/cluster/main-controller.go b/pkg/controller/cluster/main-controller.go
index 0526703ab44..d17f991f7ff 100644
--- a/pkg/controller/cluster/main-controller.go
+++ b/pkg/controller/cluster/main-controller.go
@@ -32,6 +32,8 @@ import (
 	"strings"
 	"time"
 
+	"golang.org/x/time/rate"
+
 	"github.com/docker/cli/cli/config/configfile"
 
 	"k8s.io/klog/v2"
@@ -120,6 +122,9 @@ const (
 	StatusInconsistentMinIOVersions = "Different versions across MinIO Zones"
 )
 
+// ErrMinIONotReady is the error returned when MinIO is not Ready
+var ErrMinIONotReady = fmt.Errorf("MinIO is not ready")
+
 // Controller struct watches the Kubernetes API for changes to Tenant resources
 type Controller struct {
 	// kubeClientSet is a standard kubernetes clientset
@@ -219,7 +224,7 @@ func NewController(
 		tenantsSynced:       tenantInformer.Informer().HasSynced,
 		serviceLister:       serviceInformer.Lister(),
 		serviceListerSynced: serviceInformer.Informer().HasSynced,
-		workqueue:           queue.NewNamedRateLimitingQueue(queue.DefaultControllerRateLimiter(), "Tenants"),
+		workqueue:           queue.NewNamedRateLimitingQueue(MinIOControllerRateLimiter(), "Tenants"),
 		recorder:            recorder,
 		hostsTemplate:       hostsTemplate,
 		operatorVersion:     operatorVersion,
@@ -233,6 +238,7 @@ func NewController(
 	tenantInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: controller.enqueueTenant,
 		UpdateFunc: func(old, new interface{}) {
+			// TODO: compare old vs new and don't enqueue if they are identical
 			controller.enqueueTenant(new)
 		},
 	})
@@ -757,6 +763,8 @@ func (c *Controller) processNextWorkItem() bool {
 		// Run the syncHandler, passing it the namespace/name string of the
 		// Tenant resource to be synced.
 		if err := c.syncHandler(key); err != nil {
+			// Put the item back on the workqueue to handle any transient errors.
+			c.workqueue.AddRateLimited(key)
 			return fmt.Errorf("error syncing '%s': %s", key, err.Error())
 		}
 		// Finally, if no error occurs we Forget this item so it does not
@@ -826,7 +834,8 @@ func (c *Controller) syncHandler(key string) error {
 		if _, err2 = c.updateTenantStatus(ctx, tenant, err.Error(), 0); err2 != nil {
 			klog.V(2).Infof(err2.Error())
 		}
-		return err
+		// return nil so we don't re-queue this work item
+		return nil
 	}
 
 	secret, err := c.applyOperatorWebhookSecret(ctx, tenant)
@@ -842,7 +851,8 @@ func (c *Controller) syncHandler(key string) error {
 		if _, err = c.updateTenantStatus(ctx, tenant, msg, 0); err != nil {
 			klog.V(2).Infof(err.Error())
 		}
-		return fmt.Errorf(msg)
+		// return nil so we don't re-queue this work item
+		return nil
 	}
 
 	// TLS is mandatory if KES is enabled
@@ -856,7 +866,8 @@ func (c *Controller) syncHandler(key string) error {
 			if _, err = c.updateTenantStatus(ctx, tenant, msg, 0); err != nil {
 				klog.V(2).Infof(err.Error())
 			}
-			return fmt.Errorf(msg)
+			// return nil so we don't re-queue this work item
+			return nil
 		}
 	}
 
@@ -911,7 +922,8 @@ func (c *Controller) syncHandler(key string) error {
 				if _, err = c.updateTenantStatus(ctx, t, StatusFailedAlreadyExists, 0); err != nil {
 					return err
 				}
-				return fmt.Errorf("Failed creating MinIO Tenant '%s' because another MinIO Tenant already exists in the namespace '%s'", t.Name, tenant.Namespace)
+				// return nil so we don't re-queue this work item
+				return nil
 			}
 		}
 	}
@@ -943,7 +955,7 @@ func (c *Controller) syncHandler(key string) error {
 
 		// Check healthcheck for previous zone, if they are online before adding this zone.
 		if i > 0 && !tenant.MinIOHealthCheck() {
-			return fmt.Errorf("MinIO is not ready")
+			return ErrMinIONotReady
 		}
 
 		// If auto cert is enabled, create certificates for MinIO and
@@ -1021,7 +1033,8 @@ func (c *Controller) syncHandler(key string) error {
 			}
 			msg := fmt.Sprintf(MessageResourceExists, ss.Name)
 			c.recorder.Event(tenant, corev1.EventTypeWarning, ErrResourceExists, msg)
-			return fmt.Errorf(msg)
+			// return nil so we don't re-queue this work item, this error won't get fixed by reprocessing
+			return nil
 		}
 
 		// keep track of all replicas
@@ -1046,7 +1059,7 @@ func (c *Controller) syncHandler(key string) error {
 	// So comparing tenant.Spec.Image (version to update to) against one value from images slice is fine.
 	if tenant.Spec.Image != images[0] && tenant.Status.CurrentState != StatusUpdatingMinIOVersion {
 		if !tenant.MinIOHealthCheck() {
-			return fmt.Errorf("MinIO is not ready")
+			return ErrMinIONotReady
 		}
 
 		// Images different with the newer state change, continue to verify
@@ -1135,7 +1148,11 @@ func (c *Controller) syncHandler(key string) error {
 	if !tenant.HasCredsSecret() || !tenant.HasConsoleSecret() {
 		msg := "Please set the credentials"
 		klog.V(2).Infof(msg)
-		return fmt.Errorf(msg)
+		if _, terr := c.updateTenantStatus(ctx, tenant, msg, totalReplicas); terr != nil {
+			return terr
+		}
+		// return nil so we don't re-queue this work item
+		return nil
 	}
 
 	consoleSecretName := tenant.Spec.Console.ConsoleSecret.Name
@@ -1149,7 +1166,7 @@ func (c *Controller) syncHandler(key string) error {
 		if _, err = c.updateTenantStatus(ctx, tenant, StatusWaitingForReadyState, totalReplicas); err != nil {
 			return err
 		}
-		return fmt.Errorf("MinIO is not ready")
+		return ErrMinIONotReady
 	}
 
 	if tenant, err = c.updateTenantStatus(ctx, tenant, StatusProvisioningConsoleDeployment, totalReplicas); err != nil {
@@ -1447,3 +1464,13 @@ func (c *Controller) checkAndCreateConsoleCSR(ctx context.Context, nsName types.
 	}
 	return nil
 }
+
+// MinIOControllerRateLimiter is a no-arg constructor for a default rate limiter for our controller's workqueue.
+// It has both overall and per-item rate limiting. The overall is a token bucket and the per-item is exponential.
+func MinIOControllerRateLimiter() queue.RateLimiter {
+	return queue.NewMaxOfRateLimiter(
+		queue.NewItemExponentialFailureRateLimiter(5*time.Second, 60*time.Second),
+		// 10 qps, 100 bucket size. This is only for retry speed and it's only the overall factor (not per item)
+		&queue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
+	)
+}
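
For reference, below is a minimal standalone sketch (not part of the patch) of how the combined limiter paces retries, assuming k8s.io/client-go's workqueue package imported as "queue" to mirror the controller code: the per-item exponential limiter backs a repeatedly failing key off from 5s up to a 60s cap, the shared token bucket limits overall requeue throughput to 10 QPS with a burst of 100, and Forget (called after a successful sync) resets the per-item backoff.

package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
	queue "k8s.io/client-go/util/workqueue"
)

func main() {
	// Same construction as MinIOControllerRateLimiter in the patch.
	rl := queue.NewMaxOfRateLimiter(
		queue.NewItemExponentialFailureRateLimiter(5*time.Second, 60*time.Second),
		&queue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	)

	// Each failure of the same key doubles its delay: 5s, 10s, 20s, 40s, then capped at 60s.
	for i := 0; i < 6; i++ {
		fmt.Printf("requeue %d of key-a after %v\n", i+1, rl.When("key-a"))
	}

	// A successful sync calls Forget, which resets the per-item backoff to the 5s base delay.
	rl.Forget("key-a")
	fmt.Println("after Forget:", rl.When("key-a"))
}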