diff --git a/docs/README.md b/docs/README.md index 424e1e6..497c367 100644 --- a/docs/README.md +++ b/docs/README.md @@ -15,6 +15,7 @@ - [Readiness Checks](libs/readiness.md) - [Kubernetes Resource Management](libs/resource.md) - [Retrying k8s Operations](libs/retry.md) +- [Smart Requeuing of Kubernetes Resources](libs/smartrequeue.md) - [Kubernetes Resource Status Updating](libs/status.md) - [Testing](libs/testing.md) - [Thread Management](libs/threads.md) diff --git a/docs/libs/smartrequeue.md b/docs/libs/smartrequeue.md new file mode 100644 index 0000000..ad03fa8 --- /dev/null +++ b/docs/libs/smartrequeue.md @@ -0,0 +1,9 @@ +# Smart Requeuing of Kubernetes Resources + +The `pkg/controller/smartrequeue` package contains the smart requeuing logic that was originally implemented [here](https://github.com/openmcp-project/cluster-provider-kind/tree/v0.0.7/pkg/smartrequeue). It allows to requeue reconcile requests with an increasing backoff, similar to what the controller-runtime does when the `Reconcile` method returns an error. + +Use `NewStore` in the constructor of the reconciler. During reconciliation, the store's `For` method can be used to get the entry for the passed-in object, which can then generate the reconcile result: +- `Error` returns an error and lets the controller-runtime handle the backoff +- `Never` does not requeue the object +- `Backoff` requeues the object with an increasing backoff every time it is called on the same object +- `Reset` requeues the object, but resets the duration to its minimal value diff --git a/pkg/controller/smartrequeue/context.go b/pkg/controller/smartrequeue/context.go new file mode 100644 index 0000000..1f0bc2d --- /dev/null +++ b/pkg/controller/smartrequeue/context.go @@ -0,0 +1,22 @@ +package smartrequeue + +import "context" + +// contextKey is a type used as a key for storing and retrieving the Entry from the context. +type contextKey struct{} + +// NewContext creates a new context with the given Entry. +// This is a utility function for passing Entry instances through context. +func NewContext(ctx context.Context, entry *Entry) context.Context { + return context.WithValue(ctx, contextKey{}, entry) +} + +// FromContext retrieves the Entry from the context, if it exists. +// Returns nil if no Entry is found in the context. +func FromContext(ctx context.Context) *Entry { + entry, ok := ctx.Value(contextKey{}).(*Entry) + if !ok { + return nil + } + return entry +} diff --git a/pkg/controller/smartrequeue/context_test.go b/pkg/controller/smartrequeue/context_test.go new file mode 100644 index 0000000..9324fd4 --- /dev/null +++ b/pkg/controller/smartrequeue/context_test.go @@ -0,0 +1,33 @@ +package smartrequeue + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestNewContext(t *testing.T) { + store := NewStore(time.Second, time.Minute, 2) + entry := newEntry(store) + ctx := NewContext(context.Background(), entry) + + // Test that we get the entry back using FromContext + got := FromContext(ctx) + assert.Equal(t, entry, got, "Expected entry to be the same as the one set in context") +} + +func TestFromContext(t *testing.T) { + store := NewStore(time.Second, time.Minute, 2) + entry := newEntry(store) + ctx := NewContext(context.Background(), entry) + + // Retrieve entry from context + got := FromContext(ctx) + assert.Equal(t, entry, got, "Expected entry to be the same as the one set in context") + + // Test empty context + got = FromContext(context.Background()) + assert.Nil(t, got, "Expected nil when no entry is set in context") +} diff --git a/pkg/controller/smartrequeue/dummyobjects_test.go b/pkg/controller/smartrequeue/dummyobjects_test.go new file mode 100644 index 0000000..95463e9 --- /dev/null +++ b/pkg/controller/smartrequeue/dummyobjects_test.go @@ -0,0 +1,36 @@ +package smartrequeue + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type dummyObject struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata"` +} + +var _ client.Object = &dummyObject{} + +func (d *dummyObject) DeepCopyObject() runtime.Object { + return &dummyObject{ + TypeMeta: d.TypeMeta, + ObjectMeta: *d.DeepCopy(), + } +} + +type anotherDummyObject struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata"` +} + +var _ client.Object = &anotherDummyObject{} + +func (d *anotherDummyObject) DeepCopyObject() runtime.Object { + return &anotherDummyObject{ + TypeMeta: d.TypeMeta, + ObjectMeta: *d.DeepCopy(), + } +} diff --git a/pkg/controller/smartrequeue/entry.go b/pkg/controller/smartrequeue/entry.go new file mode 100644 index 0000000..232831b --- /dev/null +++ b/pkg/controller/smartrequeue/entry.go @@ -0,0 +1,65 @@ +package smartrequeue + +import ( + "time" + + ctrl "sigs.k8s.io/controller-runtime" +) + +// Entry is used to manage the requeue logic for a specific object. +// It holds the next duration to requeue and the store it belongs to. +type Entry struct { + store *Store + nextDuration time.Duration +} + +func newEntry(s *Store) *Entry { + return &Entry{ + store: s, + nextDuration: s.minInterval, + } +} + +// Error resets the duration to the minInterval and returns an empty Result and the error +// so that the controller-runtime can handle the exponential backoff for errors. +func (e *Entry) Error(err error) (ctrl.Result, error) { + e.nextDuration = e.store.minInterval + return ctrl.Result{}, err +} + +// Backoff returns a Result and increments the interval for the next iteration. +func (e *Entry) Backoff() (ctrl.Result, error) { + // Save current duration for result + current := e.nextDuration + + // Schedule calculation of next duration + defer e.setNext() + + return ctrl.Result{RequeueAfter: current}, nil +} + +// Reset resets the duration to the minInterval and returns a Result with that interval. +func (e *Entry) Reset() (ctrl.Result, error) { + e.nextDuration = e.store.minInterval + defer e.setNext() + return ctrl.Result{RequeueAfter: e.nextDuration}, nil +} + +// Never deletes the entry from the store and returns an empty Result. +func (e *Entry) Never() (ctrl.Result, error) { + e.store.deleteEntry(e) + return ctrl.Result{}, nil +} + +// setNext updates the next requeue duration using exponential backoff. +// It multiplies the current duration by the store's multiplier and ensures +// the result doesn't exceed the configured maximum interval. +func (e *Entry) setNext() { + newDuration := time.Duration(float32(e.nextDuration) * e.store.multiplier) + + if newDuration > e.store.maxInterval { + newDuration = e.store.maxInterval + } + + e.nextDuration = newDuration +} diff --git a/pkg/controller/smartrequeue/entry_test.go b/pkg/controller/smartrequeue/entry_test.go new file mode 100644 index 0000000..ff88fd3 --- /dev/null +++ b/pkg/controller/smartrequeue/entry_test.go @@ -0,0 +1,119 @@ +package smartrequeue + +import ( + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + ctrl "sigs.k8s.io/controller-runtime" +) + +// Helper function to get requeue duration from Result +func getRequeueAfter(res ctrl.Result, _ error) time.Duration { + return res.RequeueAfter.Round(time.Second) +} + +func TestEntry_Stable(t *testing.T) { + // Setup + store := NewStore(time.Second, time.Minute, 2) + entry := newEntry(store) + + // Test the exponential backoff behavior + t.Run("exponential backoff sequence", func(t *testing.T) { + expectedDurations := []time.Duration{ + 1 * time.Second, + 2 * time.Second, + 4 * time.Second, + 8 * time.Second, + 16 * time.Second, + 32 * time.Second, + 60 * time.Second, // Capped at maxInterval + 60 * time.Second, // Still capped + } + + for i, expected := range expectedDurations { + result, err := entry.Backoff() + require.NoError(t, err) + assert.Equal(t, expected, getRequeueAfter(result, err), "Iteration %d should have correct duration", i) + } + }) +} + +func TestEntry_Progressing(t *testing.T) { + // Setup + minInterval := time.Second + maxInterval := time.Minute + store := NewStore(minInterval, maxInterval, 2) + entry := newEntry(store) + + // Ensure state is not at minimum + _, _ = entry.Backoff() + _, _ = entry.Backoff() + + // Test progressing resets duration to minimum + t.Run("resets to minimum interval", func(t *testing.T) { + result, err := entry.Reset() + require.NoError(t, err) + assert.Equal(t, minInterval, getRequeueAfter(result, err)) + + // Second call should also return min interval with small increment + result, err = entry.Reset() + require.NoError(t, err) + assert.Equal(t, minInterval, getRequeueAfter(result, err)) + }) + + // After progressing, Stable should restart exponential backoff + t.Run("stable continues from minimum", func(t *testing.T) { + result, err := entry.Backoff() + require.NoError(t, err) + assert.Equal(t, 2*time.Second, getRequeueAfter(result, err)) + + result, err = entry.Backoff() + require.NoError(t, err) + assert.Equal(t, 4*time.Second, getRequeueAfter(result, err)) + }) +} + +func TestEntry_Error(t *testing.T) { + // Setup + store := NewStore(time.Second, time.Minute, 2) + entry := newEntry(store) + testErr := errors.New("test error") + + // Ensure state is not at minimum + _, _ = entry.Backoff() + _, _ = entry.Backoff() + + // Test error handling + t.Run("returns error and resets backoff", func(t *testing.T) { + result, err := entry.Error(testErr) + assert.Equal(t, testErr, err, "Should return the passed error") + assert.Equal(t, 0*time.Second, getRequeueAfter(result, err), "Should have zero requeue time") + }) + + // After error, stable should continue from minimum + t.Run("stable continues properly after error", func(t *testing.T) { + result, err := entry.Backoff() + require.NoError(t, err) + assert.Equal(t, time.Second, getRequeueAfter(result, err)) + + result, err = entry.Backoff() + require.NoError(t, err) + assert.Equal(t, 2*time.Second, getRequeueAfter(result, err)) + }) +} + +func TestEntry_Never(t *testing.T) { + // Setup + store := NewStore(time.Second, time.Minute, 2) + entry := newEntry(store) + + // Test Never behavior + t.Run("returns empty result", func(t *testing.T) { + result, err := entry.Never() + require.NoError(t, err) + assert.Equal(t, time.Duration(0), getRequeueAfter(result, err)) + }) +} diff --git a/pkg/controller/smartrequeue/example_test.go b/pkg/controller/smartrequeue/example_test.go new file mode 100644 index 0000000..0e13121 --- /dev/null +++ b/pkg/controller/smartrequeue/example_test.go @@ -0,0 +1,46 @@ +package smartrequeue_test + +import ( + "fmt" + "time" + + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/openmcp-project/controller-utils/pkg/controller/smartrequeue" +) + +// This example shows how to use the SmartRequeue package in a Kubernetes controller. +func Example_controllerUsage() { + // Create a store with min and max requeue intervals + store := smartrequeue.NewStore(5*time.Second, 10*time.Minute, 2.0) + + // In your controller's Reconcile function: + reconcileFunction := func(_ ctrl.Request) (ctrl.Result, error) { + // Create a dummy object representing what you'd get from the client + var obj client.Object // In real code: Get this from the client + + // Get the Entry for this specific object + entry := store.For(obj) + + // Determine the state of the external resource... + inProgress := false // This would be determined by your logic + errOccurred := false // This would be determined by your logic + + // nolint:gocritic + if errOccurred { + // Handle error case + err := fmt.Errorf("something went wrong") + return entry.Error(err) + } else if inProgress { + // Resource is changing - check back soon + return entry.Reset() + } else { + // Resource is stable - gradually back off + return entry.Backoff() + } + } + + // Call the reconcile function + _, _ = reconcileFunction(ctrl.Request{}) +} diff --git a/pkg/controller/smartrequeue/store.go b/pkg/controller/smartrequeue/store.go new file mode 100644 index 0000000..13ce197 --- /dev/null +++ b/pkg/controller/smartrequeue/store.go @@ -0,0 +1,118 @@ +package smartrequeue + +import ( + "reflect" + "sync" + "time" + + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// Store is used to manage requeue entries for different objects. +// It holds a map of entries indexed by a key that uniquely identifies the object. +type Store struct { + minInterval time.Duration + maxInterval time.Duration + multiplier float32 + objects map[key]*Entry + mu sync.RWMutex // Using RWMutex for better read concurrency +} + +// NewStore creates a new Store with the specified minimum and maximum intervals +// and a multiplier for the exponential backoff logic. +func NewStore(minInterval, maxInterval time.Duration, multiplier float32) *Store { + if minInterval <= 0 { + minInterval = 1 * time.Second // Safe default + } + + if maxInterval < minInterval { + maxInterval = minInterval * 60 // Safe default: 1 minute or 60x min + } + + if multiplier <= 1.0 { + multiplier = 2.0 // Safe default: double each time + } + + return &Store{ + minInterval: minInterval, + maxInterval: maxInterval, + multiplier: multiplier, + objects: make(map[key]*Entry), + } +} + +// For gets or creates an Entry for the specified object. +func (s *Store) For(obj client.Object) *Entry { + key := keyFromObject(obj) + + // Try read lock first for better concurrency + s.mu.RLock() + entry, exists := s.objects[key] + s.mu.RUnlock() + + if exists { + return entry + } + + // Need to create a new entry + s.mu.Lock() + defer s.mu.Unlock() + + // Check again in case another goroutine created it while we were waiting + entry, exists = s.objects[key] + if !exists { + entry = &Entry{ + store: s, + nextDuration: s.minInterval, + } + s.objects[key] = entry + } + + return entry +} + +// Clear removes all entries from the store (mainly useful for testing). +func (s *Store) Clear() { + s.mu.Lock() + defer s.mu.Unlock() + + s.objects = make(map[key]*Entry) +} + +// deleteEntry removes an entry from the store. +func (s *Store) deleteEntry(toDelete *Entry) { + s.mu.Lock() + defer s.mu.Unlock() + + for k, entry := range s.objects { + if entry == toDelete { + delete(s.objects, k) + break + } + } +} + +// keyFromObject generates a unique key for a client.Object. +func keyFromObject(obj client.Object) key { + kind := "" + if obj != nil { + kind = obj.GetObjectKind().GroupVersionKind().Kind + if kind == "" { + // Fallback if Kind is not set in GroupVersionKind + kind = reflect.TypeOf(obj).Elem().Name() + } + } + + return key{ + Kind: kind, + Name: obj.GetName(), + Namespace: obj.GetNamespace(), + } +} + +// key uniquely identifies a Kubernetes object. +type key struct { + Kind string + Name string + Namespace string +} diff --git a/pkg/controller/smartrequeue/store_test.go b/pkg/controller/smartrequeue/store_test.go new file mode 100644 index 0000000..2ce6149 --- /dev/null +++ b/pkg/controller/smartrequeue/store_test.go @@ -0,0 +1,184 @@ +package smartrequeue + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func TestFor(t *testing.T) { + tests := []struct { + name string + firstObj client.Object + secondObj client.Object + expectSame bool + description string + }{ + { + name: "same object returns same entry", + firstObj: &dummyObject{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + secondObj: &dummyObject{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + expectSame: true, + description: "Expected to get the same entry back", + }, + { + name: "different namespace returns different entry", + firstObj: &dummyObject{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + secondObj: &dummyObject{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "test", + Namespace: "test2", + }, + }, + expectSame: false, + description: "Expected to get a different entry back", + }, + { + name: "different name returns different entry", + firstObj: &dummyObject{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + secondObj: &dummyObject{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "test2", + Namespace: "test", + }, + }, + expectSame: false, + description: "Expected to get a different entry back", + }, + { + name: "different kind returns different entry", + firstObj: &dummyObject{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + secondObj: &anotherDummyObject{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "test", + Namespace: "test", + }, + }, + expectSame: false, + description: "Expected to get a different entry back", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + store := NewStore(time.Second, time.Minute, 2) + entry1 := store.For(tt.firstObj) + + assert.NotNil(t, entry1, "Expected entry to be created") + result, err := entry1.Backoff() + require.NoError(t, err) + assert.Equal(t, 1*time.Second, getRequeueAfter(result, err)) + + entry2 := store.For(tt.secondObj) + + if tt.expectSame { + assert.Same(t, entry1, entry2, tt.description) + } else { + assert.NotSame(t, entry1, entry2, tt.description) + } + }) + } +} + +// TestClear ensures the Clear method removes all entries +func TestClear(t *testing.T) { + store := NewStore(time.Second, time.Minute, 2) + + // Add some entries + obj1 := &dummyObject{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "test1", + Namespace: "test", + }, + } + + obj2 := &dummyObject{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "test2", + Namespace: "test", + }, + } + + // Get entries to populate the store + entry1 := store.For(obj1) + entry2 := store.For(obj2) + + // Verify entries exist + assert.NotNil(t, entry1) + assert.NotNil(t, entry2) + + // Clear the store + store.Clear() + + // Get entries again - they should be new instances + entry1After := store.For(obj1) + entry2After := store.For(obj2) + + // Verify they're different instances + assert.NotSame(t, entry1, entry1After) + assert.NotSame(t, entry2, entry2After) +} + +// TestConcurrentAccess tests that the store handles concurrent access properly +func TestConcurrentAccess(t *testing.T) { + store := NewStore(time.Second, time.Minute, 2) + + // Create a series of objects + const numObjects = 100 + objects := make([]client.Object, numObjects) + + for i := 0; i < numObjects; i++ { + objects[i] = &dummyObject{ + ObjectMeta: ctrl.ObjectMeta{ + Name: fmt.Sprintf("test-%d", i), + Namespace: "test", + }, + } + } + + // Access concurrently + var wg sync.WaitGroup + wg.Add(numObjects) + + for i := 0; i < numObjects; i++ { + go func(idx int) { + defer wg.Done() + obj := objects[idx] + entry := store.For(obj) + _, _ = entry.Backoff() // Just exercise the API + }(i) + } + + wg.Wait() +}