Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
- [Readiness Checks](libs/readiness.md)
- [Kubernetes Resource Management](libs/resource.md)
- [Retrying k8s Operations](libs/retry.md)
- [Smart Requeuing of Kubernetes Resources](libs/smartrequeue.md)
- [Kubernetes Resource Status Updating](libs/status.md)
- [Testing](libs/testing.md)
- [Thread Management](libs/threads.md)
Expand Down
9 changes: 9 additions & 0 deletions docs/libs/smartrequeue.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Smart Requeuing of Kubernetes Resources

The `pkg/controller/smartrequeue` package contains the smart requeuing logic that was originally implemented [here](https://github.com/openmcp-project/cluster-provider-kind/tree/v0.0.7/pkg/smartrequeue). It allows to requeue reconcile requests with an increasing backoff, similar to what the controller-runtime does when the `Reconcile` method returns an error.

Use `NewStore` in the constructor of the reconciler. During reconciliation, the store's `For` method can be used to get the entry for the passed-in object, which can then generate the reconcile result:
- `Error` returns an error and lets the controller-runtime handle the backoff
- `Never` does not requeue the object
- `Backoff` requeues the object with an increasing backoff every time it is called on the same object
- `Reset` requeues the object, but resets the duration to its minimal value
22 changes: 22 additions & 0 deletions pkg/controller/smartrequeue/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package smartrequeue

import "context"

// contextKey is a type used as a key for storing and retrieving the Entry from the context.
type contextKey struct{}

// NewContext creates a new context with the given Entry.
// This is a utility function for passing Entry instances through context.
func NewContext(ctx context.Context, entry *Entry) context.Context {
return context.WithValue(ctx, contextKey{}, entry)
}

// FromContext retrieves the Entry from the context, if it exists.
// Returns nil if no Entry is found in the context.
func FromContext(ctx context.Context) *Entry {
entry, ok := ctx.Value(contextKey{}).(*Entry)
if !ok {
return nil
}
return entry
}
33 changes: 33 additions & 0 deletions pkg/controller/smartrequeue/context_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package smartrequeue

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestNewContext(t *testing.T) {
store := NewStore(time.Second, time.Minute, 2)
entry := newEntry(store)
ctx := NewContext(context.Background(), entry)

// Test that we get the entry back using FromContext
got := FromContext(ctx)
assert.Equal(t, entry, got, "Expected entry to be the same as the one set in context")
}

func TestFromContext(t *testing.T) {
store := NewStore(time.Second, time.Minute, 2)
entry := newEntry(store)
ctx := NewContext(context.Background(), entry)

// Retrieve entry from context
got := FromContext(ctx)
assert.Equal(t, entry, got, "Expected entry to be the same as the one set in context")

// Test empty context
got = FromContext(context.Background())
assert.Nil(t, got, "Expected nil when no entry is set in context")
}
36 changes: 36 additions & 0 deletions pkg/controller/smartrequeue/dummyobjects_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package smartrequeue

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"

"sigs.k8s.io/controller-runtime/pkg/client"
)

type dummyObject struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata"`
}

var _ client.Object = &dummyObject{}

func (d *dummyObject) DeepCopyObject() runtime.Object {
return &dummyObject{
TypeMeta: d.TypeMeta,
ObjectMeta: *d.DeepCopy(),
}
}

type anotherDummyObject struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata"`
}

var _ client.Object = &anotherDummyObject{}

func (d *anotherDummyObject) DeepCopyObject() runtime.Object {
return &anotherDummyObject{
TypeMeta: d.TypeMeta,
ObjectMeta: *d.DeepCopy(),
}
}
65 changes: 65 additions & 0 deletions pkg/controller/smartrequeue/entry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package smartrequeue

import (
"time"

ctrl "sigs.k8s.io/controller-runtime"
)

// Entry is used to manage the requeue logic for a specific object.
// It holds the next duration to requeue and the store it belongs to.
type Entry struct {
store *Store
nextDuration time.Duration
}

func newEntry(s *Store) *Entry {
return &Entry{
store: s,
nextDuration: s.minInterval,
}
}

// Error resets the duration to the minInterval and returns an empty Result and the error
// so that the controller-runtime can handle the exponential backoff for errors.
func (e *Entry) Error(err error) (ctrl.Result, error) {
e.nextDuration = e.store.minInterval
return ctrl.Result{}, err
}

// Backoff returns a Result and increments the interval for the next iteration.
func (e *Entry) Backoff() (ctrl.Result, error) {
// Save current duration for result
current := e.nextDuration

// Schedule calculation of next duration
defer e.setNext()

return ctrl.Result{RequeueAfter: current}, nil
}

// Reset resets the duration to the minInterval and returns a Result with that interval.
func (e *Entry) Reset() (ctrl.Result, error) {
e.nextDuration = e.store.minInterval
defer e.setNext()
return ctrl.Result{RequeueAfter: e.nextDuration}, nil
}

// Never deletes the entry from the store and returns an empty Result.
func (e *Entry) Never() (ctrl.Result, error) {
e.store.deleteEntry(e)
return ctrl.Result{}, nil
}

// setNext updates the next requeue duration using exponential backoff.
// It multiplies the current duration by the store's multiplier and ensures
// the result doesn't exceed the configured maximum interval.
func (e *Entry) setNext() {
newDuration := time.Duration(float32(e.nextDuration) * e.store.multiplier)

if newDuration > e.store.maxInterval {
newDuration = e.store.maxInterval
}

e.nextDuration = newDuration
}
119 changes: 119 additions & 0 deletions pkg/controller/smartrequeue/entry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package smartrequeue

import (
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
ctrl "sigs.k8s.io/controller-runtime"
)

// Helper function to get requeue duration from Result
func getRequeueAfter(res ctrl.Result, _ error) time.Duration {
return res.RequeueAfter.Round(time.Second)
}

func TestEntry_Stable(t *testing.T) {
// Setup
store := NewStore(time.Second, time.Minute, 2)
entry := newEntry(store)

// Test the exponential backoff behavior
t.Run("exponential backoff sequence", func(t *testing.T) {
expectedDurations := []time.Duration{
1 * time.Second,
2 * time.Second,
4 * time.Second,
8 * time.Second,
16 * time.Second,
32 * time.Second,
60 * time.Second, // Capped at maxInterval
60 * time.Second, // Still capped
}

for i, expected := range expectedDurations {
result, err := entry.Backoff()
require.NoError(t, err)
assert.Equal(t, expected, getRequeueAfter(result, err), "Iteration %d should have correct duration", i)
}
})
}

func TestEntry_Progressing(t *testing.T) {
// Setup
minInterval := time.Second
maxInterval := time.Minute
store := NewStore(minInterval, maxInterval, 2)
entry := newEntry(store)

// Ensure state is not at minimum
_, _ = entry.Backoff()
_, _ = entry.Backoff()

// Test progressing resets duration to minimum
t.Run("resets to minimum interval", func(t *testing.T) {
result, err := entry.Reset()
require.NoError(t, err)
assert.Equal(t, minInterval, getRequeueAfter(result, err))

// Second call should also return min interval with small increment
result, err = entry.Reset()
require.NoError(t, err)
assert.Equal(t, minInterval, getRequeueAfter(result, err))
})

// After progressing, Stable should restart exponential backoff
t.Run("stable continues from minimum", func(t *testing.T) {
result, err := entry.Backoff()
require.NoError(t, err)
assert.Equal(t, 2*time.Second, getRequeueAfter(result, err))

result, err = entry.Backoff()
require.NoError(t, err)
assert.Equal(t, 4*time.Second, getRequeueAfter(result, err))
})
}

func TestEntry_Error(t *testing.T) {
// Setup
store := NewStore(time.Second, time.Minute, 2)
entry := newEntry(store)
testErr := errors.New("test error")

// Ensure state is not at minimum
_, _ = entry.Backoff()
_, _ = entry.Backoff()

// Test error handling
t.Run("returns error and resets backoff", func(t *testing.T) {
result, err := entry.Error(testErr)
assert.Equal(t, testErr, err, "Should return the passed error")
assert.Equal(t, 0*time.Second, getRequeueAfter(result, err), "Should have zero requeue time")
})

// After error, stable should continue from minimum
t.Run("stable continues properly after error", func(t *testing.T) {
result, err := entry.Backoff()
require.NoError(t, err)
assert.Equal(t, time.Second, getRequeueAfter(result, err))

result, err = entry.Backoff()
require.NoError(t, err)
assert.Equal(t, 2*time.Second, getRequeueAfter(result, err))
})
}

func TestEntry_Never(t *testing.T) {
// Setup
store := NewStore(time.Second, time.Minute, 2)
entry := newEntry(store)

// Test Never behavior
t.Run("returns empty result", func(t *testing.T) {
result, err := entry.Never()
require.NoError(t, err)
assert.Equal(t, time.Duration(0), getRequeueAfter(result, err))
})
}
46 changes: 46 additions & 0 deletions pkg/controller/smartrequeue/example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package smartrequeue_test

import (
"fmt"
"time"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/openmcp-project/controller-utils/pkg/controller/smartrequeue"
)

// This example shows how to use the SmartRequeue package in a Kubernetes controller.
func Example_controllerUsage() {
// Create a store with min and max requeue intervals
store := smartrequeue.NewStore(5*time.Second, 10*time.Minute, 2.0)

// In your controller's Reconcile function:
reconcileFunction := func(_ ctrl.Request) (ctrl.Result, error) {
// Create a dummy object representing what you'd get from the client
var obj client.Object // In real code: Get this from the client

// Get the Entry for this specific object
entry := store.For(obj)

// Determine the state of the external resource...
inProgress := false // This would be determined by your logic
errOccurred := false // This would be determined by your logic

// nolint:gocritic
if errOccurred {
// Handle error case
err := fmt.Errorf("something went wrong")
return entry.Error(err)
} else if inProgress {
// Resource is changing - check back soon
return entry.Reset()
} else {
// Resource is stable - gradually back off
return entry.Backoff()
}
}

// Call the reconcile function
_, _ = reconcileFunction(ctrl.Request{})
}
Loading