Skip to content
This repository has been archived by the owner on Aug 28, 2024. It is now read-only.

Track resources with a label selector #367

Merged
merged 3 commits into from
May 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,9 @@ func StashExampleSubReconciler(c reconcilers.Config) reconcilers.SubReconciler {

The [`Tracker`](https://pkg.go.dev/github.com/vmware-labs/reconciler-runtime/tracker#Tracker) provides a means for one resource to watch another resource for mutations, triggering the reconciliation of the resource defining the reference.

It's common to work with a resource that is also tracked. The [Config.TrackAndGet](https://pkg.go.dev/github.com/vmware-labs/reconciler-runtime/reconcilers#Config.TrackAndGet) method uses the same signature as client.Get, but additionally tracks the resource.
Resources can either be tracked by name or with a label selector using [`TrackReference`](https://pkg.go.dev/github.com/vmware-labs/reconciler-runtime/tracker#Tracker.TrackReference).

It's common to work with a resource that is also tracked. The [Config.TrackAndGet](https://pkg.go.dev/github.com/vmware-labs/reconciler-runtime/reconcilers#Config.TrackAndGet) method uses the same signature as client.Get, but additionally tracks the resource. Likewise, the [Config.TrackAndList](https://pkg.go.dev/github.com/vmware-labs/reconciler-runtime/reconcilers#Config.TrackAndList) method uses the same signature as client.List, but additionally tracks resources matching the query.

In the [Setup](https://pkg.go.dev/github.com/vmware-labs/reconciler-runtime/reconcilers#SyncReconciler) method, a watch is created that will notify the handler every time a resource of that kind is mutated. The [EnqueueTracked](https://pkg.go.dev/github.com/vmware-labs/reconciler-runtime/reconcilers#EnqueueTracked) helper returns a list of resources that are tracking the given resource, those resources are enqueued for the reconciler.

Expand Down
21 changes: 9 additions & 12 deletions reconcilers/enqueuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,26 @@ package reconcilers
import (
"context"

"k8s.io/apimachinery/pkg/types"
"github.com/go-logr/logr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"

"github.com/vmware-labs/reconciler-runtime/tracker"
)

func EnqueueTracked(ctx context.Context, by client.Object) handler.EventHandler {
func EnqueueTracked(ctx context.Context) handler.EventHandler {
c := RetrieveConfigOrDie(ctx)
log := logr.FromContextOrDiscard(ctx)

return handler.EnqueueRequestsFromMapFunc(
func(a client.Object) []Request {
func(obj client.Object) []Request {
var requests []Request

gvks, _, err := c.Scheme().ObjectKinds(by)
items, err := c.Tracker.GetObservers(obj)
if err != nil {
panic(err)
log.Error(err, "unable to get tracked requests")
return nil
}

key := tracker.NewKey(
gvks[0],
types.NamespacedName{Namespace: a.GetNamespace(), Name: a.GetName()},
)
for _, item := range c.Tracker.Lookup(ctx, key) {
for _, item := range items {
requests = append(requests, Request{NamespacedName: item})
}

Expand Down
61 changes: 50 additions & 11 deletions reconcilers/reconcilers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"errors"
"fmt"
"reflect"
"strings"
"sync"
"time"

Expand All @@ -21,11 +22,13 @@ import (
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/reference"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -71,21 +74,56 @@ func (c Config) WithCluster(cluster cluster.Cluster) Config {
// TrackAndGet tracks the resources for changes and returns the current value. The track is
// registered even when the resource does not exists so that its creation can be tracked.
//
// Equivalent to calling both `c.Tracker.Track(...)` and `c.Client.Get(...)`
// Equivalent to calling both `c.Tracker.TrackObject(...)` and `c.Client.Get(...)`
func (c Config) TrackAndGet(ctx context.Context, key types.NamespacedName, obj client.Object, opts ...client.GetOption) error {
c.Tracker.Track(
ctx,
tracker.NewKey(gvk(obj, c.Scheme()), key),
RetrieveRequest(ctx).NamespacedName,
)
// create synthetic resource to track from known type and request
req := RetrieveRequest(ctx)
resource := RetrieveResourceType(ctx).DeepCopyObject().(client.Object)
resource.SetNamespace(req.Namespace)
resource.SetName(req.Name)
ref := obj.DeepCopyObject().(client.Object)
ref.SetNamespace(key.Namespace)
ref.SetName(key.Name)
c.Tracker.TrackObject(ref, resource)

return c.Get(ctx, key, obj, opts...)
}

// TrackAndList tracks the resources for changes and returns the current value.
//
// Equivalent to calling both `c.Tracker.TrackReference(...)` and `c.Client.List(...)`
func (c Config) TrackAndList(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {
// create synthetic resource to track from known type and request
req := RetrieveRequest(ctx)
resource := RetrieveResourceType(ctx).DeepCopyObject().(client.Object)
resource.SetNamespace(req.Namespace)
resource.SetName(req.Name)

or, err := reference.GetReference(c.Scheme(), list)
if err != nil {
return err
}
gvk := schema.FromAPIVersionAndKind(or.APIVersion, or.Kind)
listOpts := (&client.ListOptions{}).ApplyOptions(opts)
if listOpts.LabelSelector == nil {
listOpts.LabelSelector = labels.Everything()
}
ref := tracker.Reference{
APIGroup: gvk.Group,
Kind: strings.TrimSuffix(gvk.Kind, "List"),
Namespace: listOpts.Namespace,
Selector: listOpts.LabelSelector,
}
c.Tracker.TrackReference(ref, resource)

return c.List(ctx, list, opts...)
}

// NewConfig creates a Config for a specific API type. Typically passed into a
// reconciler.
func NewConfig(mgr ctrl.Manager, apiType client.Object, syncPeriod time.Duration) Config {
return Config{
Tracker: tracker.New(2 * syncPeriod),
Tracker: tracker.New(mgr.GetScheme(), 2*syncPeriod),
}.WithCluster(mgr)
}

Expand Down Expand Up @@ -601,7 +639,7 @@ func (r *AggregateReconciler[T]) Reconcile(ctx context.Context, req Request) (Re
Client: c.Client,
APIReader: c.APIReader,
Recorder: c.Recorder,
Tracker: tracker.New(0),
Tracker: tracker.New(c.Scheme(), 0),
})
desired, err := r.desiredResource(ctx, resource)
if err != nil {
Expand Down Expand Up @@ -1069,7 +1107,7 @@ func (r *ChildReconciler[T, CT, CLT]) SetupWithManager(ctx context.Context, mgr
}

if r.SkipOwnerReference {
bldr.Watches(&source.Kind{Type: r.ChildType}, EnqueueTracked(ctx, r.ChildType))
bldr.Watches(&source.Kind{Type: r.ChildType}, EnqueueTracked(ctx))
} else {
bldr.Owns(r.ChildType)
}
Expand Down Expand Up @@ -1680,7 +1718,8 @@ func (r *ResourceManager[T]) Manage(ctx context.Context, resource client.Object,
if r.TrackDesired {
// normally tracks should occur before API operations, but when creating a resource with a
// generated name, we need to know the actual resource name.
if err := c.Tracker.TrackChild(ctx, resource, desired, c.Scheme()); err != nil {

if err := c.Tracker.TrackObject(desired, resource); err != nil {
return nilT, err
}
}
Expand Down Expand Up @@ -1715,7 +1754,7 @@ func (r *ResourceManager[T]) Manage(ctx context.Context, resource client.Object,
}
log.Info("updating resource", "diff", cmp.Diff(r.sanitize(actual), r.sanitize(current)))
if r.TrackDesired {
if err := c.Tracker.TrackChild(ctx, resource, current, c.Scheme()); err != nil {
if err := c.Tracker.TrackObject(current, resource); err != nil {
return nilT, err
}
}
Expand Down
148 changes: 147 additions & 1 deletion reconcilers/reconcilers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -122,6 +123,151 @@ func TestConfig_TrackAndGet(t *testing.T) {
})
}

func TestConfig_TrackAndList(t *testing.T) {
testNamespace := "test-namespace"
testName := "test-resource"
testSelector, _ := labels.Parse("app=test-app")

scheme := runtime.NewScheme()
_ = resources.AddToScheme(scheme)
_ = clientgoscheme.AddToScheme(scheme)

resource := dies.TestResourceBlank.
MetadataDie(func(d *diemetav1.ObjectMetaDie) {
d.Namespace(testNamespace)
d.Name(testName)
})

configMap := diecorev1.ConfigMapBlank.
MetadataDie(func(d *diemetav1.ObjectMetaDie) {
d.Namespace("track-namespace")
d.Name("track-name")
d.AddLabel("app", "test-app")
}).
AddData("greeting", "hello")

rts := rtesting.SubReconcilerTests[*resources.TestResource]{
"track and list": {
Resource: resource.DieReleasePtr(),
GivenObjects: []client.Object{
configMap,
},
Metadata: map[string]interface{}{
"listOpts": []client.ListOption{},
},
ExpectTracks: []rtesting.TrackRequest{
{
Tracker: types.NamespacedName{
Namespace: testNamespace,
Name: testName,
},
TrackedReference: tracker.Reference{
Kind: "ConfigMap",
Selector: labels.Everything(),
},
},
},
},
"track and list constrained": {
Resource: resource.DieReleasePtr(),
GivenObjects: []client.Object{
configMap,
},
Metadata: map[string]interface{}{
"listOpts": []client.ListOption{
client.InNamespace("track-namespace"),
client.MatchingLabels(map[string]string{"app": "test-app"}),
},
},
ExpectTracks: []rtesting.TrackRequest{
{
Tracker: types.NamespacedName{
Namespace: testNamespace,
Name: testName,
},
TrackedReference: tracker.Reference{
Kind: "ConfigMap",
Namespace: "track-namespace",
Selector: testSelector,
},
},
},
},
"track with errored list": {
Resource: resource.DieReleasePtr(),
ShouldErr: true,
WithReactors: []rtesting.ReactionFunc{
rtesting.InduceFailure("list", "ConfigMapList"),
},
Metadata: map[string]interface{}{
"listOpts": []client.ListOption{},
},
ExpectTracks: []rtesting.TrackRequest{
{
Tracker: types.NamespacedName{
Namespace: testNamespace,
Name: testName,
},
TrackedReference: tracker.Reference{
Kind: "ConfigMap",
Selector: labels.Everything(),
},
},
},
},
}

// run with typed objects
t.Run("typed", func(t *testing.T) {
rts.Run(t, scheme, func(t *testing.T, rtc *rtesting.SubReconcilerTestCase[*resources.TestResource], c reconcilers.Config) reconcilers.SubReconciler[*resources.TestResource] {
return &reconcilers.SyncReconciler[*resources.TestResource]{
Sync: func(ctx context.Context, resource *resources.TestResource) error {
c := reconcilers.RetrieveConfigOrDie(ctx)

cms := &corev1.ConfigMapList{}
listOpts := rtc.Metadata["listOpts"].([]client.ListOption)
err := c.TrackAndList(ctx, cms, listOpts...)
if err != nil {
return err
}

if expected, actual := "hello", cms.Items[0].Data["greeting"]; expected != actual {
// should never get here
panic(fmt.Errorf("expected configmap to have greeting %q, found %q", expected, actual))
}
return nil
},
}
})
})

// run with unstructured objects
t.Run("unstructured", func(t *testing.T) {
rts.Run(t, scheme, func(t *testing.T, rtc *rtesting.SubReconcilerTestCase[*resources.TestResource], c reconcilers.Config) reconcilers.SubReconciler[*resources.TestResource] {
return &reconcilers.SyncReconciler[*resources.TestResource]{
Sync: func(ctx context.Context, resource *resources.TestResource) error {
c := reconcilers.RetrieveConfigOrDie(ctx)

cms := &unstructured.UnstructuredList{}
cms.SetAPIVersion("v1")
cms.SetKind("ConfigMapList")
listOpts := rtc.Metadata["listOpts"].([]client.ListOption)
err := c.TrackAndList(ctx, cms, listOpts...)
if err != nil {
return err
}

if expected, actual := "hello", cms.UnstructuredContent()["items"].([]interface{})[0].(map[string]interface{})["data"].(map[string]interface{})["greeting"].(string); expected != actual {
// should never get here
panic(fmt.Errorf("expected configmap to have greeting %q, found %q", expected, actual))
}
return nil
},
}
})
})
}

func TestResourceReconciler_NoStatus(t *testing.T) {
testNamespace := "test-namespace"
testName := "test-resource-no-status"
Expand Down Expand Up @@ -3807,7 +3953,7 @@ func TestWithConfig(t *testing.T) {
Metadata: map[string]interface{}{
"SubReconciler": func(t *testing.T, oc reconcilers.Config) reconcilers.SubReconciler[*resources.TestResource] {
c := reconcilers.Config{
Tracker: tracker.New(0),
Tracker: tracker.New(oc.Scheme(), 0),
}

return &reconcilers.WithConfig[*resources.TestResource]{
Expand Down
8 changes: 7 additions & 1 deletion reconcilers/reconcilers_validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (
"github.com/vmware-labs/reconciler-runtime/internal/resources"
"github.com/vmware-labs/reconciler-runtime/tracker"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand Down Expand Up @@ -545,8 +547,12 @@ func TestCastResource_validate(t *testing.T) {
}

func TestWithConfig_validate(t *testing.T) {
scheme := runtime.NewScheme()
_ = resources.AddToScheme(scheme)
_ = clientgoscheme.AddToScheme(scheme)

config := Config{
Tracker: tracker.New(0),
Tracker: tracker.New(scheme, 0),
}

tests := []struct {
Expand Down
10 changes: 6 additions & 4 deletions testing/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (c *ExpectConfig) init() {
events: []Event{},
scheme: c.Scheme,
}
c.tracker = createTracker(c.GivenTracks)
c.tracker = createTracker(c.GivenTracks, c.Scheme)
c.observedErrors = []string{}
})
}
Expand Down Expand Up @@ -330,18 +330,20 @@ func (c *ExpectConfig) AssertTrackerExpectations(t *testing.T) {

actualTracks := c.tracker.getTrackRequests()
for i, exp := range c.ExpectTracks {
exp.normalize()

if i >= len(actualTracks) {
c.errorf(t, "Missing tracking request for config %q: %s", c.Name, exp)
c.errorf(t, "Missing tracking request for config %q: %v", c.Name, exp)
continue
}

if diff := cmp.Diff(exp, actualTracks[i]); diff != "" {
if diff := cmp.Diff(exp, actualTracks[i], NormalizeLabelSelector); diff != "" {
c.errorf(t, "Unexpected tracking request for config %q (-expected, +actual): %s", c.Name, diff)
}
}
if actual, exp := len(actualTracks), len(c.ExpectTracks); actual > exp {
for _, extra := range actualTracks[exp:] {
c.errorf(t, "Extra tracking request for config %q: %s", c.Name, extra)
c.errorf(t, "Extra tracking request for config %q: %v", c.Name, extra)
}
}
}
Expand Down
Loading