diff --git a/pkg/network/status/status.go b/pkg/network/status/status.go index cda7dbd80ece..4d1e527ea956 100644 --- a/pkg/network/status/status.go +++ b/pkg/network/status/status.go @@ -30,6 +30,7 @@ import ( "time" "go.uber.org/zap" + "golang.org/x/time/rate" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" @@ -47,6 +48,9 @@ const ( // probeTimeout defines the maximum amount of time a request will wait probeTimeout = 1 * time.Second probePath = "/healthz" + // initialDelay defines the delay before enqueuing a probing request the first time. + // It gives times for the change to propagate and prevents unnecessary retries. + initialDelay = 200 * time.Millisecond ) var dialContext = (&net.Dialer{Timeout: probeTimeout}).DialContext @@ -134,7 +138,12 @@ func NewProber( ingressStates: make(map[string]*ingressState), podContexts: make(map[string]cancelContext), workQueue: workqueue.NewNamedRateLimitingQueue( - workqueue.DefaultControllerRateLimiter(), + workqueue.NewMaxOfRateLimiter( + // Per item exponential backoff + workqueue.NewItemExponentialFailureRateLimiter(50*time.Millisecond, 30*time.Second), + // Global rate limiter + &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(50), 100)}, + ), "ProbingQueue"), targetLister: targetLister, readyCallback: readyCallback, @@ -252,7 +261,7 @@ func (m *Prober) IsReady(ctx context.Context, ing *v1alpha1.Ingress) (bool, erro for _, wi := range ipWorkItems { wi.podState = podState wi.context = podCtx - m.workQueue.AddRateLimited(wi) + m.workQueue.AddAfter(wi, initialDelay) m.logger.Infof("Queuing probe for %s, IP: %s:%s (depth: %d)", wi.url, wi.podIP, wi.podPort, m.workQueue.Len()) }