Skip to content

Commit

Permalink
Merge pull request #1248 from ncdc/0.6/metadata-only-watch
Browse files Browse the repository at this point in the history
✨ Backport support for metadata-only watches to release-0.6
  • Loading branch information
k8s-ci-robot authored Nov 9, 2020
2 parents b3d7cf1 + fa91e1b commit 7fc9110
Show file tree
Hide file tree
Showing 11 changed files with 1,646 additions and 341 deletions.
72 changes: 60 additions & 12 deletions pkg/builder/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strings"

"github.com/go-logr/logr"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
Expand All @@ -37,6 +38,17 @@ import (
var newController = controller.New
var getGvk = apiutil.GVKForObject

// project represents other forms that the we can use to
// send/receive a given resource (metadata-only, unstructured, etc)
type objectProjection int

const (
// projectAsNormal doesn't change the object from the form given
projectAsNormal objectProjection = iota
// projectAsMetadata turns this into an metadata-only watch
projectAsMetadata
)

// Builder builds a Controller.
type Builder struct {
forInput ForInput
Expand Down Expand Up @@ -68,8 +80,9 @@ func (blder *Builder) ForType(apiType runtime.Object) *Builder {

// ForInput represents the information set by For method.
type ForInput struct {
object runtime.Object
predicates []predicate.Predicate
object runtime.Object
predicates []predicate.Predicate
objectProjection objectProjection
}

// For defines the type of Object being *reconciled*, and configures the ControllerManagedBy to respond to create / delete /
Expand All @@ -88,8 +101,9 @@ func (blder *Builder) For(object runtime.Object, opts ...ForOption) *Builder {

// OwnsInput represents the information set by Owns method.
type OwnsInput struct {
object runtime.Object
predicates []predicate.Predicate
object runtime.Object
predicates []predicate.Predicate
objectProjection objectProjection
}

// Owns defines types of Objects being *generated* by the ControllerManagedBy, and configures the ControllerManagedBy to respond to
Expand All @@ -107,9 +121,10 @@ func (blder *Builder) Owns(object runtime.Object, opts ...OwnsOption) *Builder {

// WatchesInput represents the information set by Watches method.
type WatchesInput struct {
src source.Source
eventhandler handler.EventHandler
predicates []predicate.Predicate
src source.Source
eventhandler handler.EventHandler
predicates []predicate.Predicate
objectProjection objectProjection
}

// Watches exposes the lower-level ControllerManagedBy Watches functions through the builder. Consider using
Expand Down Expand Up @@ -195,19 +210,43 @@ func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, erro
return blder.ctrl, nil
}

func (blder *Builder) project(obj runtime.Object, proj objectProjection) (runtime.Object, error) {
switch proj {
case projectAsNormal:
return obj, nil
case projectAsMetadata:
metaObj := &metav1.PartialObjectMetadata{}
gvk, err := getGvk(obj, blder.mgr.GetScheme())
if err != nil {
return nil, fmt.Errorf("unable to determine GVK of %T for a metadata-only watch: %w", obj, err)
}
metaObj.SetGroupVersionKind(gvk)
return metaObj, nil
default:
panic(fmt.Sprintf("unexpected projection type %v on type %T, should not be possible since this is an internal field", proj, obj))
}
}

func (blder *Builder) doWatch() error {
// Reconcile type
src := &source.Kind{Type: blder.forInput.object}
typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
if err != nil {
return err
}
src := &source.Kind{Type: typeForSrc}
hdler := &handler.EnqueueRequestForObject{}
allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
err := blder.ctrl.Watch(src, hdler, allPredicates...)
if err != nil {
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
return err
}

// Watches the managed types
for _, own := range blder.ownsInput {
src := &source.Kind{Type: own.object}
typeForSrc, err := blder.project(own.object, own.objectProjection)
if err != nil {
return err
}
src := &source.Kind{Type: typeForSrc}
hdler := &handler.EnqueueRequestForOwner{
OwnerType: blder.forInput.object,
IsController: true,
Expand All @@ -223,10 +262,19 @@ func (blder *Builder) doWatch() error {
for _, w := range blder.watchesInput {
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, w.predicates...)

// If the source of this watch is of type *source.Kind, project it.
if srckind, ok := w.src.(*source.Kind); ok {
typeForSrc, err := blder.project(srckind.Type, w.objectProjection)
if err != nil {
return err
}
srckind.Type = typeForSrc
}

if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil {
return err
}

}
return nil
}
Expand Down
109 changes: 105 additions & 4 deletions pkg/builder/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/cache"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -294,8 +296,107 @@ var _ = Describe("application", func() {
})
})

Describe("watching with projections", func() {
var mgr manager.Manager
BeforeEach(func() {
// use a cache that intercepts requests for fully typed objects to
// ensure we use the projected versions
var err error
mgr, err = manager.New(cfg, manager.Options{NewCache: newNonTypedOnlyCache})
Expect(err).NotTo(HaveOccurred())
})

It("should support watching For, Owns, and Watch as metadata", func() {
statefulSetMaps := make(chan *metav1.PartialObjectMetadata)

bldr := ControllerManagedBy(mgr).
For(&appsv1.Deployment{}, OnlyMetadata).
Owns(&appsv1.ReplicaSet{}, OnlyMetadata).
Watches(&source.Kind{Type: &appsv1.StatefulSet{}},
&handler.EnqueueRequestsFromMapFunc{
ToRequests: handler.ToRequestsFunc(func(o handler.MapObject) []reconcile.Request {
ometa := o.Object.(*metav1.PartialObjectMetadata)
statefulSetMaps <- ometa
return nil
}),
},
OnlyMetadata)

doReconcileTest("8", stop, bldr, mgr, true)

By("Creating a new stateful set")
set := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "test1",
Labels: map[string]string{
"foo": "bar",
},
},
Spec: appsv1.StatefulSetSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"foo": "bar"},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar"}},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "nginx",
Image: "nginx",
},
},
},
},
},
}
err := mgr.GetClient().Create(context.TODO(), set)
Expect(err).NotTo(HaveOccurred())

By("Checking that the mapping function has been called")
Eventually(func() bool {
metaSet := <-statefulSetMaps
Expect(metaSet.Name).To(Equal(set.Name))
Expect(metaSet.Namespace).To(Equal(set.Namespace))
Expect(metaSet.Labels).To(Equal(set.Labels))
return true
}).Should(BeTrue())
})
})
})

// newNonTypedOnlyCache returns a new cache that wraps the normal cache,
// returning an error if normal, typed objects have informers requested.
func newNonTypedOnlyCache(config *rest.Config, opts cache.Options) (cache.Cache, error) {
normalCache, err := cache.New(config, opts)
if err != nil {
return nil, err
}
return &nonTypedOnlyCache{
Cache: normalCache,
}, nil
}

// nonTypedOnlyCache is a cache.Cache that only provides metadata &
// unstructured informers.
type nonTypedOnlyCache struct {
cache.Cache
}

func (c *nonTypedOnlyCache) GetInformer(ctx context.Context, obj runtime.Object) (cache.Informer, error) {
switch obj.(type) {
case (*metav1.PartialObjectMetadata):
return c.Cache.GetInformer(ctx, obj)
default:
return nil, fmt.Errorf("did not want to provide an informer for normal type %T", obj)
}
}
func (c *nonTypedOnlyCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (cache.Informer, error) {
return nil, fmt.Errorf("don't try to sidestep the restriction on informer types by calling GetInformerForKind")
}

// TODO(directxman12): this function has too many arguments, and the whole
// "nameSuffix" think is a bit of a hack It should be cleaned up significantly by someone with a bit of time
func doReconcileTest(nameSuffix string, stop chan struct{}, blder *Builder, mgr manager.Manager, complete bool) {
deployName := "deploy-name-" + nameSuffix
rsName := "rs-name-" + nameSuffix
Expand Down Expand Up @@ -358,8 +459,8 @@ func doReconcileTest(nameSuffix string, stop chan struct{}, blder *Builder, mgr
Expect(err).NotTo(HaveOccurred())

By("Waiting for the Deployment Reconcile")
Expect(<-ch).To(Equal(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "default", Name: deployName}}))
Eventually(ch).Should(Receive(Equal(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "default", Name: deployName}})))

By("Creating a ReplicaSet")
// Expect a Reconcile when an Owned object is managedObjects.
Expand Down Expand Up @@ -388,8 +489,8 @@ func doReconcileTest(nameSuffix string, stop chan struct{}, blder *Builder, mgr
Expect(err).NotTo(HaveOccurred())

By("Waiting for the ReplicaSet Reconcile")
Expect(<-ch).To(Equal(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "default", Name: deployName}}))
Eventually(ch).Should(Receive(Equal(reconcile.Request{
NamespacedName: types.NamespacedName{Namespace: "default", Name: deployName}})))

}

Expand Down
55 changes: 55 additions & 0 deletions pkg/builder/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"os"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
logf "sigs.k8s.io/controller-runtime/pkg/log"

appsv1 "k8s.io/api/apps/v1"
Expand All @@ -34,6 +35,60 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func ExampleBuilder_metadata_only() {
logf.SetLogger(zap.New())

var log = logf.Log.WithName("builder-examples")

mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{})
if err != nil {
log.Error(err, "could not create manager")
os.Exit(1)
}

cl := mgr.GetClient()
err = builder.
ControllerManagedBy(mgr). // Create the ControllerManagedBy
For(&appsv1.ReplicaSet{}). // ReplicaSet is the Application API
Owns(&corev1.Pod{}, builder.OnlyMetadata). // ReplicaSet owns Pods created by it, and caches them as metadata only
Complete(reconcile.Func(func(req reconcile.Request) (reconcile.Result, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Read the ReplicaSet
rs := &appsv1.ReplicaSet{}
err := cl.Get(ctx, req.NamespacedName, rs)
if err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}

// List the Pods matching the PodTemplate Labels, but only their metadata
var podsMeta metav1.PartialObjectMetadataList
err = cl.List(ctx, &podsMeta, client.InNamespace(req.Namespace), client.MatchingLabels(rs.Spec.Template.Labels))
if err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}

// Update the ReplicaSet
rs.Labels["pod-count"] = fmt.Sprintf("%v", len(podsMeta.Items))
err = cl.Update(ctx, rs)
if err != nil {
return reconcile.Result{}, err
}

return reconcile.Result{}, nil
}))
if err != nil {
log.Error(err, "could not create controller")
os.Exit(1)
}

if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
log.Error(err, "could not start manager")
os.Exit(1)
}
}

// This example creates a simple application ControllerManagedBy that is configured for ReplicaSets and Pods.
//
// * Create a new application for ReplicaSets that manages Pods owned by the ReplicaSet and calls into
Expand Down
39 changes: 39 additions & 0 deletions pkg/builder/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,42 @@ var _ OwnsOption = &Predicates{}
var _ WatchesOption = &Predicates{}

// }}}

// {{{ For & Owns Dual-Type options

// asProjection configures the projection (currently only metadata) on the input.
// Currently only metadata is supported. We might want to expand
// this to arbitrary non-special local projections in the future.
type projectAs objectProjection

// ApplyToFor applies this configuration to the given ForInput options.
func (p projectAs) ApplyToFor(opts *ForInput) {
opts.objectProjection = objectProjection(p)
}

// ApplyToOwns applies this configuration to the given OwnsInput options.
func (p projectAs) ApplyToOwns(opts *OwnsInput) {
opts.objectProjection = objectProjection(p)
}

// ApplyToWatches applies this configuration to the given WatchesInput options.
func (p projectAs) ApplyToWatches(opts *WatchesInput) {
opts.objectProjection = objectProjection(p)
}

var (
// OnlyMetadata tells the controller to *only* cache metadata, and to watch
// the the API server in metadata-only form. This is useful when watching
// lots of objects, really big objects, or objects for which you only know
// the the GVK, but not the structure. You'll need to pass
// metav1.PartialObjectMetadata to the client when fetching objects in your
// reconciler, otherwise you'll end up with a duplicate structured or
// unstructured cache.
OnlyMetadata = projectAs(projectAsMetadata)

_ ForOption = OnlyMetadata
_ OwnsOption = OnlyMetadata
_ WatchesOption = OnlyMetadata
)

// }}}
Loading

0 comments on commit 7fc9110

Please sign in to comment.