Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Backport support for metadata-only watches to release-0.6 #1248

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 60 additions & 12 deletions pkg/builder/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strings"

"github.com/go-logr/logr"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
Expand All @@ -37,6 +38,17 @@ import (
var newController = controller.New
var getGvk = apiutil.GVKForObject

// project represents other forms that the we can use to
// send/receive a given resource (metadata-only, unstructured, etc)
type objectProjection int

const (
// projectAsNormal doesn't change the object from the form given
projectAsNormal objectProjection = iota
// projectAsMetadata turns this into an metadata-only watch
projectAsMetadata
)

// Builder builds a Controller.
type Builder struct {
forInput ForInput
Expand Down Expand Up @@ -68,8 +80,9 @@ func (blder *Builder) ForType(apiType runtime.Object) *Builder {

// ForInput represents the information set by For method.
type ForInput struct {
object runtime.Object
predicates []predicate.Predicate
object runtime.Object
predicates []predicate.Predicate
objectProjection objectProjection
}

// For defines the type of Object being *reconciled*, and configures the ControllerManagedBy to respond to create / delete /
Expand All @@ -88,8 +101,9 @@ func (blder *Builder) For(object runtime.Object, opts ...ForOption) *Builder {

// OwnsInput represents the information set by Owns method.
type OwnsInput struct {
object runtime.Object
predicates []predicate.Predicate
object runtime.Object
predicates []predicate.Predicate
objectProjection objectProjection
}

// Owns defines types of Objects being *generated* by the ControllerManagedBy, and configures the ControllerManagedBy to respond to
Expand All @@ -107,9 +121,10 @@ func (blder *Builder) Owns(object runtime.Object, opts ...OwnsOption) *Builder {

// WatchesInput represents the information set by Watches method.
type WatchesInput struct {
src source.Source
eventhandler handler.EventHandler
predicates []predicate.Predicate
src source.Source
eventhandler handler.EventHandler
predicates []predicate.Predicate
objectProjection objectProjection
}

// Watches exposes the lower-level ControllerManagedBy Watches functions through the builder. Consider using
Expand Down Expand Up @@ -195,19 +210,43 @@ func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, erro
return blder.ctrl, nil
}

func (blder *Builder) project(obj runtime.Object, proj objectProjection) (runtime.Object, error) {
switch proj {
case projectAsNormal:
return obj, nil
case projectAsMetadata:
metaObj := &metav1.PartialObjectMetadata{}
gvk, err := getGvk(obj, blder.mgr.GetScheme())
if err != nil {
return nil, fmt.Errorf("unable to determine GVK of %T for a metadata-only watch: %w", obj, err)
}
metaObj.SetGroupVersionKind(gvk)
return metaObj, nil
default:
panic(fmt.Sprintf("unexpected projection type %v on type %T, should not be possible since this is an internal field", proj, obj))
}
}

func (blder *Builder) doWatch() error {
// Reconcile type
src := &source.Kind{Type: blder.forInput.object}
typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
if err != nil {
return err
}
src := &source.Kind{Type: typeForSrc}
hdler := &handler.EnqueueRequestForObject{}
allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
err := blder.ctrl.Watch(src, hdler, allPredicates...)
if err != nil {
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
return err
}

// Watches the managed types
for _, own := range blder.ownsInput {
src := &source.Kind{Type: own.object}
typeForSrc, err := blder.project(own.object, own.objectProjection)
if err != nil {
return err
}
src := &source.Kind{Type: typeForSrc}
hdler := &handler.EnqueueRequestForOwner{
OwnerType: blder.forInput.object,
IsController: true,
Expand All @@ -223,10 +262,19 @@ func (blder *Builder) doWatch() error {
for _, w := range blder.watchesInput {
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, w.predicates...)

// If the source of this watch is of type *source.Kind, project it.
if srckind, ok := w.src.(*source.Kind); ok {
typeForSrc, err := blder.project(srckind.Type, w.objectProjection)
if err != nil {
return err
}
srckind.Type = typeForSrc
}

if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil {
return err
}

}
return nil
}
Expand Down
109 changes: 105 additions & 4 deletions pkg/builder/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/cache"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -294,8 +296,107 @@ var _ = Describe("application", func() {
})
})

Describe("watching with projections", func() {
var mgr manager.Manager
BeforeEach(func() {
// use a cache that intercepts requests for fully typed objects to
// ensure we use the projected versions
var err error
mgr, err = manager.New(cfg, manager.Options{NewCache: newNonTypedOnlyCache})
Expect(err).NotTo(HaveOccurred())
})

It("should support watching For, Owns, and Watch as metadata", func() {
statefulSetMaps := make(chan *metav1.PartialObjectMetadata)

bldr := ControllerManagedBy(mgr).
For(&appsv1.Deployment{}, OnlyMetadata).
Owns(&appsv1.ReplicaSet{}, OnlyMetadata).
Watches(&source.Kind{Type: &appsv1.StatefulSet{}},
&handler.EnqueueRequestsFromMapFunc{
ToRequests: handler.ToRequestsFunc(func(o handler.MapObject) []reconcile.Request {
ometa := o.Object.(*metav1.PartialObjectMetadata)
statefulSetMaps <- ometa
return nil
}),
},
OnlyMetadata)

doReconcileTest("8", stop, bldr, mgr, true)

By("Creating a new stateful set")
set := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test1",
Labels: map[string]string{
"foo": "bar",
},
},
Spec: appsv1.StatefulSetSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"foo": "bar"},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar"}},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "nginx",
Image: "nginx",
},
},
},
},
},
}
err := mgr.GetClient().Create(context.TODO(), set)
Expect(err).NotTo(HaveOccurred())

By("Checking that the mapping function has been called")
Eventually(func() bool {
metaSet := <-statefulSetMaps
Expect(metaSet.Name).To(Equal(set.Name))
Expect(metaSet.Namespace).To(Equal(set.Namespace))
Expect(metaSet.Labels).To(Equal(set.Labels))
return true
}).Should(BeTrue())
})
})
})

// newNonTypedOnlyCache returns a new cache that wraps the normal cache,
// returning an error if normal, typed objects have informers requested.
func newNonTypedOnlyCache(config *rest.Config, opts cache.Options) (cache.Cache, error) {
normalCache, err := cache.New(config, opts)
if err != nil {
return nil, err
}
return &nonTypedOnlyCache{
Cache: normalCache,
}, nil
}

// nonTypedOnlyCache is a cache.Cache that only provides metadata &
// unstructured informers.
type nonTypedOnlyCache struct {
cache.Cache
}

func (c *nonTypedOnlyCache) GetInformer(ctx context.Context, obj runtime.Object) (cache.Informer, error) {
switch obj.(type) {
case (*metav1.PartialObjectMetadata):
return c.Cache.GetInformer(ctx, obj)
default:
return nil, fmt.Errorf("did not want to provide an informer for normal type %T", obj)
}
}
func (c *nonTypedOnlyCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (cache.Informer, error) {
return nil, fmt.Errorf("don't try to sidestep the restriction on informer types by calling GetInformerForKind")
}

// TODO(directxman12): this function has too many arguments, and the whole
// "nameSuffix" think is a bit of a hack It should be cleaned up significantly by someone with a bit of time
func doReconcileTest(nameSuffix string, stop chan struct{}, blder *Builder, mgr manager.Manager, complete bool) {
deployName := "deploy-name-" + nameSuffix
rsName := "rs-name-" + nameSuffix
Expand Down Expand Up @@ -358,8 +459,8 @@ func doReconcileTest(nameSuffix string, stop chan struct{}, blder *Builder, mgr
Expect(err).NotTo(HaveOccurred())

By("Waiting for the Deployment Reconcile")
Expect(<-ch).To(Equal(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "default", Name: deployName}}))
Eventually(ch).Should(Receive(Equal(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "default", Name: deployName}})))

By("Creating a ReplicaSet")
// Expect a Reconcile when an Owned object is managedObjects.
Expand Down Expand Up @@ -388,8 +489,8 @@ func doReconcileTest(nameSuffix string, stop chan struct{}, blder *Builder, mgr
Expect(err).NotTo(HaveOccurred())

By("Waiting for the ReplicaSet Reconcile")
Expect(<-ch).To(Equal(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "default", Name: deployName}}))
Eventually(ch).Should(Receive(Equal(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "default", Name: deployName}})))

}

Expand Down
55 changes: 55 additions & 0 deletions pkg/builder/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"os"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
logf "sigs.k8s.io/controller-runtime/pkg/log"

appsv1 "k8s.io/api/apps/v1"
Expand All @@ -34,6 +35,60 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func ExampleBuilder_metadata_only() {
logf.SetLogger(zap.New())

var log = logf.Log.WithName("builder-examples")

mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{})
if err != nil {
log.Error(err, "could not create manager")
os.Exit(1)
}

cl := mgr.GetClient()
err = builder.
ControllerManagedBy(mgr). // Create the ControllerManagedBy
For(&appsv1.ReplicaSet{}). // ReplicaSet is the Application API
Owns(&corev1.Pod{}, builder.OnlyMetadata). // ReplicaSet owns Pods created by it, and caches them as metadata only
Complete(reconcile.Func(func(req reconcile.Request) (reconcile.Result, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Read the ReplicaSet
rs := &appsv1.ReplicaSet{}
err := cl.Get(ctx, req.NamespacedName, rs)
if err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}

// List the Pods matching the PodTemplate Labels, but only their metadata
var podsMeta metav1.PartialObjectMetadataList
err = cl.List(ctx, &podsMeta, client.InNamespace(req.Namespace), client.MatchingLabels(rs.Spec.Template.Labels))
if err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}

// Update the ReplicaSet
rs.Labels["pod-count"] = fmt.Sprintf("%v", len(podsMeta.Items))
err = cl.Update(ctx, rs)
if err != nil {
return reconcile.Result{}, err
}

return reconcile.Result{}, nil
}))
if err != nil {
log.Error(err, "could not create controller")
os.Exit(1)
}

if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
log.Error(err, "could not start manager")
os.Exit(1)
}
}

// This example creates a simple application ControllerManagedBy that is configured for ReplicaSets and Pods.
//
// * Create a new application for ReplicaSets that manages Pods owned by the ReplicaSet and calls into
Expand Down
39 changes: 39 additions & 0 deletions pkg/builder/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,42 @@ var _ OwnsOption = &Predicates{}
var _ WatchesOption = &Predicates{}

// }}}

// {{{ For & Owns Dual-Type options

// asProjection configures the projection (currently only metadata) on the input.
// Currently only metadata is supported. We might want to expand
// this to arbitrary non-special local projections in the future.
type projectAs objectProjection

// ApplyToFor applies this configuration to the given ForInput options.
func (p projectAs) ApplyToFor(opts *ForInput) {
opts.objectProjection = objectProjection(p)
}

// ApplyToOwns applies this configuration to the given OwnsInput options.
func (p projectAs) ApplyToOwns(opts *OwnsInput) {
opts.objectProjection = objectProjection(p)
}

// ApplyToWatches applies this configuration to the given WatchesInput options.
func (p projectAs) ApplyToWatches(opts *WatchesInput) {
opts.objectProjection = objectProjection(p)
}

var (
// OnlyMetadata tells the controller to *only* cache metadata, and to watch
// the the API server in metadata-only form. This is useful when watching
// lots of objects, really big objects, or objects for which you only know
// the the GVK, but not the structure. You'll need to pass
// metav1.PartialObjectMetadata to the client when fetching objects in your
// reconciler, otherwise you'll end up with a duplicate structured or
// unstructured cache.
OnlyMetadata = projectAs(projectAsMetadata)

_ ForOption = OnlyMetadata
_ OwnsOption = OnlyMetadata
_ WatchesOption = OnlyMetadata
)

// }}}
Loading