Skip to content

Commit

Permalink
Enqueue location upon synctarget update
Browse files Browse the repository at this point in the history
We cannot directly read the logicalcluster path from synctarget
to get the related placement, since the value of that path is not
unique. Instead, we need to get the location of the related synctarget
at first, which always has the path annotation.

Signed-off-by: Jian Qiu <jqiu@redhat.com>
  • Loading branch information
qiujian16 committed Jan 17, 2023
1 parent 9358474 commit 50b0986
Showing 1 changed file with 34 additions and 49 deletions.
83 changes: 34 additions & 49 deletions pkg/reconciler/workload/placement/placement_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (

apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1"
"github.com/kcp-dev/kcp/pkg/apis/core"
corev1alpha1 "github.com/kcp-dev/kcp/pkg/apis/core/v1alpha1"
schedulingv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/scheduling/v1alpha1"
workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1"
kcpclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/cluster"
Expand Down Expand Up @@ -101,23 +100,25 @@ func NewController(
indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName,
})

logger := logging.WithReconciler(klog.Background(), ControllerName)

locationInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.enqueueLocation,
AddFunc: func(obj interface{}) { c.enqueueLocation(obj, logger) },
UpdateFunc: func(old, obj interface{}) {
oldLoc := old.(*schedulingv1alpha1.Location)
newLoc := obj.(*schedulingv1alpha1.Location)
if !reflect.DeepEqual(oldLoc.Spec, newLoc.Spec) || !reflect.DeepEqual(oldLoc.Labels, newLoc.Labels) {
c.enqueueLocation(obj)
c.enqueueLocation(obj, logger)
}
},
DeleteFunc: c.enqueueLocation,
DeleteFunc: func(obj interface{}) { c.enqueueLocation(obj, logger) },
},
)

syncTargetInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.enqueueSyncTarget,
AddFunc: func(obj interface{}) { c.enqueueSyncTarget(obj, logger) },
UpdateFunc: func(old, obj interface{}) {
oldCluster := old.(*workloadv1alpha1.SyncTarget)
oldClusterCopy := *oldCluster
Expand All @@ -137,24 +138,23 @@ func NewController(

// compare ignoring heart-beat
if !reflect.DeepEqual(oldClusterCopy, newClusterCopy) {
c.enqueueSyncTarget(obj)
c.enqueueSyncTarget(obj, logger)
}
},
DeleteFunc: c.enqueueSyncTarget,
DeleteFunc: func(obj interface{}) { c.enqueueSyncTarget(obj, logger) },
},
)

logger := logging.WithReconciler(klog.Background(), ControllerName)
placementInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { c.enqueuePlacement(obj, logger, "") },
UpdateFunc: func(_, obj interface{}) { c.enqueuePlacement(obj, logger, "") },
DeleteFunc: func(obj interface{}) { c.enqueuePlacement(obj, logger, "") },
AddFunc: func(obj interface{}) { c.enqueuePlacement(obj, logger) },
UpdateFunc: func(_, obj interface{}) { c.enqueuePlacement(obj, logger) },
DeleteFunc: func(obj interface{}) { c.enqueuePlacement(obj, logger) },
})

apiBindingInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.enqueueAPIBinding,
UpdateFunc: func(_, obj interface{}) { c.enqueueAPIBinding(obj) },
DeleteFunc: c.enqueueAPIBinding,
AddFunc: func(obj interface{}) { c.enqueueAPIBinding(obj, logger) },
UpdateFunc: func(_, obj interface{}) { c.enqueueAPIBinding(obj, logger) },
DeleteFunc: func(obj interface{}) { c.enqueueAPIBinding(obj, logger) },
})

return c, nil
Expand Down Expand Up @@ -188,7 +188,7 @@ type controller struct {
}

// enqueueLocation finds placement ref to this location at first, and then namespaces bound to this placement.
func (c *controller) enqueueLocation(obj interface{}) {
func (c *controller) enqueueLocation(obj interface{}, logger logr.Logger) {
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = tombstone.Obj
}
Expand All @@ -215,24 +215,24 @@ func (c *controller) enqueueLocation(obj interface{}) {
placements = append(placements, placementsByPath...)
}

logger := logging.WithObject(logging.WithReconciler(klog.Background(), ControllerName), location)
logger = logger.WithValues(logging.FromPrefix("locationReason", location))
for _, placement := range placements {
c.enqueuePlacement(placement, logger, " because of Location")
c.enqueuePlacement(placement, logger)
}
}

func (c *controller) enqueuePlacement(obj interface{}, logger logr.Logger, logSuffix string) {
func (c *controller) enqueuePlacement(obj interface{}, logger logr.Logger) {
key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(err)
return
}

logging.WithQueueKey(logger, key).V(2).Info(fmt.Sprintf("queueing Placement%s", logSuffix))
logging.WithQueueKey(logger, key).V(2).Info(fmt.Sprintf("queueing Placement"))
c.queue.Add(key)
}

func (c *controller) enqueueAPIBinding(obj interface{}) {
func (c *controller) enqueueAPIBinding(obj interface{}, logger logr.Logger) {
key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(err)
Expand All @@ -250,50 +250,35 @@ func (c *controller) enqueueAPIBinding(obj interface{}) {
return
}

logger := logging.WithObject(logging.WithReconciler(klog.Background(), ControllerName), obj.(*apisv1alpha1.APIBinding))
logger = logger.WithValues(logging.FromPrefix("apiBindingReason", obj.(*apisv1alpha1.APIBinding)))

for _, placement := range placements {
c.enqueuePlacement(placement, logger, " because of APIBinding")
c.enqueuePlacement(placement, logger)
}
}

func (c *controller) enqueueSyncTarget(obj interface{}) {
key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(err)
return
func (c *controller) enqueueSyncTarget(obj interface{}, logger logr.Logger) {
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = tombstone.Obj
}

clusterName, _, _, err := kcpcache.SplitMetaClusterNamespaceKey(key)
if err != nil {
runtime.HandleError(err)
syncTarget, ok := obj.(*workloadv1alpha1.SyncTarget)
if !ok {
runtime.HandleError(fmt.Errorf("unexpected object type: %T", obj))
return
}

cluster, err := c.logicalClusterLister.Cluster(clusterName).Get(corev1alpha1.LogicalClusterName)
// Get all locations in the same cluster and enqueue locations.
locations, err := c.locationLister.Cluster(logicalcluster.From(syncTarget)).List(labels.Everything())
if err != nil {
runtime.HandleError(err)
return
}

// placements referencing by cluster name
placements, err := c.placementIndexer.ByIndex(bySelectedLocationPath, logicalcluster.From(cluster).String())
if err != nil {
runtime.HandleError(err)
return
}
if path := cluster.Annotations[core.LogicalClusterPathAnnotationKey]; path != "" {
// placements referencing by path
placementsByPath, err := c.placementIndexer.ByIndex(bySelectedLocationPath, path)
if err != nil {
runtime.HandleError(err)
return
}
placements = append(placements, placementsByPath...)
}
logger = logger.WithValues(logging.FromPrefix("syncTargetReason", syncTarget))

logger := logging.WithObject(logging.WithReconciler(klog.Background(), ControllerName), obj.(*workloadv1alpha1.SyncTarget))
for _, placement := range placements {
c.enqueuePlacement(placement, logger, " because of SyncTarget")
for _, location := range locations {
c.enqueueLocation(location, logger)
}
}

Expand Down

0 comments on commit 50b0986

Please sign in to comment.