Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Refactor Syncer based on the enhanced ddsif, with controller manager and endpoints controller #2452

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 48 additions & 45 deletions hack/logcheck.out

Large diffs are not rendered by default.

38 changes: 5 additions & 33 deletions pkg/informer/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,13 +353,13 @@ func (d *GenericDiscoveringDynamicSharedInformerFactory[Informer, Lister, Generi
return inf
}

// Listers returns a map of per-resource-type listers for all types that are
// Informers returns a map of per-resource-type generic informers for all types that are
// known by this informer factory, and that are synced.
//
// If any informers aren't synced, their GVRs are returned so that they can be
// checked and processed later.
func (d *GenericDiscoveringDynamicSharedInformerFactory[Informer, Lister, GenericInformer]) Listers() (listers map[schema.GroupVersionResource]Lister, notSynced []schema.GroupVersionResource) {
listers = map[schema.GroupVersionResource]Lister{}
func (d *GenericDiscoveringDynamicSharedInformerFactory[Informer, Lister, GenericInformer]) Informers() (informers map[schema.GroupVersionResource]GenericInformer, notSynced []schema.GroupVersionResource) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍, much better

informers = map[schema.GroupVersionResource]GenericInformer{}

d.informersLock.RLock()
defer d.informersLock.RUnlock()
Expand All @@ -372,38 +372,10 @@ func (d *GenericDiscoveringDynamicSharedInformerFactory[Informer, Lister, Generi
continue
}

listers[gvr] = informer.Lister()
informers[gvr] = informer
}

return listers, notSynced
}

// Lister returns a lister for the given resource-type, whether it is
// known by this informer factory, and whether it is synced.
func (d *GenericDiscoveringDynamicSharedInformerFactory[Informer, Lister, GenericInformer]) Lister(gvr schema.GroupVersionResource) (lister Lister, known, synced bool) {
d.informersLock.RLock()
defer d.informersLock.RUnlock()

informer, ok := d.informers[gvr]
if !ok {
return lister, false, false
}

return informer.Lister(), true, informer.Informer().HasSynced()
}

// Informer returns an informer for the given resource-type if it exists, whether it is
// known by this informer factory, and whether it is synced.
func (d *GenericDiscoveringDynamicSharedInformerFactory[Informer, Lister, GenericInformer]) Informer(gvr schema.GroupVersionResource) (informer Informer, known, synced bool) {
d.informersLock.RLock()
defer d.informersLock.RUnlock()

genericInformer, ok := d.informers[gvr]
if !ok {
return informer, false, false
}

return genericInformer.Informer(), true, genericInformer.Informer().HasSynced()
return informers, notSynced
}

// GVREventHandler is an event handler that includes the GroupVersionResource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,9 @@ func claimFromSetKey(key string) apisv1alpha1.PermissionClaim {
}

func (c *controller) getInformerForGroupResource(group, resource string) (kcpkubernetesinformers.GenericClusterInformer, schema.GroupVersionResource, error) {
listers, _ := c.ddsif.Listers()
informers, _ := c.ddsif.Informers()

for gvr := range listers {
for gvr := range informers {
if gvr.Group == group && gvr.Resource == resource {
informer, err := c.ddsif.ForResource(gvr)
// once we find one, return.
Expand Down
10 changes: 5 additions & 5 deletions pkg/reconciler/workload/resource/resource_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,11 +373,11 @@ func (c *Controller) enqueueResourcesForNamespace(ns *corev1.Namespace) error {
logger = logger.WithValues("nsLocations", nsLocations.List())

logger.V(4).Info("getting listers")
listers, notSynced := c.ddsif.Listers()
informers, notSynced := c.ddsif.Informers()
var errs []error
for gvr, lister := range listers {
for gvr, informer := range informers {
logger = logger.WithValues("gvr", gvr.String())
objs, err := lister.ByCluster(clusterName).ByNamespace(ns.Name).List(labels.Everything())
objs, err := informer.Lister().ByCluster(clusterName).ByNamespace(ns.Name).List(labels.Everything())
if err != nil {
errs = append(errs, fmt.Errorf("error listing %q in %s|%s: %w", gvr, clusterName, ns.Name, err))
continue
Expand Down Expand Up @@ -438,9 +438,9 @@ func (c *Controller) enqueueSyncTarget(obj interface{}) {
func (c *Controller) enqueueSyncTargetKey(syncTargetKey string) {
logger := logging.WithReconciler(klog.Background(), ControllerName).WithValues("syncTargetKey", syncTargetKey)

listers, _ := c.ddsif.Listers()
informers, _ := c.ddsif.Informers()
queued := map[string]int{}
for gvr := range listers {
for gvr := range informers {
inf, err := c.ddsif.ForResource(gvr)
if err != nil {
runtime.HandleError(err)
Expand Down
198 changes: 198 additions & 0 deletions pkg/syncer/controllermanager/controllermanager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/*
Copyright 2022 The KCP Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllermanager

import (
"context"
"time"

"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

"github.com/kcp-dev/kcp/pkg/logging"
"github.com/kcp-dev/kcp/pkg/syncer/shared"
)

const (
ControllerNamePrefix = "syncer-controller-manager-"
)

// InformerSource is a dynamic source of informers per GVR,
// which notifies when informers are added or removed for some GVR.
// It is implemented by the DynamicSharedInformerFactory (in fact by
// both the scoped or cluster-aware variants).
type InformerSource struct {
// Subscribe registers for informer change notifications, returning a channel to which change notifications are sent.
// The id argument is the identifier of the subscriber, since there might be several subscribers subscribing
// to receive events from this InformerSource.
Subscribe func(id string) <-chan struct{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is id?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not "subscribe". It is "Events()". Subscribe would return and keep you subscribed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, this is "subscribe". With subscribe I would expect either an event handler or the channel passed in, not returned (and logic to remove it when closed).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the same as the DDSIF Subscribe() method.

// Subscribe registers for informer/discovery change notifications, returning a channel to which change notifications
// are sent.
func (d *GenericDiscoveringDynamicSharedInformerFactory[Informer, Lister, GenericInformer]) Subscribe(id string) <-chan struct{} {

id is the identifier of the subscriber, since there might be several subscribers subscribing to receive events from the InformerSource.


// Informers returns a map of per-resource-type SharedIndexInformers for all types that are
// known by this informer source, and that are synced.
//
// It also returns the list of informers that are known by this informer source, but sill not synced.
Informers func() (informers map[schema.GroupVersionResource]cache.SharedIndexInformer, notSynced []schema.GroupVersionResource)
}

// ManagedController defines a controller that should be managed by a ControllerManager,
// to be started when the required GVRs are supported, and stopped when the required GVRs
// are not supported anymore.
type ManagedController struct {
RequiredGVRs []schema.GroupVersionResource
Create CreateControllerFunc
}

type StartControllerFunc func(ctx context.Context)
type CreateControllerFunc func(ctx context.Context) (StartControllerFunc, error)

// NewControllerManager creates a new ControllerManager which will manage (create/start/stop) GVR-specific controllers according to informers
// available in the provided InformerSource.
func NewControllerManager(ctx context.Context, suffix string, informerSource InformerSource, controllers map[string]ManagedController) *ControllerManager {
controllerManager := ControllerManager{
name: ControllerNamePrefix + suffix,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerNamePrefix+suffix),
informerSource: informerSource,
managedControllers: controllers,
startedControllers: map[string]context.CancelFunc{},
}

apisChanged := informerSource.Subscribe(controllerManager.name)

logger := klog.FromContext(ctx)

go func() {
for {
select {
case <-ctx.Done():
return
case <-apisChanged:
logger.V(4).Info("got API change notification")
controllerManager.queue.Add("resync") // this queue only ever has one key in it, as long as it's constant we are OK
}
}
}()

return &controllerManager
}

// ControllerManager is a component that manages (create/start/stop) GVR-specific controllers according to available GVRs.
// It reacts to the changes of supported GVRs in a DiscoveringDynamicSharedInformerFactory
// (the GVRs for which an informer has been automatically created, started and synced),
// and starts / stops registered GVRs-specific controllers according to the GVRs they depend on.
//
// For example this allows starting PVC / PV controllers only when PVC / PV resources are exposed by the Syncer and UpSyncer
// virtual workspaces, and Informers for them have been started and synced by the corresponding ddsif.
type ControllerManager struct {
davidfestal marked this conversation as resolved.
Show resolved Hide resolved
name string
queue workqueue.RateLimitingInterface
informerSource InformerSource
managedControllers map[string]ManagedController
startedControllers map[string]context.CancelFunc
}

// Start starts the controller, which stops when ctx.Done() is closed.
func (c *ControllerManager) Start(ctx context.Context) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()

logger := logging.WithReconciler(klog.FromContext(ctx), c.name)
logger.Info("Starting controller manager")
defer logger.Info("Shutting down controller manager")

go wait.UntilWithContext(ctx, c.startWorker, time.Second)
<-ctx.Done()
}

func (c *ControllerManager) startWorker(ctx context.Context) {
for c.processNextWorkItem(ctx) {
}
}

func (c *ControllerManager) processNextWorkItem(ctx context.Context) bool {
key, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(key)

c.process(ctx)
c.queue.Forget(key)
return true
}
davidfestal marked this conversation as resolved.
Show resolved Hide resolved

func (c *ControllerManager) process(ctx context.Context) {
logger := klog.FromContext(ctx)
controllersToStart := map[string]CreateControllerFunc{}
syncedInformers, notSynced := c.informerSource.Informers()
controllerLoop:
for controllerName, managedController := range c.managedControllers {
requiredGVRs := managedController.RequiredGVRs
for _, gvr := range requiredGVRs {
informer := syncedInformers[gvr]
if informer == nil {
if shared.ContainsGVR(notSynced, gvr) {
logger.V(2).Info("waiting for the informer to be synced before starting controller", "gvr", gvr, "controller", controllerName)
c.queue.AddAfter("resync", time.Second)
continue controllerLoop
}
// The informer doesn't even exist for this GVR.
// Let's ignore this controller for now: one of the required GVRs has no informer started
// (because it has not been found on the SyncTarget in the supported resources to sync).
// If this required GVR is supported later on, the updateControllers() method will be called
// again after an API change notification comes through the informerSource.
continue controllerLoop
}
}
controllersToStart[controllerName] = managedController.Create
}

// Remove obsolete controllers that don't have their required GVRs anymore
for controllerName, cancelFunc := range c.startedControllers {
if _, ok := controllersToStart[controllerName]; ok {
// The controller is still expected => don't remove it
continue
}
// The controller should not be running
// Stop it and remove it from the list of started controllers
cancelFunc()
delete(c.startedControllers, controllerName)
}

// Create and start missing controllers that have their required GVRs synced
for controllerName, create := range controllersToStart {
if _, ok := c.startedControllers[controllerName]; ok {
// The controller is already started
continue
}

// Create the controller
start, err := create(ctx)
if err != nil {
logger.Error(err, "failed creating controller", "controller", controllerName)
davidfestal marked this conversation as resolved.
Show resolved Hide resolved
continue
}

// Start the controller
controllerContext, cancelFunc := context.WithCancel(ctx)
go start(controllerContext)
c.startedControllers[controllerName] = cancelFunc
davidfestal marked this conversation as resolved.
Show resolved Hide resolved
}
}
Loading