Skip to content

Commit

Permalink
Merge pull request #547 from scothis/for-each
Browse files Browse the repository at this point in the history
ForEach reconciler
  • Loading branch information
scothis authored Sep 24, 2024
2 parents 4a934e2 + 1871a44 commit 7c8780b
Show file tree
Hide file tree
Showing 5 changed files with 330 additions and 1 deletion.
40 changes: 40 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Within an existing Kubebuilder or controller-runtime project, reconcilers.io may
- [Advice](#advice)
- [IfThen](#ifthen)
- [While](#while)
- [ForEach](#foreach)
- [TryCatch](#trycatch)
- [OverrideSetup](#overridesetup)
- [WithConfig](#withconfig)
Expand Down Expand Up @@ -535,6 +536,45 @@ func TenTimesReconciler() *reconcilers.SubReconciler[*buildv1alpha1.Function] {
}
```

#### ForEach

A [`ForEach`](https://pkg.go.dev/reconciler.io/runtime/reconcilers#ForEach) calls the reconciler for each item returned from Items. A cursor marks the item being reconciled. The current cursor can be retrieved with [`CursorStasher`](https://pkg.go.dev/reconciler.io/runtime/reconcilers#CursorStasher).

Nested iteration is allowed so long as the types being iterated over contain unique names. Otherwise the stash keys will collide. For testing the nested reconciler outside the scope of the loop, use the CursorStasher's Key method to lookup the StashKey, do not expect the StashKey to be stable between releases.

**Example:**

A ForEach can be used to interact with each volume mount on a pod.

```go
func VolumeMountReconciler() *reconcilers.SubReconciler[*corev1.Pod] {
containerCursorStasher := reconcilers.CursorStasher[corev1.Container]()
volumeMountCursorStasher := reconcilers.CursorStasher[corev1.VolumeMount]()
return &reconcilers.ForEach[*corev1.Pod, corev1.Container]{
Items: func(ctx context.Context, resource *corev1.Pod) ([]corev1.Container, error) {
return resource.Spec.Containers, nil
}
Reconciler: &reconcilers.ForEach[*corev1.Pod, corev1.VolumeMount]{
Items: func(ctx context.Context, resource *corev1.Pod) ([]corev1.VolumeMount, error) {
containerCursor := containerCursorStasher.RetrieveOrDie(ctx)
return containerCursor.Item.VolumeMounts, nil
}
Reconciler: reconcilers.SyncReconciler[*corev1.Pod]{
Sync: func(ctx context.Context, resource *corev1.Pod) error {
containerCursor := containerCursorStasher.RetrieveOrDie(ctx)
volumeMountCursor := volumeMountCursorStasher.RetrieveOrDie(ctx)
// do something
return nil
}
},
},
}
}
```

#### TryCatch

A [`TryCatch`](https://pkg.go.dev/reconciler.io/runtime/reconcilers#TryCatch) is used to recover from errors returned by a reconciler. The `Catch` method is called with the result and error from the `Try` reconciler, giving it the option to either continue the existing results, or replace them with new results.
Expand Down
107 changes: 107 additions & 0 deletions reconcilers/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
var (
_ SubReconciler[client.Object] = (*IfThen[client.Object])(nil)
_ SubReconciler[client.Object] = (*While[client.Object])(nil)
_ SubReconciler[client.Object] = (*ForEach[client.Object, any])(nil)
_ SubReconciler[client.Object] = (*TryCatch[client.Object])(nil)
_ SubReconciler[client.Object] = (*OverrideSetup[client.Object])(nil)
)
Expand Down Expand Up @@ -281,6 +282,112 @@ func (r *While[T]) Reconcile(ctx context.Context, resource T) (Result, error) {
return aggregateResult, nil
}

// ForEach calls the reconcilers for each item. The current value of the iteration is exposed as a
// Cursor that is available via the CursorStasher helper. Multiple ForEach reconcilers are nestable
// so long as the types being iterated over are unique.
type ForEach[Type client.Object, Item any] struct {
// Name used to identify this reconciler. Defaults to `ForEach`. Ideally unique, but
// not required to be so.
//
// +optional
Name string

// Setup performs initialization on the manager and builder this reconciler
// will run with. It's common to setup field indexes and watch resources.
//
// +optional
Setup func(ctx context.Context, mgr ctrl.Manager, bldr *builder.Builder) error

// Reconciler to be called for each iterable item
Reconciler SubReconciler[Type]

// Items returns the items to iterate over
Items func(ctx context.Context, resource Type) ([]Item, error)
}

func (r *ForEach[T, I]) SetupWithManager(ctx context.Context, mgr ctrl.Manager, bldr *builder.Builder) error {
if r.Name == "" {
r.Name = "ForEach"
}

log := logr.FromContextOrDiscard(ctx).
WithName(r.Name)
ctx = logr.NewContext(ctx, log)

if err := r.validate(ctx); err != nil {
return err
}
if err := r.Reconciler.SetupWithManager(ctx, mgr, bldr); err != nil {
return err
}
if r.Setup == nil {
return nil
}
return r.Setup(ctx, mgr, bldr)
}

func (r *ForEach[T, I]) validate(ctx context.Context) error {
// validate Reconciler
if r.Reconciler == nil {
return fmt.Errorf("ForEach %q must implement Reconciler", r.Name)
}
if r.Items == nil {
return fmt.Errorf("ForEach %q must implement Items", r.Name)
}

return nil
}

func (r *ForEach[T, I]) Reconcile(ctx context.Context, resource T) (Result, error) {
log := logr.FromContextOrDiscard(ctx).
WithName(r.Name)
ctx = logr.NewContext(ctx, log)

result := Result{}

items, err := r.Items(ctx, resource)
if err != nil {
return result, err
}

for i := range items {
log := log.WithName(fmt.Sprintf("%d", i))
ctx := logr.NewContext(ctx, log)

CursorStasher[I]().Store(ctx, Cursor[I]{
Index: i,
Length: len(items),
Item: items[i],
})

iterationResult, err := r.Reconciler.Reconcile(ctx, resource)
if err != nil {
return result, err
}
result = AggregateResults(result, iterationResult)
}

return result, nil
}

// Cursor represents the current value within an iterator
type Cursor[I any] struct {
// Index of the current iteration
Index int
// Length of the current iteration, or -1 if not known and/or unbounded
Length int
// Item of the current iteration
Item I
}

// CursorStasher creates a Stasher for a Cursor of the generic type
func CursorStasher[I any]() Stasher[Cursor[I]] {
// avoid key collisions for nested iteration over different types
var empty I
key := StashKey(fmt.Sprintf("reconciler.io/runtime:cursor:%s", typeName(empty)))
return NewStasher[Cursor[I]](key)
}

// TryCatch facilitates recovery from errors encountered within the Try
// reconciler. The results of the Try reconciler are passed to the Catch
// handler which can suppress, modify or continue the error. The Finally
Expand Down
128 changes: 128 additions & 0 deletions reconcilers/flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import (
"testing"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/utils/pointer"
diecorev1 "reconciler.io/dies/apis/core/v1"
diemetav1 "reconciler.io/dies/apis/meta/v1"
"reconciler.io/runtime/internal/resources"
"reconciler.io/runtime/internal/resources/dies"
Expand Down Expand Up @@ -269,6 +271,132 @@ func TestWhile(t *testing.T) {
})
}

func TestForEach(t *testing.T) {
testNamespace := "test-namespace"
testName := "test-resource"

scheme := runtime.NewScheme()
_ = resources.AddToScheme(scheme)

resource := dies.TestResourceBlank.
MetadataDie(func(d *diemetav1.ObjectMetaDie) {
d.Namespace(testNamespace)
d.Name(testName)
}).
SpecDie(func(d *dies.TestResourceSpecDie) {
d.Fields(map[string]string{})
})

rts := rtesting.SubReconcilerTests[*resources.TestResource]{
"empty items": {
Resource: resource.DieReleasePtr(),
ExpectResource: resource.DieReleasePtr(),
},
"each container": {
Resource: resource.
SpecDie(func(d *dies.TestResourceSpecDie) {
d.TemplateDie(func(d *diecorev1.PodTemplateSpecDie) {
d.SpecDie(func(d *diecorev1.PodSpecDie) {
d.ContainerDie("hello", func(d *diecorev1.ContainerDie) {
d.Image("world")
})
d.ContainerDie("foo", func(d *diecorev1.ContainerDie) {
d.Image("bar")
})
})
})
}).
DieReleasePtr(),
ExpectResource: resource.
SpecDie(func(d *dies.TestResourceSpecDie) {
d.TemplateDie(func(d *diecorev1.PodTemplateSpecDie) {
d.SpecDie(func(d *diecorev1.PodSpecDie) {
d.ContainerDie("hello", func(d *diecorev1.ContainerDie) {
d.Image("world")
})
d.ContainerDie("foo", func(d *diecorev1.ContainerDie) {
d.Image("bar")
})
})
})
}).
StatusDie(func(d *dies.TestResourceStatusDie) {
// container.name -> container.image-index-length
d.AddField("hello", "world-0-2")
d.AddField("foo", "bar-1-2")
}).
DieReleasePtr(),
},
"terminate iteration on error": {
Resource: resource.
SpecDie(func(d *dies.TestResourceSpecDie) {
d.TemplateDie(func(d *diecorev1.PodTemplateSpecDie) {
d.SpecDie(func(d *diecorev1.PodSpecDie) {
d.ContainerDie("hello", func(d *diecorev1.ContainerDie) {
d.Image("world")
})
d.ContainerDie("die", func(d *diecorev1.ContainerDie) {
d.Image("die")
})
d.ContainerDie("foo", func(d *diecorev1.ContainerDie) {
d.Image("bar")
})
})
})
}).
DieReleasePtr(),
ExpectResource: resource.
SpecDie(func(d *dies.TestResourceSpecDie) {
d.TemplateDie(func(d *diecorev1.PodTemplateSpecDie) {
d.SpecDie(func(d *diecorev1.PodSpecDie) {
d.ContainerDie("hello", func(d *diecorev1.ContainerDie) {
d.Image("world")
})
d.ContainerDie("die", func(d *diecorev1.ContainerDie) {
d.Image("die")
})
d.ContainerDie("foo", func(d *diecorev1.ContainerDie) {
d.Image("bar")
})
})
})
}).
StatusDie(func(d *dies.TestResourceStatusDie) {
// container.name -> container.image-index-length
d.AddField("hello", "world-0-3")
}).
DieReleasePtr(),
ShouldErr: true,
},
}

rts.Run(t, scheme, func(t *testing.T, rtc *rtesting.SubReconcilerTestCase[*resources.TestResource], c reconcilers.Config) reconcilers.SubReconciler[*resources.TestResource] {
r := &reconcilers.ForEach[*resources.TestResource, corev1.Container]{
Items: func(ctx context.Context, resource *resources.TestResource) ([]corev1.Container, error) {
return resource.Spec.Template.Spec.Containers, nil
},
Reconciler: &reconcilers.SyncReconciler[*resources.TestResource]{
Sync: func(ctx context.Context, resource *resources.TestResource) error {
cursor := reconcilers.CursorStasher[corev1.Container]().RetrieveOrDie(ctx)

if cursor.Item.Name == "die" {
return fmt.Errorf("exit early")
}

if resource.Status.Fields == nil {
resource.Status.Fields = map[string]string{}
}
resource.Status.Fields[cursor.Item.Name] = fmt.Sprintf("%s-%d-%d", cursor.Item.Image, cursor.Index, cursor.Length)

return nil
},
},
}

return r
})
}

func TestTryCatch(t *testing.T) {
testNamespace := "test-namespace"
testName := "test-resource"
Expand Down
1 change: 0 additions & 1 deletion reconcilers/reconcilers.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ func typeName(i interface{}) string {
}

t := reflect.TypeOf(i)
// TODO do we need this?
if t.Kind() == reflect.Ptr {
t = t.Elem()
}
Expand Down
55 changes: 55 additions & 0 deletions reconcilers/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1065,6 +1065,61 @@ func TestWhile_validate(t *testing.T) {
}
}

func TestForEach_validate(t *testing.T) {
tests := []struct {
name string
reconciler *ForEach[*resources.TestResource, any]
shouldErr string
expectedLogs []string
}{
{
name: "valid",
reconciler: &ForEach[*resources.TestResource, any]{
Items: func(ctx context.Context, resource *resources.TestResource) ([]any, error) {
return nil, nil
},
Reconciler: Sequence[*resources.TestResource]{},
},
},
{
name: "missing items",
reconciler: &ForEach[*resources.TestResource, any]{
// Items: func(ctx context.Context, resource *resources.TestResource) ([]any, error) {
// return nil, nil
// },
Reconciler: Sequence[*resources.TestResource]{},
},
shouldErr: `ForEach "missing items" must implement Items`,
},
{
name: "missing reconciler",
reconciler: &ForEach[*resources.TestResource, any]{
Items: func(ctx context.Context, resource *resources.TestResource) ([]any, error) {
return nil, nil
},
// Reconciler: Sequence[*resources.TestResource]{},
},
shouldErr: `ForEach "missing reconciler" must implement Reconciler`,
},
}

for _, c := range tests {
t.Run(c.name, func(t *testing.T) {
sink := &bufferedSink{}
ctx := logr.NewContext(context.TODO(), logr.New(sink))
r := c.reconciler
r.Name = c.name
err := r.validate(ctx)
if (err != nil) != (c.shouldErr != "") || (c.shouldErr != "" && c.shouldErr != err.Error()) {
t.Errorf("validate() error = %q, shouldErr %q", err, c.shouldErr)
}
if diff := cmp.Diff(c.expectedLogs, sink.Lines); diff != "" {
t.Errorf("%s: unexpected logs (-expected, +actual): %s", c.name, diff)
}
})
}
}

func TestTryCatch_validate(t *testing.T) {
tests := []struct {
name string
Expand Down

0 comments on commit 7c8780b

Please sign in to comment.