Skip to content

Commit

Permalink
controller/engine: split controller creation and start
Browse files Browse the repository at this point in the history
Signed-off-by: Dr. Stefan Schimanski <stefan.schimanski@upbound.io>
  • Loading branch information
sttts committed Oct 9, 2023
1 parent d170d97 commit 55772bd
Showing 1 changed file with 50 additions and 15 deletions.
65 changes: 50 additions & 15 deletions pkg/controller/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,14 +177,39 @@ func TriggeredBy(source source.Source, h handler.EventHandler, p ...predicate.Pr
// the supplied options, and configured with the supplied watches. Start does
// not block.
func (e *Engine) Start(name string, o controller.Options, w ...Watch) error {
c, err := e.Create(name, o, w...)
if err != nil {
return err
}
return c.Start(context.Background())
}

// NamedController is a named controller that is not started yet.
type NamedController interface {
Start(ctx context.Context) error
GetCache() cache.Cache
}

type namedController struct {
name string
e *Engine
ca cache.Cache
ctrl controller.Controller
}

// Create the named controller. Each controller gets its own cache
// whose lifecycle is coupled to the controller. The controller is created with
// the supplied options, and configured with the supplied watches. It is not
// started yet.
func (e *Engine) Create(name string, o controller.Options, w ...Watch) (NamedController, error) {
// Each controller gets its own cache because there's currently no way to
// stop an informer. In practice a controller-runtime cache is a map of
// kinds to informers. If we delete the CRD for a kind we need to stop the
// relevant informer, or it will spew errors about the kind not existing. We
// work around this by stopping the entire cache.
ca, err := e.newCache(e.mgr.GetConfig(), cache.Options{Scheme: e.mgr.GetScheme(), Mapper: e.mgr.GetRESTMapper()})
if err != nil {
return errors.Wrap(err, errCreateCache)
return nil, errors.Wrap(err, errCreateCache)
}

// Wrap the existing manager to use our cache for the GVKs of this controller.
Expand All @@ -201,47 +226,57 @@ func (e *Engine) Start(name string, o controller.Options, w ...Watch) error {

ctrl, err := e.newCtrl(name, rm, o)
if err != nil {
return errors.Wrap(err, errCreateController)
return nil, errors.Wrap(err, errCreateController)
}

for _, wt := range w {
if wt.customSource != nil {
if err := ctrl.Watch(wt.customSource, wt.handler, wt.predicates...); err != nil {
return errors.Wrap(err, errWatch)
return nil, errors.Wrap(err, errWatch)
}
continue
}

// route cache and client (read) requests to our cache for this GVK.
gvk, err := apiutil.GVKForObject(wt.kind, e.mgr.GetScheme())
if err != nil {
return errors.Wrapf(err, "failed to get GVK for type %T", wt.kind)
return nil, errors.Wrapf(err, "failed to get GVK for type %T", wt.kind)
}
rc.AddDelegate(gvk, ca)

if err := ctrl.Watch(source.Kind(ca, wt.kind), wt.handler, wt.predicates...); err != nil {
return errors.Wrap(err, errWatch)
return nil, errors.Wrap(err, errWatch)
}
}

if e.IsRunning(name) {
return namedController{name: name, e: e, ca: ca, ctrl: ctrl}, nil
}

// Start the named controller. Start does not block.
func (c namedController) Start(ctx context.Context) error {
if c.e.IsRunning(c.name) {
return nil
}

ctx, stop := context.WithCancel(context.Background())
e.mx.Lock()
e.started[name] = stop
e.errors[name] = nil
e.mx.Unlock()
ctx, stop := context.WithCancel(ctx)
c.e.mx.Lock()
c.e.started[c.name] = stop
c.e.errors[c.name] = nil
c.e.mx.Unlock()

go func() {
<-e.mgr.Elected()
e.done(name, errors.Wrap(ca.Start(ctx), errCrashCache))
<-c.e.mgr.Elected()
c.e.done(c.name, errors.Wrap(c.ca.Start(ctx), errCrashCache))
}()
go func() {
<-e.mgr.Elected()
e.done(name, errors.Wrap(ctrl.Start(ctx), errCrashController))
<-c.e.mgr.Elected()
c.e.done(c.name, errors.Wrap(c.ctrl.Start(ctx), errCrashController))
}()

return nil
}

// GetCache returns the cache used by the named controller.
func (c namedController) GetCache() cache.Cache {
return c.ca
}

0 comments on commit 55772bd

Please sign in to comment.