diff --git a/go.mod b/go.mod index 9849789..35606f4 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,7 @@ require ( k8s.io/api v0.25.3 k8s.io/apimachinery v0.25.3 k8s.io/client-go v0.25.3 + k8s.io/klog/v2 v2.70.1 k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed sigs.k8s.io/controller-runtime v0.12.3 ) @@ -117,7 +118,6 @@ require ( k8s.io/apiserver v0.25.3 // indirect k8s.io/component-base v0.25.3 // indirect k8s.io/klog v1.0.0 // indirect - k8s.io/klog/v2 v2.70.1 // indirect k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1 // indirect open-cluster-management.io/api v0.7.0 // indirect sigs.k8s.io/apiserver-network-proxy v0.0.30 // indirect diff --git a/pkg/source/builtin/k8sresourcewatcher/controller/controller.go b/pkg/source/builtin/k8sresourcewatcher/controller/controller.go index 7af294b..841110a 100644 --- a/pkg/source/builtin/k8sresourcewatcher/controller/controller.go +++ b/pkg/source/builtin/k8sresourcewatcher/controller/controller.go @@ -35,12 +35,12 @@ import ( "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/workqueue" "github.com/kubevela/kube-trigger/api/v1alpha1" "github.com/kubevela/kube-trigger/pkg/eventhandler" "github.com/kubevela/kube-trigger/pkg/source/builtin/k8sresourcewatcher/types" "github.com/kubevela/kube-trigger/pkg/source/builtin/k8sresourcewatcher/utils" + "github.com/kubevela/kube-trigger/pkg/workqueue" ) const maxRetries = 5 diff --git a/pkg/workqueue/default_rate_limiters.go b/pkg/workqueue/default_rate_limiters.go new file mode 100644 index 0000000..089e2b4 --- /dev/null +++ b/pkg/workqueue/default_rate_limiters.go @@ -0,0 +1,261 @@ +/* +Copyright 2023 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import ( + "math" + "sync" + "time" + + "golang.org/x/time/rate" +) + +// RateLimiter . +type RateLimiter interface { + // When gets an item and gets to decide how long that item should wait + When(item interface{}) time.Duration + // Forget indicates that an item is finished being retried. Doesn't matter whether it's for failing + // or for success, we'll stop tracking it + Forget(item interface{}) + // NumRequeues returns back how many failures the item has had + NumRequeues(item interface{}) int +} + +// DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has +// both overall and per-item rate limiting. The overall is a token bucket and the per-item is exponential +func DefaultControllerRateLimiter() RateLimiter { + return NewMaxOfRateLimiter( + NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second), + // 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item) + &BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, + ) +} + +// BucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API +type BucketRateLimiter struct { + *rate.Limiter +} + +// RateLimiter . +var _ RateLimiter = &BucketRateLimiter{} + +// When . 
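+// It reserves a token from the shared bucket and converts the reservation into
+// a wait duration; it never blocks, and the delay is independent of the item.
+// A hedged pairing with a delaying queue (q and item are illustrative):
+//
+//	rl := DefaultControllerRateLimiter()
+//	q.AddAfter(item, rl.When(item)) // per-item 5ms, 10ms, 20ms, ... capped at 1000s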
+func (r *BucketRateLimiter) When(item interface{}) time.Duration {
+	return r.Limiter.Reserve().Delay()
+}
+
+// NumRequeues always returns 0; the bucket limiter keeps no per-item state.
+func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
+	return 0
+}
+
+// Forget is a no-op for the bucket limiter.
+func (r *BucketRateLimiter) Forget(item interface{}) {
+}
+
+// ItemExponentialFailureRateLimiter does a simple baseDelay*2^<num-failures> limit;
+// dealing with max failures and expiration is up to the caller
+type ItemExponentialFailureRateLimiter struct {
+	failuresLock sync.Mutex
+	failures     map[interface{}]int
+
+	baseDelay time.Duration
+	maxDelay  time.Duration
+}
+
+var _ RateLimiter = &ItemExponentialFailureRateLimiter{}
+
+// NewItemExponentialFailureRateLimiter creates a new ItemExponentialFailureRateLimiter
+func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter {
+	return &ItemExponentialFailureRateLimiter{
+		failures:  map[interface{}]int{},
+		baseDelay: baseDelay,
+		maxDelay:  maxDelay,
+	}
+}
+
+// DefaultItemBasedRateLimiter is a no-arg constructor for a default rate limiter for a workqueue
+func DefaultItemBasedRateLimiter() RateLimiter {
+	return NewItemExponentialFailureRateLimiter(time.Millisecond, 1000*time.Second)
+}
+
+// When returns the exponential backoff for item, doubling per failure and
+// capped at maxDelay.
+func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
+	r.failuresLock.Lock()
+	defer r.failuresLock.Unlock()
+
+	exp := r.failures[item]
+	r.failures[item]++
+
+	// The backoff is capped such that 'calculated' value never overflows.
+	backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
+	if backoff > math.MaxInt64 {
+		return r.maxDelay
+	}
+
+	calculated := time.Duration(backoff)
+	if calculated > r.maxDelay {
+		return r.maxDelay
+	}
+
+	return calculated
+}
+
+// NumRequeues returns how many failures item has had.
+func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
+	r.failuresLock.Lock()
+	defer r.failuresLock.Unlock()
+
+	return r.failures[item]
+}
+
+// Forget clears the failure count for item.
+func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
+	r.failuresLock.Lock()
+	defer r.failuresLock.Unlock()
+
+	delete(r.failures, item)
+}
+
+// ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that
+type ItemFastSlowRateLimiter struct {
+	failuresLock sync.Mutex
+	failures     map[interface{}]int
+
+	maxFastAttempts int
+	fastDelay       time.Duration
+	slowDelay       time.Duration
+}
+
+var _ RateLimiter = &ItemFastSlowRateLimiter{}
+
+// NewItemFastSlowRateLimiter creates a new ItemFastSlowRateLimiter
+func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter {
+	return &ItemFastSlowRateLimiter{
+		failures:        map[interface{}]int{},
+		fastDelay:       fastDelay,
+		slowDelay:       slowDelay,
+		maxFastAttempts: maxFastAttempts,
+	}
+}
+
+// When returns fastDelay for the first maxFastAttempts failures of item, then slowDelay.
+func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
+	r.failuresLock.Lock()
+	defer r.failuresLock.Unlock()
+
+	r.failures[item]++
+
+	if r.failures[item] <= r.maxFastAttempts {
+		return r.fastDelay
+	}
+
+	return r.slowDelay
+}
+
+// NumRequeues returns how many failures item has had.
+func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
+	r.failuresLock.Lock()
+	defer r.failuresLock.Unlock()
+
+	return r.failures[item]
+}
+
+// Forget clears the failure count for item so its next retries are fast again.
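+//
+// Illustrative behavior (values assumed, not part of this change):
+//
+//	rl := NewItemFastSlowRateLimiter(5*time.Millisecond, 10*time.Second, 3)
+//	rl.When("k")   // 5ms for each of the first three failures, then 10s
+//	rl.Forget("k") // the next failure is treated as the first again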
+func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
+	r.failuresLock.Lock()
+	defer r.failuresLock.Unlock()
+
+	delete(r.failures, item)
+}
+
+// MaxOfRateLimiter calls every RateLimiter and returns the worst case response.
+// When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items
+// were separately delayed a longer time.
+type MaxOfRateLimiter struct {
+	limiters []RateLimiter
+}
+
+// When returns the longest delay requested by any of the wrapped limiters.
+func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
+	ret := time.Duration(0)
+	for _, limiter := range r.limiters {
+		curr := limiter.When(item)
+		if curr > ret {
+			ret = curr
+		}
+	}
+
+	return ret
+}
+
+// NewMaxOfRateLimiter creates a new MaxOfRateLimiter
+func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter {
+	return &MaxOfRateLimiter{limiters: limiters}
+}
+
+// NumRequeues returns the highest requeue count reported by any wrapped limiter.
+func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
+	ret := 0
+	for _, limiter := range r.limiters {
+		curr := limiter.NumRequeues(item)
+		if curr > ret {
+			ret = curr
+		}
+	}
+
+	return ret
+}
+
+// Forget tells every wrapped limiter to forget item.
+func (r *MaxOfRateLimiter) Forget(item interface{}) {
+	for _, limiter := range r.limiters {
+		limiter.Forget(item)
+	}
+}
+
+// WithMaxWaitRateLimiter wraps a RateLimiter and caps its delay at maxDelay,
+// which avoids waiting too long.
+type WithMaxWaitRateLimiter struct {
+	limiter  RateLimiter
+	maxDelay time.Duration
+}
+
+// NewWithMaxWaitRateLimiter creates a new WithMaxWaitRateLimiter
+func NewWithMaxWaitRateLimiter(limiter RateLimiter, maxDelay time.Duration) RateLimiter {
+	return &WithMaxWaitRateLimiter{limiter: limiter, maxDelay: maxDelay}
+}
+
+// When returns the wrapped limiter's delay, capped at maxDelay.
+func (w WithMaxWaitRateLimiter) When(item interface{}) time.Duration {
+	delay := w.limiter.When(item)
+	if delay > w.maxDelay {
+		return w.maxDelay
+	}
+
+	return delay
+}
+
+// Forget delegates to the wrapped limiter.
+func (w WithMaxWaitRateLimiter) Forget(item interface{}) {
+	w.limiter.Forget(item)
+}
+
+// NumRequeues delegates to the wrapped limiter.
+func (w WithMaxWaitRateLimiter) NumRequeues(item interface{}) int {
+	return w.limiter.NumRequeues(item)
+}
diff --git a/pkg/workqueue/delaying_queue.go b/pkg/workqueue/delaying_queue.go
new file mode 100644
index 0000000..82075b6
--- /dev/null
+++ b/pkg/workqueue/delaying_queue.go
@@ -0,0 +1,287 @@
+/*
+Copyright 2023 The KubeVela Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package workqueue
+
+import (
+	"container/heap"
+	"sync"
+	"time"
+
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/utils/clock"
+)
+
+// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
+// requeue items after failures without ending up in a hot-loop.
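+//
+// Typical retry pattern (a sketch; process and backoff are assumed names):
+//
+//	item, shutdown := q.Get()
+//	if shutdown {
+//		return
+//	}
+//	if err := process(item); err != nil {
+//		q.AddAfter(item, backoff) // retry later instead of hot-looping
+//	}
+//	q.Done(item)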
+type DelayingInterface interface { + Interface + // AddAfter adds an item to the workqueue after the indicated duration has passed + AddAfter(item interface{}, duration time.Duration) +} + +// NewDelayingQueue constructs a new workqueue with delayed queuing ability +func NewDelayingQueue() DelayingInterface { + return NewDelayingQueueWithCustomClock(clock.RealClock{}, "") +} + +// NewDelayingQueueWithCustomQueue constructs a new workqueue with ability to +// inject custom queue Interface instead of the default one +func NewDelayingQueueWithCustomQueue(q Interface, name string) DelayingInterface { + return newDelayingQueue(clock.RealClock{}, q, name) +} + +// NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability +func NewNamedDelayingQueue(name string) DelayingInterface { + return NewDelayingQueueWithCustomClock(clock.RealClock{}, name) +} + +// NewDelayingQueueWithCustomClock constructs a new named workqueue +// with ability to inject real or fake clock for testing purposes +func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string) DelayingInterface { + return newDelayingQueue(clock, NewNamed(name), name) +} + +func newDelayingQueue(clock clock.WithTicker, q Interface, name string) *delayingType { + ret := &delayingType{ + Interface: q, + clock: clock, + heartbeat: clock.NewTicker(maxWait), + stopCh: make(chan struct{}), + waitingForAddCh: make(chan *waitFor, 1000), + waitingForQueue: &waitForPriorityQueue{}, + metrics: newRetryMetrics(name), + } + + go ret.waitingLoop() + return ret +} + +// delayingType wraps an Interface and provides delayed re-enquing +type delayingType struct { + Interface + + // clock tracks time for delayed firing + clock clock.Clock + + // stopCh lets us signal a shutdown to the waiting loop + stopCh chan struct{} + // stopOnce guarantees we only signal shutdown a single time + stopOnce sync.Once + + // heartbeat ensures we wait no more than maxWait before firing + heartbeat clock.Ticker + + // waitingForAddCh is a buffered channel that feeds waitingForAdd + waitingForAddCh chan *waitFor + + waitingForQueue *waitForPriorityQueue + + // metrics counts the number of retries + metrics retryMetrics +} + +func (q *delayingType) Len() int { + return len(q.waitingForAddCh) + q.waitingForQueue.Len() + q.Interface.Len() +} + +// waitFor holds the data to push and the time it should be added +type waitFor struct { + data t + readyAt time.Time + // index in the priority queue (heap) + index int +} + +// waitForPriorityQueue implements a priority queue for waitFor items. +// +// waitForPriorityQueue implements heap.Interface. The item occurring next in +// time (i.e., the item with the smallest readyAt) is at the root (index 0). +// Peek returns this minimum item at index 0. Pop returns the minimum item after +// it has been removed from the queue and placed at index Len()-1 by +// container/heap. Push adds an item at index Len(), and container/heap +// percolates it into the correct location. +type waitForPriorityQueue []*waitFor + +func (pq waitForPriorityQueue) Len() int { + return len(pq) +} +func (pq waitForPriorityQueue) Less(i, j int) bool { + return pq[i].readyAt.Before(pq[j].readyAt) +} +func (pq waitForPriorityQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].index = i + pq[j].index = j +} + +// Push adds an item to the queue. Push should not be called directly; instead, +// use `heap.Push`. 
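+// For example, the waiting loop's insert helper effectively does (internal sketch):
+//
+//	heap.Push(pq, &waitFor{data: key, readyAt: t})
+//	next := pq.Peek().(*waitFor) // entry with the smallest readyAt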
+func (pq *waitForPriorityQueue) Push(x interface{}) { + n := len(*pq) + item := x.(*waitFor) + item.index = n + *pq = append(*pq, item) +} + +// Pop removes an item from the queue. Pop should not be called directly; +// instead, use `heap.Pop`. +func (pq *waitForPriorityQueue) Pop() interface{} { + n := len(*pq) + item := (*pq)[n-1] + item.index = -1 + *pq = (*pq)[0:(n - 1)] + return item +} + +// Peek returns the item at the beginning of the queue, without removing the +// item or otherwise mutating the queue. It is safe to call directly. +func (pq waitForPriorityQueue) Peek() interface{} { + return pq[0] +} + +// ShutDown stops the queue. After the queue drains, the returned shutdown bool +// on Get() will be true. This method may be invoked more than once. +func (q *delayingType) ShutDown() { + q.stopOnce.Do(func() { + q.Interface.ShutDown() + close(q.stopCh) + q.heartbeat.Stop() + }) +} + +// AddAfter adds the given item to the work queue after the given delay +func (q *delayingType) AddAfter(item interface{}, duration time.Duration) { + // don't push if we're already shutting down + if q.ShuttingDown() { + return + } + + q.metrics.retry() + + // immediately push things with no delay + if duration <= 0 { + q.Add(item) + return + } + + select { + case <-q.stopCh: + // unblock if ShutDown() is called + case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}: + } +} + +// maxWait keeps a max bound on the wait time. It's just insurance against weird things happening. +// Checking the queue every 10 seconds isn't expensive and we know that we'll never end up with an +// expired item sitting for more than 10 seconds. +const maxWait = 10 * time.Second + +// waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added. 
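+// Each pass drains every entry whose readyAt has passed, then blocks until the
+// earliest remaining readyAt, a heartbeat tick (maxWait), a new arrival on
+// waitingForAddCh, or shutdown.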
+func (q *delayingType) waitingLoop() { + defer utilruntime.HandleCrash() + + // Make a placeholder channel to use when there are no items in our list + never := make(<-chan time.Time) + + // Make a timer that expires when the item at the head of the waiting queue is ready + var nextReadyAtTimer clock.Timer + + waitingForQueue := q.waitingForQueue + heap.Init(waitingForQueue) + + waitingEntryByData := map[t]*waitFor{} + + for { + if q.Interface.ShuttingDown() { + return + } + + now := q.clock.Now() + + // Add ready entries + for waitingForQueue.Len() > 0 { + entry := waitingForQueue.Peek().(*waitFor) + if entry.readyAt.After(now) { + break + } + + entry = heap.Pop(waitingForQueue).(*waitFor) + q.Add(entry.data) + delete(waitingEntryByData, entry.data) + } + + // Set up a wait for the first item's readyAt (if one exists) + nextReadyAt := never + if waitingForQueue.Len() > 0 { + if nextReadyAtTimer != nil { + nextReadyAtTimer.Stop() + } + entry := waitingForQueue.Peek().(*waitFor) + nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now)) + nextReadyAt = nextReadyAtTimer.C() + } + + select { + case <-q.stopCh: + return + + case <-q.heartbeat.C(): + // continue the loop, which will push ready items + + case <-nextReadyAt: + // continue the loop, which will push ready items + + case waitEntry := <-q.waitingForAddCh: + if waitEntry.readyAt.After(q.clock.Now()) { + insert(waitingForQueue, waitingEntryByData, waitEntry) + } else { + q.Add(waitEntry.data) + } + + drained := false + for !drained { + select { + case waitEntry := <-q.waitingForAddCh: + if waitEntry.readyAt.After(q.clock.Now()) { + insert(waitingForQueue, waitingEntryByData, waitEntry) + } else { + q.Add(waitEntry.data) + } + default: + drained = true + } + } + } + } +} + +// insert adds the entry to the priority queue, or updates the readyAt if it already exists in the queue +func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) { + // if the entry already exists, update the time only if it would cause the item to be queued sooner + existing, exists := knownEntries[entry.data] + if exists { + if existing.readyAt.After(entry.readyAt) { + existing.readyAt = entry.readyAt + heap.Fix(q, existing.index) + } + + return + } + + heap.Push(q, entry) + knownEntries[entry.data] = entry +} diff --git a/pkg/workqueue/doc.go b/pkg/workqueue/doc.go new file mode 100644 index 0000000..19823e7 --- /dev/null +++ b/pkg/workqueue/doc.go @@ -0,0 +1,26 @@ +/* +Copyright 2023 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package workqueue provides a simple queue that supports the following +// features: +// - Fair: items processed in the order in which they are added. +// - Stingy: a single item will not be processed multiple times concurrently, +// and if an item is added multiple times before it can be processed, it +// will only be processed once. +// - Multiple consumers and producers. In particular, it is allowed for an +// item to be reenqueued while it is being processed. +// - Shutdown notifications. 
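+//
+// A minimal produce/consume sketch (illustrative only):
+//
+//	q := workqueue.New()
+//	q.Add("item")
+//	item, shutdown := q.Get()
+//	if !shutdown {
+//		// ... process item ...
+//		q.Done(item) // allow the item to be re-added
+//	}
+//	q.ShutDown()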
+package workqueue
diff --git a/pkg/workqueue/indexer_delaying_queue.go b/pkg/workqueue/indexer_delaying_queue.go
new file mode 100644
index 0000000..e52201d
--- /dev/null
+++ b/pkg/workqueue/indexer_delaying_queue.go
@@ -0,0 +1,431 @@
+/*
+Copyright 2023 The KubeVela Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package workqueue
+
+import (
+	"container/heap"
+	"sync"
+	"time"
+
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/klog/v2"
+	"k8s.io/utils/clock"
+)
+
+const (
+	queueItemCap = 128
+)
+
+type indexerDelayingQueue struct {
+	clock clock.Clock
+
+	keyFunc func(obj interface{}) (string, error)
+
+	// stopCh lets us signal a shutdown to the waiting loop
+	stopCh chan struct{}
+	// stopOnce guarantees we only signal shutdown a single time
+	stopOnce sync.Once
+
+	// heartbeat ensures we wait no more than maxWait before firing
+	heartbeat clock.Ticker
+
+	// waitingForAddCh is a buffered channel that feeds waitingForAdd
+	waitingForAddCh chan *waitFor
+
+	waitingForQueue *waitForPriorityQueue
+
+	knownPrepareEntries cacheEntries
+
+	queue []string
+
+	// dirty defines all of the items that need to be processed.
+	dirty set
+
+	cond *sync.Cond
+
+	addCond *sync.Cond
+
+	shuttingDown bool
+	drain        bool
+
+	metrics queueMetrics
+
+	processingEntries cacheEntries
+}
+
+// NewIndexerDelayingQueue constructs a keyed delaying queue: items are
+// deduplicated by the key returned from keyFunc, and a newer version of an
+// item (see Compared) replaces the pending one.
+func NewIndexerDelayingQueue(name string, keyFunc func(obj interface{}) (string, error)) DelayingInterface {
+	return newIndexerDelayingQueue(clock.RealClock{}, name, keyFunc)
+}
+
+func newIndexerDelayingQueue(clock clock.WithTicker, name string, keyFunc func(obj interface{}) (string, error)) *indexerDelayingQueue {
+	mutex := &sync.Mutex{}
+	ret := &indexerDelayingQueue{
+		clock:               clock,
+		keyFunc:             keyFunc,
+		heartbeat:           clock.NewTicker(maxWait),
+		stopCh:              make(chan struct{}),
+		waitingForAddCh:     make(chan *waitFor, 1000),
+		waitingForQueue:     &waitForPriorityQueue{},
+		knownPrepareEntries: cacheEntries{},
+
+		dirty:             set{},
+		cond:              sync.NewCond(mutex),
+		addCond:           sync.NewCond(mutex),
+		metrics:           globalMetricsFactory.newQueueMetrics(name, clock),
+		processingEntries: cacheEntries{},
+	}
+
+	go ret.waitingLoop()
+	return ret
+}
+
+func (q *indexerDelayingQueue) Len() int {
+	q.cond.L.Lock()
+	defer q.cond.L.Unlock()
+	return len(q.knownPrepareEntries) + len(q.processingEntries)
+}
+
+// waitEntry holds the data to push and the time it should be added
+type waitEntry struct {
+	data    t
+	readyAt time.Time
+}
+
+// ShutDown stops the queue. After the queue drains, the returned shutdown bool
+// on Get() will be true. This method may be invoked more than once.
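+// Drain sketch (mirrors TestIndexerDelayingQueue_ShutDown below): items added
+// before ShutDown are still handed out before Get reports shutdown:
+//
+//	q.Add("abc")
+//	q.ShutDown()
+//	item, shutdown := q.Get() // "abc", shutdown == false
+//	q.Done(item)
+//	_, shutdown = q.Get() // shutdown == true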
+func (q *indexerDelayingQueue) ShutDown() { + q.stopOnce.Do(func() { + q.shutdown() + close(q.stopCh) + q.heartbeat.Stop() + }) +} + +func (q *indexerDelayingQueue) shutdown() { + q.cond.L.Lock() + defer q.cond.L.Unlock() + q.shuttingDown = true + q.cond.Broadcast() + q.addCond.Broadcast() +} + +// waitForProcessing waits for the worker goroutines to finish processing items +// and call Done on them. +func (q *indexerDelayingQueue) waitForProcessing() { + q.cond.L.Lock() + defer q.cond.L.Unlock() + // Ensure that we do not wait on a queue which is already empty, as that + // could result in waiting for Done to be called on items in an empty queue + // which has already been shut down, which will result in waiting + // indefinitely. + if q.processingEntries.len() == 0 { + return + } + q.cond.Wait() +} + +func (q *indexerDelayingQueue) setDrain(shouldDrain bool) { + q.cond.L.Lock() + defer q.cond.L.Unlock() + q.drain = shouldDrain +} + +func (q *indexerDelayingQueue) shouldDrain() bool { + q.cond.L.Lock() + defer q.cond.L.Unlock() + return q.drain +} + +// isProcessing indicates if there are still items on the work queue being +// processed. It's used to drain the work queue on an eventual shutdown. +func (q *indexerDelayingQueue) isProcessing() bool { + q.cond.L.Lock() + defer q.cond.L.Unlock() + return q.processingEntries.len() != 0 +} + +func (q *indexerDelayingQueue) ShutDownWithDrain() { + q.stopOnce.Do(func() { + q.setDrain(true) + q.shutdown() + close(q.stopCh) + q.heartbeat.Stop() + }) + for q.isProcessing() && q.shouldDrain() { + q.waitForProcessing() + } +} + +func (q *indexerDelayingQueue) ShuttingDown() bool { + q.cond.L.Lock() + defer q.cond.L.Unlock() + + return q.shuttingDown +} + +// waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added. 
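+// Unlike delayingType.waitingLoop, ready entries are handed to push(key),
+// which re-checks readiness and deduplicates by key before queuing.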
+func (q *indexerDelayingQueue) waitingLoop() { + defer utilruntime.HandleCrash() + + // Make a placeholder channel to use when there are no items in our list + never := make(<-chan time.Time) + + // Make a timer that expires when the item at the head of the waiting queue is ready + var nextReadyAtTimer clock.Timer + + waitingForQueue := q.waitingForQueue + heap.Init(waitingForQueue) + + waitingEntryByData := map[t]*waitFor{} + + for { + if q.ShuttingDown() { + return + } + + now := q.clock.Now() + + // Add ready entries + for waitingForQueue.Len() > 0 { + entry := waitingForQueue.Peek().(*waitFor) + if entry.readyAt.After(now) { + break + } + + entry = heap.Pop(waitingForQueue).(*waitFor) + q.push(entry.data.(string)) + delete(waitingEntryByData, entry.data) + } + + // Set up a wait for the first item's readyAt (if one exists) + nextReadyAt := never + if waitingForQueue.Len() > 0 { + if nextReadyAtTimer != nil { + nextReadyAtTimer.Stop() + } + entry := waitingForQueue.Peek().(*waitFor) + nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now)) + nextReadyAt = nextReadyAtTimer.C() + } + + select { + case <-q.stopCh: + return + + case <-q.heartbeat.C(): + // continue the loop, which will push ready items + + case <-nextReadyAt: + // continue the loop, which will push ready items + + case waitEntry := <-q.waitingForAddCh: + if waitEntry.readyAt.After(q.clock.Now()) { + insert(waitingForQueue, waitingEntryByData, waitEntry) + } else { + q.push(waitEntry.data.(string)) + } + + drained := false + for !drained { + select { + case waitEntry := <-q.waitingForAddCh: + if waitEntry.readyAt.After(q.clock.Now()) { + insert(waitingForQueue, waitingEntryByData, waitEntry) + } else { + q.push(waitEntry.data.(string)) + } + default: + drained = true + } + } + } + } +} + +func (q *indexerDelayingQueue) Add(item interface{}) { + q.shouldAdd() + q.AddAfter(item, 0) +} + +func (q *indexerDelayingQueue) shouldAdd() { + q.addCond.L.Lock() + defer q.addCond.L.Unlock() + for len(q.knownPrepareEntries)+len(q.processingEntries) >= queueItemCap && !q.shuttingDown { + q.addCond.Wait() + } +} + +func (q *indexerDelayingQueue) AddAfter(item interface{}, delay time.Duration) { + key, err := q.keyFunc(item) + if err != nil { + klog.ErrorS(err, "indexerDelayingQueue generate key") + return + } + readAt := q.clock.Now().Add(delay) + + needPush := func() (needPush bool) { + q.cond.L.Lock() + defer q.cond.L.Unlock() + if q.shuttingDown { + return + } + waitItem, exist := q.knownPrepareEntries.get(key) + if exist { + if o, ok := waitItem.data.(Compared); ok && o.LessOrEqual(item) { + waitItem.data = item + } + if waitItem.readyAt.After(readAt) { + waitItem.readyAt = readAt + needPush = true + } + return + } + processItem, exist := q.processingEntries.get(key) + if exist { + if o, ok := processItem.data.(Compared); ok && !o.LessOrEqual(item) { + return + } + } + q.knownPrepareEntries.insert(key, &waitEntry{ + data: item, + readyAt: readAt, + }) + q.metrics.add(key) + needPush = true + return + }() + + if needPush { + if readAt.Before(q.clock.Now()) { + q.push(key) + return + } + q.pushDelayQueue(key, readAt) + } +} + +func (q *indexerDelayingQueue) pushDelayQueue(key string, readyAt time.Time) { + select { + case <-q.stopCh: + // unblock if ShutDown() is called + case q.waitingForAddCh <- &waitFor{data: key, readyAt: readyAt}: + } +} + +// push marks item as needing processing. 
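+// Only keys that are known, ready, and not already dirty get appended to the
+// queue; a key currently being processed stays dirty and is re-queued by Done.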
+func (q *indexerDelayingQueue) push(key string) { + q.cond.L.Lock() + defer q.cond.L.Unlock() + if q.shuttingDown { + return + } + + // expired item,should be ignored. + if item, exist := q.knownPrepareEntries.get(key); !exist { + return + } else if item.readyAt.After(q.clock.Now()) { + return + } + + if q.dirty.has(key) { + return + } + + q.metrics.add(key) + + q.dirty.insert(key) + if _, exist := q.processingEntries.get(key); exist { + return + } + + q.queue = append(q.queue, key) + q.cond.Signal() +} + +// Get blocks until it can return an item to be processed. If shutdown = true, +// the caller should end their goroutine. You must call Done with item when you +// have finished processing it. +func (q *indexerDelayingQueue) Get() (item interface{}, shutdown bool) { + q.cond.L.Lock() + defer q.cond.L.Unlock() + for len(q.queue) == 0 && !q.shuttingDown { + q.cond.Wait() + } + if len(q.queue) == 0 { + // We must be shutting down. + return nil, true + } + + key := q.queue[0] + q.queue = q.queue[1:] + + entry, _ := q.knownPrepareEntries.get(key) + q.processingEntries.insert(key, entry) + q.dirty.delete(key) + q.knownPrepareEntries.delete(key) + return entry.data, false +} + +// Done marks item as done processing, and if it has been marked as dirty again +// while it was being processed, it will be re-added to the queue for +// re-processing. +func (q *indexerDelayingQueue) Done(item interface{}) { + key, err := q.keyFunc(item) + if err != nil { + klog.ErrorS(err, "indexerDelayingQueue generate key") + return + } + + q.cond.L.Lock() + defer q.cond.L.Unlock() + + q.metrics.done(key) + + q.processingEntries.delete(key) + if q.dirty.has(key) { + q.queue = append(q.queue, key) + q.cond.Signal() + } else if q.processingEntries.len() == 0 { + q.cond.Signal() + } + q.addCond.Signal() +} + +type cacheEntries map[string]*waitEntry + +func (s cacheEntries) get(key string) (*waitEntry, bool) { + val, exists := s[key] + return val, exists +} + +func (s cacheEntries) insert(key string, value *waitEntry) { + s[key] = value +} + +func (s cacheEntries) delete(key string) { + delete(s, key) +} + +func (s cacheEntries) len() int { + return len(s) +} + +// Compared . +type Compared interface { + LessOrEqual(item interface{}) bool +} diff --git a/pkg/workqueue/indexer_delaying_queue_test.go b/pkg/workqueue/indexer_delaying_queue_test.go new file mode 100644 index 0000000..778436f --- /dev/null +++ b/pkg/workqueue/indexer_delaying_queue_test.go @@ -0,0 +1,213 @@ +/* +Copyright 2023 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import ( + "fmt" + "testing" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" +) + +func TestIndexerDelayingQueue_Version(t *testing.T) { + q := NewIndexerDelayingQueue("test", metaNamespaceKeyFunc) + + var p1 = &podWrap{&corev1.Pod{}} + p1.Name = "ss" + p1.Namespace = "abc" + p1.ResourceVersion = "111" + var p2 = &podWrap{p1.DeepCopy()} + p2.ResourceVersion = "112" + + q.Add(p1) + func() { + item, _ := q.Get() + // ... 
handle occur error + defer q.Done(item) + q.AddAfter(item, time.Second) + }() + q.Add(p2) + item, _ := q.Get() + if q.Len() != 1 { + t.Errorf("expect queue len: 1, but %d", q.Len()) + return + } + if item.(*podWrap).ResourceVersion != "112" { + t.Errorf("expect the resource version : 112, but %s", item.(*podWrap).ResourceVersion) + return + } + q.Done(item) + if q.Len() != 0 { + t.Errorf("expect queue len: 0, but %d", q.Len()) + return + } +} + +func TestIndexerDelayingQueue_Parallel(t *testing.T) { + q := NewIndexerDelayingQueue("test", metaNamespaceKeyFunc) + + var p1 = &podWrap{&corev1.Pod{}} + p1.Name = "ss" + p1.Namespace = "abc" + p1.ResourceVersion = "111" + var p2 = &podWrap{p1.DeepCopy()} + p2.ResourceVersion = "112" + + q.Add(p1) + first := make(chan struct{}) + firstFinished := false + go func() { + item, _ := q.Get() + close(first) + time.Sleep(time.Second) + firstFinished = true + // ... handle occur error + defer q.Done(item) + q.AddAfter(item, time.Second) + }() + <-first + q.Add(p2) + item, _ := q.Get() + if !firstFinished { + t.Errorf("expect don't process the same key at the same time") + return + } + if item.(*podWrap).ResourceVersion != "112" { + t.Errorf("expect resource version: \"112\", but %s", item.(*podWrap).ResourceVersion) + return + } + q.AddAfter(item, time.Second) + q.Done(item) + + item, _ = q.Get() + if item.(*podWrap).ResourceVersion != "112" { + t.Errorf("expect resource version: \"112\", but %s", item.(*podWrap).ResourceVersion) + return + } + q.Done(item) + + if q.Len() != 0 { + t.Errorf("expect queue is empty,but %d", q.Len()) + return + } +} + +func TestIndexerDelayingQueue_ShutDown(t *testing.T) { + q := NewIndexerDelayingQueue("test", func(obj interface{}) (string, error) { + return fmt.Sprint(obj), nil + }) + q.Add("abc") + q.ShutDown() + + item, shutdown := q.Get() + + if shutdown { + t.Errorf("expect queue open, but shutdown") + return + } + + if item != "abc" { + t.Errorf("expect item: \"abc\", but %v", item) + return + } + + item, shutdown = q.Get() + + if !shutdown { + t.Errorf("expect queue shutdown, but open") + return + } + + q.Add("xxx") + item, shutdown = q.Get() + if !shutdown { + t.Errorf("expect queue shutdown, but open") + return + } + + if item != nil { + t.Errorf("expect item: nil, but %v", item) + return + } +} + +type podWrap struct { + *corev1.Pod +} + +func (w *podWrap) LessOrEqual(item interface{}) bool { + return w.GetResourceVersion() <= item.(*podWrap).GetResourceVersion() +} + +func TestBufferCap(t *testing.T) { + q := NewIndexerDelayingQueue("test", func(obj interface{}) (string, error) { + return fmt.Sprint(obj), nil + }) + for i := 1; i <= queueItemCap; i++ { + q.Add(i) + } + start := time.Now() + go func() { + time.Sleep(time.Second) + q.ShutDown() + }() + q.AddAfter(1000, 0) + if time.Now().Sub(start).Seconds() >= 1 { + t.Error("block AddAfter when queue overhead") + } + q.Add(1000) + if time.Now().Sub(start).Seconds() < 1 { + t.Error("can't block Add when queue overhead") + } + + q.ShutDown() + q.Add(1000) +} + +func TestIndexerQueueLen(t *testing.T) { + q := NewIndexerDelayingQueue("test", func(obj interface{}) (string, error) { + return fmt.Sprint(obj), nil + }) + + q.AddAfter(1, time.Second) + q.AddAfter(1, time.Second*2) + q.Add(1) + if q.Len() != 1 { + t.Errorf("q.Len() should be 1, but: %d", q.Len()) + } + item, _ := q.Get() + if item != 1 { + t.Errorf("q.Get() should be 1, but: %d", item) + } + q.Done(1) + if q.Len() != 0 { + t.Errorf("q.Len() should be 0, but: %d", q.Len()) + } +} + +func 
metaNamespaceKeyFunc(obj interface{}) (string, error) { + meta, err := meta.Accessor(obj) + if err != nil { + return "", fmt.Errorf("object has no meta: %v", err) + } + if len(meta.GetNamespace()) > 0 { + return meta.GetNamespace() + "/" + meta.GetName(), nil + } + return meta.GetName(), nil +} diff --git a/pkg/workqueue/metrics.go b/pkg/workqueue/metrics.go new file mode 100644 index 0000000..0a7ba17 --- /dev/null +++ b/pkg/workqueue/metrics.go @@ -0,0 +1,261 @@ +/* +Copyright 2023 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import ( + "sync" + "time" + + "k8s.io/utils/clock" +) + +// This file provides abstractions for setting the provider (e.g., prometheus) +// of metrics. + +type queueMetrics interface { + add(item t) + get(item t) + done(item t) + updateUnfinishedWork() +} + +// GaugeMetric represents a single numerical value that can arbitrarily go up +// and down. +type GaugeMetric interface { + Inc() + Dec() +} + +// SettableGaugeMetric represents a single numerical value that can arbitrarily go up +// and down. (Separate from GaugeMetric to preserve backwards compatibility.) +type SettableGaugeMetric interface { + Set(float64) +} + +// CounterMetric represents a single numerical value that only ever +// goes up. +type CounterMetric interface { + Inc() +} + +// SummaryMetric captures individual observations. +type SummaryMetric interface { + Observe(float64) +} + +// HistogramMetric counts individual observations. +type HistogramMetric interface { + Observe(float64) +} + +type noopMetric struct{} + +func (noopMetric) Inc() {} +func (noopMetric) Dec() {} +func (noopMetric) Set(float64) {} +func (noopMetric) Observe(float64) {} + +// defaultQueueMetrics expects the caller to lock before setting any metrics. +type defaultQueueMetrics struct { + clock clock.Clock + + // current depth of a workqueue + depth GaugeMetric + // total number of adds handled by a workqueue + adds CounterMetric + // how long an item stays in a workqueue + latency HistogramMetric + // how long processing an item from a workqueue takes + workDuration HistogramMetric + addTimes map[t]time.Time + processingStartTimes map[t]time.Time + + // how long have current threads been working? 
+ unfinishedWorkSeconds SettableGaugeMetric + longestRunningProcessor SettableGaugeMetric +} + +func (m *defaultQueueMetrics) add(item t) { + if m == nil { + return + } + + m.adds.Inc() + m.depth.Inc() + if _, exists := m.addTimes[item]; !exists { + m.addTimes[item] = m.clock.Now() + } +} + +func (m *defaultQueueMetrics) get(item t) { + if m == nil { + return + } + + m.depth.Dec() + m.processingStartTimes[item] = m.clock.Now() + if startTime, exists := m.addTimes[item]; exists { + m.latency.Observe(m.sinceInSeconds(startTime)) + delete(m.addTimes, item) + } +} + +func (m *defaultQueueMetrics) done(item t) { + if m == nil { + return + } + + if startTime, exists := m.processingStartTimes[item]; exists { + m.workDuration.Observe(m.sinceInSeconds(startTime)) + delete(m.processingStartTimes, item) + } +} + +func (m *defaultQueueMetrics) updateUnfinishedWork() { + // Note that a summary metric would be better for this, but prometheus + // doesn't seem to have non-hacky ways to reset the summary metrics. + var total float64 + var oldest float64 + for _, t := range m.processingStartTimes { + age := m.sinceInSeconds(t) + total += age + if age > oldest { + oldest = age + } + } + m.unfinishedWorkSeconds.Set(total) + m.longestRunningProcessor.Set(oldest) +} + +type noMetrics struct{} + +func (noMetrics) add(item t) {} +func (noMetrics) get(item t) {} +func (noMetrics) done(item t) {} +func (noMetrics) updateUnfinishedWork() {} + +// Gets the time since the specified start in seconds. +func (m *defaultQueueMetrics) sinceInSeconds(start time.Time) float64 { + return m.clock.Since(start).Seconds() +} + +type retryMetrics interface { + retry() +} + +type defaultRetryMetrics struct { + retries CounterMetric +} + +func (m *defaultRetryMetrics) retry() { + if m == nil { + return + } + + m.retries.Inc() +} + +// MetricsProvider generates various metrics used by the queue. 
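+//
+// Wiring sketch (promProvider is an assumed implementation of this interface):
+//
+//	workqueue.SetProvider(promProvider) // only the first call takes effect
+//	q := workqueue.NewNamed("example")  // queues without a name skip metrics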
+type MetricsProvider interface { + NewDepthMetric(name string) GaugeMetric + NewAddsMetric(name string) CounterMetric + NewLatencyMetric(name string) HistogramMetric + NewWorkDurationMetric(name string) HistogramMetric + NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric + NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric + NewRetriesMetric(name string) CounterMetric +} + +type noopMetricsProvider struct{} + +func (noopMetricsProvider) NewDepthMetric(name string) GaugeMetric { + return noopMetric{} +} + +func (noopMetricsProvider) NewAddsMetric(name string) CounterMetric { + return noopMetric{} +} + +func (noopMetricsProvider) NewLatencyMetric(name string) HistogramMetric { + return noopMetric{} +} + +func (noopMetricsProvider) NewWorkDurationMetric(name string) HistogramMetric { + return noopMetric{} +} + +func (noopMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric { + return noopMetric{} +} + +func (noopMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric { + return noopMetric{} +} + +func (noopMetricsProvider) NewRetriesMetric(name string) CounterMetric { + return noopMetric{} +} + +var globalMetricsFactory = queueMetricsFactory{ + metricsProvider: noopMetricsProvider{}, +} + +type queueMetricsFactory struct { + metricsProvider MetricsProvider + + onlyOnce sync.Once +} + +func (f *queueMetricsFactory) setProvider(mp MetricsProvider) { + f.onlyOnce.Do(func() { + f.metricsProvider = mp + }) +} + +func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) queueMetrics { + mp := f.metricsProvider + if len(name) == 0 || mp == (noopMetricsProvider{}) { + return noMetrics{} + } + return &defaultQueueMetrics{ + clock: clock, + depth: mp.NewDepthMetric(name), + adds: mp.NewAddsMetric(name), + latency: mp.NewLatencyMetric(name), + workDuration: mp.NewWorkDurationMetric(name), + unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name), + longestRunningProcessor: mp.NewLongestRunningProcessorSecondsMetric(name), + addTimes: map[t]time.Time{}, + processingStartTimes: map[t]time.Time{}, + } +} + +func newRetryMetrics(name string) retryMetrics { + var ret *defaultRetryMetrics + if len(name) == 0 { + return ret + } + return &defaultRetryMetrics{ + retries: globalMetricsFactory.metricsProvider.NewRetriesMetric(name), + } +} + +// SetProvider sets the metrics provider for all subsequently created work +// queues. Only the first call has an effect. +func SetProvider(metricsProvider MetricsProvider) { + globalMetricsFactory.setProvider(metricsProvider) +} diff --git a/pkg/workqueue/parallelizer.go b/pkg/workqueue/parallelizer.go new file mode 100644 index 0000000..d9e3677 --- /dev/null +++ b/pkg/workqueue/parallelizer.go @@ -0,0 +1,103 @@ +/* +Copyright 2023 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package workqueue
+
+import (
+	"context"
+	"sync"
+
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+)
+
+// DoWorkPieceFunc is a function that is called for each piece of work.
+type DoWorkPieceFunc func(piece int)
+
+type options struct {
+	chunkSize int
+}
+
+// Options is a function that sets options on the options struct.
+type Options func(*options)
+
+// WithChunkSize allows assigning chunks of work items to workers, rather than
+// processing them one by one.
+// This option is recommended when the number of pieces is significantly higher
+// than the number of workers and the work done for each item is small.
+func WithChunkSize(c int) func(*options) {
+	return func(o *options) {
+		o.chunkSize = c
+	}
+}
+
+// ParallelizeUntil is a framework that allows for parallelizing N
+// independent pieces of work until done or the context is canceled.
+func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc, opts ...Options) {
+	if pieces == 0 {
+		return
+	}
+	o := options{}
+	for _, opt := range opts {
+		opt(&o)
+	}
+	chunkSize := o.chunkSize
+	if chunkSize < 1 {
+		chunkSize = 1
+	}
+
+	chunks := ceilDiv(pieces, chunkSize)
+	toProcess := make(chan int, chunks)
+	for i := 0; i < chunks; i++ {
+		toProcess <- i
+	}
+	close(toProcess)
+
+	var stop <-chan struct{}
+	if ctx != nil {
+		stop = ctx.Done()
+	}
+	if chunks < workers {
+		workers = chunks
+	}
+	wg := sync.WaitGroup{}
+	wg.Add(workers)
+	for i := 0; i < workers; i++ {
+		go func() {
+			defer utilruntime.HandleCrash()
+			defer wg.Done()
+			for chunk := range toProcess {
+				start := chunk * chunkSize
+				end := start + chunkSize
+				if end > pieces {
+					end = pieces
+				}
+				for p := start; p < end; p++ {
+					select {
+					case <-stop:
+						return
+					default:
+						doWorkPiece(p)
+					}
+				}
+			}
+		}()
+	}
+	wg.Wait()
}

func ceilDiv(a, b int) int {
	return (a + b - 1) / b
}
diff --git a/pkg/workqueue/queue.go b/pkg/workqueue/queue.go
new file mode 100644
index 0000000..2b547f4
--- /dev/null
+++ b/pkg/workqueue/queue.go
@@ -0,0 +1,288 @@
+/*
+Copyright 2023 The KubeVela Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package workqueue
+
+import (
+	"sync"
+	"time"
+
+	"k8s.io/utils/clock"
+)
+
+// Interface is an abstract, pluggable interface for work queues.
+type Interface interface {
+	Add(item interface{})
+	Len() int
+	Get() (item interface{}, shutdown bool)
+	Done(item interface{})
+	ShutDown()
+	ShutDownWithDrain()
+	ShuttingDown() bool
+}
+
+// New constructs a new work queue (see the package comment).
+func New() *Type {
+	return NewNamed("")
+}
+
+// NewNamed constructs a new named work queue (see the package comment).
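+// The name labels the queue's metrics; with an empty name, or no metrics
+// provider registered via SetProvider, metrics collection is skipped.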
+func NewNamed(name string) *Type { + rc := clock.RealClock{} + return newQueue( + rc, + globalMetricsFactory.newQueueMetrics(name, rc), + defaultUnfinishedWorkUpdatePeriod, + ) +} + +func newQueue(c clock.WithTicker, metrics queueMetrics, updatePeriod time.Duration) *Type { + t := &Type{ + clock: c, + dirty: set{}, + processing: set{}, + cond: sync.NewCond(&sync.Mutex{}), + metrics: metrics, + unfinishedWorkUpdatePeriod: updatePeriod, + } + + // Don't start the goroutine for a type of noMetrics so we don't consume + // resources unnecessarily + if _, ok := metrics.(noMetrics); !ok { + go t.updateUnfinishedWorkLoop() + } + + return t +} + +const defaultUnfinishedWorkUpdatePeriod = 500 * time.Millisecond + +// Type is a work queue (see the package comment). +type Type struct { + // queue defines the order in which we will work on items. Every + // element of queue should be in the dirty set and not in the + // processing set. + queue []t + + // dirty defines all of the items that need to be processed. + dirty set + + // Things that are currently being processed are in the processing set. + // These things may be simultaneously in the dirty set. When we finish + // processing something and remove it from this set, we'll check if + // it's in the dirty set, and if so, push it to the queue. + processing set + + cond *sync.Cond + + shuttingDown bool + drain bool + + metrics queueMetrics + + unfinishedWorkUpdatePeriod time.Duration + clock clock.WithTicker +} + +type empty struct{} +type t interface{} +type set map[t]empty + +func (s set) has(item t) bool { + _, exists := s[item] + return exists +} + +func (s set) insert(item t) { + s[item] = empty{} +} + +func (s set) delete(item t) { + delete(s, item) +} + +func (s set) len() int { + return len(s) +} + +// Add marks item as needing processing. +func (q *Type) Add(item interface{}) { + q.cond.L.Lock() + defer q.cond.L.Unlock() + if q.shuttingDown { + return + } + if q.dirty.has(item) { + return + } + + q.metrics.add(item) + + q.dirty.insert(item) + if q.processing.has(item) { + return + } + + q.queue = append(q.queue, item) + q.cond.Signal() +} + +// Len returns the current queue length, for informational purposes only. You +// shouldn't e.g. gate a call to Add() or Get() on Len() being a particular +// value, that can't be synchronized properly. +func (q *Type) Len() int { + q.cond.L.Lock() + defer q.cond.L.Unlock() + return len(q.queue) +} + +// Get blocks until it can return an item to be processed. If shutdown = true, +// the caller should end their goroutine. You must call Done with item when you +// have finished processing it. +func (q *Type) Get() (item interface{}, shutdown bool) { + q.cond.L.Lock() + defer q.cond.L.Unlock() + for len(q.queue) == 0 && !q.shuttingDown { + q.cond.Wait() + } + if len(q.queue) == 0 { + // We must be shutting down. + return nil, true + } + + item = q.queue[0] + // The underlying array still exists and reference this object, so the object will not be garbage collected. + q.queue[0] = nil + q.queue = q.queue[1:] + + q.metrics.get(item) + + q.processing.insert(item) + q.dirty.delete(item) + + return item, false +} + +// Done marks item as done processing, and if it has been marked as dirty again +// while it was being processed, it will be re-added to the queue for +// re-processing. 
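+// This is what makes the queue "stingy": an item re-Added while it is being
+// processed is deferred until Done, then handled exactly once more.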
+func (q *Type) Done(item interface{}) { + q.cond.L.Lock() + defer q.cond.L.Unlock() + + q.metrics.done(item) + + q.processing.delete(item) + if q.dirty.has(item) { + q.queue = append(q.queue, item) + q.cond.Signal() + } else if q.processing.len() == 0 { + q.cond.Signal() + } +} + +// ShutDown will cause q to ignore all new items added to it and +// immediately instruct the worker goroutines to exit. +func (q *Type) ShutDown() { + q.setDrain(false) + q.shutdown() +} + +// ShutDownWithDrain will cause q to ignore all new items added to it. As soon +// as the worker goroutines have "drained", i.e: finished processing and called +// Done on all existing items in the queue; they will be instructed to exit and +// ShutDownWithDrain will return. Hence: a strict requirement for using this is; +// your workers must ensure that Done is called on all items in the queue once +// the shut down has been initiated, if that is not the case: this will block +// indefinitely. It is, however, safe to call ShutDown after having called +// ShutDownWithDrain, as to force the queue shut down to terminate immediately +// without waiting for the drainage. +func (q *Type) ShutDownWithDrain() { + q.setDrain(true) + q.shutdown() + for q.isProcessing() && q.shouldDrain() { + q.waitForProcessing() + } +} + +// isProcessing indicates if there are still items on the work queue being +// processed. It's used to drain the work queue on an eventual shutdown. +func (q *Type) isProcessing() bool { + q.cond.L.Lock() + defer q.cond.L.Unlock() + return q.processing.len() != 0 +} + +// waitForProcessing waits for the worker goroutines to finish processing items +// and call Done on them. +func (q *Type) waitForProcessing() { + q.cond.L.Lock() + defer q.cond.L.Unlock() + // Ensure that we do not wait on a queue which is already empty, as that + // could result in waiting for Done to be called on items in an empty queue + // which has already been shut down, which will result in waiting + // indefinitely. + if q.processing.len() == 0 { + return + } + q.cond.Wait() +} + +func (q *Type) setDrain(shouldDrain bool) { + q.cond.L.Lock() + defer q.cond.L.Unlock() + q.drain = shouldDrain +} + +func (q *Type) shouldDrain() bool { + q.cond.L.Lock() + defer q.cond.L.Unlock() + return q.drain +} + +func (q *Type) shutdown() { + q.cond.L.Lock() + defer q.cond.L.Unlock() + q.shuttingDown = true + q.cond.Broadcast() +} + +// ShuttingDown . +func (q *Type) ShuttingDown() bool { + q.cond.L.Lock() + defer q.cond.L.Unlock() + + return q.shuttingDown +} + +func (q *Type) updateUnfinishedWorkLoop() { + t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod) + defer t.Stop() + for range t.C() { + if !func() bool { + q.cond.L.Lock() + defer q.cond.L.Unlock() + if !q.shuttingDown { + q.metrics.updateUnfinishedWork() + return true + } + return false + + }() { + return + } + } +} diff --git a/pkg/workqueue/rate_limiting_queue.go b/pkg/workqueue/rate_limiting_queue.go new file mode 100644 index 0000000..b6562f4 --- /dev/null +++ b/pkg/workqueue/rate_limiting_queue.go @@ -0,0 +1,70 @@ +/* +Copyright 2023 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package workqueue
+
+// RateLimitingInterface is an interface that rate limits items being added to the queue.
+type RateLimitingInterface interface {
+	DelayingInterface
+
+	// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
+	AddRateLimited(item interface{})
+
+	// Forget indicates that an item is finished being retried. It doesn't matter whether it's for permanently
+	// failing or for success; we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`;
+	// you still have to call `Done` on the queue.
+	Forget(item interface{})
+
+	// NumRequeues returns how many times the item was requeued
+	NumRequeues(item interface{}) int
+}
+
+// NewRateLimitingQueue constructs a new workqueue with rate-limited queuing ability.
+// Remember to call Forget! If you don't, you may end up tracking failures forever.
+func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface {
+	return &rateLimitingType{
+		DelayingInterface: NewDelayingQueue(),
+		rateLimiter:       rateLimiter,
+	}
+}
+
+// NewNamedRateLimitingQueue constructs a new named workqueue with rate-limited queuing ability
+func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitingInterface {
+	return &rateLimitingType{
+		DelayingInterface: NewNamedDelayingQueue(name),
+		rateLimiter:       rateLimiter,
+	}
+}
+
+// rateLimitingType wraps an Interface and provides rate-limited re-enqueuing
+type rateLimitingType struct {
+	DelayingInterface
+
+	rateLimiter RateLimiter
+}
+
+// AddRateLimited calls AddAfter with the delay the rate limiter returns for the item
+func (q *rateLimitingType) AddRateLimited(item interface{}) {
+	q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
+}
+
+func (q *rateLimitingType) NumRequeues(item interface{}) int {
+	return q.rateLimiter.NumRequeues(item)
+}
+
+func (q *rateLimitingType) Forget(item interface{}) {
+	q.rateLimiter.Forget(item)
+}
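+
+// Illustrative worker loop for a rate-limited queue (a sketch; process is an
+// assumed helper, and maxRetries mirrors the controller's constant):
+//
+//	q := NewRateLimitingQueue(DefaultControllerRateLimiter())
+//	for {
+//		item, shutdown := q.Get()
+//		if shutdown {
+//			return
+//		}
+//		if err := process(item); err != nil && q.NumRequeues(item) < maxRetries {
+//			q.AddRateLimited(item) // retry with backoff
+//		} else {
+//			q.Forget(item) // stop tracking retries for this item
+//		}
+//		q.Done(item)
+//	}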