Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Enqueue location upon synctarget update #2624

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 34 additions & 49 deletions pkg/reconciler/workload/placement/placement_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (

apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1"
"github.com/kcp-dev/kcp/pkg/apis/core"
corev1alpha1 "github.com/kcp-dev/kcp/pkg/apis/core/v1alpha1"
schedulingv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/scheduling/v1alpha1"
workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1"
kcpclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/cluster"
Expand Down Expand Up @@ -101,23 +100,25 @@ func NewController(
indexers.ByLogicalClusterPathAndName: indexers.IndexByLogicalClusterPathAndName,
})

logger := logging.WithReconciler(klog.Background(), ControllerName)

locationInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.enqueueLocation,
AddFunc: func(obj interface{}) { c.enqueueLocation(obj, logger) },
UpdateFunc: func(old, obj interface{}) {
oldLoc := old.(*schedulingv1alpha1.Location)
newLoc := obj.(*schedulingv1alpha1.Location)
if !reflect.DeepEqual(oldLoc.Spec, newLoc.Spec) || !reflect.DeepEqual(oldLoc.Labels, newLoc.Labels) {
c.enqueueLocation(obj)
c.enqueueLocation(obj, logger)
}
},
DeleteFunc: c.enqueueLocation,
DeleteFunc: func(obj interface{}) { c.enqueueLocation(obj, logger) },
},
)

syncTargetInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.enqueueSyncTarget,
AddFunc: func(obj interface{}) { c.enqueueSyncTarget(obj, logger) },
UpdateFunc: func(old, obj interface{}) {
oldCluster := old.(*workloadv1alpha1.SyncTarget)
oldClusterCopy := *oldCluster
Expand All @@ -137,24 +138,23 @@ func NewController(

// compare ignoring heart-beat
if !reflect.DeepEqual(oldClusterCopy, newClusterCopy) {
c.enqueueSyncTarget(obj)
c.enqueueSyncTarget(obj, logger)
}
},
DeleteFunc: c.enqueueSyncTarget,
DeleteFunc: func(obj interface{}) { c.enqueueSyncTarget(obj, logger) },
},
)

logger := logging.WithReconciler(klog.Background(), ControllerName)
placementInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { c.enqueuePlacement(obj, logger, "") },
UpdateFunc: func(_, obj interface{}) { c.enqueuePlacement(obj, logger, "") },
DeleteFunc: func(obj interface{}) { c.enqueuePlacement(obj, logger, "") },
AddFunc: func(obj interface{}) { c.enqueuePlacement(obj, logger) },
UpdateFunc: func(_, obj interface{}) { c.enqueuePlacement(obj, logger) },
DeleteFunc: func(obj interface{}) { c.enqueuePlacement(obj, logger) },
})

apiBindingInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.enqueueAPIBinding,
UpdateFunc: func(_, obj interface{}) { c.enqueueAPIBinding(obj) },
DeleteFunc: c.enqueueAPIBinding,
AddFunc: func(obj interface{}) { c.enqueueAPIBinding(obj, logger) },
UpdateFunc: func(_, obj interface{}) { c.enqueueAPIBinding(obj, logger) },
DeleteFunc: func(obj interface{}) { c.enqueueAPIBinding(obj, logger) },
})

return c, nil
Expand Down Expand Up @@ -188,7 +188,7 @@ type controller struct {
}

// enqueueLocation finds placement ref to this location at first, and then namespaces bound to this placement.
func (c *controller) enqueueLocation(obj interface{}) {
func (c *controller) enqueueLocation(obj interface{}, logger logr.Logger) {
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = tombstone.Obj
}
Expand All @@ -215,24 +215,24 @@ func (c *controller) enqueueLocation(obj interface{}) {
placements = append(placements, placementsByPath...)
}

logger := logging.WithObject(logging.WithReconciler(klog.Background(), ControllerName), location)
logger = logger.WithValues(logging.FromPrefix("locationReason", location)...)
for _, placement := range placements {
c.enqueuePlacement(placement, logger, " because of Location")
c.enqueuePlacement(placement, logger)
}
}

func (c *controller) enqueuePlacement(obj interface{}, logger logr.Logger, logSuffix string) {
func (c *controller) enqueuePlacement(obj interface{}, logger logr.Logger) {
key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(err)
return
}

logging.WithQueueKey(logger, key).V(2).Info(fmt.Sprintf("queueing Placement%s", logSuffix))
logging.WithQueueKey(logger, key).V(2).Info("queueing Placement")
c.queue.Add(key)
}

func (c *controller) enqueueAPIBinding(obj interface{}) {
func (c *controller) enqueueAPIBinding(obj interface{}, logger logr.Logger) {
key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(err)
Expand All @@ -250,50 +250,35 @@ func (c *controller) enqueueAPIBinding(obj interface{}) {
return
}

logger := logging.WithObject(logging.WithReconciler(klog.Background(), ControllerName), obj.(*apisv1alpha1.APIBinding))
logger = logger.WithValues(logging.FromPrefix("apiBindingReason", obj.(*apisv1alpha1.APIBinding))...)

for _, placement := range placements {
c.enqueuePlacement(placement, logger, " because of APIBinding")
c.enqueuePlacement(placement, logger)
}
}

func (c *controller) enqueueSyncTarget(obj interface{}) {
key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(err)
return
func (c *controller) enqueueSyncTarget(obj interface{}, logger logr.Logger) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is very noisy. We should rethink whether we really need to enqueue half of the world.

  1. What is the exact reason we have to enqueue a sync target?
  2. What can we do to get the placements more directly?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Goal: only sync the placement that actually schedules to that SyncTarget.

Copy link
Contributor Author

@qiujian16 qiujian16 Jan 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ideally we should only enqueue location and scheduled synctarget here. The missing part is we do not know what APIs are compatible in the location. So if a synctarget is updated with "api compaible/non-comptaible", we have to re-select the clusters.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if a synctarget is updated with "api compaible/non-comptaible"

For that we could have a special case, couldn't we?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, can we queue only those locations that are affected, e.g. that include the SyncTarget? If you remove a label from a SyncTarget, we could also see that and queue those locations where this SyncTarget was removed from.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ideally we should only enqueue location and scheduled synctarget here. The missing part is we do not know what APIs are compatible in the location. So if a synctarget is updated with "api compaible/non-comptaible", we have to re-select the clusters.

Actually, I now think the fix is not correct: the old code did not look at the SyncTarget, but on the LogicalCluster. The LogicalCluster always has a path annotation. What is missing is that we don't label the logical cluster for replication. Compare pkg/reconciler/tenancy/replicatelogicalcluster.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nvmd:

On the other hand, the new change does not really cause more noise, but it will list all locations and from there get all placements that reference these locations. So there is hope that these are not all placements, vs. the old code that queued all placements.

if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = tombstone.Obj
}

clusterName, _, _, err := kcpcache.SplitMetaClusterNamespaceKey(key)
if err != nil {
runtime.HandleError(err)
syncTarget, ok := obj.(*workloadv1alpha1.SyncTarget)
if !ok {
runtime.HandleError(fmt.Errorf("unexpected object type: %T", obj))
return
}

cluster, err := c.logicalClusterLister.Cluster(clusterName).Get(corev1alpha1.LogicalClusterName)
// Get all locations in the same cluster and enqueue locations.
locations, err := c.locationLister.Cluster(logicalcluster.From(syncTarget)).List(labels.Everything())
if err != nil {
runtime.HandleError(err)
return
}

// placements referencing by cluster name
placements, err := c.placementIndexer.ByIndex(bySelectedLocationPath, logicalcluster.From(cluster).String())
if err != nil {
runtime.HandleError(err)
return
}
if path := cluster.Annotations[core.LogicalClusterPathAnnotationKey]; path != "" {
// placements referencing by path
placementsByPath, err := c.placementIndexer.ByIndex(bySelectedLocationPath, path)
if err != nil {
runtime.HandleError(err)
return
}
placements = append(placements, placementsByPath...)
}
logger = logger.WithValues(logging.FromPrefix("syncTargetReason", syncTarget)...)

logger := logging.WithObject(logging.WithReconciler(klog.Background(), ControllerName), obj.(*workloadv1alpha1.SyncTarget))
for _, placement := range placements {
c.enqueuePlacement(placement, logger, " because of SyncTarget")
for _, location := range locations {
c.enqueueLocation(location, logger)
}
}

Expand Down