Skip to content

Commit

Permalink
⚠️ Add TypedReconciler
Browse files Browse the repository at this point in the history
This change adds a TypedReconciler which allows to customize the type
being used in the workqueue. There is a number of situations where a
custom type might be better than the default `reconcile.Request`:
* Multi-Cluster controllers might want to put the clusters in there
* Some controllers do not reconcile individual resources of a given type
  but all of them at once, for example IngressControllers might do this
* Some controllers do not operate on Kubernetes resources at all
  • Loading branch information
alvaroaleman committed May 2, 2024
1 parent 9bc967a commit da7a60e
Show file tree
Hide file tree
Showing 27 changed files with 621 additions and 489 deletions.
4 changes: 0 additions & 4 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,6 @@ issues:
- linters:
- staticcheck
text: "SA1019: .*The component config package has been deprecated and will be removed in a future release."
- linters:
- staticcheck
# Will be addressed separately.
text: "SA1019: workqueue.(RateLimitingInterface|DefaultControllerRateLimiter|New|NewItemExponentialFailureRateLimiter|NewRateLimitingQueueWithConfig|DefaultItemBasedRateLimiter|RateLimitingQueueConfig) is deprecated:"
# With Go 1.16, the new embed directive can be used with an un-named import,
# revive (previously, golint) only allows these to be imported in a main.go, which wouldn't work for us.
# This directive allows the embed package to be imported with an underscore everywhere.
Expand Down
2 changes: 1 addition & 1 deletion alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
)

// Builder builds an Application ControllerManagedBy (e.g. Operator) and returns a manager.Manager to start it.
type Builder = builder.Builder
type Builder = builder.Builder[reconcile.Request]

// Request contains the information necessary to reconcile a Kubernetes object. This includes the
// information to uniquely identify the object - its Name and Namespace. It does NOT contain information about
Expand Down
56 changes: 56 additions & 0 deletions examples/typed/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package main

import (
"context"
"fmt"
"os"

networkingv1 "k8s.io/api/networking/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)

func main() {
if err := run(); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
}

func run() error {
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
if err != nil {
return fmt.Errorf("failed to construct manager: %w", err)
}

type request struct{}

r := reconcile.TypedFunc[request](func(ctx context.Context, _ request) (reconcile.Result, error) {
ingressList := &networkingv1.IngressList{}
if err := mgr.GetClient().List(ctx, ingressList); err != nil {
return reconcile.Result{}, fmt.Errorf("failed to list ingresses: %w", err)
}

buildIngressConfig(ingressList)
return reconcile.Result{}, nil
})
if err := builder.TypedControllerManagedBy[request](mgr).
WatchesRawSource(source.TypedKind(
mgr.GetCache(),
&networkingv1.Ingress{},
handler.TypedEnqueueRequestsFromMapFunc(func(context.Context, *networkingv1.Ingress) []request {
return []request{{}}
})),
).
Named("ingress_controller").
Complete(r); err != nil {
return fmt.Errorf("failed to construct ingress-controller: %w", err)
}

return nil
}

func buildIngressConfig(*networkingv1.IngressList) {}
101 changes: 65 additions & 36 deletions pkg/builder/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package builder
import (
"errors"
"fmt"
"reflect"
"strings"

"github.com/go-logr/logr"
Expand All @@ -37,7 +38,6 @@ import (
)

// Supporting mocking out functions for testing.
var newController = controller.New
var getGvk = apiutil.GVKForObject

// project represents other forms that we can use to
Expand All @@ -52,21 +52,27 @@ const (
)

// Builder builds a Controller.
type Builder struct {
type Builder[request comparable] struct {
forInput ForInput
ownsInput []OwnsInput
rawSources []source.Source
watchesInput []WatchesInput
rawSources []source.TypedSource[request]
watchesInput []WatchesInput[request]
mgr manager.Manager
globalPredicates []predicate.Predicate
ctrl controller.Controller
ctrlOptions controller.Options
ctrl controller.TypedController[request]
ctrlOptions controller.TypedOptions[request]
name string
newController func(name string, mgr manager.Manager, options controller.TypedOptions[request]) (controller.TypedController[request], error)
}

// ControllerManagedBy returns a new controller builder that will be started by the provided Manager.
func ControllerManagedBy(m manager.Manager) *Builder {
return &Builder{mgr: m}
func ControllerManagedBy(m manager.Manager) *Builder[reconcile.Request] {
return TypedControllerManagedBy[reconcile.Request](m)
}

// TypedControllerManagedBy returns a new tyepd controller builder that will be started by the provided Manager.
func TypedControllerManagedBy[request comparable](m manager.Manager) *Builder[request] {
return &Builder[request]{mgr: m}
}

// ForInput represents the information set by the For method.
Expand All @@ -81,7 +87,7 @@ type ForInput struct {
// update events by *reconciling the object*.
// This is the equivalent of calling
// Watches(&source.Kind{Type: apiType}, &handler.EnqueueRequestForObject{}).
func (blder *Builder) For(object client.Object, opts ...ForOption) *Builder {
func (blder *Builder[request]) For(object client.Object, opts ...ForOption) *Builder[request] {
if blder.forInput.object != nil {
blder.forInput.err = fmt.Errorf("For(...) should only be called once, could not assign multiple objects for reconciliation")
return blder
Expand Down Expand Up @@ -111,7 +117,7 @@ type OwnsInput struct {
//
// By default, this is the equivalent of calling
// Watches(object, handler.EnqueueRequestForOwner([...], ownerType, OnlyControllerOwner())).
func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder {
func (blder *Builder[request]) Owns(object client.Object, opts ...OwnsOption) *Builder[request] {
input := OwnsInput{object: object}
for _, opt := range opts {
opt.ApplyToOwns(&input)
Expand All @@ -122,9 +128,9 @@ func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder {
}

// WatchesInput represents the information set by Watches method.
type WatchesInput struct {
type WatchesInput[request comparable] struct {
obj client.Object
handler handler.EventHandler
handler handler.TypedEventHandler[client.Object, request]
predicates []predicate.Predicate
objectProjection objectProjection
}
Expand All @@ -134,8 +140,12 @@ type WatchesInput struct {
//
// This is the equivalent of calling
// WatchesRawSource(source.Kind(cache, object, eventHandler, predicates...)).
func (blder *Builder) Watches(object client.Object, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder {
input := WatchesInput{
func (blder *Builder[request]) Watches(
object client.Object,
eventHandler handler.TypedEventHandler[client.Object, request],
opts ...WatchesOption[request],
) *Builder[request] {
input := WatchesInput[request]{
obj: object,
handler: eventHandler,
}
Expand Down Expand Up @@ -175,8 +185,12 @@ func (blder *Builder) Watches(object client.Object, eventHandler handler.EventHa
// In the first case, controller-runtime will create another cache for the
// concrete type on top of the metadata cache; this increases memory
// consumption and leads to race conditions as caches are not in sync.
func (blder *Builder) WatchesMetadata(object client.Object, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder {
opts = append(opts, OnlyMetadata)
func (blder *Builder[request]) WatchesMetadata(
object client.Object,
eventHandler handler.TypedEventHandler[client.Object, request],
opts ...WatchesOption[request],
) *Builder[request] {
opts = append(opts, projectAs[request](projectAsMetadata))
return blder.Watches(object, eventHandler, opts...)
}

Expand All @@ -187,7 +201,7 @@ func (blder *Builder) WatchesMetadata(object client.Object, eventHandler handler
// This method is only exposed for more advanced use cases, most users should use one of the higher level functions.
//
// WatchesRawSource does not respect predicates configured through WithEventFilter.
func (blder *Builder) WatchesRawSource(src source.Source) *Builder {
func (blder *Builder[request]) WatchesRawSource(src source.TypedSource[request]) *Builder[request] {
blder.rawSources = append(blder.rawSources, src)

return blder
Expand All @@ -197,19 +211,19 @@ func (blder *Builder) WatchesRawSource(src source.Source) *Builder {
// trigger reconciliations. For example, filtering on whether the resource version has changed.
// Given predicate is added for all watched objects.
// Defaults to the empty list.
func (blder *Builder) WithEventFilter(p predicate.Predicate) *Builder {
func (blder *Builder[request]) WithEventFilter(p predicate.Predicate) *Builder[request] {
blder.globalPredicates = append(blder.globalPredicates, p)
return blder
}

// WithOptions overrides the controller options used in doController. Defaults to empty.
func (blder *Builder) WithOptions(options controller.Options) *Builder {
func (blder *Builder[request]) WithOptions(options controller.TypedOptions[request]) *Builder[request] {
blder.ctrlOptions = options
return blder
}

// WithLogConstructor overrides the controller options's LogConstructor.
func (blder *Builder) WithLogConstructor(logConstructor func(*reconcile.Request) logr.Logger) *Builder {
func (blder *Builder[request]) WithLogConstructor(logConstructor func(*request) logr.Logger) *Builder[request] {
blder.ctrlOptions.LogConstructor = logConstructor
return blder
}
Expand All @@ -219,19 +233,19 @@ func (blder *Builder) WithLogConstructor(logConstructor func(*reconcile.Request)
// (underscores and alphanumeric characters only).
//
// By default, controllers are named using the lowercase version of their kind.
func (blder *Builder) Named(name string) *Builder {
func (blder *Builder[request]) Named(name string) *Builder[request] {
blder.name = name
return blder
}

// Complete builds the Application Controller.
func (blder *Builder) Complete(r reconcile.Reconciler) error {
func (blder *Builder[request]) Complete(r reconcile.TypedReconciler[request]) error {
_, err := blder.Build(r)
return err
}

// Build builds the Application Controller and returns the Controller it created.
func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) {
func (blder *Builder[request]) Build(r reconcile.TypedReconciler[request]) (controller.TypedController[request], error) {
if r == nil {
return nil, fmt.Errorf("must provide a non-nil Reconciler")
}
Expand All @@ -255,7 +269,7 @@ func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, erro
return blder.ctrl, nil
}

func (blder *Builder) project(obj client.Object, proj objectProjection) (client.Object, error) {
func (blder *Builder[request]) project(obj client.Object, proj objectProjection) (client.Object, error) {
switch proj {
case projectAsNormal:
return obj, nil
Expand All @@ -272,17 +286,23 @@ func (blder *Builder) project(obj client.Object, proj objectProjection) (client.
}
}

func (blder *Builder) doWatch() error {
func (blder *Builder[request]) doWatch() error {
// Reconcile type
if blder.forInput.object != nil {
obj, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
if err != nil {
return err
}
hdler := &handler.EnqueueRequestForObject{}

var hdler handler.TypedEventHandler[client.Object, request]
if reflect.TypeFor[request]() != reflect.TypeOf(reconcile.Request{}) {
return errors.New("For() is not supported for TypedBuilder")
}

reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(&handler.EnqueueRequestForObject{}))
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, blder.forInput.predicates...)
src := source.Kind(blder.mgr.GetCache(), obj, hdler, allPredicates...)
src := source.TypedKind(blder.mgr.GetCache(), obj, hdler, allPredicates...)
if err := blder.ctrl.Watch(src); err != nil {
return err
}
Expand All @@ -301,14 +321,18 @@ func (blder *Builder) doWatch() error {
if !own.matchEveryOwner {
opts = append(opts, handler.OnlyControllerOwner())
}
hdler := handler.EnqueueRequestForOwner(
var hdler handler.TypedEventHandler[client.Object, request]
if reflect.TypeFor[request]() != reflect.TypeOf(reconcile.Request{}) {
return errors.New("Owns() is not supported for TypedBuilder")
}
reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(handler.EnqueueRequestForOwner(
blder.mgr.GetScheme(), blder.mgr.GetRESTMapper(),
blder.forInput.object,
opts...,
)
)))
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, own.predicates...)
src := source.Kind(blder.mgr.GetCache(), obj, hdler, allPredicates...)
src := source.TypedKind(blder.mgr.GetCache(), obj, hdler, allPredicates...)
if err := blder.ctrl.Watch(src); err != nil {
return err
}
Expand All @@ -325,7 +349,7 @@ func (blder *Builder) doWatch() error {
}
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, w.predicates...)
if err := blder.ctrl.Watch(source.Kind(blder.mgr.GetCache(), projected, w.handler, allPredicates...)); err != nil {
if err := blder.ctrl.Watch(source.TypedKind[client.Object, request](blder.mgr.GetCache(), projected, w.handler, allPredicates...)); err != nil {
return err
}
}
Expand All @@ -337,7 +361,7 @@ func (blder *Builder) doWatch() error {
return nil
}

func (blder *Builder) getControllerName(gvk schema.GroupVersionKind, hasGVK bool) (string, error) {
func (blder *Builder[request]) getControllerName(gvk schema.GroupVersionKind, hasGVK bool) (string, error) {
if blder.name != "" {
return blder.name, nil
}
Expand All @@ -347,7 +371,7 @@ func (blder *Builder) getControllerName(gvk schema.GroupVersionKind, hasGVK bool
return strings.ToLower(gvk.Kind), nil
}

func (blder *Builder) doController(r reconcile.Reconciler) error {
func (blder *Builder[request]) doController(r reconcile.TypedReconciler[request]) error {
globalOpts := blder.mgr.GetControllerOptions()

ctrlOptions := blder.ctrlOptions
Expand Down Expand Up @@ -401,9 +425,10 @@ func (blder *Builder) doController(r reconcile.Reconciler) error {
)
}

ctrlOptions.LogConstructor = func(req *reconcile.Request) logr.Logger {
ctrlOptions.LogConstructor = func(in *request) logr.Logger {
log := log
if req != nil {

if req, ok := any(in).(*reconcile.Request); ok && req != nil {
if hasGVK {
log = log.WithValues(gvk.Kind, klog.KRef(req.Namespace, req.Name))
}
Expand All @@ -415,7 +440,11 @@ func (blder *Builder) doController(r reconcile.Reconciler) error {
}
}

if blder.newController == nil {
blder.newController = controller.NewTyped[request]
}

// Build the controller and return.
blder.ctrl, err = newController(controllerName, blder.mgr, ctrlOptions)
blder.ctrl, err = blder.newController(controllerName, blder.mgr, ctrlOptions)
return err
}
Loading

0 comments on commit da7a60e

Please sign in to comment.