Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-queue Failed Items from the Work-queue #326

Merged
merged 3 commits into from
Oct 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/minio/minio v0.0.0-20200723003940-b9be841fd222
github.com/minio/minio-go/v7 v7.0.2
github.com/stretchr/testify v1.5.1
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1
k8s.io/api v0.18.6
k8s.io/apimachinery v0.18.6
k8s.io/client-go v0.18.6
Expand Down
47 changes: 37 additions & 10 deletions pkg/controller/cluster/main-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
"strings"
"time"

"golang.org/x/time/rate"

"github.com/docker/cli/cli/config/configfile"

"k8s.io/klog/v2"
Expand Down Expand Up @@ -120,6 +122,9 @@ const (
StatusInconsistentMinIOVersions = "Different versions across MinIO Zones"
)

// ErrMinIONotReady is the error returned when MinIO is not Ready
var ErrMinIONotReady = fmt.Errorf("MinIO is not ready")

// Controller struct watches the Kubernetes API for changes to Tenant resources
type Controller struct {
// kubeClientSet is a standard kubernetes clientset
Expand Down Expand Up @@ -219,7 +224,7 @@ func NewController(
tenantsSynced: tenantInformer.Informer().HasSynced,
serviceLister: serviceInformer.Lister(),
serviceListerSynced: serviceInformer.Informer().HasSynced,
workqueue: queue.NewNamedRateLimitingQueue(queue.DefaultControllerRateLimiter(), "Tenants"),
workqueue: queue.NewNamedRateLimitingQueue(MinIOControllerRateLimiter(), "Tenants"),
recorder: recorder,
hostsTemplate: hostsTemplate,
operatorVersion: operatorVersion,
Expand All @@ -233,6 +238,7 @@ func NewController(
tenantInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueTenant,
UpdateFunc: func(old, new interface{}) {
// TODO: compare old vs new and don't enqueue if they are identical
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you not use reflect.Equal(old, new) ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a package from kubernetes for that equality.Semantic.DeepEqual and equality.Semantic.DeepDerivative I experimented with both, they didn't work quite as I expected, I'll keep looking into it.

controller.enqueueTenant(new)
},
})
Expand Down Expand Up @@ -757,6 +763,8 @@ func (c *Controller) processNextWorkItem() bool {
// Run the syncHandler, passing it the namespace/name string of the
// Tenant resource to be synced.
if err := c.syncHandler(key); err != nil {
// Put the item back on the workqueue to handle any transient errors.
c.workqueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s", key, err.Error())
}
// Finally, if no error occurs we Forget this item so it does not
Expand Down Expand Up @@ -826,7 +834,8 @@ func (c *Controller) syncHandler(key string) error {
if _, err2 = c.updateTenantStatus(ctx, tenant, err.Error(), 0); err2 != nil {
klog.V(2).Infof(err2.Error())
}
return err
// return nil so we don't re-queue this work item
return nil
}

secret, err := c.applyOperatorWebhookSecret(ctx, tenant)
Expand All @@ -842,7 +851,8 @@ func (c *Controller) syncHandler(key string) error {
if _, err = c.updateTenantStatus(ctx, tenant, msg, 0); err != nil {
klog.V(2).Infof(err.Error())
}
return fmt.Errorf(msg)
// return nil so we don't re-queue this work item
return nil
}

// TLS is mandatory if KES is enabled
Expand All @@ -856,7 +866,8 @@ func (c *Controller) syncHandler(key string) error {
if _, err = c.updateTenantStatus(ctx, tenant, msg, 0); err != nil {
klog.V(2).Infof(err.Error())
}
return fmt.Errorf(msg)
// return nil so we don't re-queue this work item
return nil
}
}

Expand Down Expand Up @@ -911,7 +922,8 @@ func (c *Controller) syncHandler(key string) error {
if _, err = c.updateTenantStatus(ctx, t, StatusFailedAlreadyExists, 0); err != nil {
return err
}
return fmt.Errorf("Failed creating MinIO Tenant '%s' because another MinIO Tenant already exists in the namespace '%s'", t.Name, tenant.Namespace)
// return nil so we don't re-queue this work item
return nil
}
}
}
Expand Down Expand Up @@ -943,7 +955,7 @@ func (c *Controller) syncHandler(key string) error {

// Check healthcheck for previous zone, if they are online before adding this zone.
if i > 0 && !tenant.MinIOHealthCheck() {
return fmt.Errorf("MinIO is not ready")
return ErrMinIONotReady
}

// If auto cert is enabled, create certificates for MinIO and
Expand Down Expand Up @@ -1021,7 +1033,8 @@ func (c *Controller) syncHandler(key string) error {
}
msg := fmt.Sprintf(MessageResourceExists, ss.Name)
c.recorder.Event(tenant, corev1.EventTypeWarning, ErrResourceExists, msg)
return fmt.Errorf(msg)
// return nil so we don't re-queue this work item, this error won't get fixed by reprocessing
return nil
}

// keep track of all replicas
Expand All @@ -1046,7 +1059,7 @@ func (c *Controller) syncHandler(key string) error {
// So comparing tenant.Spec.Image (version to update to) against one value from images slice is fine.
if tenant.Spec.Image != images[0] && tenant.Status.CurrentState != StatusUpdatingMinIOVersion {
if !tenant.MinIOHealthCheck() {
return fmt.Errorf("MinIO is not ready")
return ErrMinIONotReady
}

// Images different with the newer state change, continue to verify
Expand Down Expand Up @@ -1135,7 +1148,11 @@ func (c *Controller) syncHandler(key string) error {
if !tenant.HasCredsSecret() || !tenant.HasConsoleSecret() {
msg := "Please set the credentials"
klog.V(2).Infof(msg)
return fmt.Errorf(msg)
if _, terr := c.updateTenantStatus(ctx, tenant, msg, totalReplicas); terr != nil {
return err
}
// return nil so we don't re-queue this work item
return nil
}

consoleSecretName := tenant.Spec.Console.ConsoleSecret.Name
Expand All @@ -1149,7 +1166,7 @@ func (c *Controller) syncHandler(key string) error {
if _, err = c.updateTenantStatus(ctx, tenant, StatusWaitingForReadyState, totalReplicas); err != nil {
return err
}
return fmt.Errorf("MinIO is not ready")
return ErrMinIONotReady
}

if tenant, err = c.updateTenantStatus(ctx, tenant, StatusProvisioningConsoleDeployment, totalReplicas); err != nil {
Expand Down Expand Up @@ -1447,3 +1464,13 @@ func (c *Controller) checkAndCreateConsoleCSR(ctx context.Context, nsName types.
}
return nil
}

// MinIOControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue for our controller.
// both overall and per-item rate limiting. The overall is a token bucket and the per-item is exponential
func MinIOControllerRateLimiter() queue.RateLimiter {
return queue.NewMaxOfRateLimiter(
queue.NewItemExponentialFailureRateLimiter(5*time.Second, 60*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&queue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
}